Live Notebook

You can run this notebook in a live session Binder or view it on Github.

Scale Scikit-Learn for Small Data Problems

This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem. We’ll fit a large model, a grid-search over many hyper-parameters, on a small dataset.

This video talks demonstrates the same example on a larger cluster.

[1]:
from IPython.display import YouTubeVideo

YouTubeVideo("5Zf6DQaf7jk")
[1]:
[2]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')
client
[2]:

Client

Client-7bcf1e8b-0de1-11ed-a455-000d3a8f7959

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

Distributed Training

f1aeecbb269b4feba6c9dd10e300ed38 709cf32990b14a4d8131246bdba79bb8

Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.

Alternatively, Scikit-Learn can use Dask for parallelism. This lets you train those estimators using all the cores of your cluster without significantly changing your code.

This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn’t helpful. For too large datasets (larger than a single machine’s memory), the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways for working with larger than memory datasets).

Create Scikit-Learn Pipeline

[3]:
from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
[4]:
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
Loading 20 newsgroups dataset for categories:
['alt.atheism', 'talk.religion.misc']
857 documents
2 categories

We’ll define a small pipeline that combines text feature extraction with a simple classifier.

[5]:
pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])

Parallel, Distributed Prediction

Sometimes, you’re train on a small dataset, but need to predict for a much larger batch of data. In this case, you’d like your estimator to handle NumPy arrays and pandas DataFrames for training, and dask arrays or DataFrames for prediction. `dask_ml.wrappers.ParallelPostFit <http://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit>`__ provides exactly that. It’s a meta-estimator. It does nothing during training; the underlying estimator (probably a scikit-learn estimator) will probably be in-memory on a single machine. But tasks like predict, score, etc. are parallelized and distributed.

Most of the time, using ParallelPostFit is as simple as wrapping the original estimator. When used inside a GridSearch, you’ll need to update the keys of the parameters, just like with any meta-estimator. The only complication comes when using ParallelPostFit with another meta-estimator like GridSearchCV. In this case, you’ll need to prefix your parameter names with estimator__.

[9]:
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from dask_ml.wrappers import ParallelPostFit

We’ll load the small NumPy arrays for training.

[10]:
X, y = load_digits(return_X_y=True)
X.shape
[10]:
(1797, 64)
[11]:
svc = ParallelPostFit(SVC(random_state=0, gamma='scale'))

param_grid = {
    # use estimator__param instead of param
    'estimator__C': [0.01, 1.0, 10],
}

grid_search = GridSearchCV(svc, param_grid, cv=3)

And fit as usual.

[12]:
grid_search.fit(X, y)
[12]:
GridSearchCV(cv=3, estimator=ParallelPostFit(estimator=SVC(random_state=0)),
             param_grid={'estimator__C': [0.01, 1.0, 10]})

We’ll simulate a large dask array by replicating the training data a few times. In reality, you would load this from your file system.

[13]:
import dask.array as da
[14]:
big_X = da.concatenate([
    da.from_array(X, chunks=X.shape)
    for _ in range(10)
])
big_X
[14]:
Array Chunk
Bytes 8.77 MiB 898.50 kiB
Shape (17970, 64) (1797, 64)
Count 11 Tasks 10 Chunks
Type float64 numpy.ndarray
64 17970

Operations like predict, or predict_proba return dask, rather than NumPy arrays. When you compute, the work will be done in parallel, out of core or distributed on the cluster.

[15]:
predicted = grid_search.predict(big_X)
predicted
[15]:
Array Chunk
Bytes 140.39 kiB 14.04 kiB
Shape (17970,) (1797,)
Count 21 Tasks 10 Chunks
Type int64 numpy.ndarray
17970 1

At this point predicted could be written to disk, or aggregated before returning to the client.