{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask DataFrames\n", "\n", "Dask DataFrames coordinate many Pandas dataframes, partitioned along an index. They support a large subset of the Pandas API." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Start Dask Client for Dashboard\n", "\n", "Starting the Dask Client is optional. It provides a dashboard that \n", "is useful for gaining insight into the computation. \n", "\n", "The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:11.664867Z", "iopub.status.busy": "2022-07-27T19:11:11.664611Z", "iopub.status.idle": "2022-07-27T19:11:13.808579Z", "shell.execute_reply": "2022-07-27T19:11:13.807922Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "\n", "client = Client(n_workers=2, threads_per_worker=2, memory_limit=\"1GB\")\n", "client\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Random Dataframe\n", "\n", "We create a random timeseries of data with the following attributes:\n", "\n", "1. It stores a record for every second in the month of January of the year 2000\n", "2. It splits that month by day, keeping each day as a partitioned dataframe\n", "3. Along with a datetime index it has columns for names, ids, and numeric values\n", "\n", "This is a small dataset of about 240 MB. Increase the number of days or reduce the time interval between data points to practice with a larger dataset by setting some of the [`dask.datasets.timeseries()` arguments](https://docs.dask.org/en/stable/api.html#dask.datasets.timeseries)." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:13.812754Z", "iopub.status.busy": "2022-07-27T19:11:13.812204Z", "iopub.status.idle": "2022-07-27T19:11:14.145983Z", "shell.execute_reply": "2022-07-27T19:11:14.144719Z" } }, "outputs": [], "source": [ "import dask\n", "\n", "df = dask.datasets.timeseries()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Unlike Pandas, Dask DataFrames are _lazy_, meaning that data is only loaded when it is needed for a computation. No data is printed here, instead it is replaced by ellipses (`...`)." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:14.149561Z", "iopub.status.busy": "2022-07-27T19:11:14.149084Z", "iopub.status.idle": "2022-07-27T19:11:14.166539Z", "shell.execute_reply": "2022-07-27T19:11:14.165923Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=30 \n", "2000-01-01 int64 object float64 float64\n", "2000-01-02 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-30 ... ... ... ...\n", "2000-01-31 ... ... ... ...\n", "Dask Name: make-timeseries, 30 tasks" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Nonetheless, the column names and dtypes are known." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:14.169923Z", "iopub.status.busy": "2022-07-27T19:11:14.169502Z", "iopub.status.idle": "2022-07-27T19:11:14.175120Z", "shell.execute_reply": "2022-07-27T19:11:14.174496Z" } }, "outputs": [ { "data": { "text/plain": [ "id int64\n", "name object\n", "x float64\n", "y float64\n", "dtype: object" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.dtypes\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Some operations will automatically display the data." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:14.177965Z", "iopub.status.busy": "2022-07-27T19:11:14.177754Z", "iopub.status.idle": "2022-07-27T19:11:14.181013Z", "shell.execute_reply": "2022-07-27T19:11:14.180414Z" } }, "outputs": [], "source": [ "# This sets some formatting parameters for displayed data.\n", "import pandas as pd\n", "\n", "pd.options.display.precision = 2\n", "pd.options.display.max_rows = 10\n" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:14.184009Z", "iopub.status.busy": "2022-07-27T19:11:14.183815Z", "iopub.status.idle": "2022-07-27T19:11:14.658179Z", "shell.execute_reply": "2022-07-27T19:11:14.657005Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " id name x y\n", "timestamp \n", "2000-01-01 00:00:00 999 Patricia 0.86 0.50\n", "2000-01-01 00:00:01 974 Alice -0.04 0.25\n", "2000-01-01 00:00:02 984 Ursula -0.05 -0.92" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head(3)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use Standard Pandas Operations\n", "\n", "Most common Pandas operations can be used in the same way on Dask dataframes. This example shows how to slice the data based on a mask condition and then determine the standard deviation of the data in the `x` column." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:14.661483Z", "iopub.status.busy": "2022-07-27T19:11:14.661276Z", "iopub.status.idle": "2022-07-27T19:11:14.689321Z", "shell.execute_reply": "2022-07-27T19:11:14.688694Z" } }, "outputs": [ { "data": { "text/plain": [ "Dask Series Structure:\n", "npartitions=1\n", " float64\n", " ...\n", "Name: x, dtype: float64\n", "Dask Name: sqrt, 157 tasks" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df2 = df[df.y > 0]\n", "df3 = df2.groupby(\"name\").x.std()\n", "df3\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that the data in `df3` are still represented by ellipses. All of the operations in the previous cell are lazy operations. You can call `.compute()` when you want your result as a Pandas dataframe or series.\n", "\n", "If you started `Client()` above then you can watch the status page during computation to see the progress." 
] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:14.692387Z", "iopub.status.busy": "2022-07-27T19:11:14.692073Z", "iopub.status.idle": "2022-07-27T19:11:15.741650Z", "shell.execute_reply": "2022-07-27T19:11:15.740555Z" } }, "outputs": [ { "data": { "text/plain": [ "pandas.core.series.Series" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "computed_df = df3.compute()\n", "type(computed_df)\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:15.745384Z", "iopub.status.busy": "2022-07-27T19:11:15.744926Z", "iopub.status.idle": "2022-07-27T19:11:15.753072Z", "shell.execute_reply": "2022-07-27T19:11:15.751533Z" } }, "outputs": [ { "data": { "text/plain": [ "name\n", "Alice 0.58\n", "Bob 0.58\n", "Charlie 0.58\n", "Dan 0.58\n", "Edith 0.58\n", " ... \n", "Victor 0.58\n", "Wendy 0.58\n", "Xavier 0.58\n", "Yvonne 0.58\n", "Zelda 0.58\n", "Name: x, Length: 26, dtype: float64" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "computed_df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that the computed data are now shown in the output.\n", "\n", "Another example is to aggregate several columns at once, applying a different function to each: below we take the sum of `x` and the maximum of `y` for each name. Once again, the dashboard will show the progress of the computation." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:15.755947Z", "iopub.status.busy": "2022-07-27T19:11:15.755556Z", "iopub.status.idle": "2022-07-27T19:11:16.377459Z", "shell.execute_reply": "2022-07-27T19:11:16.376809Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " x y\n", "name \n", "Alice 172.30 1.0\n", "Bob 54.79 1.0\n", "Charlie 255.01 1.0\n", "Dan 93.23 1.0\n", "Edith 155.41 1.0\n", "... ... ...\n", "Victor -303.80 1.0\n", "Wendy -20.97 1.0\n", "Xavier 112.18 1.0\n", "Yvonne 335.84 1.0\n", "Zelda 205.43 1.0\n", "\n", "[26 rows x 2 columns]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df4 = df.groupby(\"name\").aggregate({\"x\": \"sum\", \"y\": \"max\"})\n", "df4.compute()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask dataframes can also be joined like Pandas dataframes. In this example we join the aggregated data in `df4` with the original data in `df`. Since the index in `df` is the timeseries and `df4` is indexed by names, we use `left_on=\"name\"` and `right_index=True` to define the merge columns. We also set suffixes for any columns that are common between the two dataframes so that we can distinguish them.\n", "\n", "Finally, since `df4` is small, we also make sure that it is a single partition dataframe." ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:16.381151Z", "iopub.status.busy": "2022-07-27T19:11:16.380740Z", "iopub.status.idle": "2022-07-27T19:11:17.123758Z", "shell.execute_reply": "2022-07-27T19:11:17.123250Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " id name x_original y_original x_aggregated \\\n", "timestamp \n", "2000-01-01 00:00:00 999 Patricia 0.86 0.50 -34.68 \n", "2000-01-01 00:00:03 988 Patricia -0.57 -0.67 -34.68 \n", "2000-01-01 00:00:12 1038 Patricia -0.48 0.35 -34.68 \n", "2000-01-01 00:01:16 964 Patricia -0.25 0.13 -34.68 \n", "2000-01-01 00:01:33 1050 Patricia -0.58 -0.38 -34.68 \n", "\n", " y_aggregated \n", "timestamp \n", "2000-01-01 00:00:00 1.0 \n", "2000-01-01 00:00:03 1.0 \n", "2000-01-01 00:00:12 1.0 \n", "2000-01-01 00:01:16 1.0 \n", "2000-01-01 00:01:33 1.0 " ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df4 = df4.repartition(npartitions=1)\n", "joined = df.merge(\n", " df4, left_on=\"name\", right_index=True, suffixes=(\"_original\", \"_aggregated\")\n", ")\n", "joined.head()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Persist data in memory\n", "\n", "If you have the available RAM for your dataset then you can persist data in memory. \n", "\n", "This allows future computations to be much faster." ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:17.127751Z", "iopub.status.busy": "2022-07-27T19:11:17.127337Z", "iopub.status.idle": "2022-07-27T19:11:17.163137Z", "shell.execute_reply": "2022-07-27T19:11:17.147526Z" } }, "outputs": [], "source": [ "df = df.persist()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Time Series Operations\n", "\n", "Because `df` has a datetime index, time-series operations work efficiently.\n", "\n", "The first example below resamples the data at 1 hour intervals to reduce the total size of the dataframe. Then the mean of the `x` and `y` columns are taken." 
] }, { "cell_type": "code", "execution_count": 13, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:17.168212Z", "iopub.status.busy": "2022-07-27T19:11:17.166242Z", "iopub.status.idle": "2022-07-27T19:11:17.483174Z", "shell.execute_reply": "2022-07-27T19:11:17.482516Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " x y\n", "timestamp \n", "2000-01-01 00:00:00 1.96e-03 1.43e-02\n", "2000-01-01 01:00:00 5.23e-03 1.51e-02\n", "2000-01-01 02:00:00 -9.91e-04 -9.25e-04\n", "2000-01-01 03:00:00 -2.57e-03 -5.00e-03\n", "2000-01-01 04:00:00 -5.71e-03 1.16e-02" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df[[\"x\", \"y\"]].resample(\"1h\").mean().head()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The next example resamples the data at 24 hour intervals and plots the mean values. Notice that `plot()` is called after `compute()` because `plot()` will not work until the data are computed." ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:17.487780Z", "iopub.status.busy": "2022-07-27T19:11:17.486731Z", "iopub.status.idle": "2022-07-27T19:11:20.251345Z", "shell.execute_reply": "2022-07-27T19:11:20.250015Z" } }, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "%matplotlib inline\n", "df[['x', 'y']].resample('24h').mean().compute().plot();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This final example computes the rolling 24 hour mean of the data." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:20.254643Z", "iopub.status.busy": "2022-07-27T19:11:20.254201Z", "iopub.status.idle": "2022-07-27T19:11:20.316021Z", "shell.execute_reply": "2022-07-27T19:11:20.315475Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " x y\n", "timestamp \n", "2000-01-01 00:00:00 0.86 0.50\n", "2000-01-01 00:00:01 0.41 0.38\n", "2000-01-01 00:00:02 0.25 -0.05\n", "2000-01-01 00:00:03 0.05 -0.21\n", "2000-01-01 00:00:04 0.18 -0.18" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df[[\"x\", \"y\"]].rolling(window=\"24h\").mean().head()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Random access is cheap along the index, but must since the Dask dataframe is lazy, it must be computed to materialize the data." ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:20.319057Z", "iopub.status.busy": "2022-07-27T19:11:20.318630Z", "iopub.status.idle": "2022-07-27T19:11:20.336583Z", "shell.execute_reply": "2022-07-27T19:11:20.335897Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=1 \n", "2000-01-05 00:00:00.000000000 int64 object float64 float64\n", "2000-01-05 23:59:59.999999999 ... ... ... ...\n", "Dask Name: loc, 31 tasks" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.loc[\"2000-01-05\"]\n" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:20.339739Z", "iopub.status.busy": "2022-07-27T19:11:20.339405Z", "iopub.status.idle": "2022-07-27T19:11:20.422385Z", "shell.execute_reply": "2022-07-27T19:11:20.421524Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 28.7 ms, sys: 7.62 ms, total: 36.3 ms\n", "Wall time: 64.2 ms\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ " id name x y\n", "timestamp \n", "2000-01-05 00:00:00 1001 Hannah 0.85 -0.23\n", "2000-01-05 00:00:01 1021 Charlie -0.09 -0.42\n", "2000-01-05 00:00:02 974 Zelda 0.70 -0.81\n", "2000-01-05 00:00:03 1015 Sarah 0.35 -0.13\n", "2000-01-05 00:00:04 989 Frank -0.26 -0.96\n", "... ... ... ... ...\n", "2000-01-05 23:59:55 1023 Alice 0.68 0.21\n", "2000-01-05 23:59:56 982 Alice 0.90 0.74\n", "2000-01-05 23:59:57 941 Sarah -0.49 -0.39\n", "2000-01-05 23:59:58 1009 Kevin 0.89 -0.26\n", "2000-01-05 23:59:59 1031 Hannah 0.80 0.53\n", "\n", "[86400 rows x 4 columns]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time df.loc['2000-01-05'].compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set Index\n", "\n", "Data is sorted by the index column. This allows for faster access, joins, groupby-apply operations, and more. However sorting data can be costly to do in parallel, so setting the index is both important to do, but only infrequently. In the next few examples, we will group the data by the `name` column, so we will set that column as the index to improve efficiency." ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:20.426038Z", "iopub.status.busy": "2022-07-27T19:11:20.425670Z", "iopub.status.idle": "2022-07-27T19:11:22.364173Z", "shell.execute_reply": "2022-07-27T19:11:22.363565Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id x y\n", "npartitions=26 \n", "Alice int64 float64 float64\n", "Bob ... ... ...\n", "... ... ... ...\n", "Zelda ... ... ...\n", "Zelda ... ... ...\n", "Dask Name: sort_index, 954 tasks" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df5 = df.set_index(\"name\")\n", "df5\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because resetting the index for this dataset is expensive and we can fit it in our available RAM, we persist the dataset to memory." ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:22.367256Z", "iopub.status.busy": "2022-07-27T19:11:22.367051Z", "iopub.status.idle": "2022-07-27T19:11:22.438671Z", "shell.execute_reply": "2022-07-27T19:11:22.413951Z" } }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id x y\n", "npartitions=26 \n", "Alice int64 float64 float64\n", "Bob ... ... ...\n", "... ... ... ...\n", "Zelda ... ... ...\n", "Zelda ... ... ...\n", "Dask Name: sort_index, 26 tasks" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df5 = df5.persist()\n", "df5\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask now knows where all data lives, indexed by name. As a result operations like random access are cheap and efficient." ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:22.446073Z", "iopub.status.busy": "2022-07-27T19:11:22.445863Z", "iopub.status.idle": "2022-07-27T19:11:24.831016Z", "shell.execute_reply": "2022-07-27T19:11:24.830452Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 360 ms, sys: 44.7 ms, total: 404 ms\n", "Wall time: 2.35 s\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ " id x y\n", "name \n", "Alice 974 -0.04 0.25\n", "Alice 1001 0.44 0.55\n", "Alice 1039 -0.38 0.88\n", "Alice 974 -0.71 0.12\n", "Alice 960 0.98 -0.21\n", "... ... ... ...\n", "Alice 994 0.56 0.90\n", "Alice 999 0.03 -0.11\n", "Alice 988 -0.49 -0.80\n", "Alice 999 -0.35 -0.20\n", "Alice 1079 -0.29 0.88\n", "\n", "[99801 rows x 3 columns]" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time df5.loc['Alice'].compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Groupby Apply with Scikit-Learn" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that our data is sorted by name we can inexpensively do operations like random access on name, or groupby-apply with custom functions.\n", "\n", "Here we train a different scikit-learn linear regression model on each name." ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:24.838060Z", "iopub.status.busy": "2022-07-27T19:11:24.837859Z", "iopub.status.idle": "2022-07-27T19:11:25.701687Z", "shell.execute_reply": "2022-07-27T19:11:25.700967Z" } }, "outputs": [], "source": [ "from sklearn.linear_model import LinearRegression\n", "\n", "\n", "def train(partition):\n", " if not len(partition):\n", " return\n", " est = LinearRegression()\n", " est.fit(partition[[\"x\"]].values, partition.y.values)\n", " return est\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `partition` argument to `train()` will be one of the group instances from the `DataFrameGroupBy`. If there is no data in the partition, we don't need to proceed. If there is data, we want to fit the linear regression model and return that as the value for this group.\n", "\n", "Now working with `df5`, whose index is the names from `df`, we can group by the `names` column. This also happens to be the index, but that's fine. 
Then we use `.apply()` to run `train()` on each group in the `DataFrameGroupBy` generated by `.groupby()`.\n", "\n", "The `meta` argument tells Dask how to create the `DataFrame` or `Series` that will hold the result of `.apply()`. In this case, `train()` returns a single value, so `.apply()` will create a `Series`. This means we need to tell Dask the dtype of that single column and, optionally, its name.\n", "\n", "The easiest way to specify a single column is with a `(name, dtype)` tuple. The first element is the name; since this is a series of linear regressions, we name it `\"LinearRegression\"`. The second element is the dtype of the values returned by `train()`. Pandas stores arbitrary Python objects, such as fitted models, with the general `object` dtype, so that is the type we pass." ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:11:25.705144Z", "iopub.status.busy": "2022-07-27T19:11:25.704922Z", "iopub.status.idle": "2022-07-27T19:11:26.886130Z", "shell.execute_reply": "2022-07-27T19:11:26.885615Z" } }, "outputs": [ { "data": { "text/plain": [ "name\n", "Alice LinearRegression()\n", "Bob LinearRegression()\n", "Charlie LinearRegression()\n", "Dan LinearRegression()\n", "Edith LinearRegression()\n", " ... \n", "Victor LinearRegression()\n", "Wendy LinearRegression()\n", "Xavier LinearRegression()\n", "Yvonne LinearRegression()\n", "Zelda LinearRegression()\n", "Name: LinearRegression, Length: 26, dtype: object" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df6 = df5.groupby(\"name\").apply(\n", " train, meta=(\"LinearRegression\", object)\n", ").compute()\n", "df6" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Further Reading\n", "\n", "For a more in-depth introduction to Dask dataframes, see the [Dask tutorial](https://github.com/dask/dask-tutorial), notebooks 04 and 07." 
] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }