You can run this notebook in a live session or view it on GitHub.
Scale Scikit-Learn for Small Data Problems¶
This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem. We’ll fit a large model, a grid search over many hyperparameters, on a small dataset.
This video demonstrates the same example on a larger cluster.
[1]:
from IPython.display import HTML
HTML("""<iframe width="560" height="315" src="https://www.youtube.com/embed/5Zf6DQaf7jk" frameborder="0" allow="autoplay; encryptedmedia" allowfullscreen> </iframe>""")
/home/travis/miniconda/envs/test/lib/python3.7/site-packages/IPython/core/display.py:689: UserWarning: Consider using IPython.display.IFrame instead
  warnings.warn("Consider using IPython.display.IFrame instead")
[1]:
[2]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client
[2]:
(HTML summary of the Client and its local Cluster)
Distributed Training¶
Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.

Alternatively, Scikit-Learn can use Dask for parallelism. This lets you train those estimators using all the cores of your cluster without significantly changing your code.

This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyperparameters, or when using an ensemble method with many individual estimators. For very small datasets, training times will typically be short enough that cluster-wide parallelism isn’t helpful. For very large datasets (larger than a single machine’s memory), the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways for working with larger-than-memory datasets).
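As a rough illustration (the RandomForestClassifier and its settings here are an assumed example, not part of this notebook), single-machine joblib parallelism is just a matter of passing n_jobs:

from sklearn.ensemble import RandomForestClassifier

# Any estimator that accepts n_jobs will parallelize across local cores via joblib
clf = RandomForestClassifier(n_estimators=200, n_jobs=-1)  # -1 means "use all cores"

Switching the joblib backend to Dask, as done later in this notebook, moves that same work onto the cluster without touching the estimator itself.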
Create Scikit-Learn Pipeline¶
[3]:
from pprint import pprint
from time import time
import logging
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
[4]:
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]
print("Loading 20 newsgroups dataset for categories:")
print(categories)
data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
Downloading 20news dataset. This may take a few minutes.
Downloading dataset from https://ndownloader.figshare.com/files/5975967 (14 MB)
Loading 20 newsgroups dataset for categories:
['alt.atheism', 'talk.religion.misc']
857 documents
2 categories
We’ll define a small pipeline that combines text feature extraction with a simple classifier.
[5]:
pipeline = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])
Define Grid for Parameter Search¶
Grid search over some parameters.
[6]:
parameters = {
    'vect__max_df': (0.5, 0.75, 1.0),
    # 'vect__max_features': (None, 5000, 10000, 50000),
    'vect__ngram_range': ((1, 1), (1, 2)),  # unigrams or bigrams
    # 'tfidf__use_idf': (True, False),
    # 'tfidf__norm': ('l1', 'l2'),
    # 'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    # 'clf__n_iter': (10, 50, 80),
}
[7]:
grid_search = GridSearchCV(pipeline, parameters, n_jobs=1, verbose=1, cv=3, refit=False, iid=False)
To fit this normally, we would write
grid_search.fit(data.data, data.target)
That would use the default joblib backend (multiple processes) for parallelism. To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a parallel_backend context.
[8]:
from sklearn.externals import joblib
with joblib.parallel_backend('dask'):
    grid_search.fit(data.data, data.target)
Fitting 3 folds for each of 6 candidates, totalling 18 fits
[Parallel(n_jobs=1)]: Using backend DaskDistributedBackend with 4 concurrent workers.
/home/travis/miniconda/envs/test/lib/python3.7/site-packages/sklearn/linear_model/stochastic_gradient.py:183: FutureWarning: max_iter and tol parameters have been added in SGDClassifier in 0.19. If max_iter is set but tol is left unset, the default value for tol in 0.19 and 0.20 will be None (which is equivalent to infinity, so it has no effect) but will change in 0.21 to 1e-3. Specify tol to silence this warning.
  FutureWarning)
(the same FutureWarning is repeated for each of the remaining fits)
[Parallel(n_jobs=1)]: Done  18 out of  18 | elapsed:   23.5s finished
If you had the distributed dashboard open during that fit, you would have noticed that each worker performed some of the fit tasks.
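One way to inspect the completed search (a sketch, assuming pandas is available; with refit=False the refit best estimator is not kept, but cv_results_ is still populated):

import pandas as pd

# One row per hyperparameter combination, with cross-validated test scores
results = pd.DataFrame(grid_search.cv_results_)
results[['param_vect__max_df', 'param_vect__ngram_range', 'mean_test_score']]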
Parallel, Distributed Prediction¶
Sometimes, you train on a small dataset but need to predict for a much larger batch of data. In this case, you’d like your estimator to handle NumPy arrays and pandas DataFrames for training, and dask arrays or DataFrames for prediction. dask_ml.wrappers.ParallelPostFit (http://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html) provides exactly that. It’s a meta-estimator: it does nothing extra during training, when the underlying estimator (probably a scikit-learn estimator) will likely be fit in memory on a single machine, but tasks like predict, score, etc. are parallelized and distributed.

Most of the time, using ParallelPostFit is as simple as wrapping the original estimator, as sketched below. The only complication comes when using ParallelPostFit with another meta-estimator like GridSearchCV: in that case, you’ll need to prefix your parameter names with estimator__, just as with any meta-estimator.
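For reference, the simple case without GridSearchCV looks like this sketch (the LogisticRegression estimator and the random toy data are assumptions for illustration only):

import numpy as np
import dask.array as da
from sklearn.linear_model import LogisticRegression
from dask_ml.wrappers import ParallelPostFit

rng = np.random.RandomState(0)
X_small, y_small = rng.randn(100, 5), rng.randint(0, 2, 100)

clf = ParallelPostFit(LogisticRegression())
clf.fit(X_small, y_small)                       # ordinary in-memory fit

X_large = da.random.random((100_000, 5), chunks=(10_000, 5))
yhat = clf.predict(X_large)                     # lazy dask array, one task per chunk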
[9]:
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from dask_ml.wrappers import ParallelPostFit
We’ll load the small NumPy arrays for training.
[10]:
X, y = load_digits(return_X_y=True)
X.shape
[10]:
(1797, 64)
[11]:
svc = ParallelPostFit(SVC(random_state=0, gamma='scale'))

param_grid = {
    # use estimator__param instead of param
    'estimator__C': [0.01, 1.0, 10],
}

grid_search = GridSearchCV(svc, param_grid, iid=False, cv=3)
And fit as usual.
[12]:
grid_search.fit(X, y)
[12]:
GridSearchCV(cv=3, error_score='raise-deprecating',
       estimator=ParallelPostFit(estimator=SVC(C=1.0, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
  max_iter=-1, probability=False, random_state=0, shrinking=True,
  tol=0.001, verbose=False),
             scoring=None),
       fit_params=None, iid=False, n_jobs=None,
       param_grid={'estimator__C': [0.01, 1.0, 10]},
       pre_dispatch='2*n_jobs', refit=True, return_train_score='warn',
       scoring=None, verbose=0)
We’ll simulate a large dask array by replicating the training data a few times. In reality, you would load this from your file system.
[13]:
import dask.array as da
[14]:
big_X = da.concatenate([
    da.from_array(X, chunks=X.shape)
    for _ in range(10)
])
big_X
[14]:
dask.array<concatenate, shape=(17970, 64), dtype=float64, chunksize=(1797, 64)>
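In practice, rather than replicating an in-memory array, you would build the dask array lazily from storage; for example (a sketch only — the Zarr file name is hypothetical and the zarr package would need to be installed):

# big_X = da.from_zarr('digits_features.zarr')   # lazy, chunked load from disk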
Operations like predict and predict_proba return dask arrays rather than NumPy arrays. When you compute the result, the work happens in parallel, out of core or distributed on the cluster.
[15]:
predicted = grid_search.predict(big_X)
predicted
[15]:
dask.array<_predict, shape=(17970,), dtype=int64, chunksize=(1797,)>
At this point, predicted could be written to disk or aggregated before being returned to the client.
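For example (a sketch; minlength=10 matches the digits dataset’s ten classes, while the Zarr output path is hypothetical):

# Aggregate on the workers and bring back only a small result
class_counts = da.bincount(predicted, minlength=10).compute()

# Or write the full predictions to disk instead of collecting them
# (requires the zarr package)
# da.to_zarr(predicted, 'predictions.zarr')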