Dask DataFrames coordinate many pandas DataFrames/Series arranged along an index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These pandas objects may live on disk or on other machines.
pandas works well for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM.
from dask.distributed import Client, progress
client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client
Create a random-data time series with the following properties:
It stores a record for every 10 seconds of the year 2000
It is split by month, keeping each month as a separate pandas DataFrame
Along with a datetime index, it has columns for name, ID, and numeric values
This is a small dataset of about 240 MB. Increase the number of days or decrease the frequency to practice with a larger dataset.
import dask
import dask.dataframe as dd
df = dask.datasets.timeseries()
# Dask DataFrames are lazy, so no data is printed here
df
|                | id    | name   | x       | y       |
|----------------|-------|--------|---------|---------|
| npartitions=30 |       |        |         |         |
| 2000-01-01     | int64 | object | float64 | float64 |
| 2000-01-02     | ...   | ...    | ...     | ...     |
| ...            | ...   | ...    | ...     | ...     |
| 2000-01-30     | ...   | ...    | ...     | ...     |
| 2000-01-31     | ...   | ...    | ...     | ...     |
df.dtypes
id        int64
name     object
x       float64
y       float64
dtype: object
# Standard deviation of x by name, for rows where y > 0
df2 = df[df.y > 0]
df3 = df2.groupby('name').x.std()
df3
Dask Series Structure:
npartitions=1
    float64
        ...
Name: x, dtype: float64
Dask Name: sqrt, 157 tasks
%time computed_df = df3.compute()
CPU times: user 185 ms, sys: 19.9 ms, total: 204 ms
Wall time: 1.28 s
computed_df
name
Alice       0.577885
Bob         0.576743
Charlie     0.579827
Dan         0.577208
Edith       0.577062
Frank       0.576297
George      0.575883
Hannah      0.576153
Ingrid      0.580452
Jerry       0.574682
Kevin       0.577536
Laura       0.578090
Michael     0.577965
Norbert     0.578714
Oliver      0.577328
Patricia    0.577173
Quinn       0.577750
Ray         0.576110
Sarah       0.576397
Tim         0.577916
Ursula      0.576522
Victor      0.578701
Wendy       0.576590
Xavier      0.576980
Yvonne      0.576429
Zelda       0.576003
Name: x, dtype: float64
df = df.persist()
df2 = df[df.y > 0]
df3 = df2.groupby('name').x.std()
%time df3.compute()
CPU times: user 133 ms, sys: 12 ms, total: 145 ms
Wall time: 494 ms
name
Alice       0.577885
Bob         0.576743
Charlie     0.579827
Dan         0.577208
Edith       0.577062
Frank       0.576297
George      0.575883
Hannah      0.576153
Ingrid      0.580452
Jerry       0.574682
Kevin       0.577536
Laura       0.578090
Michael     0.577965
Norbert     0.578714
Oliver      0.577328
Patricia    0.577173
Quinn       0.577750
Ray         0.576110
Sarah       0.576397
Tim         0.577916
Ursula      0.576522
Victor      0.578701
Wendy       0.576590
Xavier      0.576980
Yvonne      0.576429
Zelda       0.576003
Name: x, dtype: float64
# Re-sort the data by the name column
df = df.set_index('name')
df
|                | id    | x       | y       |
|----------------|-------|---------|---------|
| npartitions=30 |       |         |         |
| Alice          | int64 | float64 | float64 |
| Alice          | ...   | ...     | ...     |
| ...            | ...   | ...     | ...     |
| Zelda          | ...   | ...     | ...     |
| Zelda          | ...   | ...     | ...     |
df = df.persist()
# Lookups by name value are now fast
%time df.loc['Alice'].compute()
CPU times: user 39.2 ms, sys: 11.7 ms, total: 51 ms
Wall time: 88 ms
|       | id   | x         | y         |
|-------|------|-----------|-----------|
| name  |      |           |           |
| Alice | 928  | -0.523920 | 0.226035  |
| Alice | 986  | -0.170906 | 0.354172  |
| Alice | 931  | -0.499495 | 0.417713  |
| Alice | 1003 | 0.828322  | -0.492837 |
| Alice | 987  | 0.580199  | 0.944183  |
| ...   | ...  | ...       | ...       |
| Alice | 1036 | -0.247774 | -0.955866 |
| Alice | 1028 | -0.280042 | -0.701543 |
| Alice | 1037 | 0.361707  | 0.656865  |
| Alice | 970  | 0.702504  | -0.732269 |
| Alice | 977  | -0.636661 | 0.062147  |

100244 rows × 3 columns
client.cancel(df)
client.shutdown()
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError