Text Vectorization Pipeline
This example illustrates how Dask-ML can be used to classify large textual datasets in parallel. It is adapted from a scikit-learn text classification example.
The primary differences are that:

- We fit the entire model, including text vectorization, as a pipeline.
- We use dask collections like Dask Bag, Dask DataFrame, and Dask Array rather than generators to work with larger-than-memory datasets.
[1]:
from dask.distributed import Client, progress
client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')
client
[1]:
Client: Client-94cfa4ae-0de1-11ed-a521-000d3a8f7959
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

LocalCluster: 0f48ff5b
Scheduler: tcp://127.0.0.1:35661
Workers: 2 | Total threads: 4 | Total memory: 3.73 GiB
Status: running | Using processes: True
Fetch the data
Scikit-Learn provides a utility to fetch the newsgroups dataset.
[2]:
import sklearn.datasets
bunch = sklearn.datasets.fetch_20newsgroups()
The data from scikit-learn isn’t too large, so it’s just returned in memory. Each document is a string. The target we’re predicting is an integer, which codes the topic of the post.

We’ll load the documents and targets directly into a dask DataFrame. In practice, on a larger-than-memory dataset, you would likely load the documents from disk or cloud storage using dask.bag or dask.delayed.
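For reference, here is a minimal sketch of that approach, assuming the posts lived on disk as newline-delimited JSON records with "text" and "target" fields under a hypothetical data/posts-*.jsonl path (not part of this example):

import json
import dask.bag as db

# Hypothetical layout: each line is a JSON object like {"text": "...", "target": 3}.
bag = db.read_text("data/posts-*.jsonl").map(json.loads)
df_from_disk = bag.to_dataframe()  # dask DataFrame with "text" and "target" columns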
[3]:
import dask.dataframe as dd
import pandas as pd
df = dd.from_pandas(pd.DataFrame({"text": bunch.data, "target": bunch.target}),
npartitions=25)
df
[3]:
Dask DataFrame Structure:
                  text  target
npartitions=25
0               object   int64
453                ...     ...
...                ...     ...
10872              ...     ...
11313              ...     ...
Each row in the text column has a bit of metadata and the full text of a post.
[4]:
print(df.head().loc[0, 'text'][:500])
From: [email protected] (where's my thing)
Subject: WHAT car is this!?
Nntp-Posting-Host: rac3.wam.umd.edu
Organization: University of Maryland, College Park
Lines: 15
I was wondering if anyone out there could enlighten me on this car I saw
the other day. It was a 2-door sports car, looked to be from the late 60s/
early 70s. It was called a Bricklin. The doors were really small. In addition,
the front bumper was separate from the rest of the body. This is
all I know. If anyone can tellme a m
Feature Hashing
Dask-ML’s HashingVectorizer provides a similar API to scikit-learn’s implementation. In fact, Dask-ML’s implementation uses scikit-learn’s, applying it to each partition of the input dask.dataframe.Series or dask.bag.Bag.
Transformation, once we actually compute the result, happens in parallel and returns a dask Array.
[5]:
import dask_ml.feature_extraction.text
vect = dask_ml.feature_extraction.text.HashingVectorizer()
X = vect.fit_transform(df['text'])
X
[5]:
Dask Array: shape (nan, 1048576), dtype float64, unknown chunk sizes (HTML repr not shown)
The output array X has unknown chunk sizes because the input dask Series or Bags don’t know their own length.

Each block in X is a scipy.sparse matrix.
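If a later step needs concrete chunk sizes, one optional aside is dask’s Array.compute_chunk_sizes(), which makes a pass over the data to fill them in:

# Optional: materialize the unknown chunk sizes (this triggers computation).
X.compute_chunk_sizes()
X.chunks  # now holds concrete row counts for each block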
[6]:
X.blocks[0].compute()
[6]:
<453x1048576 sparse matrix of type '<class 'numpy.float64'>'
with 64357 stored elements in Compressed Sparse Row format>
This is a document-term matrix. Each row is the hashed representation of the original post.
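Because feature hashing is stateless, the same block can be reproduced by applying scikit-learn’s HashingVectorizer with its default parameters directly to the first partition; a small sketch for intuition:

import sklearn.feature_extraction.text

# Hash the first partition with scikit-learn directly; with default parameters
# this should match the X.blocks[0] result computed above.
sk_vect = sklearn.feature_extraction.text.HashingVectorizer()
part0 = df.get_partition(0).compute()  # first partition as a pandas DataFrame
sk_vect.transform(part0["text"])       # 453x1048576 sparse CSR matrix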
Classification Pipeline
We can combine the HashingVectorizer with Incremental and a classifier like scikit-learn’s SGDClassifier to create a classification pipeline.

We’ll predict whether the topic was in the comp category.
[7]:
bunch.target_names
[7]:
['alt.atheism',
'comp.graphics',
'comp.os.ms-windows.misc',
'comp.sys.ibm.pc.hardware',
'comp.sys.mac.hardware',
'comp.windows.x',
'misc.forsale',
'rec.autos',
'rec.motorcycles',
'rec.sport.baseball',
'rec.sport.hockey',
'sci.crypt',
'sci.electronics',
'sci.med',
'sci.space',
'soc.religion.christian',
'talk.politics.guns',
'talk.politics.mideast',
'talk.politics.misc',
'talk.religion.misc']
[8]:
import numpy as np
positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]
y = df['target'].isin(positive).astype(int)
y
[8]:
Dask Series Structure:
npartitions=25
0 int64
453 ...
...
10872 ...
11313 ...
Name: target, dtype: int64
Dask Name: astype, 101 tasks
[9]:
import numpy as np
import sklearn.linear_model
import sklearn.pipeline
import dask_ml.wrappers
Because the input comes from a dask Series, with unknown chunk sizes, we need to specify assume_equal_chunks=True. This tells Dask-ML that we know that each partition in X matches a partition in y.
[10]:
sgd = sklearn.linear_model.SGDClassifier(
tol=1e-3
)
clf = dask_ml.wrappers.Incremental(
sgd, scoring='accuracy', assume_equal_chunks=True
)
pipe = sklearn.pipeline.make_pipeline(vect, clf)
SGDClassifier.partial_fit needs to know the full set of classes up front. Because our sgd is wrapped inside an Incremental, we need to pass it through as the incremental__classes keyword argument in fit.
[11]:
pipe.fit(df['text'], y,
incremental__classes=[0, 1]);
As usual, Incremental.predict lazily returns the predictions as a dask Array.
[12]:
predictions = pipe.predict(df['text'])
predictions
[12]:
Dask Array of lazy predictions, unknown chunk sizes (HTML repr not shown)
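To peek at a few concrete values without computing everything, one block of the lazy predictions can be materialized, mirroring the X.blocks example above:

# Materialize only the predictions for the first partition.
predictions.blocks[0].compute()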
We can compute the predictions and score in parallel with dask_ml.metrics.accuracy_score.
[13]:
import dask_ml.metrics

dask_ml.metrics.accuracy_score(y, predictions)
[13]:
0.950150256319604
This simple combination of a HashingVectorizer and SGDClassifier is pretty effective at this prediction task.
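Note that the score above is measured on the same documents the pipeline was trained on. A rough sketch of checking generalization on a held-out split, using dask.dataframe’s random_split (the 80/20 fractions and random_state are arbitrary choices, not part of the original example):

# Randomly assign rows to a training and a test split.
train, test = df.random_split([0.8, 0.2], random_state=0)

y_train = train["target"].isin(positive).astype(int)
y_test = test["target"].isin(positive).astype(int)

# Refit the same pipeline on the training split and score on the held-out split.
pipe.fit(train["text"], y_train, incremental__classes=[0, 1])
dask_ml.metrics.accuracy_score(y_test, pipe.predict(test["text"]))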