Gotchas from Pandas to Dask

This notebook highlights some key differences when transferring code from Pandas to run in a Dask environment.
Most issues have a link to the Dask documentation for additional information.

In [ ]:
# since Dask is actively being developed - the current example is running with the version below
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')

Start Dask Client for Dashboard

Starting the Dask Client is optional. In this example we are running on a LocalCluster, which will also provide a dashboard that is useful to gain insight into the computation.
For additional information on the Dask Client see the documentation

The link to the dashboard will become visible when you create a client (as shown below).
When running within JupyterLab, an extension can be installed to view the various dashboard widgets.

In [ ]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client

Create 2 DataFrames for comparison:

  1. for Dask
  2. for Pandas
Dask comes with built-in dataset samples; we will use one of these samples for our example.
In [ ]:
ddf = dask.datasets.timeseries()
ddf
  • Remember that the Dask framework is lazy, thus in order to see the result we need to run compute() (or head(), which runs compute() under the hood)
In [ ]:
ddf.head(2)

Pandas Dataframe

In order to create a Pandas dataframe we can use the compute() method of a Dask dataframe

In [ ]:
pdf = ddf.compute()  
print(type(pdf))
pdf.head(2)

dataframe.shape

We can also see Dask's laziness when using the shape attribute

In [ ]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')

We cannot get the full shape before accessing all the partitions; running len will do so

In [ ]:
print(f'Dask computed shape: {len(ddf.index):,}')  # expensive

Creating a Dask dataframe from Pandas

In order to utilize Dask capabilities on an existing Pandas dataframe (pdf) we need to convert the Pandas dataframe into a Dask dataframe (ddf) with the from_pandas method. You must supply the number of partitions or chunksize that will be used to generate the Dask dataframe.

In [ ]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2

Partitions in Dask Dataframes

Notice that when we created a Dask dataframe we needed to supply an argument of npartitions.
The number of partitions will assist Dask in deciding how to break up the Pandas DataFrame and parallelize the computation.
Each partition is a separate dataframe. For additional information see partition documentation

An example of this can be seen when examining the reset_index() method:

In [ ]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
In [ ]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute() 

Dask Dataframe vs Pandas Dataframe

Now that we have a Dask (ddf) and a Pandas (pdf) dataframe we can start to compare the interactions with them.

Conceptual shift - from Update to Insert/Delete

Dask does not update in place - thus there are no arguments such as inplace=True which exist in Pandas.
For more details see issue#653 on GitHub

Rename Columns

  • using inplace=True is not considered to be best practice.
In [ ]:
# Pandas 
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
# Dask - Error
# ddf.rename(columns={'id':'ID'}, inplace=True)
# ddf.columns
''' python
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>
      1 # Dask - Error
----> 2 ddf.rename(columns={'id':'ID'}, inplace=True)
      3 ddf.columns

TypeError: rename() got an unexpected keyword argument 'inplace'
'''
In [ ]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns

Data manipulations

There are several differences when manipulating data.

loc - Pandas

In [ ]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)

Error

cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [14], in <cell line: 2>()
      1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
      3 ddf[cond_ddf].head(2)

TypeError: '_LocIndexer' object does not support item assignment

Dask - use mask/where

In [ ]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
In [ ]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)

For more information see dask mask documentation
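
The section title mentions where as well; where is the inverse of mask, keeping values where the condition is True and replacing them where it is False. Below is a minimal sketch that reuses cond_ddf from the cell above and simply negates it - note that running it after the mask cell would scale the matching rows a second time.

In [ ]:
# Dask - same effect with where: keep values where ~cond_ddf holds,
# otherwise replace them (i.e. rows matching cond_ddf are replaced)
ddf['y'] = ddf['y'].where(cond=~cond_ddf, other=ddf['y'] * 100)
ddf.head(2)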

Meta argument

One key feature in Dask is the introduction of the meta argument.

meta is the prescription of the names/types of the output from the computation
from stack overflow answer

Since Dask creates a DAG for the computation, it needs to know what the outputs of each calculation stage are.
For additional information see the meta documentation

In [ ]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
In [ ]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)

Introducing meta argument

In [ ]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(object, name='initials')
In [ ]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
In [ ]:
# similar when using a function
def func(row):
    if (row['x']> 0):
        return row['x'] * 1000  
    else:
        return row['y'] * -1
In [ ]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)

Map partitions

  • We can supply an ad-hoc function to run on each partition using the map_partitions method.
    This is mainly useful for functions that are not implemented in Dask or Pandas.
  • Finally we can return a new dataframe which needs to be described in the meta argument.
    The function can also take additional arguments.
In [ ]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
    df['dist'] =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  
                           +  (df[coor_y] - df[coor_y].shift())**2 )
    return df.drop(drop_cols, axis=1)

ddf2 = ddf.map_partitions(func2
                          , coor_x='x'
                          , coor_y='y'
                          , drop_cols=['initials', 'z']
                          , meta=pd.DataFrame({'ID':'i8'
                                              , 'name':str
                                              , 'x':'f8'
                                              , 'y':'f8'                                              
                                              , 'dist':'f8'}, index=[0]))
ddf2.head()

Convert index into Time column

In [ ]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
In [ ]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or  ddf = ddf.assign(times=dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'))
ddf['times'] = ddf['times'].dt.time
ddf = client.persist(ddf)
ddf.head(2)

Drop NA on column

In [ ]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1) 
In [ ]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
In [ ]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)

In order for Dask to drop a column that is all NA it must check all the partitions with compute()

In [ ]:
if ddf.colna.isnull().all().compute() == True:   # check if all values in column are Null -  expensive
    ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)

Reset Index

In [ ]:
# Pandas
pdf = pdf.reset_index(drop=True)
pdf.head(2)
In [ ]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)

Read / Save files

  • When working with Pandas and Dask, the parquet format is preferable.
  • When working with Dask, files can be read with multiple workers.
  • Most kwargs are applicable for reading and writing files,
    e.g. ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False).
  • However, some are not available, such as nrows (a workaround is sketched below).

See the documentation (including the option for output file naming).
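
Since nrows is not available, a small sample can be pulled with head(), which only reads from the first partition rather than scanning every file. A minimal sketch, assuming CSV files already exist at the hypothetical path used in the bullet above (sample_ddf is an illustrative name):

In [ ]:
# lazy read, then materialize only a few rows from the first partition
sample_ddf = dd.read_csv('data/pd2dd/ddf*.csv')
sample_ddf.head(5)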

Save files

In [ ]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
In [ ]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
In [ ]:
list(output_dir_file.parent.glob('*.csv'))

Notice the '*' which allows naming of the multiple output files.

In [ ]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
In [ ]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)

To find the number of partitions, which will determine the number of output files, use the npartitions attribute of the Dask dataframe.

To change the number of output files use repartition, which is an expensive operation (a sketch follows the next cell).

In [ ]:
ddf.npartitions
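
A minimal sketch of repartition; the target of 4 partitions and the name ddf_repartitioned are arbitrary and chosen here only for illustration:

In [ ]:
# expensive: Dask has to combine/split existing partitions to reach the new count
ddf_repartitioned = ddf.repartition(npartitions=4)
ddf_repartitioned.npartitions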

Read Multiple files

For Pandas it is possible to iterate over the files and concat them; see this answer from Stack Overflow.

In [ ]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f) 
                       for f in list(output_dask_dir.iterdir())])
len(concat_df)
In [ ]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf

Remember that Dask is lazy - thus it does not really read the file until it needs to...

In [ ]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)

Consider using client.persist()

Since Dask is lazy, it may run the entire graph/DAG (again) even if it already ran part of the calculation in a previous cell. Thus, use persist to keep the results in memory.
Additional information can be read in this Stack Overflow question or see an example in this post.
This concept should also be used when running code within a script (rather than a Jupyter notebook) which incorporates loops.

In [ ]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)

Group By - custom aggregations

In addition to the groupby notebook example that is in the repository -
this is another example of how to eliminate the use of groupby.apply.
In this example we are grouping columns into unique lists.

Pandas

In [ ]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
In [ ]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
               (lambda x: list(set(x.to_list()))) 
               for att_col_gr in gp_col]
In [ ]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')      
print(df_edge_att.head(2))
  • Remember that in some cases Pandas is more efficient (assuming that you can load all the data into RAM).

Dask

In [ ]:
def set_list_att(x: dd.Series):
    return list(set([item for item in x.values]))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
In [ ]:
ddf.columns
In [ ]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att')) 
               for att_col_gr in gp_col]
In [ ]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)

We can do better...
Using Dask's custom aggregation is considerably better

In [ ]:
import itertools
custom_agg = dd.Aggregation(
    'custom_agg', 
    lambda s: s.apply(set), 
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)
In [ ]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)  

Debugging

Debugging may be challenging...

  1. Run code without the client (a sketch follows this list)
  2. Use the Dashboard profiler
  3. Verify the integrity of the DAG
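
A minimal sketch of item 1: force the local single-threaded scheduler for one computation so it bypasses the distributed client and ordinary Python debugging (tracebacks, pdb, %debug) behaves normally. The choice of computation here is arbitrary:

In [ ]:
# run this one computation in the local process instead of on the cluster
ddf['x'].mean().compute(scheduler='single-threaded')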

Corrupted DAG

In this example we show that once the DAG is corrupted you may need to reset the calculation.

In [ ]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
In [ ]:
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2    # `^` <-- wrong syntax
                     +  (df[coor_y] - df[coor_y].shift())^2 )  # `^` <-- wrong syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))

Is everything OK?

# Results in error
ddf.head()

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>
      1 # returns an error because of ^2 (needs to be **2)
----> 2 ddf.head()

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
    898
    899         if compute:
--> 900             result = result.compute()
    901         return result
    902

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

TypeError: unsupported operand type(s) for ^: 'float' and 'bool'
  • Even if the function is corrected, the DAG is corrupted
In [ ]:
# Still results with an error
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  # `**` <-- correct syntax
                     +  (df[coor_y] - df[coor_y].shift())**2 )  # `**` <-- correct syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
# Still results in error
ddf.head()

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>
      1 # returns an error because of ^2 (needs to be **2)
----> 2 ddf.head()

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
    898
    899         if compute:
--> 900             result = result.compute()
    901         return result
    902

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

TypeError: unsupported operand type(s) for ^: 'float' and 'bool'

We need to reset the dataframe

In [ ]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2    #corrected math function
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
ddf.head(2)