Run this notebook from Jupyter with a Python kernel.
If PySpark is not already installed, install it first: pip install pyspark
# Create a Spark session; you need this to work with Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("my streaming test app") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()
# Set the path to the directory with data files
PATH = "../data/streaming/"
schema = "timestamp int, name string, value double"
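If the input directory is empty, you can create a first data file by hand. The sketch below is only an illustration: the file name and values are made up, and any .csv file matching the schema will do.
# Write a small sample .csv file matching the schema defined above
# (file name and values are illustrative)
import os
os.makedirs(PATH, exist_ok=True)
with open(PATH + "sample_1.csv", "w") as f:
    f.write("timestamp,name,value\n")
    f.write("1693526400,sensor_a,1.5\n")
    f.write("1693526460,sensor_b,2.0\n")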
# Define the input part of the streaming pipeline
# This reads all the .csv files in a given directory
# It checks continuously for the arrival of new files
input_path = PATH + "*.csv"
input_stream = (spark.readStream.format("csv")
    .option("header", "true")
    .schema(schema)
    .option("path", input_path)
    .load())
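As a quick sanity check, you can confirm that Spark treats this DataFrame as a streaming source and inspect the schema it will apply.
# Sanity check: this should be a streaming DataFrame with the schema defined above
print(input_stream.isStreaming)  # expected: True
input_stream.printSchema()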
# Define an output stream for the pipeline; this writes data to an in-memory table
# Use this for testing; in a real case you would write to files and/or Kafka
#
# Delete the checkpoint dir if it already exists
# ! rm -r myStreamingCheckPoint1
raw_stream = (input_stream.writeStream
    .queryName("data_read")
    .outputMode("append")
    .format("memory")
    .option("checkpointLocation", "myStreamingCheckPoint1")
    .start())
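For reference, this is roughly what a file-based sink could look like instead of the memory sink; the output and checkpoint paths are hypothetical, so the sketch is left commented out.
# Sketch only: write the stream to Parquet files instead of memory
# output_stream = (input_stream.writeStream
#     .format("parquet")
#     .option("path", PATH + "output/")
#     .option("checkpointLocation", "myStreamingCheckPointFiles")
#     .outputMode("append")
#     .start())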
spark.table("data_read").printSchema()
# Query the output table
# Run this multiple times as you add .csv files to the input_path directory
spark.sql("select * from data_read").show()
raw_stream.status
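If the query above comes back empty, the stream may not have picked up the files yet. One option is to block until everything currently available has been processed and then look at the most recent progress report.
# Wait for all currently available input files to be processed,
# then inspect the latest progress report of the query
raw_stream.processAllAvailable()
raw_stream.lastProgress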
# This maps the input_stream to a temporary view, so that we can work with it using SQL
input_stream.createOrReplaceTempView("input_stream")
# Use Spark SQL to define the aggregation and transformation on the streaming data
df = spark.sql("""
    select name||'_aggregated' as name_aggregated, count(*) as n_points, sum(value) as sum_values
    from input_stream
    group by name""")
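The same aggregation can also be expressed with the DataFrame API; this sketch is equivalent to the SQL above.
# Equivalent aggregation using the DataFrame API instead of SQL (sketch)
from pyspark.sql import functions as F
df_api = (input_stream
    .groupBy("name")
    .agg(F.count("*").alias("n_points"), F.sum("value").alias("sum_values"))
    .select(F.concat(F.col("name"), F.lit("_aggregated")).alias("name_aggregated"),
            "n_points", "sum_values"))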
# Defines another output stream for the pipeline
aggregated_stream = (df.writeStream
    .queryName("data_aggregated")
    .outputMode("complete")
    .format("memory")
    .option("checkpointLocation", "myStreamingCheckPoint2")
    .start())
# Query the table with aggregated data; it is updated as new data arrives in the input pipeline
spark.sql("select * from data_aggregated").show()
Add more data in the form of .csv files to the input_path directory, then run the queries on the output streams again.
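For example, a new file can be generated with plain Python; the file name and values are again only illustrative.
# Drop another .csv file into the input directory (illustrative values)
with open(PATH + "sample_2.csv", "w") as f:
    f.write("timestamp,name,value\n")
    f.write("1693526520,sensor_a,3.5\n")
    f.write("1693526580,sensor_c,0.7\n")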
# Query the output table
# Run this multiple times as you add .csv files to the input_path directory
spark.sql("select * from data_read").show()
# Query the table with aggregated data
# this is updated as new data arrives in the input pipeline
spark.sql("select * from data_aggregated").show()
# stop everything
raw_stream.stop()
aggregated_stream.stop()
spark.stop()