a- Recipe 1- Using Python Dictionary
b- Short Overview of PySpark
c- Recipe 2- Using Apache Spark - GroupBy Transformation
d- Recipe 3- Using Apache Spark - ReduceBy Transformation
import pandas as pd
pd.set_option('display.max_rows',15) # change preview settings
fname = './InterestData.csv'
subscrib_data = pd.read_csv(fname, delimiter=';')
subscrib_data
import pandas as pd
pd.set_option('display.max_rows',20) # change preview settings
fname = './InterestAggregated.csv'
subscrib_data = pd.read_csv(fname, delimiter=';')
subscrib_data
# Note: key = ID ; Interest = Long string with commas between each interest
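# Warm-up (not part of the original recipe): a minimal in-memory sketch of the
# dictionary-of-lists aggregation pattern used below. The IDs and interests here are
# made up purely for illustration; they are not taken from InterestData.csv.
from collections import defaultdict
rows = [("1", "golf"), ("2", "chess"), ("1", "sailing")]   # hypothetical (ID, interest) rows
agg = defaultdict(list)             # a missing key starts out as an empty list
for idd, interest in rows:
    agg[idd].append(interest)       # collect every interest under its ID
print(dict(agg))                    # -> {'1': ['golf', 'sailing'], '2': ['chess']}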
import csv
from collections import defaultdict
aggInterest = defaultdict(list) # dictionary <key,[list]>
header = None
# Read the file into a dictionary
with open('./InterestData.csv', 'r') as csvfile:
    readCSV = csv.reader(csvfile, delimiter=';')
    header = next(readCSV)
    for row in readCSV:  # loop over all records in the file
        aggInterest[row[0]].append(row[1])  # <--- aggregate each interest under the corresponding dictionary key
# Write the dictionary data into the file
with open('./aggInterest.csv', 'w', newline='') as csvfile:  # newline='' avoids extra blank rows with csv.writer
    writeCSV = csv.writer(csvfile, delimiter=';')
    writeCSV.writerow(header)
    for idd, interest in aggInterest.items():
        writeCSV.writerow((idd, interest))
with open('./aggInterest.csv', 'r') as text_file:
    for line in text_file:
        print(line)
# Note: This is not quite what we want: the interests should be saved as one long string field rather than a Python list
with open('./aggInterest.csv', 'w', newline='') as csvfile:
    writeCSV = csv.writer(csvfile, delimiter=';')
    writeCSV.writerow(header)
    for idd, interestLst in aggInterest.items():
        interest = ",".join(interestLst)  # <--- join the list into one long string
        writeCSV.writerow((idd, interest))
with open('./aggInterest.csv', 'r') as text_file:
    for line in text_file:
        print(line)
# Note: This is better ...
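# For comparison only: the same aggregation can be done with pandas (imported above).
# This is just a sketch; the column names 'ID' and 'Interest' are assumptions based on
# the note above and may differ from the actual header of InterestData.csv.
import pandas as pd
df = pd.read_csv('./InterestData.csv', delimiter=';')
agg_df = (df.groupby('ID')['Interest']
            .apply(','.join)                 # join each group's interests into one long string
            .reset_index())
agg_df.to_csv('./aggInterest_pandas.csv', sep=';', index=False)   # hypothetical output path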
from IPython.display import Image
from IPython.core.display import HTML
Image(filename = "PySpark_Execution_Process.png", width=750, height=700)
Image(filename ="PySpark_Execution_Architecture.png", width=800, height=750)
Image(filename ="Using_GroupByKey.png", width=900, height=900)
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.master", "local").appName("PythonCSV").getOrCreate()
file = "./InterestData.csv"
# read the csv file (which includes a header) into a dataframe
dataframe = spark.read.csv(file, sep=";", inferSchema=True, header=True)
rdd1 = dataframe.rdd.groupByKey()
# to convert groupByKey() iterable into an actual list of strings (interest)
rdd2 = rdd1.map(lambda pair : (pair[0], list(pair[1])))
rdd3 = rdd2.map(lambda pair: (pair[0], ",".join(pair[1])))  # convert list -> long string
for row in rdd3.collect():
    print(row)
print('\n', rdd1.collect()[0])  # just to show that groupByKey returned an iterable (ResultIterable), not a list
spark.stop()
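# Note: groupByKey returns a pyspark ResultIterable per key (as the last print shows),
# which is why the values are materialized with list() before being joined. Keep in mind
# that collect() brings the whole result back to the driver, which is fine for a small
# file like this one but not for large data.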
Image(filename ="Using_ReduceByKey.png", width=900, height=900)
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.master", "local").appName("PythonCSV").getOrCreate()
file = "./InterestData.csv"
# read the csv file (which includes a header) into a dataframe
dataframe = spark.read.csv(file, sep=";", inferSchema=True, header=True)
# Convert value part -> Set('Interest') using mapValues
# Make a union of all Sets belonging to same key using reduceByKey
rdd1 = dataframe.rdd.mapValues(lambda interest: {interest})\
           .reduceByKey(lambda s1, s2: s1.union(s2))
rdd3 = rdd1.mapValues(lambda sett: ",".join(sett))  # convert value part -> long string
for row in rdd3.collect():
    print(row)
spark.stop()
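# Note: reduceByKey (and aggregateByKey) combine values on each partition before the
# shuffle, whereas groupByKey ships every value across the network and groups afterwards,
# so for large inputs the set-union approach above generally shuffles less data.
# A hypothetical alternative sketch (toy data, not from the recipe): aggregateByKey
# builds each key's set directly instead of creating a one-element set per record.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("AggregateByKeyToy").getOrCreate()
pairs = spark.sparkContext.parallelize([("u1", "golf"), ("u2", "chess"), ("u1", "sailing")])
agg = pairs.aggregateByKey(set(),
                           lambda acc, v: acc | {v},   # add a value to the partition-local set
                           lambda s1, s2: s1 | s2)     # merge sets from different partitions
print(agg.mapValues(','.join).collect())
spark.stop()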