Scale Scikit-Learn for Small Data Problems

This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem. We'll fit a large model, a grid-search over many hyper-parameters, on a small dataset.

This video demonstrates the same example on a larger cluster.

In [ ]:
from IPython.display import YouTubeVideo

In [ ]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')

Distributed Training

Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.
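As a minimal sketch of that single-machine case, the snippet below fits an estimator that accepts n_jobs. The choice of RandomForestClassifier, the digits dataset, and the value n_jobs=2 are illustrative assumptions, not part of the original example.

```python
from sklearn.datasets import load_digits
from sklearn.ensemble import RandomForestClassifier

# Small in-memory dataset bundled with scikit-learn (no download needed).
X, y = load_digits(return_X_y=True)

# n_jobs=2 asks joblib to build the trees with two parallel workers;
# n_jobs=-1 would use every available core.
forest = RandomForestClassifier(n_estimators=20, n_jobs=2, random_state=0)
forest.fit(X, y)

print(len(forest.estimators_), "trees fitted")
```

Nothing else in the code changes when n_jobs changes, which is what makes swapping the parallel backend later so cheap.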

Alternatively, scikit-learn can use Dask for parallelism. This lets you train those estimators using all the cores of your cluster without significantly changing your code.

This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For very small datasets, training is typically fast enough that cluster-wide parallelism isn't helpful. For datasets larger than a single machine's memory, the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways of working with larger-than-memory datasets).

Create Scikit-Learn Pipeline

In [ ]:
from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
In [ ]:
# Scale Up: set categories=None to use all the categories
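categories = [
    # Assumed example subset: the original list is missing from this copy
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)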

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))

We'll define a small pipeline that combines text feature extraction with a simple classifier.

In [ ]:
pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])
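To see what the three stages do without downloading the newsgroups data, here is a rough sketch of the same kind of pipeline on a toy corpus. The documents, the labels, the name toy_pipeline, and the reduced n_features value are all made up for illustration.

```python
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline

# Hypothetical toy corpus: two "space" documents and two "cooking" documents.
docs = [
    "the rocket reached orbit",
    "the satellite left orbit",
    "simmer the sauce and stir",
    "stir the soup and serve",
]
labels = [0, 0, 1, 1]

toy_pipeline = Pipeline([
    ("vect", HashingVectorizer(n_features=2**10)),  # text -> hashed term counts
    ("tfidf", TfidfTransformer()),                  # counts -> TF-IDF weights
    ("clf", SGDClassifier(max_iter=1000, random_state=0)),
])
toy_pipeline.fit(docs, labels)

# Output of the first stage alone: one sparse row per document.
hashed = toy_pipeline.named_steps["vect"].transform(docs)
print(hashed.shape)  # (4, 1024)

predictions = toy_pipeline.predict(docs)
```

The step names ('vect', 'tfidf', 'clf') matter because they become the prefixes used to address each step's parameters in a grid search.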

Grid search over some parameters.

In [ ]:
parameters = {
    'tfidf__use_idf': (True, False),
    'tfidf__norm': ('l1', 'l2'),
    'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    # 'clf__max_iter': (10, 50, 80),
}
In [ ]:
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False)
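
The amount of work in this search follows directly from the grid. As a back-of-the-envelope sketch using only the standard library (the variable names candidates and total_fits are ours), the number of independent fits is the number of parameter combinations times the number of folds:

```python
from itertools import product

parameters = {
    "tfidf__use_idf": (True, False),
    "tfidf__norm": ("l1", "l2"),
    "clf__alpha": (0.00001, 0.000001),
}
cv = 3

# Every combination of values is one candidate model.
candidates = [
    dict(zip(parameters, values))
    for values in product(*parameters.values())
]

# Each candidate is fitted once per cross-validation fold.
total_fits = len(candidates) * cv
print(len(candidates), "candidates,", total_fits, "fits")  # 8 candidates, 24 fits
```

Each of those fits is independent of the others, which is why the search parallelizes well. Uncommenting the extra entries in the grid multiplies the count accordingly.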

To fit this normally, we would call grid_search.fit(data.data, data.target).

That would use the default joblib backend (multiple processes) for parallelism. To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a parallel_backend context.

In [ ]:
import joblib

with joblib.parallel_backend('dask'):
    grid_search.fit(data.data, data.target)

If you had your distributed dashboard open during that fit, you'll notice that each worker performs some of the fit tasks.

Parallel, Distributed Prediction

Sometimes, you'll train on a small dataset, but need to predict for a much larger batch of data. In this case, you'd like your estimator to handle NumPy arrays and pandas DataFrames for training, and dask arrays or DataFrames for prediction. dask_ml.wrappers.ParallelPostFit provides exactly that. It's a meta-estimator. It adds nothing during training; the underlying estimator (probably a scikit-learn estimator) is fit as usual, most likely in memory on a single machine. But tasks like predict, score, etc. are parallelized and distributed.

Most of the time, using ParallelPostFit is as simple as wrapping the original estimator. The only complication comes when using ParallelPostFit with another meta-estimator like GridSearchCV. In this case, you'll need to prefix your parameter names with estimator__, just as you would with any other meta-estimator.
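
The prefix rule is scikit-learn's general convention for nested parameters, so it can be sketched without Dask-ML installed. The snippet below uses OneVsRestClassifier purely as a stand-in (an assumption for illustration): like ParallelPostFit, it takes the wrapped model in a constructor argument named estimator.

```python
from sklearn.multiclass import OneVsRestClassifier
from sklearn.svm import SVC

# Stand-in meta-estimator (assumption): it keeps the wrapped model in a
# constructor argument named `estimator`, as ParallelPostFit does.
wrapped = OneVsRestClassifier(SVC(gamma="scale"))

# Parameters of the inner SVC are exposed with the `estimator__` prefix.
nested = sorted(
    name for name in wrapped.get_params() if name.startswith("estimator__")
)
print(nested)

# set_params routes the prefixed name to the inner estimator.
wrapped.set_params(estimator__C=10)
print(wrapped.estimator.C)
```

GridSearchCV applies each candidate through set_params in the same way, which is why the grid keys must carry the prefix.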

In [ ]:
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from dask_ml.wrappers import ParallelPostFit

We'll load the small NumPy arrays for training.

In [ ]:
X, y = load_digits(return_X_y=True)
In [ ]:
svc = ParallelPostFit(SVC(random_state=0, gamma='scale'))

param_grid = {
    # use estimator__param instead of param
    'estimator__C': [0.01, 1.0, 10],
}

grid_search = GridSearchCV(svc, param_grid, cv=3)

And fit as usual.

In [ ]:
grid_search.fit(X, y)

We'll simulate a large dask array by replicating the training data a few times. In reality, you would load this from your file system.

In [ ]:
import dask.array as da
In [ ]:
big_X = da.concatenate([
    da.from_array(X, chunks=X.shape)
    for _ in range(10)
])

Operations like predict or predict_proba return dask arrays rather than NumPy arrays. When you compute, the work will be done in parallel, out of core or distributed on the cluster.

In [ ]:
predicted = grid_search.predict(big_X)

At this point predicted could be written to disk, or aggregated before returning to the client.
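
As a rough sketch of the "aggregate before returning" option, the snippet below counts how often one class appears. To stay self-contained it builds a small stand-in for predicted from made-up labels (an assumption); with the real array, only the reduction lines would be needed.

```python
import numpy as np
import dask.array as da

# Stand-in for `predicted` (assumption): a chunked dask array of class labels.
labels = np.array([0, 1, 1, 3, 3, 3])
predicted = da.concatenate([
    da.from_array(labels, chunks=labels.shape)
    for _ in range(10)
])

# Building the reduction is lazy: no chunk has been evaluated yet.
count_of_threes = (predicted == 3).sum()

# compute() runs the per-chunk work and returns only the small result.
result = int(count_of_threes.compute())
print(result)  # 30
```

Because the reduction runs chunk by chunk, only a single number travels back to the client rather than the full array of predictions.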