In dieser Lektion werden wie tiefer in Spark und Python eintaucehn. Bitte schaue das Video für ausführliche Erklärungen.
Schauen wir uns schnell die wichtigen Begriffe an:
Es gibt zwei übliche Wege um ein RDD zu erstellen:
Methode | Ergebnis |
---|---|
sc.parallelize(array) |
RDD aus Elementen eines Arrays (oder Liste) erstellen |
sc.textFile(path/to/file) |
RDD aus Zeilen einer Datei erstellen |
Wir können Transformations nutzen, um ein Set von Anweisungen zu erstellen, die wir auf das RDD anwenden wollen.
Transformation Beispiel | Ergennis |
---|---|
filter(lambda x: x % 2 == 0) |
Ungerade Elemente ausschließen |
map(lambda x: x * 2) |
Jedes RDD Element mit 2 multiplizieren |
map(lambda x: x.split()) |
Jeden String in Worte trennen |
flatMap(lambda x: x.split()) |
Jeden String in Worte trennen und Sequenz ebnen |
sample(withReplacement=True,0.25) |
Ein Sample mit 25% der Elemente mit Ersetzen |
union(rdd) |
rdd an existierendes RDD anhängen |
distinct() |
Duplikate im RDD entfernen |
sortBy(lambda x: x, ascending=False) |
Elemente in abseitegender Reihenfolge ordnen |
Sobald wir unseren "Plan" an Transformations geschrieben haben können wir als nächstes eine Action auf das Ergebnis anwenden. Einige der üblichen Actions in der Übersicht:
Action | Ergebnis |
---|---|
collect() |
RDD in eine Liste im Speicher umwandeln |
take(3) |
Erste 3 Elemente des RDD |
top(3) |
Top 3 Elemente des RDD |
takeSample(withReplacement=True,3) |
Ein Sample mit 3 Elementen mit Ersetzen |
sum() |
Summe der Elemente (setzt numerische Werte voraus) |
mean() |
Durchschnitt der Elemente (setzt numerische Werte voraus) |
stdev() |
Standardabweichung der Elemente (setzt numerische Werte voraus) |
Der beste Weg all das zu verstehen ist es sich einige Beispiele anzuschauen. Wir werden erst einmal gemächlich einsteigen und mit einer einfachen Textdatei arbeiten. Danach fahren wir mit etwas realitätsnäheren Daten wie Kunden- und Verkaufsdaten fort.
%%writefile beispiel2.txt
erste
zweite zeile
die dritte zeile
dann eine vierte zeile
Writing beispiel2.txt
Jetzt können wir einige Transformations und Actions darauf anwenden:
from pyspark import SparkContext
sc = SparkContext()
# RDD erstellen
sc.textFile('beispiel2.txt')
beispiel2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
# Referenz für RDD erstellen
text_rdd = sc.textFile('beispiel2.txt')
# Map eine Funktion (oder Lambda Expression) zu jeder Zeile
# Dann "collect" das Ergebnis
text_rdd.map(lambda line: line.split()).collect()
[['erste'], ['zweite', 'zeile'], ['die', 'dritte', 'zeile'], ['dann', 'eine', 'vierte', 'zeile']]
# Alles als geebnet ausgeben
text_rdd.flatMap(lambda line: line.split()).collect()
['erste', 'zweite', 'zeile', 'die', 'dritte', 'zeile', 'dann', 'eine', 'vierte', 'zeile']
Jetzt wo wir mit RDDs gearbeitet haben und damit, wie wir Werte aggregieren, können wir uns Key Value Pairs anschauen. Dazu erstellen wir einige Fake Daten in einer Textdatei.
Diese Daten repräsentiert einige Services, die an Kunden eines SAAS Anbieters verkauft wurden.
%%writefile services.txt
#EventId Timestamp Customer State ServiceID Amount
201 10/13/2017 100 NY 131 100.00
204 10/18/2017 700 TX 129 450.00
202 10/15/2017 203 CA 121 200.00
206 10/19/2017 202 CA 131 500.00
203 10/17/2017 101 NY 173 750.00
205 10/19/2017 202 TX 121 200.00
Writing services.txt
services = sc.textFile('services.txt')
services.take(2)
['#EventId Timestamp Customer State ServiceID Amount', '201 10/13/2017 100 NY 131 100.00']
services.map(lambda x: x.split())
PythonRDD[9] at RDD at PythonRDD.scala:48
services.map(lambda x: x.split()).take(3)
[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'], ['201', '10/13/2017', '100', 'NY', '131', '100.00'], ['204', '10/18/2017', '700', 'TX', '129', '450.00']]
Lasst uns den ersten Hash-Tag entfernen!
services.map(lambda x: x[1:] if x[0]=='#' else x).collect()
['EventId Timestamp Customer State ServiceID Amount', '201 10/13/2017 100 NY 131 100.00', '204 10/18/2017 700 TX 129 450.00', '202 10/15/2017 203 CA 121 200.00', '206 10/19/2017 202 CA 131 500.00', '203 10/17/2017 101 NY 173 750.00', '205 10/19/2017 202 TX 121 200.00']
services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()
[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'], ['201', '10/13/2017', '100', 'NY', '131', '100.00'], ['204', '10/18/2017', '700', 'TX', '129', '450.00'], ['202', '10/15/2017', '203', 'CA', '121', '200.00'], ['206', '10/19/2017', '202', 'CA', '131', '500.00'], ['203', '10/17/2017', '101', 'NY', '173', '750.00'], ['205', '10/19/2017', '202', 'TX', '121', '200.00']]
Wir können als nächstes Methoden verwenden, die Lambda Expressions mit ByKey
Argumenten kombinieren. Diese ByKey
Methoden nehmen an, dass die Daten in Key-Value Format vorliegen.
Als Beispiel können wir die Sales Daten pro Staat ausgeben:
# Von zuvor
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())
cleanServ.collect()
[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'], ['201', '10/13/2017', '100', 'NY', '131', '100.00'], ['204', '10/18/2017', '700', 'TX', '129', '450.00'], ['202', '10/15/2017', '203', 'CA', '121', '200.00'], ['206', '10/19/2017', '202', 'CA', '131', '500.00'], ['203', '10/17/2017', '101', 'NY', '173', '750.00'], ['205', '10/19/2017', '202', 'TX', '121', '200.00']]
# Üben wir nun einzelne Felder auszuwählen
cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()
[('State', 'Amount'), ('NY', '100.00'), ('TX', '450.00'), ('CA', '200.00'), ('CA', '500.00'), ('NY', '750.00'), ('TX', '200.00')]
# Weiter mit reduceByKey
# Dabei gehen wir davon aus, dass der erste Wert der Key ist
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : amt1+amt2)\
.collect()
[('State', 'Amount'), ('NY', '100.00750.00'), ('TX', '450.00200.00'), ('CA', '200.00500.00')]
Wir können unsere Analyse damit fortsetzen, den Output zu sortieren:
# State und Amount nehmen
# Addieren
# ('State','Amount') loswerden
# Nach Amount Wert sortieren
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.filter(lambda x: not x[0]=='State')\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()
[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
Denkt daran, unpacking für die Leserlichkeit zu verwenden. Zum Beispiel:
x = ['ID','State','Amount']
def funk1(lst):
return lst[-1]
def funk2(id_st_amt):
# Unpack
(Id,st,amt) = id_st_amt
return amt
funk1(x)
'Amount'
funk2(x)
'Amount'