Volume refers to the amount of data generated through websites, portals and online applications in a data-driven business. Especially for online retailers, volume encompasses all the available data that must be assessed for relevance.
Velocity refers to the speed at which data is generated; as internet speeds and the number of users have grown, velocity has increased substantially.
Variety in Big Data refers to all the structured and unstructured data that can be generated either by humans or by machines. Structured data is whatever data you could store in a spreadsheet: it can easily be cataloged, and summary statistics can be calculated for it. Unstructured data are raw things like texts, tweets, pictures, videos, emails, voice mails, hand-written text, ECG readings, and audio recordings. Humans can only make sense of data that is structured, and it is usually up to data scientists to bring some organization and structure to unstructured data.
MapReduce is a programming paradigm that enables scaling across hundreds or thousands of servers for big data analytics. The underlying concept can be somewhat difficult to grasp, because the paradigm differs from traditional programming practices.
In a nutshell, the term "MapReduce" refers to two distinct tasks. The first is the Map job, which takes one set of data and transforms it into another set of data, where individual elements are broken down into tuples (key/value pairs), while the Reduce job takes the output from a map as input and combines those data tuples into a smaller set of tuples.
A distributed processing system is a group of computers in a network working in tandem to accomplish a task.
With parallel computing:
The dataset that needs processing must first be transformed into key:value pairs and split into fragments, which are then assigned to map tasks. Each computing cluster is assigned a number of map tasks, which are subsequently distributed among its nodes. In this example, let's assume that we are using 5 nodes (a server with 5 different worker nodes).
First, split the data from one file or files into however many nodes are being used.
We will then use the map function to create key value pairs represented by:
{animal} , {# of animals per zoo}
After the original key:value pairs are processed, intermediate key:value pairs are generated. The intermediate key:value pairs are sorted by their key values to create a new list of key:value pairs.
This list from the map task is divided into a new set of fragments; the sort-and-shuffle step groups the mapped objects so they are easier to reduce. The number of these new fragments will be the same as the number of reduce tasks.
Now, every properly shuffled segment will have a reduce task applied to it. After the task is completed, the final output is written onto a file system. The underlying file system is usually HDFS (Hadoop Distributed File System).
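A minimal sketch of the same animal-count idea using PySpark's RDD API, which implements this map/shuffle/reduce pattern; the SparkContext sc and the input file zoos.txt (one animal sighting per line) are assumptions for illustration:
# Hypothetical input: zoos.txt, one animal sighting per line, e.g. "lion"
animals = sc.textFile('zoos.txt')
# Map step: emit key/value pairs of the form (animal, 1)
pairs = animals.map(lambda animal: (animal, 1))
# Shuffle + reduce step: Spark groups the pairs by key and sums the counts
counts = pairs.reduceByKey(lambda x, y: x + y)
# Write the final output to the file system (one part-file per partition)
counts.saveAsTextFile('animal_counts')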
It's important to note that MapReduce is generally only powerful when dealing with large amounts of data. When used on a small dataset, it is often faster to perform the operations outside the MapReduce framework.
There are two groups of entities in this process that ensure the MapReduce task gets done properly:
Job Tracker: a "master" node that informs the other nodes which map and reduce jobs to complete
Task Tracker: the "worker" nodes that complete the map and reduce operations
There are different names for these components depending on the technology used, but there will always be a master node that informs worker nodes what tasks to perform.
caching keeps data in memory so it can be reused across operations
caching is lazy - nothing is stored until an action is triggered
cache selectively - caching everything wastes memory
df.cache() - caches a dataframe
df.unpersist() - uncaches
df.is_cached - checks if df is cached
df.storageLevel - provides details of how df is cached
- useDisk
- useMemory
- useOffHeap #offheap storage
- deserialized
- replication
spark.catalog.isCached(tableName='df') - lets you know if a table is cached
spark.catalog.cacheTable('df') - caches a table
spark.catalog.uncacheTable('df') - uncaches table
spark.catalog.clearCache() - clears all cache
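A short sketch tying these calls together; the DataFrame df and the running spark session are assumed to exist already:
df.cache() #mark df for caching (lazy - nothing is stored yet)
df.count() #running an action materializes the cache
print(df.is_cached) #True
print(df.storageLevel) #e.g. Disk Memory Deserialized 1x Replicated
df.createOrReplaceTempView('df')
spark.catalog.cacheTable('df') #cache by table name
print(spark.catalog.isCached(tableName='df'))
spark.catalog.uncacheTable('df') #release the table cache
df.unpersist() #release the dataframe cache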
for production code, use the logging module instead of print statements
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Log columns of text_df as debug message
logging.debug("text_df columns: %s", text_df.columns)
# Log whether table1 is cached as info message
logging.info("table1 is cached: %s", spark.catalog.isCached(tableName="table1"))
# Log first row of text_df as warning message
logging.warning("The first row of text_df:\n %s", text_df.first())
# Log selected columns of text_df as error message
logging.error("Selected columns: %s", text_df.select("id", "word"))
configures a Spark Context including Java properties
pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)
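A minimal sketch of building a configuration and passing it to a context (an alternative to the direct constructor call shown next); the app name is just a placeholder:
import pyspark
conf = pyspark.SparkConf().setMaster('local[*]').setAppName('notes_app')
sc = pyspark.SparkContext(conf=conf) #only one SparkContext may be active at a time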
import pyspark
sc = pyspark.SparkContext('local[*]')
type(sc)
dir(sc)
help(sc)
sc.defaultParallelism
sc.version
sc.appName
sc._conf.getAll()
sc.stop()
sc.pythonVer
sc.master
The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are DataFrames easier to understand, they are also more optimized for complicated operations than RDDs.
To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
# Create my_spark
my_spark = SparkSession.builder.getOrCreate()
# Print my_spark
print(my_spark)
lists all the tables inside your cluster's catalog
print(spark.catalog.listTables())
SQL query format
# create a SQL query
query = "FROM flights SELECT * LIMIT 10"
# Get the first 10 rows of flights
flights10 = spark.sql(query)
# Show the results
flights10.show()
convert with the .toPandas() method
# create a query
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"
# Run the query
flight_counts = spark.sql(query)
# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()
# Print the head of pd_counts
print(pd_counts.head())
the .createDataFrame() method converts a pandas DataFrame (or other local data) into a Spark DataFrame
spark_temp = spark.createDataFrame(pandas_df)
.createOrReplaceTempView() registers the DataFrame as a temporary table/view in the catalog, so Spark SQL commands can be run against it
spark_temp.createOrReplaceTempView('new_table_name')
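Putting the two together, a minimal sketch; the pandas DataFrame pd_temp is a hypothetical stand-in:
import numpy as np
import pandas as pd
# Hypothetical pandas DataFrame of random numbers
pd_temp = pd.DataFrame(np.random.random(10), columns=['value'])
# Create a Spark DataFrame from the pandas DataFrame
spark_temp = spark.createDataFrame(pd_temp)
# Register it as a temporary view so it appears in the catalog
spark_temp.createOrReplaceTempView('temp')
# It can now be queried with Spark SQL
spark.sql("SELECT * FROM temp LIMIT 5").show()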
spark.read allows direct loading of several file types into a Spark DataFrame
# file path to .csv
file_path = "/usr/local/share/datasets/airports.csv"
# Read in the airports data
airports = spark.read.csv(file_path, header=True, inferSchema=True)
# Show the data
airports.show()
In Spark you can do this using the .withColumn() method, which takes two arguments: first, a string with the name of your new column, and second, a Column expression that produces the new column's values.
Spark DataFrames are immutable, so adding new columns means reassigning the df.
df = df.withColumn("newCol", df.oldCol / 60) #deriving the new column's values from an existing column
similar to the WHERE clause in SQL
two ways of filtering:
passing a string containing a SQL WHERE expression keeps only the rows where the expression is true
passing a df.column comparison creates a Column of boolean values; rows where the value is True are kept
# Filter flights by passing a string
long_flights1 = flights.filter("distance > 1000")
# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)
similar to the SQL SELECT statement
takes multiple arguments - one for each column you want to select. These arguments can either be the column name as a string or a column object (using the df.colName syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it
can also use column-wise operations
# Select columns using column strings
selected1 = flights.select('tailnum', 'origin', 'dest')
# Select columns using df.column format
temp = flights.select(flights.origin, flights.dest, flights.carrier)
#using operations to create a column
avg_speed = (flights.distance/(flights.air_time/60))
renames a column when selecting
#rename column to avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")
you can aggregate with common methods by using the .groupBy() method
creating groups using .groupBy() returns a pyspark.sql.GroupedData object, which exposes aggregation methods
#using filter and groupBy to find the shortest flight from PDX
flights.filter(flights.origin == 'PDX').groupBy().min("distance").show()
you can also use the .agg() method, which allows use of any function from the pyspark.sql.functions library
#import functions
import pyspark.sql.functions as F
#find the standard deviation of a column
df.groupBy().agg(F.stddev('col_name')).show()
performed using the df.join() method
joined_df = df.join(joining_df, on='joining_column', how='how_to_join')
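For example (a sketch; flights and airports are hypothetical DataFrames that share a 'dest' column):
# Left join the airport details onto each flight by destination code
flights_with_airports = flights.join(airports, on='dest', how='leftouter')
flights_with_airports.show(5)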
User Defined Function
from pyspark.sql.functions import udf
data type to be returned by UDF
from pyspark.sql.types import BooleanType, StringType, IntegerType, FloatType, ArrayType
# example
short_udf = udf(lambda x: True if not x or len(x) < 10 else False, BooleanType())
# Returns true if the value is a nonempty vector
nonempty_udf = udf(lambda x:
True if (x and hasattr(x, "toArray") and x.numNonzeros())
else False, BooleanType())
# Returns first element of the array as string
s_udf = udf(lambda x: str(x[0]) if (x and type(x) is list and len(x) > 0)
else '', StringType())
from pyspark.sql.functions import array_contains
# Show the rows where doc contains the item '5'
df_before.where(array_contains('doc', '5')).show()
# UDF removes items in TRIVIAL_TOKENS from array
rm_trivial_udf = udf(lambda x:
list(set(x) - TRIVIAL_TOKENS) if x
else x,
ArrayType(StringType()))
# Remove trivial tokens from 'in' and 'out' columns of df2
df_after = df_before.withColumn('in', rm_trivial_udf('in'))\
.withColumn('out', rm_trivial_udf('out'))
# Show the rows of df_after where doc contains the item '5'
df_after.where(array_contains('doc','5')).show()
# Load the dataframe
df = spark.read.load('sherlock_sentences.parquet')
# Filter and show the first 5 rows
df.where('id > 70').show(5, truncate=False)
from pyspark.sql.functions import split, explode
# Split the clause column into a column called words
split_df = clauses_df.select(split('clause', ' ').alias('words'))
split_df.show(5, truncate=False)
# Explode the words column into a column called word
exploded_df = split_df.select(explode('words').alias('word'))
exploded_df.show(10)
# Count the resulting number of rows in exploded_df
print("\nNumber of rows: ", exploded_df.count())
# Word for each row, previous two and subsequent two words
query = """
SELECT
part,
LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,
LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
word AS w3,
LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w4,
LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w5
FROM text
"""
spark.sql(query).where("part = 12").show(10)
# Repartition text_df into 12 partitions on 'chapter' column
repart_df = text_df.repartition(12, 'chapter')
# Prove that repart_df has 12 partitions
repart_df.rdd.getNumPartitions()
query = """
SELECT w1, w2, w3, w4, w5, COUNT(*) AS count FROM (
SELECT word AS w1,
LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3,
LEAD(word,3) OVER(PARTITION BY part ORDER BY id ) AS w4,
LEAD(word,4) OVER(PARTITION BY part ORDER BY id ) AS w5
FROM text
)
GROUP BY w1, w2, w3, w4, w5
ORDER BY count DESC
LIMIT 10 """
df = spark.sql(query)
df.show()
spark.sql("""
SELECT DISTINCT w1, w2, w3, w4, w5 FROM (
SELECT word AS w1,
LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3,
LEAD(word,3) OVER(PARTITION BY part ORDER BY id ) AS w4,
LEAD(word,4) OVER(PARTITION BY part ORDER BY id ) AS w5
FROM text
)
ORDER BY w1 DESC, w2 DESC, w3 DESC, w4 DESC, w5 DESC
LIMIT 10
""").show()
query = """
SELECT chapter, w1, w2, w3, count FROM
(
SELECT
chapter,
ROW_NUMBER() OVER (PARTITION BY chapter ORDER BY COUNT DESC) AS row,
w1, w2, w3, count
FROM ( %s )
)
WHERE row = 1
ORDER BY chapter ASC
""" % subquery
spark.sql(query).show()
# Returns the index of the first nonzero element of a vector column (as a float)
first_udf = udf(lambda x:
float(x.indices[0])
if (x and hasattr(x, "toArray") and x.numNonzeros())
else 0.0,
FloatType())
# Apply first_udf to the output column
df.select(first_udf("output").alias("result")).show(5)
# Add label by applying the get_first_udf to output column
df_new = df.withColumn('label', get_first_udf('output'))
# Show the first five rows
df_new.show(5)
# Transform df using model
result = model.transform(df.withColumnRenamed('in', 'words'))\
.withColumnRenamed('words', 'in')\
.withColumnRenamed('vec', 'invec')
result.drop('sentence').show(3, False)
# Add a column based on the out column called outvec
result = model.transform(result.withColumnRenamed('out', 'words'))\
.withColumnRenamed('words', 'out')\
.withColumnRenamed('vec', 'outvec')
result.select('invec', 'outvec').show(3,False)
# Import the lit function
from pyspark.sql.functions import lit
# Select the rows where endword is 'him' and label 1
df_pos = df.where("endword = 'him'")\
.withColumn('label', lit(1))
# Select the rows where endword is not 'him' and label 0
df_neg = df.where("endword <> 'him'")\
.withColumn('label', lit(0))
# Union pos and neg in equal number
df_examples = df_pos.union(df_neg.limit(df_pos.count()))
print("Number of examples: ", df_examples.count())
df_examples.where("endword <> 'him'").sample(False, .1, 42).show(5)
# Split the examples into train and test, use 80/20 split
df_trainset, df_testset = df_examples.randomSplit((.8,.2), 42)
# Print the number of training examples
print("Number training: ", df_trainset.count())
# Print the number of test examples
print("Number test: ", df_testset.count())
# Import the logistic regression classifier
from pyspark.ml.classification import LogisticRegression
# Instantiate the logistic regression, setting elasticNetParam to 0.0
logistic = LogisticRegression(maxIter=100, regParam=0.4, elasticNetParam=0.0)
# Train the logistic classifier on the trainset
df_fitted = logistic.fit(df_trainset)
# Print the number of training iterations
print("Training iterations: ", df_fitted.summary.totalIterations)
# Score the model on test data
testSummary = df_fitted.evaluate(df_testset)
# Print the AUC metric
print("\ntest AUC: %.3f" % testSummary.areaUnderROC)
# Apply the model to the test data
predictions = df_fitted.transform(df_testset).select(fields)
# Print incorrect if prediction does not match label
for x in predictions.take(8):
    print()
    if x.label != int(x.prediction):
        print("INCORRECT ==> ")
    for y in fields:
        print(y, ":", x[y])
Using the pyspark_dist_explore package, which plots Spark DataFrame columns directly (its hist() and distplot() draw onto a matplotlib Axes, while pandas_histogram() returns the binned counts as a pandas object):
import matplotlib.pyplot as plt
from pyspark_dist_explore import hist, distplot, pandas_histogram
#load data
test_df = spark.read.csv('test_csv')
#select data to be visualized
test_df_col = test_df.select('Col_name')
#create histogram
fig, ax = plt.subplots()
hist(ax, test_df_col, bins=10, color=['red'])
#create a density plot of the same column
fig, ax = plt.subplots()
distplot(ax, test_df_col, bins=10)
#compute histogram bin counts as a pandas object instead of plotting
pandas_histogram(test_df_col, bins=10)
The handyspark package is a newer option; it wraps a Spark DataFrame so columns can be plotted directly, as in the histogram example shown below
from handyspark import *
#load data
test_df = spark.read.csv('test_csv')
#convert to a HandyFrame
test_hdf = test_df.toHandy()
#create histogram
test_hdf.cols['col_name'].hist()
Resilient Distributed Datasets (RDD) are fundamental data structures of Spark. An RDD is, essentially, the Spark representation of a set of data, spread across multiple machines, with APIs to let you act on it. An RDD could come from any datasource, e.g. text files, a database, a JSON file etc.
rdd = sc.parallelize(data,numSlices=10) #creates 10 partitions
print(type(rdd))
#load text data
rdd2 = sc.textFile('file_name.file', minPartitions = 5)#defines a minimum no. of partitions
rdd.getNumPartitions()
rdd.count() #returns the total count of items in the RDD
rdd.first() #returns the first item in the RDD
rdd.take(number) #returns the first n items in the RDD
rdd.top(number) #returns the top n items
rdd.collect() #returns everything from your RDD
reduce(function) #aggregates the elements of an RDD
saveAsTextFile('file_name') #saves the RDD as a text file, with each partition written as a separate file
coalesce() #used with saveAsTextFile, will save to a single text file
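For example, a sketch assuming an existing pair RDD named counts and a writable output path:
# Collapse to a single partition so the output is written as one part-file
counts.coalesce(1).saveAsTextFile('output/counts_single_file')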
selected_items.reduce(lambda x,y :x + y)
total_spent.sortBy(lambda x: x[1],ascending = False)
discounted = revenue_minus_tax.map(lambda x : x*0.9)
price_items.map(sales_tax).map(lambda x : x*0.9).top(15)
discounted.toDebugString()
flat_mapped = price_items.flatMap(lambda x : (x, x*0.92*0.9 ))
selected_items = discounted.filter(lambda x: x>300)
combinedRDD = RDD1.union(RDD2)
my_tuple = [('Sam', 23), ('Mary', 34)]
pairRDD = sc.parallelize(my_tuple)
total_spent = sales_data.reduceByKey(lambda x,y :x + y)
rdd.groupByKey() #groups the values that share the same key
rdd.sortByKey(ascending=True) #sorts a pair RDD by key
RDD1.join(RDD2)
for key, val in rdd.countByKey().items():
    print(key, val)
sc.parallelize([(1,2),(3,4)]).collectAsMap()
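A small end-to-end sketch of the pair RDD operations above; the data is made up:
sales = sc.parallelize([('Sam', 23), ('Mary', 34), ('Sam', 17)])
totals = sales.reduceByKey(lambda x, y: x + y) #[('Sam', 40), ('Mary', 34)]
print(totals.sortByKey(ascending=True).collect())
print(sales.countByKey()) #defaultdict: {'Sam': 2, 'Mary': 1}
print(totals.collectAsMap()) #{'Sam': 40, 'Mary': 34}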
Transformer: implements a .transform() method that takes a DataFrame and returns a new DataFrame, usually with columns added
Estimator: implements a .fit() method that takes a DataFrame and returns a fitted model, which is itself a Transformer
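A brief sketch of the distinction, assuming a DataFrame df with a string column 'carrier' (hypothetical names):
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_index') #an Estimator
indexer_model = indexer.fit(df) #.fit() returns a Transformer (a StringIndexerModel)
indexed_df = indexer_model.transform(df) #.transform() returns a new DataFrame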
Spark models only take numbers: decimals and whole numbers, called 'doubles' and 'integers' in Spark
Spark will make a guess at datatypes; if a column is not recognized as a double or integer, you can use the .cast() method in combination with the .withColumn() method to convert the datatype ("integer", "double")
#create new column with numerical data
original_df = original_df.withColumn("col_name", original_df.col_name.cast("integer"))
there are two steps to take to one-hot-encode categorical variables:
step 1: use StringIndexer to map each distinct string value in a column to a numeric index
#create stringindexer
indexer = StringIndexer(inputCol='col_name', outputCol='new_cols_name')
step 2: use OneHotEncoder to OHE the columns
#create ohe
encoder = OneHotEncoder(inputCol='col_name', outputCol='new_cols_name')
Spark modeling requires all columns containing features to be combined in one column
done using VectorAssembler
#assemble matrix into a vector
assembler = VectorAssembler(inputCols=['list', 'of', 'col', 'names'], outputCol='feature')
combines all estimators and transformers
wraps the process so you can reuse the named pipeline
#import pipeline
from pyspark.ml import Pipeline
#make pipeline
pipeline = Pipeline(stages=[indexer, encoder, assembler])
#fit and transform
piped_data = pipeline.fit(model_data).transform(model_data)
#split data
training, test = piped_data.randomSplit([.7,.3])
select an evaluation metric via pyspark.ml.evaluation, e.g. for binary classification:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
performed using the pyspark.ml.tuning library
if several parameters will be tuned, use ParamGridBuilder() method
#import tuning package
import pyspark.ml.tuning as tune
#create grid
grid = tune.ParamGridBuilder()
#add each hyperparameter and its candidate values to the grid
grid = grid.addGrid(model.parameter, [0, 1, 2])
grid = grid.addGrid(model.other_parameter, [0, 1, 2])
#build the grid
grid = grid.build()
the tuning module also contains the CrossValidator class, which uses your selected evaluator to compare models across the parameter grid
#create crossvalidator
cv = tune.CrossValidator(estimator=model_name,
estimatorParamMaps=grid,
evaluator=evaluator)
#fit cross-validation to find the best model
models = cv.fit(training)
best_model = models.bestModel
#evaluate the best model on the test set
test_results = best_model.transform(test)
print(evaluator.evaluate(test_results))
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
#set pipeline parameters
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
string_indexer = StringIndexer(inputCol='month', outputCol='month_num', handleInvalid='keep')
one_hot_encoder = OneHotEncoderEstimator(inputCols=['month_num'], outputCols=['month_vec'])
vector_assembler = VectorAssembler(inputCols=features, outputCol='features')
random_forest = RandomForestRegressor(featuresCol='features', labelCol='area')
stages = [string_indexer, one_hot_encoder, vector_assembler, random_forest]
pipeline = Pipeline(stages=stages) #instantiate pipeline
params = ParamGridBuilder()\
.addGrid(random_forest.maxDepth, [5,10,15])\
.addGrid(random_forest.numTrees, [20,50,100])\
.build() #performs gridsearch on set parameters
reg_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area',metricName = 'mae') #evaluates model
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params,evaluator=reg_evaluator)
cross_validated_model = cv.fit(spark_df) #fits model
cross_validated_model.avgMetrics #returns the average cross-validation metric (metricName) for each parameter combination
#shows selected predictions
predictions = cross_validated_model.transform(spark_df)
predictions.select('prediction','area').show(300)
cross_validated_model.bestModel.stages #checking best model by stage
optimal_rf_model = cross_validated_model.bestModel.stages[3] #looking at stage 3 of process
optimal_rf_model.featureImportances #checking feature importance
#alternating least squares
from pyspark.mllib.recommendation import ALS, Rating
#logistic regression with LBFGS
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
#K means
from pyspark.mllib.clustering import KMeans
# Load the data into RDD
data = sc.textFile(file_path)
# Split the RDD
ratings = data.map(lambda l: l.split(','))
# Transform the ratings RDD
ratings_final = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))
# Split the data into training and test
training_data, test_data = ratings_final.randomSplit([0.8, 0.2])
# Create the ALS model on the training data
model = ALS.train(training_data, rank=10, iterations=10)
# Drop the ratings column
testdata_no_rating = test_data.map(lambda p: (p[0], p[1]))
# Predict the model
predictions = model.predictAll(testdata_no_rating)
# Prepare ratings data
rates = ratings_final.map(lambda r: ((r[0], r[1]), r[2]))
# Prepare predictions data
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))
# Join the ratings data with predictions data
rates_and_preds = rates.join(preds)
# Calculate and print MSE
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))
# Load the datasets into RDDs
spam_rdd = sc.textFile(file_path_spam)
non_spam_rdd = sc.textFile(file_path_non_spam)
# Split the email messages into words
spam_words = spam_rdd.flatMap(lambda email: email.split(' '))
non_spam_words = non_spam_rdd.flatMap(lambda email: email.split(' '))
# Print the first element in the split RDD
print("The first element in spam_words is", spam_words.first())
print("The first element in non_spam_words is", non_spam_words.first())
# Create a HashingTF instance with 200 features
from pyspark.mllib.feature import HashingTF
tf = HashingTF(numFeatures=200)
# Map each word to one feature
spam_features = tf.transform(spam_words)
non_spam_features = tf.transform(non_spam_words)
# Label the features: 1 for spam, 0 for non-spam
from pyspark.mllib.regression import LabeledPoint
spam_samples = spam_features.map(lambda features: LabeledPoint(1, features))
non_spam_samples = non_spam_features.map(lambda features: LabeledPoint(0, features))
# Combine the two datasets
samples = spam_samples.union(non_spam_samples)
# Split the data into training and testing
train_samples,test_samples = samples.randomSplit([0.8, 0.2])
# Train the model
model = LogisticRegressionWithLBFGS.train(train_samples)
# Create a prediction label from the test data
predictions = model.predict(test_samples.map(lambda x: x.features))
# Combine original labels with the predicted labels
labels_and_preds = test_samples.map(lambda x: x.label).zip(predictions)
# Check the accuracy of the model on the test data
accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_samples.count())
print("Model accuracy : {:.2f}".format(accuracy))
# Load the dataset into a RDD
clusterRDD = sc.textFile(file_path)
# Split the RDD based on tab
rdd_split = clusterRDD.map(lambda x: x.split('\t'))
# Transform the split RDD by creating a list of integers
rdd_split_int = rdd_split.map(lambda x: [int(x[0]), int(x[1])])
# Count the number of rows in RDD
print("There are {} rows in the rdd_split_int dataset".format(rdd_split_int.count()))
# Train the model with clusters from 13 to 16 and compute WSSSE
for clst in range(13, 17):
    model = KMeans.train(rdd_split_int, clst, seed=1)
    WSSSE = rdd_split_int.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("The cluster {} has Within Set Sum of Squared Error {}".format(clst, WSSSE))
# Train the model again with the best k
model = KMeans.train(rdd_split_int, k=15, seed=1)
# Get cluster centers
cluster_centers = model.clusterCenters
# Convert rdd_split_int RDD into Spark DataFrame
rdd_split_int_df = spark.createDataFrame(rdd_split_int, schema=["col1", "col2"])
# Convert Spark DataFrame into Pandas DataFrame
rdd_split_int_df_pandas = rdd_split_int_df.toPandas()
# Convert "cluster_centers" that you generated earlier into Pandas DataFrame
cluster_centers_pandas = pd.DataFrame(cluster_centers, columns=["col1", "col2"])
# Create an overlaid scatter plot
plt.scatter(rdd_split_int_df_pandas["col1"], rdd_split_int_df_pandas["col2"])
plt.scatter(cluster_centers_pandas["col1"], cluster_centers_pandas["col2"], color="red", marker="x")
plt.show()
stopWordList = ['', 'the','a','in','of','on','at','for','by','i','you','me']
def wordCount(filename, stopWordList):
    lines = sc.textFile(filename)
    words1 = lines.flatMap(lambda x: x.split(' '))
    words2 = words1.map(lambda x: (x.lower(), 1))
    wordCounts = words2.reduceByKey(lambda x, y: x + y)
    freqWords = wordCounts.filter(lambda x: x[1] >= 5)
    keptWords = freqWords.filter(lambda x: x[0] not in stopWordList)  # drop the stop words
    output = keptWords.collect()
    return output