
# Text Vectorization Pipeline

This example illustrates how Dask-ML can be used to classify large textual datasets in parallel. It is adapted from scikit-learn's out-of-core text classification example.

The primary differences are that

• We fit the entire model, including text vectorization, as a pipeline.

• We use dask collections like Dask Bag, Dask DataFrame, and Dask Array rather than generators to work with larger-than-memory datasets.

[1]:

from dask.distributed import Client, progress

client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')
client


[1]:


Cluster

• Workers: 2
• Cores: 4
• Memory: 4.00 GB

## Fetch the data

Scikit-Learn provides a utility to fetch the newsgroups dataset.

[2]:

import sklearn.datasets

bunch = sklearn.datasets.fetch_20newsgroups()


The data from scikit-learn isn’t too large, so it’s returned entirely in memory. Each document is a string. The target we’re predicting is an integer, which codes the topic of the post.

We’ll load the documents and targets directly into a dask DataFrame. In practice, on a larger-than-memory dataset, you would likely load the documents from disk or cloud storage using dask.bag or dask.delayed, as in the sketch below.
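
For illustration, here is a minimal, hypothetical sketch of that approach with dask.bag. The bucket path and record layout (one JSON object per line with text and target fields) are assumptions for the sketch, not part of this dataset:

import json
import dask.bag as db

# Hypothetical layout: one JSON record per line, e.g. {"text": ..., "target": ...}.
# The s3 path is a placeholder for wherever the raw documents actually live.
records = db.read_text('s3://my-bucket/newsgroups/*.jsonl').map(json.loads)

# Passing meta avoids an eager pass over the data to infer dtypes.
df = records.to_dataframe(meta={'text': str, 'target': int})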

[3]:

import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(pd.DataFrame({"text": bunch.data, "target": bunch.target}),
                    npartitions=25)

df

[3]:

                  text  target
npartitions=25
0               object   int64
453                ...     ...
...                ...     ...
10872              ...     ...
11313              ...     ...

Each row in the text column has a bit of metadata and the full text of a post.

[4]:

print(df.head().loc[0, 'text'][:500])

From: [email protected] (where's my thing)
Subject: WHAT car is this!?
Nntp-Posting-Host: rac3.wam.umd.edu
Organization: University of Maryland, College Park
Lines: 15

I was wondering if anyone out there could enlighten me on this car I saw
the other day. It was a 2-door sports car, looked to be from the late 60s/
early 70s. It was called a Bricklin. The doors were really small. In addition,
the front bumper was separate from the rest of the body. This is
all I know. If anyone can tellme a m


## Feature Hashing

Dask-ML’s HashingVectorizer provides a similar API to scikit-learn’s implementation. In fact, Dask-ML’s implementation uses scikit-learn’s, applying it to each partition of the input dask.dataframe.Series or dask.bag.Bag.

The transformation runs in parallel once we actually compute the result, and it returns a dask Array.

[5]:

import dask_ml.feature_extraction.text

vect = dask_ml.feature_extraction.text.HashingVectorizer()
X = vect.fit_transform(df['text'])
X

[5]:

        Array             Chunk
Shape   (nan, 1048576)    (nan, 1048576)
Count   75 Tasks          25 Chunks
Type    float64           scipy.csr_matrix

The output array X has unknown chunk sizes because the input dask Series or Bags don’t know their own length.
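
If a later step needs concrete chunk sizes, dask arrays can compute them by walking the data once with compute_chunk_sizes(). This is an optional aside, not required for the rest of this example:

# Optional: walks the blocks once (triggers computation) so that
# X.chunks afterwards reports concrete row counts per block.
X.compute_chunk_sizes()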

Each block in X is a scipy.sparse matrix.

[6]:

X.blocks[0].compute()

[6]:

<453x1048576 sparse matrix of type '<class 'numpy.float64'>'
with 64357 stored elements in Compressed Sparse Row format>


This is a document-term matrix. Each row is the hashed representation of the original post.
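
To make the hashing step concrete, here is a small standalone sketch with scikit-learn's HashingVectorizer, which Dask-ML applies per partition. The two toy strings are made up for illustration:

from sklearn.feature_extraction.text import HashingVectorizer

# Every document hashes into the same fixed 2**20-dimensional space,
# so no vocabulary needs to be learned or shared between partitions.
toy = HashingVectorizer().fit_transform(["a small example post", "another post"])
toy.shape   # (2, 1048576)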

## Classification Pipeline

We can combine the HashingVectorizer with Incremental and a classifier like scikit-learn’s SGDClassifier to create a classification pipeline.

We’ll predict whether the topic was in the comp category.

[7]:

bunch.target_names

[7]:

['alt.atheism',
'comp.graphics',
'comp.os.ms-windows.misc',
'comp.sys.ibm.pc.hardware',
'comp.sys.mac.hardware',
'comp.windows.x',
'misc.forsale',
'rec.autos',
'rec.motorcycles',
'rec.sport.baseball',
'rec.sport.hockey',
'sci.crypt',
'sci.electronics',
'sci.med',
'sci.space',
'soc.religion.christian',
'talk.politics.guns',
'talk.politics.mideast',
'talk.politics.misc',
'talk.religion.misc']

[8]:

import numpy as np

positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]
y = df['target'].isin(positive).astype(int)
y

[8]:

Dask Series Structure:
npartitions=25
0        int64
453        ...
...
10872      ...
11313      ...
Name: target, dtype: int64

[9]:

import numpy as np
import sklearn.linear_model
import sklearn.pipeline

import dask_ml.wrappers

Because the input comes from a dask Series, with unknown chunk sizes, we need to specify assume_equal_chunks=True. This tells Dask-ML that we know that each partition in X matches a partition in y.

[10]:

sgd = sklearn.linear_model.SGDClassifier(
    tol=1e-3
)
clf = dask_ml.wrappers.Incremental(
    sgd, scoring='accuracy', assume_equal_chunks=True
)
pipe = sklearn.pipeline.make_pipeline(vect, clf)


SGDClassifier.partial_fit needs to know the full set of classes up front. Because our sgd is wrapped inside an Incremental, we need to pass it through as the incremental__classes keyword argument in fit.

[11]:

pipe.fit(df['text'], y,
         incremental__classes=[0, 1]);


As usual, Incremental.predict lazily returns the predictions as a dask Array.

[12]:

predictions = pipe.predict(df['text'])
predictions

[12]:

        Array      Chunk
Bytes   unknown    unknown
Shape   (nan,)     (nan,)
Count   100 Tasks  25 Chunks
Type    int64      numpy.ndarray

We can compute the predictions and score in parallel with dask_ml.metrics.accuracy_score.

[13]:

import dask_ml.metrics

dask_ml.metrics.accuracy_score(y, predictions)

[13]:

0.9642920275764539


This simple combination of a HashingVectorizer and SGDClassifier is pretty effective at this prediction task.
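
As a possible next step, you could estimate generalization accuracy on held-out documents rather than on the training set. A hedged sketch, assuming dask_ml.model_selection.train_test_split handles the dask Series used here as it does other dask collections:

import dask_ml.model_selection

# Split the dask collections, refit on the training portion only,
# then score on documents the model has never seen.
X_train, X_test, y_train, y_test = dask_ml.model_selection.train_test_split(
    df['text'], y, random_state=0
)
pipe.fit(X_train, y_train, incremental__classes=[0, 1])
dask_ml.metrics.accuracy_score(y_test, pipe.predict(X_test))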