Transformer represents the logic unit executing on logical partitions of the input dataframe. The partitioning logic is not a concern of Transformer; it should be specified in a previous step. But you must understand the concept of partitioning in Fugue, so please read about it first.
It accepts these input DataFrame types: LocalDataFrame, pd.DataFrame, List[List[Any]], Iterable[List[Any]], EmptyAwareIterable[List[Any]], List[Dict[str, Any]], Iterable[Dict[str, Any]], and EmptyAwareIterable[Dict[str, Any]].
It accepts these output DataFrame types: LocalDataFrame, pd.DataFrame, List[List[Any]], Iterable[List[Any]], EmptyAwareIterable[List[Any]], List[Dict[str, Any]], Iterable[Dict[str, Any]], and EmptyAwareIterable[Dict[str, Any]].
Notice that ArrayDataFrame and other concrete local dataframes can't be used as annotations; you must use LocalDataFrame.
Transformer requires users to be explicit about the output schema. * represents the input dataframe schema, so *,b:int means the output will have one additional column b of type int. The schema can be specified by schema hint, by decorator, or directly in the Fugue code.
Normally, computing frameworks can infer the output schema, but doing so is neither reliable nor efficient. To infer the schema, the framework has to go through at least one partition of data and figure out the possible schema. But what if a transformer produces inconsistent schemas on different data partitions? What if that partition takes a long time to process, or fails? To avoid these potential correctness and performance issues, Transformer and CoTransformer output schemas are required in Fugue.
The native approach is the simplest way, with no dependency on Fugue: you just need acceptable annotations on the input dataframe and the output. In the native approach, you must specify the schema in the Fugue code.
from typing import Iterable, Dict, Any, List
import pandas as pd

def add(df:pd.DataFrame, n=1) -> pd.DataFrame:
    df["b"]+=n
    return df

def get_top(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    yield next(df)
    return

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    # without a schema hint you have to specify the schema in the Fugue code
    df = df.transform(add, schema="*").transform(add, schema="*", params=dict(n=2))
    # how to define the partition for transformers to operate on
    # get smallest b of each partition
    df.partition(by=["a"], presort="b").transform(get_top, schema="*").show()
    # get largest b of each partition
    df.partition(by=["a"], presort="b DESC").transform(get_top, schema="*").show()
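If the partition + presort + get_top combination is unfamiliar, a plain-pandas sketch (not Fugue code) of the same computation may help, using the same sample data:

```python
import pandas as pd

df = pd.DataFrame([[0, 1], [0, 2], [1, 3], [1, 1]], columns=["a", "b"])

# presort="b" then take the first row of each partition of "a"
smallest = df.sort_values("b").groupby("a", as_index=False).first()

# presort="b DESC" then take the first row of each partition of "a"
largest = df.sort_values("b", ascending=False).groupby("a", as_index=False).first()

print(smallest)  # a=0 -> b=1, a=1 -> b=1
print(largest)   # a=0 -> b=2, a=1 -> b=3
```

The Fugue version expresses the same logic, but the partitioning and sorting are handled by the execution engine and can scale beyond a single machine.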
When you need to reuse a transformer multiple times, it's tedious to specify the schema in the Fugue code every time. Instead, you can write a schema hint on top of the function; this doesn't require a Fugue dependency either. The following code does the same thing as above, but notice how much shorter it is.
from typing import Iterable, Dict, Any, List
import pandas as pd

# schema: *
def add(df:pd.DataFrame, n=1) -> pd.DataFrame:
    df["b"]+=n
    return df

# schema: *
def get_top(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    yield next(df)
    return

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df = df.transform(add).transform(add, params=dict(n=2))  # see how parameters are set
    df.partition(by=["a"], presort="b").transform(get_top).show()
    df.partition(by=["a"], presort="b DESC").transform(get_top).show()
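Note that a schema hint is just a specially formatted comment, so the function stays importable and usable without Fugue installed. As a toy sketch (hypothetical helper, not Fugue's actual implementation), such a hint could be recovered from source text like this:

```python
def find_schema_hint(source: str):
    """Return the schema expression from the '# schema:' comment
    that appears above a function definition, or None."""
    hint = None
    for line in source.splitlines():
        stripped = line.strip()
        if stripped.startswith("# schema:"):
            hint = stripped[len("# schema:"):].strip()
        elif stripped.startswith("def "):
            return hint
    return None

src = '''
# schema: *,c:int
def with_c(df):
    df["c"] = 1
    return df
'''

print(find_schema_hint(src))  # *,c:int
```

Because the hint lives in a comment rather than in code, adding it has zero runtime cost for callers who use the function outside of Fugue.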
from typing import Iterable, Dict, Any, List
import pandas as pd

# schema: *,c:int
def with_c(df:pd.DataFrame) -> pd.DataFrame:
    df["c"]=1
    return df

# schema: *-b
def drop_b(df:pd.DataFrame) -> pd.DataFrame:
    return df.drop("b", axis=1)

# schema: *~b,c
def drop_b_c_if_exists(df:pd.DataFrame) -> pd.DataFrame:
    return df.drop(["b","c"], axis=1, errors='ignore')

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df = df.transform(with_c)
    df.show()
    df = df.transform(drop_b)
    df.show()
    df = df.transform(drop_b_c_if_exists)
    df.show()
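To spell out what the schema expressions above mean, here is a toy illustration (a hypothetical helper operating on plain column lists, not Fugue's schema implementation): *-b strictly removes columns that must exist, while *~b,c leniently removes columns only if they are present.

```python
def apply_schema_expr(cols, expr):
    """Toy interpreter for the '*', '*-x' and '*~x' expression shapes
    used in the schema hints above (illustration only)."""
    if expr == "*":
        return list(cols)
    if expr.startswith("*-"):            # strict removal: columns must exist
        to_drop = expr[2:].split(",")
        missing = [c for c in to_drop if c not in cols]
        if missing:
            raise ValueError(f"columns not in schema: {missing}")
        return [c for c in cols if c not in to_drop]
    if expr.startswith("*~"):            # lenient removal: drop if present
        to_drop = expr[2:].split(",")
        return [c for c in cols if c not in to_drop]
    raise ValueError(f"unsupported expression: {expr}")

print(apply_schema_expr(["a", "b", "c"], "*-b"))   # ['a', 'c']
print(apply_schema_expr(["a", "b"], "*~b,c"))      # ['a']
```

This mirrors the pandas side of the example: drop_b uses a plain drop (which raises if b is absent), while drop_b_c_if_exists passes errors='ignore' to match the lenient semantics.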
The decorator approach can do everything the schema hint can do; in addition, it can take a function to generate the schema.
import pandas as pd
from fugue import transformer, FugueWorkflow

# df is the zipped DataFrames, **kwargs is the parameters passed in from Fugue
@transformer(lambda df, **kwargs: df.schema+"c:int")  # equivalent to @transformer("*,c:int")
def with_c(df:pd.DataFrame) -> pd.DataFrame:
    df["c"]=1
    return df

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df = df.transform(with_c)
    df.show()
All the previous methods are just wrappers of the interface approach. They cover most use cases and simplify usage, but for certain cases you should implement the interface. The biggest advantage of the interface approach is that you can customize physical-partition-level initialization, and you have all the up-to-date context variables to use. In the interface approach, type annotations are not necessary, but again, it's good practice to have them. The following case focuses on performance comparison; to see how to use context variables, read the CoTransformer example.
from fugue import Transformer, FugueWorkflow, PandasDataFrame, DataFrame, LocalDataFrame
from triad.collections import Schema
from time import sleep
import pandas as pd
import numpy as np

def expensive_init(sec=5):
    sleep(sec)

def helper(ct=20) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 3)), columns=list('abc'))

class Median(Transformer):
    # this is invoked on the driver side
    def get_output_schema(self, df):
        return df.schema + (self.params.get_or_throw("col", str), float)

    # on initialization of the physical partition
    def on_init(self, df: DataFrame) -> None:
        self.col = self.params.get_or_throw("col", str)
        expensive_init(self.params.get("sec", 0))

    def transform(self, df):
        pdf = df.as_pandas()
        pdf[self.col] = pdf["b"].median()
        return PandasDataFrame(pdf)

with FugueWorkflow() as dag:
    dag.create(helper).partition(by=["a"]).transform(Median, params={"col": "m", "sec": 1}).show(rows=100)
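For reference, here is a plain-pandas sketch (not Fugue code) of what the Median transformer computes on the helper data: within each partition of a, every row gets the median of b as a new column m.

```python
import pandas as pd
import numpy as np

# Same data as the helper function above
np.random.seed(0)
pdf = pd.DataFrame(np.random.randint(0, 10, size=(20, 3)), columns=list("abc"))

# Per-partition median of "b", broadcast back to every row of the group
pdf["m"] = pdf.groupby("a")["b"].transform("median")
```

The Fugue version computes the same values, but each partition can be processed on a different worker, and on_init lets the expensive setup run once per physical partition instead of once per group.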
Notice that we set self.col in on_init. It's better to set it in on_init or transform, and not in get_output_schema, because get_output_schema runs on the driver; the value would then need to be serialized and sent to each worker when using a distributed engine, and serialization can fail for some value types.
In order to show the benefit of on_init, we also create an interfaceless version (which is a lot simpler), but expensive_init has to be called in that function for each logical partition. Also, in the run function, we set num=2 to show the effect. So for the Median transformer, expensive_init will be called at most twice, but for median it will be called more times. Notice that the numbers may be off if you run this on binder.
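The call-count difference can be sketched in plain Python (hypothetical names, no Fugue or Spark involved): per-physical-partition initialization runs once per physical partition, while the interfaceless version pays the cost once per logical partition.

```python
calls = {"per_physical": 0, "per_logical": 0}

def expensive_init_counted(kind):
    # Stand-in for expensive_init that just counts invocations
    calls[kind] += 1

# Four logical partitions (one per value of "a"), packed into
# two physical partitions, mirroring num=2 in the example below
logical_partitions = [[("a", 0)], [("a", 1)], [("a", 2)], [("a", 3)]]
physical_partitions = [logical_partitions[:2], logical_partitions[2:]]

# Interface approach: on_init runs once per physical partition
for phys in physical_partitions:
    expensive_init_counted("per_physical")
    for _ in phys:
        pass  # transform each logical partition

# Interfaceless approach: init happens inside the transformer function,
# so it runs once per logical partition
for _ in logical_partitions:
    expensive_init_counted("per_logical")

print(calls)  # {'per_physical': 2, 'per_logical': 4}
```

With real data the gap grows with the number of groups, which is why the timed comparison below favors the interface version.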
from fugue_spark import SparkExecutionEngine
from timeit import timeit

# schema: *, m:double
def median(df:pd.DataFrame, sec=0) -> pd.DataFrame:
    expensive_init(sec)
    df["m"]=df["b"].median()
    return df

def run(engine, interfaceless, sec):
    with FugueWorkflow(engine) as dag:
        df = dag.create(helper)
        if interfaceless:
            df.partition(by=["a"], num=2).transform(median, params={"sec": sec}).show(rows=100)
        else:
            df.partition(by=["a"], num=2).transform(Median, params={"col": "m", "sec": sec}).show(rows=100)

engine = SparkExecutionEngine()
print(timeit(lambda: run(engine, True, 1), number=1))
print(timeit(lambda: run(engine, False, 1), number=1))