It's strongly recommended to quickly go through the COVID19 example first to get a sense of what Fugue SQL can do and how it works. Here we go through the details of the different Fugue SQL features.
Fugue SQL is an alternative to the Fugue programming interface. Both are used to describe your end-to-end workflow logic. The SQL semantics are platform- and scale-agnostic, so logic written in SQL stays high level and abstract, and the underlying computing framework will try to execute it in an optimal way. For example, when you use the SparkExecutionEngine, the SQL statements are executed as SparkSQL, which is highly optimized.
The syntax of Fugue SQL is between standard SQL, JSON, and Python. The goals include full compatibility with the standard SQL SELECT statement.

In order to use Fugue SQL, you first need to make sure you have installed the sql extra:
pip install fugue[sql]
To use Fugue SQL in your program, you use FugueSQLWorkflow, which derives from FugueWorkflow; it has all the programming interface features plus Fugue SQL support.
from fugue_sql import FugueSQLWorkflow

with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0,"hello"],[1,"world"]] SCHEMA a:int,b:str
    PRINT
    """)
The SQL is translated into a sequence of operations in the programming interface, and you can mix it with programming interface code:
with FugueSQLWorkflow() as dag:
    df = dag.df([[0,"hello"],[1,"world"]],"a:int,b:str")
    dag("""
    SELECT * FROM df WHERE a=0  # we can use df defined outside directly
    PRINT
    """)
with FugueSQLWorkflow() as dag:
    df = dag.df([[0,"hello"],[1,"world"]],"a:int,b:str")
    x=0
    dag("""
    SELECT * FROM df WHERE a={{x}}  # we can use the local variable x directly
    PRINT
    """)
    dag("""
    SELECT * FROM df WHERE a={{y}}  # or pass the value in as a keyword argument
    PRINT
    """, y=1)
Local variables and variables defined in previous SQL blocks can be used directly in the next SQL block. All variables (dataframes) defined in SQL blocks can be accessed via dag["<key>"]:
with FugueSQLWorkflow() as dag:
    dag("""
    a=CREATE [[0,"hello"],[1,"world"]] SCHEMA a:int,b:str
    """)
    dag("""
    PRINT a
    """)
    dag["a"].show()
A very important simplification in Fugue SQL is anonymity. It's optional, but it can significantly simplify your code. A statement that only needs to consume the previous dataframe can use anonymity; PRINT is the best example.
from fugue_sql import FugueSQLWorkflow

with FugueSQLWorkflow() as dag:
    dag("""
    a=CREATE [[0,"hello"],[1,"world"]] SCHEMA a:int,b:str
    PRINT  # nothing specified, so this prints a -- the last dataframe output of the previous statements
    PRINT  # PRINT generates no output, so anonymity still refers to a
    """)
Statements that don't generate output can't be assigned to a variable. For statements that generate a single output, you can also use anonymity and skip the assignment; the following statements must then use anonymity to consume that output.
from fugue_sql import FugueSQLWorkflow

with FugueSQLWorkflow() as dag:
    dag("""
    a=CREATE [[0,"hello"]] SCHEMA a:int,b:str
    CREATE [[1,"world"]] SCHEMA a:int,b:str
    PRINT    # prints the second dataframe
    PRINT a  # prints the first, because it is explicit
    PRINT    # prints the second again
    """)
Anonymity is not limited to Fugue extensions; the SELECT statement can use it as well:
with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0,"hello"], [1,"world"]] SCHEMA a:int,b:str
    SELECT * WHERE a=1  # no FROM needed; it means FROM the last output of the previous statements
    PRINT
    """)
You can nest one statement inside another using ( ). But since variable assignment is so easy in Fugue SQL, you may not need to write your code this way; it's up to you.
with FugueSQLWorkflow() as dag:
    dag("""
    SELECT *
    FROM (CREATE [[0,"hello"], [1,"world"]] SCHEMA a:int,b:str)
    WHERE a=1
    PRINT
    """)
    dag("""
    PRINT (
        SELECT *
        FROM (CREATE [[0,"hello"], [1,"world"]] SCHEMA a:int,b:str)
        WHERE a=1
    )
    """)
CREATE array [SCHEMA schema]
with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0,"hello"],[1,"world"]] SCHEMA a:int,b:str
    PRINT
    """)
CREATE USING extension [(params)] [SCHEMA schema]

from typing import List, Any
import pandas as pd

def create1(n=1) -> pd.DataFrame:
    return pd.DataFrame([[n]],columns=["a"])

# schema: a:int
def create2(n=1) -> List[List[Any]]:
    return [[n]]

# no schema hint, so the SQL statement must provide the schema
def create3(n=1) -> List[List[Any]]:
    return [[n]]

with FugueSQLWorkflow() as dag:
    dag("""
    CREATE USING create1 PRINT
    CREATE USING create1(n=2) PRINT
    CREATE USING create2(n=3) PRINT
    CREATE USING create3(n=4) SCHEMA a:int PRINT
    """)
LOAD [PARQUET|CSV|JSON] path [(params)] [COLUMNS schema|columns]
You need to specify the file type hint only if the path has no explicit suffix:
with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0,"1"]] SCHEMA a:int,b:str
    SAVE OVERWRITE "/tmp/f.parquet"
    SAVE OVERWRITE "/tmp/f.csv"(header=true)
    SAVE OVERWRITE "/tmp/f.json"
    SAVE OVERWRITE PARQUET "/tmp/f"
    """)
    dag("""
    LOAD "/tmp/f.parquet" PRINT
    LOAD "/tmp/f.parquet" COLUMNS a PRINT
    LOAD PARQUET "/tmp/f" PRINT
    LOAD "/tmp/f.csv"(header=true) PRINT
    LOAD "/tmp/f.csv"(header=true) COLUMNS a:int,b:str PRINT
    LOAD "/tmp/f.json" PRINT
    LOAD "/tmp/f.json" COLUMNS a:int,b:str PRINT
    """)
PRINT [dataframes] [ROWS int] [ROWCOUNT] [TITLE "title"]
When you specify ROWCOUNT, the total row count of the dataframe is printed. In a distributed environment this can be expensive, so you may consider persisting the dataframes you want to print (see the sketch after the example below).
import numpy as np
import pandas as pd

def helper(ct=30) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 3)), columns=list('abc'))

with FugueSQLWorkflow() as dag:
    dag("""
    CREATE USING helper
    PRINT
    PRINT ROWS 5 ROWCOUNT TITLE "xyz"
    """)
OUTPUT [dataframes] [PREPARTITION statement] USING extension [(params)]

import pandas as pd

def output(df:pd.DataFrame, n=1) -> None:
    print(n)
    print(df)

with FugueSQLWorkflow() as dag:
    dag("""
    a=CREATE [[0]] SCHEMA a:int
    OUTPUT a USING output(n=2)
    OUTPUT PREPARTITION BY a USING output
    """)
SAVE [dataframe] [PREPARTITION statement] OVERWRITE|APPEND|TO [SINGLE] [PARQUET|CSV|JSON] path [(params)]
SAVE ... TO means an error will be thrown if the file already exists. When saving to CSV, you normally add (header=true) to save the header.
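To illustrate SAVE ... TO, here is a minimal sketch (the path is just a placeholder for this example): the first run writes the file, and running the same statement again raises an error because the file already exists.

with FugueSQLWorkflow() as dag:
    dag("""
    CREATE [[0,"1"]] SCHEMA a:int,b:str
    SAVE TO "/tmp/f_to.parquet"  # a second run would raise: the file already exists
    """)

The following shows more SAVE variations, running on Spark: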
from fugue_spark import SparkExecutionEngine

with FugueSQLWorkflow(SparkExecutionEngine) as dag:
    dag("""
    CREATE [[0,"1"]] SCHEMA a:int,b:str
    SAVE OVERWRITE "/tmp/f.parquet"
    SAVE OVERWRITE SINGLE PARQUET "/tmp/f2"
    SAVE PREPARTITION BY a PRESORT b OVERWRITE "/tmp/f.csv"(header=true)
    """)
PREPARTITION can change the file structure, and it can also affect speed on a distributed framework. The following shows how partitioning changes the output structure using Spark:
def helper(ct=30) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 3)), columns=list('abc'))

with FugueSQLWorkflow(SparkExecutionEngine) as dag:
    dag("""
    CREATE USING helper
    SAVE OVERWRITE "/tmp/f3.parquet"
    SAVE PREPARTITION BY a OVERWRITE "/tmp/f4.parquet"
    """)

from fugue import FileSystem
print(FileSystem().listdir("/tmp/f3.parquet"))
print(FileSystem().listdir("/tmp/f4.parquet"))
PROCESS [dataframes] [PREPARTITION statement] USING extension [(params)] [SCHEMA schema]

def concat(df1:pd.DataFrame, df2:pd.DataFrame) -> pd.DataFrame:
    return pd.concat([df1,df2]).reset_index(drop=True)

with FugueSQLWorkflow() as dag:
    dag("""
    a = CREATE [[0,"1"]] SCHEMA a:int,b:str
    b = CREATE [[1,"2"]] SCHEMA a:int,b:str
    PROCESS a,b USING concat
    PRINT
    """)
TRANSFORM [dataframe] [PREPARTITION statement] USING extension [(params)] [SCHEMA schema]
PREPARTITION controls whether the transformer is applied directly on physical partitions or on user-defined logical partitions. For the concept of partitioning, read the partition tutorial. Please also read the transformer tutorial; it is a very important section. In the following example, the workflow is constructed first and then run explicitly, so the same workflow can be executed on multiple engines:
import pandas as pd
import numpy as np
from typing import Iterable, List, Any
from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine

def helper(ct=20) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 3)), columns=list('abc'))

# schema: *,partition_count:int
def ct(df:List[List[Any]]) -> Iterable[List[Any]]:
    c = len(df)
    for row in df:
        yield row+[c]

dag = FugueSQLWorkflow()
dag("""
a = CREATE USING helper PERSIST
TRANSFORM a USING ct  # on whatever physical partitions a has
PRINT ROWS 100
TRANSFORM a PREPARTITION BY a USING ct  # on user-defined logical partitions, similar to groupBy-apply
PRINT ROWS 100
""")
dag.run()
dag.run(SparkExecutionEngine)
As you can see, on Spark and on native Python the first TRANSFORM gives different results, because the two engines partition your data differently at the physical level. However, they produce the same result when you specify PREPARTITION BY a: this logical partitioning is defined by you, and every ExecutionEngine will respect it.
Let's see another example: selecting the top record of each partition, which shows how to use PRESORT.
# schema: *
def select_top(df:Iterable[List[Any]]) -> List[List[Any]]:
    return [next(df)]

dag = FugueSQLWorkflow()
dag("""
a = CREATE USING helper PERSIST
TRANSFORM a USING select_top  # on whatever physical partitions a has
PRINT TITLE "nondeterministic result"
TRANSFORM a PREPARTITION BY a PRESORT b USING select_top
PRINT TITLE "smallest of each partition"
TRANSFORM a PREPARTITION BY a PRESORT b DESC USING select_top
PRINT TITLE "largest of each partition"
""")
dag.run()
dag.run(SparkExecutionEngine)
TRANSFORM [dataframe] USING extension [(params)] [SCHEMA schema]
The extension must be a CoTransformer or a compatible function, and before the transform you must use ZIP.
ZIP [dataframes] [joinType] [BY cols] [PRESORT presort]
Join types are: CROSS, INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER. If not specified, the default is INNER.
Please also read the CoTransformer tutorial. In the following example, inline statements are used for simplicity:
from fugue import DataFrames

# schema: res:[str]
def to_str_with_key(dfs:DataFrames) -> List[List[Any]]:
    return [[[k+" "+x.as_array().__repr__() for k,x in dfs.items()]]]

with FugueSQLWorkflow() as dag:
    dag("""
    df1 = CREATE [[0,1],[1,3]] SCHEMA a:int,b:int
    df2 = CREATE [[0,4],[2,2]] SCHEMA a:int,c:int
    df3 = CREATE [[0,2],[1,1],[1,5]] SCHEMA a:int,d:int
    TRANSFORM (ZIP df1,df2,df3) USING to_str_with_key
    PRINT
    TRANSFORM (ZIP a=df1,b=df2,c=df3 LEFT OUTER BY a PRESORT b DESC) USING to_str_with_key
    PRINT
    """)
SELECT

The SELECT statement should be fully compatible with standard SELECT syntax. Here are some additions:

- If the FROM clause is omitted, it means selecting from the last dataframe generated by the previous statements.
- The FROM clause can contain inline Fugue statements such as CREATE, PROCESS, TRANSFORM, etc.

with FugueSQLWorkflow() as dag:
dag("""
df1 = CREATE [[0,1],[1,3]] SCHEMA a:int,b:int
# inline
SELECT df1.*, df2.c FROM df1 INNER JOIN (CREATE [[0,4],[2,2]] SCHEMA a:int,c:int) AS df2
ON df1.a=df2.a
# no from
SELECT c,b,a
PRINT
""")
For any statement that outputs a single dataframe, you can persist or broadcast that result. Please also read the tutorial on persist and broadcast.

In the following example, there are several reasons to persist df1:

- It is consumed by both the following SELECT and the PRINT; on Spark, without persist it would be computed twice.
- It contains a random column (rand()); without persist, recomputation could make the printed dataframe inconsistent with the one used in the SELECT.

We also broadcast df2 because it is really small; broadcasting it explicitly is a good idea.
import pandas as pd
import numpy as np
from typing import Iterable, List, Any
from fugue_sql import FugueSQLWorkflow
from fugue_spark import SparkExecutionEngine

def helper(ct=20) -> pd.DataFrame:
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 2)), columns=list('ab'))

dag = FugueSQLWorkflow()
dag("""
df1 = SELECT *, rand() AS rand FROM (CREATE USING helper) PERSIST
df2 = CREATE [[0,2],[2,3]] SCHEMA a:int,c:int BROADCAST
SELECT df1.*,c FROM df1 INNER JOIN df2 ON df1.a=df2.a
PRINT
PRINT df1
""")
dag.run(SparkExecutionEngine)