{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Text Vectorization Pipeline\n", "\n", "This example illustrates how Dask-ML can be used to classify large textual datasets in parallel.\n", "It is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n", "\n", "The primary differences are that\n", "\n", "* We fit the entire model, including text vectorization, as a pipeline.\n", "* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n", " rather than generators to work with larger than memory datasets." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:22.574866Z", "iopub.status.busy": "2021-01-14T10:50:22.574354Z", "iopub.status.idle": "2021-01-14T10:50:24.844770Z", "shell.execute_reply": "2021-01-14T10:50:24.845466Z" } }, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 2
  • \n", "
  • Cores: 4
  • \n", "
  • Memory: 4.00 GB
  • \n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client, progress\n", "\n", "client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fetch the data\n", "\n", "Scikit-Learn provides a utility to fetch the newsgroups dataset." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:24.848628Z", "iopub.status.busy": "2021-01-14T10:50:24.847719Z", "iopub.status.idle": "2021-01-14T10:50:25.382018Z", "shell.execute_reply": "2021-01-14T10:50:25.382706Z" } }, "outputs": [], "source": [ "import sklearn.datasets\n", "\n", "bunch = sklearn.datasets.fetch_20newsgroups()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The data from scikit-learn isn't *too* large, so the data is just\n", "returned in memory. Each document is a string. The target we're predicting\n", "is an integer, which codes the topic of the post.\n", "\n", "We'll load the documents and targets directly into a dask DataFrame.\n", "In practice, on a larger than memory dataset, you would likely load the\n", "documents from disk or cloud storage using `dask.bag` or `dask.delayed`." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:25.385612Z", "iopub.status.busy": "2021-01-14T10:50:25.384709Z", "iopub.status.idle": "2021-01-14T10:50:25.636102Z", "shell.execute_reply": "2021-01-14T10:50:25.636766Z" } }, "outputs": [ { "data": { "text/html": [ "
{ "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:25.385612Z", "iopub.status.busy": "2021-01-14T10:50:25.384709Z", "iopub.status.idle": "2021-01-14T10:50:25.636102Z", "shell.execute_reply": "2021-01-14T10:50:25.636766Z" } }, "outputs": [ { "data": { "text/html": [ "<div><b>Dask DataFrame Structure:</b><table border=\"1\"><thead><tr><th></th><th>text</th><th>target</th></tr></thead><tbody><tr><th>npartitions=25</th><td></td><td></td></tr><tr><th>0</th><td>object</td><td>int64</td></tr><tr><th>453</th><td>...</td><td>...</td></tr><tr><th>...</th><td>...</td><td>...</td></tr><tr><th>10872</th><td>...</td><td>...</td></tr><tr><th>11313</th><td>...</td><td>...</td></tr></tbody></table><div>Dask Name: from_pandas, 25 tasks</div></div>" ],
" ], "text/plain": [ "Dask DataFrame Structure:\n", " text target\n", "npartitions=25 \n", "0 object int64\n", "453 ... ...\n", "... ... ...\n", "10872 ... ...\n", "11313 ... ...\n", "Dask Name: from_pandas, 25 tasks" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask.dataframe as dd\n", "import pandas as pd\n", "\n", "df = dd.from_pandas(pd.DataFrame({\"text\": bunch.data, \"target\": bunch.target}),\n", " npartitions=25)\n", "\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Each row in the `text` column has a bit of metadata and the full text of a post." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:25.639510Z", "iopub.status.busy": "2021-01-14T10:50:25.638666Z", "iopub.status.idle": "2021-01-14T10:50:26.000407Z", "shell.execute_reply": "2021-01-14T10:50:26.000728Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "From: lerxst@wam.umd.edu (where's my thing)\n", "Subject: WHAT car is this!?\n", "Nntp-Posting-Host: rac3.wam.umd.edu\n", "Organization: University of Maryland, College Park\n", "Lines: 15\n", "\n", " I was wondering if anyone out there could enlighten me on this car I saw\n", "the other day. It was a 2-door sports car, looked to be from the late 60s/\n", "early 70s. It was called a Bricklin. The doors were really small. In addition,\n", "the front bumper was separate from the rest of the body. This is \n", "all I know. If anyone can tellme a m\n" ] } ], "source": [ "print(df.head().loc[0, 'text'][:500])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Feature Hashing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask's [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) provides a similar API to [scikit-learn's implementation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html). In fact, Dask-ML's implementation uses scikit-learn's, applying it to each partition of the input `dask.dataframe.Series` or `dask.bag.Bag`.\n", "\n", "Transformation, once we actually compute the result, happens in parallel and returns a dask Array." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:26.005168Z", "iopub.status.busy": "2021-01-14T10:50:26.004234Z", "iopub.status.idle": "2021-01-14T10:50:26.137561Z", "shell.execute_reply": "2021-01-14T10:50:26.137932Z" } }, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "\n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", "
Array Chunk
Shape (nan, 1048576) (nan, 1048576)
Count 75 Tasks 25 Chunks
Type float64 scipy.csr_matrix
\n", "
\n", "\n", "
" ], "text/plain": [ "dask.array<_transformer, shape=(nan, 1048576), dtype=float64, chunksize=(nan, 1048576), chunktype=scipy.csr_matrix>" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask_ml.feature_extraction.text\n", "\n", "vect = dask_ml.feature_extraction.text.HashingVectorizer()\n", "X = vect.fit_transform(df['text'])\n", "X" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The output array `X` has unknown chunk sizes becase the input dask Series or Bags don't know their own length.\n", "\n", "Each block in `X` is a `scipy.sparse` matrix." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:26.143591Z", "iopub.status.busy": "2021-01-14T10:50:26.142515Z", "iopub.status.idle": "2021-01-14T10:50:26.682019Z", "shell.execute_reply": "2021-01-14T10:50:26.681550Z" } }, "outputs": [ { "data": { "text/plain": [ "<453x1048576 sparse matrix of type ''\n", "\twith 64357 stored elements in Compressed Sparse Row format>" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "X.blocks[0].compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is a document-term matrix. Each row is the hashed representation of the original post." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Classification Pipeline\n", "\n", "We can combine the [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) with [Incremental](https://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) and a classifier like scikit-learn's `SGDClassifier` to\n", "create a classification pipeline.\n", "\n", "We'll predict whether the topic was in the `comp` category." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:26.685514Z", "iopub.status.busy": "2021-01-14T10:50:26.685101Z", "iopub.status.idle": "2021-01-14T10:50:26.689941Z", "shell.execute_reply": "2021-01-14T10:50:26.690528Z" } }, "outputs": [ { "data": { "text/plain": [ "['alt.atheism',\n", " 'comp.graphics',\n", " 'comp.os.ms-windows.misc',\n", " 'comp.sys.ibm.pc.hardware',\n", " 'comp.sys.mac.hardware',\n", " 'comp.windows.x',\n", " 'misc.forsale',\n", " 'rec.autos',\n", " 'rec.motorcycles',\n", " 'rec.sport.baseball',\n", " 'rec.sport.hockey',\n", " 'sci.crypt',\n", " 'sci.electronics',\n", " 'sci.med',\n", " 'sci.space',\n", " 'soc.religion.christian',\n", " 'talk.politics.guns',\n", " 'talk.politics.mideast',\n", " 'talk.politics.misc',\n", " 'talk.religion.misc']" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "bunch.target_names" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:26.693249Z", "iopub.status.busy": "2021-01-14T10:50:26.692432Z", "iopub.status.idle": "2021-01-14T10:50:26.724634Z", "shell.execute_reply": "2021-01-14T10:50:26.725224Z" } }, "outputs": [ { "data": { "text/plain": [ "Dask Series Structure:\n", "npartitions=25\n", "0 int64\n", "453 ...\n", " ... 
\n", "10872 ...\n", "11313 ...\n", "Name: target, dtype: int64\n", "Dask Name: astype, 101 tasks" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import numpy as np\n", "\n", "positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]\n", "y = df['target'].isin(positive).astype(int)\n", "y" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:26.728835Z", "iopub.status.busy": "2021-01-14T10:50:26.727034Z", "iopub.status.idle": "2021-01-14T10:50:26.733473Z", "shell.execute_reply": "2021-01-14T10:50:26.734049Z" } }, "outputs": [], "source": [ "import numpy as np\n", "import sklearn.linear_model\n", "import sklearn.pipeline\n", "\n", "import dask_ml.wrappers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because the input comes from a dask Series, with unknown chunk sizes, we need to specify `assume_equal_chunks=True`. This tells Dask-ML that we know that each partition in `X`\n", "matches a partition in `y`." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:26.738126Z", "iopub.status.busy": "2021-01-14T10:50:26.736348Z", "iopub.status.idle": "2021-01-14T10:50:26.744044Z", "shell.execute_reply": "2021-01-14T10:50:26.744600Z" } }, "outputs": [], "source": [ "sgd = sklearn.linear_model.SGDClassifier(\n", " tol=1e-3\n", ")\n", "clf = dask_ml.wrappers.Incremental(\n", " sgd, scoring='accuracy', assume_equal_chunks=True\n", ")\n", "pipe = sklearn.pipeline.make_pipeline(vect, clf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`SGDClassifier.partial_fit` needs to know the full set of classes up front.\n", "Becuase our `sgd` is wrapped inside an `Incremental`, we need to pass it through\n", "as the `incremental__classes` keyword argument in `fit`." ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:26.748227Z", "iopub.status.busy": "2021-01-14T10:50:26.746425Z", "iopub.status.idle": "2021-01-14T10:50:32.206205Z", "shell.execute_reply": "2021-01-14T10:50:32.205189Z" } }, "outputs": [], "source": [ "pipe.fit(df['text'], y,\n", " incremental__classes=[0, 1]);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As usual, `Incremental.predict` lazily returns the predictions as a dask Array." ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:32.211305Z", "iopub.status.busy": "2021-01-14T10:50:32.210884Z", "iopub.status.idle": "2021-01-14T10:50:32.242808Z", "shell.execute_reply": "2021-01-14T10:50:32.243627Z" } }, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes unknown unknown
Shape (nan,) (nan,)
Count 100 Tasks 25 Chunks
Type int64 numpy.ndarray
\n", "
\n", "\n", "
" ], "text/plain": [ "dask.array<_predict, shape=(nan,), dtype=int64, chunksize=(nan,), chunktype=numpy.ndarray>" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "predictions = pipe.predict(df['text'])\n", "predictions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can compute the predictions and score in parallel with `dask_ml.metrics.accuracy_score`." ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:50:32.265614Z", "iopub.status.busy": "2021-01-14T10:50:32.265157Z", "iopub.status.idle": "2021-01-14T10:50:36.010365Z", "shell.execute_reply": "2021-01-14T10:50:36.011452Z" } }, "outputs": [ { "data": { "text/plain": [ "0.9647339579282305" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dask_ml.metrics.accuracy_score(y, predictions)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This simple combination of a HashingVectorizer and SGDClassifier is\n", "pretty effective at this prediction task." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.6" } }, "nbformat": 4, "nbformat_minor": 4 }