#!/usr/bin/env python
# coding: utf-8
# # Dask for Machine Learning
#
# This is a high-level overview demonstrating some of the components of Dask-ML.
# Visit the main [Dask-ML](http://ml.dask.org) documentation, see the [dask tutorial](https://github.com/dask/dask-tutorial) notebook 08, or explore some of the other machine-learning examples.
# In[ ]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client
# ## Distributed Training
#
#
#
# Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.
#
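# For comparison, here is the single-machine pattern (a minimal sketch; the estimator and data below are illustrative and not part of this example):
#
# ```python
# from sklearn.datasets import make_classification
# from sklearn.ensemble import RandomForestClassifier
#
# X, y = make_classification(n_samples=1000, random_state=0)
# clf = RandomForestClassifier(n_estimators=200, n_jobs=-1)  # n_jobs=-1: use all local cores
# clf.fit(X, y)
# ```
#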
# Alternatively, Scikit-Learn can use Dask for parallelism. This lets you train those estimators using all the cores of your *cluster* without significantly changing your code.
#
# This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For small datasets, training times are typically short enough that cluster-wide parallelism isn't helpful. For datasets larger than a single machine's memory, the scikit-learn estimators may not be able to cope (see below).
# ### Create Scikit-Learn Estimator
# In[ ]:
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd
# We'll use scikit-learn to create a pair of small random arrays, one for the features `X`, and one for the target `y`.
# In[ ]:
X, y = make_classification(n_samples=1000, random_state=0)
X[:5]
# We'll fit a [Support Vector Classifier](http://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html), using [grid search](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) to find the best value of the $C$ hyperparameter.
# In[ ]:
param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
"kernel": ['rbf', 'poly', 'sigmoid'],
"shrinking": [True, False]}
grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
param_grid=param_grid,
return_train_score=False,
cv=3,
n_jobs=-1)
# To fit that normally, we would call
#
# ```python
# grid_search.fit(X, y)
# ```
#
# To fit it using the cluster, we just need to use a context manager provided by joblib.
# In[ ]:
import joblib
with joblib.parallel_backend('dask'):
    grid_search.fit(X, y)
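# Note: newer joblib releases (1.3+) spell this context manager `joblib.parallel_config`; a minimal sketch, assuming such a version is installed:
#
# ```python
# with joblib.parallel_config(backend='dask'):
#     grid_search.fit(X, y)
# ```
#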
# We fit 48 different hyper-parameter combinations from `param_grid` (with 3-fold cross-validation, 144 individual fits), distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.
# In[ ]:
pd.DataFrame(grid_search.cv_results_).head()
# In[ ]:
grid_search.predict(X)[:5]
# In[ ]:
grid_search.score(X, y)
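# Since `refit=True` by default, `GridSearchCV` has also refit a final model on the full dataset with the best parameters found, and exposes the usual attributes:
#
# ```python
# grid_search.best_params_     # winning hyper-parameter combination
# grid_search.best_score_      # its mean cross-validated score
# grid_search.best_estimator_  # the refit SVC, ready for prediction
# ```
#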
# For more on training scikit-learn models with distributed joblib, see the [dask-ml documentation](http://dask-ml.readthedocs.io/en/latest/joblib.html).
# ## Training on Large Datasets
#
# Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.
#
# All of the algorithms implemented in Dask-ML work well on larger-than-memory datasets, which you might store in a [dask array](http://dask.pydata.org/en/latest/array.html) or [dataframe](http://dask.pydata.org/en/latest/dataframe.html).
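# If your data starts out in memory (or on disk), you can chunk it into a dask array yourself; a minimal sketch with made-up shapes:
#
# ```python
# import numpy as np
# import dask.array as da
#
# data = np.random.random((10000, 4))
# darr = da.from_array(data, chunks=(2500, 4))  # split into 2500-row chunks
# darr.mean(axis=0).compute()  # operations stay lazy until .compute()
# ```
#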
# In[ ]:
get_ipython().run_line_magic('matplotlib', 'inline')
# In[ ]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt
# In this example, we'll use `dask_ml.datasets.make_blobs` to generate some random *dask* arrays.
# In[ ]:
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)

# Persist the lazily generated blobs in distributed memory, so repeated
# passes over the data don't have to regenerate it each time.
X = X.persist()
X
# We'll use the k-means implementation in Dask-ML to cluster the points. It uses the `k-means||` (read: "k-means parallel") initialization algorithm, which scales better than `k-means++`. All of the computation, both during and after initialization, can be done in parallel.
# In[ ]:
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)
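# The fitted attributes mirror scikit-learn's: `km.cluster_centers_` is a small concrete array, while the per-point `km.labels_` stays lazy. A minimal sketch (assuming `labels_` is a dask array, as in current Dask-ML):
#
# ```python
# km.cluster_centers_        # 3 x 2 array: one center per cluster
# km.labels_[:10].compute()  # materialize the first few cluster labels
# ```
#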
# We'll plot a sample of points, colored by the cluster each falls into.
# In[ ]:
fig, ax = plt.subplots()
ax.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],
           cmap='viridis', alpha=0.25);
# For all the estimators implemented in Dask-ML, see the [API documentation](http://dask-ml.readthedocs.io/en/latest/modules/api.html).