#!/usr/bin/env python # coding: utf-8 # # RDD cơ bản # - Programmer chỉ định số lượng partitions. # - Driver tự phân chia partition đến các Workers tương ứng. # - Master parameter chỉ định số lượng workers cụ thể. # # # Các hàm transformations # - map(func): trả về tập dữ liệu phân tán mới bằng cách ánh xạ từng phần tử tập dữ liệu nguồn qua hàm func do programmer định nghĩa. # - filter(func): trả về tập dữ liệu phân tán mới bằng cách lọc ra các phần tử tập dữ liệu nguồn thoả điều kiện hàm func định nghĩa. # - distinct(): trả về tập dữ liệu phân tán mới chỉ chứa các phần tử riêng biệt từ tập dữ liệu nguồn. # - flatMap(func): tương tự như map(), nhưng có thể ánh xạ các phần tử nguồn sang 0 hoặc nhiều phần tử ở tập dữ liệu mới. Hàm func thường trả về kiểu Seg thay vì phần tử đơn lẻ. # In[1]: print "http://localhost:4040/jobs/" # In[2]: rdd = sc.parallelize([1, 2, 3, 4]) rdd.map(lambda x: x * 2).collect() # In[3]: rdd.filter(lambda x: x % 2 == 0).collect() # In[4]: rdd = sc.parallelize([1, 4, 2, 2, 3]) rdd.distinct().collect() # In[5]: rdd = sc.parallelize([1, 2, 3]) rdd.map(lambda x: [x, x + 5]).collect() # In[6]: rdd.flatMap(lambda x: [x, x + 5]).collect() # # Các hàm actions # - reduce(func): aggregate từng phần tử tập dữ liệu thông qua hàm func, hàm func nhận 2 đối số và trả về 1 giá trị. # - take(n): trả về mảng n phần tử. # - collect(): trả về tất cả các phần tử. CHÚ Ý: phải đảm bảo máy Driver đủ dung lượng để chứa kết quả trả về. # - takeOrdered(n, key=func): trả về n phần tử sắp xếp tăng dần hoặc sắp xếp theo hàm key. # In[7]: rdd = sc.parallelize([1, 2, 3]) rdd.reduce(lambda a, b: a * b) # In[8]: rdd.take(2) # In[9]: rdd.collect() # In[10]: rdd = sc.parallelize([5, 3, 1, 2]) rdd.takeOrdered(3, lambda s: -1 * s) # In[11]: rdd.takeOrdered(3) # In[12]: lines = sc.textFile("sample_text.txt", 4) print lines.count() # In[13]: print lines.count() # In[14]: lines = sc.textFile("sample_text.txt", 4) lines.cache() print lines.count() print lines.count() # # Key-Value RDDs # - Tương tự như Map Reduce, Spark hỗ trợ Key-Value pairs. # - Mỗi phần tử của Pair RDD là một cặp tuple. # ## Some Key-Value transformation # - reduceByKey(func): trả về tập dữ liệu phân tán mới (K, V). Trong đó, các giá trị cho từng key được tổng hợp bằng hàm reduce func có dạng (V, V) -> V. # - sortByKey(): trả về tập dữ liệu phân tán mới (K, V) sắp xếp tăng dần theo keys. # - groupByKey(): trả về tập dữ liệu phân tán mới (K, Iterable). # In[15]: rdd = sc.parallelize([(1, 2), (3, 4)]) rdd.collect() # In[16]: rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)]) rdd.reduceByKey(lambda a, b: a + b).collect() # In[17]: rdd = sc.parallelize([(1, "a"), (2, "c"), (1, "b")]) rdd.sortByKey().collect() # In[18]: rdd.groupByKey().collect() # # X.join(Y) # - Trả về tất cả các phần tử RDD keys khớp với X và Y. # - Mỗi cặp có định dạng (k, (v1, v2)). Trong đó, (k, v1) thuộc X và (k, v2) thuộc Y. # In[19]: x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("a", 3)]) sorted(x.join(y).collect()) # # X.leftOuterJoin(Y) # - Với mỗi phần tử (k, v) thuộc X, kết quả trả về có thể là: # - Tất cả các cặp (k, (v, w)) với w thuộc Y. # - Hoặc các cặp (k, (v, None)) nếu không có phần tử nào thuộc Y có key là k. # In[20]: x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2)]) sorted(x.leftOuterJoin(y).collect()) # # X.rightOuterJoin(Y) # - Với mỗi phần tử (k, w) thuộc Y, kết quả trả về có thể là: # - Tất cả các cặp (k, (v, w)) với v thuộc X. # - Hoặc các cặp (k, (None, w)) nếu không có phần tử nào thuộc X có key là k. # In[21]: x = sc.parallelize([("a", 1)]) y = sc.parallelize([("a", 2), ("b", 4)]) sorted(x.rightOuterJoin(y).collect()) # # X.fullOuterJoin(Y) # - Với mỗi phần tử (k, v) thuộc X, kết quả trả về có thể là: # - Tất cả các cặp (k, (v, w)) với w thuộc Y. # - Hoặc các cặp (k, (v, None)) nếu không có phần tử nào thuộc Y có key là k. # - Với mỗi phần tử (k, w) thuộc Y, kết quả trả về có thể là: # - Tất cả các cặp (k, (v, w)) với v thuộc X. # - Hoặc các cặp (k, (None, w)) nếu không có phần tử nào thuộc X có key là k. # In[22]: x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("c", 8)]) sorted(x.fullOuterJoin(y).collect())