#!/usr/bin/env python
# coding: utf-8
# ## [mlcourse.ai](https://mlcourse.ai) – Open Machine Learning Course
# ### Author: Irina Knyazeva, ODS Slack nickname: iknyazeva
#
# ## Tutorial
"HANDLE DIFFERENT DATASET WITH DASK AND TRYING A LITTLE DASK ML" # ## WHY DO I NEED DASK? # # Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don’t fit into main memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets. # # ## YOU DEFINITELY NEED DASK IF # if problem size close to limits of RAM, but fits to disk # # # ## Reading list # This notebook based mainly based on this three sources # # - [One more tutorial from analytics vidhya](https://www.analyticsvidhya.com/blog/2018/08/dask-big-datasets-machine_learning-python/) # - [taken from towardsdatascience](https://towardsdatascience.com/trying-out-dask-dataframes-in-python-for-fast-data-analysis-in-parallel-aa960c18a915) # # - [DataCamp course](https://campus.datacamp.com/courses/parallel-computing-with-dask/) # # - [Dask documentation](https://docs.dask.org/en/latest/) # # # # In[ ]: import gc import os import time import warnings import numpy as np import pandas as pd import psutil from dask import delayed warnings.filterwarnings("ignore") # Let's write a little function for tracking memory that takes python process # In[ ]: def memory_footprint(): mem = psutil.Process(os.getpid()).memory_info().rss return mem / 1024 ** 2 # In[ ]: before = memory_footprint() print(f"Memory used before is {round(before,2)} MB") # In[ ]: N = (1024 ** 2) // 8 x = np.random.randn(50 * N) after = memory_footprint() print(f"Memory used after is {round(after,2)} MB") # Computes, but doesn't bind result to a variable allocate extra memory # In[ ]: x ** 2 after1 = memory_footprint() print(f" Extra memory obtained after computation {round(after1,2)} MB") # ## Dask arrays # # Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs.[dask array documentation](http://docs.dask.org/en/latest/array.html) # # # #
# In[ ]:


import dask.array as da

y = da.from_array(x, chunks=len(x) // 4)
print("Dask arrays require little memory (MB):", memory_footprint() - after1)


# In[ ]:


import time

t_start = time.time()
x.mean()
t_end = time.time()
print("Compute the mean value of this NumPy array \n")
print(
    "Elapsed time for computing the mean of the NumPy array (ms):",
    round((t_end - t_start) * 1000),
)


# In[ ]:


t_start = time.time()
y.mean().compute()
t_end = time.time()
print("Compute the same with Dask \n")
print(
    "Elapsed time for computing the mean of the Dask array (ms):",
    round((t_end - t_start) * 1000),
)


# Actually, this example would never be used in practice: if your NumPy array is already in memory, any partitioning only increases the computation time. But if you need to process data from HDF5, NetCDF or a bunch of NumPy files on disk, Dask arrays can be extremely useful.

# ## Delayed operations with Dask
#
# Dask can also be useful for data that fits in memory thanks to delayed computation: it makes it easy to parallelize independent operations. Let's see an example with our previous NumPy array.

# In[ ]:


def f(z):
    return np.sqrt(z + 4)


def g(y):
    return y - 3


def h(x):
    return x ** 2


time_start = time.time()
x = np.random.randn(50 * N)
y = h(x)
z = g(x)
w = f(z + y)
time_end = time.time()
print(
    "Elapsed time for computing the composed functions with a NumPy array (ms):",
    round((time_end - time_start) * 1000),
)


# In[ ]:


y = delayed(h)(x)
z = delayed(g)(x)
w = delayed(f)(z + y)
print("After wrapping we get a dask Delayed object:", w)
time_start = time.time()
w.compute()
time_end = time.time()
print(
    "Elapsed time for computing the composed functions with dask.delayed (ms):",
    round((time_end - time_start) * 1000),
)


# Looking at the computational graph makes it clear why the computation time decreased. Let's build the same graph with the second way of defining delayed functions, the `@delayed` decorator.

# In[ ]:


@delayed
def f(z):
    return np.sqrt(z + 4)


@delayed
def g(y):
    return y - 3


@delayed
def h(x):
    return x ** 2


y = h(x)
z = g(x)
w = f(z + y)
w.visualize()


# ## Dask dataframe
#
# Dask DataFrames coordinate many pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These pandas objects may live on disk or on other machines.
# [See the documentation](http://docs.dask.org/en/latest/dataframe.html).
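# Here is a minimal, self-contained sketch of the same idea for DataFrames (the toy frame below is made up for illustration; it is not the dataset used next): a Dask DataFrame is just a collection of pandas DataFrames plus a task graph, and nothing runs until `compute()` is called.

# In[ ]:


import dask.dataframe as dd

toy_pdf = pd.DataFrame({"a": np.arange(1000), "b": np.random.randn(1000)})
toy_ddf = dd.from_pandas(toy_pdf, npartitions=4)  # 4 pandas DataFrames under the hood
print(toy_ddf.npartitions)
print(toy_ddf["b"].mean().compute())  # the graph is executed only here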
# = 7) & (hour <= 11)).astype("float64") df["day"] = ((hour >= 12) & (hour <= 18)).astype("int") df["evening"] = ((hour >= 19) & (hour <= 23)).astype("int") df["night"] = ((hour >= 0) & (hour <= 6)).astype("int") df["sin_hour"] = np.sin(2 * np.pi * df["hour"] / 24) df["cos_hour"] = np.cos(2 * np.pi * df["hour"] / 24) df = df.drop(["hour"], axis=1) day_of_week = df["published"].dt.dayofweek.astype("int") df["day_of_week"] = day_of_week df["weekend"] = (day_of_week >= 5).astype("int") # turn to categorical df[to_cat_cols] = df[to_cat_cols].astype("category") return df # In[ ]: get_ipython().run_cell_magic('time', '', 'df_medium_train = additional_time_features_df(df.copy())\n') # In[ ]: dd_medium_train = additional_time_features_df(dd_no_content) # In[ ]: get_ipython().run_cell_magic('time', '', 'dd_medium_train.compute()\n') # ## Dask ML # # Dask ML provides scalable machine learning algorithms in python which are compatible with scikit-learn. Let us first understand how scikit-learn handles the computations and then we will look at how Dask performs these operations differently. See dask-ml tutorials: [Examples from dask ml](http://ml.dask.org/examples.html) # # You need to install dask-ml at first # # There are two main parts in dask ml: # - approaches to handle big datasets # - approaches to handle big models # ### Handle big model with dask distributed # The biggest model from our course was a random forest on text data in the week with Random Forest assignment. Below I just reproduce part of our assignment, but I reduced nrows and max features in Count vectorizer, but you can check with original parameters. # # Here we use the [`movie_reviews_train.csv`](https://drive.google.com/file/d/1WDz3EB0MMuQUuUTwZ30c4JJrN8d9shAW/view?usp=sharing) file. # In[ ]: # Download data df = pd.read_csv("../../data/movie_reviews_train.csv", nrows=5000) # Split data to train and test X_text = df["text"] y_text = df["label"] # Classes counts df.label.value_counts() # In[ ]: from sklearn.feature_extraction.text import CountVectorizer from sklearn.linear_model import LogisticRegression from sklearn.model_selection import GridSearchCV, StratifiedKFold from sklearn.pipeline import Pipeline # Split on 3 folds skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=17) # In Pipeline we will modify the text and train logistic regression classifier = Pipeline( [ ("vectorizer", CountVectorizer(max_features=500, ngram_range=(1, 3))), ("clf", LogisticRegression(random_state=17)), ] ) # In[ ]: get_ipython().run_cell_magic('time', '', 'parameters = {"clf__C": (0.1, 1, 10, 100)}\ngrid_search = GridSearchCV(classifier, parameters, scoring="roc_auc", cv=skf)\ngrid_search = grid_search.fit(X_text, y_text)\n') # In[ ]: grid_search.best_score_ # ### Replace joblib with dask # # In this approach all we need to do is replace joblib to dask distributed. 
# ### Replace joblib with Dask
#
# In this approach all we need to do is replace joblib's backend with dask.distributed: we initialize a distributed Client and change the parallel backend.

# In[ ]:


from dask.distributed import Client
from sklearn.externals import joblib

client = Client()

parameters = {"clf__C": (0.1, 1, 10, 100)}
grid_search = GridSearchCV(classifier, parameters, scoring="roc_auc", cv=skf)

t_start = time.time()
with joblib.parallel_backend("dask"):
    grid_search.fit(X_text, y_text)
t_end = time.time()
print("Elapsed time for grid_search with the Dask joblib backend (s):", round(t_end - t_start))


# In[ ]:


grid_search.best_score_


# ### Replace grid search with Dask
#
# In parallel to GridSearchCV in scikit-learn, Dask provides a library called dask-searchcv (it is now included in Dask-ML). It merges pipeline steps so that there is less repeated work. dask-searchcv needs to be installed separately; the installation command is shown below.

# In[ ]:


# pip3 install dask-searchcv
import dask_searchcv as dcv


# We can use pipelines in the Dask grid search, and according to the [documentation](https://dask-searchcv.readthedocs.io/en/latest/) it pays off most for pipelines with many operations that can be parallelized, especially ones that include a feature union. I tried that, however, and got an error as a result... In any case, a time-consuming operation such as CountVectorizer cannot be parallelized, so here the Dask grid search is used only for the classifier.

# In[ ]:


get_ipython().run_cell_magic('time', '', 'vect = CountVectorizer(max_features=500, ngram_range=(1, 3))\nXvect = vect.fit_transform(X_text)\n')


# In[ ]:


lr = LogisticRegression()
parameters = {"C": (0.1, 1, 10, 100)}

t_start = time.time()
grid_search = dcv.GridSearchCV(lr, parameters, scoring="roc_auc", cv=skf)
grid_search.fit(Xvect, y_text)
t_end = time.time()
print(
    f"Elapsed time for grid_search (without the time spent on vectorization) (s): {round(t_end - t_start)}"
)


# In[ ]:


grid_search.best_score_


# I tried to see how well Dask does with a random forest and the original parameters, but sometimes this raises an error ("OSError: [Errno 24] Too many open files") after execution, and I couldn't fix it... Sometimes it works fine; for small data it works in most cases, but if you re-run this notebook several times there is a good chance of hitting this error. So I believe dask-ml is very useful, but for now I honestly don't know how it should be used properly.

# In[ ]:


from sklearn.ensemble import RandomForestClassifier

rf = RandomForestClassifier(random_state=17)
min_samples_leaf = [1, 2, 3]
max_features = [0.3, 0.5, 0.7]
max_depth = [None]

parameters = {
    "max_features": max_features,
    "min_samples_leaf": min_samples_leaf,
    "max_depth": max_depth,
}
grid_search = dcv.GridSearchCV(rf, parameters, scoring="roc_auc", cv=skf)

t_start = time.time()
grid_search.fit(Xvect, y_text)
t_end = time.time()
print(
    f"Elapsed time for the Dask grid_search for the Random Forest (s): {round(t_end - t_start)}"
)
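# If you hit the "[Errno 24] Too many open files" error mentioned above, one workaround that sometimes helps (a hedged sketch: Unix-only, and it does not address the root cause) is to raise the per-process file-descriptor limit before starting the distributed client.

# In[ ]:


import resource

# current soft/hard limits on the number of open file descriptors
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("soft:", soft, "hard:", hard)

# raise the soft limit, never above the hard limit; 4096 is an arbitrary target
target = 4096
if hard == resource.RLIM_INFINITY:
    new_soft = max(soft, target)
else:
    new_soft = max(soft, min(target, hard))
resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))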
# ### Handle a model with big data
#
# There are a number of models rewritten in Dask that can take Dask objects (huge arrays) and fit on them. You can read more in the Dask documentation. Below is an example with KMeans, but there are also Dask versions of linear models and preprocessing functions. The notation is very similar to scikit-learn, so it should be easy to use.

# In[ ]:


from dask_ml import datasets
from dask_ml.cluster import KMeans


# In[ ]:


X, y = datasets.make_blobs(
    n_samples=10000000, chunks=1000000, random_state=0, centers=3
)

# persist() triggers the computation and keeps the resulting chunks in memory,
# returning a new Dask array backed by the computed blocks
X = X.persist()
X


# In[ ]:


km = KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)


# Actually, I read an article about Dask only a couple of days ago and decided that the tutorial assignment would be a good way to get acquainted with the library. So please don't be too strict if I misunderstood something :))