#!/usr/bin/env python
# coding: utf-8

# # Fugue (Spark, Ray, Dask) Integration
#
# [![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/whylabs/whylogs/blob/mainline/python/examples/integrations/Fugue_Profiling.ipynb)
#
# Hi! Perhaps you're already feeling confident with our library, but you really wish there were an easy way to plug our profiling into your existing **Spark, Dask or Ray** clusters or your existing **Databricks, Coiled or Anyscale** platforms. Well, glad you've made it here, because this is exactly what we are going to cover in this example notebook 😃
#
# If you wish to have other insights on how to use whylogs, feel free to check our [other existing examples](https://github.com/whylabs/whylogs/tree/mainline/python/examples), as they might be extremely useful!
#
# For detailed questions regarding [Fugue](https://github.com/fugue-project/fugue), please join Fugue's Slack channel: [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)

# ## Installing the extra dependency
#
# Because we want users to install only what they actually need from whylogs, the Fugue integration comes as an extra dependency. Install it according to the following table:
#
# | Run whylogs on ... | Installation Command |
# |:---|:---|
# | Any Spark cluster (including Databricks Notebooks) | `pip install 'whylogs[fugue]' 'fugue[spark]'` |
# | Databricks (remote access) | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[databricks]'` |
# | Any Ray cluster (including Anyscale Notebooks) | `pip install 'whylogs[fugue]' 'fugue[ray]'` |
# | Anyscale (remote access) | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[anyscale]'` |
# | Any Dask cluster | `pip install 'whylogs[fugue]' 'fugue[dask]'` |
# | Coiled | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[coiled]'` |
#
# For example, this notebook uses a local Spark cluster, so we install:

# In[ ]:

# Note: you may need to restart the kernel to use updated packages.
get_ipython().run_line_magic('pip', "install 'whylogs[fugue]' 'fugue[spark]'")
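# After installing, a quick and purely optional sanity check (a minimal sketch) is to confirm that the packages are importable and print their installed versions:
#
# ```python
# from importlib.metadata import version
#
# # confirm the integration pieces are installed
# print(version("whylogs"), version("fugue"), version("pyspark"))
# ```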
# The following environment variable should NOT need to be set in your own environment.

# In[1]:

import os

os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

# ## Constructing a dataset

# In[2]:

import pandas as pd
import numpy as np

n = 100
np.random.seed(0)
tdf = pd.DataFrame(
    dict(
        a=np.random.choice([1, 2, 3], n),
        b=np.random.choice(["a", "b"], n),
        c=np.random.random(n),
        d=np.random.choice(["xy", "z"], n),
    )
)
tdf.to_parquet("/tmp/test.parquet")
tdf

# ## Profiling using Whylogs + Fugue
#
# The simplest way to use `fugue_profile` is equivalent to `why.log(df).view()`:

# In[3]:

from whylogs.api.fugue import fugue_profile

fugue_profile(tdf).to_pandas()

# We can select the columns to profile:

# In[4]:

fugue_profile(tdf, profile_cols=["c", "d"]).to_pandas()

# Now assume we want to use Spark to profile the dataset in a distributed way, and that this is how we get a SparkSession:

# In[6]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# To profile the pandas dataframe on Spark:

# In[7]:

fugue_profile(tdf, engine=spark)

# To profile a Spark DataFrame:

# In[8]:

spark_df = spark.createDataFrame(tdf)
fugue_profile(spark_df)

# We can also directly profile a parquet file or a folder of parquet files, locally or on the cloud (the files will be loaded in a distributed way):

# In[9]:

fugue_profile("/tmp/test.parquet", engine=spark)

# Profiling datasets or files with other backends works very similarly; detailed guides follow in the later sections.
#
# ## Profiling on logical partitions
#
# If we want to profile `tdf` grouped by columns `a` and `b`:

# In[10]:

fugue_profile(tdf, partition={"by": ["a", "b"]})

# We can also control the name of the output profile field:

# In[11]:

res = fugue_profile(tdf, partition={"by": ["a", "b"]}, profile_field="x")
res

# Here is how to retrieve the views:

# In[12]:

from whylogs import DatasetProfileView

res.x.apply(DatasetProfileView.deserialize)

# When we profile a large number of partitions with a distributed backend and don't want to collect them on the local machine, we can keep the output as a native distributed dataframe, for example:

# In[13]:

fugue_profile(tdf, partition={"by": ["a", "b"]}, engine=spark, as_local=False)  # returns a native PySpark DataFrame

# We may also directly save the output to a file in a distributed way (see the read-back sketch further below):

# In[14]:

fugue_profile(tdf, partition={"by": ["a", "b"]}, save_path="/tmp/output1.parquet", engine=spark)
fugue_profile("/tmp/test.parquet", partition={"by": ["a", "b"]}, save_path="/tmp/output2.parquet", engine=spark)

# In[22]:

get_ipython().system('ls /tmp/output*.parquet')

# ## Using `fugue_profile` in the Fugue API way
#
# The [Fugue API](https://fugue.readthedocs.io/en/latest/top_api.html) is a collection of standalone, platform-agnostic functions for common big data operations, and `fugue_profile` follows the same style and can be used as one of them. In the previous example we went from file to file with a single call. While that is compact, it overloads the function, making the code harder to read and maintain. Following best practice, we can write it as:

# In[21]:

import fugue.api as fa

with fa.engine_context(spark):
    df = fa.load("/tmp/test.parquet")
    res = fugue_profile(df, partition={"by": ["a", "b"]})
    fa.save(res, "/tmp/output2.parquet")

# Execution-wise, this is the same as the previous example, but now each line of code does one thing, which is a better coding style.
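# If you later want to work with the profiles written via `save_path`, a minimal sketch for reading them back and merging them into a single view could look like this (the output path and the explicit `profile_field` name below are illustrative choices, not library defaults):
#
# ```python
# from functools import reduce
#
# import pandas as pd
# from whylogs import DatasetProfileView
# from whylogs.api.fugue import fugue_profile
#
# # write the per-partition profiles with an explicit field name, so we know which column to read back
# fugue_profile(
#     "/tmp/test.parquet",
#     partition={"by": ["a", "b"]},
#     profile_field="profile",
#     save_path="/tmp/output3.parquet",
#     engine=spark,
# )
#
# saved = pd.read_parquet("/tmp/output3.parquet")
# views = saved["profile"].apply(DatasetProfileView.deserialize)
#
# # fold the per-partition views into one overall DatasetProfileView
# merged = reduce(lambda left, right: left.merge(right), views)
# merged.to_pandas()
# ```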
# ## Visualization in FugueSQL
#
# The Whylogs profile visualization is an auto-registered extension in Fugue. The namespace is `why` and the extension name is `viz`.
#
# Here is how to use the extension:

# In[11]:

import whylogs.api.fugue.registry  # you don't really need to import this explicitly, the registration is automatic
import fugue.api as fa

fa.fugue_sql_flow("""
-- visualize a single dataframe's profile
OUTPUT df USING why:viz

-- compare profiles, must set reference and target
OUTPUT target=df, reference=df USING why:viz
""", df=tdf).run();

# If running on a distributed backend, the profiling is done by `fugue_profile`:

# In[12]:

fa.fugue_sql_flow("""
df = LOAD "/tmp/test.parquet"

OUTPUT USING why:viz
OUTPUT target=df, reference=df USING why:viz
""").run(spark);

# ## Performance Tips
#
# ### Spark
#
# Please use `Fugue >= 0.8.0`, which enables Pandas UDF support by default.
#
# It is also beneficial to enable Pandas UDF execution on Spark to get better performance: follow [this instruction](https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas) to enable `spark.sql.execution.arrow.pyspark.enabled`.
#
# When we profile a dataframe without a logical partition, we may control the number of partitions in order to control the parallelism:
#
# ```python
# fugue_profile(..., partition={"num": 200}, engine=spark)
# ```
#
# If we don't specify `num`, the default partitioning of the input dataframe is used. If the input is a local dataframe such as a pandas dataframe, the default partitioning is the number of CPUs of the Spark cluster.
#
# When we profile a dataframe with logical partitions, we can also be explicit about how many physical partitions to use:
#
# ```python
# fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=spark)
# ```
#
# However, the convention in Spark is to set `spark.sql.shuffle.partitions` when starting the Spark cluster, and an ideal number is 2-4 times the total number of CPUs.
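# For reference, here is a minimal sketch of how these settings might be applied when building the session (the values are illustrative, not recommendations):
#
# ```python
# from pyspark.sql import SparkSession
#
# spark = (
#     SparkSession.builder
#     .config("spark.sql.execution.arrow.pyspark.enabled", "true")  # enable Arrow-backed Pandas UDFs
#     .config("spark.sql.shuffle.partitions", "200")                # roughly 2-4x the total cluster CPUs
#     .getOrCreate()
# )
#
# # both are SQL configs, so they can also be changed on an existing session:
# # spark.conf.set("spark.sql.shuffle.partitions", "200")
#
# fugue_profile("/tmp/test.parquet", partition={"by": ["a", "b"]}, engine=spark)
# ```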
# ### Ray
#
# When we profile a dataframe without a logical partition, we should control the number of partitions in order to control the parallelism:
#
# ```python
# fugue_profile(..., partition={"num": 200}, engine="ray")
# ```
#
# If we don't specify `num`, the default partitioning of the input dataframe is used. If the input is a local dataframe such as a pandas dataframe, the default partitioning is 1. **So in Ray, it is always a good idea to be explicit about `num`.**
#
# When we profile a dataframe with logical partitions, we should also be explicit about how many physical partitions to use:
#
# ```python
# fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine="ray")
# ```
#
# ### Dask
#
# When we profile a dataframe without a logical partition, we should control the number of partitions in order to control the parallelism:
#
# ```python
# fugue_profile(..., partition={"num": 200}, engine=dask_client)
# ```
#
# If we don't specify `num`, the default partitioning of the input dataframe is used. If the input is a local dataframe such as a pandas dataframe, the default partitioning is a small number based on the local CPUs. **So in Dask, it is always a good idea to be explicit about `num`.**
#
# When we profile a dataframe with logical partitions, we should also be explicit about how many physical partitions to use:
#
# ```python
# fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=dask_client)
# ```
#
# **Whether on Spark, Ray or Dask, and however you set `num`, setting it to about 2 times the total cluster CPUs generally works very well.**
#
# ## Accessing distributed platforms
#
# In Fugue, accessing distributed platforms can be very simple. For example, with the proper setup, profiling a large S3 folder on Databricks, Anyscale or Coiled is as simple as:
#
# ```python
# fugue_profile("s3://<path>", engine="db:<databricks_cluster_id>")
# fugue_profile("s3://<path>", engine="<anyscale_cluster_address>")
# fugue_profile("s3://<path>", engine="coiled:<coiled_cluster_name>")
# ```
#
# For the details of each platform, please read the instructions for [Databricks](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/databricks.html), [Anyscale](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/anyscale.html) and [Coiled](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/coiled.html).
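# As a closing sketch, the pieces above can be combined into a single call; every path and cluster identifier below is a hypothetical placeholder:
#
# ```python
# from whylogs.api.fugue import fugue_profile
#
# fugue_profile(
#     "s3://my-bucket/input-data/",              # folder of parquet files (placeholder path)
#     partition={"by": ["a", "b"], "num": 200},  # logical partitions plus explicit parallelism
#     save_path="s3://my-bucket/profiles/",      # write the per-partition profiles back to S3 (placeholder path)
#     engine="db:<cluster_id>",                  # remote Databricks cluster (placeholder id)
# )
# ```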