Fugue does not have its own data types. Instead, it uses a subset of the data types from Apache Arrow; most pyarrow data types are supported. For the complete supported list, you can read this and its source code.
For non-nested types, here is the list. The `Primary` column marks the expression Fugue uses when converting from a pyarrow type back to a Fugue expression. If you are interested, the table is generated by the Fugue code at the end of this tutorial.
| Primary | Fugue Expression | PyArrow |
|---|---|---|
| * | bool | bool |
|  | boolean | bool |
| * | date | date32[day] |
| * | double | double |
|  | float64 | double |
| * | float | float |
|  | float32 | float |
| * | float16 | halffloat |
| * | short | int16 |
|  | int16 | int16 |
| * | int | int32 |
|  | int32 | int32 |
| * | long | int64 |
|  | int64 | int64 |
| * | byte | int8 |
|  | int8 | int8 |
| * | null | null |
| * | str | string |
|  | string | string |
| * | datetime | timestamp[us] |
| * | ushort | uint16 |
|  | uint16 | uint16 |
| * | uint | uint32 |
|  | uint32 | uint32 |
| * | ulong | uint64 |
|  | uint64 | uint64 |
| * | ubyte | uint8 |
|  | uint8 | uint8 |
`pa.ListType` and `pa.StructType` are supported. For a list type, the type expression is `[<element type>]`; for a struct type, it is a JSON-like expression, for example `{a:int,b:[str]}`, meaning the data is a dict with key `a` as an int and key `b` as a list of strings.
Notice that this is just a way to express pyarrow data types; it does not invent new types.
Again, Fugue does not invent schema; it uses pyarrow schema. But Fugue creates a special syntax to represent it: columns are separated by `,`, and each name-type pair is `<name>:<type expression>`.
For example: `a:int,b:str` or `a:int,b_array:[int],c_dict:{x:int,y:str}`
Now let's see some examples using the API:
from triad.collections import Schema
print(Schema("a:int,b:str"))
print(Schema("a:int32,b_array:[int64],c_dict:{x:int,y:string}"))
# get pyarrow schema
schema = Schema(" a : int , b : str") # space is ok
print("pa schema", schema.pa_schema)
# more ways to initialize a Fugue Schema
print(Schema(schema.pa_schema)) # by pyarrow schema
print(Schema(c=str,d=int)) # pythonic way
print(Schema("e:str","f:str")) # you can separate expressions into multiple arguments
# Compare schema with string
assert Schema("a: int, b: int64") == "a:int,b:long"
# Operations
print(Schema("a:int")+Schema("b:str"))
print(Schema("a:int")+"b:str")
print(Schema("a:int,c:int,d:int") - ["c"]) # for '-' all cols must exist
print(Schema("a:int,c:int,d:int").exclude(["c","x"])) # exclude removes the columns only if they exist
print(Schema("a:int,c:int,d:int").extract(["d","a"]))
`Schema` is very flexible; for the full API reference, please read this.
All Fugue operations are on DataFrames; there is no concept such as RDD or arbitrary objects (they are not the core concepts Fugue wants to unify, but you can still use them easily in this framework; see other tutorials). `DataFrame` is an abstract concept: a dataset with a schema, as defined above.
The motivation of the Fugue DataFrame is significantly different from other ideas such as Dask, Modin, or Koalas. The Fugue DataFrame is not meant to become another pandas-like DataFrame, and Fugue is NOT going to use the pandas API to unify data processing. That being said, pandas and pandas-like dataframes are still widely used and well supported in this framework, because they are an important component of data science.
These are the built-in local dataframes of Fugue.
You can convert between them; moreover, all DataFrames can be converted to a local dataframe, or a local bounded dataframe.
These are the built-in non-local dataframes. To use them, you need to pip install the extras.
It's more important to learn how to initialize local dataframes, because when using Fugue, in most cases you only deal with local dataframes on a single machine, in a single thread.
from fugue import ArrayDataFrame, ArrowDataFrame, IterableDataFrame, PandasDataFrame
data = [[0,"a"],[1,"b"]]
schema = "x:int,y:str"
# The most basic initialization is the same
ArrayDataFrame(data,schema).show()
ArrowDataFrame(data,schema).show()
IterableDataFrame(data,schema).show()
PandasDataFrame(data,schema).show()
# common methods as_array, as_array_iterable, as_dict_iterable
print(ArrowDataFrame(data,schema).as_array())
print(ArrowDataFrame(data,schema).as_array_iterable()) # iterator object
# type safety is very useful
df = ArrayDataFrame(data, "x:str,y:str")
assert isinstance(df.as_array()[0][0], int) # as_array or as_array_iterable by default returns raw data
assert isinstance(df.as_array(type_safe=True)[0][0], str) # turn on type safe to return the data according to schema
# as_pandas is the common interface for all DataFrames
pdf = ArrayDataFrame(data,schema).as_pandas()
print(pdf)
PandasDataFrame(pdf).show() # convert pd.DataFrame to PandasDataFrame
# as_arrow is the common interface for all DataFrames
adf = ArrayDataFrame(data,schema).as_arrow()
print(adf)
ArrowDataFrame(adf).show() # convert arrow table to ArrowDataFrame
# access the native data structure using .native, it applies for all built in DataFrames
print(ArrayDataFrame(data,schema).native)
print(ArrowDataFrame(data,schema).native)
`IterableDataFrame` is very special because it is not bounded. But one important feature of Fugue's `IterableDataFrame` is that it is empty-aware: at any point you can check whether the dataframe is empty and peek at the head row without affecting the iteration. If you are interested in the details, read this.
from fugue import IterableDataFrame
data = [[0,"a"],[1,"b"]]
schema = "x:int,y:str"
# The most basic initialization is the same
df = IterableDataFrame(data,schema)
assert not df.empty
df.show()
assert df.empty # because show consumes all elements of the iterable
# it is ok to convert to other types, but an IterableDataFrame can be consumed only once
print(IterableDataFrame(data,schema).as_array())
print(IterableDataFrame(data,schema).as_pandas())
print(IterableDataFrame(data,schema).as_arrow())
# common way to use
df = IterableDataFrame(data,schema)
for row in df.as_dict_iterable():
    print(row)
from fugue.dataframe.utils import to_local_df, to_local_bounded_df
df = IterableDataFrame(data,schema)
assert to_local_df(df) is df # because it is already local dataframe
assert to_local_bounded_df(df) is not df # because it is not bounded
For non-local dataframes, you can convert them to local dataframes, but initialization depends on the specific execution engine. Here we only use DaskDataFrame as an example.
from fugue_dask import DaskDataFrame, DaskExecutionEngine
data = [[0,"a"],[1,"b"]]
schema = "x:int,y:str"
engine = DaskExecutionEngine()
df = engine.to_df(data, schema) # using engine.to_df is the best way to generate engine-dependent dataframes
assert isinstance(df, DaskDataFrame)
df.as_local().show() # it converts to a local DataFrame, for Dask, it becomes a PandasDataFrame
print(df.native) # access the dask dataframe
from fugue.dataframe.utils import to_local_df, to_local_bounded_df
df = engine.to_df(data, schema)
to_local_df(df).show()
to_local_bounded_df(df).show() # this is stronger, it prevents using IterableDataFrame
Commonly, you only tell Fugue: I want to create a dataframe, and here is the raw data source; Fugue, with a certain execution engine, will do the job for you. In your own logic, you mostly care about two abstract types in your functions: `DataFrame` and `LocalDataFrame`. They can be seen in the extensions tutorials.
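As a taste of what those extension tutorials cover: Fugue can also adapt plain Python annotations such as `Iterable[Dict[str, Any]]`, so your logic does not have to touch the concrete DataFrame classes at all. A minimal sketch (the function name `count_rows` is hypothetical), runnable as plain Python:

```python
from typing import Any, Dict, Iterable

# A hypothetical extension-style function: it consumes rows as dicts and
# yields rows as dicts; inside Fugue, the framework would convert
# dataframes to and from this representation for you
def count_rows(df: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    n = 0
    for _ in df:
        n += 1
    yield {"count": n}

print(list(count_rows([{"x": 0}, {"x": 1}])))  # [{'count': 2}]
```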
`DataFrames` is a type that represents a collection of Fugue DataFrames. It can be dict-like, where each dataframe has a name, or list-like. It is also an extensively used data structure in the framework.
from fugue import DataFrames, ArrayDataFrame, ArrowDataFrame
df1 = ArrayDataFrame([[0]],"a:int")
df2 = ArrowDataFrame([[1]],"b:int")
dfs = DataFrames(df1, df2) # list-like
assert not dfs.has_key
assert df1 is dfs[0]
assert df2 is dfs[1]
# how to get values as an array in list-like DataFrames
print(list(dfs.values()))
dfs = DataFrames(x=df1, y=df2) # dict-like
assert dfs.has_key
assert df1 is dfs["x"]
assert df2 is dfs["y"]
assert isinstance(dfs, dict) # dfs itself is a dict, so you know how to iterate over it
dfs = DataFrames(dict(x=df1,y=df2)) # another equal way to init dict-like
For reference, here is the Fugue code that generates the type mapping table above:
import triad
from fugue_sql import FugueSQLWorkflow
from typing import List, Any

#schema: fugue_type_expr:str, pa_type:str
def type_to_expr(primary: bool = False) -> List[List[Any]]:
    if not primary:
        return [[k, str(v)] for k, v in triad.utils.pyarrow._TYPE_EXPRESSION_MAPPING.items()]
    else:
        return [[v, str(k)] for k, v in triad.utils.pyarrow._TYPE_EXPRESSION_R_MAPPING.items()]

with FugueSQLWorkflow() as dag:
    dag("""
    f2p = CREATE USING type_to_expr
    f2p_primary = CREATE USING type_to_expr(primary=true)
    SELECT CASE WHEN f2p_primary.pa_type IS NOT NULL THEN "*" ELSE "" END AS is_primary, f2p.*
    FROM f2p LEFT OUTER JOIN f2p_primary
    ON f2p.fugue_type_expr = f2p_primary.fugue_type_expr
    ORDER BY pa_type, is_primary DESC
    PRINT ROWS 100
    """)