import polars as pl
from polars.lazy import *
import numpy as np
from string import ascii_letters
import pandas as pd
import os
from typing import Union
Py-polars has a lazy API that supports a subset of the eager API. Laziness means that operations aren't executed until you ask for them. Let's start with a short example.
Below we'll create a DataFrame in an eager fashion (meaning that the creation of the DataFrame is executed at once).
df = pl.DataFrame({"a": np.arange(0, 10),
"b": np.random.rand(10),
"c": list(ascii_letters[:10])})
df
+-----+-------+-----+
| a   | b     | c   |
| --- | ---   | --- |
| i64 | f64   | str |
+=====+=======+=====+
| 0   | 0.111 | "a" |
+-----+-------+-----+
| 1   | 0.034 | "b" |
+-----+-------+-----+
| 2   | 0.56  | "c" |
+-----+-------+-----+
| 3   | 0.142 | "d" |
+-----+-------+-----+
| 4   | 0.584 | "e" |
+-----+-------+-----+
| 5   | 0.537 | "f" |
+-----+-------+-----+
| 6   | 0.643 | "g" |
+-----+-------+-----+
| 7   | 0.349 | "h" |
+-----+-------+-----+
| 8   | 0.716 | "i" |
+-----+-------+-----+
| 9   | 0.451 | "j" |
+-----+-------+-----+
To make this a lazy DataFrame we call the .lazy method. As we can see, not much happens.
ldf = df.lazy()
ldf
<polars.lazy.LazyFrame at 0x7f295093ba58>
We can apply a filter to this DataFrame, but we'll see that again nothing happens. Note that col and lit (meaning column and literal value) are part of the lazy DSL (domain-specific language) and are needed to build a query plan.
ldf = ldf.filter(col("a") == lit(2))
ldf
<polars.lazy.LazyFrame at 0x7f29197674a8>
The query is only executed when we ask for it, which is done with the .collect method. Below we execute the query and obtain our results.
ldf.collect()
+-----+------+-----+
| a   | b    | c   |
| --- | ---  | --- |
| i64 | f64  | str |
+=====+======+=====+
| 2   | 0.56 | "c" |
+-----+------+-----+
This laziness opens up some cool possibilities from an optimization perspective. It allows Polars to modify the query right before executing it and make suboptimal queries more performant. Let's show this using various operations, comparing lazy execution with eager execution in both Polars and pandas.
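As an aside: we can peek at what the optimizer will do before running anything. A minimal sketch, assuming this py-polars version exposes describe_plan and describe_optimized_plan on LazyFrame (method names and return behavior may differ between versions):
# Build a small lazy query and show its logical plan before and after optimization.
plan = df.lazy().filter(col("a") == lit(2))
print(plan.describe_plan())
print(plan.describe_optimized_plan())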
Let's create two DataFrames with a fair number of columns and rows.
def rand_string(n: int, set_size: int, lower=True) -> str:
    s = "".join(np.random.choice(list(ascii_letters[:set_size]), n))
    if lower:
        return s.lower()
    return s
rows = 100_000
columns = 30
key_size = 5
key_set_size = 4
np.random.seed(1)
dtypes = [np.float32, np.float64, np.int64]
df_a = pl.DataFrame({f"column_{i}": np.array(np.random.rand(rows) * 10, dtype=np.random.choice(dtypes)) for i in range(columns)})
s = pl.Series("key", np.array([rand_string(key_size, key_set_size) for _ in range(rows)]))
df_a.insert_at_idx(0, s)
rows = 80_000
columns = 8
df_b = pl.DataFrame({f"column_{i}": np.array(np.random.rand(rows) * 10, dtype=np.random.choice(dtypes)) for i in range(columns)})
s = pl.Series("key", np.array([rand_string(key_size, key_set_size) for _ in range(rows)]))
df_b.insert_at_idx(0, s)
print("Showing a subset of df_a:")
# only show a sub_slice
df_a[:3, :10]
Showing a subset of df_a:
+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| key     | column_0 | column_1 | column_2 | column_3 | column_4 | column_5 | column_6 | column_7 | column_8 |
| ---     | ---      | ---      | ---      | ---      | ---      | ---      | ---      | ---      | ---      |
| str     | f64      | f32      | f64      | f32      | i64      | f64      | f64      | f64      | i64      |
+=========+==========+==========+==========+==========+==========+==========+==========+==========+==========+
| "aacbb" | 4.17     | 6.003    | 4.74     | 0.447    | 3        | 0.43     | 3.206    | 6.397    | 7        |
+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| "abbca" | 7.203    | 1.257    | 3.716    | 5.109    | 0        | 8.854    | 5.967    | 5.141    | 5        |
+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| "accac" | 0.001    | 3.436    | 4.738    | 4.253    | 8        | 6.743    | 5.28     | 4.888    | 0        |
+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
df_a_pd = df_a.to_pandas()
df_b_pd = df_b.to_pandas()
Update: Arrow 2.0 is out, and Polars is now also faster at filtering. :)
Let's start with an operation where Polars was slower than pandas: filtering. A filter predicate creates a boolean mask. Polars/Arrow stores these booleans not as one-byte values but as bits, meaning one byte stores 8 booleans. This reduces memory usage 8-fold, but adds some overhead on array creation. Before Arrow 2.0, pandas was more than 5x faster here (though with a huge spread between runs); in the timings below the gap has closed.
Pandas has something called a BlockManager, which hugely increases filtering performance (I believe due to cache optimality). However, this BlockManager incurs a performance hit when blocks are modified and block consolidation is triggered, since consolidation copies the data into new contiguous blocks. Read more about the BlockManager.
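To make the 8-fold claim concrete, here is a rough back-of-the-envelope check in plain numpy (illustrative byte counting, not a Polars API):
# A numpy boolean mask spends one byte per value...
mask = np.ones(100_000, dtype=bool)
print(mask.nbytes)         # 100000 bytes
# ...while an Arrow-style bitmask needs one bit per value, rounded up to whole bytes.
print((100_000 + 7) // 8)  # 12500 bytes: an 8x reduction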
%%timeit
df_a["column_2"] < 1
10000 loops, best of 5: 99.8 µs per loop
%%timeit
df_a_pd["column_2"] < 1
The slowest run took 35.76 times longer than the fastest. This could mean that an intermediate result is being cached. 10000 loops, best of 5: 119 µs per loop
If we use this mask to select rows from the DataFrame, we see that Polars slows down linearly with the number of columns, since every column has to be filtered through the mask. If we apply this filter on a DataFrame with a single column, the timings below put Polars ahead; either way the runtime is only a couple of milliseconds, so these operations are fast.
%%timeit
df_a[:, :1][df_a["column_2"] < 1]
1000 loops, best of 5: 1.32 ms per loop
%%timeit
df_a_pd.iloc[:, :1][df_a_pd["column_2"] < 1]
100 loops, best of 5: 2.32 ms per loop
~Here we observe that with 30 columns, polars is still ~1.2x slower.~ The performance stays approximately the same with more columns.
%%timeit
df_a[df_a["column_2"] < 1]
100 loops, best of 5: 2.33 ms per loop
%%timeit
df_a_pd[df_a_pd["column_2"] < 1]
100 loops, best of 5: 2.41 ms per loop
However, Polars wins in all the expensive operations. Joins and groupby operations take up most of the running time of a query. Below we see that a join in Polars is more than 3x faster than the pandas merge, and that a join operation can take hundreds to thousands of times the runtime of a DataFrame filter.
It's better to be fast in the expensive operations.
%%timeit
df_a.join(df_b, left_on="key", right_on="key", how="inner").shape
1 loop, best of 5: 1.06 s per loop
%%timeit
df_a_pd.merge(df_b_pd, left_on="key", right_on="key", how="inner").shape
1 loop, best of 5: 3.95 s per loop
In the groupby operation with an aggregation on all the columns, we see that Polars is almost 5x faster. Again, Polars is embarrassingly parallel, which means it can occasionally be slower than pandas due to parallelization overhead. When that is the case it hardly matters, because the overhead amounts to only a few extra milliseconds.
%%timeit
df_a.groupby(["key"]).first().shape
100 loops, best of 5: 3.28 ms per loop
%%timeit
df_a_pd.groupby("key").first().shape
100 loops, best of 5: 16.2 ms per loop
Filtering a DataFrame allocates a new DataFrame. An often suboptimal pattern is applying several filters one after another, materializing an intermediate result at every step. Let's see if laziness can help optimize that.
def eager(df: Union[pl.DataFrame, pd.DataFrame]):
    df = df[df['column_2'] < 9]
    df = df[df['column_3'] > 1]
    df = df[df['column_6'] > 1]
    df = df[df['column_4'] > 1]
    return df
assert eager(df_a_pd).shape == eager(df_a).shape
eager(df_a_pd).shape
(58019, 31)
%%timeit
eager(df_a)
10 loops, best of 5: 18.7 ms per loop
%%timeit
eager(df_a_pd)
10 loops, best of 5: 20.2 ms per loop
def lazy_query(df_a: pl.DataFrame):
    return (df_a.lazy()
            .filter(col("column_2") < lit(9))
            .filter(col("column_3") > lit(1))
            .filter(col("column_6") > lit(1))
            .filter(col("column_4") > lit(1)))
%%timeit
lazy_query(df_a).collect()
100 loops, best of 5: 5.88 ms per loop
Above, the query optimizer aggregated all the filters and executed them in a single pass. This avoids an extra allocation at every filter operation. With this optimization we don't incur a performance hit for carelessly filtering at different locations in a query. The lazy rewrite made the query roughly 3x faster than the eager version (5.88 ms vs 18.7 ms).
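For intuition, this is roughly what the optimizer does for us, written out by hand against the pandas DataFrame (a sketch of the combined-mask trick, not Polars internals):
# Combine all predicates into a single boolean mask and filter once,
# instead of allocating an intermediate DataFrame per filter.
mask = ((df_a_pd["column_2"] < 9)
        & (df_a_pd["column_3"] > 1)
        & (df_a_pd["column_6"] > 1)
        & (df_a_pd["column_4"] > 1))
df_a_pd[mask]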
Let's look at another optimization. Say we are only interested in the columns "key" and "column_1". A suboptimal eager query could be written like below. This query would be more performant if the projection (selecting columns) were done before the selection (filtering rows). Below we see that the lazy query is optimized and selects the needed columns before doing the filter operation. This speeds the query up another ~1.5x by not filtering columns that aren't part of the result.
def eager(df: Union[pl.DataFrame, pd.DataFrame]):
    df = df[df['column_2'] < 9]
    df = df[df['column_3'] > 1]
    df = df[df['column_6'] > 1]
    df = df[df['column_4'] > 1]
    return df[["key", "column_1"]]
def lazy_query(df_a: pl.DataFrame):
    return (df_a.lazy()
            .filter(col("column_2") < lit(9))
            .filter(col("column_3") > lit(1))
            .filter(col("column_6") > lit(1))
            .filter(col("column_4") > lit(1))
            .select([col("key"), col("column_1")]))
%%timeit
eager(df_a)
10 loops, best of 5: 19.1 ms per loop
%%timeit
eager(df_a_pd)
10 loops, best of 5: 23.9 ms per loop
%%timeit
lazy_query(df_a).collect()
100 loops, best of 5: 3.97 ms per loop
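We can verify that the projection was indeed pushed down by printing the optimized plan (again assuming describe_optimized_plan is available in this version):
# The optimized plan should select only "key" and "column_1" before filtering.
print(lazy_query(df_a).describe_optimized_plan())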
The same trick can be done with predicates. A suboptimal query would do the filter after an expensive join operation.
def eager(df_a: Union[pl.DataFrame, pd.DataFrame], df_b: Union[pl.DataFrame, pd.DataFrame]):
    # filter before the join (the optimal order)
    df_a = df_a[df_a["column_1"] < 1]
    df_b = df_b[df_b["column_1"] < 1]
    # pandas
    if hasattr(df_a, "values"):
        return df_a.merge(df_b, left_on="key", right_on="key")
    return df_a.join(df_b, left_on="key", right_on="key")
%%timeit
eager(df_a, df_b)
100 loops, best of 5: 11.9 ms per loop
%%timeit
eager(df_a_pd, df_b_pd)
10 loops, best of 5: 43.9 ms per loop
def eager(df_a: Union[pl.DataFrame, pd.DataFrame], df_b: Union[pl.DataFrame, pd.DataFrame]):
    # filter after the join (the suboptimal order)
    # pandas
    if hasattr(df_a, "values"):
        df = df_a.merge(df_b, left_on="key", right_on="key")
        df = df[df["column_1_x"] < 1]
        return df
    df = df_a.join(df_b, left_on="key", right_on="key")
    df = df[df["column_1"] < 1]
    return df
%%timeit
eager(df_a, df_b)
1 loop, best of 5: 1.18 s per loop
%%timeit
eager(df_a_pd, df_b_pd)
1 loop, best of 5: 4.83 s per loop
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
            .join(df_b.lazy(), left_on=col("key"), right_on=col("key"))
            .filter(col("column_1") < lit(1)))
%%timeit
lazy_query(df_a, df_b).collect().shape
100 loops, best of 5: 11.6 ms per loop
As we can see, choosing the wrong order of filters has a large effect, slowing down the query roughly 100x. In the lazy variant, the optimizer pushed the predicates down so that they are executed before the join. Below we extend the lazy query with a groupby aggregation and a final projection; the optimizer still pushes the predicate below the join.
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
            .join(df_b.lazy(), left_on=col("key"), right_on=col("key"), how="inner")
            .filter(col("column_1") < lit(1))
            .groupby("key")
            .agg([col("column_0").agg_sum()])
            .select([col("key"), col("column_0_sum")]))
%%timeit
lazy_query(df_a, df_b).collect()
100 loops, best of 5: 13.7 ms per loop
The lazy API also has access to all the eager operations on Series, because UDFs run with almost no overhead (no serializing or pickling). Below we'll add a column "udf" with a lambda and the help of the eager API. It still needs some polishing, as we need to make sure that we don't modify the dtypes. I hope you can imagine that this can be very powerful! :)
%%time
def lazy_query(df_a: pl.DataFrame, df_b: pl.DataFrame):
    return (df_a.lazy()
            .join(df_b.lazy(), left_on=col("key"), right_on=col("key"), how="inner")
            .filter(col("column_1") < lit(1))
            .groupby("key")
            .agg([col("column_0").agg_sum(), col("column_2").agg_max().alias("foo")])
            .with_column(col("foo").apply(
                # eager UDF: build a new Series of ones scaled by the sum of "foo"
                lambda series: pl.Series("", np.ones(series.len(), dtype=np.float32) * series.sum())
            ).alias("udf"))
            .select([col("key"), col("column_0_sum"), col("udf"), col("foo")]))
lazy_query(df_a, df_b).collect()
CPU times: user 76.1 ms, sys: 0 ns, total: 76.1 ms Wall time: 16.3 ms
# More coming up later.