"""Fugue checkpoint tutorial, restored from a collapsed Jupyter notebook.

Each ``# %%`` section below was originally a notebook cell whose first line
was the IPython ``%%time`` cell magic. ``%%time`` is not valid syntax in a
plain ``.py`` file, so each section now wraps its work in ``_timed``, which
prints the section's wall-clock time.  This lets the timings of the
no-checkpoint baseline, ``persist``, ``weak_checkpoint``,
``checkpoint``/``strong_checkpoint`` and ``deterministic_checkpoint``
variants be compared.
"""
from contextlib import contextmanager
from time import perf_counter, sleep

import pandas as pd
from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine

# Directory passed to Fugue as the file-based checkpoint location.
_CHECKPOINT_PATH = "/tmp"


def _checkpoint_engine() -> SparkExecutionEngine:
    """Return a new Spark engine whose conf sets the checkpoint path."""
    # A fresh conf dict per engine, matching the original per-cell literal.
    return SparkExecutionEngine(
        conf={"fugue.workflow.checkpoint.path": _CHECKPOINT_PATH}
    )


@contextmanager
def _timed(label: str):
    """Print the wall-clock time spent in the ``with`` body.

    Stand-in for the IPython ``%%time`` cell magic used by the notebook.

    Args:
        label: Text printed in front of the elapsed time.
    """
    start = perf_counter()
    try:
        yield
    finally:
        print(f"{label}: wall time {perf_counter() - start:.2f} s")


# schema: *
def just_wait(df: pd.DataFrame) -> pd.DataFrame:
    """Sleep for 5 seconds, then return *df* unchanged.

    The ``# schema: *`` comment directly above this ``def`` is presumably
    read by Fugue as the transformer's output-schema hint.  Keep it adjacent
    to the ``def``.  The sleep makes every (re)computation visible in the
    printed timings.
    """
    sleep(5)
    return df


# %% Baseline: no checkpoint; the transformed dataframe is shown twice.
with _timed("no checkpoint"), FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait)
    df.show()
    df.show()

# %% persist() before showing twice.
with _timed("persist"), FugueWorkflow(SparkExecutionEngine()) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).persist()
    df.show()
    df.show()

# %% Lazy weak checkpoint whose result is never used.
with _timed("lazy weak checkpoint, unused"), FugueWorkflow(
    SparkExecutionEngine()
) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).weak_checkpoint(lazy=True)

# %% Lazy weak checkpoint shown twice.
with _timed("lazy weak checkpoint"), FugueWorkflow(
    SparkExecutionEngine()
) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).weak_checkpoint(lazy=True)
    df.show()
    df.show()

# %% File-based checkpoint() (needs fugue.workflow.checkpoint.path).
with _timed("checkpoint"), FugueWorkflow(_checkpoint_engine()) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).checkpoint()
    df.show()
    df.show()

# %% Lazy strong checkpoint shown twice.
with _timed("lazy strong checkpoint"), FugueWorkflow(
    _checkpoint_engine()
) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).strong_checkpoint(lazy=True)
    df.show()
    df.show()

# %% Deterministic checkpoint, first run.
with _timed("deterministic checkpoint, run 1"), FugueWorkflow(
    _checkpoint_engine()
) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).deterministic_checkpoint()
    df.show()

# %% The identical workflow again, to compare with the first run.
with _timed("deterministic checkpoint, run 2"), FugueWorkflow(
    _checkpoint_engine()
) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).deterministic_checkpoint()
    df.show()

# %% Deterministic checkpoint in namespace "x".
with _timed("deterministic checkpoint, namespace x"), FugueWorkflow(
    _checkpoint_engine()
) as dag:
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).deterministic_checkpoint(namespace="x")
    df.show()

# %% Same as the previous section plus an unrelated subworkflow.
with _timed("deterministic checkpoint, extra subworkflow"), FugueWorkflow(
    _checkpoint_engine()
) as dag:
    df = dag.df([[1]], "a:int")
    df.show()

    # A new independent subworkflow was added above; it does not change
    # the determinism of the following code.
    df = dag.df([[0]], "a:int")
    df = df.transform(just_wait).deterministic_checkpoint(namespace="x")
    df.show()

# %% Two workflows: dag2 consumes a dataframe yielded by dag1.
with _timed("yield without checkpoint"):
    dag1 = FugueWorkflow()
    df = dag1.df([[0]], "a:int")
    df.transform(just_wait).yield_as("df")

    dag2 = FugueWorkflow()
    df = dag2.df(dag1.yields["df"])
    df.transform(just_wait).show()

    engine = _checkpoint_engine()
    dag1.run(engine)
    dag2.run(engine)

# %% Same two workflows, with a deterministic checkpoint before the yield.
with _timed("yield after deterministic checkpoint"):
    dag1 = FugueWorkflow()
    df = dag1.df([[0]], "a:int")
    df.transform(just_wait).deterministic_checkpoint().yield_as("df")

    dag2 = FugueWorkflow()
    df = dag2.df(dag1.yields["df"])
    df.transform(just_wait).show()

    engine = _checkpoint_engine()
    dag1.run(engine)
    dag2.run(engine)