Blockwise Ensemble Methods
Dask-ML provides some ensemble methods that are tailored to the blocked structure of dask.array and dask.dataframe. The basic idea is to fit a copy of some subestimator to each block (or partition) of the dask Array or DataFrame. Because each block fits in memory, the subestimator only needs to handle in-memory data structures like a NumPy array or pandas DataFrame. Fitting will also be relatively fast, since each block fits in memory and we won’t need to move large amounts of data between workers on a cluster. We end up with an ensemble of models: one per block in the training dataset.
At prediction time, we combine the results from all the models in the ensemble. For regression problems, this means averaging the predictions from each subestimator. For classification problems, each subestimator votes and the results are combined. See https://scikit-learn.org/stable/modules/ensemble.html#voting-classifier for details on how they can be combined. See https://scikit-learn.org/stable/modules/ensemble.html for a general overview of why averaging ensemble methods can be useful.
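The fit-per-block, average-at-predict idea can be sketched with plain NumPy. This is only an illustration under stated assumptions: the synthetic data, the `true_w` coefficients, and the use of `np.array_split` to stand in for dask blocks are all hypothetical, and it is not how Dask-ML implements the estimators.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical, well-mixed regression data: y = X @ true_w + noise.
n_samples, n_features, n_blocks = 1_000, 3, 10
true_w = np.array([1.5, -2.0, 0.5])
X = rng.normal(size=(n_samples, n_features))
y = X @ true_w + 0.1 * rng.normal(size=n_samples)

# NumPy sub-arrays stand in for the blocks of a dask array.
X_blocks = np.array_split(X, n_blocks)
y_blocks = np.array_split(y, n_blocks)

# Fit one independent least-squares model per block.
coefs = [np.linalg.lstsq(Xb, yb, rcond=None)[0]
         for Xb, yb in zip(X_blocks, y_blocks)]

# Every model predicts for every row; the ensemble averages the results.
per_model_preds = np.stack([X @ w for w in coefs])  # (n_blocks, n_samples)
ensemble_pred = per_model_preds.mean(axis=0)

mse = np.mean((ensemble_pred - y) ** 2)
```

Because the rows here are identically distributed across blocks, each per-block model learns nearly the same coefficients and the averaged prediction stays close to the targets.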
It’s crucially important that the distribution of values in your dataset be relatively uniform across partitions. Otherwise the parameters learned on any given partition of the data will be poor for the dataset as a whole. This will be shown in detail later.
Let’s randomly generate an example dataset. In practice, you would load the data from storage. We’ll create a dask.array with 10 blocks.
[1]:
from distributed import Client
import dask_ml.datasets
import dask_ml.ensemble
client = Client(n_workers=4, threads_per_worker=1)
X, y = dask_ml.datasets.make_classification(n_samples=1_000_000,
                                            n_informative=10,
                                            shift=2, scale=2,
                                            chunks=100_000)
X
[1]:

Classification
The subestimator should be an instantiated scikit-learn-API-compatible estimator (anything that implements the fit / predict API, including pipelines). It only needs to handle in-memory datasets. We’ll use sklearn.linear_model.RidgeClassifier.
To get the output shapes right, we require that you provide the classes for classification problems, either when creating the estimator or in .fit if the subestimator also requires the classes.
[2]:
import sklearn.linear_model
subestimator = sklearn.linear_model.RidgeClassifier(random_state=0)
clf = dask_ml.ensemble.BlockwiseVotingClassifier(
    subestimator,
    classes=[0, 1]
)
clf
[2]:
BlockwiseVotingClassifier(classes=[0, 1],
estimator=RidgeClassifier(random_state=0))
We can train normally. This will independently fit a clone of subestimator on each partition of X and y.
[3]:
clf.fit(X, y)
All of the fitted estimators are available at .estimators_.
[4]:
clf.estimators_
[4]:
[RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0),
RidgeClassifier(random_state=0)]
These are different estimators! They’ve been trained on separate batches of data and have learned different parameters. We can plot the difference in the learned coef_ of the first two models to visualize this.
[5]:
import matplotlib.pyplot as plt
import numpy as np
[6]:
a = clf.estimators_[0].coef_
b = clf.estimators_[1].coef_
fig, ax = plt.subplots()
ax.bar(np.arange(a.shape[1]), (a - b).ravel())
ax.set(xticks=[], xlabel="Feature", title="Difference in Learned Coefficients");
That said, the assumption backing this entire process is that the distribution of the data is relatively uniform across partitions. The parameters learned by each member of the ensemble should be relatively similar, and so the members will give relatively similar predictions when applied to the same data.
When you predict, the result will have the same chunking pattern as the input array you’re predicting for (which need not match the partitioning of the training data).
[7]:
preds = clf.predict(X)
preds
[7]:

This generates a set of tasks that
1. Calls subestimator.predict(chunk) for each subestimator (10 in our case)
2. Concatenates those predictions together
3. Somehow averages the predictions to a single overall prediction
We used the default voting="hard" strategy, which means we just choose the class that had the highest number of votes. If the first two subestimators picked class 0 and the other eight picked class 1 for the first row, the final prediction for that row will be class 1.
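The majority vote described above can be sketched in NumPy as follows. The `hard_vote` helper and the `votes` array are hypothetical names used for illustration, and breaking ties toward the lowest class label (a side effect of `argmax`) is an assumption of this sketch rather than a documented Dask-ML behaviour.

```python
import numpy as np

def hard_vote(votes, n_classes):
    """Return the majority class for each sample.

    votes: integer array of shape (n_estimators, n_samples), where
    votes[i, j] is the class predicted by estimator i for sample j.
    """
    votes = np.asarray(votes)
    # counts[c, j] = number of estimators that chose class c for sample j
    counts = np.stack([(votes == c).sum(axis=0) for c in range(n_classes)])
    # argmax picks the most-voted class (lowest label wins ties)
    return counts.argmax(axis=0)

# Ten estimators, three samples. For the first sample, two estimators
# vote for class 0 and the other eight vote for class 1.
votes = np.array([[0, 0, 1], [0, 0, 1]] + [[1, 0, 0]] * 8)
majority = hard_vote(votes, n_classes=2)
```

For the first sample the vote is 2 to 8 in favour of class 1, so `majority[0]` is 1, matching the description in the text.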
[8]:
preds[:10].compute()
[8]:
array([0, 1, 1, 1, 0, 0, 0, 0, 0, 0])
With voting="soft" we have access to predict_proba, as long as the subestimator has a predict_proba method. These subestimators should be well-calibrated for the predictions to be meaningful. See probability calibration for more.
[9]:
subestimator = sklearn.linear_model.LogisticRegression(random_state=0)
clf = dask_ml.ensemble.BlockwiseVotingClassifier(
    subestimator,
    classes=[0, 1],
    voting="soft"
)
clf.fit(X, y)
[10]:
proba = clf.predict_proba(X)
proba[:5].compute()
[10]:
array([[9.89600376e-01, 1.03996237e-02],
       [1.66692763e-03, 9.98333072e-01],
       [3.84974590e-02, 9.61502541e-01],
       [2.22764484e-04, 9.99777236e-01],
       [8.72003621e-01, 1.27996379e-01]])
The stages here are similar to the voting="hard" case. Only now, instead of taking the majority vote, we average the probabilities predicted by each subestimator.
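A small NumPy sketch of the difference between the two strategies, using made-up probabilities for three hypothetical estimators and two samples. The array layout `(n_estimators, n_samples, n_classes)` is an assumption chosen for the illustration.

```python
import numpy as np

# probas[i, j, c]: probability estimator i assigns to class c for sample j.
probas = np.array([
    [[0.55, 0.45], [0.4, 0.6]],
    [[0.55, 0.45], [0.3, 0.7]],
    [[0.05, 0.95], [0.2, 0.8]],
])

# Soft voting: average the probabilities, then pick the most likely class.
avg_proba = probas.mean(axis=0)        # (n_samples, n_classes)
soft_pred = avg_proba.argmax(axis=1)

# Hard voting for comparison: each estimator picks a class, majority wins.
per_model = probas.argmax(axis=2)      # (n_estimators, n_samples)
counts = np.stack([(per_model == c).sum(axis=0) for c in range(2)])
hard_pred = counts.argmax(axis=0)
```

For the first sample, two estimators lean weakly toward class 0 while the third is very confident in class 1. Hard voting therefore returns class 0, whereas the averaged probabilities favour class 1. This is why calibration matters for soft voting: a single overconfident member can dominate the average.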
Regression
Regression is quite similar. The primary difference is that there’s no voting; predictions from estimators are always reduced by averaging.
[11]:
X, y = dask_ml.datasets.make_regression(n_samples=1_000_000,
                                        chunks=100_000,
                                        n_features=20)
X
[11]:

[12]:
subestimator = sklearn.linear_model.LinearRegression()
clf = dask_ml.ensemble.BlockwiseVotingRegressor(
    subestimator,
)
clf.fit(X, y)
[13]:
clf.predict(X)[:5].compute()
[13]:
array([ 43.53366418, 24.11641382, 99.36384051, 118.59701732,
165.12196187])
As usual with Dask-ML, scoring is done in parallel (and distributed on a cluster if you’re connected to one).
[14]:
clf.score(X, y)
[14]:
1.0
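To help interpret regression scores, here is a sketch of the coefficient of determination (R²), assuming that score reports R² as scikit-learn regressors do. The `r2` helper and the toy arrays are hypothetical and written out by hand for illustration.

```python
import numpy as np

def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    ss_res = np.sum((y_true - y_pred) ** 2)          # residual sum of squares
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)   # total sum of squares
    return 1.0 - ss_res / ss_tot

y_true = np.array([1.0, 2.0, 3.0, 4.0])

perfect = r2(y_true, y_true)                                # exact predictions
baseline = r2(y_true, np.full_like(y_true, y_true.mean()))  # always predict the mean
biased = r2(y_true, y_true + 5.0)                           # constant offset of 5
```

A perfect fit scores 1.0 and always predicting the mean scores 0.0. The score has no lower bound: the constantly offset predictions above score -19.0, so a negative value means the model does worse than the mean baseline.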
The dangers of non-uniformly distributed data
Finally, it must be re-emphasized that your data should be uniformly distributed across partitions prior to using these ensemble methods. If it’s not, then you’re better off just sampling rows from each partition and fitting a single classifier to the sample. By “uniform” we don’t mean “from a uniform probability distribution”, just that there shouldn’t be a clear per-partition pattern to how the data is distributed.
Let’s demonstrate that with an example. We’ll generate a dataset with a clear trend across partitions. This might represent some non-stationary time series, though it can occur in other contexts as well (e.g. on data partitioned by geography, age, etc.).
[15]:
import dask.array as da
import dask.delayed
import sklearn.datasets
[16]:
def clone_and_shift(X, y, i):
    X = X.copy()
    X += i + np.random.random(X.shape)
    y += 25 * (i + np.random.random(y.shape))
    return X, y
[17]:
# Make a base dataset that we'll clone and shift
X, y = sklearn.datasets.make_regression(n_features=4, bias=2, random_state=0)
# Clone and shift 10 times, gradually increasing X and y for each partition
Xs, ys = zip(*[dask.delayed(clone_and_shift, nout=2)(X, y, i) for i in range(10)])
Xs = [da.from_delayed(x, shape=X.shape, dtype=X.dtype) for x in Xs]
ys = [da.from_delayed(y_, shape=y.shape, dtype=y.dtype) for y_ in ys]
X2 = da.concatenate(Xs)
y2 = da.concatenate(ys)
Let’s plot a sample of points, coloring by which partition the data came from.
[18]:
fig, ax = plt.subplots()
ax.scatter(X2[::5, 0], y2[::5], c=np.arange(0, len(X2), 5) // 100, cmap="Set1",
           label="Partition")
ax.set(xlabel="Feature 0", ylabel="target", title="Non-stationary data (by partition)");
Now let’s fit two estimators:
1. One BlockwiseVotingRegressor on the entire dataset (which fits a LinearRegression on each partition)
2. One LinearRegression on a sample from the entire dataset
[19]:
subestimator = sklearn.linear_model.LinearRegression()
clf = dask_ml.ensemble.BlockwiseVotingRegressor(
    subestimator,
)
clf.fit(X2, y2)
[20]:
X_sampled, y_sampled = dask.compute(X2[::10], y2[::10])
subestimator.fit(X_sampled, y_sampled)
[20]:
LinearRegression()
Comparing the scores, we find that the estimator fit on the sampled dataset performs much better, despite training on less data.
[21]:
clf.score(X2, y2)
[21]:
-11.578544746966614
[22]:
subestimator.score(X2, y2)
[22]:
0.08275716387548893
This shows that your data needs to be relatively uniform across partitions. Even including the standard controls to normalize whatever underlying force is generating the non-stationary data (e.g. a time trend component or differencing time series data, dummy variables for geographic regions, etc.) is not sufficient when your dataset is partitioned by the non-uniform variable. You would still need to either shuffle your data prior to fitting, or just sample and fit the subestimator on the subsample that fits in memory.
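The effect of shuffling can be sketched with NumPy alone. Everything here is a hypothetical stand-in: the synthetic trend, the `fit_blockwise`, `predict_ensemble`, and `r2` helpers, and the use of `np.array_split` in place of dask partitions. It is a toy illustration of the argument, not a reproduction of the results above.

```python
import numpy as np

rng = np.random.default_rng(0)
n_blocks, block_size = 10, 200
n = n_blocks * block_size

# Synthetic data with a trend across blocks: block i is shifted by i in x
# and by 25 * i in y.
block_index = np.repeat(np.arange(n_blocks), block_size)
z = rng.normal(size=n)
x = z + block_index
y = 3.0 * z + 25.0 * block_index + rng.normal(size=n)

def fit_blockwise(x, y, n_blocks):
    """Fit one (slope, intercept) least-squares model per block."""
    models = []
    for xb, yb in zip(np.array_split(x, n_blocks), np.array_split(y, n_blocks)):
        A = np.column_stack([xb, np.ones_like(xb)])
        models.append(np.linalg.lstsq(A, yb, rcond=None)[0])
    return np.array(models)  # shape (n_blocks, 2)

def predict_ensemble(models, x):
    """Average the predictions of all per-block models."""
    A = np.column_stack([x, np.ones_like(x)])
    return (A @ models.T).mean(axis=1)

def r2(y_true, y_pred):
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)
    return 1.0 - ss_res / ss_tot

# Blocks ordered by the trend: each model only sees its own regime.
r2_sorted = r2(y, predict_ensemble(fit_blockwise(x, y, n_blocks), x))

# Shuffle rows first so every block sees a mix of all regimes.
perm = rng.permutation(n)
r2_shuffled = r2(y, predict_ensemble(fit_blockwise(x[perm], y[perm], n_blocks), x))
```

In this toy setup the ensemble fit on trend-ordered blocks scores noticeably worse than the one fit on shuffled blocks, because each unshuffled model learns an intercept that is only valid for its own partition.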