클러스터 : 여러 대의 서버가 마치 한 대의 서버처럼 동작하는 것
스파크 클러스터 : 대량의 데이터를 여러 서버로 나누어 병렬로 처리
Resilient Distributed Dataset. 회복력을 가진 분산 데이터 집합
RDD를 만들어 내는 방법을 기억하고 있음. 이를 통해 데이터를 다시 만들어 복구.
데이터를 여러 서버에 나누어 저장. 스파크는 데이터를 파티션 단위로 관리.
HDFS = 하둡 파일시스템
하둡 파일시스템을 사용함으로써 나타내는 스파트 특징
스파크 프로그램을 실행 = 스파크 잡(Job)을 실행. 각 서버마다 익스큐터(executor)라는 프로세스 생성.
메인 함수를 가지고 있는 프로그램.
스파크컨텍스트를 생성하고 그 인스턴스를 포함하고 있는 프로그램.
트랜스포메이션 : RDD의 형태를 변형하는 연산. 결과물로 새로운 RDD가 생성이 됨.
액션 : 어떤 동작을 수행해 그 결과로 다른 타입의 결과를 반환하는 연산.
트랜스포메이션 연산 : 해당 RDD를 사용하는 다른 액션 연산이 호출될 때까지 실제 연산을 수행하지 않는 방식으로 동작.
예시) sc.textFile 실제 파일이 읽히지 않다가 saveAsTextFile 메서드를 호출하는 시점에 실제 데이터를 읽음.
함수형 프로그래밍 언어와 같이 "함수"를 이용한 프로그램을 작성 가능.
var rdd1 = sc.parallelize(1 to 10) // RDD를 생성 var rdd2 = rdd1.map(_ + 1)
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
rdd1 = sc.parallelize(range(1,10+1))
rdd2 = rdd1.map(lambda v:v+1)
rdd2.take(10)
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// 예제 2-1 class PassingFunctionSample { var count = 1 def add1(i: Int): Int = { count + 1 } def runMapSample(sc: SparkContext) { val rdd1 = sc.parallelize(1 to 10) // java.io.NotSerializableException !!!! val rdd2 = rdd1.map(add) println(rdd2.collect()) } }
실행하면 java.io.NotSerializableException 에러가 발생.
https://m.blog.naver.com/PostView.nhn?blogId=nkon&logNo=150190119036
map() 메서드에 전달된 함수는 클러스터를 구성하는 각 서버에서 동작할 수 있도록 클러스터에 속한 모든 워커 서버에 전달되어야 하기 때문에 Serializable 해야함.
자바의 직렬화 규칙에 따라 add 함수 뿐만 아니라 PassingFunctionSample 클래스 전체가 클러스터로 전달해야 하는 대상이 됨.
# 예제 2-4
class PassingFunctionSample():
def add1(self, i):
return i + 1
def runMapSample1(self, sc):
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# rdd2 = rdd1.map(self.add1) # => 잘못된 방법. 'self'를 전달하고 있음.
rdd2 = rdd1.map(add2) # 이렇게 처리 합니다!
print(", ".join(str(i) for i in rdd2.collect()))
if __name__ == "__main__":
def add2(i):
return i + 1
#conf = SparkConf()
#sc = SparkContext(master="local[*]", appName="PassingFunctionSample", conf=conf)
obj = PassingFunctionSample()
obj.runMapSample1(sc)
sc.stop()
2, 3, 4, 5, 6, 7, 8, 9, 10, 11
인스턴스 변수를 매개변수로 전달하는 경우도 전체 클래스 인스턴스를 직렬화 하는 문제 발생.
메소드 내부에서 선언한 지역변수로 변환해서 전달 해야 함.
reduceByKey : 키와 값의 형태로 구성돼 있는 경우에만 이 연산을 사용할 수 있음.
스파크에서는 원소가 2개짜리인 튜플 타입을 이용해 이를 표현.
스칼라
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDCreateSample") val sc = new SparkContext(conf)
자바
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("RDDCreateSample"); JavaSparkContext sc = new JavaSparkContext(conf);
# 파이썬
from pyspark import SparkContext, SparkConf
conf = SparkConf()
sc = SparkContext(master="local", appName="RDDCreateTest", conf=conf)
첫 번째 방법 : 드라이버 프로그램의 컬렉션 객체를 이용
스칼라
val rdd1 = sc.parallelize(List("a", "b", "c", "d", "e"))
자바
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c", "d", "e"])
RDD 파티션 개수를 지정하고 싶을 때에는 parallelize 2번째 매개변수를 사용
val rdd1= sc.parallelize(1 to 1000, 10)
두 번째 방법 : 파일이나 데이터베이스 같은 외부 데이터를 읽어 새로운 RDD를 생성하는 방법
스칼라
val rdd1 = sc.textFile("<spark_home_dir>/README.md")
자바
JavaRDD<String> rdd1 = sc.textFile("<spark_home_dir>/README.md");
# 파이썬
import os
#rdd1 = sc.textFile("<spark_home_dir>/README.md")
rdd1 = sc.textFile("%s/README.md" % os.getenv("SPARK_HOME"))
파일의 각 줄은 한개의 RDD 요소(element)가 됨.
파일 읽는 과정은 하둡의 TextInputFormat을 이용.
RDD의 모든 원소를 모아서 배열로 돌려줌. 전체 데이터를 모두 담을 수 있을 정도로 충분한 메모리 공간 확보 필요.
스칼라
val rdd = sc.parallelize(1 to 10) val result = rdd.collect println(result.mkString(", "))
자바
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); List<Integer> result = rdd.collect(); for (Integer i : result) System.out.println(i);
# 파이썬
rdd = sc.parallelize(range(1, 10+1))
result = rdd.collect()
print(result)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
count는 RDD를 구성하는 전체 요소의 개수를 반환
스칼라
val rdd = sc.parallelize(1 to 10) val result = rdd.count println(result)
자바
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); long result = rdd.count(); System.out.println(result);
# 파이썬
rdd = sc.parallelize(range(1, 10+1))
result = rdd.count()
print(result)
10
기존 RDD를 이용해 새로운 RDD를 생성하는 연산.
하나의 입력을 받아 하나의 값을 돌려주는 함수를 인자(argument)로 받음.
map() : 이 함수를 RDD에 속하는 모든 요소에 적용한 뒤 그 결과로 구성된 새로운 RDD를 생성해 돌려줌.
스칼라
val rdd = sc.parallelize(1 to 5) val result = rdd.map(_ + 1) println(result.collect.mkString(", "))
자바
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); // Java7 JavaRDD<Integer> rdd2 = rdd1.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1 + 1; } }); // Java8 Lambda JavaRDD<Integer> rdd3 = rdd1.map((Integer v1) -> v1 + 1); System.out.println(StringUtils.join(rdd2.collect(), ", "));
# 파이썬
rdd1 = sc.parallelize(range(1, 5+1))
rdd2 = rdd1.map(lambda v : v + 1)
print(rdd2.collect())
[2, 3, 4, 5, 6]
map 메서드와 비슷하지만 반환하는 값의 타입이 다름.
TraversableOnce는 스칼라에서 사용하는 이터레이터(Iterator) 타입 중 하나.
// 단어 3개를 가진 List 생성 val fruits = List("apple,orange", "grape,apple,mango", "blueberry,tomato,orange") // RDD 생성 val rdd1 = sc.parallelize(fruits) // RDD의 map() 메서드로 각 단어를 ","를 기준으로 분리 val rdd2 = rdd1.map(_.split(",")) // 결과를 출력 println(rdd2.collect().map(_.mkString("{", ", ", "}")).mkString("{", ", ", "}"))
fruits = ["apple,orange", "grape,apple,mango", "blueberry,tomato,orange"]
rdd1 = sc.parallelize(fruits)
rdd2 = rdd1.map(lambda v: v.split(","))
print(rdd2.collect())
[['apple', 'orange'], ['grape', 'apple', 'mango'], ['blueberry', 'tomato', 'orange']]
map의 정의에 따라 T가 문자열, U가 배열이므로 결과의 타입이 RDD[배열]이 된 것.
각 배열 속의 포함된 요소를 모두 배열 밖으로 끄집어내는 작업을 원함. => 이 때 flatMap 사용!
하나의 입력값에 대응되는 반환값이 여러개일 때 유용하게 사용가능.
flatMap 예제
스칼라
val fruits = List("apple,orange", "grape,apple,mango", "blueberry,tomato,orange") val rdd1 = sc.parallelize(fruits) val rdd2 = rdd1.flatMap(_.split(",")) print(rdd2.collect.mkString(", "))
자바
List<String> data = new ArrayList(); data.add("apple,orange"); data.add("grape,apple,mango"); data.add("blueberry,tomato,orange"); JavaRDD<String> rdd1 = sc.parallelize(data); JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String t) throws Exception { return Arrays.asList(t.split(",")).iterator(); } }); // Java8 Lambda JavaRDD<String> rdd3 = rdd1.flatMap((String t) -> Arrays.asList(t.split(",")).iterator()); System.out.println(rdd2.collect());
#파이썬
rdd1 = sc.parallelize(["apple,orange", "grape,apple,mango", "blueberry,tomato,orange"])
rdd2 = rdd1.flatMap(lambda s: s.split(","))
print(rdd2.collect())
['apple', 'orange', 'grape', 'apple', 'mango', 'blueberry', 'tomato', 'orange']
파싱 오류시 map()을 수행한 뒤 별도의 필터단계를 추가. 아니면 다음처럼 flatMap()을 사용하여 해결.
스칼라
val fruits = List("apple,orange", "grape,apple,mango", "blueberry,tomato,orange") val rdd1 = sc.parallelize(fruits) val rdd2 = rdd1.flatMap(log => { if (log.contains("apple")) { Some(log.indexOf("apple")) } else { None } }) println(rdd2.collect.mkString(","))
rdd1 = sc.parallelize(["apple,orange", "grape,apple,mango", "blueberry,tomato,orange"])
rdd2 = rdd1.flatMap(lambda log: str(log.index("apple")) if "apple" in log else "")
print(",".join(rdd2.collect()))
0,6
RDD를 파티션 단위로 처리. 인자로 전달받은 함수를 파티션 단위로 적용, 그 결과로 구성된 새로운 RDD를 생성하는 메소드.
스칼라
val rdd1 = sc.parallelize(1 to 10, 3) val rdd2 = rdd1.mapPartitions(numbers => { println("DB연결 !!!") numbers.map { number => number + 1 } }) println(rdd2.collect.mkString(", "))
자바
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3); JavaRDD<Integer> rdd2 = rdd1.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() { public Iterator<Integer> call(Iterator<Integer> numbers) throws Exception { System.out.println("DB연결 !!!"); List<Integer> result = new ArrayList<>(); while (numbers.hasNext()) { result.add(numbers.next()); } return result.iterator(); } ; }); // Java8 Lambda JavaRDD<Integer> rdd3 = rdd1.mapPartitions((Iterator<Integer> numbers) -> { System.out.println("DB연결 !!!"); List<Integer> result = new ArrayList<>(); numbers.forEachRemaining(result::add); return result.iterator(); }); System.out.println(rdd3.collect());
# 파이썬
import os
def write_file(fn, msg):
with open(fn,'a' if os.path.exists(fn) else 'w') as f:
f.write(msg)
def increase(numbers):
write_file('t.txt', "DB 연결!!!\n")
return (i + 1 for i in numbers)
rdd1 = sc.parallelize(range(1,10+1), 3)
rdd2 = rdd1.mapPartitions(increase)
write_file('t.txt', "%s\n" % rdd2.collect())
%%sh
cat t.txt; rm t.txt
DB 연결!!! DB 연결!!! DB 연결!!! [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
rdd1 생성시 파티션을 3개로 주도록 설정함. (각 파티션에 대한 처리를 할 때 'DB 연결!!!'이라는 문자열을 출력하였음.)
numbers는 각 요소가 담긴 iterator 이며 그 결과를 다시 iterator로 리턴해야 함.
인자로 전달받은 함수를 파티션 단위로 적용, 그 결과로 구성된 새로운 RDD를 생성하는 메소드.
(콜백으로) 인자로 전달되는 함수를 호출할 때 (인덱스, iterator) 정보를 함께 전달해 줌.
스칼라
val rdd1 = sc.parallelize(1 to 10, 3) val rdd2 = rdd1.mapPartitionsWithIndex((idx, numbers) => { numbers.flatMap { case number if idx == 1 => Option(number + 1) case _ => None } }) println(rdd2.collect.mkString(", "))
자바
JavaRDD<Integer> rdd2 = rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() { @Override public Iterator<Integer> call(IInteger idx, Iterator<Integer> numbers) throws Exception { List<Integer> result = new ArrayList<>(); if (idx == 1) { while (numbers.hasNext()) { result.add(numbers.next()); } } return result.iterator(); } }, true); // Java8 Lambda JavaRDD<Integer> rdd3 = rdd2.mapPartitionsWithIndex((IInteger idx, Iterator<Integer> numbers) -> { List<Integer> result = new ArrayList<>(); if (idx == 1) numbers.forEachRemaining(result::add); return result.iterator(); }, true);
# 파이썬
def increaseWithIndex(idx, numbers):
for i in numbers:
if(idx == 1):
yield i
rdd1 = sc.parallelize(range(1,10+1),3)
rdd2 = rdd1.mapPartitionsWithIndex(increaseWithIndex)
print(rdd2.collect())
[4, 5, 6]
"키"에 해당하는 부분은 그대로 두고 "값"에만 map() 연산을 적용한 것과 같음.
스칼라
val rdd = sc.parallelize(List("a", "b", "c")).map((_, 1)) val result = rdd.mapValues(i => i + 1) println(result.collect.mkString("\t"))
자바
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c")); JavaPairRDD<String, Integer> rdd2 = rdd1.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String t) throws Exception { return new Tuple2(t, 1); } }); JavaPairRDD<String, Integer> rdd3 = rdd2.mapValues(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1 + 1; } }); // Java8 Lambda JavaPairRDD<String, Integer> rdd4 = rdd1.mapToPair((String t) -> new Tuple2<String, Integer>(t, 1)).mapValues((Integer v1) -> v1 + 1); System.out.println(rdd3.collect());
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c"])
# (키, 값) 쌍으로 구성된 RDD를 생성
rdd2 = rdd1.map(lambda v : (v, 1))
rdd3 = rdd2.mapValues(lambda i: i + 1)
print(rdd3.collect())
[('a', 2), ('b', 2), ('c', 2)]
"키"에 해당하는 부분은 그대로 두고 "값"에만 flatMap() 연산을 적용함.
스칼라
val rdd = sc.parallelize(Seq((1, "a,b"), (2, "a,c"), (1, "d,e"))) val result = rdd.flatMapValues(_.split(",")) println(result.collect.mkString("\t"))
자바
List<Tuple2<Integer, String>> data = Arrays.asList(new Tuple2(1, "a,b"), new Tuple2(2, "a,c"), new Tuple2(1, "d,e")); JavaPairRDD<Integer, String> rdd1 = sc.parallelizePairs(data); // Java7 JavaPairRDD<Integer, String> rdd2 = rdd1.flatMapValues(new Function<String, Iterable<String>>() { @Override public Iterable<String> call(String v1) throws Exception { return Arrays.asList(v1.split(",")); } }); // Java8 Lambda JavaPairRDD<Integer, String> rdd3 = rdd1.flatMapValues((String v1) -> Arrays.asList(v1.split(","))); System.out.println(rdd2.collect());
# 파이썬
rdd1 = sc.parallelize([(1, "a,b"), (2, "a,c"), (1, "d,e")])
rdd2 = rdd1.flatMapValues(lambda s: s.split(","))
print(rdd2.collect())
[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'c'), (1, 'd'), (1, 'e')]
그룹과 관련된 연산들
첫 번째 RDD의 n번째 요소를 키로 하고 두 번째 RDD의 n번째 요소를 값으로 하는 순서쌍을 생성.
두 RDD는 같은 개수의 파티션을 가지고 있고, 각 파티션에 속하는 요소의 수는 동일하다 가정.
스칼라
val rdd1 = sc.parallelize(List("a", "b", "c")) val rdd2 = sc.parallelize(List(1, 2, 3)) val result = rdd1.zip(rdd2) println(result.collect.mkString(", "))
자바
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c")); JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3)); JavaPairRDD<String, Integer> result = rdd1.zip(rdd2); System.out.println(result.collect());
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c"])
rdd2 = sc.parallelize([1, 2, 3])
result = rdd1.zip(rdd2)
print(result.collect())
[('a', 1), ('b', 2), ('c', 3)]
파티션 단위로 zip() 연산을 수행하고 특정 함수를 적용해 그 결과로 새로운 RDD를 생성하는 메서드.
스칼라
val rdd1 = sc.parallelize(List("a", "b", "c"), 3) val rdd2 = sc.parallelize(List(1, 2, 3), 3) val result = rdd1.zipPartitions(rdd2) { (it1, it2) => for { v1 <- it1; v2 <- it2 } yield v1 + v2 } println(result.collect.mkString(", "))
자바
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c"), 3); JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3), 3); // Java7 JavaRDD<String> rdd3 = rdd1.zipPartitions(rdd2, new FlatMapFunction2<Iterator<String>, Iterator<Integer>, String>() { @Override public Iterator<String> call(Iterator<String> t1, Iterator<Integer> t2) throws Exception { List<String> list = new ArrayList<>(); while (t1.hasNext()) { while (t2.hasNext()) { list.add(t1.next() + t2.next()); } } return list.iterator(); } }); // Java8 Lambda JavaRDD<String> rdd4 = rdd1.zipPartitions(rdd2, (Iterator<String> t1, Iterator<Integer> t2) -> { List<String> list = new ArrayList<>(); t1.forEachRemaining((String s) -> { t2.forEachRemaining((Integer i) -> list.add(s + i)); }); return list.iterator(); }); System.out.println(rdd3.collect());
실행결과
a1, b2, c3
파이썬에서는 사용할 수 없음.
rdd1, rdd2 모두 3개의 파티션으로 구성.
두 RDD의 파티션에서 같은 위치에 있는 것을 연결해서 새로운 결과로 구성된 새로운 RDD를 생성.
RDD의 요소를 일정한 기준에 따라 여러 개의 그룹으로 나누고 이 그룹으로 구성된 새로운 RDD를 생성.
각 그룹은 키와 그 키에 속한 시퀀스로 구성.
메소드로 전달하는 함수가 각 그룹의 키를 결정하는 역할을 담당.
1에서 10까지 요소로 구성된 RDD를 홀짝으로 "even", "odd"라는 그룹으로 분류하는 예제
스칼라
val rdd = sc.parallelize(1 to 10) val result = rdd.groupBy { case i: Int if (i % 2 == 0) => "even" case _ => "odd" } result.collect.foreach { v => println(s"${v._1}, [${v._2.mkString(",")}]") }
자바
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); // Java7 JavaPairRDD<String, Iterable<Integer>> rdd2 = rdd1.groupBy(new Function<Integer, String>() { @Override public String call(Integer v1) throws Exception { return (v1 % 2 == 0) ? "even" : "odd"; } }); // Java8 Lambda JavaPairRDD<String, Iterable<Integer>> rdd3 = rdd1.groupBy((Integer v1) -> (v1 % 2 == 0) ? "even" : "odd"); System.out.println(rdd2.collect());
# 파이썬
rdd1 = sc.parallelize(range(1, 10+1))
rdd2 = rdd1.groupBy(lambda v: "even" if v % 2 == 0 else "odd")
for x in rdd2.collect():
print(x[0], list(x[1]))
even [2, 4, 6, 8, 10] odd [1, 3, 5, 7, 9]
이미 RDD의 구성요소가 키와 값으로 쌍으로 이루어진 경우 사용 가능한 메서드.
키를 기준을 같은 키를 가진 요소들로 그룹을 만들고 이 그룹들로 구성된 새로운 RDD를 생성.
키와 그 키에 속한 요소의 시퀀스로 구성됨.
스칼라
val rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1)) val result = rdd.groupByKey result.collect.foreach { v => println(s"${v._1}, [${v._2.mkString(",")}]") }
자바
List<Tuple2<String, Integer>> data = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1), new Tuple2("c", 1), new Tuple2("b", 1), new Tuple2("c", 1)); JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(data); JavaPairRDD<String, Iterable<Integer>> rdd2 = rdd1.groupByKey(); System.out.println(rdd2.collect());
rdd1 = sc.parallelize(["a", "b", "c", "b", "c"]).map(lambda v: (v, 1))
rdd2 = rdd1.groupByKey()
for x in rdd2.collect():
print(x[0], list(x[1]))
a [1] b [1, 1] c [1, 1]
RDD의 구성요소가 키와 값의 쌍으로 된 경우에만 사용할 수 있는 메서드
같은 키를 갖는 값 요소를 여러 RDD에서 찾아서 키와 그 키에 속하는 요소의 시퀀스로 된 튜플을 새로운 RDD로 생성.
스칼라
val rdd1 = sc.parallelize(List(("k1", "v1"), ("k2", "v2"), ("k1", "v3"))) val rdd2 = sc.parallelize(List(("k1", "v4"))) val result = rdd1.cogroup(rdd2) result.collect.foreach { case (k, (v_1, v_2)) => { println(s"($k, [${v_1.mkString(",")}], [${v_2.mkString(",")}])") } }
자바
List<Tuple2<String, String>> data1 = Arrays.asList(new Tuple2("k1", "v1"), new Tuple2("k2", "v2"), new Tuple2("k1", "v3")); List<Tuple2<String, String>> data2 = Arrays.asList(new Tuple2("k1", "v4")); JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(data1); JavaPairRDD<String, String> rdd2 = sc.parallelizePairs(data2); JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> result = rdd1.<String>cogroup(rdd2); System.out.println(result.collect());
# 파이썬
rdd1 = sc.parallelize([("k1", "v1"), ("k2", "v2"), ("k1", "v3")])
rdd2 = sc.parallelize([("k1", "v4")])
result = rdd1.cogroup(rdd2)
for x in result.collect():
print(x[0], list(x[1][0]), list(x[1][1]))
k2 ['v2'] [] k1 ['v1', 'v3'] ['v4']
result = rdd2.cogroup(rdd1)
for x in result.collect():
print(x[0], list(x[1][0]), list(x[1][1]))
k2 [] ['v2'] k1 ['v4'] ['v1', 'v3']
rdd1.cogroup(rdd2, rdd3, ...)
Tuple(키, Tuple(rdd1 요소들 집합), Tuple(rdd2 요소들 집합), Tuple(rdd3 요소들 집합)
rdd2.cogroup(rdd1)
Tuple(키, Tuple(rdd2 요소들 집합), Tuple(rdd1 요소들 집합))
집합과 관련된 연산들
중복을 제외한 요소로만 구성된 새로운 RDD를 생성하는 메서드.
스칼라
val rdd = sc.parallelize(List(1, 2, 3, 1, 2, 3, 1, 2, 3)) val result = rdd.distinct() println(result.collect.mkString(", "))
자바
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3)); JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("a", "b", "c")); JavaPairRDD<Integer, String> result = rdd1.cartesian(rdd2); System.out.println(result.collect());
# 파이썬
rdd = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 3])
result = rdd.distinct()
print(result.collect())
[1, 2, 3]
두 RDD 요소를 카테시안 곱을 구하고 그 결과를 요소로 하는 새로운 RDD(key, value)를 생성하는 메서드.
스칼라
val rdd1 = sc.parallelize(List(1, 2, 3)) val rdd2 = sc.parallelize(List("a", "b", "c")) val result = rdd1.cartesian(rdd2) println(result.collect.mkString(", "))
자바
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3)); JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("a", "b", "c")); JavaRDD<String> result = rdd1.cartesian(rdd2); System.out.println(result.collect());
# 파이썬
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(["a", "b", "c"])
result = rdd1.cartesian(rdd2)
print(result.collect())
[(1, 'a'), (1, 'b'), (1, 'c'), (2, 'a'), (2, 'b'), (2, 'c'), (3, 'a'), (3, 'b'), (3, 'c')]
두 개의 RDD가 있을 때 rdd1.substract(rdd2)는 rdd1에는 속하고, rdd2에는 속하지 않는 요소로 구성된 새로운 RDD를 생성하는 메서드.(차집합)
스칼라
val rdd1 = sc.parallelize(List("a", "b", "c", "d", "e")) val rdd2 = sc.parallelize(List("d", "e")) val result = rdd1.subtract(rdd2) println(result.collect.mkString(", "))
자바
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e")); JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("d", "e")); JavaRDD<String> result = rdd1.subtract(rdd2); System.out.println(result.collect());
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c", "d", "e"])
rdd2 = sc.parallelize(["d", "e"])
result = rdd1.subtract(rdd2)
print(result.collect())
['c', 'b', 'a']
두 개의 RDD가 있을 때 rdd1 또는 rdd2에 속하는 요소로 구성된 새로운 RDD를 생성하는 메서드.
스칼라
val rdd1 = sc.parallelize(List("a", "b", "c")) val rdd2 = sc.parallelize(List("d", "e", "f")) val result = rdd1.union(rdd2) println(result.collect.mkString(", "))
자바
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c")); JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("d", "e", "f")); JavaRDD<String> result = rdd1.union(rdd2); System.out.println(result.collect());
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c"])
rdd2 = sc.parallelize(["d", "e", "f"])
result = rdd1.union(rdd2)
print(result.collect())
['a', 'b', 'c', 'd', 'e', 'f']
# 파이썬 : 중복해서 원소 표시됨.
rdd1 = sc.parallelize(["a", "b", "c", "d"])
rdd2 = sc.parallelize(["d", "e", "f"])
result = rdd1.union(rdd2)
print(result.collect())
['a', 'b', 'c', 'd', 'd', 'e', 'f']
두 개의 RDD가 있을 때 rdd1, rdd2에 동시에 속하는 요소로 구성된 RDD를 생성하는 메서드. (교집합)
결과로 생성되는 RDD에는 중복된 원소가 존재하지 않음.
스칼라
val rdd1 = sc.parallelize(List("a", "a", "b", "c")) val rdd2 = sc.parallelize(List("a", "a", "c", "c")) val result = rdd1.intersection(rdd2) println(result.collect.mkString(", "))
자바
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "a", "b", "c")); JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("a", "a", "c", "c")); JavaRDD<String> result = rdd1.intersection(rdd2); System.out.println(result.collect());
# 파이썬
rdd1 = sc.parallelize(["a", "a", "b", "c"])
rdd2 = sc.parallelize(["a", "a", "c", "c"])
result = rdd1.intersection(rdd2)
print(result.collect())
['c', 'a']