A Spark context (or a session, which encapsulates a context) is the entry point to Spark. It represents the Spark engine (running either on the local machine or on a cluster) and provides an API for creating and running data pipelines.
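In more recent PySpark code one usually starts from a SparkSession and takes the context from it. A minimal sketch (the application name is just a placeholder, not part of this notebook):
from pyspark.sql import SparkSession

# build (or reuse) a session; the SparkContext is created behind the scenes
spark = SparkSession.builder.appName("example").getOrCreate()
sc = spark.sparkContext  # the context encapsulated by the session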
In this example, we're going to load a text file into an RDD, split the text into n-grams, and count how often each n-gram occurs.
from pyspark import SparkContext
from operator import add

filename = "wiki429MB"
sc = SparkContext(
    appName="Ngrams with pyspark " + filename
)
sc
We are going to use the file /data/wiki429MB, which has previously been uploaded to HDFS. The file is $429$ MB in size.
Note: the file contains one whole document per line.
!hdfs dfs -put wiki429MB /data/wiki429MB
put: `/data/wiki429MB': File exists
!hdfs dfs -ls -h /data/wiki429MB
-rw-r--r-- 3 datalab hdfs 428.8 M 2020-02-14 08:54 /data/wiki429MB
The second parameter ($80$) sets the minimum number of partitions for the resulting RDD.
textFile = sc.textFile("/data/wiki429MB", 80)
print("textFile is of type: {}\nNumber of partitions: {}". \
format(type(textFile), textFile.getNumPartitions()))
textFile is of type: <class 'pyspark.rdd.RDD'>
Number of partitions: 80
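To see how the documents are spread over those partitions, you can count the records held by each partition. A small sketch (this check is not part of the original pipeline):
# one integer per partition: how many lines (documents) it holds
counts = textFile.mapPartitions(lambda part: [sum(1 for _ in part)]).collect()
print(counts[:10])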
n = 3
ngrams = textFile \
.flatMap(lambda x: [x.split()]) \
.flatMap(lambda x: [tuple(y) for y in zip(*[x[i:] for i in range(n)])]) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.sortBy(lambda x: x[1], ascending=False)
type(ngrams)
pyspark.rdd.PipelinedRDD
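The expression zip(*[x[i:] for i in range(n)]) used above is the usual Python idiom for a sliding window of length n. A quick standalone check in plain Python (the sentence is made up):
words = "the quick brown fox jumps".split()
n = 3
# zip the word list against itself shifted by 1 and 2 positions -> consecutive triples
print([tuple(t) for t in zip(*[words[i:] for i in range(n)])])
# [('the', 'quick', 'brown'), ('quick', 'brown', 'fox'), ('brown', 'fox', 'jumps')]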
Note: up to now we have only carried out a series of transformations; Spark hasn't yet done any computation. Only when we apply the action take does Spark actually process the data and produce a result.
for (ngram, count) in ngrams.take(10):
    print("{:<20}{:>d}".format(' '.join(ngram), count))
one of the          27795
as well as          25145
part of the         17984
the United States   17224
such as the         13886
the end of          13878
a number of         12986
in the United       11760
known as the        10172
end of the          9842
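The take above is the first point at which Spark actually ran a job. The same transformation/action distinction can be seen with a small throwaway RDD (a sketch, not part of the original pipeline):
nums = sc.parallelize(range(10))           # small throwaway RDD
evens = nums.filter(lambda x: x % 2 == 0)  # transformation: returns immediately, nothing computed
print(evens.count())                       # action: the job runs now and prints 5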
We're going to create a new context. In order to do that, we first need to stop the current Spark context to free resources.
sc.stop()
Transformations: map, flatMap, filter, reduceByKey, sortBy
Actions: take
sc = SparkContext(
    appName="Remove newlines"
)
# records are delimited by newlines (one line per record)
sc._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '\n')
!hdfs dfs -ls -h GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna
-rw-r--r-- 3 groda supergroup 1.3 G 2020-02-18 15:19 GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna
!hadoop fs -rmr GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN
rmr: DEPRECATED: Please use '-rm -r' instead.
20/02/19 23:37:53 INFO fs.TrashPolicyDefault: Moved: 'hdfs://nameservice1/user/groda/GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN' to trash at: hdfs://nameservice1/user/groda/.Trash/Current/user/groda/GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN
genomeFile = sc.textFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna", minPartitions=12)
[line[:140] for line in genomeFile.take(2)]
['>QVRK01000602.1 Homo sapiens isolate HG02106 chromosome 1 1-100500000:0, whole genome shotgun sequence', 'CCCCAGCCACCCTTgcttccctgccccagccttccatcTCATCTCTCTTGCTTCCATCTCTGGCTTTTCCACTCCAGCCA']
In this passage we use the comment lines (beginning with ">") as block delimiters, replacing each of them with "---"; the newlines inside each block are removed in the next step. The result is saved in GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN (saveAsTextFile fails if the target already exists, which is why the old output was removed above).
import re
genomeFile \
.map(lambda x: re.sub('^>.*', '---', x)) \
.map(lambda x: x.upper()) \
.map(lambda x: re.sub('^$', '\n', x)) \
.saveAsTextFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN")
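saveAsTextFile writes its result as a directory containing one part file per partition. To verify, one could list the new directory (output omitted here):
!hdfs dfs -ls -h GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN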
sc.stop()
sc = SparkContext(
    appName="Genome"
)
# use '---' (substituted for the FASTA headers above) as the record delimiter,
# so that each whole sequence becomes a single record
sc._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '---')
genomeFile = sc.textFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN", minPartitions=12)
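Before counting n-grams it can be useful to peek at the first records to confirm that each record now holds one whole sequence; a quick sketch (output not shown):
# lengths of the first two records (each record should be one sequence)
print([len(r) for r in genomeFile.take(2)])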
We now repeat the n-gram count on the genome: newlines are removed within each record, and the n-grams are triples of consecutive nucleotides.
n = 3
ngrams = genomeFile \
.map(lambda x: re.sub('\n', '', x)) \
.flatMap(lambda x: x.split()) \
.flatMap(lambda x: [tuple(y) for y in zip(*[x[i:] for i in range(n)])]) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.sortBy(lambda x: x[1], ascending=False)
for (ngram, count) in ngrams.take(10):
    print("{:<20}{:>d}".format(' '.join(ngram), count))
T T T               48189867
A A A               48113618
A A T               30902280
A T T               30887259
C A G               30608314
C T G               30478385
A G A               29926360
T C T               29887506
A C A               27652593
T G T               27622134
sc.stop()