WARNING: UDF support is an experimental feature that is still evolving. For example, there was an incompatible UDF signature change in the whylogs 1.2.5 release. We may drop support for metric UDFs as other types of UDFs become able to handle the metric UDF use cases. Feedback on how UDFs should evolve is welcome.
Sometimes you want to use whylogs to track values computed from your data along with the original input data. whylogs accepts input as either a Python dictionary representing a single row of data or a Pandas dataframe containing multiple rows. Both of these provide easy interfaces to add the results of user defined functions (UDFs) to your input data. whylogs also provides a UDF mechanism for logging computed data. It offers two advantagves over the native UDF facilities: you can easily define and apply a suite of UDFs suitable for an application area (e.g., langkit), and you can easily customize which metrics whylogs tracks for each UDF output. Let's explore the whylogs UDF APIs.
%pip install whylogs
Installing collected packages: whylogs-sketching, types-urllib3, types-requests, whylabs-client, whylogs Successfully installed types-requests-2.31.0.2 types-urllib3-1.26.25.14 whylabs-client-0.5.3 whylogs-1.2.7 whylogs-sketching-3.4.1.dev3
whylogs supports four kinds of UDFs:
UdfMetric
instance attached to input column in the dataset profile.Dataset, multioutput, and type UDFs produce their output columns before whylogs profiles the dataset. Thus the full machinery of whylogs schema specification and segmentation apply to the output columns. The UdfMetric
has its own submetric schema mechanism to control the statistics tracked for metric UDF output, but since metric UDFs do not create columns they cannot be used for segmentation.
The signature for dataset UDFs is
f(x: Union[Dict[str, List], pd.DataFrame]) -> Union[List, pd.Series]
The dataframe or dictionary only contains the columns the UDF is registered to access (see the section on registration below). DataFrame
inputs may contain multiple rows. Dictionary inputs contain only a single row, but it is presented as a list containing one value. This allows UDFs to be written using the intersection of the DataFrame
and dictionary/list APIs to handle both cases. Performance-critical UDFs can check the type of input to provide implementations optimized for the specific input type. The returned list or series should contain one value for each input row.
The signature for multioutput UDFs is
f(Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]
These are very similar to dataset UDFs. Where dataset UDFs use the UDF's name as the name of their single output column, multioutput UDFs prepend the UDF's name to the names of the columns returned by the UDF.
The signature for type UDFs is
f(x: Union[List, pd.Series]) -> Union[List, pd.Series]
Since type UDFs take a single column as input, the input is presented as a single-element list representing a single row of data, or as a Pandas series representing a column. Note that the column created by a type UDF will have the input column's name prepended to it to avoid name collisions.
The signature for metric UDFs is
f(x: Any) -> Any
Metric UDFs recieve a single value as input, and produce a single value as output. The UDF will be invoked for each element of the column the UdfMetric
is attached to.
The easiest way to get whylogs to invoke your UDFs is to register the UDF functions with the appropriate decorator. There's a decorator for each type of UDF. Note that using the decorators requires you use the schema produced by whylogs.experimental.core.udf_schema()
.
The @register_dataset_udf
decorator declares dataset UDFs.
from whylogs.experimental.core.udf_schema import register_dataset_udf
import pandas as pd
@register_dataset_udf(["mass", "volume"])
def density(data: Union[Dict[str, List], pd.DataFrame]) -> Union[List, pd.Series]:
if isinstance(data, pd.DataFrame):
return data["mass"] / data["volume"]
else:
return [mass / volume for mass, volume in zip(data["mass"], data["volume"])]
If you log a DataFrame
(or single row via a dictionary) containing columns named mass
and volume
, a column named density
will be added by applying the density()
function before whylogs produces its profile. If either of the input columns is missing or the output column is already present, the UDF will not be invoked. Note that the code in the else
branch works fine for DataFrame
inputs as well, so the the isinstance
check is just an optimization.
The @register_dataset_udf
decorator has several optional arguments to customize whylogs' behavior.
def register_dataset_udf(
col_names: List[str],
udf_name: Optional[str] = None,
metrics: Optional[List[MetricSpec]] = None,
namespace: Optional[str] = None,
schema_name: str = "",
anti_metrics: Optional[List[Metric]] = None,
)
The col_names
arguments lists the UDF's required input columns. The remaining arguments are optional:
udf_name
specifies the name of the UDF's output column. It defaults to the name of the function.metrics
takes a list of MetricSpec
instances (see Schema Configuration) specifying the whylogs metrics to track for the column produced by the UDF. If this is omitted, the metrics are determined by the defualt schema or any metric specifications passed to udf_schema()
.anti_metrics
is an optional list of whylogs Metric
classes to prohibit from being attached to the UDFs output column.namespace
, if present, is prepended to the UDF name to help manage UDF name collisions.schema_name
helps manage collections of UDFs. A UDF can be registered in a specified schema. If omitted, it will be registered to the defualt schema. udf_schema()
merges the UDFs registered in the requested schemas.The @register_multioutput_udf
decorator declares multioutput UDFs.
from whylogs.experimental.core.udf_schema import register_multioutput_udf
import pandas as pd
@register_multioutput_udf(["x"])
def powers(data: Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]:
if isinstance(data, pd.DataFrame):
result = pd.DataFrame()
result["xx"] = data["x"] * data["x"]
result["xxx"] = data["x"] * data["x"] * data["x"]
return result
else:
result = {"xx" : [data["x"][0] * data["x"][0]]}
result["xxx"] = [data["x"][0] * data["x"][0] * data["x"][0]]
return result
If you log a DataFrame
(or single row via a dictionary) containing a column named x
, columns named powers.xx
and powers.xxx
containing the squared and cubed input column will be added by applying the powers()
function before whylogs produces its profile. If any of the input columns is missing, the UDF will not be invoked. While dataset UDFs do not execute if their output column already exists, multioutput UDFs always produce their output columns.
The @register_type_udf
decorator declares type UDFs to be applied to columns of a specified type. Types can be specified as subclass of whylogs.core.datatypes.DataType
or a plain Python type.
from whylogs.experimental.core.udf_schema import register_type_udf
from whylogs.core.datatypes import Fractional
import pandas as pd
@register_type_udf(Fractional)
def square(input: Union[List, pd.Series]) -> Union[List, pd.Series]:
return [x * x for x in input]
The square()
function will be applied to any floating point columns in a DataFrame
or row logged. The output columns are named square
prepended with the input column name. In this example, we use code that works for either DataFrame
or single row (dictionary) input.
The @register_type_udf
decorator also has optional parameters to customize its behavior:
def register_type_udf(
col_type: Type,
udf_name: Optional[str] = None,
namespace: Optional[str] = None,
schema_name: str = "",
type_mapper: Optional[TypeMapper] = None,
)
col_type
is the column type the UDF should be applied to. It can be a subclass of whylogs.core.datatype.DataType
or a Python type. Note that the argument must be a subclass of DataType
or Type
, not an instance.udf_name
specifies the suffix of the name of the UDF's output column. It defaults to the name of the function. The input column's name is the prefix.namespace
, if present, is prepended to the UDF name to help manage UDF name collisions.schema_name
helps manage collections of UDFs. A UDF can be registered in a specified schema. If omitted, it will be registered to the defualt schema. udf_schema()
merges the UDFs registered in the requested schemas.type_mapper
is an instance of whylogs.core.datatype.TypeMapper
responsible for mapping native Python data types to a subclass of whylogs.core.datatype.DataType
.The @register_metric_udf
decorator declares metric UDFs to be applied to columns specified by name or type. Types can be specified as subclass of whylogs.core.datatypes.DataType
or a plain Python type.
from whylogs.experimental.core.metrics.udf_metric import register_metric_udf
from whylogs.core.datatypes import String
@register_metric_udf(col_type=String)
def upper(input: Any) -> Any:
return input.upper()
This will create a UdfMetric
instance for all string columns. Note that there can only be one instance of a metric class for a column, so avoid specifying UdfMetric
on string columns elswhere in your schema definition.
The UdfMetric
will have a submetric named upper
that tracks metrics according to the default submetric schema for the upper
UDF's return type, in this case also string.
The @register_metric_udf
decorator also has optional parameters to customize its behavior:
def register_metric_udf(
col_name: Optional[str] = None,
col_type: Optional[DataType] = None,
submetric_name: Optional[str] = None,
submetric_schema: Optional[SubmetricSchema] = None,
type_mapper: Optional[TypeMapper] = None,
namespace: Optional[str] = None,
schema_name: str = "",
)
You must specify exactly one of either col_name
or col_type
.
col_type
can be a subclass of whylogs.core.datatype.DataType
or a Python type. Note that the argument must be a subclass of DataType
or Type
, not an instance.
submetric_name
is the name of the submetric within the UdfMetric
. It defautls to the name of the decorated function. Note that all lambdas are named "lambda" so omitting submetric_name
on more than one lambda will result in name collisions. If you pass a namespace, it will be prepended to the UDF name.submetric_schema
allows you to specify and configure the metrics to be tracked for each metric UDF. This defualts to the STANDARD_UDF_RESOLVER
metrics.type_mapper
is an instance of whylogs.core.datatype.TypeMapper
responsible for mapping native Python data types to a subclass of whylogs.core.datatype.DataType
.namespace
, if present, is prepended to the UDF name to help manage UDF name collisions.schema_name
helps manage collections of UDFs. A UDF can be registered in a specified schema. If omitted, it will be registered to the defualt schema. udf_schema()
merges the UDFs registered in the requested schemas.SubmetricSchema
is very similar to the DeclarativeSchema
(see Schema Configuration), but applies to just the submetrics within an instance of a UdfMetric
. The defualt STANDARD_UDF_RESOLVER
applies the same metrics as the STANDARD_RESOLVER
for the dataset, except it does not include frequent items for string columns. You can customize the metrics tracked for your UDF outputs by specifying your own submetric_schema
. Note that several @register_metric_udf
decorators may apply to the same input column; you should make sure only one of the decorators is passed your submetric schema, or that they are all passed the same submetric schema.
import whylogs as why
from whylogs.core.datatypes import Fractional, String
from whylogs.experimental.core.udf_schema import (
register_dataset_udf,
register_multioutput_udf,
register_type_udf,
udf_schema
)
from whylogs.experimental.core.metrics.udf_metric import register_metric_udf
from typing import Any, Dict, List, Union
import pandas as pd
@register_dataset_udf(["mass", "volume"])
def density(data: Union[Dict[str, List], pd.DataFrame]) -> Union[List, pd.Series]:
if isinstance(data, pd.DataFrame):
return data["mass"] / data["volume"]
else:
return [mass / volume for mass, volume in zip(data["mass"], data["volume"])]
@register_multioutput_udfs(["x"])
def powers(data: Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]:
if isinstance(data, pd.DataFrame):
result = pd.DataFrame()
result["xx"] = data["x"] * data["x"]
result["xxx"] = data["x"] * data["x"] * data["x"]
return result
else:
result = {"xx": [data["x"][0] * data["x"][0]]}
result["xxx"] = [data["x"][0] * data["x"][0] * data["x"][0]]
return result
@register_type_udf(Fractional)
def square(input: Union[List, pd.Series]) -> Union[List, pd.Series]:
return [x * x for x in input]
@register_metric_udf(col_type=String)
def upper(input: Any) -> Any:
return input.upper()
df = pd.DataFrame({
"mass": [1, 2, 3],
"volume": [4, 5, 6],
"score": [1.9, 4.2, 3.1],
"lower": ["a", "b", "c"],
"x": [1, 2, 3]
})
schema = udf_schema()
result = why.log(df, schema=schema)
result.view().to_pandas()
cardinality/est | cardinality/lower_1 | cardinality/upper_1 | counts/inf | counts/n | counts/nan | counts/null | distribution/max | distribution/mean | distribution/median | ... | udf/upper:distribution/stddev | udf/upper:frequent_items/frequent_strings | udf/upper:types/boolean | udf/upper:types/fractional | udf/upper:types/integral | udf/upper:types/object | udf/upper:types/string | udf/upper:types/tensor | ints/max | ints/min | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
column | |||||||||||||||||||||
density | 3.0 | 3.0 | 3.00015 | 0 | 3 | 0 | 0 | 0.50 | 0.383333 | 0.40 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
lower | 3.0 | 3.0 | 3.00015 | 0 | 3 | 0 | 0 | NaN | 0.000000 | NaN | ... | 0.0 | [FrequentItem(value='A', est=1, upper=1, lower... | 0.0 | 0.0 | 0.0 | 0.0 | 3.0 | 0.0 | NaN | NaN |
mass | 3.0 | 3.0 | 3.00015 | 0 | 3 | 0 | 0 | 3.00 | 2.000000 | 2.00 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 3.0 | 1.0 |
score | 3.0 | 3.0 | 3.00015 | 0 | 3 | 0 | 0 | 4.20 | 3.066667 | 3.10 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
score.square | 3.0 | 3.0 | 3.00015 | 0 | 3 | 0 | 0 | 17.64 | 10.286667 | 9.61 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
volume | 3.0 | 3.0 | 3.00015 | 0 | 3 | 0 | 0 | 6.00 | 5.000000 | 5.00 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 6.0 | 4.0 |
6 rows × 58 columns
result.view().get_column("lower").get_metric("udf").to_summary_dict()["upper:frequent_items/frequent_strings"]
[FrequentItem(value='A', est=1, upper=1, lower=1), FrequentItem(value='C', est=1, upper=1, lower=1), FrequentItem(value='B', est=1, upper=1, lower=1)]
Sometimes you might want to apply the UDFs before or instead of logging data for profiling. You can do that with the apply_udfs()
method of the UdfSchema
.
new_df, _ = schema.apply_udfs(df)
new_df
mass | volume | score | lower | density | score.square | |
---|---|---|---|---|---|---|
0 | 1 | 4 | 1.9 | a | 0.25 | 3.61 |
1 | 2 | 5 | 4.2 | b | 0.40 | 17.64 |
2 | 3 | 6 | 3.1 | c | 0.50 | 9.61 |
_, new_row = schema.apply_udfs(row={"mass": 4, "volume": 7, "score": 2.0, "lower": "d"})
new_row
{'mass': 4, 'volume': 7, 'score': 2.0, 'lower': 'd', 'density': 0.5714285714285714, 'score.square': 4.0}
Note that metric UDFs are not applied in this case, because metric UDFs are invoked by the UdfMetric
. Since we are not profiling in this case, no whylogs metrics exist to invoke them.
Also note that dataset and type UDFs are not invoked if their output columns are already present in the input. So in this case why.log(new_df, schema=schema)
will only execute the metric UDFs since the other UDF columns are already there.