Reminder: documentation at https://spark.apache.org/docs/latest/api/python/index.html
# Build (or reuse) the SparkSession that drives every cell in this notebook.
# local[*] uses all local cores; console progress bars are silenced so the
# notebook output stays clean.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("DataFrame HandsOn 1")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)
# DDL-style schema for the online-retail CSV.
# NOTE(review): declaring InvoiceNo as int silently coerces non-numeric
# invoice codes (presumably credit notes like "C536365") to null — this
# appears to be what makes the two distinct counts in the bonus question
# below differ (22062 vs 22061); confirm against the raw data.
online_retail_schema="InvoiceNo int, StockCode string, Description string, Quantity int,\
InvoiceDate timestamp,UnitPrice float,CustomerId int, Country string"
# Read the gzipped CSV with the explicit schema; the header row is skipped
# and InvoiceDate is parsed with the M/d/yyyy H:m timestamp pattern.
df = spark.read \
.option("header", "true") \
.option("timestampFormat", "M/d/yyyy H:m")\
.csv("../data/online-retail-dataset.csv.gz",
schema=online_retail_schema)
# Evaluating the session object renders its summary in the notebook output.
spark
SparkSession - in-memory
# Collect only the first 5 rows to the driver and render them as a pandas frame.
df.limit(5).toPandas()
InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerId | Country | |
---|---|---|---|---|---|---|---|---|
0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 2010-12-01 08:26:00 | 2.55 | 17850 | United Kingdom |
1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |
2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 2010-12-01 08:26:00 | 2.75 | 17850 | United Kingdom |
3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |
4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |
Task: Show 5 lines of the "description" column
# Column resolution is case-insensitive by default, so "description"
# matches the Description column; truncate=False prints full item names.
descriptions = df.select("description")
descriptions.show(5, False)
+-----------------------------------+ |description | +-----------------------------------+ |WHITE HANGING HEART T-LIGHT HOLDER | |WHITE METAL LANTERN | |CREAM CUPID HEARTS COAT HANGER | |KNITTED UNION FLAG HOT WATER BOTTLE| |RED WOOLLY HOTTIE WHITE HEART. | +-----------------------------------+ only showing top 5 rows
Task: Count the number of distinct invoices in the dataframe
from pyspark.sql.functions import countDistinct

# Count the unique invoice numbers across the whole dataset.
df.agg(countDistinct("InvoiceNo")).show()
+-------------------------+ |count(DISTINCT InvoiceNo)| +-------------------------+ | 22061| +-------------------------+
Task: Find out in which month most invoices have been processed
# Line items processed per month (one row per CSV line, not per invoice),
# ordered ascending so the busiest month appears last.
from pyspark.sql.functions import month

monthly_lines = df.groupBy(month("InvoiceDate")).count()
monthly_lines.orderBy("count").show()
+------------------+-----+ |month(InvoiceDate)|count| +------------------+-----+ | 2|27707| | 4|29916| | 1|35147| | 8|35284| | 3|36748| | 6|36874| | 5|37030| | 7|39518| | 9|50226| | 10|60742| | 12|68006| | 11|84711| +------------------+-----+
# This shows how many distinct invoices have been processed per month,
# busiest month first.
from pyspark.sql.functions import col, month, countDistinct

per_month = df.groupBy(month('InvoiceDate')).agg(
    countDistinct('InvoiceNo').alias('DistinctInvoices')
)
per_month.sort(col('DistinctInvoices').desc()).show()
+------------------+----------------+ |month(InvoiceDate)|DistinctInvoices| +------------------+----------------+ | 11| 3021| | 12| 2568| | 10| 2275| | 9| 1994| | 5| 1848| | 6| 1683| | 3| 1665| | 7| 1657| | 4| 1504| | 8| 1456| | 1| 1216| | 2| 1174| +------------------+----------------+
Task: Filter the lines where the Quantity is more than 30
# Keep only line items with more than 30 units; filter() is an alias of where().
df.filter("Quantity > 30").show()
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ |InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country| +---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ | 536367| 84879|ASSORTED COLOUR B...| 32|2010-12-01 08:34:00| 1.69| 13047|United Kingdom| | 536370| 10002|INFLATABLE POLITI...| 48|2010-12-01 08:45:00| 0.85| 12583| France| | 536370| 22492|MINI PAINT SET VI...| 36|2010-12-01 08:45:00| 0.65| 12583| France| | 536371| 22086|PAPER CHAIN KIT 5...| 80|2010-12-01 09:00:00| 2.55| 13748|United Kingdom| | 536374| 21258|VICTORIAN SEWING ...| 32|2010-12-01 09:09:00| 10.95| 15100|United Kingdom| | 536376| 22114|HOT WATER BOTTLE ...| 48|2010-12-01 09:32:00| 3.45| 15291|United Kingdom| | 536376| 21733|RED HANGING HEART...| 64|2010-12-01 09:32:00| 2.55| 15291|United Kingdom| | 536378| 21212|PACK OF 72 RETROS...| 120|2010-12-01 09:37:00| 0.42| 14688|United Kingdom| | 536378| 85183B|CHARLIE & LOLA WA...| 48|2010-12-01 09:37:00| 1.25| 14688|United Kingdom| | 536378| 85071B|RED CHARLIE+LOLA ...| 96|2010-12-01 09:37:00| 0.38| 14688|United Kingdom| | 536381| 22719|GUMBALL MONOCHROM...| 36|2010-12-01 09:41:00| 1.06| 15311|United Kingdom| | 536382| 22381|TOY TIDY PINK POL...| 50|2010-12-01 09:45:00| 1.85| 16098|United Kingdom| | 536384| 84755|COLOUR GLASS T-LI...| 48|2010-12-01 09:53:00| 0.65| 18074|United Kingdom| | 536384| 22469|HEART OF WICKER S...| 40|2010-12-01 09:53:00| 1.45| 18074|United Kingdom| | 536384| 22470|HEART OF WICKER L...| 40|2010-12-01 09:53:00| 2.55| 18074|United Kingdom| | 536386| 84880|WHITE WIRE EGG HO...| 36|2010-12-01 09:57:00| 4.95| 16029|United Kingdom| | 536386| 85099C|JUMBO BAG BAROQU...| 100|2010-12-01 09:57:00| 1.65| 16029|United Kingdom| | 536386| 85099B|JUMBO BAG RED RET...| 100|2010-12-01 09:57:00| 1.65| 16029|United Kingdom| | 536387| 79321| CHILLI LIGHTS| 192|2010-12-01 09:58:00| 3.82| 16029|United 
Kingdom| | 536387| 22780|LIGHT GARLAND BUT...| 192|2010-12-01 09:58:00| 3.37| 16029|United Kingdom| +---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ only showing top 20 rows
Task: Show the four most sold items (by quantity)
from pyspark.sql.functions import desc, asc, expr

# Top four products by total units sold across all invoices.
top_items = df.groupBy("Description").agg(
    expr("sum(Quantity) as totalQuantity")
)
top_items.orderBy(desc("totalQuantity")).show(4)
+--------------------+-------------+ | Description|totalQuantity| +--------------------+-------------+ |WORLD WAR 2 GLIDE...| 53847| |JUMBO BAG RED RET...| 47363| |ASSORTED COLOUR B...| 36381| | POPCORN HOLDER| 36334| +--------------------+-------------+ only showing top 4 rows
Bonus question: why do these two operations return different results? Hint: look at the documentation
# distinct().count() counts every distinct value INCLUDING the null
# produced by the int schema for non-numeric invoice codes -> 22062.
print(df.select("InvoiceNo").distinct().count())
from pyspark.sql.functions import countDistinct
# count(DISTINCT InvoiceNo) excludes nulls -> 22061.
df.select(countDistinct("InvoiceNo")).show()
22062 +-------------------------+ |count(DISTINCT InvoiceNo)| +-------------------------+ | 22061| +-------------------------+
As you can see from the output, `countDistinct` internally runs `count(DISTINCT ...)`, which excludes nulls, whereas `distinct().count()` also counts the null value — hence the off-by-one difference.
https://spark.apache.org/docs/latest/api/sql/#count
count(*)
- Returns the total number of retrieved rows, including rows containing null.
count(DISTINCT expr[, expr...])
- Returns the number of rows for which the supplied expression(s) are unique and non-null.