This example demonstrates how to use Fugue SQL for data analytics. You will see how we keep the backbone of the logic in SQL while using Fugue extensions to accomplish specific tasks.
Also pay attention to how we iterate on the problem: from partial data to the full dataset, and from local execution to Spark. Working this way is very efficient because it maximizes your iteration speed.
The dataset is from the Kaggle Novel Corona Virus 2019 Dataset.
There is a small amount of Python code to write in order to run each piece of Fugue SQL, but I want to make that even simpler with a magic. So first I register a new magic %%sql so I can write Fugue SQL directly in cells.
I also find that the built-in PRINT can't print fat tables nicely in this environment, so I write my own print extension pprint to render the dataframes as pandas dataframes.
from fugue import NativeExecutionEngine, DataFrames, PandasDataFrame, Schema, Transformer, IterableDataFrame, ArrayDataFrame
from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine
from IPython.display import display, HTML
from IPython.core.magic import (register_cell_magic, register_line_cell_magic)
from typing import Iterable, Dict, Any
from datetime import datetime

@register_cell_magic
def sql(line, cell):
    # run the cell body as Fugue SQL; the text after %%sql chooses the execution engine
    dag = FugueSQLWorkflow()
    dag(cell)
    dag.run(NativeExecutionEngine if line == "" else line)

def pprint(dfs: DataFrames, rows=10, title=None):
    # render each dataframe as a pandas dataframe, showing the first `rows` rows
    if title is not None:
        print(title)
    for df in dfs.values():
        pdf = PandasDataFrame(df.head(rows), df.schema)
        display(pdf.native)
Now I want to take a peek at the example datasets and also test the magic and pprint. The following SQL will run on NativeExecutionEngine because no engine is specified.
%%sql
confirmed = LOAD CSV "../data/confirmed.csv"(header=true)
OUTPUT USING pprint(rows=20)
death = LOAD CSV "../data/death.csv"(header=true)
OUTPUT USING pprint
The table is very fat: each date is a column. Such a structure is not straightforward to analyze, so I need to pivot the table so that each date becomes a row. Pandas and Spark each have their own way to pivot a table, and neither is simple; plus I notice an additional step is needed to convert the column names to dates. So a better idea is to write a transformer that does all of this at once, making the logic platform and scale agnostic.
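Just for intuition, here is roughly what this pivot looks like in plain pandas (a sketch only; it assumes the date columns are named like "_1_22_20", which is how this dataset's columns appear after loading):

import pandas as pd
from datetime import datetime

def pivot_pandas(pdf: pd.DataFrame, col: str = "confirmed") -> pd.DataFrame:
    # keep the non-date columns as identifiers, melt the date columns into rows
    id_cols = [c for c in pdf.columns if not c.startswith("_")]
    long_df = pdf.melt(id_vars=id_cols, var_name="date", value_name=col)
    # convert column names like "_1_22_20" into real datetimes
    long_df["date"] = long_df["date"].map(lambda c: datetime.strptime(c[1:], "%m_%d_%y"))
    long_df[col] = long_df[col].astype(int)
    return long_df

This only works on a single in-memory pandas dataframe, which is exactly why a transformer is more attractive: the same logic can then run unchanged on any execution engine.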
I have many options for writing a transformer, but implementing the Transformer interface seems the best choice here, because on_init will run only once for each physical partition.
class Pivot(Transformer):
    def get_output_schema(self, df):
        # keep the non-date columns, then append the pivoted date and value columns
        fields = [f for f in df.schema.fields if not f.name.startswith("_")]
        col = self.params.get_or_throw("col", str)
        return Schema(fields) + f"date:datetime,{col}:int"

    def on_init(self, df):
        # runs once per physical partition: cache the value column name,
        # the id columns, and the parsed dates from the date column names
        self.value_col = self.params.get_or_throw("col", str)
        self.cols = [f for f in df.schema.names if not f.startswith("_")]
        self.date_cols = {f: datetime.strptime(f[1:], '%m_%d_%y')
                          for f in df.schema.names if f.startswith("_")}

    def transform(self, df):
        # consume the input as an iterable and emit an IterableDataFrame,
        # so neither the input nor the pivoted output is fully materialized
        def get_rows():
            for row in df.as_dict_iterable():
                res = [row[k] for k in self.cols]
                for k, v in self.date_cols.items():
                    yield res + [v, int(row[k])]
        return IterableDataFrame(get_rows(), self.output_schema)
In the above transform code, I choose to consume the input as an iterable and return an IterableDataFrame as the output, which minimizes memory usage. Think about it: if df is large, without handling it in a purely streaming way, the original data and the pivoted table could together use a large amount of memory, and that is unnecessary.
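For comparison, here is what a non-streaming version of transform could look like (just a sketch to illustrate the difference, not something I would use for large partitions):

    # Sketch of a non-streaming alternative for Pivot.transform: it builds every
    # output row up front and returns an ArrayDataFrame. The result is the same,
    # but both the input rows and the full pivoted output of a partition stay in
    # memory at once, which the IterableDataFrame version above avoids.
    def transform(self, df):
        rows = []
        for row in df.as_dict_iterable():
            res = [row[k] for k in self.cols]
            for k, v in self.date_cols.items():
                rows.append(res + [v, int(row[k])])
        return ArrayDataFrame(rows, self.output_schema)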
Now I want to test this transformer locally, so I use NativeExecutionEngine to run it. Before transforming, I use a SELECT with LIMIT to make the input data really small, so it can run quickly on native Python.
It's very important to have this validation step: NativeExecutionEngine is extremely fast at surfacing errors, and more importantly, the errors are more explicit than when running on a cluster. Plus, at this step, if you see any errors, you can add print statements inside the transformer code to help debug; they will all display.
%%sql
LOAD CSV "../data/confirmed.csv"(header=true)
SELECT * WHERE iso3 = 'USA' LIMIT 10
confirmed = TRANSFORM USING Pivot(col="confirmed")
OUTPUT USING pprint
LOAD CSV "../data/death.csv"(header=true)
SELECT * WHERE iso3 = 'USA' LIMIT 10
death = TRANSFORM USING Pivot(col="death")
OUTPUT USING pprint
It seems to work well, so now let's start using SparkExecutionEngine to make it run on Spark. I need to join the two dataframes so each row has both confirmed and death, because I always want to use them together in later steps.
So here, I use a JOIN statement. Notice I PERSIST the select output because I will both pprint and SAVE the same output; persisting it avoids a rerun in the execution plan. Instead of adding PERSIST explicitly, there is also an alternative solution.
Also, saving intermediate data into persistent storage is often a good idea for exploration. It's like a checkpoint.
%%sql SparkExecutionEngine
LOAD CSV "../data/confirmed.csv"(header=true)
SELECT * WHERE iso3 = 'USA'
confirmed = TRANSFORM USING Pivot(col="confirmed")
LOAD CSV "../data/death.csv"(header=true)
SELECT * WHERE iso3 = 'USA'
death = TRANSFORM USING Pivot(col="death")
SELECT
confirmed.Combined_Key AS key,
confirmed.Admin2 AS city,
confirmed.Province_State AS state,
Population AS population,
confirmed.date,
confirmed.confirmed,
death
FROM confirmed INNER JOIN death
ON confirmed.Combined_Key = death.Combined_Key AND confirmed.date = death.date
PERSIST
OUTPUT USING pprint
SAVE OVERWRITE "/tmp/covid19.parquet"
Next, I load the parquet back, do data analytics using SQL, and output using pprint. Here you should take a break and try different SELECT statements to explore the data by yourself for a moment.
%%sql SparkExecutionEngine
data = LOAD "/tmp/covid19.parquet"
SELECT DISTINCT city, state, population FROM data
OUTPUT USING pprint
SELECT state, date, SUM(confirmed) AS confirmed, SUM(death) AS death
FROM data GROUP BY state, date
OUTPUT USING pprint
It would be useful to see the confirmed vs death chart for different partitions (I should be able to define the partition, whether by city, state, or something else). So again, my problem becomes: for a certain partition, I want to draw a chart, and in the end the results can be collected on the driver to display in the notebook.
This is again a typical transformation. For each partition, I draw the chart, save it to common storage, and return the path, so the driver can collect the paths and render the charts together.
And again, how should I write the transformer? It needs to use the partition key information, so the most convenient way is still implementing the interface, because that makes the partition values easy to access.
import matplotlib.pyplot as plt
from uuid import uuid4
import os
from IPython.display import SVG

class Draw(Transformer):
    def get_output_schema(self, df):
        # one output row per partition: the partition key columns plus the chart path
        return self.key_schema + "path:str"

    def transform(self, df):
        root = self.params.get_or_throw("path", str)
        pdf = df.as_pandas()[["date", "confirmed", "death"]]
        pdf = pdf.set_index("date")
        # use the partition key values as the chart title
        fig = pdf.plot(title=self.cursor.key_value_dict.__repr__()).get_figure()
        path = os.path.join(root, str(uuid4()))
        fig.savefig(path, format="svg")
        res = self.cursor.key_value_array + [path]
        return ArrayDataFrame([res], self.output_schema)

def render_all(df: Iterable[Dict[str, Any]]) -> None:
    # runs on the driver: display each saved SVG in the notebook
    for row in df:
        display(SVG(filename=row["path"]))
Now, let's first test it using native Python on a small dataset.
%%sql
data = LOAD "/tmp/covid19.parquet"
SELECT * WHERE key = 'Adams, Mississippi, US'
TRANSFORM PREPARTITION BY key USING Draw(path="/tmp")
OUTPUT USING render_all
Something seems wrong: the numbers should not jump between 0 and 200. Ah, it's because we didn't sort by date. We can either sort by date inside the transformer before drawing, or use PRESORT in Fugue SQL like below.
%%sql
data = LOAD "/tmp/covid19.parquet"
SELECT * WHERE key = 'Adams, Mississippi, US'
TRANSFORM PREPARTITION BY key PRESORT date USING Draw(path="/tmp")
OUTPUT USING render_all
Also, you can see the chart is rendered twice. That's because the plot created inside the transformer is also rendered here when using NativeExecutionEngine. This won't happen when using SparkExecutionEngine, so we are fine with it.
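If the extra inline rendering bothers you during local testing, one standard matplotlib option (not required here) is to close the figure inside the transformer right after saving it:

        fig.savefig(path, format="svg")
        plt.close(fig)  # prevents the inline backend from also rendering the figure at the end of the cell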
We only want to draw charts for the 10 states with the highest death counts. So in the following code, we first find the top 10 states, then use a SEMI JOIN to filter the dataset, and then aggregate and use the transformer to plot.
Notice that in the last SELECT, we can use an inline TRANSFORM. Fugue SQL is more flexible than standard SQL.
%%sql SparkExecutionEngine
data = LOAD "/tmp/covid19.parquet"
top10 =
SELECT state, SUM(death) AS death
FROM data GROUP BY state
ORDER BY death DESC LIMIT 10 PERSIST
data =
SELECT * FROM data
LEFT SEMI JOIN top10 ON data.state = top10.state
WHERE date > '2020-04-01'
SELECT state, date, SUM(confirmed) AS confirmed, SUM(death) AS death
GROUP BY state, date
-- this step is to sort the output by death; the output of Draw doesn't have death, so we need a join
SELECT a.*, death FROM
(TRANSFORM PREPARTITION BY state PRESORT date USING Draw(path="/tmp")) AS a
INNER JOIN top10 ON a.state = top10.state
ORDER BY death DESC
OUTPUT USING render_all
The way we solved this problem is very platform and scale agnostic. Whether you run it on native Python with small data or on Spark with very large data, you may not need to change the code at all.