#!/usr/bin/env python
# coding: utf-8

# # Ngrams with pyspark
#
# ### Create a Spark context
#
# A Spark context (or a session, which encapsulates a context) is the entry point to Spark.
# It represents the Spark engine (whether on the local machine or on a cluster) and provides an API for creating and running data pipelines.
#
# In this example, we're going to load a text file into an RDD, split the text into ngrams, and count the frequency of each ngram.

# In[1]:

from pyspark import SparkContext
from operator import add

# In[2]:

filename = "wiki429MB"
sc = SparkContext(appName="Ngrams with pyspark " + filename)

# ### View Spark context

# In[3]:

sc

# ### Check that data is there
#
# We are going to use the file `wiki429MB`, which we upload to HDFS as `/data/wiki429MB` and then list to check that it is there. The file has size $429$ MB.
#
# **Note:** the file contains one whole document per line.

# In[4]:

get_ipython().system('hdfs dfs -put wiki429MB /data/wiki429MB')

# In[5]:

get_ipython().system('hdfs dfs -ls -h /data/wiki429MB')

# ### Create RDD from file
#
# The second parameter ($80$) indicates the desired number of partitions.

# In[6]:

textFile = sc.textFile("/data/wiki429MB", 80)
print("textFile is of type: {}\nNumber of partitions: {}"
      .format(type(textFile), textFile.getNumPartitions()))

# ### Generate trigrams

# In[7]:

n = 3
ngrams = (
    textFile
    .map(lambda x: x.split())                     # one list of words per document
    .flatMap(lambda x: [tuple(y) for y in zip(*[x[i:] for i in range(n)])])  # sliding window of n consecutive words
    .map(lambda x: (x, 1))                        # pair each ngram with a count of 1
    .reduceByKey(add)                             # sum the counts per ngram
    .sortBy(lambda x: x[1], ascending=False)      # most frequent ngrams first
)

# In[8]:

type(ngrams)

# **Note:** up to now we've only carried out a series of _transformations_. Spark hasn't yet done any computation. It's only when we apply the _action_ `take` that Spark actually processes the data and returns a result.

# In[9]:

for (ngram, count) in ngrams.take(10):
    print("{:<20}{:>d}".format(' '.join(ngram), count))

# ### Stop context
#
# We're going to create a new context. In order to do that, we first stop the current Spark context to free its resources.

# In[10]:

sc.stop()

# ### Transformations and actions seen so far
#
# **Transformations**
# - `map`
# - `flatMap`
# - `reduceByKey`
# - `sortBy`
#
# **Actions**
# - `take`

# ## Create new context

# In[11]:

sc = SparkContext(appName="Remove newlines")

# In[12]:

# Records are delimited by newlines (the default), i.e. one record per line.
sc._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '\n')

# In[13]:

get_ipython().system('hdfs dfs -ls -h GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna')

# In[14]:

# Remove the output directory if it already exists (saveAsTextFile fails otherwise).
get_ipython().system('hdfs dfs -rm -r GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN')

# In[15]:

genomeFile = sc.textFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna", minPartitions=12)
[line[:140] for line in genomeFile.take(2)]  # first 140 characters of the first two records

# This step prepares the genome for block-wise reading: header lines (beginning with `>`) are replaced by the delimiter `---`, so that each sequence can later be read as a single record with its internal newlines removed. The transformed file is saved as `GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN`.
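# Before running the distributed job, here is a minimal local sketch of the per-line substitutions that the Spark cell below applies. The FASTA fragment (headers, sequence lines, blank line) is invented purely for illustration; no Spark is needed.

# In[ ]:

import re

# Toy input: a hypothetical FASTA fragment (header line, sequence lines, blank line).
toy_lines = [">chr1 example header", "acgtacgtac", "ttgacc", "", ">chr2 example header", "ggctaa"]

for line in toy_lines:
    line = re.sub('^>.*', '---', line)  # header lines become the record separator
    line = line.upper()                 # normalize nucleotides to upper case
    line = re.sub('^$', '\n', line)     # empty lines become explicit newlines
    print(repr(line))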
# In[16]:

import re

(genomeFile
    .map(lambda x: re.sub('^>.*', '---', x))   # header lines become the block delimiter
    .map(lambda x: x.upper())                  # normalize nucleotides to upper case
    .map(lambda x: re.sub('^$', '\n', x))      # turn empty lines into explicit newline characters
    .saveAsTextFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN"))

# In[17]:

# Start a fresh context that reads records delimited by `---`, i.e. one whole sequence per record.
sc.stop()
sc = SparkContext(appName="Genome")
sc._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '---')

# In[18]:

genomeFile = sc.textFile("GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN", minPartitions=12)

# In[19]:

n = 3
ngrams = (
    genomeFile
    .map(lambda x: re.sub('\n', '', x))        # join the lines of each sequence block
    .flatMap(lambda x: x.split())              # keep each whitespace-separated chunk as its own string
    .flatMap(lambda x: [tuple(y) for y in zip(*[x[i:] for i in range(n)])])  # character-level trigrams
    .map(lambda x: (x, 1))                     # pair each trigram with a count of 1
    .reduceByKey(add)                          # sum the counts per trigram
    .sortBy(lambda x: x[1], ascending=False)   # most frequent trigrams first
)

# In[20]:

for (ngram, count) in ngrams.take(10):
    print("{:<20}{:>d}".format(' '.join(ngram), count))

# In[21]:

sc.stop()
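# As a closing aside, the sliding-window expression used in cells `In[7]` and `In[19]` can be checked locally; this sketch uses an invented toy string and needs no Spark.

# In[ ]:

# Local illustration of the n-gram idiom used above:
# zip(*[x[i:] for i in range(n)]) aligns each position with its n-1 successors,
# yielding one tuple per window of n consecutive characters.
n = 3
x = "ACGTACGT"  # made-up sequence for illustration
print([tuple(y) for y in zip(*[x[i:] for i in range(n)])])
# -> [('A', 'C', 'G'), ('C', 'G', 'T'), ('G', 'T', 'A'), ('T', 'A', 'C'), ('A', 'C', 'G'), ('C', 'G', 'T')]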