{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Text Vectorization Pipeline\n", "\n", "This example illustrates how Dask-ML can be used to classify large textual datasets in parallel.\n", "It is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n", "\n", "The primary differences are that\n", "\n", "* We fit the entire model, including text vectorization, as a pipeline.\n", "* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n", " rather than generators to work with larger than memory datasets." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:22.960787Z", "iopub.status.busy": "2022-07-27T19:23:22.960372Z", "iopub.status.idle": "2022-07-27T19:23:25.125888Z", "shell.execute_reply": "2022-07-27T19:23:25.125336Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-94cfa4ae-0de1-11ed-a521-000d3a8f7959

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://127.0.0.1:8787/status\n", "
\n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

0f48ff5b

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Workers: 2\n", "
\n", " Total threads: 4\n", " \n", " Total memory: 3.73 GiB\n", "
Status: runningUsing processes: True
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-df50e6c2-504c-458c-904f-d9354f469039

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://127.0.0.1:35661\n", " \n", " Workers: 2\n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Total threads: 4\n", "
\n", " Started: Just now\n", " \n", " Total memory: 3.73 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:46267\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:41243/status\n", " \n", " Memory: 1.86 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:38595\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-imhji18u\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 1

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:44477\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:42527/status\n", " \n", " Memory: 1.86 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:34731\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/machine-learning/dask-worker-space/worker-r6iwv690\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client, progress\n", "\n", "client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fetch the data\n", "\n", "Scikit-Learn provides a utility to fetch the newsgroups dataset." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:25.132238Z", "iopub.status.busy": "2022-07-27T19:23:25.131763Z", "iopub.status.idle": "2022-07-27T19:23:25.943088Z", "shell.execute_reply": "2022-07-27T19:23:25.942171Z" } }, "outputs": [], "source": [ "import sklearn.datasets\n", "\n", "bunch = sklearn.datasets.fetch_20newsgroups()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The data from scikit-learn isn't *too* large, so the data is just\n", "returned in memory. Each document is a string. The target we're predicting\n", "is an integer, which codes the topic of the post.\n", "\n", "We'll load the documents and targets directly into a dask DataFrame.\n", "In practice, on a larger than memory dataset, you would likely load the\n", "documents from disk or cloud storage using `dask.bag` or `dask.delayed`." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:25.946771Z", "iopub.status.busy": "2022-07-27T19:23:25.946315Z", "iopub.status.idle": "2022-07-27T19:23:26.238368Z", "shell.execute_reply": "2022-07-27T19:23:26.237748Z" } }, "outputs": [ { "data": { "text/html": [ "
{ "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:25.946771Z", "iopub.status.busy": "2022-07-27T19:23:25.946315Z", "iopub.status.idle": "2022-07-27T19:23:26.238368Z", "shell.execute_reply": "2022-07-27T19:23:26.237748Z" } }, "outputs": [ { "data": { "text/plain": [ "Dask DataFrame Structure:\n", "                  text target\n", "npartitions=25               \n", "0               object  int64\n", "453                ...    ...\n", "...                ...    ...\n", "10872              ...    ...\n", "11313              ...    ...\n", "Dask Name: from_pandas, 25 tasks" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask.dataframe as dd\n", "import pandas as pd\n", "\n", "df = dd.from_pandas(pd.DataFrame({\"text\": bunch.data, \"target\": bunch.target}),\n", "                    npartitions=25)\n", "\n", "df" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Each row in the `text` column has a bit of metadata and the full text of a post." ] },
{ "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:26.242058Z", "iopub.status.busy": "2022-07-27T19:23:26.241618Z", "iopub.status.idle": "2022-07-27T19:23:26.720911Z", "shell.execute_reply": "2022-07-27T19:23:26.720239Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "From: lerxst@wam.umd.edu (where's my thing)\n", "Subject: WHAT car is this!?\n", "Nntp-Posting-Host: rac3.wam.umd.edu\n", "Organization: University of Maryland, College Park\n", "Lines: 15\n", "\n", " I was wondering if anyone out there could enlighten me on this car I saw\n", "the other day. It was a 2-door sports car, looked to be from the late 60s/\n", "early 70s. It was called a Bricklin. The doors were really small. In addition,\n", "the front bumper was separate from the rest of the body. This is \n", "all I know. If anyone can tellme a m\n" ] } ], "source": [ "print(df.head().loc[0, 'text'][:500])" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Feature Hashing" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Dask's [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) provides a similar API to [scikit-learn's implementation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html). In fact, Dask-ML's implementation uses scikit-learn's, applying it to each partition of the input `dask.dataframe.Series` or `dask.bag.Bag`.\n", "\n", "The transformation happens in parallel once we actually compute the result, and it returns a dask Array." ] },
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Shape (nan, 1048576) (nan, 1048576)
Count 75 Tasks 25 Chunks
Type float64 scipy.sparse._csr.csr_matrix
\n", "
\n", " \n", "
" ], "text/plain": [ "dask.array<_transformer, shape=(nan, 1048576), dtype=float64, chunksize=(nan, 1048576), chunktype=scipy.csr_matrix>" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask_ml.feature_extraction.text\n", "\n", "vect = dask_ml.feature_extraction.text.HashingVectorizer()\n", "X = vect.fit_transform(df['text'])\n", "X" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The output array `X` has unknown chunk sizes becase the input dask Series or Bags don't know their own length.\n", "\n", "Each block in `X` is a `scipy.sparse` matrix." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:26.894237Z", "iopub.status.busy": "2022-07-27T19:23:26.893797Z", "iopub.status.idle": "2022-07-27T19:23:27.604580Z", "shell.execute_reply": "2022-07-27T19:23:27.603993Z" } }, "outputs": [ { "data": { "text/plain": [ "<453x1048576 sparse matrix of type ''\n", "\twith 64357 stored elements in Compressed Sparse Row format>" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "X.blocks[0].compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is a document-term matrix. Each row is the hashed representation of the original post." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Classification Pipeline\n", "\n", "We can combine the [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) with [Incremental](https://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) and a classifier like scikit-learn's `SGDClassifier` to\n", "create a classification pipeline.\n", "\n", "We'll predict whether the topic was in the `comp` category." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:27.607666Z", "iopub.status.busy": "2022-07-27T19:23:27.607232Z", "iopub.status.idle": "2022-07-27T19:23:27.614175Z", "shell.execute_reply": "2022-07-27T19:23:27.613617Z" } }, "outputs": [ { "data": { "text/plain": [ "['alt.atheism',\n", " 'comp.graphics',\n", " 'comp.os.ms-windows.misc',\n", " 'comp.sys.ibm.pc.hardware',\n", " 'comp.sys.mac.hardware',\n", " 'comp.windows.x',\n", " 'misc.forsale',\n", " 'rec.autos',\n", " 'rec.motorcycles',\n", " 'rec.sport.baseball',\n", " 'rec.sport.hockey',\n", " 'sci.crypt',\n", " 'sci.electronics',\n", " 'sci.med',\n", " 'sci.space',\n", " 'soc.religion.christian',\n", " 'talk.politics.guns',\n", " 'talk.politics.mideast',\n", " 'talk.politics.misc',\n", " 'talk.religion.misc']" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "bunch.target_names" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:27.617285Z", "iopub.status.busy": "2022-07-27T19:23:27.616831Z", "iopub.status.idle": "2022-07-27T19:23:27.627108Z", "shell.execute_reply": "2022-07-27T19:23:27.626510Z" } }, "outputs": [ { "data": { "text/plain": [ "Dask Series Structure:\n", "npartitions=25\n", "0 int64\n", "453 ...\n", " ... 
\n", "10872 ...\n", "11313 ...\n", "Name: target, dtype: int64\n", "Dask Name: astype, 101 tasks" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import numpy as np\n", "\n", "positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]\n", "y = df['target'].isin(positive).astype(int)\n", "y" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:27.630128Z", "iopub.status.busy": "2022-07-27T19:23:27.629642Z", "iopub.status.idle": "2022-07-27T19:23:27.634084Z", "shell.execute_reply": "2022-07-27T19:23:27.633528Z" } }, "outputs": [], "source": [ "import numpy as np\n", "import sklearn.linear_model\n", "import sklearn.pipeline\n", "\n", "import dask_ml.wrappers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because the input comes from a dask Series, with unknown chunk sizes, we need to specify `assume_equal_chunks=True`. This tells Dask-ML that we know that each partition in `X`\n", "matches a partition in `y`." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:27.636786Z", "iopub.status.busy": "2022-07-27T19:23:27.636594Z", "iopub.status.idle": "2022-07-27T19:23:27.640586Z", "shell.execute_reply": "2022-07-27T19:23:27.640017Z" } }, "outputs": [], "source": [ "sgd = sklearn.linear_model.SGDClassifier(\n", " tol=1e-3\n", ")\n", "clf = dask_ml.wrappers.Incremental(\n", " sgd, scoring='accuracy', assume_equal_chunks=True\n", ")\n", "pipe = sklearn.pipeline.make_pipeline(vect, clf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`SGDClassifier.partial_fit` needs to know the full set of classes up front.\n", "Because our `sgd` is wrapped inside an `Incremental`, we need to pass it through\n", "as the `incremental__classes` keyword argument in `fit`." ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:27.643796Z", "iopub.status.busy": "2022-07-27T19:23:27.643109Z", "iopub.status.idle": "2022-07-27T19:23:30.379579Z", "shell.execute_reply": "2022-07-27T19:23:30.378969Z" } }, "outputs": [], "source": [ "pipe.fit(df['text'], y,\n", " incremental__classes=[0, 1]);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As usual, `Incremental.predict` lazily returns the predictions as a dask Array." ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:30.383485Z", "iopub.status.busy": "2022-07-27T19:23:30.383281Z", "iopub.status.idle": "2022-07-27T19:23:30.406063Z", "shell.execute_reply": "2022-07-27T19:23:30.405583Z" } }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes unknown unknown
Shape (nan,) (nan,)
Count 100 Tasks 25 Chunks
Type int64 numpy.ndarray
\n", "
\n", " \n", "
" ], "text/plain": [ "dask.array<_predict, shape=(nan,), dtype=int64, chunksize=(nan,), chunktype=numpy.ndarray>" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "predictions = pipe.predict(df['text'])\n", "predictions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can compute the predictions and score in parallel with `dask_ml.metrics.accuracy_score`." ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:23:30.411318Z", "iopub.status.busy": "2022-07-27T19:23:30.409743Z", "iopub.status.idle": "2022-07-27T19:23:32.565664Z", "shell.execute_reply": "2022-07-27T19:23:32.565038Z" } }, "outputs": [ { "data": { "text/plain": [ "0.950150256319604" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dask_ml.metrics.accuracy_score(y, predictions)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This simple combination of a HashingVectorizer and SGDClassifier is\n", "pretty effective at this prediction task." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }