#!/usr/bin/env python
# coding: utf-8

# # COVID19 Data Exploration
#
# This example demonstrates how to use Fugue SQL for data analytics. You will see how we keep the backbone inside SQL while using Fugue extensions to accomplish specific tasks.
#
# Also pay attention to how we iterate on the problem: from partial to all, from local to Spark. Working this way is very efficient because it maximizes your iteration speed.
#
# This dataset is from the [Kaggle Novel Corona Virus 2019 Dataset](https://www.kaggle.com/sudalairajkumar/novel-corona-virus-2019-dataset).
#
# ## First of all, I want to make the experiment environment more friendly
#
# There is a small amount of Python code to write in order to run each piece of Fugue SQL; however, I want to make that even simpler using [magic](https://ipython.readthedocs.io/en/stable/interactive/magics.html). So first I register a new magic `%%sql`, which lets me write Fugue SQL directly in cells.
#
# I also find that the built-in `PRINT` can't render wide tables nicely in this environment, so I write my own print extension `pprint` to render the dataframes as pandas dataframes.

# In[ ]:


from fugue import (
    NativeExecutionEngine,
    DataFrames,
    PandasDataFrame,
    Schema,
    Transformer,
    IterableDataFrame,
    ArrayDataFrame,
)
from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine
from IPython.display import display, HTML
from IPython.core.magic import register_cell_magic, register_line_cell_magic
from typing import Iterable, Dict, Any
from datetime import datetime


@register_cell_magic
def sql(line, cell):
    # run the cell body as Fugue SQL; the magic line chooses the engine,
    # defaulting to NativeExecutionEngine when it is empty
    dag = FugueSQLWorkflow()
    dag(cell)
    dag.run(NativeExecutionEngine if line == "" else line)


def pprint(dfs: DataFrames, rows=10, title=None):
    # output extension: display the head of each dataframe as a pandas dataframe
    if title is not None:
        print(title)
    for df in dfs.values():
        pdf = PandasDataFrame(df.head(rows), df.schema)
        display(pdf.native)


# ## Now I start to explore the data
# I want to take a peek at the example datasets and also test the magic and `pprint`. The following SQL runs on `NativeExecutionEngine` because no engine is specified.

# In[ ]:


get_ipython().run_cell_magic('sql', '', 'confirmed = LOAD CSV "../data/confirmed.csv"(header=true)\nOUTPUT USING pprint(rows=20)\ndeath = LOAD CSV "../data/death.csv"(header=true)\nOUTPUT USING pprint\n')


# The table is very wide: each date is a column. That structure is not straightforward to analyze, so I need to pivot the table so that each date becomes a row. Pandas and Spark each have their own way to pivot a table, and neither is simple; plus, additional steps are needed to convert the column names to dates. So a better idea is to write a transformer that does all of this in one place, which makes the logic platform and scale agnostic.
#
# I have several options for writing a transformer, but implementing the `Transformer` interface seems to be the best way because:
#
# * the output schema depends on parameters, so a schema hint is not an option (see the contrast sketch right after this list)
# * each `transform` call needs to reshape data according to the schema; this preprocessing is partition agnostic, so if we put it into `on_init`, it runs only once per physical partition
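#
# For contrast, here is a minimal sketch of the schema-hint style of transformer. The function, its columns, and its logic are hypothetical and are not used anywhere in this notebook; the point is that the schema is a fixed string attached to the function, so it cannot express an output schema that depends on a runtime parameter such as `col`.

# In[ ]:


# Hypothetical contrast example only -- not part of the pivot logic below.
# The "# schema:" comment hint fixes the output schema at definition time,
# which is exactly what a parameterized transformer like Pivot cannot accept.
# schema: *, death_rate:double
def death_rate(df: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    for row in df:
        # assumes hypothetical integer "confirmed" and "death" columns
        row["death_rate"] = row["death"] / max(row["confirmed"], 1)
        yield row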
# In[ ]:


class Pivot(Transformer):
    def get_output_schema(self, df):
        fields = [f for f in df.schema.fields if not f.name.startswith("_")]
        col = self.params.get_or_throw("col", str)
        return Schema(fields) + f"date:datetime,{col}:int"

    def on_init(self, df):
        # runs once per physical partition: remember the static columns and
        # parse the date columns (named like _m_d_yy) into datetime objects
        self.value_col = self.params.get_or_throw("col", str)
        self.cols = [f for f in df.schema.names if not f.startswith("_")]
        self.date_cols = {
            f: datetime.strptime(f[1:], '%m_%d_%y')
            for f in df.schema.names
            if f.startswith("_")
        }

    def transform(self, df):
        def get_rows():
            for row in df.as_dict_iterable():
                res = [row[k] for k in self.cols]
                for k, v in self.date_cols.items():
                    yield res + [v, int(row[k])]

        return IterableDataFrame(get_rows(), self.output_schema)


# In the `transform` code above, I choose to consume the input as an iterable and to use `IterableDataFrame` as the output, which minimizes memory usage. Think about it: if the dataframe is large, then without processing it in a purely streaming way, the original data and the pivoted table together could use a large amount of memory, and that is unnecessary.
#
# Now I want to test this transformer locally, so I use `NativeExecutionEngine` to run it. Before transforming, I use a `SELECT ... LIMIT` to make the input data really small, so it runs quickly on native Python.
#
# **It's very important to have this validation step**
#
# * Add a filtering statement to keep the input data small; this is a test of correctness, not scalability.
# * Running on `NativeExecutionEngine` is extremely fast for finding errors, and more importantly, errors are reported more explicitly than when running on a cluster. Plus, at this step, if you see any errors, you can add `print` statements inside the transformer code to help debug; they will all display.
# * Consider using mock data as the input as well. Fugue SQL is very unit-testable, and you should consider making this part one of your unit tests (with mock input data).

# In[ ]:


get_ipython().run_cell_magic('sql', '', 'LOAD CSV "../data/confirmed.csv"(header=true)\nSELECT * WHERE iso3 = \'USA\' LIMIT 10\nconfirmed = TRANSFORM USING Pivot(col="confirmed")\nOUTPUT USING pprint\n \nLOAD CSV "../data/death.csv"(header=true)\nSELECT * WHERE iso3 = \'USA\' LIMIT 10\ndeath = TRANSFORM USING Pivot(col="death")\nOUTPUT USING pprint\n')


# ## Bring it to Spark!
#
# It works well, so now let's use `SparkExecutionEngine` to run it on Spark. I need to join the two dataframes so each row has both `confirmed` and `death`, because I always want to use them together in later steps.
#
# So here I use a `JOIN` statement. Notice I `PERSIST` the select output because I will both `pprint` and `SAVE` the same output; persisting it avoids a rerun in the execution plan. Instead of adding `PERSIST` explicitly, there is also an [alternative solution](useful_config.ipynb#Auto-Persist).
#
# Also, saving intermediate data into persistent storage is often a good idea during exploration. It's like a checkpoint.
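# One side note before running the Spark cells: the `%%sql` magic defined above selects the engine by the name passed on the magic line. If you need your own Spark settings, a `SparkExecutionEngine` can also wrap a pre-configured `SparkSession` and be handed to `dag.run` directly. The sketch below is only an illustration; the app name and config value are assumptions, not tuned for this dataset.

# In[ ]:


# Illustrative sketch (not used by the %%sql magic in this notebook): build a
# configured SparkSession and wrap it in a SparkExecutionEngine.
from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .appName("covid19-exploration")                # hypothetical app name
    .config("spark.sql.shuffle.partitions", "16")  # arbitrary example value
    .getOrCreate()
)
spark_engine = SparkExecutionEngine(spark_session)

# To use it, run a workflow programmatically instead of through the magic, e.g.
# dag = FugueSQLWorkflow(); dag(fugue_sql_text); dag.run(spark_engine)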
# In[ ]:


get_ipython().run_cell_magic('sql', 'SparkExecutionEngine', 'LOAD CSV "../data/confirmed.csv"(header=true)\nSELECT * WHERE iso3 = \'USA\'\nconfirmed = TRANSFORM USING Pivot(col="confirmed")\n \nLOAD CSV "../data/death.csv"(header=true)\nSELECT * WHERE iso3 = \'USA\'\ndeath = TRANSFORM USING Pivot(col="death")\n\nSELECT \n confirmed.Combined_Key AS key,\n confirmed.Admin2 AS city,\n confirmed.Province_State AS state,\n Population AS population,\n confirmed.date,\n confirmed.confirmed,\n death\nFROM confirmed INNER JOIN death\n ON confirmed.Combined_Key = death.Combined_Key AND confirmed.date = death.date\nPERSIST\n \nOUTPUT USING pprint\nSAVE OVERWRITE "/tmp/covid19.parquet"\n')


# Next I load the parquet back, do some data analytics using SQL, and output with `pprint`. You should take a break here and try different `SELECT` statements to explore the data on your own for a moment.

# In[ ]:


get_ipython().run_cell_magic('sql', 'SparkExecutionEngine', 'data = LOAD "/tmp/covid19.parquet"\n\nSELECT DISTINCT city, state, population FROM data\nOUTPUT USING pprint\n\nSELECT state, date, SUM(confirmed) AS confirmed, SUM(death) AS death\n FROM data GROUP BY state, date\nOUTPUT USING pprint\n')


# ## Visualize it
#
# It will be useful to see the confirmed-vs-death chart for different partitions (I should be able to define the partition: by city, by state, or something else). So again my problem becomes: for each partition, draw a chart, and in the end collect the charts on the driver to display in the notebook.
#
# This is again a typical transformation. For each partition, I draw the figure, save it to common storage, and return the path, so the driver can collect the paths and render all the images together.
#
# And again, how should I write this transformer? It needs the partition key information, so the most convenient way is still to implement the interface, because the partition values are then easy to access.

# In[ ]:


import matplotlib.pyplot as plt
from uuid import uuid4
import os
from IPython.display import SVG


class Draw(Transformer):
    def get_output_schema(self, df):
        # output one row per partition: the partition keys plus the saved path
        return self.key_schema + "path:str"

    def transform(self, df):
        root = self.params.get_or_throw("path", str)
        pdf = df.as_pandas()[["date", "confirmed", "death"]]
        pdf = pdf.set_index("date")
        fig = pdf.plot(title=self.cursor.key_value_dict.__repr__()).get_figure()
        path = os.path.join(root, str(uuid4()))
        fig.savefig(path, format="svg")
        res = self.cursor.key_value_array + [path]
        return ArrayDataFrame([res], self.output_schema)


def render_all(df: Iterable[Dict[str, Any]]) -> None:
    # output extension: display every saved SVG on the driver
    for row in df:
        display(SVG(filename=row["path"]))


# Now let's first test it using native Python on a small dataset.

# In[ ]:


get_ipython().run_cell_magic('sql', '', 'data = LOAD "/tmp/covid19.parquet"\nSELECT * WHERE key = \'Adams, Mississippi, US\'\nTRANSFORM PREPARTITION BY key USING Draw(path="/tmp")\nOUTPUT USING render_all\n')


# Something looks wrong: the numbers should not jump between 0 and 200. Ah, it's because we didn't sort by date. We can either sort inside the transformer before drawing, or use `PRESORT` in Fugue SQL, as below.

# In[ ]:


get_ipython().run_cell_magic('sql', '', 'data = LOAD "/tmp/covid19.parquet"\nSELECT * WHERE key = \'Adams, Mississippi, US\'\nTRANSFORM PREPARTITION BY key PRESORT date USING Draw(path="/tmp")\nOUTPUT USING render_all\n')


# You can also see the chart rendered twice: the `plot` call inside the transformer is rendered inline as well, because we're running on `NativeExecutionEngine`. This shouldn't happen when using `SparkExecutionEngine`, so we are fine with it.
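# That said, if the extra inline rendering bothers you during local testing, closing the figure after saving suppresses it. The subclass below is just a sketch of that tweak (the `DrawQuiet` name is made up and it is not used in the rest of the notebook); the logic is otherwise identical to `Draw`.

# In[ ]:


# Optional sketch: plt.close(fig) stops matplotlib's inline backend from also
# rendering the figure in the notebook when the transformer runs locally.
class DrawQuiet(Draw):
    def transform(self, df):
        root = self.params.get_or_throw("path", str)
        pdf = df.as_pandas()[["date", "confirmed", "death"]].set_index("date")
        fig = pdf.plot(title=repr(self.cursor.key_value_dict)).get_figure()
        path = os.path.join(root, str(uuid4()))
        fig.savefig(path, format="svg")
        plt.close(fig)  # suppress inline display of the intermediate figure
        res = self.cursor.key_value_array + [path]
        return ArrayDataFrame([res], self.output_schema)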
# We only want to draw charts for the 10 states with the highest death counts. In the following code we first find the top 10 states, then use a `SEMI JOIN` to filter the dataset, and then aggregate and use the transformer to plot.
#
# Notice that in the last `SELECT` we can use an inline `TRANSFORM`; Fugue SQL is more flexible than standard SQL in this respect.

# In[ ]:


get_ipython().run_cell_magic('sql', 'SparkExecutionEngine', 'data = LOAD "/tmp/covid19.parquet"\n\ntop10 = \n SELECT state, SUM(death) AS death \n FROM data GROUP BY state \n ORDER BY death DESC LIMIT 10 PERSIST\n\ndata = \n SELECT * FROM data \n LEFT SEMI JOIN top10 ON data.state = top10.state\n WHERE date > \'2020-04-01\'\n\nSELECT state, date, SUM(confirmed) AS confirmed, SUM(death) AS death\n GROUP BY state, date\n \n# this step is to sort the output by death, the output of Draw doesn\'t have death, so we need join\nSELECT a.*, death FROM \n (TRANSFORM PREPARTITION BY state PRESORT date USING Draw(path="/tmp")) AS a\n INNER JOIN top10 ON a.state = top10.state\n ORDER BY death DESC\n \nOUTPUT USING render_all\n')


# ## Summary
#
# The way we solved this problem is platform and scale agnostic. Whether you run it on native Python with small data or on Spark with very large data, you should not need to change the code.
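# To make that point concrete, here is a minimal sketch (not part of the original analysis; `run_report` is a made-up helper) of running the same Fugue SQL text programmatically and switching engines with a single argument, following the same pattern as the `%%sql` magic defined at the top.

# In[ ]:


# A sketch of the engine-agnostic pattern: the SQL text stays the same and only
# the engine passed to run() changes. pprint is the output extension defined above.
def run_report(engine):
    dag = FugueSQLWorkflow()
    dag(
        """
        data = LOAD "/tmp/covid19.parquet"
        SELECT state, date, SUM(confirmed) AS confirmed, SUM(death) AS death
            FROM data GROUP BY state, date
        OUTPUT USING pprint(rows=5)
        """
    )
    dag.run(engine)


run_report(NativeExecutionEngine)    # quick local run, same as the magic's default
# run_report(SparkExecutionEngine)   # the same code at scale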