The core data model used by Spark: a collection of data elements stored in a distributed fashion across multiple servers.
It can be processed in parallel and has built-in fault tolerance, so it can recover on its own when failures occur.
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.
=> Shuffling: the process in which data is reorganized or moved to other servers over the network while a job runs.
=> resilient: able to recover
Local filesystem: file:///~   Hadoop HDFS: hdfs://~
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
# 1. From a collection
rdd = sc.parallelize(["a", "b", "c", "d", "e"])
# 2. From a file
# rdd = sc.textFile("<path_to_file>")
# 3. From an existing RDD
rdd1 = rdd.map(lambda s: s.upper())
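A quick follow-up sketch using the sc, rdd, and rdd1 objects created above: transformations such as map() are lazy, so nothing actually runs until an action like collect() or count() is called. The file:// and hdfs:// paths below are placeholders for the storage schemes mentioned earlier.

# Actions trigger the actual computation of the lazy transformations above.
print(rdd1.collect())   # ['A', 'B', 'C', 'D', 'E']
print(rdd1.count())     # 5

# textFile() accepts either storage scheme (placeholder paths, not real files):
# local_rdd = sc.textFile("file:///path/to/file.txt")
# hdfs_rdd = sc.textFile("hdfs://namenode:8020/path/to/file.txt")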
Oozie: http://oozie.apache.org
A tool for scheduling and chaining multiple Hadoop MapReduce, Pig, and Hive jobs.
The workflow is declared explicitly in XML.
The jobs form a DAG (directed acyclic graph), which is walked step by step to carry out the data processing.
Spark
Driver program: the program that drives the job.
It creates the SparkContext object, runs the job, and shuts it down.
The driver passes the RDD operation information to the DAG scheduler.
The scheduler builds an execution plan and hands it to the cluster manager.
Wide dependencies: a single parent RDD partition is used by multiple child RDD partitions; this is where most shuffling occurs.
Narrow dependencies: parent and child RDD partitions are in a 1:1 relationship.
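A small PySpark sketch of the difference, reusing the sc created earlier (the data and variable names are just for illustration): mapValues() is a narrow dependency, while reduceByKey() is a wide one that triggers a shuffle.

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# Narrow: each child partition depends on exactly one parent partition, no shuffle.
doubled = pairs.mapValues(lambda v: v * 2)

# Wide: all values for a key must be brought together, possibly from every
# parent partition, so a shuffle occurs.
counts = pairs.reduceByKey(lambda a, b: a + b)
print(counts.collect())   # e.g. [('b', 1), ('a', 2)] -- order may vary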
New data is delivered to both the batch layer and the speed layer.
Batch layer: processes the accumulated data in bulk at regular intervals to produce batch views (a view is a result dataset).
Speed layer: processes the data immediately to produce real-time views.
Serving layer: combines the real-time views and batch views appropriately and serves the data to users.
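This three-layer split is the Lambda architecture. A minimal, hypothetical Python sketch of the idea (the function names and data are mine, not from the book):

def batch_layer(all_events):
    # Periodically recompute the batch view over the full historical dataset.
    view = {}
    for word in all_events:
        view[word] = view.get(word, 0) + 1
    return view

def speed_layer(event, realtime_view):
    # Update the real-time view immediately as each new event arrives.
    realtime_view[event] = realtime_view.get(event, 0) + 1
    return realtime_view

def serving_layer(batch_view, realtime_view, key):
    # Combine both views to answer a query.
    return batch_view.get(key, 0) + realtime_view.get(key, 0)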
Word count example bundled with the Spark distribution
%%sh
cd ${SPARK_HOME}
./bin/run-example JavaWordCount README.md
package: 1 For: 3 Programs: 1 processing.: 1 Because: 1 The: 1 page](http://spark.apache.org/documentation.html).: 1 cluster.: 1 its: 1 [run: 1 than: 1 APIs: 1 have: 1 Try: 1 computation: 1 through: 1 several: 1 This: 2 graph: 1 Hive: 2 storage: 1 ["Specifying: 1 To: 2 "yarn": 1 Once: 1 ["Useful: 1 prefer: 1 SparkPi: 2 engine: 1 version: 1 file: 1 documentation,: 1 processing,: 1 the: 24 are: 1 systems.: 1 params: 1 not: 1 different: 1 refer: 2 Interactive: 2 R,: 1 given.: 1 if: 4 build: 4 when: 1 be: 2 Tests: 1 Apache: 1 thread: 1 programs,: 1 including: 4 ./bin/run-example: 2 Spark.: 1 package.: 1 1000).count(): 1 Versions: 1 HDFS: 1 Data.: 1 >>>: 1 Maven: 1 programming: 1 Testing: 1 module,: 1 Streaming: 1 environment: 1 run:: 1 Developer: 1 clean: 1 1000:: 2 rich: 1 GraphX: 1 Please: 4 is: 6 guide](http://spark.apache.org/contributing.html): 1 run: 7 URL,: 1 threads.: 1 same: 1 MASTER=spark://host:7077: 1 on: 7 built: 1 against: 1 [Apache: 1 tests: 2 examples: 2 at: 2 optimized: 1 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).: 1 usage: 1 development: 1 Maven,: 1 graphs: 1 talk: 1 Shell: 2 class: 2 abbreviated: 1 using: 5 directory.: 1 README: 1 computing: 1 overview: 1 `examples`: 2 example:: 1 ##: 9 N: 1 set: 2 use: 3 Hadoop-supported: 1 running: 1 find: 1 contains: 1 project: 1 Pi: 1 need: 1 or: 3 Big: 1 high-level: 1 Java,: 1 uses: 1 <class>: 1 Hadoop,: 2 available: 1 requires: 1 (You: 1 more: 1 see: 3 Documentation: 1 of: 5 tools: 1 using:: 1 cluster: 2 must: 1 supports: 2 built,: 1 tests](http://spark.apache.org/developer-tools.html#individual-tests).: 1 system: 1 build/mvn: 1 Hadoop: 3 this: 1 Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version): 1 particular: 2 Python: 2 Spark: 16 general: 3 YARN,: 1 pre-built: 1 [Configuration: 1 locally: 2 library: 1 A: 1 locally.: 1 sc.parallelize(1: 1 only: 1 Configuration: 1 following: 2 basic: 1 #: 1 changed: 1 More: 1 which: 2 learning,: 1 first: 1 ./bin/pyspark: 1 also: 4 info: 1 should: 2 for: 12 [params]`.: 1 documentation: 3 [project: 1 mesos://: 1 Maven](http://maven.apache.org/).: 1 setup: 1 <http://spark.apache.org/>: 1 latest: 1 your: 1 MASTER: 1 example: 3 ["Parallel: 1 scala>: 1 DataFrames,: 1 provides: 1 configure: 1 distributions.: 1 can: 7 About: 1 instructions.: 1 do: 2 easiest: 1 no: 1 project.: 1 how: 3 `./bin/run-example: 1 started: 1 Note: 1 by: 1 individual: 1 spark://: 1 It: 2 tips,: 1 Scala: 2 Alternatively,: 1 an: 4 variable: 1 submit: 1 -T: 1 machine: 1 thread,: 1 them,: 1 detailed: 2 stream: 1 And: 1 distribution: 1 review: 1 return: 2 Thriftserver: 1 developing: 1 ./bin/spark-shell: 1 "local": 1 start: 1 You: 4 Spark](#building-spark).: 1 one: 3 help: 1 with: 4 print: 1 Spark"](http://spark.apache.org/docs/latest/building-spark.html).: 1 data: 1 Contributing: 1 in: 6 -DskipTests: 1 downloaded: 1 versions: 1 online: 1 Guide](http://spark.apache.org/docs/latest/configuration.html): 1 builds: 1 comes: 1 Tools"](http://spark.apache.org/developer-tools.html).: 1 [building: 1 Python,: 2 Many: 1 building: 2 Running: 1 from: 1 way: 1 Online: 1 site,: 1 other: 1 Example: 1 [Contribution: 1 analysis.: 1 sc.parallelize(range(1000)).count(): 1 you: 4 runs.: 1 Building: 1 higher-level: 1 protocols: 1 guidance: 2 a: 8 guide,: 1 name: 1 fast: 1 SQL: 2 that: 2 will: 1 IDE,: 1 to: 17 get: 1 : 71 information: 1 core: 1 web: 1 "local[N]": 1 programs: 2 option: 1 MLlib: 1 ["Building: 1 contributing: 1 shell:: 2 instance:: 1 Scala,: 1 and: 9 command,: 2 
package.): 1 ./dev/run-tests: 1 sample: 1
17/08/26 21:41:30 INFO SparkContext: Running Spark version 2.2.0 17/08/26 21:41:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/08/26 21:41:31 INFO SparkContext: Submitted application: JavaWordCount 17/08/26 21:41:31 INFO SecurityManager: Changing view acls to: donglyeolsin 17/08/26 21:41:31 INFO SecurityManager: Changing modify acls to: donglyeolsin 17/08/26 21:41:31 INFO SecurityManager: Changing view acls groups to: 17/08/26 21:41:31 INFO SecurityManager: Changing modify acls groups to: 17/08/26 21:41:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(donglyeolsin); groups with view permissions: Set(); users with modify permissions: Set(donglyeolsin); groups with modify permissions: Set() 17/08/26 21:41:31 INFO Utils: Successfully started service 'sparkDriver' on port 65233. 17/08/26 21:41:31 INFO SparkEnv: Registering MapOutputTracker 17/08/26 21:41:31 INFO SparkEnv: Registering BlockManagerMaster 17/08/26 21:41:31 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 17/08/26 21:41:31 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 17/08/26 21:41:31 INFO DiskBlockManager: Created local directory at /private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/blockmgr-ea10087e-bf83-4c7a-81fd-415b57b656b7 17/08/26 21:41:32 INFO MemoryStore: MemoryStore started with capacity 366.3 MB 17/08/26 21:41:32 INFO SparkEnv: Registering OutputCommitCoordinator 17/08/26 21:41:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 17/08/26 21:41:32 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042. 17/08/26 21:41:32 INFO Utils: Successfully started service 'SparkUI' on port 4042. 17/08/26 21:41:32 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.2:4042 17/08/26 21:41:32 INFO SparkContext: Added JAR file:/Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/examples/jars/scopt_2.11-3.3.0.jar at spark://192.168.1.2:65233/jars/scopt_2.11-3.3.0.jar with timestamp 1503751292886 17/08/26 21:41:32 INFO SparkContext: Added JAR file:/Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar at spark://192.168.1.2:65233/jars/spark-examples_2.11-2.2.0.jar with timestamp 1503751292887 17/08/26 21:41:33 INFO Executor: Starting executor ID driver on host localhost 17/08/26 21:41:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 65250. 
17/08/26 21:41:33 INFO NettyBlockTransferService: Server created on 192.168.1.2:65250 17/08/26 21:41:33 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 17/08/26 21:41:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.2, 65250, None) 17/08/26 21:41:33 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.2:65250 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.2, 65250, None) 17/08/26 21:41:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.2, 65250, None) 17/08/26 21:41:33 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.2, 65250, None) 17/08/26 21:41:33 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/spark-warehouse/'). 17/08/26 21:41:33 INFO SharedState: Warehouse path is 'file:/Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/spark-warehouse/'. 17/08/26 21:41:35 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint 17/08/26 21:41:38 INFO FileSourceStrategy: Pruning directories with: 17/08/26 21:41:38 INFO FileSourceStrategy: Post-Scan Filters: 17/08/26 21:41:38 INFO FileSourceStrategy: Output Data Schema: struct<value: string> 17/08/26 21:41:38 INFO FileSourceScanExec: Pushed Filters: 17/08/26 21:41:38 INFO CodeGenerator: Code generated in 422.378204 ms 17/08/26 21:41:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 277.3 KB, free 366.0 MB) 17/08/26 21:41:39 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.4 KB, free 366.0 MB) 17/08/26 21:41:39 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.2:65250 (size: 23.4 KB, free: 366.3 MB) 17/08/26 21:41:39 INFO SparkContext: Created broadcast 0 from javaRDD at JavaWordCount.java:45 17/08/26 21:41:39 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes. 
17/08/26 21:41:39 INFO SparkContext: Starting job: collect at JavaWordCount.java:53 17/08/26 21:41:39 INFO DAGScheduler: Registering RDD 5 (mapToPair at JavaWordCount.java:49) 17/08/26 21:41:39 INFO DAGScheduler: Got job 0 (collect at JavaWordCount.java:53) with 1 output partitions 17/08/26 21:41:39 INFO DAGScheduler: Final stage: ResultStage 1 (collect at JavaWordCount.java:53) 17/08/26 21:41:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 17/08/26 21:41:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0) 17/08/26 21:41:39 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at mapToPair at JavaWordCount.java:49), which has no missing parents 17/08/26 21:41:39 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 12.5 KB, free 366.0 MB) 17/08/26 21:41:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.5 KB, free 366.0 MB) 17/08/26 21:41:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.2:65250 (size: 6.5 KB, free: 366.3 MB) 17/08/26 21:41:39 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006 17/08/26 21:41:39 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at mapToPair at JavaWordCount.java:49) (first 15 tasks are for partitions Vector(0)) 17/08/26 21:41:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 17/08/26 21:41:39 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5288 bytes) 17/08/26 21:41:39 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 17/08/26 21:41:39 INFO Executor: Fetching spark://192.168.1.2:65233/jars/scopt_2.11-3.3.0.jar with timestamp 1503751292886 17/08/26 21:41:39 INFO TransportClientFactory: Successfully created connection to /192.168.1.2:65233 after 48 ms (0 ms spent in bootstraps) 17/08/26 21:41:39 INFO Utils: Fetching spark://192.168.1.2:65233/jars/scopt_2.11-3.3.0.jar to /private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901/userFiles-2d31a0f7-d378-4ac8-8fa0-17eb63be7fc4/fetchFileTemp2991879884693235060.tmp 17/08/26 21:41:39 INFO Executor: Adding file:/private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901/userFiles-2d31a0f7-d378-4ac8-8fa0-17eb63be7fc4/scopt_2.11-3.3.0.jar to class loader 17/08/26 21:41:39 INFO Executor: Fetching spark://192.168.1.2:65233/jars/spark-examples_2.11-2.2.0.jar with timestamp 1503751292887 17/08/26 21:41:39 INFO Utils: Fetching spark://192.168.1.2:65233/jars/spark-examples_2.11-2.2.0.jar to /private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901/userFiles-2d31a0f7-d378-4ac8-8fa0-17eb63be7fc4/fetchFileTemp8238211247047707553.tmp 17/08/26 21:41:39 INFO Executor: Adding file:/private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901/userFiles-2d31a0f7-d378-4ac8-8fa0-17eb63be7fc4/spark-examples_2.11-2.2.0.jar to class loader 17/08/26 21:41:40 INFO CodeGenerator: Code generated in 20.518002 ms 17/08/26 21:41:40 INFO FileScanRDD: Reading File path: file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/README.md, range: 0-3809, partition values: [empty row] 17/08/26 21:41:40 INFO CodeGenerator: Code generated in 16.808836 ms 17/08/26 21:41:40 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 
1795 bytes result sent to driver 17/08/26 21:41:40 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 995 ms on localhost (executor driver) (1/1) 17/08/26 21:41:40 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 17/08/26 21:41:40 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at JavaWordCount.java:49) finished in 1.024 s 17/08/26 21:41:40 INFO DAGScheduler: looking for newly runnable stages 17/08/26 21:41:40 INFO DAGScheduler: running: Set() 17/08/26 21:41:40 INFO DAGScheduler: waiting: Set(ResultStage 1) 17/08/26 21:41:40 INFO DAGScheduler: failed: Set() 17/08/26 21:41:40 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[6] at reduceByKey at JavaWordCount.java:51), which has no missing parents 17/08/26 21:41:40 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.7 KB, free 366.0 MB) 17/08/26 21:41:40 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.1 KB, free 366.0 MB) 17/08/26 21:41:40 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.2:65250 (size: 2.1 KB, free: 366.3 MB) 17/08/26 21:41:40 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006 17/08/26 21:41:40 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[6] at reduceByKey at JavaWordCount.java:51) (first 15 tasks are for partitions Vector(0)) 17/08/26 21:41:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 17/08/26 21:41:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 4621 bytes) 17/08/26 21:41:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 17/08/26 21:41:40 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 17/08/26 21:41:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms 17/08/26 21:41:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 7903 bytes result sent to driver 17/08/26 21:41:40 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 112 ms on localhost (executor driver) (1/1) 17/08/26 21:41:40 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 17/08/26 21:41:40 INFO DAGScheduler: ResultStage 1 (collect at JavaWordCount.java:53) finished in 0.114 s 17/08/26 21:41:40 INFO DAGScheduler: Job 0 finished: collect at JavaWordCount.java:53, took 1.373619 s 17/08/26 21:41:40 INFO SparkUI: Stopped Spark web UI at http://192.168.1.2:4042 17/08/26 21:41:40 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 17/08/26 21:41:40 INFO MemoryStore: MemoryStore cleared 17/08/26 21:41:40 INFO BlockManager: BlockManager stopped 17/08/26 21:41:40 INFO BlockManagerMaster: BlockManagerMaster stopped 17/08/26 21:41:40 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 17/08/26 21:41:40 INFO SparkContext: Successfully stopped SparkContext 17/08/26 21:41:40 INFO ShutdownHookManager: Shutdown hook called 17/08/26 21:41:40 INFO ShutdownHookManager: Deleting directory /private/var/folders/f1/br3skspx06b28tzr9qn3504h0000gn/T/spark-1ea2f36c-97db-4a7a-adec-8ddf83ba6901
Check with grep that "For" really appears 3 times:
%%sh
cd ${SPARK_HOME}
grep --color=always "For" README.md
For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html). To run one of them, use `./bin/run-example <class> [params]`. For example: package. For instance:
What run-example does
$ cat ${SPARK_HOME}/bin/run-example
... (omitted) ...
exec "${SPARK_HOME}"/bin/spark-submit run-example "$@"
It ends up calling another shell script, spark-submit.
spark-submit sets up the required environment variables and in turn runs the spark-class script.
$ cat ${SPARK_HOME}/bin/spark-submit
... (omitted) ...
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
The spark-class script runs the org.apache.spark.launcher.Main class with the arguments we passed in.
To see the final launch command, try the following:
$ vi ~/.bash_profile
$ export SPARK_PRINT_LAUNCH_COMMAND=1
$ source ~/.bash_profile
$ ./bin/run-example JavaWordCount README.md
Result:
Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /home/sdrlurker/apps/spark/conf/:/home/sdrlurker/apps/spark/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --jars /home/sdrlurker/apps/spark/examples/jars/spark-examples_2.11-2.1.0.jar,/home/sdrlurker/apps/spark/examples/jars/scopt_2.11-3.3.0.jar --class org.apache.spark.examples.JavaWordCount spark-internal README.md ... (remainder omitted)
Example in the Python Spark shell (pyspark)
import os
SPARK_HOME = os.getenv("SPARK_HOME")
file = sc.textFile("file://%s/README.md" % SPARK_HOME)
words = file.flatMap(lambda line : line.split(" "))
result = words.countByValue()
result["For"]
3
The Spark shell is handy for quick tests, prototyping, and one-off data processing, depending on the stage of development and the task at hand.
--master option: tells Spark which cluster master to use.
Use "local" to run on a single server (one thread).
Use "local[N]" for N threads; "local[*]" uses as many threads as there are cores.
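A hedged sketch of setting the master programmatically instead of via shell options (the app name is arbitrary; run it in a fresh session, since only one SparkContext can be active at a time):

import findspark
findspark.init()
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[2]").setAppName("master-option-demo")
sc = SparkContext(conf=conf)
print(sc.master)   # local[2]
sc.stop()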
How to check a Spark application's configuration settings
$ ./spark-shell --master=local --verbose
In the Python Spark shell, you can check it with the following command:
sc.getConf().toDebugString().split()
['spark.app.id=local-1503751287905', 'spark.app.name=pyspark-shell', 'spark.driver.host=192.168.1.2', 'spark.driver.port=65230', 'spark.executor.id=driver', 'spark.master=local[*]', 'spark.rdd.compress=True', 'spark.serializer.objectStreamReset=100', 'spark.submit.deployMode=client']
In cluster mode, the data is stored distributed across multiple machines.
You have to decide by what criteria the data is split up and assigned to each server.
You also have to account for some servers failing or network problems occurring.
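A small sketch (assuming an active SparkContext sc) of how that distribution can be inspected and influenced through partitioning:

pairs = sc.parallelize([("a", 1), ("b", 2), ("c", 3)], 2)
print(pairs.getNumPartitions())          # 2

# Hash-partition by key so that identical keys end up on the same partition (server).
repartitioned = pairs.partitionBy(4)
print(repartitioned.getNumPartitions())  # 4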
Write code -> unit test -> build and deploy
SparkContext is the object that manages the connection between the application and the Spark cluster.
Through it you work with RDDs, accumulators, and broadcast variables.
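A minimal sketch (assuming an active SparkContext sc) of the three kinds of objects mentioned above:

rdd = sc.parallelize(range(10))               # RDD

acc = sc.accumulator(0)                       # accumulator: counter updated on the workers
rdd.foreach(lambda x: acc.add(1))
print(acc.value)                              # 10

table = sc.broadcast({"even": 0, "odd": 1})   # broadcast: read-only value shared with workers
print(rdd.map(lambda x: table.value["even" if x % 2 == 0 else "odd"]).collect())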
The basic distributed data model used in Spark: "the basic abstraction in Spark".
Ways to create an RDD: from an external data source, or by deriving another RDD from an existing one.
In the example this is the process() part: the programmer can perform whatever processing is needed using the various data-processing functions.
In Java 8, lambda syntax can be used.
While testing, it is fine to "simply print to the screen".
In a real service, the result can be "saved to the Hadoop file system" instead.
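The book's example is written in Java; the following is a rough PySpark equivalent of the process() idea (the function name and the HDFS path are placeholders of my own, reusing sc and SPARK_HOME from above):

def process(input_rdd):
    words = input_rdd.flatMap(lambda line: line.split(" "))
    return words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)

result = process(sc.textFile("file://%s/README.md" % SPARK_HOME))

# While testing: simply print to the screen.
print(result.take(5))

# In production: save to HDFS instead (placeholder path).
# result.saveAsTextFile("hdfs://namenode:8020/user/someone/wordcount_result")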
Open the com.wikibooks.spark.ch1.WordCountTest file under the src/test/java folder and
choose Run -> Run As -> JUnit Test.
The setup() method creates the SparkContext.
The testProcess() method builds the RDD and performs the required processing.
The input is taken as a List and turned into inputRDD.
collectAsMap() produces resultMap as a Map.
The assertions are then performed (assertThat).
The cleanup() method stops the SparkContext.
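The test above is a Java JUnit test; as a rough, hypothetical PySpark counterpart, the same setup/test/cleanup flow could look like this with unittest:

import unittest
from pyspark import SparkContext

class WordCountTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # corresponds to setup(): create the SparkContext once
        cls.sc = SparkContext("local[*]", "WordCountTest")

    @classmethod
    def tearDownClass(cls):
        # corresponds to cleanup(): stop the SparkContext
        cls.sc.stop()

    def test_process(self):
        # corresponds to testProcess(): build an RDD from a list, process it,
        # collect the result as a map, and verify it
        input_rdd = self.sc.parallelize(["Apache Spark", "Apache Hadoop"])
        counts = (input_rdd.flatMap(lambda line: line.split(" "))
                           .map(lambda w: (w, 1))
                           .reduceByKey(lambda a, b: a + b))
        result_map = counts.collectAsMap()
        self.assertEqual(result_map["Apache"], 2)

if __name__ == "__main__":
    unittest.main()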
http://www.nextree.co.kr/p11104/
For the @BeforeClass and @AfterClass annotations, see the link above.
How to run after building with Maven
$ $SPARK_HOME/bin/spark-submit \
  --class com.wikibooks.spark.ch1.WordCount \
  beginning-spark-examples.jar \
  local[*] \
  file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/README.md \
  file:///Users/donglyeolsin/spark-2.2.0-bin-hadoop2.7/testresult
--class: specifies the class containing the main function.
Jar file path: the path to the jar containing that class.
Cluster info: passed as the 1st program argument.
Input path: passed as the 2nd program argument.
Output path: passed as the 3rd program argument.
If the output path (testresult above) contains a _SUCCESS file and files whose names start with part-, the run succeeded.
In Scala, test code can be written not only with JUnit but also with FlatSpec (part of ScalaTest).
FlatSpec is intended for BDD (behavior-driven development).