{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Gotcha's from Pandas to Dask\n", "\n", "This notebook highlights some key differences when transfering code from `Pandas` to run in a `Dask` environment. \n", "Most issues have a link to the [Dask documentation](https://docs.dask.org/en/latest/) for additional information." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:19.321541Z", "iopub.status.busy": "2022-07-27T19:19:19.320861Z", "iopub.status.idle": "2022-07-27T19:19:20.115880Z", "shell.execute_reply": "2022-07-27T19:19:20.114678Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Dask versoin: 2022.05.0\n", "Pandas versoin: 1.4.2\n" ] } ], "source": [ "# since Dask is activly beeing developed - the current example is running with the below version\n", "import dask\n", "import dask.dataframe as dd\n", "import pandas as pd\n", "print(f'Dask versoin: {dask.__version__}')\n", "print(f'Pandas versoin: {pd.__version__}')" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "## Start Dask Client for Dashboard\n", "\n", "Starting the Dask Client is optional. In this example we are running on a `LocalCluster`, this will also provide a dashboard which is useful to gain insight on the computation. \n", "For additional information on [Dask Client see documentation](https://docs.dask.org/en/latest/setup.html?highlight=client#setup) \n", "\n", "The link to the dashboard will become visible when you create a client (as shown below). \n", "When running within `Jupyter Lab` an [extenstion](https://github.com/dask/dask-labextension) can be installed to view the various dashboard widgets. " ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:20.119279Z", "iopub.status.busy": "2022-07-27T19:19:20.118879Z", "iopub.status.idle": "2022-07-27T19:19:21.850721Z", "shell.execute_reply": "2022-07-27T19:19:21.850001Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-03e84462-0de1-11ed-a1e8-000d3a8f7959

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://127.0.0.1:8787/status\n", "
\n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

45bdf0f8

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Workers: 2\n", "
\n", " Total threads: 2\n", " \n", " Total memory: 6.78 GiB\n", "
Status: runningUsing processes: True
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-1dfabd40-56f7-4aad-a87c-63bd33674848

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://127.0.0.1:36429\n", " \n", " Workers: 2\n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Total threads: 2\n", "
\n", " Started: Just now\n", " \n", " Total memory: 6.78 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:39049\n", " \n", " Total threads: 1\n", "
\n", " Dashboard: http://127.0.0.1:37891/status\n", " \n", " Memory: 3.39 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:41889\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-t0q30i9q\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 1

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:36835\n", " \n", " Total threads: 1\n", "
\n", " Dashboard: http://127.0.0.1:36827/status\n", " \n", " Memory: 3.39 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:43323\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-uo70pumh\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')\n", "client = Client()\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "See [documentation for addtional cluster configuration](http://distributed.dask.org/en/latest/local-cluster.html)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Create 2 DataFrames for comparison: \n", "1. for Dask \n", "2. for Pandas \n", "Dask comes with builtin dataset samples, we will use this sample for our example. " ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:21.856847Z", "iopub.status.busy": "2022-07-27T19:19:21.856413Z", "iopub.status.idle": "2022-07-27T19:19:21.885517Z", "shell.execute_reply": "2022-07-27T19:19:21.884218Z" }, "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
npartitions=30
2000-01-01int64objectfloat64float64
2000-01-02............
...............
2000-01-30............
2000-01-31............
\n", "
\n", "
Dask Name: make-timeseries, 30 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=30 \n", "2000-01-01 int64 object float64 float64\n", "2000-01-02 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-30 ... ... ... ...\n", "2000-01-31 ... ... ... ...\n", "Dask Name: make-timeseries, 30 tasks" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf = dask.datasets.timeseries()\n", "ddf" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "* Remember `Dask framework` is **lazy** thus in order to see the result we need to run [compute()](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.compute.html) \n", " (or `head()` which runs under the hood compute()) )" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:21.889391Z", "iopub.status.busy": "2022-07-27T19:19:21.888953Z", "iopub.status.idle": "2022-07-27T19:19:22.379284Z", "shell.execute_reply": "2022-07-27T19:19:22.378633Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744
2000-01-01 00:00:01964Jerry0.0219150.588930
\n", "
" ], "text/plain": [ " id name x y\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "#### Pandas Dataframe\n", "In order to create a `Pandas` dataframe we can use the `compute()` method from a `Dask dataframe`" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:22.382692Z", "iopub.status.busy": "2022-07-27T19:19:22.382235Z", "iopub.status.idle": "2022-07-27T19:19:24.041845Z", "shell.execute_reply": "2022-07-27T19:19:24.041279Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744
2000-01-01 00:00:01964Jerry0.0219150.588930
\n", "
" ], "text/plain": [ " id name x y\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf = ddf.compute() \n", "print(type(pdf))\n", "pdf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### dataframe.shape \n", "We can also see *dask laziness* when using the shape attribute" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:24.045352Z", "iopub.status.busy": "2022-07-27T19:19:24.044829Z", "iopub.status.idle": "2022-07-27T19:19:24.053190Z", "shell.execute_reply": "2022-07-27T19:19:24.052580Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pandas shape: (2592000, 4)\n", "---------------------------\n", "Dask lazy shape: (Delayed('int-cfb1b2b5-09dd-494e-b2d2-f875ace2562d'), 4)\n" ] } ], "source": [ "print(f'Pandas shape: {pdf.shape}')\n", "print('---------------------------')\n", "print(f'Dask lazy shape: {ddf.shape}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We cannot get the full shape before accessing all the partitions - running `len` will do so" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:24.055995Z", "iopub.status.busy": "2022-07-27T19:19:24.055483Z", "iopub.status.idle": "2022-07-27T19:19:24.474425Z", "shell.execute_reply": "2022-07-27T19:19:24.473553Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Dask computed shape: 2,592,000\n" ] } ], "source": [ "print(f'Dask computed shape: {len(ddf.index):,}') # expensive" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Creating a `Dask dataframe` from `Pandas`\n", "In order to utilize `Dask` capablities on an existing `Pandas dataframe` (pdf) we need to convert the `Pandas dataframe` into a `Dask dataframe` (ddf) with the [from_pandas](https://docs.dask.org/en/latest/generated/dask.dataframe.from_pandas.html) method. \n", "You must supply the number of partitions or chunksize that will be used to generate the dask dataframe" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:24.477627Z", "iopub.status.busy": "2022-07-27T19:19:24.477218Z", "iopub.status.idle": "2022-07-27T19:19:24.766009Z", "shell.execute_reply": "2022-07-27T19:19:24.765404Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
npartitions=10
2000-01-01 00:00:00int64objectfloat64float64
2000-01-04 00:00:00............
...............
2000-01-28 00:00:00............
2000-01-30 23:59:59............
\n", "
\n", "
Dask Name: from_pandas, 10 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=10 \n", "2000-01-01 00:00:00 int64 object float64 float64\n", "2000-01-04 00:00:00 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-28 00:00:00 ... ... ... ...\n", "2000-01-30 23:59:59 ... ... ... ...\n", "Dask Name: from_pandas, 10 tasks" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)\n", "ddf2" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "## Partitions in Dask Dataframes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that when we created a `Dask dataframe` we needed to supply an argument of `npartitions`. \n", " The number of partitions will assist `Dask` on how to breakup the `Pandas Datafram` and parallelize the computation. \n", "Each partition is a *separate* dataframe. For additional information see [partition documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#partitions) \n", "\n", "An example for this can be seen when examing the `reset_ index()` method:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:24.769418Z", "iopub.status.busy": "2022-07-27T19:19:24.768890Z", "iopub.status.idle": "2022-07-27T19:19:24.841713Z", "shell.execute_reply": "2022-07-27T19:19:24.841108Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/plain": [ "timestamp 2000-01-01 00:00:00\n", "id 983\n", "name Wendy\n", "x -0.303374\n", "y -0.423744\n", "Name: 0, dtype: object" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf2 = pdf.reset_index()\n", "# Only 1 row\n", "pdf2.loc[0]" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:24.844798Z", "iopub.status.busy": "2022-07-27T19:19:24.844201Z", "iopub.status.idle": "2022-07-27T19:19:25.904966Z", "shell.execute_reply": "2022-07-27T19:19:25.904507Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
timestampidnamexy
02000-01-01983Wendy-0.303374-0.423744
02000-01-041002Kevin-0.825578-0.584699
02000-01-07963Oliver0.024036-0.692546
02000-01-101023Yvonne0.8974860.958034
02000-01-131088Quinn-0.7219540.261693
02000-01-16994George0.463023-0.166976
02000-01-19932Frank0.272315-0.585240
02000-01-221007Ursula-0.919138-0.173157
02000-01-25983Patricia-0.893431-0.892484
02000-01-281043Oliver-0.979336-0.581927
\n", "
" ], "text/plain": [ " timestamp id name x y\n", "0 2000-01-01 983 Wendy -0.303374 -0.423744\n", "0 2000-01-04 1002 Kevin -0.825578 -0.584699\n", "0 2000-01-07 963 Oliver 0.024036 -0.692546\n", "0 2000-01-10 1023 Yvonne 0.897486 0.958034\n", "0 2000-01-13 1088 Quinn -0.721954 0.261693\n", "0 2000-01-16 994 George 0.463023 -0.166976\n", "0 2000-01-19 932 Frank 0.272315 -0.585240\n", "0 2000-01-22 1007 Ursula -0.919138 -0.173157\n", "0 2000-01-25 983 Patricia -0.893431 -0.892484\n", "0 2000-01-28 1043 Oliver -0.979336 -0.581927" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf2 = ddf2.reset_index()\n", "# each partition has an index=0\n", "ddf2.loc[0].compute() " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask Dataframe vs Pandas Dataframe\n", "Now that we have a `dask` (ddf) and a `pandas` (pdf) dataframe we can start to compair the interactions with them." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Conceptual shift - from Update to Insert/Delete\n", "Dask does not update - thus there are no arguments such as `inplace=True` which exist in Pandas. \n", "For more detials see [issue#653 on github](https://github.com/dask/dask/issues/653)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### Rename Columns" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* using `inplace=True` is not considerd to be *best practice*. " ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:25.910962Z", "iopub.status.busy": "2022-07-27T19:19:25.909652Z", "iopub.status.idle": "2022-07-27T19:19:26.007534Z", "shell.execute_reply": "2022-07-27T19:19:26.006930Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Index(['id', 'name', 'x', 'y'], dtype='object')\n" ] }, { "data": { "text/plain": [ "Index(['ID', 'name', 'x', 'y'], dtype='object')" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Pandas \n", "print(pdf.columns)\n", "# pdf.rename(columns={'id':'ID'}, inplace=True)\n", "pdf = pdf.rename(columns={'id':'ID'})\n", "pdf.columns" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "# Dask - Error\n", "# ddf.rename(columns={'id':'ID'}, inplace=True)\n", "# ddf.columns\n", "\n", "''' python\n", "--------------------------------------------------------------------------- \n", "TypeError Traceback (most recent call last) \n", " in \n", " 1 # Dask - Error \n", "----> 2 ddf.rename(columns={'id':'ID'}, inplace=True) \n", " 3 ddf.columns \n", "TypeError: rename() got an unexpected keyword argument 'inplace' \n", "'''" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:26.011042Z", "iopub.status.busy": "2022-07-27T19:19:26.010619Z", "iopub.status.idle": "2022-07-27T19:19:26.023455Z", "shell.execute_reply": "2022-07-27T19:19:26.022820Z" }, "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Index(['id', 'name', 'x', 'y'], dtype='object')\n" ] }, { "data": { "text/plain": [ "Index(['ID', 'name', 'x', 'y'], dtype='object')" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask\n", "print(ddf.columns)\n", "ddf = ddf.rename(columns={'id':'ID'})\n", "ddf.columns" ] }, { "cell_type": "markdown", "metadata": { "slideshow": 
{ "slide_type": "slide" } }, "source": [ "## Data manipulations \n", "There are several diffrences when manipulating data. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### loc - Pandas" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:26.026621Z", "iopub.status.busy": "2022-07-27T19:19:26.026048Z", "iopub.status.idle": "2022-07-27T19:19:26.574765Z", "shell.execute_reply": "2022-07-27T19:19:26.573944Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexy
timestamp
2000-01-01 00:00:061019Xavier0.63480245.051214
2000-01-01 00:00:081013Charlie0.627523-8.101142
\n", "
" ], "text/plain": [ " ID name x y\n", "timestamp \n", "2000-01-01 00:00:06 1019 Xavier 0.634802 45.051214\n", "2000-01-01 00:00:08 1013 Charlie 0.627523 -8.101142" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)\n", "pdf.loc[cond_pdf, ['y']] = pdf['y']* 100\n", "pdf[cond_pdf].head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Error" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n", "ddf.loc[cond_ddf, ['y']] = ddf['y']* 100\n", "ddf[cond_ddf].head(2)\n", "\n", "---------------------------------------------------------------------------\n", "TypeError Traceback (most recent call last)\n", "Input In [14], in ()\n", " 1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n", "----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100\n", " 3 ddf[cond_ddf].head(2)\n", "\n", "TypeError: '_LocIndexer' object does not support item assignment\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dask - use mask/where" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:26.577902Z", "iopub.status.busy": "2022-07-27T19:19:26.577459Z", "iopub.status.idle": "2022-07-27T19:19:26.670655Z", "shell.execute_reply": "2022-07-27T19:19:26.669763Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexy
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744
2000-01-01 00:00:01964Jerry0.0219150.588930
\n", "
" ], "text/plain": [ " ID name x y\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Pandas\n", "pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)\n", "pdf.head(2)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:26.674158Z", "iopub.status.busy": "2022-07-27T19:19:26.673610Z", "iopub.status.idle": "2022-07-27T19:19:26.756822Z", "shell.execute_reply": "2022-07-27T19:19:26.756329Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexy
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744
2000-01-01 00:00:01964Jerry0.0219150.588930
\n", "
" ], "text/plain": [ " ID name x y\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "#Dask\n", "cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n", "ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For more information see [dask mask documentation](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.mask.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Meta argument\n", "One key feature in `Dask` is the introduction of `meta` arguement. \n", "> `meta` is the prescription of the names/types of the output from the computation \n", "from [stack overflow answer](https://stackoverflow.com/questions/44432868/dask-dataframe-apply-meta)\n", "\n", "Since `Dask` creates a DAG for the computation, it requires to understand what are the outputs of each calculation stage. \n", "For additinal information see [meta documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#metadata)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:26.760049Z", "iopub.status.busy": "2022-07-27T19:19:26.759580Z", "iopub.status.idle": "2022-07-27T19:19:27.482491Z", "shell.execute_reply": "2022-07-27T19:19:27.481981Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitials
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744We
2000-01-01 00:00:01964Jerry0.0219150.588930Je
\n", "
" ], "text/plain": [ " ID name x y initials\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])\n", "pdf.head(2)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:27.486396Z", "iopub.status.busy": "2022-07-27T19:19:27.485832Z", "iopub.status.idle": "2022-07-27T19:19:27.590335Z", "shell.execute_reply": "2022-07-27T19:19:27.589688Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:3946: UserWarning: \n", "You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.\n", "To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.\n", " Before: .apply(func)\n", " After: .apply(func, meta=('name', 'object'))\n", "\n", " warnings.warn(meta_warning(meta))\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitials
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744We
2000-01-01 00:00:01964Jerry0.0219150.588930Je
\n", "
" ], "text/plain": [ " ID name x y initials\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask - Warning\n", "ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Introducing meta argument" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:27.593986Z", "iopub.status.busy": "2022-07-27T19:19:27.593576Z", "iopub.status.idle": "2022-07-27T19:19:27.598533Z", "shell.execute_reply": "2022-07-27T19:19:27.597896Z" } }, "outputs": [], "source": [ "# Describe the outcome type of the calculation\n", "meta_arg = pd.Series(object, name='initials')" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:27.602572Z", "iopub.status.busy": "2022-07-27T19:19:27.601307Z", "iopub.status.idle": "2022-07-27T19:19:27.738137Z", "shell.execute_reply": "2022-07-27T19:19:27.737641Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitials
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744We
2000-01-01 00:00:01964Jerry0.0219150.588930Je
\n", "
" ], "text/plain": [ " ID name x y initials\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)\n", "ddf.head(2)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:27.741764Z", "iopub.status.busy": "2022-07-27T19:19:27.741562Z", "iopub.status.idle": "2022-07-27T19:19:27.746531Z", "shell.execute_reply": "2022-07-27T19:19:27.745937Z" } }, "outputs": [], "source": [ "# similar when using a function\n", "def func(row):\n", " if (row['x']> 0):\n", " return row['x'] * 1000 \n", " else:\n", " return row['y'] * -1" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:27.750216Z", "iopub.status.busy": "2022-07-27T19:19:27.749786Z", "iopub.status.idle": "2022-07-27T19:19:29.106610Z", "shell.execute_reply": "2022-07-27T19:19:29.105823Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitialsz
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744We0.423744
2000-01-01 00:00:01964Jerry0.0219150.588930Je21.914646
\n", "
" ], "text/plain": [ " ID name x y initials z\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 0.423744\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 21.914646" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Map partitions\n", "* We can supply an ad-hoc function to run on each partition using the [map_partitions](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.map_partitions.html) method. \n", "Mainly useful for functions that are not implemented in `Dask` or `Pandas` . \n", "* Finally we can return a new `dataframe` which needs to be described in the `meta` argument \n", "The function could also include arguments." ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:29.110055Z", "iopub.status.busy": "2022-07-27T19:19:29.109596Z", "iopub.status.idle": "2022-07-27T19:19:30.452434Z", "shell.execute_reply": "2022-07-27T19:19:30.451789Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexydist
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744NaN
2000-01-01 00:00:01964Jerry0.0219150.5889301.063636
2000-01-01 00:00:02996Kevin0.3361840.1504780.539449
2000-01-01 00:00:031035Quinn0.8536550.0312220.531035
2000-01-01 00:00:041039Ingrid0.890711-0.9927941.024686
\n", "
" ], "text/plain": [ " ID name x y dist\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 NaN\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 1.063636\n", "2000-01-01 00:00:02 996 Kevin 0.336184 0.150478 0.539449\n", "2000-01-01 00:00:03 1035 Quinn 0.853655 0.031222 0.531035\n", "2000-01-01 00:00:04 1039 Ingrid 0.890711 -0.992794 1.024686" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import numpy as np\n", "def func2(df, coor_x, coor_y, drop_cols):\n", " df['dist'] = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 \n", " + (df[coor_y] - df[coor_y].shift())**2 )\n", " return df.drop(drop_cols, axis=1)\n", "\n", "ddf2 = ddf.map_partitions(func2\n", " , coor_x='x'\n", " , coor_y='y'\n", " , drop_cols=['initials', 'z']\n", " , meta=pd.DataFrame({'ID':'i8'\n", " , 'name':str\n", " , 'x':'f8'\n", " , 'y':'f8' \n", " , 'dist':'f8'}, index=[0]))\n", "ddf2.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Convert index into Time column" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:30.455872Z", "iopub.status.busy": "2022-07-27T19:19:30.455402Z", "iopub.status.idle": "2022-07-27T19:19:32.255477Z", "shell.execute_reply": "2022-07-27T19:19:32.254841Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitialstimes
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744We00:00:00
2000-01-01 00:00:01964Jerry0.0219150.588930Je00:00:01
\n", "
" ], "text/plain": [ " ID name x y initials times\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 00:00:00\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 00:00:01" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Only Pandas\n", "pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)\n", "pdf.head(2)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:32.259330Z", "iopub.status.busy": "2022-07-27T19:19:32.258775Z", "iopub.status.idle": "2022-07-27T19:19:35.086760Z", "shell.execute_reply": "2022-07-27T19:19:35.086037Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitialsztimes
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.423744We0.42374400:00:00
2000-01-01 00:00:01964Jerry0.0219150.588930Je21.91464600:00:01
\n", "
" ], "text/plain": [ " ID name x y initials z \\\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 0.423744 \n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 21.914646 \n", "\n", " times \n", "timestamp \n", "2000-01-01 00:00:00 00:00:00 \n", "2000-01-01 00:00:01 00:00:01 " ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask or Pandas\n", "ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))\n", "# or ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )\n", "ddf['times'] = ddf['times'].dt.time\n", "ddf =client.persist(ddf)\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Drop NA on column" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:35.090472Z", "iopub.status.busy": "2022-07-27T19:19:35.090082Z", "iopub.status.idle": "2022-07-27T19:19:35.974814Z", "shell.execute_reply": "2022-07-27T19:19:35.973824Z" } }, "outputs": [], "source": [ "# no issue with regular drop columns\n", "pdf = pdf.drop(labels=['initials'],axis=1)\n", "ddf = ddf.drop(labels=['initials','z'],axis=1) " ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:35.978485Z", "iopub.status.busy": "2022-07-27T19:19:35.978138Z", "iopub.status.idle": "2022-07-27T19:19:36.460883Z", "shell.execute_reply": "2022-07-27T19:19:36.460300Z" } }, "outputs": [], "source": [ "# Pandas\n", "pdf = pdf.assign(colna = None)\n", "# Dask\n", "ddf = ddf.assign(colna = None)" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:36.464358Z", "iopub.status.busy": "2022-07-27T19:19:36.464155Z", "iopub.status.idle": "2022-07-27T19:19:38.862694Z", "shell.execute_reply": "2022-07-27T19:19:38.861803Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.42374400:00:00
2000-01-01 00:00:01964Jerry0.0219150.58893000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 00:00:00\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 00:00:01" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf = pdf.dropna(axis=1, how='all')\n", "pdf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In odrer for `Dask` to drop a column with all `na` it must check all the partitions with `compute()`" ] }, { "cell_type": "code", "execution_count": 28, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:38.865844Z", "iopub.status.busy": "2022-07-27T19:19:38.865498Z", "iopub.status.idle": "2022-07-27T19:19:55.593930Z", "shell.execute_reply": "2022-07-27T19:19:55.593302Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
timestamp
2000-01-01 00:00:00983Wendy-0.303374-0.42374400:00:00
2000-01-01 00:00:01964Jerry0.0219150.58893000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "timestamp \n", "2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 00:00:00\n", "2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 00:00:01" ] }, "execution_count": 28, "metadata": {}, "output_type": "execute_result" } ], "source": [ "if ddf.colna.isnull().all().compute() == True: # check if all values in column are Null - expensive\n", " ddf = ddf.drop(labels=['colna'],axis=1)\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.4 Reset Index" ] }, { "cell_type": "code", "execution_count": 29, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:55.597217Z", "iopub.status.busy": "2022-07-27T19:19:55.596660Z", "iopub.status.idle": "2022-07-27T19:19:55.742059Z", "shell.execute_reply": "2022-07-27T19:19:55.741472Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
0983Wendy-0.303374-0.42374400:00:00
1964Jerry0.0219150.58893000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "0 983 Wendy -0.303374 -0.423744 00:00:00\n", "1 964 Jerry 0.021915 0.588930 00:00:01" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Pandas\n", "pdf =pdf.reset_index(drop=True)\n", "pdf.head(2)" ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:55.745029Z", "iopub.status.busy": "2022-07-27T19:19:55.744611Z", "iopub.status.idle": "2022-07-27T19:19:55.818296Z", "shell.execute_reply": "2022-07-27T19:19:55.817579Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
0983Wendy-0.303374-0.42374400:00:00
1964Jerry0.0219150.58893000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "0 983 Wendy -0.303374 -0.423744 00:00:00\n", "1 964 Jerry 0.021915 0.588930 00:00:01" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask\n", "ddf = ddf.reset_index()\n", "ddf = ddf.drop(labels=['timestamp'], axis=1 )\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Read / Save files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* When working with `pandas` and `dask` preferable use [parquet format](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#store-data-in-apache-parquet-format). \n", "* When working with `Dask` - files can be read with multiple workers . \n", "* Most `kwargs` are applicable for reading and writing files \n", "e.g. \n", "ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False). \n", "* However some are not available such as `nrows`.\n", "\n", "[see documentaion](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.to_csv.html) (including the option for output file naming)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save files" ] }, { "cell_type": "code", "execution_count": 31, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:55.821657Z", "iopub.status.busy": "2022-07-27T19:19:55.821163Z", "iopub.status.idle": "2022-07-27T19:19:55.825699Z", "shell.execute_reply": "2022-07-27T19:19:55.825039Z" } }, "outputs": [], "source": [ "from pathlib import Path\n", "output_dir_file = Path('data/pdf_single_file.csv')\n", "output_dir_file.parent.mkdir(parents=True, exist_ok=True)" ] }, { "cell_type": "code", "execution_count": 32, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:19:55.828409Z", "iopub.status.busy": "2022-07-27T19:19:55.828009Z", "iopub.status.idle": "2022-07-27T19:20:12.412198Z", "shell.execute_reply": "2022-07-27T19:20:12.411672Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 16.4 s, sys: 433 ms, total: 16.8 s\n", "Wall time: 16.6 s\n" ] } ], "source": [ "%%time\n", "# Pandas\n", "pdf.to_csv(output_dir_file)" ] }, { "cell_type": "code", "execution_count": 33, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:12.415270Z", "iopub.status.busy": "2022-07-27T19:20:12.414724Z", "iopub.status.idle": "2022-07-27T19:20:12.422528Z", "shell.execute_reply": "2022-07-27T19:20:12.421928Z" } }, "outputs": [ { "data": { "text/plain": [ "[PosixPath('data/2000-01-25.csv'),\n", " PosixPath('data/2000-01-20.csv'),\n", " PosixPath('data/2000-01-29.csv'),\n", " PosixPath('data/2000-01-02.csv'),\n", " PosixPath('data/2000-01-19.csv'),\n", " PosixPath('data/2000-01-23.csv'),\n", " PosixPath('data/2000-01-10.csv'),\n", " PosixPath('data/2000-01-21.csv'),\n", " PosixPath('data/2000-01-17.csv'),\n", " PosixPath('data/2000-01-04.csv'),\n", " PosixPath('data/2000-01-27.csv'),\n", " PosixPath('data/2000-01-22.csv'),\n", " PosixPath('data/2000-01-14.csv'),\n", " PosixPath('data/2000-01-11.csv'),\n", " PosixPath('data/pdf_single_file.csv'),\n", " PosixPath('data/2000-01-13.csv'),\n", " PosixPath('data/2000-01-08.csv'),\n", " PosixPath('data/2000-01-09.csv'),\n", " PosixPath('data/2000-01-06.csv'),\n", " PosixPath('data/2000-01-01.csv'),\n", " PosixPath('data/2000-01-07.csv'),\n", " PosixPath('data/2000-01-12.csv'),\n", " PosixPath('data/2000-01-16.csv'),\n", " PosixPath('data/2000-01-26.csv'),\n", " PosixPath('data/2000-01-24.csv'),\n", " 
PosixPath('data/2000-01-18.csv'),\n", " PosixPath('data/2000-01-15.csv'),\n", " PosixPath('data/2000-01-03.csv'),\n", " PosixPath('data/2000-01-30.csv'),\n", " PosixPath('data/2000-01-28.csv'),\n", " PosixPath('data/2000-01-05.csv')]" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(output_dir_file.parent.glob('*.csv'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice the `'*'` in the file name below, which allows naming the multiple output files. " ] }, { "cell_type": "code", "execution_count": 34, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:12.425291Z", "iopub.status.busy": "2022-07-27T19:20:12.424843Z", "iopub.status.idle": "2022-07-27T19:20:12.428554Z", "shell.execute_reply": "2022-07-27T19:20:12.427919Z" } }, "outputs": [], "source": [ "output_dask_dir = Path('data/dask_multi_files/')\n", "output_dask_dir.mkdir(parents=True, exist_ok=True)" ] }, { "cell_type": "code", "execution_count": 35, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:12.431517Z", "iopub.status.busy": "2022-07-27T19:20:12.431018Z", "iopub.status.idle": "2022-07-27T19:20:22.894033Z", "shell.execute_reply": "2022-07-27T19:20:22.893388Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 454 ms, sys: 46.5 ms, total: 500 ms\n", "Wall time: 10.4 s\n" ] }, { "data": { "text/plain": [ "['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf10.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf11.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf12.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf13.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf14.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf15.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf16.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf17.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf18.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf19.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf20.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf21.csv',\n", " 
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf22.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf23.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf24.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf25.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf26.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf27.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf28.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf29.csv']" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "# Dask\n", "ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To find the number of partitions, which will determine the number of output files, use [dask.dataframe.npartitions](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.npartitions.html). \n", "\n", "To change the number of output files, use [repartition](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.repartition.html), which is an expensive operation." ] }, { "cell_type": "code", "execution_count": 36, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:22.897191Z", "iopub.status.busy": "2022-07-27T19:20:22.896487Z", "iopub.status.idle": "2022-07-27T19:20:22.901283Z", "shell.execute_reply": "2022-07-27T19:20:22.900662Z" } }, "outputs": [ { "data": { "text/plain": [ "30" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf.npartitions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read Multiple files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For `pandas` it is possible to iterate over the files and concatenate them; [see this answer from stack overflow](https://stackoverflow.com/questions/20906474/import-multiple-csv-files-into-pandas-and-concatenate-into-one-dataframe)." ] }, { "cell_type": "code", "execution_count": 37, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:22.904296Z", "iopub.status.busy": "2022-07-27T19:20:22.903826Z", "iopub.status.idle": "2022-07-27T19:20:25.911116Z", "shell.execute_reply": "2022-07-27T19:20:25.910425Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.75 s, sys: 318 ms, total: 3.07 s\n", "Wall time: 3 s\n" ] }, { "data": { "text/plain": [ "2592000" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "# Pandas\n", "concat_df = pd.concat([pd.read_csv(f) \n", "                       for f in list(output_dask_dir.iterdir())])\n", "len(concat_df)" ] }, { "cell_type": "code", "execution_count": 38, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:25.915799Z", "iopub.status.busy": "2022-07-27T19:20:25.914693Z", "iopub.status.idle": "2022-07-27T19:20:25.951061Z", "shell.execute_reply": "2022-07-27T19:20:25.950483Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 11.9 ms, sys: 0 ns, total: 11.9 ms\n", "Wall time: 12.4 ms\n" ] }, { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
npartitions=30
int64objectfloat64float64object
...............
..................
...............
...............
\n", "
\n", "
Dask Name: read-csv, 30 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " ID name x y times\n", "npartitions=30 \n", " int64 object float64 float64 object\n", " ... ... ... ... ...\n", "... ... ... ... ... ...\n", " ... ... ... ... ...\n", " ... ... ... ... ...\n", "Dask Name: read-csv, 30 tasks" ] }, "execution_count": 38, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "# Dask\n", "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n", "_ddf" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Remember that `Dask` is lazy - thus it does not *realy* read the file until it needs to..." ] }, { "cell_type": "code", "execution_count": 39, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:25.953993Z", "iopub.status.busy": "2022-07-27T19:20:25.953653Z", "iopub.status.idle": "2022-07-27T19:20:26.781068Z", "shell.execute_reply": "2022-07-27T19:20:26.780413Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 69.6 ms, sys: 11.3 ms, total: 81 ms\n", "Wall time: 818 ms\n" ] }, { "data": { "text/plain": [ "2592000" ] }, "execution_count": 39, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n", "len(_ddf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " ## Consider using client.persist()\n", " Since Dask is lazy - it may run the **entire** graph/DAG (again) even if it already run part of the calculation in a previous cell. Thus use [persist](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#persist-intelligently) to keep the results in memory \n", "Additional information can be read in this [stackoverflow issue](https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array/45941529#45941529) or see an example in [this post](http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes) \n", "This concept should also be used when running a code within a script (rather then a jupyter notebook) which incoperates loops within the code.\n" ] }, { "cell_type": "code", "execution_count": 40, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:26.784518Z", "iopub.status.busy": "2022-07-27T19:20:26.783673Z", "iopub.status.idle": "2022-07-27T19:20:27.037579Z", "shell.execute_reply": "2022-07-27T19:20:27.036716Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
0983Wendy-0.303374-0.42374400:00:00
1964Jerry0.0219150.58893000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "0 983 Wendy -0.303374 -0.423744 00:00:00\n", "1 964 Jerry 0.021915 0.588930 00:00:01" ] }, "execution_count": 40, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# e.g.\n", "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n", "# do some filter\n", "_ddf = client.persist(_ddf)\n", "# do some computations\n", "_ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Group By - custom aggregations\n", "In addition to the [groupby notebook example](https://github.com/dask/dask-examples/blob/main/dataframes/02-groupby.ipynb) that is in the repository - \n", "This is another example how to try to eliminate the use of `groupby.apply`. \n", "In this example we are grouping columns into unique lists." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Pandas" ] }, { "cell_type": "code", "execution_count": 41, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:27.040806Z", "iopub.status.busy": "2022-07-27T19:20:27.040604Z", "iopub.status.idle": "2022-07-27T19:20:34.275556Z", "shell.execute_reply": "2022-07-27T19:20:34.274958Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
nameIDseconds
0Wendy98300
1Jerry96400
2Kevin99600
3Quinn103500
4Ingrid103900
\n", "
" ], "text/plain": [ " name ID seconds\n", "0 Wendy 983 00\n", "1 Jerry 964 00\n", "2 Kevin 996 00\n", "3 Quinn 1035 00\n", "4 Ingrid 1039 00" ] }, "execution_count": 41, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# prepare pandas dataframe\n", "pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)\n", "pdf['seconds'] = pdf.time.astype(str).str[-2:]\n", "cols_for_demo =['name', 'ID','seconds']\n", "pdf[cols_for_demo].head()" ] }, { "cell_type": "code", "execution_count": 42, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:34.281695Z", "iopub.status.busy": "2022-07-27T19:20:34.281274Z", "iopub.status.idle": "2022-07-27T19:20:35.823962Z", "shell.execute_reply": "2022-07-27T19:20:35.822866Z" } }, "outputs": [], "source": [ "pdf_gb = pdf.groupby(pdf.name)\n", "gp_col = ['ID', 'seconds']\n", "list_ser_gb = [pdf_gb[att_col_gr].apply\n", " (lambda x: list(set(x.to_list()))) \n", " for att_col_gr in gp_col]" ] }, { "cell_type": "code", "execution_count": 43, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:35.828415Z", "iopub.status.busy": "2022-07-27T19:20:35.828129Z", "iopub.status.idle": "2022-07-27T19:20:35.855778Z", "shell.execute_reply": "2022-07-27T19:20:35.855045Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " Weight ID \\\n", "name \n", "Alice 99833 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n", "Bob 99508 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n", "\n", " seconds \n", "name \n", "Alice [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6... \n", "Bob [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6... \n", "CPU times: user 23.1 ms, sys: 169 µs, total: 23.3 ms\n", "Wall time: 22.1 ms\n" ] } ], "source": [ "%%time\n", "df_edge_att = pdf_gb.size().to_frame(name=\"Weight\")\n", "for ser in list_ser_gb:\n", " df_edge_att = df_edge_att.join(ser.to_frame(), how='left') \n", "print(df_edge_att.head(2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Remeber that in any some cases `Pandas` is more efficiante (assuming that you can load all the data into the RAM). " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Dask" ] }, { "cell_type": "code", "execution_count": 44, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:35.859227Z", "iopub.status.busy": "2022-07-27T19:20:35.858901Z", "iopub.status.idle": "2022-07-27T19:20:36.455780Z", "shell.execute_reply": "2022-07-27T19:20:36.454858Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
nameIDseconds
0Wendy98300
1Jerry96401
\n", "
" ], "text/plain": [ " name ID seconds\n", "0 Wendy 983 00\n", "1 Jerry 964 01" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def set_list_att(x: dd.Series):\n", " return list(set([item for item in x.values]))\n", "ddf['seconds'] = ddf.times.astype(str).str[-2:]\n", "ddf = client.persist(ddf)\n", "ddf[cols_for_demo].head(2)" ] }, { "cell_type": "code", "execution_count": 45, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:36.459120Z", "iopub.status.busy": "2022-07-27T19:20:36.458912Z", "iopub.status.idle": "2022-07-27T19:20:36.469196Z", "shell.execute_reply": "2022-07-27T19:20:36.468535Z" } }, "outputs": [ { "data": { "text/plain": [ "Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf.columns" ] }, { "cell_type": "code", "execution_count": 46, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:36.472120Z", "iopub.status.busy": "2022-07-27T19:20:36.471921Z", "iopub.status.idle": "2022-07-27T19:20:36.538889Z", "shell.execute_reply": "2022-07-27T19:20:36.538092Z" } }, "outputs": [], "source": [ "df_gb = ddf.groupby(ddf.name)\n", "gp_col = ['ID', 'seconds']\n", "list_ser_gb = [df_gb[att_col_gr].apply(set_list_att\n", " ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att')) \n", " for att_col_gr in gp_col]" ] }, { "cell_type": "code", "execution_count": 47, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:36.542635Z", "iopub.status.busy": "2022-07-27T19:20:36.542418Z", "iopub.status.idle": "2022-07-27T19:20:45.592922Z", "shell.execute_reply": "2022-07-27T19:20:45.591798Z" } }, "outputs": [ { "ename": "ValueError", "evalue": "The columns in the computed data do not match the columns in the provided metadata\n Extra: ['name']\n Missing: [0]", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)", "File \u001b[0;32m:4\u001b[0m, in \u001b[0;36m\u001b[0;34m\u001b[0m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1213\u001b[0m, in \u001b[0;36m_Frame.head\u001b[0;34m(self, n, npartitions, compute)\u001b[0m\n\u001b[1;32m 1211\u001b[0m \u001b[38;5;66;03m# No need to warn if we're already looking at all partitions\u001b[39;00m\n\u001b[1;32m 1212\u001b[0m safe \u001b[38;5;241m=\u001b[39m npartitions \u001b[38;5;241m!=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mnpartitions\n\u001b[0;32m-> 1213\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_head\u001b[49m\u001b[43m(\u001b[49m\u001b[43mn\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mnpartitions\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mnpartitions\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcompute\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msafe\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43msafe\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1247\u001b[0m, in \u001b[0;36m_Frame._head\u001b[0;34m(self, n, npartitions, compute, 
safe)\u001b[0m\n\u001b[1;32m 1242\u001b[0m result \u001b[38;5;241m=\u001b[39m new_dd_object(\n\u001b[1;32m 1243\u001b[0m graph, name, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_meta, [\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdivisions[\u001b[38;5;241m0\u001b[39m], \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdivisions[npartitions]]\n\u001b[1;32m 1244\u001b[0m )\n\u001b[1;32m 1246\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m compute:\n\u001b[0;32m-> 1247\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43mresult\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1248\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 268\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 269\u001b[0m \u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 270\u001b[0m \n\u001b[1;32m 271\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 290\u001b[0m \u001b[38;5;124;03m dask.base.compute\u001b[39;00m\n\u001b[1;32m 291\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 292\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 293\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 572\u001b[0m keys\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_keys__())\n\u001b[1;32m 573\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[0;32m--> 575\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 576\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004\u001b[0m, in \u001b[0;36mClient.get\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 3002\u001b[0m should_rejoin \u001b[38;5;241m=\u001b[39m 
\u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 3003\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 3004\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgather\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpacked\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3005\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 3006\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m futures\u001b[38;5;241m.\u001b[39mvalues():\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178\u001b[0m, in \u001b[0;36mClient.gather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 2176\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 2177\u001b[0m local_worker \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[0;32m-> 2178\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2179\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_gather\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2180\u001b[0m \u001b[43m \u001b[49m\u001b[43mfutures\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2181\u001b[0m \u001b[43m \u001b[49m\u001b[43merrors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43merrors\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2182\u001b[0m \u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2183\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocal_worker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocal_worker\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2184\u001b[0m \u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2185\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 316\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 317\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 318\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 319\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 320\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n", "File 
\u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 383\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 384\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n\u001b[0;32m--> 385\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\u001b[38;5;241m.\u001b[39mwith_traceback(tb)\n\u001b[1;32m 386\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 387\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358\u001b[0m, in \u001b[0;36msync..f\u001b[0;34m()\u001b[0m\n\u001b[1;32m 356\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mwait_for(future, callback_timeout)\n\u001b[1;32m 357\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mensure_future(future)\n\u001b[0;32m--> 358\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m future\n\u001b[1;32m 359\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 360\u001b[0m error \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762\u001b[0m, in \u001b[0;36mRunner.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 759\u001b[0m exc_info \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[1;32m 761\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 762\u001b[0m value \u001b[38;5;241m=\u001b[39m \u001b[43mfuture\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 763\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 764\u001b[0m exc_info \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2039\u001b[0m exc \u001b[38;5;241m=\u001b[39m CancelledError(key)\n\u001b[1;32m 2040\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 2041\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2042\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\n\u001b[1;32m 2043\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990\u001b[0m, in \u001b[0;36m__call__\u001b[0;34m()\u001b[0m\n\u001b[1;32m 988\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(args) \u001b[38;5;241m==\u001b[39m \u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys):\n\u001b[1;32m 989\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mExpected \u001b[39m\u001b[38;5;132;01m%d\u001b[39;00m\u001b[38;5;124m args, got 
\u001b[39m\u001b[38;5;132;01m%d\u001b[39;00m\u001b[38;5;124m\"\u001b[39m \u001b[38;5;241m%\u001b[39m (\u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys), \u001b[38;5;28mlen\u001b[39m(args)))\n\u001b[0;32m--> 990\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m core\u001b[38;5;241m.\u001b[39mget(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdsk, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39moutkey, \u001b[38;5;28mdict\u001b[39m(\u001b[38;5;28mzip\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys, args)))\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149\u001b[0m, in \u001b[0;36mget\u001b[0;34m()\u001b[0m\n\u001b[1;32m 147\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m key \u001b[38;5;129;01min\u001b[39;00m toposort(dsk):\n\u001b[1;32m 148\u001b[0m task \u001b[38;5;241m=\u001b[39m dsk[key]\n\u001b[0;32m--> 149\u001b[0m result \u001b[38;5;241m=\u001b[39m _execute_task(task, cache)\n\u001b[1;32m 150\u001b[0m cache[key] \u001b[38;5;241m=\u001b[39m result\n\u001b[1;32m 151\u001b[0m result \u001b[38;5;241m=\u001b[39m _execute_task(out, cache)\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. 
numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. 
numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 83\u001b[0m \u001b[38;5;124;03m\"\"\"Do the actual work of collecting data and executing a function\u001b[39;00m\n\u001b[1;32m 84\u001b[0m \n\u001b[1;32m 85\u001b[0m \u001b[38;5;124;03mExamples\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 110\u001b[0m \u001b[38;5;124;03m'foo'\u001b[39;00m\n\u001b[1;32m 111\u001b[0m \u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 112\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(arg, \u001b[38;5;28mlist\u001b[39m):\n\u001b[0;32m--> 113\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m [_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m arg]\n\u001b[1;32m 114\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m istask(arg):\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 83\u001b[0m \u001b[38;5;124;03m\"\"\"Do the actual work of collecting data and executing a function\u001b[39;00m\n\u001b[1;32m 84\u001b[0m \n\u001b[1;32m 85\u001b[0m \u001b[38;5;124;03mExamples\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 110\u001b[0m \u001b[38;5;124;03m'foo'\u001b[39;00m\n\u001b[1;32m 111\u001b[0m \u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 112\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(arg, \u001b[38;5;28mlist\u001b[39m):\n\u001b[0;32m--> 113\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m [_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m arg]\n\u001b[1;32m 114\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m istask(arg):\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. 
numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40\u001b[0m, in \u001b[0;36mapply\u001b[0;34m()\u001b[0m\n\u001b[1;32m 38\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mapply\u001b[39m(func, args, kwargs\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mNone\u001b[39;00m):\n\u001b[1;32m 39\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m kwargs:\n\u001b[0;32m---> 40\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39margs, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs)\n\u001b[1;32m 41\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 42\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39margs)\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436\u001b[0m, in \u001b[0;36mapply_and_enforce\u001b[0;34m()\u001b[0m\n\u001b[1;32m 6434\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m meta\n\u001b[1;32m 6435\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m is_dataframe_like(df):\n\u001b[0;32m-> 6436\u001b[0m check_matching_columns(meta, df)\n\u001b[1;32m 6437\u001b[0m c \u001b[38;5;241m=\u001b[39m meta\u001b[38;5;241m.\u001b[39mcolumns\n\u001b[1;32m 6438\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415\u001b[0m, in \u001b[0;36mcheck_matching_columns\u001b[0;34m()\u001b[0m\n\u001b[1;32m 413\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 414\u001b[0m extra_info \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mOrder of columns does not match\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[0;32m--> 415\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\n\u001b[1;32m 416\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mThe columns in the computed data do not match\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 417\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m the columns in the provided metadata\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 418\u001b[0m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mextra_info\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 419\u001b[0m )\n", "\u001b[0;31mValueError\u001b[0m: The columns in the computed data do not match the columns in the provided metadata\n Extra: ['name']\n Missing: [0]" ] } ], "source": [ "%%time\n", "df_edge_att = df_gb.size().to_frame(name=\"Weight\")\n", "for ser in list_ser_gb:\n", " df_edge_att = df_edge_att.join(ser.to_frame(), how='left')\n", "df_edge_att.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can do better... 
\n", "Using [dask custom aggregation](https://docs.dask.org/en/latest/generated/dask.dataframe.groupby.Aggregation.html) is consideribly better" ] }, { "cell_type": "code", "execution_count": 48, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:45.596112Z", "iopub.status.busy": "2022-07-27T19:20:45.595506Z", "iopub.status.idle": "2022-07-27T19:20:45.604176Z", "shell.execute_reply": "2022-07-27T19:20:45.603185Z" } }, "outputs": [], "source": [ "import itertools\n", "custom_agg = dd.Aggregation(\n", " 'custom_agg', \n", " lambda s: s.apply(set), \n", " lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)" ] }, { "cell_type": "code", "execution_count": 49, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:45.606949Z", "iopub.status.busy": "2022-07-27T19:20:45.606466Z", "iopub.status.idle": "2022-07-27T19:20:46.957491Z", "shell.execute_reply": "2022-07-27T19:20:46.956240Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 185 ms, sys: 11.3 ms, total: 196 ms\n", "Wall time: 1.33 s\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
WeightIDseconds
name
Alice99833[1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...[21, 28, 03, 20, 52, 02, 43, 38, 32, 49, 09, 0...
Bob99508[1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...[28, 21, 03, 20, 02, 52, 43, 32, 38, 49, 09, 0...
\n", "
" ], "text/plain": [ " Weight ID \\\n", "name \n", "Alice 99833 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n", "Bob 99508 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n", "\n", " seconds \n", "name \n", "Alice [21, 28, 03, 20, 52, 02, 43, 38, 32, 49, 09, 0... \n", "Bob [28, 21, 03, 20, 02, 52, 43, 32, 38, 49, 09, 0... " ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "df_gb = ddf.groupby(ddf.name)\n", "gp_col = ['ID', 'seconds']\n", "list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]\n", "df_edge_att = df_gb.size().to_frame(name=\"Weight\")\n", "for ser in list_ser_gb:\n", " df_edge_att = df_edge_att.join(ser.to_frame(), how='left')\n", "df_edge_att.head(2) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## [Debugging](https://docs.dask.org/en/latest/debugging.html)\n", "Debugging may be challenging...\n", "1. Run code without client \n", "2. Use Dashboard profiler\n", "3. Verify integrity of DAG" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Corrupted DAG \n", "In this example we show that once the DAG is currupted you may need to reset the calculation" ] }, { "cell_type": "code", "execution_count": 50, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:46.961111Z", "iopub.status.busy": "2022-07-27T19:20:46.960593Z", "iopub.status.idle": "2022-07-27T19:20:47.025111Z", "shell.execute_reply": "2022-07-27T19:20:47.024140Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
timestamp
2000-01-01996Ingrid-0.9320920.477965
\n", "
" ], "text/plain": [ " id name x y\n", "timestamp \n", "2000-01-01 996 Ingrid -0.932092 0.477965" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# reset dataframe\n", "ddf = dask.datasets.timeseries()\n", "ddf.head(1)" ] }, { "cell_type": "code", "execution_count": 51, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:47.028476Z", "iopub.status.busy": "2022-07-27T19:20:47.028076Z", "iopub.status.idle": "2022-07-27T19:20:47.041215Z", "shell.execute_reply": "2022-07-27T19:20:47.040565Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.\n", " warnings.warn(\n" ] } ], "source": [ "def func_dist2(df, coor_x, coor_y):\n", " dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 # `^` <-- wrong syntax\n", " + (df[coor_y] - df[coor_y].shift())^2 ) # `^` <-- wrong syntax\n", " return dist\n", "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n", " , meta=('float'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Is everything OK?" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "# Results in error\n", "ddf.head()\n", "\n", "---------------------------------------------------------------------------\n", "TypeError Traceback (most recent call last)\n", " in \n", " 1 # returns an error because of ^2 (needs to be **2)\n", "----> 2 ddf.head()\n", "\n", "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\dataframe\\core.py in head(self, n, npartitions, compute)\n", " 898 \n", " 899 if compute:\n", "--> 900 result = result.compute()\n", " 901 return result\n", " 902 \n", "\n", "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\base.py in compute(self, **kwargs)\n", " 154 dask.base.compute\n", " 155 \"\"\"\n", "--> 156 (result,) = compute(self, traverse=False, **kwargs)\n", " 157 return result\n", " 158 \n", "\n", "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n", "\n", "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n", "\n", "TypeError: unsupported operand type(s) for ^: 'float' and 'bool'\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Even if the function is corrected the DAG is corrupted" ] }, { "cell_type": "code", "execution_count": 52, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:47.044568Z", "iopub.status.busy": "2022-07-27T19:20:47.043991Z", "iopub.status.idle": "2022-07-27T19:20:47.056360Z", "shell.execute_reply": "2022-07-27T19:20:47.055803Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. 
In the future the meta you passed will not work.\n", " warnings.warn(\n" ] } ], "source": [ "# Still results with an error\n", "def func_dist2(df, coor_x, coor_y):\n", " dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 # `**` <-- correct syntax\n", " + (df[coor_y] - df[coor_y].shift())**2 ) # `**` <-- correct syntax\n", " return dist\n", "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n", " , meta=('float'))" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "# Still Results in error\n", "ddf.head()\n", "\n", "---------------------------------------------------------------------------\n", "TypeError Traceback (most recent call last)\n", " in \n", " 1 # returns an error because of ^2 (needs to be **2)\n", "----> 2 ddf.head()\n", "\n", "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\dataframe\\core.py in head(self, n, npartitions, compute)\n", " 898 \n", " 899 if compute:\n", "--> 900 result = result.compute()\n", " 901 return result\n", " 902 \n", "\n", "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\base.py in compute(self, **kwargs)\n", " 154 dask.base.compute\n", " 155 \"\"\"\n", "--> 156 (result,) = compute(self, traverse=False, **kwargs)\n", " 157 return result\n", " 158 \n", "\n", "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n", "\n", "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n", "\n", "TypeError: unsupported operand type(s) for ^: 'float' and 'bool'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We need to reset the dataframe" ] }, { "cell_type": "code", "execution_count": 53, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:47.059717Z", "iopub.status.busy": "2022-07-27T19:20:47.059308Z", "iopub.status.idle": "2022-07-27T19:20:47.134282Z", "shell.execute_reply": "2022-07-27T19:20:47.133686Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.\n", " warnings.warn(\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexycol
timestamp
2000-01-01 00:00:00979Frank0.8207770.098616NaN
2000-01-01 00:00:01990Laura-0.851323-0.5016781.77659
\n", "
" ], "text/plain": [ " id name x y col\n", "timestamp \n", "2000-01-01 00:00:00 979 Frank 0.820777 0.098616 NaN\n", "2000-01-01 00:00:01 990 Laura -0.851323 -0.501678 1.77659" ] }, "execution_count": 53, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf = dask.datasets.timeseries()\n", "def func_dist2(df, coor_x, coor_y):\n", " dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 #corrected math function\n", " + (df[coor_y] - df[coor_y].shift())**2 )\n", " return dist\n", "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n", " , meta=('float'))\n", "ddf.head(2)" ] } ], "metadata": { "file_extension": ".py", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "mimetype": "text/x-python", "name": "python", "npconvert_exporter": "python", "pygments_lexer": "ipython3", "version": 3 }, "nbformat": 4, "nbformat_minor": 4 }