Prepared by: Gary A. Stafford
Associated article: https://wp.me/p1RD28-61V
Run the PostgreSQL SQL script
! pip install psycopg2-binary --upgrade --quiet
%run -i '03_load_sql.py'
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession \
    .builder \
    .appName('pyspark_demo_app') \
    .config('spark.driver.extraClassPath',
            'postgresql-42.2.10.jar') \
    .master("local[*]") \
    .getOrCreate()
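Note that 'spark.driver.extraClassPath' is only honored when the JVM starts, so it must be set before the first session is created. A possible alternative, assuming the same driver JAR sits in the working directory, is the 'spark.jars' property, which also ships the JAR to executors; a sketch (use one configuration or the other, not both):

# alternative to extraClassPath: distribute the JDBC driver JAR via spark.jars
spark = SparkSession \
    .builder \
    .appName('pyspark_demo_app') \
    .config('spark.jars', 'postgresql-42.2.10.jar') \
    .master("local[*]") \
    .getOrCreate()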
Load the PostgreSQL 'bakery_basket' table's contents into a DataFrame
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/demo',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'bakery_basket',
}
df1 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()
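The same read can be expressed more compactly with the DataFrameReader.jdbc() convenience method; an equivalent sketch reusing the properties dictionary above:

# equivalent JDBC read via the jdbc() convenience method
df1_alt = spark.read.jdbc(
    url=properties['url'],
    table=properties['dbtable'],
    properties={k: properties[k] for k in ('driver', 'user', 'password')}
)
df1_alt.show(5)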
%%time
df1.show(10)
df1.count()
Create a new bakery record and load into a DataFrame
data = [('2016-10-30', '10:13:27', 2, 'Pastry')]
bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])
df2 = spark.createDataFrame(data, bakery_schema)
df2.show()
df2.count()
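Since Spark 2.3, createDataFrame() also accepts a DDL-formatted schema string in place of a StructType; an equivalent sketch:

# equivalent schema expressed as a DDL string
df2_alt = spark.createDataFrame(
    data, 'date STRING, time STRING, `transaction` INT, item STRING')
df2_alt.show()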
Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table
df2.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()
df1.show(10)
df1.count()
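Because df1 is a lazy JDBC read, re-running show() and count() reflects the newly appended row. A quick spot check, filtering on the values inserted above:

# confirm the appended record is now visible in the table
df1.filter("item = 'Pastry' AND transaction = 2").show()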
Load the Kaggle dataset (~21K records) from the CSV file into a DataFrame
! ls -lh *.csv
df3 = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load("BreadBasket_DMS.csv", schema=bakery_schema)
df3.show(10)
df3.count()
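If the CSV file contained malformed rows, the reader's mode option controls how they are handled; a sketch that silently drops bad records instead of failing (same file and schema assumed):

# DROPMALFORMED discards rows that do not match the schema
df3_clean = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .load("BreadBasket_DMS.csv", schema=bakery_schema)
df3_clean.count()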
Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table
df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()
df1.show(10)
df1.count()
Analyze the DataFrame's bakery data using Spark SQL
df1.createOrReplaceTempView("bakery_table")
df4 = spark.sql("SELECT * FROM bakery_table " +
                "ORDER BY transaction, date, time")
df4.show(10)
df4.count()
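The same query translates directly to the DataFrame API; an equivalent sketch:

# equivalent to the SQL above, using the DataFrame API
df4_alt = df1.orderBy('transaction', 'date', 'time')
df4_alt.show(10)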
df5 = spark.sql("SELECT COUNT(DISTINCT item) AS item_count FROM bakery_table")
df5.show()
df5 = spark.sql("SELECT item, count(*) as count " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE' " +
                "GROUP BY item ORDER BY count DESC " +
                "LIMIT 10")
df5.show()
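The top-ten aggregation also has a direct DataFrame API equivalent; a sketch:

# equivalent top-10 aggregation using the DataFrame API
df5_alt = df1.filter(df1.item != 'NONE') \
    .groupBy('item') \
    .count() \
    .orderBy('count', ascending=False) \
    .limit(10)
df5_alt.show()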
Create a vertical bar chart displaying DataFrame data
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap
from bokeh.palettes import Paired12
output_notebook()
source = ColumnDataSource(data=df5.toPandas())
tooltips = [('item', '@item'), ('count', '@{count}{0,0}')]  # '0,0' formats counts with thousands separators
items = source.data['item'].tolist()
color_map = factor_cmap(field_name='item', palette=Paired12, factors=items)
plot = figure(x_range=items, plot_width=750, plot_height=375, min_border=0, tooltips=tooltips)
plot.vbar(x='item', bottom=0, top='count', source=source, width=0.9, fill_color=color_map)
plot.title.text = 'Top 10 Bakery Items'
plot.xaxis.axis_label = 'Bakery Items'
plot.yaxis.axis_label = 'Total Items Sold'
show(plot)
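To keep a standalone copy of the chart outside the notebook, Bokeh can also write HTML; a sketch with a hypothetical file name (note that output_file() redirects Bokeh's output target away from the notebook):

from bokeh.io import output_file, save

output_file('top_10_items.html')  # hypothetical output path
save(plot)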
df6 = spark.sql("SELECT CONCAT(date, ' ', time) as timestamp, transaction, item " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE' " +
                "ORDER BY transaction")
df6.show(10)
df6.count()
df7 = df6.withColumn('timestamp', to_timestamp(df6.timestamp, 'yyyy-MM-dd HH:mm:ss'))
df7.printSchema()
df7.show(10)
df7.count()
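With a true timestamp column, Spark's time functions become usable; an illustrative sketch counting rows per hour of the day:

from pyspark.sql.functions import hour

# count bakery items sold per hour of the day
df7.groupBy(hour('timestamp').alias('hour_of_day')) \
    .count() \
    .orderBy('hour_of_day') \
    .show(24)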
df7.createOrReplaceTempView("bakery_table")
df8 = spark.sql("SELECT DISTINCT * " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE' " +
                "ORDER BY transaction DESC")
df8.show(10)
df8.count()
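SELECT DISTINCT has a direct counterpart in dropDuplicates(); an equivalent sketch:

# equivalent de-duplication using the DataFrame API
df8_alt = df7.filter(df7.item != 'NONE') \
    .dropDuplicates() \
    .orderBy('transaction', ascending=False)
df8_alt.count()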
Read and write DataFrame data to Parquet format files
df8.write.parquet('output/bakery_parquet', mode='overwrite')
! ls -lh output/
df9 = spark.read.parquet('output/bakery_parquet')
df9.show(10)
df9.count()
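Parquet writes can also be partitioned by a column, which lets later reads prune files when filtering on that column; a sketch partitioning by item (hypothetical output path):

# partitioned write: one subdirectory per distinct item value
df8.write \
    .partitionBy('item') \
    .parquet('output/bakery_by_item', mode='overwrite')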