#!/usr/bin/env python
# coding: utf-8

# # Dask for Machine Learning
# 
# This is a high-level overview demonstrating some of the components of Dask-ML. Visit the main [Dask-ML](http://ml.dask.org) documentation, see the [dask tutorial](https://github.com/dask/dask-tutorial) notebook 08, or explore some of the other machine-learning examples.

# In[ ]:


from dask.distributed import Client, progress

client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client


# ## Distributed Training
# 
# Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.
# 
# Alternatively, Scikit-Learn can use Dask for parallelism. This lets you train those estimators using all the cores of your *cluster* without significantly changing your code.
# 
# This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For small datasets, training times are typically short enough that cluster-wide parallelism isn't helpful. For datasets larger than a single machine's memory, the scikit-learn estimators may not be able to cope (see below).

# ### Create Scikit-Learn Estimator

# In[ ]:


from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd


# We'll use scikit-learn to create a pair of small random arrays, one for the features `X`, and one for the target `y`.

# In[ ]:


X, y = make_classification(n_samples=1000, random_state=0)
X[:5]


# We'll fit a [Support Vector Classifier](http://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html), using [grid search](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) to find the best combination of the $C$, `kernel`, and `shrinking` hyperparameters.

# In[ ]:


param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           cv=3,
                           n_jobs=-1)


# To fit that normally, we would call
# 
# ```python
# grid_search.fit(X, y)
# ```
# 
# To fit it using the cluster, we just need to use a context manager provided by joblib.

# In[ ]:


import joblib

with joblib.parallel_backend('dask'):
    grid_search.fit(X, y)


# We fit 48 different models, one for each hyper-parameter combination in `param_grid` (8 values of $C$ × 3 kernels × 2 shrinking settings), with the work distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.

# In[ ]:


pd.DataFrame(grid_search.cv_results_).head()


# In[ ]:


grid_search.predict(X)[:5]


# In[ ]:


grid_search.score(X, y)


# For more on training scikit-learn models with distributed joblib, see the [dask-ml documentation](http://dask-ml.readthedocs.io/en/latest/joblib.html).

# ## Training on Large Datasets
# 
# Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.
# 
# All of the algorithms implemented in Dask-ML work well on larger-than-memory datasets, which you might store in a [dask array](http://dask.pydata.org/en/latest/array.html) or [dataframe](http://dask.pydata.org/en/latest/dataframe.html).
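# As a rough, illustrative sketch (not part of the original notebook): a dask array is built
# from many smaller NumPy chunks and evaluated lazily, so chunk-wise computations never need
# the whole array in RAM at once. The shape and chunk size below are arbitrary choices for
# illustration only.

# In[ ]:


import dask.array as da

# Build a lazily-evaluated 10-million-by-20 array split into 1-million-row chunks;
# no data is generated until a computation asks for it.
big = da.random.random((10_000_000, 20), chunks=(1_000_000, 20))

# Column means are computed chunk-by-chunk, so the full array never has to fit in memory.
big.mean(axis=0).compute()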
# In[ ]:


get_ipython().run_line_magic('matplotlib', 'inline')


# In[ ]:


import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt


# In this example, we'll use `dask_ml.datasets.make_blobs` to generate some random *dask* arrays.

# In[ ]:


X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X


# We'll use the k-means implemented in Dask-ML to cluster the points. It uses the `k-means||` (read: "k-means parallel") initialization algorithm, which scales better than `k-means++`. All of the computation, both during and after initialization, can be done in parallel.

# In[ ]:


km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)


# We'll plot a sample of points, colored by the cluster each falls into.

# In[ ]:


fig, ax = plt.subplots()
ax.scatter(X[::10000, 0], X[::10000, 1],
           marker='.', c=km.labels_[::10000],
           cmap='viridis', alpha=0.25);


# For all the estimators implemented in Dask-ML, see the [API documentation](http://dask-ml.readthedocs.io/en/latest/modules/api.html).
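# As a hedged follow-up sketch (not shown in the notebook above): like its scikit-learn
# counterpart, the fitted Dask-ML `KMeans` keeps its learned centers in `cluster_centers_`
# and can assign new points to clusters with `predict`, which accepts dask arrays and
# evaluates lazily. The fresh batch of blobs below is an illustrative choice, not data
# from the example above.

# In[ ]:


# One row per cluster; this is a small in-memory NumPy array.
km.cluster_centers_

# Generate a small new batch of blobs and assign each point to a learned cluster.
X_new, _ = dask_ml.datasets.make_blobs(n_samples=1000, chunks=500,
                                       random_state=1, centers=3)
km.predict(X_new).compute()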