Run this notebook from Jupyter with a Python kernel:
pip install pyspark
To run this notebook as slides:
pip install RISE
# !pip install pyspark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .master("local[*]")
    .appName("DataFrame HandsOn 1")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)
spark
The master local[*]
means that Spark runs locally, in the same JVM as the driver, instead of on a cluster. The *
tells Spark to use as many worker threads as there are logical cores available.
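As a quick check (a minimal sketch, assuming the spark session above has been created), defaultParallelism shows how many worker threads local[*] resolved to on this machine:
# number of task slots Spark will use locally; matches the logical core count with local[*]
print(spark.sparkContext.defaultParallelism)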
This demonstrates how to read a CSV file and construct a DataFrame.
We will use the online retail dataset from Kaggle, credits: https://www.kaggle.com/datasets/vijayuv/onlineretail
!gzip -cd ../data/online-retail-dataset.csv.gz 2>&1| head -n3
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
online_retail_schema = ("InvoiceNo int, StockCode string, Description string, Quantity int, "
                        "InvoiceDate timestamp, UnitPrice float, CustomerId int, Country string")
df = (spark.read
.option("header", "true")
.option("timestampFormat", "M/d/yyyy H:m")
.csv("../data/online-retail-dataset.csv.gz",
schema=online_retail_schema)
)
df.show(2, False)
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate        |UnitPrice|CustomerId|Country       |
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER|6       |2010-12-01 08:26:00|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN               |6       |2010-12-01 08:26:00|3.39     |17850     |United Kingdom|
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows
df.printSchema()
root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Country: string (nullable = true)
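As an aside, the same schema could also be built programmatically with StructType instead of a DDL string; this is just an equivalent sketch and is not used elsewhere in the notebook:
# programmatic equivalent of the DDL schema string above
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, TimestampType, FloatType)
online_retail_struct = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", TimestampType(), True),
    StructField("UnitPrice", FloatType(), True),
    StructField("CustomerId", IntegerType(), True),
    StructField("Country", StringType(), True),
])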
Select DataFrame columns
# select single column
df.select("Country").show(2)
+--------------+
|       Country|
+--------------+
|United Kingdom|
|United Kingdom|
+--------------+
only showing top 2 rows
Select multiple columns
df.select("StockCode","Description","UnitPrice").show(n=2, truncate=False)
+---------+----------------------------------+---------+
|StockCode|Description                       |UnitPrice|
+---------+----------------------------------+---------+
|85123A   |WHITE HANGING HEART T-LIGHT HOLDER|2.55     |
|71053    |WHITE METAL LANTERN               |3.39     |
+---------+----------------------------------+---------+
only showing top 2 rows
df.columns
['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerId', 'Country']
# select first 5 columns
df.select(df.columns[0:5]).show(2)
+---------+---------+--------------------+--------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|
+---------+---------+--------------------+--------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|
+---------+---------+--------------------+--------+-------------------+
only showing top 2 rows
# selects all the original columns and adds a new column that specifies high value item
(df.selectExpr(
"*", # all original columns
"(UnitPrice > 100) as HighValueItem")
.show(2)
)
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|HighValueItem|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|        false|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|        false|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+
only showing top 2 rows
# aggregate over the whole DataFrame with selectExpr: total quantity and inventory value
(df.selectExpr(
"sum(Quantity) as TotalQuantity",
"cast(sum(UnitPrice) as int) as InventoryValue")
.show()
)
+-------------+--------------+
|TotalQuantity|InventoryValue|
+-------------+--------------+
|      5176450|       2498803|
+-------------+--------------+
# add a new column called InvoiceValue
from pyspark.sql.functions import expr
df_1 = (df
.withColumn("InvoiceValue", expr("UnitPrice * Quantity"))
.select("InvoiceNo","Description","UnitPrice","Quantity","InvoiceValue")
)
df_1.show(2, False)
# rename InvoiceValue to LineTotal
df_2 = df_1.withColumnRenamed("InvoiceValue","LineTotal")
df_2.show(2, False)
# drop a column
df_2.drop("LineTotal").show(2, False)
+---------+----------------------------------+---------+--------+------------+
|InvoiceNo|Description                       |UnitPrice|Quantity|InvoiceValue|
+---------+----------------------------------+---------+--------+------------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER|2.55     |6       |15.299999   |
|536365   |WHITE METAL LANTERN               |3.39     |6       |20.34       |
+---------+----------------------------------+---------+--------+------------+
only showing top 2 rows

+---------+----------------------------------+---------+--------+---------+
|InvoiceNo|Description                       |UnitPrice|Quantity|LineTotal|
+---------+----------------------------------+---------+--------+---------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER|2.55     |6       |15.299999|
|536365   |WHITE METAL LANTERN               |3.39     |6       |20.34    |
+---------+----------------------------------+---------+--------+---------+
only showing top 2 rows

+---------+----------------------------------+---------+--------+
|InvoiceNo|Description                       |UnitPrice|Quantity|
+---------+----------------------------------+---------+--------+
|536365   |WHITE HANGING HEART T-LIGHT HOLDER|2.55     |6       |
|536365   |WHITE METAL LANTERN               |3.39     |6       |
+---------+----------------------------------+---------+--------+
only showing top 2 rows
from pyspark.sql.functions import col
# select invoice lines with quantity > 20 and unitprice > 50 (three equivalent ways)
df.where(col("Quantity") > 20).where(col("UnitPrice") > 50).show(2)
df.filter(df.Quantity > 20).filter(df.UnitPrice > 50).show(2)
df.filter("Quantity > 20 and UnitPrice > 50").show(2)
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   556444|    22502|PICNIC BASKET WIC...|      60|2011-06-10 15:28:00|    649.5|     15098|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   556444|    22502|PICNIC BASKET WIC...|      60|2011-06-10 15:28:00|    649.5|     15098|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   556444|    22502|PICNIC BASKET WIC...|      60|2011-06-10 15:28:00|    649.5|     15098|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
# select invoice lines with quantity > 100 or unitprice > 20
df.where((col("Quantity") > 100) | (col("UnitPrice") > 20)).show(2)
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536378|    21212|PACK OF 72 RETROS...|     120|2010-12-01 09:37:00|     0.42|     14688|United Kingdom|
|     null|        D|            Discount|      -1|2010-12-01 09:41:00|     27.5|     14527|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows
from pyspark.sql.functions import desc, asc
# sort in the default order: ascending
df.orderBy(expr("UnitPrice")).show(2)
df.orderBy(col("Quantity").desc(), col("UnitPrice").asc()).show(10)
+---------+---------+---------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|    Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+---------------+--------+-------------------+---------+----------+--------------+
|     null|        B|Adjust bad debt|       1|2011-08-12 14:52:00|-11062.06|      null|United Kingdom|
|     null|        B|Adjust bad debt|       1|2011-08-12 14:51:00|-11062.06|      null|United Kingdom|
+---------+---------+---------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerId|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   581483|    23843|PAPER CRAFT , LIT...|   80995|2011-12-09 09:15:00|     2.08|     16446|United Kingdom|
|   541431|    23166|MEDIUM CERAMIC TO...|   74215|2011-01-18 10:01:00|     1.04|     12346|United Kingdom|
|   578841|    84826|ASSTD DESIGN 3D P...|   12540|2011-11-25 15:57:00|      0.0|     13256|United Kingdom|
|   542504|    37413|                null|    5568|2011-01-28 12:03:00|      0.0|      null|United Kingdom|
|   573008|    84077|WORLD WAR 2 GLIDE...|    4800|2011-10-27 12:26:00|     0.21|     12901|United Kingdom|
|   554868|    22197|SMALL POPCORN HOLDER|    4300|2011-05-27 10:52:00|     0.72|     13135|United Kingdom|
|   556231|   85123A|                   ?|    4000|2011-06-09 15:04:00|      0.0|      null|United Kingdom|
|   544612|    22053|EMPIRE DESIGN ROS...|    3906|2011-02-22 10:43:00|     0.82|     18087|United Kingdom|
|   560599|    18007|ESSENTIAL BALM 3....|    3186|2011-07-19 17:04:00|     0.06|     14609|United Kingdom|
|   550461|    21108|FAIRY CAKE FLANNE...|    3114|2011-04-18 13:20:00|      2.1|     15749|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 10 rows
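The desc and asc functions imported above can be used directly instead of the Column methods; a minimal equivalent sketch:
# same ordering as the cell above, written with the desc()/asc() functions
df.orderBy(desc("Quantity"), asc("UnitPrice")).show(2)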
Full list of built-in functions: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions
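As one illustrative sketch of using a function from that module (round is just an example, not part of the exercises), a built-in function can be applied inside select:
# round UnitPrice to one decimal place and show it next to the original column
from pyspark.sql.functions import round as spark_round
df.select("UnitPrice", spark_round("UnitPrice", 1).alias("UnitPriceRounded")).show(2)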
%%time
# Count distinct customers
from pyspark.sql.functions import countDistinct
df.select(countDistinct("CustomerID")).show()
+--------------------------+
|count(DISTINCT CustomerID)|
+--------------------------+
|                      4372|
+--------------------------+

CPU times: user 1.48 ms, sys: 2.55 ms, total: 4.02 ms
Wall time: 1.49 s
%%time
# approximate count of distinct customers; 0.1 is the maximum estimation error allowed
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("CustomerID", 0.1)).show()
+---------------------------------+
|approx_count_distinct(CustomerID)|
+---------------------------------+
|                             4336|
+---------------------------------+

CPU times: user 2.36 ms, sys: 1.57 ms, total: 3.93 ms
Wall time: 1.24 s
# average, maximum and minimum purchase quantity
from pyspark.sql.functions import avg, max, min
( df.select(
avg("Quantity").alias("avg_purchases"),
max("Quantity").alias("max_purchases"),
min("Quantity").alias("min_purchases"))
.show()
)
+----------------+-------------+-------------+
|   avg_purchases|max_purchases|min_purchases|
+----------------+-------------+-------------+
|9.55224954743324|        80995|       -80995|
+----------------+-------------+-------------+
# number of line items per invoice
df.groupBy("InvoiceNo", "CustomerId").count().show(5)
# grouping with expressions
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
.show(5)
+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536573|     17025|    4|
|   537228|     17677|    1|
|   537419|     13495|   14|
|   538093|     12682|   33|
|   538648|     17937|    5|
+---------+----------+-----+
only showing top 5 rows

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536532| 25.36986301369863|  16.850272831671976|
|   537632|               1.0|                 0.0|
|   538708| 10.61111111111111|   7.150282736359209|
|   538877|14.258278145695364|   27.56989037543246|
|   538993| 9.333333333333334|   2.748737083745107|
+---------+------------------+--------------------+
only showing top 5 rows
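The same grouping can also be written with the aggregate functions from pyspark.sql.functions instead of expr() strings; an equivalent sketch:
# equivalent to the expr()-based aggregation above
from pyspark.sql.functions import avg, stddev_pop
df.groupBy("InvoiceNo").agg(avg("Quantity"), stddev_pop("Quantity")).show(5)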
%%time is an IPython cell magic: https://ipython.readthedocs.io/en/stable/interactive/magics.html
It's possible to read files without specifying the schema. Some file formats (Parquet is one of them) embed the schema, which means that Spark can start reading the file right away. For formats without an embedded schema (CSV, JSON, ...), Spark can infer it by scanning the data. Let's see the difference in terms of time and results:
online_retail_schema = ("InvoiceNo int, StockCode string, Description string, Quantity int, "
                        "InvoiceDate timestamp, UnitPrice float, CustomerId int, Country string")
%%time
df = spark.read \
.option("header", "true") \
.option("timestampFormat", "M/d/yyyy H:m")\
.csv("../data/online-retail-dataset.csv.gz",
schema=online_retail_schema)
CPU times: user 735 µs, sys: 2.35 ms, total: 3.08 ms
Wall time: 36.4 ms
%%time
df_infer = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/online-retail-dataset.csv.gz")
CPU times: user 2.45 ms, sys: 741 µs, total: 3.19 ms
Wall time: 2.03 s
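To see the difference in results as well as in time, compare the two schemas. Without the explicit schema and timestamp format, InvoiceDate will typically be inferred as a plain string (a quick check, assuming both cells above were run):
# explicit schema vs. inferred schema
df.printSchema()
df_infer.printSchema()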
Reminder: documentation at https://spark.apache.org/docs/latest/api/python/index.html
If you didn't run the previous cells, run the following one:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[*]") \
.appName("DataFrame HandsOn 1") \
.config("spark.ui.showConsoleProgress","false") \
.getOrCreate()
online_retail_schema = ("InvoiceNo int, StockCode string, Description string, Quantity int, "
                        "InvoiceDate timestamp, UnitPrice float, CustomerId int, Country string")
df = spark.read \
.option("header", "true") \
.option("timestampFormat", "M/d/yyyy H:m")\
.csv("../data/online-retail-dataset.csv.gz",
schema=online_retail_schema)
Task: Show 5 lines of the "description" column
Task: Count the number of distinct invoices in the dataframe
Task: Find out in which month most invoices have been issued
Task: Filter the lines where the Quantity is more than 30
Task: Show the four most sold items (by quantity)
Bonus question: why do these two operations return different results? Hint: look at the documentation
print(df.select("InvoiceNo").distinct().count())
from pyspark.sql.functions import countDistinct
df.select(countDistinct("InvoiceNo")).show()