import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
두 RDD에서 서로 같은 키를 가지고 있는 요소를 모아 그룹을 형성. 이 결과로 구성된 새로운 RDD를 생성.
Tuple(키, Tuple(첫번째 RDD의 요소, 두번째 RDD의 요소)) 형태로 구성.
스칼라
val rdd1 = sc.parallelize(List("a", "b", "c", "d", "e")).map((_, 1)) val rdd2 = sc.parallelize(List("b", "c")).map((_, 2)) val result = rdd1.join(rdd2) println(result.collect.mkString("\n"))
자바
List<Tuple2<String, Integer>> data1 = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1), new Tuple2("c", 1), new Tuple2("d", 1), new Tuple2("e", 1));
List<Tuple2<String, Integer>> data2 = Arrays.asList(new Tuple2("b", 2), new Tuple2("c", 2));
JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(data1);
JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(data2);
JavaPairRDD<String, Tuple2<Integer, Integer>> result = rdd1.<Integer>join(rdd2);
System.out.println(result.collect());
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c", "d", "e"]).map(lambda v: (v, 1), 1)
rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2), 1)
result = rdd1.join(rdd2)
print(result.collect())
[('c', (1, 2)), ('b', (1, 2))]
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c", "d", "e"]).map(lambda v: (v, 1))
rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2))
result = rdd2.join(rdd1)
print(result.collect())
[('c', (2, 1)), ('b', (2, 1))]
왼쪽 외부 조인과 오른쪽 외부 조인을 수행하고, 그 결과로 구성된 새로운 RDD를 돌려줌.
catesian() 메서드의 실행 결과에는 나타나지 않았던 요소들이 포함.
스칼라
val rdd1 = sc.parallelize(List("a", "b", "c")).map((_, 1)) val rdd2 = sc.parallelize(List("b", "c")).map((_, 2)) val result1 = rdd1.leftOuterJoin(rdd2) val result2 = rdd1.rightOuterJoin(rdd2) println("Left: " + result1.collect.mkString("\t")) println("Right: " + result2.collect.mkString("\t"))
자바
List<Tuple2<String, Integer>> data1 = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", "1"), new Tuple2("c", "1")); List<Tuple2<String, Integer>> data2 = Arrays.asList(new Tuple2("b", 2), new Tuple2("c", "2")); JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(data1); JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(data2); JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> result1 = rdd1.<Integer>leftOuterJoin(rdd2); JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> result2 = rdd1.<Integer>rightOuterJoin(rdd2); System.out.println("Left: " + result1.collect()); System.out.println("Right: " + result2.collect());
# 파이썬
rdd1 = sc.parallelize(["a", "b", "c"]).map(lambda v: (v, 1))
rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2))
result1 = rdd1.leftOuterJoin(rdd2)
result2 = rdd1.rightOuterJoin(rdd2)
print("Left: %s" % result1.collect())
print("Right: %s" % result2.collect())
Left: [('a', (1, None)), ('c', (1, 2)), ('b', (1, 2))] Right: [('c', (1, 2)), ('b', (1, 2))]
rdd1.subtractByKey(rdd2)는 rdd1의 요소 중에서 rdd2에 같은 키가 존재하는 요소를 제외한 나머지로 구성된 새로운 RDD를 돌려줌.
스칼라
val rdd1 = sc.parallelize(List("a", "b")).map((_, 1)) val rdd2 = sc.parallelize(List("b")).map((_, 2)) val result = rdd1.subtractByKey(rdd2) println(result.collect.mkString("\n"))
자바
List<Tuple2<String, Integer>> data1 = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1)); List<Tuple2<String, Integer>> data2 = Arrays.asList(new Tuple2("b", 2)); JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(data1); JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(data2); JavaPairRDD<String, Integer> result = rdd1.subtractByKey(rdd2); System.out.println(result.collect());
# 파이썬
rdd1 = sc.parallelize(["a", "b"]).map(lambda v: (v, 1))
rdd2 = sc.parallelize(["b"]).map(lambda v: (v, 2))
result = rdd1.subtractByKey(rdd2)
print(result.collect())
[('a', 1)]
같은 키를 가진 값들을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDD를 생성.
두 개의 값을 하나로 합치는 함수를 인자로 전달 받으며 이 함수는 결합법칙과 교환법칙이 성립해야 함.
스칼라
val rdd = sc.parallelize(List("a", "b", "b")).map((_, 1)) val result = rdd.reduceByKey(_ + _) println(result.collect.mkString(","))
자바
List<Tuple2<String, Integer>> data = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1), new Tuple2("b", 1)); JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data); // Java7 JavaPairRDD<String, Integer> result = rdd.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // Java8 Lambda JavaPairRDD<String, Integer> result2 = rdd.reduceByKey((Integer v1, Integer v2) -> v1 + v2); System.out.println(result.collect());
# 파이썬
rdd = sc.parallelize(["a", "b", "b"]).map(lambda v: (v, 1))
result = rdd.reduceByKey(lambda v1, v2: v1 + v2)
print(result.collect())
[('b', 2), ('a', 1)]
reduceByKey와 유사하게 같은 키를 가진 값들을 하나로 병합해 키-값 쌍으로 구성된 새로운 RDD를 생성.
초기값을 메서드의 인자로 전달해서 병합 시 사용가능.
각 단위 병합 단계에서 결과에 영향이 없는 값을 초기값으로 사용하므로 이 함수는 교환법칙만 만족하면 사용가능.
스칼라
val rdd = sc.parallelize(List("a", "b", "b")).map((_, 1)) val result = rdd.foldByKey(0)(_ + _) println(result.collect.mkString(","))
자바
List<Tuple2<String, Integer>> data = Arrays.asList(new Tuple2("a", 1), new Tuple2("b", 1), new Tuple2("b", 1)); JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data); // Java7 JavaPairRDD<String, Integer> result = rdd.foldByKey(0, new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // Java8 Lambda JavaPairRDD<String, Integer> result2 = rdd.foldByKey(0, (Integer v1, Integer v2) -> v1 + v2); System.out.println(result.collect());
# 파이썬
rdd = sc.parallelize(["a", "b", "b"]).map(lambda v: (v, 1))
result = rdd.foldByKey(0, lambda v1, v2: v1 + v2)
print(result.collect())
[('b', 2), ('a', 1)]
같은 키를 가진 값들을 하나로 병합하는 기능을 수행. 병합을 수행하는 과정에서 값의 타입이 바뀔 수 있음.
기존 병합 함수의 스칼라 API
def reduceByKey(func: (V, V) => V): RDD[(K, V)] def foldByKey(zerovalue: V)(func: (V, V) => V): RDD[(K, V)]
combineByKey 스칼라 API, C는 클래스
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V), mergeCombiners: (C, C) => C): RDD[(K, C)]
인자로 지정된 3개의 함수 의미
스칼라
case class Record(var amount: Long, var number: Long = 1) { def map(v: Long) = Record(v) def add(amount: Long): Record = { add(map(amount)) } def add(other: Record): Record = { this.number += other.number this.amount += other.amount this } override def toString: String = s"avg:${amount / number}" } // combineByKey()를 이용한 평균값 계산 val data = Seq(("Math", 100L), ("Eng", 80L), ("Math", 50L), ("Eng", 70L), ("Eng", 90L)) val rdd = sc.parallelize(data) val createCombiner = (v: Long) => Record(v) val mergeValue = (c: Record, v: Long) => c.add(v) val mergeCombiners = (c1: Record, c2: Record) => c1.add(c2) val result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners) println(result.collect.mkString(",\t"))
자바
List<Tuple2<String, Long>> data = Arrays.asList(new Tuple2("Math", 100L), new Tuple2("Eng", 80L), new Tuple2("Math", 50L), new Tuple2("Eng", 70L), new Tuple2("Eng", 90L)); JavaPairRDD<String, Long> rdd = sc.parallelizePairs(data); // Java7 Function<Long, Record> createCombiner = new Function<Long, Record>() { @Override public Record call(Long v) throws Exception { return new Record(v); } }; Function2<Record, Long, Record> mergeValue = new Function2<Record, Long, Record>() { @Override public Record call(Record record, Long v) throws Exception { return record.add(v); } }; Function2<Record, Record, Record> mergeCombiners = new Function2<Record, Record, Record>() { @Override public Record call(Record r1, Record r2) throws Exception { return r1.add(r2); } }; JavaPairRDD<String, Record> result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners); // Java8 JavaPairRDD<String, Record> result2 = rdd.combineByKey((Long v) -> new Record(v), (Record record, Long v) -> record.add(v), (Record r1, Record r2) -> r1.add(r2)); System.out.println(result.collect());
# 파이썬 - 다음 소스는 jupyter notebook에서 바로 실행 불가.
# https://stackoverflow.com/questions/28569374/spark-returning-pickle-error-cannot-lookup-attribute
record_py = """class Record:
def __init__(self, amount, number=1):
self.amount = amount
self.number = number
def addAmt(self, amount):
return Record(self.amount + amount, self.number + 1)
def __add__(self, other):
amount = self.amount + other.amount
number = self.number + other.number
return Record(amount, number)
def __str__(self):
return "avg:" + str(self.amount / self.number)
def __repr__(self):
return 'Record(%r, %r)' % (self.amount, self.number)
"""
src = """import pyspark
from record import *
sc = pyspark.SparkContext()
def createCombiner(v):
return Record(v)
def mergeValue(c, v):
return c.addAmt(v)
def mergeCombiners(c1, c2):
return c1 + c2
rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
result = rdd.combineByKey(lambda v: createCombiner(v), lambda c, v: mergeValue(c, v),
lambda c1, c2: mergeCombiners(c1, c2))
print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])
"""
with open('record.py', 'w') as f:
f.write(record_py)
with open('job.py', 'w') as f:
f.write(src)
%%bash
$SPARK_HOME/bin/spark-submit --py-files record.py job.py 2>/dev/null
Math avg:75.0 Eng avg:80.0
import os
#os.remove('record.py')
os.remove('job.py')
combineByKey()의 특수한 경우로 초기값을 생성하는 부분을 제외하면 동일한 동작을 수행.
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
seqOp은 mergeValue(), combOp은 mergeCombiner()와 서로 같은 역할을 하는 함수.
병합에 필요한 초기값을 알 기 위해 zeroValue라는 "값"을 사용함.
스칼라
val data = Seq(("Math", 100L), ("Eng", 80L), ("Math", 50L), ("Eng", 70L), ("Eng", 90L)) val rdd = sc.parallelize(data) val zero = Record(0, 0) val mergeValue = (c: Record, v: Long) => c.add(v) val mergeCombiners = (c1: Record, c2: Record) => c1.add(c2) val result = rdd.aggregateByKey(zero)(mergeValue, mergeCombiners) println(result.collect.mkString(",\t"))
자바
List<Tuple2<String, Long>> data = Arrays.asList(new Tuple2("Math", 100L), new Tuple2("Eng", 80L), new Tuple2("Math", 50L), new Tuple2("Eng", 70L), new Tuple2("Eng", 90L)); JavaPairRDD<String, Long> rdd = sc.parallelizePairs(data); // Java7 Record zero = new Record(0, 0); Function2<Record, Long, Record> mergeValue = new Function2<Record, Long, Record>() { @Override public Record call(Record record, Long v) throws Exception { return record.add(v); } }; Function2<Record, Record, Record> mergeCombiners = new Function2<Record, Record, Record>() { @Override public Record call(Record r1, Record r2) throws Exception { return r1.add(r2); } }; JavaPairRDD<String, Record> result = rdd.aggregateByKey(zero, mergeValue, mergeCombiners); // Java8 JavaPairRDD<String, Record> result2 = rdd.aggregateByKey(zero, (Record record, Long v) -> record.add(v), (Record r1, Record r2) -> r1.add(r2)); System.out.println(result.collect());
# 파이썬 - 다음 소스는 jupyter notebook에서 바로 실행 불가.
src = """import pyspark
from record import *
sc = pyspark.SparkContext()
def mergeValue(c, v):
return c.addAmt(v)
def mergeCombiners(c1, c2):
return c1 + c2
rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
result = rdd.aggregateByKey(Record(0, 0), lambda c, v: mergeValue(c, v), lambda c1, c2: mergeCombiners(c1, c2))
print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])
"""
with open('job.py', 'w') as f:
f.write(src)
%%bash
$SPARK_HOME/bin/spark-submit --py-files record.py job.py 2>/dev/null
Math avg:75.0 Eng avg:80.0
os.remove('record.py')
os.remove('job.py')
pipe를 이용하면 데이터를 처리하는 과정에서 외부 프로세스 활용 가능.
cut 유틸리티를 이용해 문자열 분리 후 첫 번째와 세 번째 숫자를 뽑아내는 예제
스칼라
val rdd = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9")) val result = rdd.pipe("cut -f 1,3 -d ,") println(result.collect.mkString(", "))
자바
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("1,2,3", "4,5,6", "7,8,9")); JavaRDD<String> result = rdd.pipe("cut -f 1,3 -d ,"); System.out.println(result.collect());
# 파이썬
rdd = sc.parallelize(["1,2,3", "4,5,6", "7,8,9"])
result = rdd.pipe("cut -f 1,3 -d ,")
print(result.collect())
['1,3', '4,6', '7,9']
repartition() : 파티션 수를 늘리거나 줄이는 것 모두 가능. 셔플을 기반으로 동작. 파티션을 늘릴 경우 주로 사용.
coalesce() : 줄이는 것만 가능. 셔플옵션 없으면 셔플 안함. 파티션을 줄일 경우 주로 사용.
스칼라
val rdd1 = sc.parallelize(1 to 1000000, 10) val rdd2 = rdd1.coalesce(5) val rdd3 = rdd2.repartition(10); println(s"partition size: ${rdd1.getNumPartitions}") println(s"partition size: ${rdd2.getNumPartitions}") println(s"partition size: ${rdd3.getNumPartitions}")
자바
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0), 10); JavaRDD<Integer> rdd2 = rdd1.coalesce(5); JavaRDD<Integer> rdd3 = rdd2.coalesce(10); System.out.println("partition size:" + rdd1.getNumPartitions()); System.out.println("partition size:" + rdd2.getNumPartitions()); System.out.println("partition size:" + rdd3.getNumPartitions());
# 파이썬
rdd1 = sc.parallelize(list(range(1, 11)), 10)
rdd2 = rdd1.coalesce(5)
rdd3 = rdd2.repartition(10)
print("partition size: %d" % rdd1.getNumPartitions())
print("partition size: %d" % rdd2.getNumPartitions())
print("partition size: %d" % rdd3.getNumPartitions())
partition size: 10 partition size: 5 partition size: 10
RDD를 구성하는 모든 데이터를 특정 기준에 따라 여러 개의 파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 이 결과로 새로운 RDD를 생성해 주는 메서드.
데이터가 키와 값 쌍으로 구성돼 있어야 함.
파티셔너 : 각 데이터의 키 값을 이용해 데이터가 속할 파티션을 결정. 이 때 키 값을 이용한 정렬도 함께 수행.
10개의 무작위 숫자를 3개의 파티션으로 분리해 보는 예제
스칼라
val r = scala.util.Random val data = for (i <- 1 to 10) yield (r.nextInt(100), "-") val rdd1 = sc.parallelize(data) val rdd2 = rdd1.repartitionAndSortWithinPartitions(new HashPartitioner(3)) rdd2.foreachPartition(it => { println("=========="); it.foreach(v => println(v)) })
자바
List<Integer> data = fillToNRandom(10); JavaPairRDD<Integer, String> rdd1 = sc.parallelize(data).mapToPair((Integer v) -> new Tuple2(v, "-")); JavaPairRDD<Integer, String> rdd2 = rdd1.repartitionAndSortWithinPartitions(new HashPartitioner(3)); rdd2.foreachPartition(new VoidFunction<Iterator<Tuple2<Integer, String>>>() { @Override public void call(Iterator<Tuple2<Integer, String>> it) throws Exception { System.out.println("=========="); while (it.hasNext()) { System.out.println(it.next()); } } });
# 파이썬
import random
data = [random.randrange(1, 100) for i in range(0, 10)]
rdd1 = sc.parallelize(data).map(lambda v: (v, "-"))
rdd2 = rdd1.repartitionAndSortWithinPartitions(3, lambda x: x)
# 결과 검증
# rdd2.foreachPartition(lambda values: print(list(values)))
rdd2.collect()
[(6, '-'), (27, '-'), (54, '-'), (93, '-'), (10, '-'), (22, '-'), (41, '-'), (86, '-'), (86, '-'), (98, '-')]
foreachPartition() 메서드는 RDD의 파티션 단위로 특정 함수를 실행해 주는 메서드.
결과 : 각 기 값에 따라 파티션이 분리되고 동시에 키 값에 따라 정렬.
RDD 구성요소가 키와 값의 쌍으로 구성된 경우 사용할 수 있는 메서드.
Partitioner 클래스의 인스턴스를 인자로 전달.
HashPartitioner, RangePartitioner 두 종류 존재. 파티션 생성 기준을 변경하고 싶으면 Partitioner 클래스를 상속.
스칼라
val rdd1 = sc.parallelize(List("apple", "mouse", "monitor"), 5).map { a => (a, a.length) } val rdd2 = rdd1.partitionBy(new HashPartitioner(3)) println(s"rdd1:${rdd1.getNumPartitions}, rdd2:${rdd2.getNumPartitions}")
자바
List<Tuple2<String, Integer>> data = Arrays.asList(new Tuple2("apple", 1), new Tuple2("mouse", 1), new Tuple2("monitor", 1)); JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(data, 5); JavaPairRDD<String, Integer> rdd2 = rdd1.partitionBy(new HashPartitioner(3)); System.out.println("rdd1:" + rdd1.getNumPartitions() + ", rdd2:" + rdd2.getNumPartitions());
# 파이썬
rdd1 = sc.parallelize([("apple", 1), ("mouse", 1), ("monitor", 1)], 5)
rdd2 = rdd1.partitionBy(3)
print("rdd1: %d, rdd2: %d" % (rdd1.getNumPartitions(), rdd2.getNumPartitions()))
rdd1: 5, rdd2: 3
RDD의 원하는 요소만 남기고 원하지 않는 요소를 걸러내는 메서드.
참 거짓으로 가려내는 함수를 RDD의 각 요소에 적용해 결과가 참인 것만 남김.
스칼라
val rdd = sc.parallelize(1 to 5) val result = rdd.filter(_ > 2) println(result.collect.mkString(", "))
자바
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); JavaRDD<Integer> result = rdd.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer v1) throws Exception { return v1 > 2; } }); System.out.println(result.collect());
rdd1 = sc.parallelize(range(1, 5+1))
rdd2 = rdd1.filter(lambda i: i > 2)
print(rdd2.collect())
[3, 4, 5]
sortByKey()는 키 값을 기준으로 요소를 정렬하는 연산.
키 값을 기준으로 정렬하기 때문에 모든 요소가 키와 값 형태도 구성돼 있어야 함.
스칼라
val rdd = sc.parallelize(List("q", "z", "a")) val result = rdd.map((_, 1)).sortByKey() println(result.collect.mkString(", "))
자바
List<Tuple2<String, Integer>> data = Arrays.asList(new Tuple2("q", 1), new Tuple2("z", 1), new Tuple2("a", 1)); JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data); JavaPairRDD<String, Integer> result = rdd.sortByKey(); System.out.println(result.collect());
# 파이썬
rdd = sc.parallelize([("q", 1), ("z", 1), ("a", 1)])
result = rdd.sortByKey()
print(result.collect())
[('a', 1), ('q', 1), ('z', 1)]
키와 값 쌍으로 구성된 경우에 사용할 수 있는 메서드
keys() : 키에 해당하는 요소로 구성된 RDD를 생성
values() : 값에 해당하는 요소로 구성된 RDD를 리턴
스칼라
val rdd = sc.parallelize(List(("k1", "v1"), ("k2", "v2"), ("k3", "v3")) println(rdd.keys.collect.mkString(",")) println(rdd.values.collect.mkString(","))
자바
List<Tuple2<String, String>> data = Arrays.asList(new Tuple2("k1", "v1"), new Tuple2("k2", "v2"), new Tuple2("k3", "v3")); JavaPairRDD<String, String> rdd = sc.parallelizePairs(data); System.out.println(rdd.keys().collect()); System.out.println(rdd.values().collect());
# 파이썬
rdd = sc.parallelize([("k1", "v1"), ("k2", "v2"), ("k3", "v3")])
print(rdd.keys().collect())
print(rdd.values().collect())
['k1', 'k2', 'k3'] ['v1', 'v2', 'v3']
샘플을 추출해 새로운 RDD를 생성.
sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
withReplacement : 복원 추출을 수행할지 여부를 정하는 것. True면 복원, False면 비복원.
fraction
복원 추출 : 각 요소의 평균 발생 회수. 반드시 0 이상이여야 함.
비복원 추출 : 각 요소가 샘플에 포함될 확률. 0과 1 사이의 값으로 지정.
seed : 반복 시행 시 결과가 바뀌지 않고 일정한 값이 나오도록 제어하는 목적.
스칼라
val rdd = sc.parallelize(1 to 100) val result1 = rdd.sample(false, 0.5) val result2 = rdd.sample(true, 1.5) println(result1.take(5).mkString(",")) println(result2.take(5).mkString(","))
자바
List<Integer> data = fillToN(100); JavaRDD<Integer> rdd = sc.parallelize(data); JavaRDD<Integer> result1 = rdd.sample(false, 0.5); JavaRDD<Integer> result2 = rdd.sample(true, 1.5); System.out.println(result1.take(5)); System.out.println(result2.take(5));
# 파이썬
rdd = sc.parallelize(range(1,100+1))
result1 = rdd.sample(False, 0.5)
result2 = rdd.sample(True, 1.5)
print(result1.take(5))
print(result2.take(5))
[2, 3, 6, 7, 8] [2, 3, 3, 3, 3]
결과값이 정수, 리스트, 맵 등 RDD가 아닌 타입인 경우
트렌스포메이션 : 결과값이 RDD인 메서드. 느긋한 평가(lazy evaluation, 지연계산) 방식을 채택.
액션 메서드를 여러번 호출하면 트렌스포메이션 메서드도 여러번 실행됨.
RDD 요소 가운데 첫 번째 요소 하나를 돌려줌.
결과를 빠르게 확인하는 용도로 활용
스칼라
val rdd = sc.parallelize(List(5, 4, 1)) val result = rdd.first println(result)
자바
List<Integer> data = Arrays.asList(5, 4, 1); JavaRDD<Integer> rdd = sc.parallelize(data); int result = rdd.first(); System.out.println(result);
# 파이썬
rdd = sc.parallelize([5, 4, 1])
result = rdd.first();
print(result)
5
첫 번째 요소로부터 순서대로 n개를 추출해서 되돌려주는 메서드
스칼라
val rdd = sc.parallelize(1 to 20, 5) val result = rdd.take(5) println(result.mkString(", "))
자바
List<Integer> data = fillToN(100); JavaRDD<Integer> rdd = sc.parallelize(data); List<Integer> result = rdd.take(5); System.out.println(result);
# 파이썬
rdd = sc.parallelize(range(1, 100+1))
result = rdd.take(5)
print(result)
[1, 2, 3, 4, 5]
RDD 요소 가운데 지정된 크기의 샘플을 추출하는 메서드.
결과 타입이 배열이나 리스트같은 컬렉션 타입. 샘플의 크기가 너무 크면 메모리 오류날 수 있음.
스칼라
val rdd = sc.parallelize(1 to 100) val result = rdd.takeSample(false, 20) println(result.length)
자바
List<Integer> data = fillToN(100); JavaRDD<Integer> rdd = sc.parallelize(data); List<Integer> result = rdd.takeSample(false, 20); System.out.println(result.size());
# 파이썬
rdd = sc.parallelize(range(1, 100))
result = rdd.takeSample(False, 20)
print(len(result))
20
collect() : RDD의 모든 요소를 하나의 컬렉션에 담아서 돌려주는 메서드.
count() : RDD의 모든 요소의 개수를 돌려주는 메서드.
RDD에 속하는 각 값들이 나타나는 회수를 구해 맵 형태로 돌려주는 메서드.
스칼라
val rdd = sc.parallelize(List(1, 1, 2, 3, 3)) val result = rdd.countByValue println(result)
자바
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 3)); Map<Integer, Long> result = rdd.countByValue(); System.out.println(result);
# 파이썬
rdd = sc.parallelize([1, 1, 2, 3, 3])
result = rdd.countByValue()
for k, v in result.items():
print(k, v)
1 2 2 1 3 2
RDD에 포함된 임의의 값 두개를 하나로 합치는 함수를 이용해
RDD에 포함된 모든 요소를 하나의 값으로 병합하고 그 결과값을 반환하는 메서드.
reduce() 메서드의 정의
def reduce(f: (T, T) => T): T
각 서버에 흩어져 있는 파티션 단위로 나누어 처리됨.
RDD에 포함된 모든 요소에 대해 교환법칙과 결합법칙이 성립되는 경우에만 사용 가능.
1에서 10까지 숫자로 구성된 RDD의 모든 원소의 합을 reduce()로 구하는 예제
스칼라
val rdd = sc.parallelize(1 to 10, 3) val result = rdd.reduce(_ + _) println(result)
자바
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> rdd = sc.parallelize(data, 3); // Java7 int result = rdd.reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // Java8 int result2 = rdd.reduce((Integer v1, Integer v2) -> v1 + v2); System.out.println(result);
# 파이썬
rdd = sc.parallelize(range(1, 11), 3)
result = rdd.reduce(lambda v1, v2: v1 + v2)
print(result)
55
병합 연산의 초기값을 지정해 줄 수 있음.
fold() 메서드의 정의
def fold(zeroValue: T) (op: (T, T) => T)
fold() 메서드에 지정한 초기값은 각 파티션별 부분 병합을 수행할 때마다 사용되기 때문에 여러 번 적용해도 문제가 없는 값을 사용해야 함.
병합 연산이 덧셈이라면 0, 곱셉이라면 1을 사용 가능.
스칼라
val rdd = sc.parallelize(1 to 10, 3) val result = rdd.fold(0)(_ + _) println(result)
자바
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> rdd = sc.parallelize(data, 3); // Java7 int result = rdd.fold(0, new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // Java8 int result2 = rdd.fold(0, (Integer v1, Integer v2) -> v1 + v2); System.out.println(result);
# 파이썬
rdd = sc.parallelize(range(1, 11), 3)
result = rdd.fold(0, lambda v1, v2: v1 + v2)
print(result)
55
reduce()와 fold() 연산을 이용하여 RDD 상품의 총 가격과 총 개수를 구하는 예제.
def reduceVsFold(sc: SparkContext) { // Prod 클래스 선언 case class Prod(var price: Int) { var cnt = 1 } val rdd = sc.parallelize(List(Prod(300), Prod(200), Prod(100)), 10) // reduce val r1 = rdd.reduce((p1, p2) => { p1.price += p2.price p1.cnt += 1 }) println(s"Reduce: (${r1.price}, ${r1.cnt})") // fold val r2 = rdd.fold(Prod(0))((p1, p2) => { p1.price += p2.price p1.cnt += 1 }) println(s"Fold: (${r2.price}, ${r2.cnt})") }
결과
Reduce: (600, 3) Fold: (600, 11)
reduce 연산의 경우 RDD에 포함된 요소만 병합을 수행하므로 파티션에 속하는 원소가 없으면 처리가 수행되지 않음.
fold 연산의 경우 파티션에 속하는 원소가 없어도 초기값으로 지정한 값으로 인해 최초 한 번의 연산이 수행됨.
메서드의 인자 총 3개. 첫 번째는 초기값. 두 번째는 병합함수. 세 번째는 파티션으로 생성된 부분합을 최종적으로 하나로 합치기 위한 병함함수.
aggregate() 메서드의 정의
def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, compOp: (U, U) => U)(implicit arg0:classTag[U]): U
U는 초기값과 같은 타입이어야 함.
첫 번째 인자인 seqOp은 파티션 병합, 두 번째 인자인 compOp은 최종 병합에 사용.
RDD에 포함된 숫자의 평균을 구하는 예제
스칼라
val rdd = sc.parallelize(List(100, 80, 75, 90, 95), 3) val zeroValue = Record(0, 0) val seqOp = (r: Record, v: Int) => r.add(v) val combOp = (r1: Record, r2: Record) => r1 add r2 val result1 = rdd.aggregate(zeroValue)(seqOp, combOp) println(result1.amount / result1.number) // 좀더 간결한 코드 val result2 = rdd.aggregate(Record(0, 0))(_ add _, _ add _) println(result1.amount / result1.number)
자바
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(100, 80, 75, 90, 95), 3); Record zeroValue = new Record(0, 0); // Java7 Function2<Record, Integer, Record> seqOp = new Function2<Record, Integer, Record>() { @Override public Record call(Record r, Integer v) throws Exception { return r.add(v); } }; Function2<Record, Record, Record> combOp = new Function2<Record, Record, Record>() { @Override public Record call(Record r1, Record r2) throws Exception { return r1.add(r2); } }; Record result = rdd.aggregate(zeroValue, seqOp, combOp); // Java8 Function2<Record, Integer, Record> seqOp2 = (Record r, Integer v) -> r.add(v); Function2<Record, Record, Record> combOp2 = (Record r1, Record r2) -> r1.add(r2); Record result2 = rdd.aggregate(zeroValue, seqOp2, combOp2); System.out.println(result);
# 파이썬 - 다음 소스는 jupyter notebook에서 바로 실행 불가.
src = """import pyspark
from record import *
sc = pyspark.SparkContext()
def seqOp(r, v):
return r.addAmt(v)
def combOp(r1, r2):
return r1 + r2
rdd = sc.parallelize([100, 80, 75, 90, 95])
result = rdd.aggregate(Record(0, 0), seqOp, combOp)
print(result)
"""
with open('record.py', 'w') as f:
f.write(record_py)
with open('job.py', 'w') as f:
f.write(src)
%%bash
$SPARK_HOME/bin/spark-submit --py-files record.py job.py 2>/dev/null
avg:88.0
os.remove('record.py')
os.remove('job.py')
Record는 이전의 combineByKey()에서 사용했던 클래스와 같음.
피연산자로 지정된 첫 번째 요소 내부의 값에 대해서는 예외적으로 변형을 허락함.
RDD를 구성하는 모든 요소가 double, Long 등 숫자 타입일 경우만 사용가능.
전체 요소의 합을 구함.
스칼라
val rdd = sc.parallelize(1 to 10) val result = rdd.sum println(result)
자바
List<Double> data = Arrays.asList(1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d, 10d); JavaDoubleRDD rdd = sc.parallelizeDoubles(data); double result = rdd.sum(); System.out.println(result);
# 파이썬
rdd = sc.parallelize(range(1, 11))
result = rdd.sum()
print(result)
55
foreach()는 RDD의 모든 요소에 특정 함수를 적용하는 메서드.
foreachPartition()는 파티션 단위로 특정 함수를 적용. 결과값을 돌려주지 않음.
스칼라
val rdd = sc.parallelize(1 to 10, 3) rdd.foreach { v => println(s"Value Side Effect: ${v}") } val rdd = sc.parallelize(1 to 10, 3) rdd.foreachPartition(values => { println("Partition Side Effect!!") for (v <- values) println(s"Value Side Effect: ${v}") })
자바
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> rdd = sc.parallelize(data); // Java7 rdd.foreach(new VoidFunction<Integer>() { @Override public void call(Integer t) throws Exception { System.out.println("Value Side Effect: " + t); } }); // Java8 rdd.foreach((Integer t) -> System.out.println("Value Side Effect: " + t)); List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> rdd = sc.parallelize(data, 3); // Java7 rdd.foreachPartition(new VoidFunction<Iterator<Integer>>() { @Override public void call(Iterator<Integer> it) throws Exception { System.out.println("Partition Side Effect!!"); while (it.hasNext()) System.out.println("Value Side Effect: " + it.next()); } }); // Java8 rdd.foreachPartition((Iterator<Integer> it) -> { System.out.println("Partition Side Effect!!"); it.forEachRemaining(v -> System.out.println("Value Side Effect:" + v)); });
import os
def write_file(fn, msg):
with open(fn,'a' if os.path.exists(fn) else 'w') as f:
f.write(msg + "\n")
# 파이썬
def sideEffect(values):
write_file("t.txt", "Partition Side Effect!!")
for v in values:
write_file("t.txt", "Value Side Effect: %s" % v)
rdd = sc.parallelize(range(1, 11), 3)
result = rdd.foreach(lambda v: write_file("t.txt", "Value Side Effect: %s" % v))
result = rdd.foreachPartition(sideEffect)
%%sh
cat t.txt;rm t.txt
Value Side Effect: 1 Value Side Effect: 2 Value Side Effect: 3 Value Side Effect: 7 Value Side Effect: 8 Value Side Effect: 9 Value Side Effect: 10 Value Side Effect: 4 Value Side Effect: 5 Value Side Effect: 6 Partition Side Effect!! Value Side Effect: 1 Value Side Effect: 2 Value Side Effect: 3 Partition Side Effect!! Value Side Effect: 4 Value Side Effect: 5 Value Side Effect: 6 Partition Side Effect!! Value Side Effect: 7 Value Side Effect: 8 Value Side Effect: 9 Value Side Effect: 10
디버깅을 위한 메서드. RDD의 파티션 개수나 의존성 정보 등 세부정보를 알고 싶을 때 사용.
스칼라
val rdd = sc.parallelize(1 to 100, 10) .map(_ * 2).persist .map(_ + 1).coalesce(2) println(rdd.toDebugString)
자바
JavaRDD<Integer> rdd1 = sc.parallelize(fillToN(100), 10); JavaRDD<Integer> rdd2 = rdd1.map((Integer v1) -> v1 * 2); JavaRDD<Integer> rdd3 = rdd2.map((Integer v1) -> v1 * 2); JavaRDD<Integer> rdd4 = rdd3.coalesce(2); System.out.println(rdd4.toDebugString());
# 파이썬
rdd = sc.parallelize(range(1, 101), 10).map(lambda v: v*2).persist().map(lambda v: v+1).coalesce(2)
for line in rdd.toDebugString().decode('ascii').split('\n'):
print(line)
(2) CoalescedRDD[118] at coalesce at NativeMethodAccessorImpl.java:0 [] | PythonRDD[117] at RDD at PythonRDD.scala:48 [] | PythonRDD[116] at RDD at PythonRDD.scala:48 [] | ParallelCollectionRDD[115] at parallelize at PythonRDD.scala:480 []
cache(), persist() : 액션을 수행할 때 불 필요한 재생성 단계를 거치지 않고 원하는 작업을 즉시 실행토록 함.
cache() : RDD 정보를 메모리에 저장. 메모리가 부족하면 저장을 수행하지 않음.
persist() : StorageLevel 옵션을 이용해 저장 위치와 저장 방식(직렬화 여부) 등 지정.
unpersist() : 이미 저장 중인 데이터가 더 필요없을 때 캐시 설정을 취소.
스칼라
val rdd = sc.parallelize(1 to 100, 10) val rdd1 = rdd.cache val rdd2 = rdd.persist(StorageLevel.MEMORY_ONLY)
자바
JavaRDDrdd = sc.parallelize(fillToN(100), 10); rdd.cache(); rdd.persist(StorageLevel.MEMORY_ONLY());
# 파이썬
from pyspark.storagelevel import StorageLevel
rdd = sc.parallelize(range(1, 101), 10)
rdd.cache()
rdd.persist(StorageLevel.MEMORY_ONLY)
print(rdd.persist().is_cached)
True
partitions() : RDD의 파티션 정보가 담긴 Partition 타입 객체 배열을 돌려줌. 파티션 인덱스 정보를 알려주는 index() 메서드 포함.
getNumPartitions() : 파티션 크기를 알아냄.
스칼라
val rdd = sc.parallelize(1 to 1000, 10) println(rdd.partitions.size) println(rdd.getNumPartitions)
자바
JavaRDD<Integer> rdd = sc.parallelize(fillToN(1000), 10); System.out.println(rdd.partitions().size()); System.out.println(rdd.getNumPartitions());
# 파이썬
rdd = sc.parallelize(range(1, 100), 10)
print(rdd.getNumPartitions())
10
하둡 API를 기반으로 다양한 데이터 포멧과 파일 시스템을 지원.
지원 파일 포멧 : JSON, 하둡 시퀀스파일, csv 등
지원 파일 시스템 : 하둡 파일시스템(HDFS), AWS의 S3, 오픈스택의 Swift등
지원 데이터베이스 : MySQL, HBase, 카산드라, Hive등
로컬 파일 시스템에 저장된 텍스트 파일로 RDD 생성.
var rdd = sc.textFile("file:///data/sample.txt")
파일 시스템에 따라 접두사가 바뀜. (예 HDFS hdfs://, S3 s3n://)
지정된 파일은 모든 서버에서 동일하게 접근 가능해야 함. 위의 예제는 모든 서버에 "/data/sample.txt" 경로로 파일 접근 가능해야 함.
textFile 2번째 인자값으로 파티션 크기를 지정 가능. sc.textFile(path, 10) 10개의 파티션으로 구성된 RDD 생성.
RDD의 내용을 텍스트 파일로 저장하고 불러오는 예제.
스칼라
val rdd = sc.parallelize(1 to 1000, 3) val codec = classOf[org.apache.hadoop.io.compress.GzipCodec] // save rdd.saveAsTextFile("<path_to_save>/sub1") // save(gzip) rdd.saveAsTextFile("<path_to_save>/sub2", codec) // load val rdd2 = sc.textFile("<path_to_save>/sub1") println(rdd2.take(10).toList)
자바
JavaRDD<Integer> rdd = sc.parallelize(fillToN(1000), 3); Class codec = org.apache.hadoop.io.compress.GzipCodec.class; // save rdd.saveAsTextFile("<path_to_save>/sub1"); // save(gzip) rdd.saveAsTextFile("<path_to_save>/sub2", codec); // load JavaRDD<String> rdd2 = sc.textFile("<path_to_save>/sub1"); System.out.println(rdd2.take(10));
# 파이썬
rdd = sc.parallelize(range(1, 1000), 3)
codec = "org.apache.hadoop.io.compress.GzipCodec"
# save
rdd.saveAsTextFile("sub1")
# save(gzip)
rdd.saveAsTextFile("sub2", codec)
# load
rdd2 = sc.textFile("sub1")
print(rdd2.take(10))
['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
%%sh
# 디렉터리에 생성
ls -al sub*
sub1: total 56 drwxr-xr-x 10 donglyeolsin staff 340 9 23 23:46 . drwxr-xr-x 19 donglyeolsin staff 646 9 23 23:46 .. -rw-r--r-- 1 donglyeolsin staff 8 9 23 23:46 ._SUCCESS.crc -rw-r--r-- 1 donglyeolsin staff 20 9 23 23:46 .part-00000.crc -rw-r--r-- 1 donglyeolsin staff 20 9 23 23:46 .part-00001.crc -rw-r--r-- 1 donglyeolsin staff 20 9 23 23:46 .part-00002.crc -rw-r--r-- 1 donglyeolsin staff 0 9 23 23:46 _SUCCESS -rw-r--r-- 1 donglyeolsin staff 1224 9 23 23:46 part-00000 -rw-r--r-- 1 donglyeolsin staff 1332 9 23 23:46 part-00001 -rw-r--r-- 1 donglyeolsin staff 1332 9 23 23:46 part-00002 sub2: total 56 drwxr-xr-x 10 donglyeolsin staff 340 9 23 23:46 . drwxr-xr-x 19 donglyeolsin staff 646 9 23 23:46 .. -rw-r--r-- 1 donglyeolsin staff 8 9 23 23:46 ._SUCCESS.crc -rw-r--r-- 1 donglyeolsin staff 16 9 23 23:46 .part-00000.gz.crc -rw-r--r-- 1 donglyeolsin staff 16 9 23 23:46 .part-00001.gz.crc -rw-r--r-- 1 donglyeolsin staff 16 9 23 23:46 .part-00002.gz.crc -rw-r--r-- 1 donglyeolsin staff 0 9 23 23:46 _SUCCESS -rw-r--r-- 1 donglyeolsin staff 596 9 23 23:46 part-00000.gz -rw-r--r-- 1 donglyeolsin staff 536 9 23 23:46 part-00001.gz -rw-r--r-- 1 donglyeolsin staff 536 9 23 23:46 part-00002.gz
%%sh
rm -rf sub1
rm -rf sub2
압축을 사용하려면 sub2방법처럼 압축 코덱 클래스를 사용.
gzip(GzipCodec), Snappy(SnappyCodec), bzip2(BZip2Codec), LZO(Lz4Codec) 등 사용 가능.
part-* 파일이 결과에 해당하는 파일. 여러 개의 파일을 나누면 병렬처리, 오류가 있더라도 전체 작업에 미치는 영향이 줄어둠.
objectFile() : 오브젝트 직렬화 방법을 이용해 RDD를 구성하는 요소를 파일로 읽는 기능을 수행.
saveAsObjectFile() : 직렬화 방법으로 RDD를 구성하는 요소를 파일로 쓰는 기능을 수행.
장점 : 언어 수준의 기본적으로 지원하는 기능을 이용. 간단하게 사용가능.
단점 : 속도가 느리고 변경에 취약함.
오브젝트를 이용해 RDD를 생성 및 저장하는 예제
스칼라
val rdd = sc.parallelize(1 to 1000) // save rdd.saveAsObjectFile("<path_to_save>/sub_path") // load! val rdd2 = sc.objectFile[Int]("<path_to_save>/sub_path") println(rdd2.take(10).mkString(", "))
자바
JavaRDD<Integer> rdd = sc.parallelize(fillToN(1000), 3); // save rdd.saveAsObjectFile("<path_to_save>/sub_path"); // load JavaRDD<Integer> rdd2 = sc.objectFile("<path_to_save>/sub_path"); System.out.println(rdd2.take(10));
# 파이썬
rdd = sc.parallelize(range(1, 1000), 3)
# save
rdd.saveAsPickleFile("object")
# load
rdd2 = sc.pickleFile("object")
print(rdd2.take(10))
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
%%sh
rm -rf object
RDD 포함된 데이터를 오브젝트 파일로 다루려면
키와 값으로 구성된 데이터를 저장하는 이진 파일 포멧. 하둡에서 자주 사용되는 대표적 파일 포맷.
장점 : 대량의 데이터 처리에 적합한 분할 압축 기능. 효율적인 파일 관리.
해당 RDD의 데이터는 하둡의 Writable 인터페이스를 구현해야 함.
시퀀스 파일을 이용해 RDD를 저장하는 예제
스칼라
val rdd = sc.parallelize(List("a", "b", "c", "b", "c")).map((_, 1)) // save rdd.saveAsSequenceFile("data/sample/saveAsSeqFile/scala") // load! val rdd2 = sc.sequenceFile[String, Int]("data/sample/saveAsSeqFile/scala") println(rdd2.collect.mkString(", "))
자바
// 아래 경로는 실제 저장 경로로 변경하여 테스트 String path = "data/sample/saveAsSeqFile/java"; JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c", "b", "c")); // Writable로 변환 - Java7 JavaPairRDD<Text, LongWritable> rdd2 = rdd1.mapToPair(new PairFunction<String, Text, LongWritable>() { @Override public Tuple2<Text, LongWritable> call(String v) throws Exception { return new Tuple2<Text, LongWritable>(new Text(v), new LongWritable(1)); } }); // Writable로 변환 - Java8 JavaPairRDD<Text, LongWritable> rdd2_1 = rdd1.mapToPair((String v) -> new Tuple2<Text, LongWritable>(new Text(v), new LongWritable(1))); // SequenceFile로 저장 rdd2.saveAsNewAPIHadoopFile(path, Text.class, LongWritable.class, SequenceFileOutputFormat.class); // SequenceFile로 부터 RDD 생성 JavaPairRDD<Text, LongWritable> rdd3 = sc.newAPIHadoopFile(path, SequenceFileInputFormat.class, Text.class, LongWritable.class, new Configuration()); // Writable을 String으로 변환 - Java7 JavaRDD<String> rdd4 = rdd3.map(new Function<Tuple2<Text, LongWritable>, String>() { @Override public String call(Tuple2<Text, LongWritable> v1) throws Exception { return v1._1().toString() + v1._2; } }); // Writable을 String으로 변환 - Java8 JavaRDD<String> rdd4_1 = rdd3.map((Tuple2<Text, LongWritable> v1) -> v1._1().toString()); // 결과 출력 System.out.println(rdd4.collect());
자바의 경우 시퀀스 파일만 다루는 메서드가 제공 안됨.
RDD 데이터를 파일로 저장시 saveAsHadoopFile(), saveAsNewAPIHadoopFile() 메서드 사용 필요.
RDD 데이터를 파일로 불러올 때 hadoopFile(), newAPIHadoopFile 메서드 사용 필요.
Writable로 구성된 RDD는 직접 호출하면 직렬화 오류가 발생할 수 있음.
map() 계열의 메서드를 통해 다른 오브젝트로 변환한 뒤 사용하는 걸 권장.
# 파이썬
path = "sequence"
outputFormatClass = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
inputFormatClass = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
keyClass = "org.apache.hadoop.io.Text"
valueClass = "org.apache.hadoop.io.IntWritable"
conf = "org.apache.hadoop.conf.Configuration"
rdd1 = sc.parallelize(["a", "b", "c", "b", "c"])
rdd2 = rdd1.map(lambda x: (x, 1))
# save
rdd2.saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass, valueClass)
# load
rdd3 = sc.newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass)
for k, v in rdd3.collect():
print(k, v)
a 1 b 1 c 1 b 1 c 1
%%sh
rm -rf sequence
다수의 프로세스가 공유할 수 있는 읽기 자원과 쓰기 자원을 설정.
클러스터 내 모든 서버에서 공유할 수 있는 읽기전용 자원을 설정할 수 있는 변수.
브로드캐스트 변수를 rdd의 필터 메서드에 적용한 예제
스칼라
val broadcastUsers = sc.broadcast(Set("u1", "u2")) val rdd = sc.parallelize(List("u1", "u3", "u3", "u4", "u5", "u6"), 3) val result = rdd.filter(broadcastUsers.value.contains(_)) println(result.collect.mkString(","))
자바
Broadcast<Set<String>> bu = sc.broadcast(new HashSet<String>(Arrays.asList("u1", "u2"))); JavaRDD<String> rdd = sc.parallelize(Arrays.asList("u1", "u3", "u3", "u4", "u5", "u6"), 3); // Java7 JavaRDD<String> result = rdd.filter(new Function<String, Boolean>() { @Override public Boolean call(String v1) throws Exception { return bu.value().contains(v1); } }); // Java8 JavaRDD<String> result2 = rdd.filter((String v1) -> bu.value().contains(v1)); System.out.println(result.collect());
bu = sc.broadcast(set(["u1", "u2"]))
rdd = sc.parallelize(["u1", "u3", "u3", "u4", "u5", "u6"], 3)
result = rdd.filter(lambda v: v in bu.value)
print(result.collect())
['u1']
클러스터 내의 모든 서버가 공유하는 쓰기 공간을 제공함.
스파크에서 제공하는 공식 가이드(이전버전)
어큐뮬레이터 생성
var acc = sc.longAccumulator("invalidFormat")
스파크에서는 몇 가지 타입에 대한 어큐뮬레이터를 미리 정의해 놓았음.
longAccumulator : long타입을 저장하기 위한 LongAccumulator를 생성하고 작업 환경에 등록하는 역할을 하는 메서드.
정해진 형식에 맞지 않는 데이터 정보를 어큐뮬레이터로 찾는 예제
스칼라
val acc1 = sc.longAccumulator("invalidFormat") val acc2 = sc.collectionAccumulator[String]("invalidFormat2") val data = List("U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5;Addr5", "U6:Addr6", "U7::Addr7") sc.parallelize(data, 3).foreach { v => if (v.split(":").length != 2) { acc1.add(1L) acc2.add(v) } } println("잘못된 데이터 수:" + acc1.value) println("잘못된 데이터:" + acc2.value)
자바
LongAccumulator acc1 = jsc.sc().longAccumulator("invalidFormat"); CollectionAccumulator acc2 = jsc.sc().collectionAccumulator("invalidFormat2"); List<String> data = Arrays.asList("U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5;Addr5", "U6:Addr6", "U7::Addr7"); jsc.parallelize(data, 3).foreach(new VoidFunction<String>() { @Override public void call(String v) throws Exception { if (v.split(":").length != 2) { acc1.add(1L); acc2.add(v); } } }); System.out.println("잘못된 데이터 수:" + acc1.value()); System.out.println("잘못된 데이터:" + acc2.value());
[결과]
잘못된 데이터 수:3
잘못된 데이터:[U3, U5;Addr5, U7::Addr7]
foreach() 메서드를 호출해 규칙에 맞지 않는 문자의 개수와 그 내용을 어큐뮬레이터를 통해 집계.
# 파이썬
def accumulate(v, acc):
if(len(v.split(":")) != 2):
acc.add(1)
acc1 = sc.accumulator(0)
data = ["U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5;Addr5", "U6:Addr6", "U7::Addr7"]
rdd = sc.parallelize(data)
rdd.foreach(lambda v: accumulate(v, acc1))
print(acc1.value)
3
파이썬의 경우 sc.accumulator()를 이용해 원하는 어큐뮬레이터를 생성함.
초기값은 정수, 실수, 복소수 중의 한 타입. 다른 타입이면 스파르가 제공하는 기본 어큐뮬레이터는 사용 불가.
스칼라
class RecordAccumulator extends AccumulatorV2[Record, Long] { private var _record = Record(0) // 초기값 여부 def isZero: Boolean = _record.amount == 0 && _record.number == 1 // 동일한 값을 가진 새로운 어큐뮬레이터 생성 def copy(): AccumulatorV2[Record, Long] = { val newAcc = new RecordAccumulator newAcc._record = Record(_record.amount, _record.number) newAcc } // 어큐뮬레이터에 포함된 데이터 값을 초기화 def reset(): Unit = { _record.amount = 0L _record.number = 1L } // 다른 데이터 병합 def add(other: Record): Unit = { _record.add(other) } // 다른 어큐뮬레이터 병합 def merge(other: AccumulatorV2[Record, Long]): Unit = other match { case o: RecordAccumulator => _record.add(o._record); case _ => throw new RuntimeException } // 최종 결과(AccumulatorV2의 출력 타입과 같아야 함) def value: Long = { _record.amount } }
자체 어큐뮬레이터를 정의하려면 AccumulatorV2 추상클래스를 상속받고 필요한 메서드를 정의해야 함.
isZero() : RecordAccumulator에 포함된 값이 초기값에 해당하는지 여부를 확인. 초기값이면 True 아니면 False.
sc.register() 메서드로 어큐뮬레이터 등록.
val acc = new RecordAccumulator sc.register(acc, "invalidFormat") val data = List("U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5;Addr5", "U6:Addr6", "U7::Addr7") sc.parallelize(data, 2).foreach { v => if (v.split(":").length != 2) { acc.add(Record(1)) } } println("잘못된 데이터 수:" + acc.value)
오류가 발생했을 때 Record타입의 데이터를 사용한 것이 차이점.
자바를 사용하는 경우 예제
class RecordAccumulator extends AccumulatorV2<Record, Long> { private static final long serialVersionUID = 1L; private Record _record = new Record(0L); @Override public boolean isZero() { return _record.amount == 0L && _record.number == 1L; } @Override public AccumulatorV2<Record, Long> copy() { RecordAccumulator newAcc = new RecordAccumulator(); newAcc._record = new Record(_record.amount, _record.number); return newAcc; } @Override public void reset() { _record.amount = 0L; _record.number = 1L; } @Override public void add(Record other) { _record.add(other); } @Override public void merge(AccumulatorV2<Record, Long> otherAcc) { try { Record other = ((RecordAccumulator) otherAcc)._record; _record.add(other); } catch (Exception e) { throw new RuntimeException(); } } @Override public Long value() { return _record.amount; } }
문자열 오류를 찾아내는 코드
RecordAccumulator acc = new RecordAccumulator(); jsc.sc().register(acc, "invalidFormat"); List<String> data = Arrays.asList("U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5;Addr5", "U6:Addr6", "U7::Addr7"); jsc.parallelize(data, 3).foreach(new VoidFunction<String>() { @Override public void call(String v) throws Exception { if (v.split(":").length != 2) { acc.add(new Record(1L)); } } }); System.out.println("잘못된 데이터 수:" + acc.value());
# 파이썬 - 다음 소스는 jupyter notebook에서 바로 실행 불가.
accumulator_py="""from pyspark import AccumulatorParam
from record import *
class RecordAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Record(0)
def addInPlace(self, v1, v2):
if (isinstance(v2, Record)):
return v1 + v2
else:
return v1.addAmt(v2)"""
파이썬의 경우 AccumulatorParam을 상속받아 사용.
src = """import pyspark
from record import *
from accumulator import *
sc = pyspark.SparkContext()
def accumulate(v, acc):
if (len(v.split(":")) != 2):
acc.add(1)
acc = sc.accumulator(Record(0), RecordAccumulatorParam())
data = ["U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5;Addr5", "U6:Addr6", "U7::Addr7"]
rdd = sc.parallelize(data)
rdd.foreach(lambda v: accumulate(v, acc))
print(acc.value.amount)
"""
with open('record.py', 'w') as f:
f.write(record_py)
with open('accumulator.py', 'w') as f:
f.write(accumulator_py)
with open('job.py', 'w') as f:
f.write(src)
%%bash
python job.py 2>/dev/null
3
os.remove('record.py')
os.remove('accumulator.py')
os.remove('job.py')
파이썬의 경우 sc.accumulator() 메서드로만 사용자 정의 데이터 타입을 위한 어큐뮬레이터 생성.
두 번째 인자로 RecordAccumulatorParam()을 사용함.
어큐뮬레이터를 증가시키는 동작은 클러스터의 모든 데이터 처리 프로세서에서 가능.
읽는 동작은 드라이버 프로그램 내에서만 가능.
특별한 목적이 없는 한 액션 연산을 수행하는 메서드에서만 사용해야 함.
=> 트랜스포메이션 연산에서는 어큐뮬레이터를 여러 번 실행할 수 있어 정확하지 않는 데이터가 수집될 수 있음.
RDD : 스파크에서 다루는 데이터에 대한 추상 모델. 데이터가 누락되거나 유실되지 않게 하는 에러 복구 메커니즘을 갖춤.
데이터 처리 시 자주 사용되는 연산을 수행하는 다양한 API를 제공.
데이터 모델