Prepared by: Gary A. Stafford
Associated article: Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker
Read the CSV-format data file into a Spark DataFrame.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession \
    .builder \
    .appName('04_notebook') \
    .config('spark.driver.extraClassPath', 'postgresql-42.2.10.jar') \
    .getOrCreate()
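The config above assumes the PostgreSQL JDBC driver JAR sits in the notebook's working directory. If it lives elsewhere, spark.jars is one alternative way to put it on the classpath (a sketch; the path is illustrative):
# Alternative (illustrative path): supply the JDBC driver via spark.jars
# instead of spark.driver.extraClassPath.
#
# spark = SparkSession.builder \
#     .appName('04_notebook') \
#     .config('spark.jars', '/opt/jars/postgresql-42.2.10.jar') \
#     .getOrCreate()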
bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])
df1 = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load('BreadBasket_DMS.csv', schema=bakery_schema)
print('DataFrame rows: %d' % df1.count())
print('DataFrame schema: %s' % df1)
df1.show(10, False)
DataFrame rows: 21293
DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]
+----------+--------+-----------+-------------+
|date      |time    |transaction|item         |
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|1          |Bread        |
|2016-10-30|10:05:34|2          |Scandinavian |
|2016-10-30|10:05:34|2          |Scandinavian |
|2016-10-30|10:07:57|3          |Hot chocolate|
|2016-10-30|10:07:57|3          |Jam          |
|2016-10-30|10:07:57|3          |Cookies      |
|2016-10-30|10:08:41|4          |Muffin       |
|2016-10-30|10:13:03|5          |Coffee       |
|2016-10-30|10:13:03|5          |Pastry       |
|2016-10-30|10:13:03|5          |Bread        |
+----------+--------+-----------+-------------+
only showing top 10 rows
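As a quick sanity check, the explicit schema above can be compared with the one Spark infers on its own; a sketch (note that inferSchema costs an extra pass over the file):
# Optional sanity check: infer the schema and compare it to bakery_schema.
# inferSchema requires an additional pass over the CSV file.
df_inferred = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('BreadBasket_DMS.csv')
df_inferred.printSchema()  # expect strings for date/time, int for transaction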
Run the SQL script to create the database schema and import the data from the CSV file.
%run -i '03_load_sql.py'
DROP TABLE IF EXISTS "transactions"
DROP SEQUENCE IF EXISTS transactions_id_seq
CREATE SEQUENCE transactions_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1
CREATE TABLE "public"."transactions" (
    "id" integer DEFAULT nextval('transactions_id_seq') NOT NULL,
    "date" character varying(10) NOT NULL,
    "time" character varying(8) NOT NULL,
    "transaction" integer NOT NULL,
    "item" character varying(50) NOT NULL
) WITH (oids = false)
Row count: 21293
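The script itself is not listed in this notebook; a minimal sketch of what such a loader might look like, assuming the psycopg2 driver (the connection settings mirror the JDBC properties used below; the COPY call is illustrative, not the script's actual code):
# Hypothetical sketch only: the actual 03_load_sql.py is not shown here.
import psycopg2

conn = psycopg2.connect(host='postgres', port=5432, dbname='bakery',
                        user='postgres', password='postgres1234')
with conn, conn.cursor() as cur:
    cur.execute('DROP TABLE IF EXISTS transactions')
    cur.execute('DROP SEQUENCE IF EXISTS transactions_id_seq')
    cur.execute('CREATE SEQUENCE transactions_id_seq INCREMENT 1 '
                'MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1')
    cur.execute("""CREATE TABLE public.transactions (
                       id integer DEFAULT nextval('transactions_id_seq') NOT NULL,
                       date character varying(10) NOT NULL,
                       time character varying(8) NOT NULL,
                       transaction integer NOT NULL,
                       item character varying(50) NOT NULL)""")
    with open('BreadBasket_DMS.csv') as f:
        # Stream the CSV rows into the table via COPY
        cur.copy_expert('COPY transactions (date, time, transaction, item) '
                        'FROM STDIN WITH CSV HEADER', f)
    cur.execute('SELECT COUNT(*) FROM transactions')
    print('Row count: %d' % cur.fetchone()[0])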
Load the PostgreSQL 'transactions' table's contents into a Spark DataFrame.
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/bakery',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'transactions',
}
df2 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()
print('DataFrame rows: %d' % df2.count())
print('DataFrame schema: %s' % df2)
df2.show(10, False)
DataFrame rows: 21293
DataFrame schema: DataFrame[id: int, date: string, time: string, transaction: int, item: string]
+---+----------+--------+-----------+-------------+
|id |date      |time    |transaction|item         |
+---+----------+--------+-----------+-------------+
|1  |2016-10-30|09:58:11|1          |Bread        |
|2  |2016-10-30|10:05:34|2          |Scandinavian |
|3  |2016-10-30|10:05:34|2          |Scandinavian |
|4  |2016-10-30|10:07:57|3          |Hot chocolate|
|5  |2016-10-30|10:07:57|3          |Jam          |
|6  |2016-10-30|10:07:57|3          |Cookies      |
|7  |2016-10-30|10:08:41|4          |Muffin       |
|8  |2016-10-30|10:13:03|5          |Coffee       |
|9  |2016-10-30|10:13:03|5          |Pastry       |
|10 |2016-10-30|10:13:03|5          |Bread        |
+---+----------+--------+-----------+-------------+
only showing top 10 rows
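For larger tables, the same read can be split into parallel JDBC partitions on the sequential id column; a sketch, with bounds assuming the ~21,293-row table loaded above:
# Optional: parallelize the JDBC read over the sequential 'id' column.
df2_par = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .option('partitionColumn', 'id') \
    .option('lowerBound', 1) \
    .option('upperBound', 21293) \
    .option('numPartitions', 4) \
    .load()
print(df2_par.rdd.getNumPartitions())  # 4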
Create a new bakery record and load into a Spark DataFrame.
data = [('2016-10-30', '10:13:27', 2, 'Pastry')]
df3 = spark.createDataFrame(data, bakery_schema)
print('DataFrame rows: %d' % df3.count())
print('DataFrame schema: %s' % df3)
df3.show(10, False)
DataFrame rows: 1
DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]
+----------+--------+-----------+------+
|date      |time    |transaction|item  |
+----------+--------+-----------+------+
|2016-10-30|10:13:27|2          |Pastry|
+----------+--------+-----------+------+
Append the contents of the DataFrame to the bakery PostgreSQL database's 'transactions' table.
df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()
# should now contain one additional row of data
print('DataFrame rows: %d' % df2.count())
DataFrame rows: 21294
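Because df2 is backed by the live JDBC source, each action re-queries the table; the appended record should now appear with the highest auto-assigned id (a quick check):
# The appended record should carry the highest id, assigned by the
# sequence (likely 21294, given 21,293 prior rows).
from pyspark.sql.functions import desc
df2.sort(desc('id')).show(3, False)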
Overwrite the contents of the CSV file-based DataFrame to the 'transactions' table.
df1.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .option('truncate', 'true') \
    .mode('overwrite') \
    .save()
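Setting the 'truncate' option makes Spark issue a TRUNCATE TABLE rather than dropping and recreating the table, so the column types and the id column's sequence default defined by the load script survive the overwrite. A fresh count should be back at the original 21,293 rows (a quick check):
# After the truncate-and-overwrite, the row count should be back to the
# original CSV total (the appended test row is gone).
print('DataFrame rows: %d' % df2.count())  # expect 21293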
from math import pi
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap, cumsum
from bokeh.palettes import Paired12
output_notebook()
What are the busiest days of the week?
df1.createOrReplaceTempView('tmp_bakery')
sql_query = "SELECT date_format(date, 'EEEE') as day, count(*) as count " \
"FROM tmp_bakery " \
"WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
"GROUP BY day " \
"ORDER BY count ASC " \
"LIMIT 10"
df4 = spark.sql(sql_query)
df4.show(10, False)
+---------+-----+
|day      |count|
+---------+-----+
|Wednesday|2320 |
|Monday   |2324 |
|Tuesday  |2392 |
|Thursday |2646 |
|Sunday   |3095 |
|Friday   |3124 |
|Saturday |4605 |
+---------+-----+
data = df4.toPandas()
tooltips = [('day', '@day'), ('count', '@{count}{,}')]
days = data['day'].tolist()
color_map = factor_cmap(field_name='day', palette=Paired12, factors=days)
data['angle'] = data['count'] / data['count'].sum() * 2 * pi
plot = figure(plot_height=450,
              plot_width=700,
              title='Items Sold/Day',
              tooltips=tooltips,
              x_range=(-0.5, 1.0))
plot.wedge(x=0,
           y=1,
           radius=0.4,
           start_angle=cumsum('angle', include_zero=True),
           end_angle=cumsum('angle'),
           line_color='white',
           fill_color=color_map,
           legend_field='day',
           source=data)
plot.axis.axis_label = None
plot.axis.visible = False
plot.grid.grid_line_color = None
show(plot)
What are the busiest times of the day?
def time_increment(h, m):
    """Calculates a 30-minute time increment

    Parameters:
        h (str): hours, '0' or '00' to '23'
        m (str): minutes, '0' or '00' to '59'

    Returns:
        str: 30-minute time increment, e.g. '07:30', '23:00', or '12:00'
    """
    increment = (int(m) * (100 / 60)) / 100  # 0.0000 - 0.9833
    increment = round(increment, 0)  # 0.0 or 1.0
    increment = int(increment) * 30  # 0 or 30
    increment = str(h).rjust(2, '0') + ':' + str(increment).rjust(2, '0')
    return increment  # e.g. '07:30' or '23:00'
spark.udf.register("udfTimeIncrement", time_increment, StringType())
sql_query = "WITH tmp_table AS (" \
" SELECT udfTimeIncrement(date_format(time, 'HH'), date_format(time, 'mm')) as period, count(*) as count " \
" FROM tmp_bakery " \
" WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
" GROUP BY period " \
" ORDER BY period ASC" \
") " \
"SELECT period, count " \
"FROM tmp_table " \
"WHERE period BETWEEN '07:00' AND '19:00'"
df5 = spark.sql(sql_query)
df5.show(10, False)
+------+-----+
|period|count|
+------+-----+
|07:00 |1    |
|07:30 |23   |
|08:00 |209  |
|08:30 |436  |
|09:00 |960  |
|09:30 |1006 |
|10:00 |1238 |
|10:30 |1428 |
|11:00 |1628 |
|11:30 |1474 |
+------+-----+
only showing top 10 rows
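Note that Python 3's round() uses banker's rounding (0.5 rounds to the nearest even number), so minute 30 itself lands in the ':00' bucket; a quick check of the helper with a few sample values:
# Quick sanity check of the 30-minute bucketing.
assert time_increment('7', '29') == '07:00'
assert time_increment('7', '30') == '07:00'   # 0.5 rounds to even (0)
assert time_increment('7', '31') == '07:30'
assert time_increment('23', '59') == '23:30'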
source = ColumnDataSource(data=df5.toPandas())
tooltips = [('period', '@period'), ('count', '@{count}{,}')]
periods = source.data['period'].tolist()
plot = figure(x_range=periods,
              plot_width=900,
              plot_height=450,
              min_border=0,
              tooltips=tooltips)
plot.vbar(x='period', bottom=0, top='count', source=source, width=0.9)
plot.title.text = 'Items Sold/Hour'
plot.xaxis.axis_label = 'Hour of the Day'
plot.yaxis.axis_label = 'Total Items Sold'
show(plot)
What are the top selling bakery items?
sql_query = "SELECT item, count(*) as count " \
"FROM tmp_bakery " \
"WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
"GROUP BY item " \
"ORDER BY count DESC " \
"LIMIT 10"
df6 = spark.sql(sql_query)
df6.show(10, False)
+-------------+-----+
|item         |count|
+-------------+-----+
|Coffee       |5471 |
|Bread        |3325 |
|Tea          |1435 |
|Cake         |1025 |
|Pastry       |856  |
|Sandwich     |771  |
|Medialuna    |616  |
|Hot chocolate|590  |
|Cookies      |540  |
|Brownie      |379  |
+-------------+-----+
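For comparison, the same top-ten aggregation can be written against the DataFrame API rather than Spark SQL (a sketch producing a result equivalent to df6):
# Equivalent aggregation via the DataFrame API (NOT LIKE with no wildcard
# reduces to simple inequality, so a negated isin() matches the SQL filter).
from pyspark.sql.functions import col

df6_api = df1.filter(~col('item').isin('NONE', 'Adjustment')) \
    .groupBy('item') \
    .count() \
    .orderBy(col('count').desc()) \
    .limit(10)
df6_api.show(10, False)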
source = ColumnDataSource(data=df6.toPandas())
tooltips = [('item', '@item'), ('count', '@{count}{,}')]
items = source.data['item'].tolist()
items.reverse()
plot = figure(y_range=items,
              plot_width=750,
              plot_height=375,
              min_border=0,
              tooltips=tooltips)
plot.hbar(y='item', right='count', height=.9, source=source)
plot.title.text = 'Top 10 Bakery Items'
plot.yaxis.axis_label = 'Items'
plot.xaxis.axis_label = 'Total Items Sold'
show(plot)
How many items do customers usually buy?
sql_query = "WITH tmp_table AS (" \
" SELECT transaction, count(*) as order_size " \
" FROM tmp_bakery " \
" WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
" GROUP BY transaction " \
" ORDER BY order_size DESC" \
") " \
"SELECT order_size, count(*) as count " \
"FROM tmp_table " \
"GROUP BY order_size " \
"ORDER BY order_size ASC" \
df7 = spark.sql(sql_query)
df7.show(24, False)
+----------+-----+
|order_size|count|
+----------+-----+
|1         |3630 |
|2         |2908 |
|3         |1528 |
|4         |850  |
|5         |341  |
|6         |135  |
|7         |38   |
|8         |21   |
|9         |7    |
|10        |2    |
|11        |4    |
+----------+-----+
source = ColumnDataSource(data=df7.toPandas())
tooltips = [('order_size', '@order_size'), ('count', '@count')]
items = source.data['order_size'].tolist()
items = list(map(str, items))
plot = figure(x_range=items,
              plot_width=750,
              plot_height=375,
              min_border=0,
              tooltips=tooltips)
plot.vbar(x='order_size', bottom=0, top='count', source=source, width=0.9)
plot.line(x='order_size',
          y='count',
          source=source,
          line_color='red',
          line_width=2)
plot.title.text = 'Transaction Size'
plot.xaxis.axis_label = 'Items/Transaction'
plot.yaxis.axis_label = 'Total Transactions'
show(plot)
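The distribution is dominated by one- and two-item orders; the average basket size, a weighted mean over df7, works out to roughly 2.2 items per transaction given the counts above (a quick calculation):
# Weighted mean of items per transaction, computed from df7.
from pyspark.sql.functions import col, sum as spark_sum

totals = df7.agg(spark_sum(col('order_size') * col('count')).alias('items'),
                 spark_sum('count').alias('transactions')).first()
print('Avg items/transaction: %.2f' % (totals['items'] / totals['transactions']))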
Perform basic analysis of the bakery data using Spark SQL. Read and write resulting DataFrame contents to Apache Parquet format.
sql_query = "SELECT transaction, CAST(CONCAT(date,' ',time) as timestamp) as timestamp, item " \
"FROM tmp_bakery " \
"WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
"ORDER BY transaction ASC, item ASC"
df8 = spark.sql(sql_query)
print('DataFrame rows: %d' % df8.count())
print('DataFrame schema: %s' % df8)
df8.show(10, False)
DataFrame rows: 20506
DataFrame schema: DataFrame[transaction: int, timestamp: timestamp, item: string]
+-----------+-------------------+-------------+
|transaction|timestamp          |item         |
+-----------+-------------------+-------------+
|1          |2016-10-30 09:58:11|Bread        |
|2          |2016-10-30 10:05:34|Scandinavian |
|2          |2016-10-30 10:05:34|Scandinavian |
|3          |2016-10-30 10:07:57|Cookies      |
|3          |2016-10-30 10:07:57|Hot chocolate|
|3          |2016-10-30 10:07:57|Jam          |
|4          |2016-10-30 10:08:41|Muffin       |
|5          |2016-10-30 10:13:03|Bread        |
|5          |2016-10-30 10:13:03|Coffee       |
|5          |2016-10-30 10:13:03|Pastry       |
+-----------+-------------------+-------------+
only showing top 10 rows
df8.write.parquet('output/bakery_parquet', mode='overwrite')
! ls -lh output/bakery_parquet 2>&1 | head -10
! echo 'Parquet Files:' $(ls output/bakery_parquet | wc -l)
total 800K
-rw-r--r-- 1 garystaf users 1.9K Dec 6 03:46 part-00000-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec 6 03:46 part-00001-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.8K Dec 6 03:46 part-00002-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec 6 03:46 part-00003-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec 6 03:46 part-00004-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec 6 03:46 part-00005-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec 6 03:46 part-00006-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec 6 03:46 part-00007-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.1K Dec 6 03:46 part-00008-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
Parquet Files: 13
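If downstream queries filter on date, the Parquet output could also be partitioned on disk; a sketch (the derived 'day' column and the output path are illustrative, not part of the original notebook). The unpartitioned files written above are what the next cell reads back.
# Optional: write the Parquet output partitioned by calendar date so
# date-filtered reads can prune partitions.
from pyspark.sql.functions import to_date

df8.withColumn('day', to_date('timestamp')) \
    .write \
    .partitionBy('day') \
    .parquet('output/bakery_parquet_by_day', mode='overwrite')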
df9 = spark.read.parquet('output/bakery_parquet')
print('DataFrame rows: %d' % df9.count())
print('DataFrame schema: %s' % df9)
df9.select('transaction', 'timestamp', 'item') \
    .sort('transaction', 'item') \
    .show(10, False)
DataFrame rows: 20506
DataFrame schema: DataFrame[transaction: int, timestamp: timestamp, item: string]
+-----------+-------------------+-------------+
|transaction|timestamp          |item         |
+-----------+-------------------+-------------+
|1          |2016-10-30 09:58:11|Bread        |
|2          |2016-10-30 10:05:34|Scandinavian |
|2          |2016-10-30 10:05:34|Scandinavian |
|3          |2016-10-30 10:07:57|Cookies      |
|3          |2016-10-30 10:07:57|Hot chocolate|
|3          |2016-10-30 10:07:57|Jam          |
|4          |2016-10-30 10:08:41|Muffin       |
|5          |2016-10-30 10:13:03|Bread        |
|5          |2016-10-30 10:13:03|Coffee       |
|5          |2016-10-30 10:13:03|Pastry       |
+-----------+-------------------+-------------+
only showing top 10 rows