{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Score and Predict Large Datasets\n", "================================" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sometimes you'll train on a smaller dataset that fits in memory, but need to predict or score for a much larger (possibly larger than memory) dataset. Perhaps your [learning curve](http://scikit-learn.org/stable/modules/learning_curve.html) has leveled off, or you only have labels for a subset of the data.\n", "\n", "In this situation, you can use [ParallelPostFit](http://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html) to parallelize and distribute the scoring or prediction steps." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:35.095475Z", "iopub.status.busy": "2022-07-27T19:22:35.095170Z", "iopub.status.idle": "2022-07-27T19:22:36.218649Z", "shell.execute_reply": "2022-07-27T19:22:36.218003Z" } }, "outputs": [ { "data": { "text/html": [ "
Client: Client-7847b57a-0de1-11ed-a43b-000d3a8f7959
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://10.1.1.64:8787/status
Cluster: LocalCluster 0bbc581a | Workers: 1 | Total threads: 4 | Total memory: 1.86 GiB | Status: running | Using processes: False
Scheduler: Scheduler-02b344ef-0a9b-4ca2-b806-8205e284cc50 | Comm: inproc://10.1.1.64/9275/1
Worker 0: Comm: inproc://10.1.1.64/9275/4 | Total threads: 4 | Memory: 1.86 GiB
" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client, progress\n", "\n", "# Scale up: connect to your own cluster with bmore resources\n", "# see http://dask.pydata.org/en/latest/setup.html\n", "client = Client(processes=False, threads_per_worker=4,\n", " n_workers=1, memory_limit='2GB')\n", "client" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:36.222013Z", "iopub.status.busy": "2022-07-27T19:22:36.221476Z", "iopub.status.idle": "2022-07-27T19:22:36.798250Z", "shell.execute_reply": "2022-07-27T19:22:36.797609Z" } }, "outputs": [], "source": [ "import numpy as np\n", "import dask.array as da\n", "from sklearn.datasets import make_classification" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll generate a small random dataset with scikit-learn." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:36.801826Z", "iopub.status.busy": "2022-07-27T19:22:36.801361Z", "iopub.status.idle": "2022-07-27T19:22:36.809530Z", "shell.execute_reply": "2022-07-27T19:22:36.808999Z" } }, "outputs": [ { "data": { "text/plain": [ "array([[ 1.53682958, -1.39869399],\n", " [ 1.36917601, -0.63734411],\n", " [ 0.50231787, -0.45910529],\n", " [ 1.83319262, -1.29808229],\n", " [ 1.04235568, 1.12152929]])" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "X_train, y_train = make_classification(\n", " n_features=2, n_redundant=0, n_informative=2,\n", " random_state=1, n_clusters_per_class=1, n_samples=1000)\n", "X_train[:5]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And we'll clone that dataset many times with `dask.array`. `X_large` and `y_large` represent our larger than memory dataset." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:36.812548Z", "iopub.status.busy": "2022-07-27T19:22:36.812219Z", "iopub.status.idle": "2022-07-27T19:22:36.865427Z", "shell.execute_reply": "2022-07-27T19:22:36.864750Z" } }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
                 Array          Chunk
Bytes            1.53 MiB       15.62 kiB
Shape            (100000, 2)    (1000, 2)
Count            101 Tasks      100 Chunks
Type             float64        numpy.ndarray
" ], "text/plain": [ "dask.array" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Scale up: increase N, the number of times we replicate the data.\n", "N = 100\n", "X_large = da.concatenate([da.from_array(X_train, chunks=X_train.shape)\n", " for _ in range(N)])\n", "y_large = da.concatenate([da.from_array(y_train, chunks=y_train.shape)\n", " for _ in range(N)])\n", "X_large" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since our training dataset fits in memory, we can use a scikit-learn estimator as the actual estimator fit during traning.\n", "But we know that we'll want to predict for a large dataset, so we'll wrap the scikit-learn estimator with `ParallelPostFit`." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:36.869111Z", "iopub.status.busy": "2022-07-27T19:22:36.868532Z", "iopub.status.idle": "2022-07-27T19:22:37.108154Z", "shell.execute_reply": "2022-07-27T19:22:37.107407Z" } }, "outputs": [], "source": [ "from sklearn.linear_model import LogisticRegressionCV\n", "from dask_ml.wrappers import ParallelPostFit" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:37.111724Z", "iopub.status.busy": "2022-07-27T19:22:37.111335Z", "iopub.status.idle": "2022-07-27T19:22:37.118145Z", "shell.execute_reply": "2022-07-27T19:22:37.117693Z" } }, "outputs": [], "source": [ "clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring=\"r2\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "See the note in the `dask-ml`'s documentation about when and why a `scoring` parameter is needed: https://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll call `clf.fit`. Dask-ML does nothing here, so this step can only use datasets that fit in memory." 
] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:37.121297Z", "iopub.status.busy": "2022-07-27T19:22:37.120808Z", "iopub.status.idle": "2022-07-27T19:22:37.202502Z", "shell.execute_reply": "2022-07-27T19:22:37.201739Z" } }, "outputs": [ { "data": { "text/plain": [ "ParallelPostFit(estimator=LogisticRegressionCV(cv=3), scoring='r2')" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "clf.fit(X_train, y_train)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that training is done, we'll turn to predicting for the full (larger than memory) dataset." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:37.205665Z", "iopub.status.busy": "2022-07-27T19:22:37.205144Z", "iopub.status.idle": "2022-07-27T19:22:37.219261Z", "shell.execute_reply": "2022-07-27T19:22:37.218742Z" } }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
                 Array          Chunk
Bytes            781.25 kiB     7.81 kiB
Shape            (100000,)      (1000,)
Count            201 Tasks      100 Chunks
Type             int64          numpy.ndarray
" ], "text/plain": [ "dask.array<_predict, shape=(100000,), dtype=int64, chunksize=(1000,), chunktype=numpy.ndarray>" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "y_pred = clf.predict(X_large)\n", "y_pred" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "y_pred is Dask arary. Workers can write the predicted values to a shared file system, without ever having to collect the data on a single machine.\n", "\n", "Or we can check the models score on the entire large dataset. The computation will be done in parallel, and no single machine will have to hold all the data." ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:22:37.222440Z", "iopub.status.busy": "2022-07-27T19:22:37.221942Z", "iopub.status.idle": "2022-07-27T19:22:38.322122Z", "shell.execute_reply": "2022-07-27T19:22:38.321406Z" } }, "outputs": [ { "data": { "text/plain": [ "0.596" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "clf.score(X_large, y_large)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }