#!/usr/bin/env python
# coding: utf-8

# # Operating on Dask Dataframes with SQL
#
# [Dask-SQL](https://dask-sql.readthedocs.io/en/stable/) is an open source project and Python package leveraging [Apache Calcite](https://calcite.apache.org/) to provide a SQL frontend for [Dask](https://dask.org/) dataframe operations, allowing SQL users to take advantage of Dask's distributed capabilities without requiring extensive knowledge of the dataframe API.

# In[ ]:


get_ipython().system(' pip install dask-sql')


# ## Set up a Dask cluster
#
# Setting up a Dask [Cluster](https://docs.dask.org/en/latest/deploying.html) is optional, but can dramatically expand our options for distributed computation by giving us access to Dask workers on GPUs, remote machines, common cloud providers, and more.
# Additionally, connecting our cluster to a Dask [Client](https://distributed.dask.org/en/stable/client.html) will give us access to a dashboard, which can be used to monitor the progress of active computations and diagnose issues.
#
# For this notebook, we will create a local cluster and connect it to a client.
# Once the client has been created, a link will appear to its associated dashboard, which can be viewed throughout the following computations.

# In[ ]:


from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client


# ## Create a context
#
# A `dask_sql.Context` is the Python equivalent to a SQL database, serving as an interface to register all tables and functions used in SQL queries, as well as execute the queries themselves.
# In typical usage, a single `Context` is created and used for the duration of a Python script or notebook.

# In[ ]:


from dask_sql import Context

c = Context()


# ## Load and register data
#
# Once a `Context` has been created, there are a variety of ways to register tables in it.
# The simplest way to do this is through the `create_table` method, which accepts a variety of input types, from which Dask-SQL infers the appropriate table creation method.
# Supported input types include:
#
# - Dask / [Pandas](https://pandas.pydata.org/)-like dataframes
# - String locations of local or remote datasets
# - [Apache Hive](https://github.com/apache/hive) tables served through [PyHive](https://github.com/dropbox/PyHive) or [SQLAlchemy](https://www.sqlalchemy.org/)
#
# Input type can also be specified explicitly by providing a `format`.
# When being registered, tables can optionally be persisted into memory by passing `persist=True`, which can greatly speed up repeated queries on the same table at the cost of loading the entire table into memory.
# For more information, see [Data Loading and Input](https://dask-sql.readthedocs.io/en/latest/pages/data_input.html).

# In[ ]:


import pandas as pd
from dask.datasets import timeseries

# register and persist a dask table
ddf = timeseries()
c.create_table("dask", ddf, persist=True)

# register a pandas table (implicitly converted to a dask table)
df = pd.DataFrame({"a": [1, 2, 3]})
c.create_table("pandas", df)

# register a table from local storage; kwargs are passed on to the underlying table creation method
c.create_table(
    "local",
    "surveys/data/2021-user-survey-results.csv.gz",
    format="csv",
    parse_dates=['Timestamp'],
    blocksize=None
)


# Tables can also be registered through SQL `CREATE TABLE WITH` or `CREATE TABLE AS` statements, using the `sql` method.
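# Before moving on to the SQL route, note that the same `create_table` call also covers remote data: a string location plus an explicit `format`, with any reader-specific keyword arguments passed through to the underlying table creation method.
# A minimal, hypothetical sketch - the bucket, path, and storage options below are placeholders and are not part of this notebook's data:
#
# ```python
# # register a hypothetical remote Parquet dataset by its string location;
# # storage_options is forwarded to the underlying Dask reader
# c.create_table(
#     "remote",
#     "s3://my-bucket/path/to/data/",
#     format="parquet",
#     storage_options={"anon": True},
# )
# ```
#
# The next cell switches to the SQL route: it replaces our CSV-backed table with a `CREATE OR REPLACE TABLE ... WITH` statement and creates a second table from a query with `CREATE TABLE ... AS`.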
# In[ ]:


# replace our table from local storage
c.sql("""
CREATE OR REPLACE TABLE
    "local"
WITH (
    location = 'surveys/data/2021-user-survey-results.csv.gz',
    format = 'csv',
    parse_dates = ARRAY [ 'Timestamp' ]
)
""")

# create a new table from a SQL query
c.sql("""
CREATE TABLE filtered AS (
    SELECT id, name FROM dask WHERE name = 'Zelda'
)
""")


# All registered tables can be listed with a `SHOW TABLES` statement.

# In[ ]:


c.sql("SHOW TABLES FROM root").compute()


# Dask-SQL currently offers experimental GPU support, powered by the [RAPIDS](https://rapids.ai/) suite of open source GPU data science libraries.
# Input support is currently limited to Dask / Pandas-like dataframes and data in local/remote storage, and though most queries run without issue, users should expect some bugs or undefined behavior.
# To register a table and mark it for use on GPUs, `gpu=True` can be passed to a standard `create_table` call, or its equivalent `CREATE TABLE WITH` query (note that this requires [cuDF and Dask-cuDF](https://github.com/rapidsai/cudf)).
#
# ```python
# # register a dask table for use on GPUs (not possible in this binder)
# c.create_table("gpu_dask", ddf, gpu=True)
#
# # load in a table from disk using GPU-accelerated IO operations
# c.sql("""
# CREATE TABLE
#     "gpu_local"
# WITH (
#     location = 'surveys/data/2021-user-survey-results.csv.gz',
#     format = 'csv',
#     parse_dates = ARRAY [ 'Timestamp' ],
#     gpu = True
# )
# """)
# ```

# ## Query the data
#
# When the `sql` method is called, Dask-SQL hands the query off to Apache Calcite to convert into a relational algebra - essentially an ordered list of relational operations that must be executed to produce a result.
# The relational algebra of any query can be viewed directly using the `explain` method.

# In[ ]:


print(c.explain("SELECT AVG(x) FROM dask"))


# From here, this relational algebra is then converted into a Dask computation graph, which ultimately returns (or in the case of `CREATE TABLE` statements, implicitly assigns) a Dask dataframe.

# In[ ]:


c.sql("SELECT AVG(x) FROM dask")


# Dask dataframes are lazy, meaning that at the time of their creation, none of their dependent tasks have been executed yet.
# To actually execute these tasks and get a result, we must call `compute`.

# In[ ]:


c.sql("SELECT AVG(x) FROM dask").compute()


# Looking at the dashboard, we can see that executing this query has triggered some Dask computations.
#
# Because the return value of a query is a Dask dataframe, it is also possible to do follow-up operations on it using Dask's dataframe API.
# This can be useful if we want to perform some complex operations that aren't possible through Dask's dataframe API (such as the multi-column sort below), then follow up with simpler operations that the dataframe API expresses easily.

# In[ ]:


# perform a multi-column sort that isn't possible in Dask
res = c.sql("""
SELECT * FROM dask ORDER BY name ASC, id DESC, x ASC
""")

# now do some follow-up groupby aggregations
res.groupby("name").agg({"x": "sum", "y": "mean"}).compute()


# ## Custom functions and aggregations
#
# When standard SQL functionality is insufficient, it is possible to register custom functions for use in queries.
# These functions can be classified as one of the following:
#
# - Column-wise functions
# - Row-wise functions
# - Aggregations
#
# ### Column-wise functions
#
# Column-wise functions can take columns or literal values as input and return a column of an identical length.
# Column-wise functions can be registered in a `Context` using the `register_function` method.
# In[ ]:


import numpy as np

def f(x):
    return x ** 2

c.register_function(f, "f", [("x", np.float64)], np.float64)


# Function registration requires the following inputs:
#
# - A callable function
# - A name for the function to be referred to in queries
# - A list of tuples, representing the input variables and their respective types, which can be either Pandas or [NumPy](https://numpy.org/) types
# - A type for the output column
#
# Once a function has been registered, it can be called like any other standard SQL function.

# In[ ]:


c.sql("SELECT F(x) FROM dask").compute()


# ### Row-wise functions
#
# In some cases, it may be easier to write a custom function that processes a dict-like `row` object - otherwise known as a row-wise function.
# These functions can also be registered using `register_function` by passing `row_udf=True`, and used in the same manner as a column-wise function.

# In[ ]:


def g(row):
    if row["x"] > row["y"]:
        return row["x"] - row["y"]
    return row["y"] - row["x"]

c.register_function(g, "g", [("x", np.float64), ("y", np.float64)], np.float64, row_udf=True)

c.sql("SELECT G(x, y) FROM dask").compute()


# Note that unlike column-wise functions, which are called directly using specified columns and literals as input, row-wise functions are called using `apply`, which can have unpredictable performance depending on the underlying dataframe library (e.g. Pandas, cuDF) and the function itself.
#
# ### Aggregations
#
# Aggregations take a single column as input and return a single value - thus, they can only be used to reduce the results of a `GROUP BY` query.
# Aggregations can be registered using the `register_aggregation` method, which is functionally similar to `register_function` but takes a Dask [Aggregation](https://docs.dask.org/en/latest/dataframe-groupby.html#aggregate) as input instead of a callable function.

# In[ ]:


import dask.dataframe as dd

my_sum = dd.Aggregation("my_sum", lambda x: x.sum(), lambda x: x.sum())

c.register_aggregation(my_sum, "my_sum", [("x", np.float64)], np.float64)

c.sql("SELECT MY_SUM(x) FROM dask").compute()


# ## Machine learning in SQL
#
# Dask-SQL has support for both model training and prediction, enabling machine learning workflows with a flexible combination of both Python and SQL.
# A model can be registered in a `Context` either through the `register_model` method or a `CREATE MODEL` statement.

# In[ ]:


from sklearn.ensemble import GradientBoostingClassifier

# create a scikit-learn model and train it
model = GradientBoostingClassifier()
data = c.sql("SELECT x, y, x * y > 0 AS target FROM dask LIMIT 50")
model.fit(data[["x", "y"]], data["target"])

# register this model in the context
c.register_model("python_model", model, training_columns=["x", "y"])

# create and train a model directly from SQL
c.sql("""
CREATE MODEL sql_model WITH (
    model_class = 'sklearn.ensemble.GradientBoostingClassifier',
    wrap_predict = True,
    target_column = 'target'
) AS (
    SELECT x, y, x * y > 0 AS target
    FROM dask
    LIMIT 50
)
""")


# Registered models must follow the [scikit-learn](https://scikit-learn.org/stable/index.html) interface by implementing a `predict` method.
# As with tables, all registered models can be listed with a `SHOW MODELS` statement.

# In[ ]:


c.sql("SHOW MODELS").compute()


# From here, the models can be used to make predictions using the `PREDICT` keyword as part of a `SELECT` query, which returns the rows of its inner query with the model's predictions added as an additional column.
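# For comparison, the model registered from Python above is still an ordinary scikit-learn estimator, so the same kind of prediction can also be made directly through the dataframe API.
# A minimal sketch, using the `model` object trained above and a small in-memory sample:
#
# ```python
# # pull a small pandas sample of the "dask" table and predict on it directly
# sample = c.sql("SELECT x, y FROM dask").head(10)
# model.predict(sample[["x", "y"]])
# ```
#
# The `PREDICT` query in the next cell expresses this kind of prediction in SQL, using the SQL-registered `sql_model`.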
# In[ ]:


c.sql("""
SELECT * FROM PREDICT (
    MODEL sql_model,
    SELECT x, y, x * y > 0 AS actual FROM dask OFFSET 50
)
""").compute()
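# Finally, once we are done, we can optionally close our connection to the local cluster created at the start of the notebook.

# In[ ]:


# optional cleanup: close our connection to the local cluster
client.close()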