#!/usr/bin/env python
# coding: utf-8
# Notebook export: benchmarks three ways of computing column histograms on a
# dask-cudf DataFrame loaded from the Boston crime dataset:
#   1. cudf Series.digitize + value_counts (via dask.delayed)
#   2. cupy.histogram over a zero-copy DLPack view (via dask.delayed)
#   3. Optimus' built-in df.cols.hist

# In[1]:
get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')

# In[2]:
import sys
sys.path.append("/home/argenisleon/Optimus/")

# In[3]:
from optimus import Optimus

# In[4]:
# Local dask-cudf cluster: 4 workers x 2 threads, 3 GB memory cap per worker,
# in-process (processes=False), with the diagnostics dashboard (comm) enabled.
op = Optimus("dask-cudf", n_workers=4, threads_per_worker=2, processes=False,
             memory_limit="3G", comm=True, verbose=True)

# In[5]:
# NOTE(review): infer_schema='false' is the string Optimus expects here, not a
# Python bool — left as-is; confirm against the Optimus loader API.
df = op.load.csv("data/crime.csv", sep=",", header=True, infer_schema='false',
                 null_value="None", charset="latin1")

# In[6]:
# Collapse to a single partition so each per-column task sees all the data.
df = df.repartition(npartitions=1)

# In[7]:
df = df.persist()

# In[8]:
df.dtypes

# In[9]:
# Numeric columns to histogram.
num_cols = ["OFFENSE_CODE", "YEAR", "MONTH", "HOUR"]
# Remaining (categorical) columns, kept for reference:
# cc= ["INCIDENT_NUMBER","OFFENSE_CODE_GROUP","OFFENSE_DESCRIPTION","DISTRICT","REPORTING_AREA","REPORTING_AREA","SHOOTING","OCCURRED_ON_DATE","DAY_OF_WEEK","UCR_PART","STREET","Location"]

# In[10]:
import numpy as np

# Fixed bin edges shared by the digitize-based benchmark below.
bins = np.array([-30, 0, 3, 6, 9, 10, 19, 50])

# ### Using digitize

# In[11]:
import dask
import cudf


def hist_serie(serie, bin_edges):
    """Return a delayed histogram of `serie` bucketed by `bin_edges`.

    The delayed task yields (value_counts_sorted_by_bin, bin_edges).

    BUG FIX: the inner closure previously ignored its second argument and
    silently captured the module-level `bins` array (callers passed 10, which
    was discarded). It now uses the edges it is given; the call sites below
    pass `bins` explicitly, so the computed result is unchanged.
    """
    def func(_serie):
        binned = _serie.digitize(bin_edges, right=False)
        vc = binned.value_counts().sort_index()
        return vc, bin_edges
    return dask.delayed(func)(serie)


delayed_tasks = []
for num_col in num_cols:
    # Pass the shared edges explicitly (previously a discarded literal 10).
    delayed_tasks.append(hist_serie(df[num_col], bins))

get_ipython().run_line_magic('time', 'dask.compute(*delayed_tasks)')

# ### Using Cupy

# In[17]:
import cupy as cp
import dask


def hist_serie(serie, buckets):
    """Return a delayed cupy histogram (counts, edges) of `serie`.

    `buckets` is forwarded to cupy.histogram (bin count or edge sequence).
    """
    def func(_serie):
        # Zero-copy handoff from cudf to cupy via the DLPack protocol.
        arr = cp.fromDlpack(_serie.to_dlpack())
        return cp.histogram(arr, buckets)
    return dask.delayed(func)(serie)


# In[18]:
delayed_tasks = []
for num_col in num_cols:
    delayed_tasks.append(hist_serie(df[num_col], 10))

get_ipython().run_line_magic('time', 'print(dask.compute(*delayed_tasks))')

# ### Using Optimus

# In[19]:
get_ipython().run_cell_magic('time', '', 'df.cols.hist(num_cols)\n')

# In[ ]: