Fugue (Spark, Ray, Dask) Integration

Hi! Perhaps you're already feeling confident with our library, but you wish there were an easy way to plug our profiling into your existing Spark, Dask, or Ray clusters, or into platforms such as Databricks, Coiled, or Anyscale. Well, glad you've made it here, because this is what we are going to cover in this example notebook 😃

If you wish to have other insights on how to use whylogs, feel free to check our other existing examples, as they might be extremely useful!

For detailed questions regarding Fugue, please join Fugue's Slack channel.

Installing the extra dependency

As we want users to install only what they actually need from whylogs, the Fugue integration comes as an extra dependency. To make it available, install according to the following table:

Run whylogs on ...                                  Installation command
Any Spark cluster (including Databricks Notebooks)  pip install 'whylogs[fugue]' 'fugue[spark]'
Databricks (remote access)                          pip install 'whylogs[fugue]' 'fugue-cloudprovider[databricks]'
Any Ray cluster (including Anyscale Notebooks)      pip install 'whylogs[fugue]' 'fugue[ray]'
Anyscale (remote access)                            pip install 'whylogs[fugue]' 'fugue-cloudprovider[anyscale]'
Any Dask cluster                                    pip install 'whylogs[fugue]' 'fugue[dask]'
Coiled                                              pip install 'whylogs[fugue]' 'fugue-cloudprovider[coiled]'

For example, in this notebook we are using a local Spark cluster, so we should:

In [2]:
%pip install 'whylogs[fugue]' 'fugue[spark]'

Note: the environment variable below is set only for this example environment; it should NOT need to be set in your own environment.

In [1]:
import os

os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

Constructing a dataset

In [2]:
import pandas as pd
import numpy as np

n = 100
np.random.seed(0)
tdf = pd.DataFrame(
    dict(
        a=np.random.choice([1, 2, 3], n),
        b=np.random.choice(["a", "b"], n),
        c=np.random.random(n),
        d=np.random.choice(["xy", "z"], n),
    )
)
tdf.to_parquet("/tmp/test.parquet")
tdf
Out[2]:
a b c d
0 1 a 0.533206 xy
1 2 b 0.230533 z
2 1 a 0.394869 z
3 2 b 0.618809 z
4 2 b 0.474868 xy
... ... ... ... ...
95 1 b 0.904425 xy
96 3 a 0.645785 z
97 1 a 0.324683 xy
98 2 b 0.519711 z
99 3 a 0.000055 z

100 rows × 4 columns

Profiling using Whylogs + Fugue

In its simplest form, fugue_profile(df) is equivalent to why.log(df).view():

In [3]:
from whylogs.api.fugue import fugue_profile

fugue_profile(tdf).to_pandas()
Out[3]:
cardinality/est cardinality/lower_1 cardinality/upper_1 counts/n counts/null distribution/max distribution/mean distribution/median distribution/min distribution/n ... distribution/stddev frequent_items/frequent_strings ints/max ints/min type types/boolean types/fractional types/integral types/object types/string
column
a 3.000000 3.0 3.000150 100 0 3.000000 1.880000 2.000000 1.000000 100 ... 0.807540 [FrequentItem(value='1', est=39, upper=39, low... 3.0 1.0 SummaryType.COLUMN 0 0 100 0 0
b 2.000000 2.0 2.000100 100 0 NaN 0.000000 NaN NaN 0 ... 0.000000 [FrequentItem(value='a', est=57, upper=57, low... NaN NaN SummaryType.COLUMN 0 0 0 0 100
c 100.000025 100.0 100.005018 100 0 0.992396 0.499929 0.487838 0.000055 100 ... 0.294085 NaN NaN NaN SummaryType.COLUMN 0 100 0 0 0
d 2.000000 2.0 2.000100 100 0 NaN 0.000000 NaN NaN 0 ... 0.000000 [FrequentItem(value='xy', est=53, upper=53, lo... NaN NaN SummaryType.COLUMN 0 0 0 0 100

4 rows × 28 columns

We can also select which columns to profile:

In [4]:
fugue_profile(tdf, profile_cols=["c","d"]).to_pandas()
Out[4]:
cardinality/est cardinality/lower_1 cardinality/upper_1 counts/n counts/null distribution/max distribution/mean distribution/median distribution/min distribution/n ... distribution/q_95 distribution/q_99 distribution/stddev type types/boolean types/fractional types/integral types/object types/string frequent_items/frequent_strings
column
c 100.000025 100.0 100.005018 100 0 0.992396 0.499929 0.487838 0.000055 100 ... 0.970237 0.992396 0.294085 SummaryType.COLUMN 0 100 0 0 0 NaN
d 2.000000 2.0 2.000100 100 0 NaN 0.000000 NaN NaN 0 ... NaN NaN 0.000000 SummaryType.COLUMN 0 0 0 0 100 [FrequentItem(value='xy', est=53, upper=53, lo...

2 rows × 26 columns

Now suppose we want to use Spark to profile the dataset in a distributed way, and that this is how we get a SparkSession:

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
22/09/20 04:33:57 WARN Utils: Your hostname, codespaces-5144a4 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
22/09/20 04:33:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/20 04:33:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

If we want to profile the pandas df on Spark:

In [6]:
fugue_profile(tdf, engine=spark)
                                                                                
Out[6]:
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7f4939be4dc0>
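
The result is a regular DatasetProfileView, so we can summarize it just as we did locally. A minimal sketch (a hypothetical follow-up cell, reusing the spark session created above):

# run the profiling on Spark and turn the merged profile into a pandas summary
fugue_profile(tdf, engine=spark).to_pandas()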

If we want to profile a SparkDataFrame:

In [7]:
spark_df = spark.createDataFrame(tdf)
fugue_profile(spark_df, engine=spark)
                                                                                
Out[7]:
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7f4939078a00>

We can also directly profile a parquet file or a folder of parquet files locally or on the cloud (the file will be loaded distributedly):

In [8]:
fugue_profile("/tmp/test.parquet", engine=spark)
                                                                                
Out[8]:
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7f49390bb610>

Profiling datasets or files with other backends is very similar; the later sections provide more detailed guidance.

Profiling on logical partitions

If we want to profile tdf grouped by columns a and b:

In [9]:
fugue_profile(tdf, partition={"by":["a","b"]})
Out[9]:
a b __whylogs_df_profile_view
0 1 a b'WHY1\x00\xd6\x02\n\x0e \xa5\xc0\xb3\xc9\xb50...
1 1 b b'WHY1\x00\xd6\x02\n\x0e \xb4\xc0\xb3\xc9\xb50...
2 2 a b'WHY1\x00\xd6\x02\n\x0e \xc6\xc0\xb3\xc9\xb50...
3 2 b b'WHY1\x00\xd6\x02\n\x0e \xd5\xc0\xb3\xc9\xb50...
4 3 a b'WHY1\x00\xd6\x02\n\x0e \xe3\xc0\xb3\xc9\xb50...
5 3 b b'WHY1\x00\xd6\x02\n\x0e \xf1\xc0\xb3\xc9\xb50...

We can also control the name of the output profile column:

In [10]:
res = fugue_profile(tdf, partition={"by":["a","b"]}, profile_field="x")
res
Out[10]:
a b x
0 1 a b'WHY1\x00\xd6\x02\n\x0e \xed\xe3\xb3\xc9\xb50...
1 1 b b'WHY1\x00\xd6\x02\n\x0e \xfd\xe3\xb3\xc9\xb50...
2 2 a b'WHY1\x00\xd6\x02\n\x0e \x8b\xe4\xb3\xc9\xb50...
3 2 b b'WHY1\x00\xd6\x02\n\x0e \x9a\xe4\xb3\xc9\xb50...
4 3 a b'WHY1\x00\xd6\x02\n\x0e \xa8\xe4\xb3\xc9\xb50...
5 3 b b'WHY1\x00\xd6\x02\n\x0e \xb6\xe4\xb3\xc9\xb50...

Here is how to retrieve the views:

In [11]:
from whylogs import DatasetProfileView

res.x.apply(DatasetProfileView.deserialize)
Out[11]:
0    <whylogs.core.view.dataset_profile_view.Datase...
1    <whylogs.core.view.dataset_profile_view.Datase...
2    <whylogs.core.view.dataset_profile_view.Datase...
3    <whylogs.core.view.dataset_profile_view.Datase...
4    <whylogs.core.view.dataset_profile_view.Datase...
5    <whylogs.core.view.dataset_profile_view.Datase...
Name: x, dtype: object
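
Each deserialized element is an ordinary DatasetProfileView, so the per-partition profiles can be inspected individually. A small sketch (hypothetical follow-up, reusing res from above):

# deserialize every partition's profile and summarize the first one as a pandas dataframe
views = res.x.apply(DatasetProfileView.deserialize)
views.iloc[0].to_pandas()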

When we profile a large number of partitions using a distributed backend and don't want to collect them on a local machine, we can keep the output as the native distributed dataframe, for example:

In [13]:
fugue_profile(tdf, partition={"by":["a","b"]}, engine=spark, as_local=False) # returns a native pyspark dataframe
Out[13]:
DataFrame[a: bigint, b: string, __whylogs_df_profile_view: binary]
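
Because the result is a native PySpark DataFrame in this case, it can be handled with ordinary Spark operations before (or instead of) collecting it locally. A hypothetical sketch:

# keep the per-partition profiles on the cluster and only bring a count back locally
res_sdf = fugue_profile(tdf, partition={"by": ["a", "b"]}, engine=spark, as_local=False)
res_sdf.count()  # 6 logical partitions in this example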

We may also directly save the output to a file distributedly:

In [14]:
fugue_profile(tdf, partition={"by":["a","b"]}, save_path="/tmp/output1.parquet", engine=spark)
fugue_profile("/tmp/test.parquet", partition={"by":["a","b"]}, save_path="/tmp/output2.parquet", engine=spark)
                                                                                
Out[14]:
'/tmp/output2.parquet'
In [15]:
!ls /tmp/output*.parquet
/tmp/output1.parquet:
_SUCCESS  part-00000-f2fe6067-9404-4f32-8fd7-daa952f14ac3-c000.snappy.parquet

/tmp/output2.parquet:
_SUCCESS  part-00000-337a3e5c-5480-4105-84a9-3db993f33837-c000.snappy.parquet
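
The saved files are plain parquet, so the serialized profiles can be read back and deserialized later, for example on another machine. A minimal sketch (hypothetical, using pandas to read the output saved above):

import pandas as pd
from whylogs import DatasetProfileView

# read the saved per-partition profiles and turn the binary column back into views
saved = pd.read_parquet("/tmp/output1.parquet")
saved["__whylogs_df_profile_view"].apply(DatasetProfileView.deserialize)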

Performance Tips

Spark

It is strongly recommended to enable pandas UDFs on Spark to get better performance. We need to enable spark.sql.execution.arrow.pyspark.enabled in the Spark configuration (see the Spark documentation on Apache Arrow and pandas UDFs), and we also need to tell Fugue to use pandas UDFs. A typical way would be:

fugue_profile(..., engine=spark, engine_conf={"fugue.spark.use_pandas_udf":True})

We can also enable it globally (then we don't need to set it in engine_conf):

from fugue import register_global_conf

register_global_conf({"fugue.spark.use_pandas_udf":True})

When we profile a dataframe without logical partition, we may control the number of partitions in order to control the parallelism:

fugue_profile(..., partition={"num": 200}, engine=spark, engine_conf={"fugue.spark.use_pandas_udf":True})

If we don't specify num, the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas dataframe, the default partitioning will be the number of CPUs of the Spark cluster.

When we profile a dataframe with logical partitions, we can also be explicit on how many physical partitions to use:

fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=spark)

But the convention in Spark is to set spark.sql.shuffle.partitions when starting the Spark cluster, and an ideal number is usually 2-4 times the total number of CPUs.
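
For example, a sketch of setting this when the SparkSession is created (the value 200 here is only illustrative):

from pyspark.sql import SparkSession

# configure the default number of shuffle partitions up front
spark = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)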

Ray

When we profile a dataframe without logical partition, we should control the number of partitions in order to control the parallelism:

fugue_profile(..., partition={"num": 200}, engine="ray")

If we don't specify num, the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas dataframe, the default partitioning will be 1, so in Ray it is always a good idea to be explicit about num.

When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:

fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine="ray")
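
Putting it together, a hypothetical end-to-end sketch on Ray (assumes 'whylogs[fugue]' and 'fugue[ray]' are installed and a Ray runtime is reachable; the partition count 16 is only illustrative):

import ray
from whylogs.api.fugue import fugue_profile

# connect to (or start) a Ray runtime, then profile with an explicit partition count
ray.init()
fugue_profile("/tmp/test.parquet", partition={"num": 16}, engine="ray")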

Dask

When we profile a dataframe without logical partition, we should control the number of partitions in order to control the parallelism:

fugue_profile(..., partition={"num": 200}, engine=dask_client)

If we don't specify num, the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas dataframe, the default partitioning will be a small number reflecting the local CPUs, so in Dask it is always a good idea to be explicit about num.

When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:

fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=dask_client)
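
Similarly, a hypothetical end-to-end sketch on Dask (assumes 'whylogs[fugue]' and 'fugue[dask]' are installed; dask_client is a dask.distributed Client):

from dask.distributed import Client
from whylogs.api.fugue import fugue_profile

# connect to a Dask cluster (a local cluster is started if no address is given)
dask_client = Client()
fugue_profile("/tmp/test.parquet", partition={"num": 16}, engine=dask_client)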

Whether you are on Spark, Ray, or Dask, and however you set num, a value of roughly 2 times the total number of cluster CPUs will in general work very well.
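
As a rough illustration, one hypothetical way to derive such a value on Spark (defaultParallelism typically reflects the total number of cores available to the cluster):

# pick a partition count of roughly 2x the cluster cores
num = 2 * spark.sparkContext.defaultParallelism
fugue_profile(tdf, partition={"num": num}, engine=spark,
              engine_conf={"fugue.spark.use_pandas_udf": True})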

Accessing distributed platforms

In Fugue, accessing distributed platforms can be very simple. For example, with the proper setup, profiling a large S3 folder using Databricks, Anyscale, or Coiled is as simple as:

fugue_profile("s3://<path>", engine="db:<databricks_cluster_id>", engine_conf={"fugue.spark.use_pandas_udf":True})
fugue_profile("s3://<path>", engine="<anyscale_cluster_uri>")
fugue_profile("s3://<path>", engine="coiled:<coiled_cluster_id>")

For details on each platform, please read the instructions for Databricks, Anyscale and Coiled.