Hi! Perhaps you're already feeling confident with our library, but you wish there were an easy way to plug our profiling into your existing Spark, Dask, or Ray clusters, or into platforms such as Databricks, Coiled, or Anyscale. Well, glad you've made it here, because that is exactly what we are going to cover in this example notebook 😃
If you want more insight into how to use whylogs, feel free to check our other existing examples; they might be extremely useful!
For detailed questions regarding Fugue, please join Fugue's Slack channel.
Because we want users to install exactly what they need from whylogs, the Fugue integration comes as an extra dependency. To make it available, install according to the following table:
Run Whylogs on ... | Installation Command |
---|---|
Any Spark cluster (including Databricks Notebooks) | pip install 'whylogs[fugue]' 'fugue[spark]' |
Databricks (remote access) | pip install 'whylogs[fugue]' 'fugue-cloudprovider[databricks]' |
Any Ray cluster (including Anyscale Notebooks) | pip install 'whylogs[fugue]' 'fugue[ray]' |
Anyscale (remote access) | pip install 'whylogs[fugue]' 'fugue-cloudprovider[anyscale]' |
Any Dask cluster | pip install 'whylogs[fugue]' 'fugue[dask]' |
Coiled | pip install 'whylogs[fugue]' 'fugue-cloudprovider[coiled]' |
For example, in this notebook we are using a local Spark cluster, so we should:
# Note: you may need to restart the kernel to use updated packages.
%pip install 'whylogs[fugue]' 'fugue[spark]'
The following environment variable should NOT need to be set in your own environment.
import os
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"
import pandas as pd
import numpy as np
n = 100
np.random.seed(0)
tdf = pd.DataFrame(
    dict(
        a=np.random.choice([1, 2, 3], n),
        b=np.random.choice(["a", "b"], n),
        c=np.random.random(n),
        d=np.random.choice(["xy", "z"], n),
    )
)
tdf.to_parquet("/tmp/test.parquet")
tdf
a | b | c | d | |
---|---|---|---|---|
0 | 1 | a | 0.533206 | xy |
1 | 2 | b | 0.230533 | z |
2 | 1 | a | 0.394869 | z |
3 | 2 | b | 0.618809 | z |
4 | 2 | b | 0.474868 | xy |
... | ... | ... | ... | ... |
95 | 1 | b | 0.904425 | xy |
96 | 3 | a | 0.645785 | z |
97 | 1 | a | 0.324683 | xy |
98 | 2 | b | 0.519711 | z |
99 | 3 | a | 0.000055 | z |
100 rows × 4 columns
In its simplest form, fugue_profile is equivalent to why.log(df).view():
from whylogs.api.fugue import fugue_profile
fugue_profile(tdf).to_pandas()
cardinality/est | cardinality/lower_1 | cardinality/upper_1 | counts/inf | counts/n | counts/nan | counts/null | distribution/max | distribution/mean | distribution/median | ... | distribution/stddev | frequent_items/frequent_strings | ints/max | ints/min | type | types/boolean | types/fractional | types/integral | types/object | types/string | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
column | |||||||||||||||||||||
a | 3.000000 | 3.0 | 3.000150 | 0 | 100 | 0 | 0 | 3.000000 | 1.880000 | 2.000000 | ... | 0.807540 | [FrequentItem(value='1', est=39, upper=39, low... | 3.0 | 1.0 | SummaryType.COLUMN | 0 | 0 | 100 | 0 | 0 |
b | 2.000000 | 2.0 | 2.000100 | 0 | 100 | 0 | 0 | NaN | 0.000000 | NaN | ... | 0.000000 | [FrequentItem(value='a', est=57, upper=57, low... | NaN | NaN | SummaryType.COLUMN | 0 | 0 | 0 | 0 | 100 |
c | 100.000025 | 100.0 | 100.005018 | 0 | 100 | 0 | 0 | 0.992396 | 0.499929 | 0.487838 | ... | 0.294085 | NaN | NaN | NaN | SummaryType.COLUMN | 0 | 100 | 0 | 0 | 0 |
d | 2.000000 | 2.0 | 2.000100 | 0 | 100 | 0 | 0 | NaN | 0.000000 | NaN | ... | 0.000000 | [FrequentItem(value='xy', est=53, upper=53, lo... | NaN | NaN | SummaryType.COLUMN | 0 | 0 | 0 | 0 | 100 |
4 rows × 30 columns
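For comparison, here is the plain whylogs call that the line above matches, per the equivalence stated earlier (a quick sketch; it runs locally without Fugue):
import whylogs as why

why.log(tdf).view().to_pandas()  # same summary as fugue_profile(tdf).to_pandas()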
We can select which columns to profile:
fugue_profile(tdf, profile_cols=["c","d"]).to_pandas()
cardinality/est | cardinality/lower_1 | cardinality/upper_1 | counts/inf | counts/n | counts/nan | counts/null | distribution/max | distribution/mean | distribution/median | ... | distribution/q_95 | distribution/q_99 | distribution/stddev | type | types/boolean | types/fractional | types/integral | types/object | types/string | frequent_items/frequent_strings | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
column | |||||||||||||||||||||
c | 100.000025 | 100.0 | 100.005018 | 0 | 100 | 0 | 0 | 0.992396 | 0.499929 | 0.487838 | ... | 0.970237 | 0.992396 | 0.294085 | SummaryType.COLUMN | 0 | 100 | 0 | 0 | 0 | NaN |
d | 2.000000 | 2.0 | 2.000100 | 0 | 100 | 0 | 0 | NaN | 0.000000 | NaN | ... | NaN | NaN | 0.000000 | SummaryType.COLUMN | 0 | 0 | 0 | 0 | 100 | [FrequentItem(value='xy', est=53, upper=53, lo... |
2 rows × 28 columns
Now assume we want to use Spark to profile the dataset in a distributed way, and that this is how we get a SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
If we want to profile the pandas df on Spark:
fugue_profile(tdf, engine=spark)
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7effd21e5f90>
If we want to profile a SparkDataFrame:
spark_df = spark.createDataFrame(tdf)
fugue_profile(spark_df)
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7effd1ee4910>
We can also directly profile a parquet file or a folder of parquet files, locally or on the cloud (the files will be loaded in a distributed way):
fugue_profile("/tmp/test.parquet", engine=spark)
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7effd1ef9240>
Profiling datasets or files with other backends is very similar; detailed guides follow in later sections.
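For instance, assuming the Ray or Dask extras are installed and, for Dask, a distributed Client object named dask_client is available (both engines are covered later in this notebook), the same call can target those backends:
fugue_profile("/tmp/test.parquet", engine="ray")        # profile on a Ray cluster
fugue_profile("/tmp/test.parquet", engine=dask_client)  # profile on a Dask cluster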
If we want to profile tdf grouped by columns a and b:
fugue_profile(tdf, partition={"by":["a","b"]})
a | b | __whylogs_df_profile_view | |
---|---|---|---|
0 | 1 | a | b'WHY1\x00\xf6\x02\n\x0e \xb3\x93\x93\x80\xda0... |
1 | 1 | b | b'WHY1\x00\xf6\x02\n\x0e \xc6\x93\x93\x80\xda0... |
2 | 2 | a | b'WHY1\x00\xf6\x02\n\x0e \xd6\x93\x93\x80\xda0... |
3 | 2 | b | b'WHY1\x00\xf6\x02\n\x0e \xe5\x93\x93\x80\xda0... |
4 | 3 | a | b'WHY1\x00\xf6\x02\n\x0e \xf3\x93\x93\x80\xda0... |
5 | 3 | b | b'WHY1\x00\xf6\x02\n\x0e \x82\x94\x93\x80\xda0... |
We can also control the output profile field:
res = fugue_profile(tdf, partition={"by":["a","b"]}, profile_field="x")
res
a | b | x | |
---|---|---|---|
0 | 1 | a | b'WHY1\x00\xf6\x02\n\x0e \xf7\xb1\x93\x80\xda0... |
1 | 1 | b | b'WHY1\x00\xf6\x02\n\x0e \x89\xb2\x93\x80\xda0... |
2 | 2 | a | b'WHY1\x00\xf6\x02\n\x0e \x99\xb2\x93\x80\xda0... |
3 | 2 | b | b'WHY1\x00\xf6\x02\n\x0e \xa8\xb2\x93\x80\xda0... |
4 | 3 | a | b'WHY1\x00\xf6\x02\n\x0e \xb7\xb2\x93\x80\xda0... |
5 | 3 | b | b'WHY1\x00\xf6\x02\n\x0e \xc5\xb2\x93\x80\xda0... |
Here is how to retrieve the views:
from whylogs import DatasetProfileView
res.x.apply(DatasetProfileView.deserialize)
0    <whylogs.core.view.dataset_profile_view.Datase...
1    <whylogs.core.view.dataset_profile_view.Datase...
2    <whylogs.core.view.dataset_profile_view.Datase...
3    <whylogs.core.view.dataset_profile_view.Datase...
4    <whylogs.core.view.dataset_profile_view.Datase...
5    <whylogs.core.view.dataset_profile_view.Datase...
Name: x, dtype: object
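Once deserialized, each element is a regular DatasetProfileView, so a single partition's profile can be inspected directly. A small sketch building on the cell above:
views = res.x.apply(DatasetProfileView.deserialize)
views.iloc[0].to_pandas()  # summary metrics for the first (a, b) partition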
When we profile a large number of partitions using a distributed backend and don't want to collect them on a local machine, we can keep the output as the native distributed dataframe, for example:
fugue_profile(tdf, partition={"by":["a","b"]}, engine=spark, as_local=False) # returns a native pyspark dataframe
DataFrame[a: bigint, b: string, __whylogs_df_profile_view: binary]
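Because the result stays on the cluster as a pyspark DataFrame, we can keep working with it there. A sketch, assuming the SparkSession created earlier:
spark_res = fugue_profile(tdf, partition={"by":["a","b"]}, engine=spark, as_local=False)
spark_res.filter(spark_res.a == 1).count()  # operate on the profiles without collecting them locally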
We may also directly save the output to a file in a distributed way:
fugue_profile(tdf, partition={"by":["a","b"]}, save_path="/tmp/output1.parquet", engine=spark)
fugue_profile("/tmp/test.parquet", partition={"by":["a","b"]}, save_path="/tmp/output2.parquet", engine=spark)
'/tmp/output2.parquet'
!ls /tmp/output*.parquet
/tmp/output1.parquet:
_SUCCESS
part-00000-ed074f68-68ff-42a8-a003-57e4a0275767-c000.snappy.parquet

/tmp/output2.parquet:
_SUCCESS
part-00000-83c8a2dc-0948-4ccc-bc06-e0fe14c89abb-c000.snappy.parquet
part-00001-83c8a2dc-0948-4ccc-bc06-e0fe14c89abb-c000.snappy.parquet
part-00002-83c8a2dc-0948-4ccc-bc06-e0fe14c89abb-c000.snappy.parquet
part-00003-83c8a2dc-0948-4ccc-bc06-e0fe14c89abb-c000.snappy.parquet
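The saved files are ordinary parquet, so the profiles can be read back and deserialized later. A sketch reusing the imports from above (and assuming pyarrow is available to pandas):
saved = pd.read_parquet("/tmp/output1.parquet")
saved["__whylogs_df_profile_view"].apply(DatasetProfileView.deserialize)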
fugue_profile in the Fugue API way
The Fugue API is a collection of standalone, platform-agnostic functions for common big data operations. fugue_profile follows the same style and can be used as one of them. In the previous example we did data profiling from file to file with a single function call. While compact, that overloads the function and makes the code harder to read and maintain. Following best practice, we can instead write:
import fugue.api as fa
with fa.engine_context(spark):
    df = fa.load("/tmp/test.parquet")
    res = fugue_profile(df, partition={"by":["a","b"]})
    fa.save(res, "/tmp/output2.parquet")
Execution-wise, it is the same as the previous example, but now each line of code does one thing, which is a better coding style.
Whylogs profile visualization is an auto-registered extension in Fugue. The namespace is why and the extension name is viz.
Here is how you use the extension:
import whylogs.api.fugue.registry # you don't really need to import this explicitly, the registration is automatic
import fugue.api as fa
fa.fugue_sql_flow("""
-- visualize a single dataframe's profile
OUTPUT df USING why:viz
-- compare profiles, must set reference and target
OUTPUT target=df, reference=df USING why:viz
""", df = tdf).run();
If running on a distributed backend, the profiling will be done by fugue_profile in a distributed way:
fa.fugue_sql_flow("""
df = LOAD "/tmp/test.parquet"
OUTPUT USING why:viz
OUTPUT target=df, reference=df USING why:viz
""").run(spark);
Please use Fugue >= 0.8.0, which enables Pandas UDFs by default. It is also beneficial to enable Pandas UDFs on Spark for better performance; follow Spark's documentation to enable spark.sql.execution.arrow.pyspark.enabled.
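For example, the setting can be supplied when building the SparkSession (a minimal sketch; adapt it to however your cluster is configured):
spark = (
    SparkSession.builder
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")  # enable Arrow-based Pandas UDFs
    .getOrCreate()
)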
When we profile a dataframe without logical partitioning, we may control the number of partitions in order to control the parallelism:
fugue_profile(..., partition={"num": 200}, engine=spark)
If we don't specify num, the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas dataframe, the default partitioning will be the number of CPUs of the Spark cluster.
When we profile a dataframe with logical partitions, we can also be explicit on how many physical partitions to use:
fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=spark)
But the convention in Spark is to set spark.sql.shuffle.partitions when starting the Spark cluster; an ideal number is usually 2-4 times the total number of CPUs.
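For instance, it can also be adjusted on an existing session (a sketch; the value 16 is purely illustrative):
spark.conf.set("spark.sql.shuffle.partitions", "16")  # aim for 2-4x the total number of CPUs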
When we profile a dataframe without logical partitioning, we should control the number of partitions in order to control the parallelism:
fugue_profile(..., partition={"num": 200}, engine="ray")
If we don't specify num, the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas dataframe, the default partitioning will be 1, so in Ray it is always a good idea to be explicit about num.
When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:
fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine="ray")
When we profile a dataframe without logical partitioning, we should control the number of partitions in order to control the parallelism:
fugue_profile(..., partition={"num": 200}, engine=dask_client)
If we don't specify num, the default partitioning of the input dataframe will be used. If the input is a local dataframe such as a pandas dataframe, the default partitioning will be a small number based on the local CPUs, so in Dask it is always a good idea to be explicit about num.
When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:
fugue_profile(..., partition={"by":["a","b"], "num": 200}, engine=dask_client)
Whether on Spark, Ray, or Dask, and however you set num, setting it to about 2 times the total number of cluster CPUs will in general work very well.
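As a rough sketch of that rule of thumb on Spark (assuming the SparkSession from earlier; defaultParallelism reflects the cluster's total cores):
num = spark.sparkContext.defaultParallelism * 2  # roughly 2x the cluster's total CPUs
fugue_profile(tdf, partition={"by":["a","b"], "num": num}, engine=spark)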
In Fugue, accessing distributed platforms can be very simple. For example, with the proper setup, profiling a large S3 folder on Databricks, Anyscale, or Coiled is as simple as:
fugue_profile("s3://<path>", engine="db:<databricks_cluster_id>")
fugue_profile("s3://<path>", engine="<anyscale_cluster_uri>")
fugue_profile("s3://<path>", engine="coiled:<coiled_cluster_id>")
For details on each platform, please read the instructions for Databricks, Anyscale, and Coiled.