 
#!/usr/bin/env python
# coding: utf-8

# # PySpark Demo Notebook

# ## Demo
# 
# 1. Run PostgreSQL Script
# 2. Load PostgreSQL Data
# 3. Create New Record
# 4. Write New Record to PostgreSQL Table
# 5. Load CSV Data File
# 6. Write Data to PostgreSQL
# 7. Analyze Data with Spark SQL
# 8. Graph Data with BokehJS
# 9. Read and Write Data to Parquet Format
# 
# _Prepared by: [Gary A. Stafford](https://twitter.com/GaryStafford)
# Associated article: https://wp.me/p1RD28-61V_

# ### Run PostgreSQL Script
# Run the PostgreSQL SQL script

# In[1]:

get_ipython().system(' pip install psycopg2-binary --upgrade --quiet')

# In[2]:

get_ipython().run_line_magic('run', "-i '03_load_sql.py'")

# In[3]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# In[4]:

spark = SparkSession \
    .builder \
    .appName('pyspark_demo_app') \
    .config('spark.driver.extraClassPath', 'postgresql-42.2.10.jar') \
    .master("local[*]") \
    .getOrCreate()

# ### Load PostgreSQL Data
# Load the PostgreSQL 'bakery_basket' table's contents into a DataFrame

# In[5]:

properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/demo',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'bakery_basket',
}

df1 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()

# In[6]:

get_ipython().run_cell_magic('time', '', 'df1.show(10)\ndf1.count()\n')

# ### Create New Record
# Create a new bakery record and load it into a DataFrame

# In[7]:

data = [('2016-10-30', '10:13:27', 2, 'Pastry')]

bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])

df2 = spark.createDataFrame(data, bakery_schema)

# In[8]:

df2.show()
df2.count()

# ### Write New Record to PostgreSQL Table
# Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table

# In[9]:

df2.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()

# In[10]:

df1.show(10)
df1.count()

# ### Load CSV Data File
# Load the Kaggle dataset from the CSV file, containing ~21K records, into a DataFrame

# In[11]:

get_ipython().system(' ls -lh *.csv')

# In[12]:

df3 = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load("BreadBasket_DMS.csv", schema=bakery_schema)

# In[13]:

df3.show(10)
df3.count()

# ### Write Data to PostgreSQL
# Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table

# In[14]:

df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()

# In[15]:

df1.show(10)
df1.count()

# ### Analyze Data with Spark SQL
# Analyze the DataFrame's bakery data using Spark SQL

# In[16]:

df1.createOrReplaceTempView("bakery_table")

df4 = spark.sql("SELECT * FROM bakery_table " +
                "ORDER BY transaction, date, time")
df4.show(10)
df4.count()
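
# As a side note (not part of the original demo), the same query can be expressed
# with the DataFrame API instead of a SQL string; Spark compiles both to the same
# logical plan, so the choice is purely stylistic. A minimal sketch; 'df4_alt' is
# an illustrative name:

# In[ ]:

# DataFrame-API equivalent of the df4 Spark SQL query above
df4_alt = df1.orderBy('transaction', 'date', 'time')
df4_alt.show(10)
df4_alt.count()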
bakery_table") df5.show() df5 = spark.sql("SELECT item, count(*) as count " + "FROM bakery_table " + "WHERE item NOT LIKE 'NONE' " + "GROUP BY item ORDER BY count DESC " + "LIMIT 10") df5.show() # ### Graph Data with BokehJS # Create a vertical bar chart displaying DataFrame data # In[18]: from bokeh.io import output_notebook, show from bokeh.plotting import figure from bokeh.models import ColumnDataSource from bokeh.transform import factor_cmap from bokeh.palettes import Paired12 output_notebook() source = ColumnDataSource(data=df5.toPandas()) tooltips = [('item', '@item'), ('count', '@{count}{,}')] items = source.data['item'].tolist() color_map = factor_cmap(field_name='item', palette=Paired12, factors=items) plot = figure(x_range=items, plot_width=750, plot_height=375, min_border=0, tooltips=tooltips) plot.vbar(x='item', bottom=0, top='count', source=source, width=0.9, fill_color=color_map) plot.title.text = 'Top 10 Bakery Items' plot.xaxis.axis_label = 'Bakery Items' plot.yaxis.axis_label = 'Total Items Sold' show(plot) # In[19]: df6 = spark.sql("SELECT CONCAT(date,' ',time) as timestamp, transaction, item " + "FROM bakery_table " + "WHERE item NOT LIKE 'NONE'" + "ORDER BY transaction" ) df6.show(10) df6.count() # In[20]: df7 = df6.withColumn('timestamp', to_timestamp(df6.timestamp, 'yyyy-MM-dd HH:mm:ss')) df7.printSchema() df7.show(10) df7.count() # In[21]: df7.createOrReplaceTempView("bakery_table") df8 = spark.sql("SELECT DISTINCT * " + "FROM bakery_table " + "WHERE item NOT LIKE 'NONE'" + "ORDER BY transaction DESC" ) df8.show(10) df8.count() # ### Read and Write Data to Parquet Format # Read and write DataFrame data to Parquet format files # In[22]: df8.write.parquet('output/bakery_parquet', mode='overwrite') # In[23]: get_ipython().system(' ls -lh output/') # In[24]: df9 = spark.read.parquet('output/bakery_parquet') df9.show(10) df9.count()