When using extensions in Fugue, you can add input data validation logic inside your own code. However, there is also a standard way to declare validation rules. Here is a simple example:
```python
from typing import List, Dict, Any

# partitionby_has: a
# schema: a:int,ct:int
def get_count(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]
```
The commented-out line below will fail, because the hint `partitionby_has: a` requires the input dataframe to be prepartitioned by at least column `a`.
```python
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[1,1],[0,2]], "a:int,b:int")
    # df.transform(get_count).show()  # will fail because there is no partitioning
    df.partition(by=["a"]).transform(get_count).show()
    df.partition(by=["b","a"]).transform(get_count).show()  # ["b","a"] is a superset of ["a"]
```
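Conceptually, a `partitionby_has` check only needs the declared partition keys, not the data itself. The following is a minimal plain-Python sketch of that idea (an illustration, not Fugue's internal code):

```python
# A minimal sketch (NOT Fugue's implementation): a "partitionby_has" rule can
# be verified against the declared partition keys alone, before any data is
# processed, which is what makes it a compile-time check.
def validate_partitionby_has(required, partition_keys):
    missing = [k for k in required if k not in partition_keys]
    if missing:
        raise ValueError(
            f"partition keys {partition_keys} do not contain {missing}")

validate_partitionby_has(["a"], ["b", "a"])  # passes: ["b","a"] is a superset of ["a"]

try:
    validate_partitionby_has(["a"], [])  # no partitioning at all
except ValueError as e:
    print(e)  # partition keys [] do not contain ['a']
```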
You can also combine multiple rules. The following requires the partition keys to contain `a`, and the presort to be exactly `b asc` (`b` is equivalent to `b asc`):
```python
from typing import List, Dict, Any

# partitionby_has: a
# presort_is: b
# schema: a:int,ct:int
def get_count2(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]
```
```python
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[1,1],[0,2]], "a:int,b:int")
    # df.partition(by=["a"]).transform(get_count2).show()  # will fail because there is no presort
    df.partition(by=["a"], presort="b asc").transform(get_count2).show()
```
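To make the `b` == `b asc` equivalence concrete, here is a small sketch of how a presort expression could be normalized for an exact `presort_is` comparison. This is a simplified illustration, not Fugue's actual parser:

```python
# A sketch (NOT Fugue's implementation) of normalizing a presort expression
# such as "b asc, c desc" into (column, ascending) pairs for exact comparison.
# A bare column name defaults to ascending, so "b" is equivalent to "b asc".
def parse_presort(expr):
    result = []
    for part in expr.split(","):
        tokens = part.strip().split()
        col = tokens[0]
        asc = len(tokens) == 1 or tokens[1].lower() == "asc"
        result.append((col, asc))
    return result

assert parse_presort("b") == parse_presort("b asc")  # bare name defaults to asc
assert parse_presort("b, c desc") == [("b", True), ("c", False)]
assert parse_presort("b") != parse_presort("b desc")
```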
The following are all supported validations. Compile-time validations happen when you construct the FugueWorkflow, while runtime validations happen during execution. Compile-time validations are very useful for quickly identifying logical issues. Runtime validations take longer to surface, but they are still useful. On the Fugue level, we try to move runtime validations to compile time as much as we can.
Rule | Description | Compile Time | Order Matters | Examples |
---|---|---|---|---|
`partitionby_has` | assert the input dataframe is prepartitioned, and the partition keys contain these values | Yes | No | `partitionby_has: a,b` means the partition keys must contain columns `a` and `b` |
`partitionby_is` | assert the input dataframe is prepartitioned, and the partition keys are exactly these values | Yes | Yes | `partitionby_is: a,b` means the partition keys must contain and only contain columns `a` and `b` |
`presort_has` | assert the input dataframe is prepartitioned and presorted, and the presort keys contain these values | Yes | No | `presort_has: a,b desc` means the presort contains `a asc` and `b desc` (`a` == `a asc`) |
`presort_is` | assert the input dataframe is prepartitioned and presorted, and the presort keys are exactly these values | Yes | Yes | `presort_is: a,b desc` means the presort is exactly `a asc, b desc` |
`schema_has` | assert the input dataframe schema has certain keys or key-type pairs | No | No | `schema_has: a,b:str` means the input dataframe schema contains column `a` regardless of type, and `b` of type string; order doesn't matter. So `b:str,a:int` is valid, `b:int,a:int` is invalid because of `b`'s type, and `b:str` is invalid because `a` is not in the schema |
`schema_is` | assert the input dataframe schema is exactly this value (the value must be a schema expression) | No | Yes | `schema_is: a:int,b:str`; `b:str,a:int` is invalid because of order, and `a:str,b:str` is invalid because of `a`'s type |
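To illustrate the difference between `schema_has` and `schema_is` described in the table, here is a plain-Python sketch that models a schema as an ordered list of (name, type) pairs. This is a simplified illustration, not Fugue's actual schema implementation:

```python
# A sketch (NOT Fugue's implementation) of the schema_has / schema_is
# semantics, modeling a schema as an ordered list of (name, type) pairs.
def schema_has(schema, required):
    # order doesn't matter; a None type means "any type is acceptable"
    types = dict(schema)
    return all(
        name in types and (t is None or types[name] == t)
        for name, t in required
    )

def schema_is(schema, expected):
    # exact match: names, types, and order must all agree
    return list(schema) == list(expected)

s = [("a", "int"), ("b", "str")]
assert schema_has(s, [("a", None), ("b", "str")])      # like schema_has: a,b:str
assert not schema_has([("b", "int"), ("a", "int")], [("b", "str")])  # wrong type for b
assert not schema_is([("b", "str"), ("a", "int")], s)  # order matters for schema_is
```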
Extension Type | Supported | Not Supported |
---|---|---|
Transformer | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` | None |
CoTransformer | None | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` |
OutputTransformer | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` | None |
OutputCoTransformer | None | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` |
Creator | N/A | N/A |
Processor | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` | None |
Outputter | `partitionby_has`, `partitionby_is`, `presort_has`, `presort_is`, `schema_has`, `schema_is` | None |
How you specify validation rules depends on how you write your extension: by comment, by decorator, or by interface. Feature-wise, the three approaches are equivalent.
```python
from typing import List, Dict, Any

# schema: a:int,ct:int
def get_count2(df:List[Dict[str,Any]]) -> List[List[Any]]:
    return [[df[0]["a"],len(df)]]
```
```python
import pandas as pd
from typing import List, Dict, Any
from fugue import processor, transformer

@transformer(schema="*", partitionby_has=["a","d"], presort_is="b, c desc")
def example1(df:pd.DataFrame) -> pd.DataFrame:
    return df

@transformer(schema="*", partitionby_has="a,d", presort_is=["b",("c",False)])
def example2(df:pd.DataFrame) -> pd.DataFrame:
    return df

# partitionby_has: a
# presort_is: b
@transformer(schema="*")
def example3(df:pd.DataFrame) -> pd.DataFrame:
    return df

@processor(partitionby_has=["a","d"], presort_is="b, c desc")
def example4(df:pd.DataFrame) -> pd.DataFrame:
    return df
```
In every extension, you can also override the `validation_rules` property:
```python
from fugue import Transformer

class T(Transformer):
    @property
    def validation_rules(self):
        return {
            "partitionby_has": ["a"]
        }

    def get_output_schema(self, df):
        return df.schema

    def transform(self, df):
        return df
```