{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Text Vectorization Pipeline\n",
"\n",
"This example illustrates how Dask-ML can be used to classify large textual datasets in parallel.\n",
"It is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n",
"\n",
"The primary differences are that\n",
"\n",
"* We fit the entire model, including text vectorization, as a pipeline.\n",
"* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n",
" rather than generators to work with larger than memory datasets."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:22.574866Z",
"iopub.status.busy": "2021-01-14T10:50:22.574354Z",
"iopub.status.idle": "2021-01-14T10:50:24.844770Z",
"shell.execute_reply": "2021-01-14T10:50:24.845466Z"
}
},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"\n",
"Client\n",
"\n",
" | \n",
"\n",
"Cluster\n",
"\n",
" - Workers: 2
\n",
" - Cores: 4
\n",
" - Memory: 4.00 GB
\n",
" \n",
" | \n",
"
\n",
"
"
],
"text/plain": [
""
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client, progress\n",
"\n",
"client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fetch the data\n",
"\n",
"Scikit-Learn provides a utility to fetch the newsgroups dataset."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:24.848628Z",
"iopub.status.busy": "2021-01-14T10:50:24.847719Z",
"iopub.status.idle": "2021-01-14T10:50:25.382018Z",
"shell.execute_reply": "2021-01-14T10:50:25.382706Z"
}
},
"outputs": [],
"source": [
"import sklearn.datasets\n",
"\n",
"bunch = sklearn.datasets.fetch_20newsgroups()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The data from scikit-learn isn't *too* large, so the data is just\n",
"returned in memory. Each document is a string. The target we're predicting\n",
"is an integer, which codes the topic of the post.\n",
"\n",
"We'll load the documents and targets directly into a dask DataFrame.\n",
"In practice, on a larger than memory dataset, you would likely load the\n",
"documents from disk or cloud storage using `dask.bag` or `dask.delayed`."
]
},
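{
"cell_type": "markdown",
"metadata": {},
"source": [
"For illustration, here's a minimal sketch of that out-of-core pattern. The path `s3://bucket/posts/*.txt` is a hypothetical placeholder, not part of this example:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.bag as db\n",
"\n",
"# Hypothetical: each matching file is read lazily, one or more partitions\n",
"# per file; nothing is loaded into memory up front.\n",
"bag = db.read_text('s3://bucket/posts/*.txt')\n",
"\n",
"# Wrap each document in a record so the bag converts cleanly to a DataFrame.\n",
"df_from_storage = bag.map(lambda text: {'text': text}).to_dataframe()"
]
},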
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:25.385612Z",
"iopub.status.busy": "2021-01-14T10:50:25.384709Z",
"iopub.status.idle": "2021-01-14T10:50:25.636102Z",
"shell.execute_reply": "2021-01-14T10:50:25.636766Z"
}
},
"outputs": [
{
"data": {
"text/html": [
"Dask DataFrame Structure:
\n",
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" text | \n",
" target | \n",
"
\n",
" \n",
" npartitions=25 | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" object | \n",
" int64 | \n",
"
\n",
" \n",
" 453 | \n",
" ... | \n",
" ... | \n",
"
\n",
" \n",
" ... | \n",
" ... | \n",
" ... | \n",
"
\n",
" \n",
" 10872 | \n",
" ... | \n",
" ... | \n",
"
\n",
" \n",
" 11313 | \n",
" ... | \n",
" ... | \n",
"
\n",
" \n",
"
\n",
"
\n",
"Dask Name: from_pandas, 25 tasks
"
],
"text/plain": [
"Dask DataFrame Structure:\n",
" text target\n",
"npartitions=25 \n",
"0 object int64\n",
"453 ... ...\n",
"... ... ...\n",
"10872 ... ...\n",
"11313 ... ...\n",
"Dask Name: from_pandas, 25 tasks"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"\n",
"df = dd.from_pandas(pd.DataFrame({\"text\": bunch.data, \"target\": bunch.target}),\n",
" npartitions=25)\n",
"\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each row in the `text` column has a bit of metadata and the full text of a post."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:25.639510Z",
"iopub.status.busy": "2021-01-14T10:50:25.638666Z",
"iopub.status.idle": "2021-01-14T10:50:26.000407Z",
"shell.execute_reply": "2021-01-14T10:50:26.000728Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"From: lerxst@wam.umd.edu (where's my thing)\n",
"Subject: WHAT car is this!?\n",
"Nntp-Posting-Host: rac3.wam.umd.edu\n",
"Organization: University of Maryland, College Park\n",
"Lines: 15\n",
"\n",
" I was wondering if anyone out there could enlighten me on this car I saw\n",
"the other day. It was a 2-door sports car, looked to be from the late 60s/\n",
"early 70s. It was called a Bricklin. The doors were really small. In addition,\n",
"the front bumper was separate from the rest of the body. This is \n",
"all I know. If anyone can tellme a m\n"
]
}
],
"source": [
"print(df.head().loc[0, 'text'][:500])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Feature Hashing"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask's [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) provides a similar API to [scikit-learn's implementation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html). In fact, Dask-ML's implementation uses scikit-learn's, applying it to each partition of the input `dask.dataframe.Series` or `dask.bag.Bag`.\n",
"\n",
"Transformation, once we actually compute the result, happens in parallel and returns a dask Array."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:26.005168Z",
"iopub.status.busy": "2021-01-14T10:50:26.004234Z",
"iopub.status.idle": "2021-01-14T10:50:26.137561Z",
"shell.execute_reply": "2021-01-14T10:50:26.137932Z"
}
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"\n",
"\n",
" \n",
" | Array | Chunk | \n",
" \n",
" \n",
"\n",
" Shape | (nan, 1048576) | (nan, 1048576) | \n",
" Count | 75 Tasks | 25 Chunks | \n",
" Type | float64 | scipy.csr_matrix | \n",
" \n",
" \n",
" | \n",
"\n",
"\n",
" | \n",
"
\n",
"
"
],
"text/plain": [
"dask.array<_transformer, shape=(nan, 1048576), dtype=float64, chunksize=(nan, 1048576), chunktype=scipy.csr_matrix>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask_ml.feature_extraction.text\n",
"\n",
"vect = dask_ml.feature_extraction.text.HashingVectorizer()\n",
"X = vect.fit_transform(df['text'])\n",
"X"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The output array `X` has unknown chunk sizes becase the input dask Series or Bags don't know their own length.\n",
"\n",
"Each block in `X` is a `scipy.sparse` matrix."
]
},
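{
"cell_type": "markdown",
"metadata": {},
"source": [
"If a downstream step needs concrete chunk sizes, dask can compute them with one extra pass over the data. A minimal, optional sketch (note that this triggers real computation):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: compute the chunk sizes (one pass over the data), after which\n",
"# X.shape and X.chunks report concrete row counts.\n",
"X.compute_chunk_sizes()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each block in `X` is a `scipy.sparse` matrix."
]
},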
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:26.143591Z",
"iopub.status.busy": "2021-01-14T10:50:26.142515Z",
"iopub.status.idle": "2021-01-14T10:50:26.682019Z",
"shell.execute_reply": "2021-01-14T10:50:26.681550Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"<453x1048576 sparse matrix of type ''\n",
"\twith 64357 stored elements in Compressed Sparse Row format>"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"X.blocks[0].compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is a document-term matrix. Each row is the hashed representation of the original post."
]
},
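{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since Dask-ML applies scikit-learn's `HashingVectorizer` to each partition, vectorizing a single pandas partition directly should give exactly the corresponding dask block. A quick sanity check of that claim, assuming the default (stateless) hashing parameters:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sklearn.feature_extraction.text\n",
"\n",
"# Vectorize the first pandas partition with plain scikit-learn.\n",
"first_partition = df['text'].get_partition(0).compute()\n",
"expected = sklearn.feature_extraction.text.HashingVectorizer().transform(first_partition)\n",
"\n",
"# The corresponding block of the dask Array should match exactly.\n",
"actual = X.blocks[0].compute()\n",
"assert abs(expected - actual).sum() == 0"
]
},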
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Classification Pipeline\n",
"\n",
"We can combine the [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) with [Incremental](https://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) and a classifier like scikit-learn's `SGDClassifier` to\n",
"create a classification pipeline.\n",
"\n",
"We'll predict whether the topic was in the `comp` category."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:26.685514Z",
"iopub.status.busy": "2021-01-14T10:50:26.685101Z",
"iopub.status.idle": "2021-01-14T10:50:26.689941Z",
"shell.execute_reply": "2021-01-14T10:50:26.690528Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"['alt.atheism',\n",
" 'comp.graphics',\n",
" 'comp.os.ms-windows.misc',\n",
" 'comp.sys.ibm.pc.hardware',\n",
" 'comp.sys.mac.hardware',\n",
" 'comp.windows.x',\n",
" 'misc.forsale',\n",
" 'rec.autos',\n",
" 'rec.motorcycles',\n",
" 'rec.sport.baseball',\n",
" 'rec.sport.hockey',\n",
" 'sci.crypt',\n",
" 'sci.electronics',\n",
" 'sci.med',\n",
" 'sci.space',\n",
" 'soc.religion.christian',\n",
" 'talk.politics.guns',\n",
" 'talk.politics.mideast',\n",
" 'talk.politics.misc',\n",
" 'talk.religion.misc']"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bunch.target_names"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:26.693249Z",
"iopub.status.busy": "2021-01-14T10:50:26.692432Z",
"iopub.status.idle": "2021-01-14T10:50:26.724634Z",
"shell.execute_reply": "2021-01-14T10:50:26.725224Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"Dask Series Structure:\n",
"npartitions=25\n",
"0 int64\n",
"453 ...\n",
" ... \n",
"10872 ...\n",
"11313 ...\n",
"Name: target, dtype: int64\n",
"Dask Name: astype, 101 tasks"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import numpy as np\n",
"\n",
"positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]\n",
"y = df['target'].isin(positive).astype(int)\n",
"y"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:26.728835Z",
"iopub.status.busy": "2021-01-14T10:50:26.727034Z",
"iopub.status.idle": "2021-01-14T10:50:26.733473Z",
"shell.execute_reply": "2021-01-14T10:50:26.734049Z"
}
},
"outputs": [],
"source": [
"import numpy as np\n",
"import sklearn.linear_model\n",
"import sklearn.pipeline\n",
"\n",
"import dask_ml.wrappers"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the input comes from a dask Series, with unknown chunk sizes, we need to specify `assume_equal_chunks=True`. This tells Dask-ML that we know that each partition in `X`\n",
"matches a partition in `y`."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:26.738126Z",
"iopub.status.busy": "2021-01-14T10:50:26.736348Z",
"iopub.status.idle": "2021-01-14T10:50:26.744044Z",
"shell.execute_reply": "2021-01-14T10:50:26.744600Z"
}
},
"outputs": [],
"source": [
"sgd = sklearn.linear_model.SGDClassifier(\n",
" tol=1e-3\n",
")\n",
"clf = dask_ml.wrappers.Incremental(\n",
" sgd, scoring='accuracy', assume_equal_chunks=True\n",
")\n",
"pipe = sklearn.pipeline.make_pipeline(vect, clf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`SGDClassifier.partial_fit` needs to know the full set of classes up front.\n",
"Becuase our `sgd` is wrapped inside an `Incremental`, we need to pass it through\n",
"as the `incremental__classes` keyword argument in `fit`."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:26.748227Z",
"iopub.status.busy": "2021-01-14T10:50:26.746425Z",
"iopub.status.idle": "2021-01-14T10:50:32.206205Z",
"shell.execute_reply": "2021-01-14T10:50:32.205189Z"
}
},
"outputs": [],
"source": [
"pipe.fit(df['text'], y,\n",
" incremental__classes=[0, 1]);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As usual, `Incremental.predict` lazily returns the predictions as a dask Array."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:32.211305Z",
"iopub.status.busy": "2021-01-14T10:50:32.210884Z",
"iopub.status.idle": "2021-01-14T10:50:32.242808Z",
"shell.execute_reply": "2021-01-14T10:50:32.243627Z"
}
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"\n",
"\n",
" \n",
" | Array | Chunk | \n",
" \n",
" \n",
" Bytes | unknown | unknown | \n",
" Shape | (nan,) | (nan,) | \n",
" Count | 100 Tasks | 25 Chunks | \n",
" Type | int64 | numpy.ndarray | \n",
" \n",
" \n",
" | \n",
"\n",
"\n",
" | \n",
"
\n",
"
"
],
"text/plain": [
"dask.array<_predict, shape=(nan,), dtype=int64, chunksize=(nan,), chunktype=numpy.ndarray>"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"predictions = pipe.predict(df['text'])\n",
"predictions"
]
},
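{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the chunk sizes are unknown, positional slices like `predictions[:10]` aren't available; one way to peek at a few values is to materialize a single block:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Compute just the first block (the predictions for the first partition)\n",
"# and look at the first few values.\n",
"predictions.blocks[0].compute()[:10]"
]
},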
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can compute the predictions and score in parallel with `dask_ml.metrics.accuracy_score`."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:50:32.265614Z",
"iopub.status.busy": "2021-01-14T10:50:32.265157Z",
"iopub.status.idle": "2021-01-14T10:50:36.010365Z",
"shell.execute_reply": "2021-01-14T10:50:36.011452Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"0.9647339579282305"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dask_ml.metrics.accuracy_score(y, predictions)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This simple combination of a HashingVectorizer and SGDClassifier is\n",
"pretty effective at this prediction task."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}