Outputter
represents a logic unit that executes on the driver, over the entire input dataframes, and produces no output. It's called Outputter because this step is normally used to write the data to a certain location, or to print it on the console.
Input can be a single DataFrames. Alternatively, the acceptable input DataFrame types are: DataFrame, LocalDataFrame, pd.DataFrame, List[List[Any]], Iterable[List[Any]], EmptyAwareIterable[List[Any]], List[Dict[str, Any]], Iterable[Dict[str, Any]], and EmptyAwareIterable[Dict[str, Any]].
The output annotation must be None.
Before the input DataFrames, you can have a parameter with the ExecutionEngine annotation, so that Fugue will pass the current ExecutionEngine to you.
Notice:
ArrayDataFrame and other local dataframes can't be used as annotations; you must use LocalDataFrame or DataFrame.
LocalDataFrame will bring the entire dataset onto the driver. For an Outputter this might be an expected operation, but you still need to be careful.
Iterable-like input may have different execution plans to bring data to the driver; in some cases it can be less optimal (slower), so you must be careful.
The simplest way has no dependency on Fugue: you just need acceptable annotations on the input dataframes.
from typing import Iterable, Dict, Any, List
import pandas as pd

def out(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df)

def out2(df1:pd.DataFrame, df2:List[List[Any]]) -> None:
    print(df1)
    print(df2)
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.output(out, params={"n":2})
    dag.output(df,using=out,params={"n":2})  # == above
    dag.output(df,df,using=out2)
Another very important use case is taking ExecutionEngine as input. This is how you write native Spark code inside Fugue.
from fugue import ExecutionEngine, DataFrame
from fugue_spark import SparkExecutionEngine, SparkDataFrame
from typing import Iterable, Dict, Any, List
import pandas as pd
# pay attention to the input annotations
def out(e:ExecutionEngine, df:DataFrame) -> None:
    assert isinstance(e, SparkExecutionEngine)  # this extension only works with SparkExecutionEngine
    df = e.to_df(df)  # ensure df is a SparkDataFrame; conversion happens here if needed
    df.native.show()
with FugueWorkflow(SparkExecutionEngine) as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df.output(out)
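The e.to_df(df) call is worth understanding: it is a no-op when the dataframe is already native to the engine, and a conversion otherwise. A toy illustration of that contract (made-up classes, not Fugue's):

```python
class ToyDataFrame:  # generic wrapper around rows
    def __init__(self, rows):
        self.rows = rows

class ToyEngineDataFrame(ToyDataFrame):  # "native" to our toy engine
    pass

class ToyEngine:
    def to_df(self, df: ToyDataFrame) -> ToyEngineDataFrame:
        # Already native: return as-is; otherwise wrap (i.e., "convert").
        if isinstance(df, ToyEngineDataFrame):
            return df
        return ToyEngineDataFrame(df.rows)
```

Because of this contract, calling to_df defensively at the top of an engine-aware extension is cheap when no conversion is needed.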
It's also important to know how to use DataFrames as the input annotation, because this is the only way to handle a dynamic number of input dataframes.
from typing import Iterable, Dict, Any, List
from fugue import DataFrames, DataFrame
def out(dfs:DataFrames) -> None:
    for k, v in dfs.items():
        v.show(title=k)
with FugueWorkflow() as dag:
    df1 = dag.df([[0,1]],"a:int,b:int")
    df2 = dag.df([[0,2],[1,3]],"a:int,b:int")
    df3 = dag.df([[1,1]],"a:int,b:int")
    dag.output(df1,df2,df3,using=out)
    dag.output(dict(x=df1,y=df2,z=df3),using=out)
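What makes this dynamic is that DataFrames behaves like an ordered name-to-dataframe mapping that also accepts unnamed (positional) inputs. A rough stand-in for intuition (a sketch, not the real class):

```python
from typing import Any, Dict

class ToyDataFrames:
    """Ordered name -> dataframe mapping; unnamed inputs get keys _0, _1, ..."""
    def __init__(self, *dfs: Any, **named: Any):
        self._data: Dict[str, Any] = {f"_{i}": d for i, d in enumerate(dfs)}
        self._data.update(named)

    def items(self):
        return self._data.items()

    def __len__(self) -> int:
        return len(self._data)

    def __getitem__(self, key):
        # Allow positional access by index as well as access by name.
        if isinstance(key, int):
            return list(self._data.values())[key]
        return self._data[key]

def out(dfs: ToyDataFrames) -> None:
    for k, v in dfs.items():
        print(k, v)

out(ToyDataFrames([[0, 1]], [[0, 2], [1, 3]]))      # keys _0, _1
out(ToyDataFrames(x=[[0, 1]], y=[[0, 2], [1, 3]]))  # keys x, y
```

This is why the same extension body works for both dag.output(df1,df2,df3,...) and the dict form: the iteration over items() is identical either way.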
There is no obvious advantage to using the decorator for an Outputter.
from typing import Any, List
from fugue import outputter, FugueWorkflow

@outputter()
def out(df:List[List[Any]], n=1) -> None:
    for i in range(n):
        print(df)

with FugueWorkflow() as dag:
    dag.df([[0,1]],"a:int,b:int").output(out)
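Conceptually, such a decorator just wraps the plain function in an Outputter-like object so that functions and interface implementations can be treated uniformly. A simplified sketch of that wrapping pattern (hypothetical names, not Fugue's code):

```python
from typing import Any, Callable

class ToyOutputter:
    def __init__(self, func: Callable[..., None], **conf: Any):
        self.func = func
        self.conf = conf

    def process(self, df: Any) -> None:
        # Delegate to the wrapped function, forwarding stored config.
        self.func(df, **self.conf)

def toy_outputter(**conf: Any):
    # Returns a decorator that converts a plain function into a ToyOutputter.
    def deco(func: Callable[..., None]) -> ToyOutputter:
        return ToyOutputter(func, **conf)
    return deco

@toy_outputter(n=2)
def out(df, n=1) -> None:
    for _ in range(n):
        print(df)

out.process([[0, 1]])  # prints the rows twice
```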
All the previous methods are just wrappers of the interface approach. They cover most use cases and simplify usage, but if you need the full execution context, such as partition information, use the interface.
In the interface approach, type annotations are not necessary, but again, it's good practice to have them.
from fugue import FugueWorkflow, Outputter, DataFrames
from fugue_spark import SparkExecutionEngine

class Save(Outputter):
    def process(self, dfs:DataFrames) -> None:
        assert len(dfs)==1
        assert isinstance(self.execution_engine, SparkExecutionEngine)
        session = self.execution_engine.spark_session  # the SparkSession is also available here
        # we get the partition information from the Outputter
        by = self.partition_spec.partition_by
        df = self.execution_engine.to_df(dfs[0])
        path = self.params.get_or_throw("path",str)
        df.native.write.partitionBy(*by).format("parquet").mode("overwrite").save(path)

with FugueWorkflow(SparkExecutionEngine) as dag:
    df = dag.df([[0,1],[0,3],[1,2],[1,1]],"a:int,b:int")
    df.partition(by=["a"]).output(Save, params=dict(path="/tmp/x.parquet"))
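The effect of partitionBy here, one output directory per key value, can be mimicked with only the standard library for intuition (CSV instead of parquet; the helper and layout below are hypothetical, not Fugue or Spark APIs):

```python
import csv
import os
import tempfile
from collections import defaultdict
from typing import Any, Dict, List

def partitioned_save(rows: List[Dict[str, Any]], by: str, path: str) -> None:
    # Group rows by the partition key, then write each group under a
    # key=value subdirectory, similar to Spark's partitionBy layout.
    groups: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
    for r in rows:
        groups[r[by]].append(r)
    for key, grp in groups.items():
        d = os.path.join(path, f"{by}={key}")
        os.makedirs(d, exist_ok=True)
        with open(os.path.join(d, "part-0.csv"), "w", newline="") as f:
            w = csv.DictWriter(f, fieldnames=list(grp[0].keys()))
            w.writeheader()
            w.writerows(grp)

rows = [{"a": 0, "b": 1}, {"a": 0, "b": 3}, {"a": 1, "b": 2}, {"a": 1, "b": 1}]
out_dir = tempfile.mkdtemp()
partitioned_save(rows, "a", out_dir)
print(sorted(os.listdir(out_dir)))  # ['a=0', 'a=1']
```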
In the following case, Fugue does not have a built-in extension, but it is very easy to write one yourself: read how DataFrame.show() is implemented, then create your own modified version that pretty prints the head n rows using the Jupyter API.
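As a starting point for that exercise, here is a plain-text head printer with no Jupyter dependency (a made-up helper; swapping the string rendering for IPython's display machinery is left as the exercise):

```python
from typing import Any, List

def show_head(rows: List[List[Any]], cols: List[str], n: int = 10) -> str:
    # Render the first n rows as a simple fixed-width text table.
    head = rows[:n]
    table = [cols] + [[str(x) for x in r] for r in head]
    widths = [max(len(row[i]) for row in table) for i in range(len(cols))]
    lines = [" | ".join(cell.ljust(w) for cell, w in zip(row, widths))
             for row in table]
    return "\n".join(lines)

print(show_head([[0, 1], [0, 2], [1, 3]], ["a", "b"], n=2))
```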