{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Gotcha's from Pandas to Dask\n", "\n", "This notebook highlights some key differences when transfering code from `Pandas` to run in a `Dask` environment. \n", "Most issues have a link to the [Dask documentation](https://docs.dask.org/en/latest/) for additional information." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:14.099726Z", "iopub.status.busy": "2022-05-16T13:56:14.099163Z", "iopub.status.idle": "2022-05-16T13:56:14.872911Z", "shell.execute_reply": "2022-05-16T13:56:14.871298Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Dask versoin: 2022.04.0\n", "Pandas versoin: 1.4.2\n" ] } ], "source": [ "# since Dask is activly beeing developed - the current example is running with the below version\n", "import dask\n", "import dask.dataframe as dd\n", "import pandas as pd\n", "print(f'Dask versoin: {dask.__version__}')\n", "print(f'Pandas versoin: {pd.__version__}')" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "## Start Dask Client for Dashboard\n", "\n", "Starting the Dask Client is optional. In this example we are running on a `LocalCluster`, this will also provide a dashboard which is useful to gain insight on the computation. \n", "For additional information on [Dask Client see documentation](https://docs.dask.org/en/latest/setup.html?highlight=client#setup) \n", "\n", "The link to the dashboard will become visible when you create a client (as shown below). \n", "When running within `Jupyter Lab` an [extenstion](https://github.com/dask/dask-labextension) can be installed to view the various dashboard widgets. " ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:14.876780Z", "iopub.status.busy": "2022-05-16T13:56:14.876583Z", "iopub.status.idle": "2022-05-16T13:56:16.530010Z", "shell.execute_reply": "2022-05-16T13:56:16.529323Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-f3a78d75-d51f-11ec-a19b-000d3aeabb7a

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://127.0.0.1:8787/status\n", "
\n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

24dc48f5

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Workers: 2\n", "
\n", " Total threads: 2\n", " \n", " Total memory: 6.78 GiB\n", "
Status: runningUsing processes: True
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-b2e4d2b9-7c39-4aac-9130-ad0f3ecc6639

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://127.0.0.1:39211\n", " \n", " Workers: 2\n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Total threads: 2\n", "
\n", " Started: Just now\n", " \n", " Total memory: 6.78 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:37931\n", " \n", " Total threads: 1\n", "
\n", " Dashboard: http://127.0.0.1:42685/status\n", " \n", " Memory: 3.39 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:41003\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-8dvc6orb\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 1

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:40067\n", " \n", " Total threads: 1\n", "
\n", " Dashboard: http://127.0.0.1:40229/status\n", " \n", " Memory: 3.39 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:40143\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-7x38qz2h\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')\n", "client = Client()\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "See [documentation for addtional cluster configuration](http://distributed.dask.org/en/latest/local-cluster.html)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Create 2 DataFrames for comparison: \n", "1. for Dask \n", "2. for Pandas \n", "Dask comes with builtin dataset samples, we will use this sample for our example. " ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:16.533465Z", "iopub.status.busy": "2022-05-16T13:56:16.532981Z", "iopub.status.idle": "2022-05-16T13:56:16.557473Z", "shell.execute_reply": "2022-05-16T13:56:16.556954Z" }, "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
npartitions=30
2000-01-01int64objectfloat64float64
2000-01-02............
...............
2000-01-30............
2000-01-31............
\n", "
\n", "
Dask Name: make-timeseries, 30 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=30 \n", "2000-01-01 int64 object float64 float64\n", "2000-01-02 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-30 ... ... ... ...\n", "2000-01-31 ... ... ... ...\n", "Dask Name: make-timeseries, 30 tasks" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf = dask.datasets.timeseries()\n", "ddf" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "* Remember `Dask framework` is **lazy** thus in order to see the result we need to run [compute()](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.compute.html) \n", " (or `head()` which runs under the hood compute()) )" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:16.560339Z", "iopub.status.busy": "2022-05-16T13:56:16.559835Z", "iopub.status.idle": "2022-05-16T13:56:17.014141Z", "shell.execute_reply": "2022-05-16T13:56:17.013476Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303
2000-01-01 00:00:011007Laura-0.7944600.630270
\n", "
" ], "text/plain": [ " id name x y\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "#### Pandas Dataframe\n", "In order to create a `Pandas` dataframe we can use the `compute()` method from a `Dask dataframe`" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:17.017335Z", "iopub.status.busy": "2022-05-16T13:56:17.016890Z", "iopub.status.idle": "2022-05-16T13:56:18.620113Z", "shell.execute_reply": "2022-05-16T13:56:18.619403Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303
2000-01-01 00:00:011007Laura-0.7944600.630270
\n", "
" ], "text/plain": [ " id name x y\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf = ddf.compute() \n", "print(type(pdf))\n", "pdf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### dataframe.shape \n", "We can also see *dask laziness* when using the shape attribute" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:18.623566Z", "iopub.status.busy": "2022-05-16T13:56:18.623169Z", "iopub.status.idle": "2022-05-16T13:56:18.631079Z", "shell.execute_reply": "2022-05-16T13:56:18.630514Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pandas shape: (2592000, 4)\n", "---------------------------\n", "Dask lazy shape: (Delayed('int-457670a7-0118-48ff-b5cd-08db2a126430'), 4)\n" ] } ], "source": [ "print(f'Pandas shape: {pdf.shape}')\n", "print('---------------------------')\n", "print(f'Dask lazy shape: {ddf.shape}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We cannot get the full shape before accessing all the partitions - running `len` will do so" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:18.633893Z", "iopub.status.busy": "2022-05-16T13:56:18.633424Z", "iopub.status.idle": "2022-05-16T13:56:19.045484Z", "shell.execute_reply": "2022-05-16T13:56:19.044310Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Dask computed shape: 2,592,000\n" ] } ], "source": [ "print(f'Dask computed shape: {len(ddf.index):,}') # expensive" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Creating a `Dask dataframe` from `Pandas`\n", "In order to utilize `Dask` capablities on an existing `Pandas dataframe` (pdf) we need to convert the `Pandas dataframe` into a `Dask dataframe` (ddf) with the [from_pandas](https://docs.dask.org/en/latest/generated/dask.dataframe.from_pandas.html) method. \n", "You must supply the number of partitions or chunksize that will be used to generate the dask dataframe" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:19.048716Z", "iopub.status.busy": "2022-05-16T13:56:19.048090Z", "iopub.status.idle": "2022-05-16T13:56:19.318117Z", "shell.execute_reply": "2022-05-16T13:56:19.317045Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
npartitions=10
2000-01-01 00:00:00int64objectfloat64float64
2000-01-04 00:00:00............
...............
2000-01-28 00:00:00............
2000-01-30 23:59:59............
\n", "
\n", "
Dask Name: from_pandas, 10 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=10 \n", "2000-01-01 00:00:00 int64 object float64 float64\n", "2000-01-04 00:00:00 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-28 00:00:00 ... ... ... ...\n", "2000-01-30 23:59:59 ... ... ... ...\n", "Dask Name: from_pandas, 10 tasks" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)\n", "ddf2" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "## Partitions in Dask Dataframes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that when we created a `Dask dataframe` we needed to supply an argument of `npartitions`. \n", " The number of partitions will assist `Dask` on how to breakup the `Pandas Datafram` and parallelize the computation. \n", "Each partition is a *separate* dataframe. For additional information see [partition documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#partitions) \n", "\n", "An example for this can be seen when examing the `reset_ index()` method:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:19.320948Z", "iopub.status.busy": "2022-05-16T13:56:19.320751Z", "iopub.status.idle": "2022-05-16T13:56:19.386400Z", "shell.execute_reply": "2022-05-16T13:56:19.385312Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/plain": [ "timestamp 2000-01-01 00:00:00\n", "id 995\n", "name Yvonne\n", "x -0.257197\n", "y 0.288303\n", "Name: 0, dtype: object" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf2 = pdf.reset_index()\n", "# Only 1 row\n", "pdf2.loc[0]" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:19.389821Z", "iopub.status.busy": "2022-05-16T13:56:19.389316Z", "iopub.status.idle": "2022-05-16T13:56:20.413639Z", "shell.execute_reply": "2022-05-16T13:56:20.413164Z" }, "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
timestampidnamexy
02000-01-01995Yvonne-0.2571970.288303
02000-01-041040Kevin0.0718740.998809
02000-01-07950Dan-0.860903-0.366538
02000-01-101012Norbert-0.688807-0.845508
02000-01-13970Kevin0.964429-0.470225
02000-01-161017Ray-0.858035-0.123796
02000-01-19973Norbert-0.5956040.726321
02000-01-221024Bob0.2721600.549340
02000-01-251026Bob-0.942005-0.867345
02000-01-28996Norbert0.885387-0.840972
\n", "
" ], "text/plain": [ " timestamp id name x y\n", "0 2000-01-01 995 Yvonne -0.257197 0.288303\n", "0 2000-01-04 1040 Kevin 0.071874 0.998809\n", "0 2000-01-07 950 Dan -0.860903 -0.366538\n", "0 2000-01-10 1012 Norbert -0.688807 -0.845508\n", "0 2000-01-13 970 Kevin 0.964429 -0.470225\n", "0 2000-01-16 1017 Ray -0.858035 -0.123796\n", "0 2000-01-19 973 Norbert -0.595604 0.726321\n", "0 2000-01-22 1024 Bob 0.272160 0.549340\n", "0 2000-01-25 1026 Bob -0.942005 -0.867345\n", "0 2000-01-28 996 Norbert 0.885387 -0.840972" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf2 = ddf2.reset_index()\n", "# each partition has an index=0\n", "ddf2.loc[0].compute() " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask Dataframe vs Pandas Dataframe\n", "Now that we have a `dask` (ddf) and a `pandas` (pdf) dataframe we can start to compair the interactions with them." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Conceptual shift - from Update to Insert/Delete\n", "Dask does not update - thus there are no arguments such as `inplace=True` which exist in Pandas. \n", "For more detials see [issue#653 on github](https://github.com/dask/dask/issues/653)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### Rename Columns" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* using `inplace=True` is not considerd to be *best practice*. " ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:20.420367Z", "iopub.status.busy": "2022-05-16T13:56:20.419897Z", "iopub.status.idle": "2022-05-16T13:56:20.500427Z", "shell.execute_reply": "2022-05-16T13:56:20.499810Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Index(['id', 'name', 'x', 'y'], dtype='object')\n" ] }, { "data": { "text/plain": [ "Index(['ID', 'name', 'x', 'y'], dtype='object')" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Pandas \n", "print(pdf.columns)\n", "# pdf.rename(columns={'id':'ID'}, inplace=True)\n", "pdf = pdf.rename(columns={'id':'ID'})\n", "pdf.columns" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "# Dask - Error\n", "# ddf.rename(columns={'id':'ID'}, inplace=True)\n", "# ddf.columns\n", "\n", "''' python\n", "--------------------------------------------------------------------------- \n", "TypeError Traceback (most recent call last) \n", " in \n", " 1 # Dask - Error \n", "----> 2 ddf.rename(columns={'id':'ID'}, inplace=True) \n", " 3 ddf.columns \n", "TypeError: rename() got an unexpected keyword argument 'inplace' \n", "'''" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:20.503008Z", "iopub.status.busy": "2022-05-16T13:56:20.502796Z", "iopub.status.idle": "2022-05-16T13:56:20.511981Z", "shell.execute_reply": "2022-05-16T13:56:20.511522Z" }, "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Index(['id', 'name', 'x', 'y'], dtype='object')\n" ] }, { "data": { "text/plain": [ "Index(['ID', 'name', 'x', 'y'], dtype='object')" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask\n", "print(ddf.columns)\n", "ddf = ddf.rename(columns={'id':'ID'})\n", "ddf.columns" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { 
"slide_type": "slide" } }, "source": [ "## Data manipulations \n", "There are several diffrences when manipulating data. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### loc - Pandas" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:20.514551Z", "iopub.status.busy": "2022-05-16T13:56:20.514254Z", "iopub.status.idle": "2022-05-16T13:56:21.021037Z", "shell.execute_reply": "2022-05-16T13:56:21.020425Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexy
timestamp
2000-01-01 00:00:171000Norbert0.72716389.682962
2000-01-01 00:00:201060Ursula0.700307-75.613821
\n", "
" ], "text/plain": [ " ID name x y\n", "timestamp \n", "2000-01-01 00:00:17 1000 Norbert 0.727163 89.682962\n", "2000-01-01 00:00:20 1060 Ursula 0.700307 -75.613821" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)\n", "pdf.loc[cond_pdf, ['y']] = pdf['y']* 100\n", "pdf[cond_pdf].head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Error" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n", "ddf.loc[cond_ddf, ['y']] = ddf['y']* 100\n", "ddf[cond_ddf].head(2)\n", "\n", "---------------------------------------------------------------------------\n", "TypeError Traceback (most recent call last)\n", "Input In [14], in ()\n", " 1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n", "----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100\n", " 3 ddf[cond_ddf].head(2)\n", "\n", "TypeError: '_LocIndexer' object does not support item assignment\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dask - use mask/where" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:21.024138Z", "iopub.status.busy": "2022-05-16T13:56:21.023636Z", "iopub.status.idle": "2022-05-16T13:56:21.093950Z", "shell.execute_reply": "2022-05-16T13:56:21.093295Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexy
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303
2000-01-01 00:00:011007Laura-0.7944600.630270
\n", "
" ], "text/plain": [ " ID name x y\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Pandas\n", "pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)\n", "pdf.head(2)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:21.096810Z", "iopub.status.busy": "2022-05-16T13:56:21.096469Z", "iopub.status.idle": "2022-05-16T13:56:21.168635Z", "shell.execute_reply": "2022-05-16T13:56:21.168059Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexy
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303
2000-01-01 00:00:011007Laura-0.7944600.630270
\n", "
" ], "text/plain": [ " ID name x y\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "#Dask\n", "cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n", "ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For more information see [dask mask documentation](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.mask.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Meta argument\n", "One key feature in `Dask` is the introduction of `meta` arguement. \n", "> `meta` is the prescription of the names/types of the output from the computation \n", "from [stack overflow answer](https://stackoverflow.com/questions/44432868/dask-dataframe-apply-meta)\n", "\n", "Since `Dask` creates a DAG for the computation, it requires to understand what are the outputs of each calculation stage. \n", "For additinal information see [meta documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#metadata)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:21.171560Z", "iopub.status.busy": "2022-05-16T13:56:21.171090Z", "iopub.status.idle": "2022-05-16T13:56:21.824171Z", "shell.execute_reply": "2022-05-16T13:56:21.823277Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitials
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303Yv
2000-01-01 00:00:011007Laura-0.7944600.630270La
\n", "
" ], "text/plain": [ " ID name x y initials\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])\n", "pdf.head(2)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:21.827023Z", "iopub.status.busy": "2022-05-16T13:56:21.826821Z", "iopub.status.idle": "2022-05-16T13:56:21.926361Z", "shell.execute_reply": "2022-05-16T13:56:21.925720Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:3930: UserWarning: \n", "You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.\n", "To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.\n", " Before: .apply(func)\n", " After: .apply(func, meta=('name', 'object'))\n", "\n", " warnings.warn(meta_warning(meta))\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitials
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303Yv
2000-01-01 00:00:011007Laura-0.7944600.630270La
\n", "
" ], "text/plain": [ " ID name x y initials\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask - Warning\n", "ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Introducing meta argument" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:21.929737Z", "iopub.status.busy": "2022-05-16T13:56:21.929324Z", "iopub.status.idle": "2022-05-16T13:56:21.933949Z", "shell.execute_reply": "2022-05-16T13:56:21.933354Z" } }, "outputs": [], "source": [ "# Describe the outcome type of the calculation\n", "meta_arg = pd.Series(object, name='initials')" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:21.937579Z", "iopub.status.busy": "2022-05-16T13:56:21.937245Z", "iopub.status.idle": "2022-05-16T13:56:22.067533Z", "shell.execute_reply": "2022-05-16T13:56:22.066966Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitials
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303Yv
2000-01-01 00:00:011007Laura-0.7944600.630270La
\n", "
" ], "text/plain": [ " ID name x y initials\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)\n", "ddf.head(2)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:22.070631Z", "iopub.status.busy": "2022-05-16T13:56:22.070209Z", "iopub.status.idle": "2022-05-16T13:56:22.075324Z", "shell.execute_reply": "2022-05-16T13:56:22.074577Z" } }, "outputs": [], "source": [ "# similar when using a function\n", "def func(row):\n", " if (row['x']> 0):\n", " return row['x'] * 1000 \n", " else:\n", " return row['y'] * -1" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:22.078046Z", "iopub.status.busy": "2022-05-16T13:56:22.077741Z", "iopub.status.idle": "2022-05-16T13:56:23.323227Z", "shell.execute_reply": "2022-05-16T13:56:23.322802Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitialsz
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303Yv-0.288303
2000-01-01 00:00:011007Laura-0.7944600.630270La-0.630270
\n", "
" ], "text/plain": [ " ID name x y initials z\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv -0.288303\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La -0.630270" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Map partitions\n", "* We can supply an ad-hoc function to run on each partition using the [map_partitions](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.map_partitions.html) method. \n", "Mainly useful for functions that are not implemented in `Dask` or `Pandas` . \n", "* Finally we can return a new `dataframe` which needs to be described in the `meta` argument \n", "The function could also include arguments." ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:23.326255Z", "iopub.status.busy": "2022-05-16T13:56:23.325833Z", "iopub.status.idle": "2022-05-16T13:56:24.574714Z", "shell.execute_reply": "2022-05-16T13:56:24.573911Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexydist
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303NaN
2000-01-01 00:00:011007Laura-0.7944600.6302700.636862
2000-01-01 00:00:021012Ursula-0.0130750.7326330.788061
2000-01-01 00:00:03960Yvonne-0.361644-0.2029560.998412
2000-01-01 00:00:04988Kevin0.918343-0.9335681.473825
\n", "
" ], "text/plain": [ " ID name x y dist\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 NaN\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 0.636862\n", "2000-01-01 00:00:02 1012 Ursula -0.013075 0.732633 0.788061\n", "2000-01-01 00:00:03 960 Yvonne -0.361644 -0.202956 0.998412\n", "2000-01-01 00:00:04 988 Kevin 0.918343 -0.933568 1.473825" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import numpy as np\n", "def func2(df, coor_x, coor_y, drop_cols):\n", " df['dist'] = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 \n", " + (df[coor_y] - df[coor_y].shift())**2 )\n", " return df.drop(drop_cols, axis=1)\n", "\n", "ddf2 = ddf.map_partitions(func2\n", " , coor_x='x'\n", " , coor_y='y'\n", " , drop_cols=['initials', 'z']\n", " , meta=pd.DataFrame({'ID':'i8'\n", " , 'name':str\n", " , 'x':'f8'\n", " , 'y':'f8' \n", " , 'dist':'f8'}, index=[0]))\n", "ddf2.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Convert index into Time column" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:24.577618Z", "iopub.status.busy": "2022-05-16T13:56:24.577438Z", "iopub.status.idle": "2022-05-16T13:56:26.311103Z", "shell.execute_reply": "2022-05-16T13:56:26.310332Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitialstimes
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303Yv00:00:00
2000-01-01 00:00:011007Laura-0.7944600.630270La00:00:01
\n", "
" ], "text/plain": [ " ID name x y initials times\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv 00:00:00\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La 00:00:01" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Only Pandas\n", "pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)\n", "pdf.head(2)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:26.314215Z", "iopub.status.busy": "2022-05-16T13:56:26.313769Z", "iopub.status.idle": "2022-05-16T13:56:28.888773Z", "shell.execute_reply": "2022-05-16T13:56:28.888054Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexyinitialsztimes
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.288303Yv-0.28830300:00:00
2000-01-01 00:00:011007Laura-0.7944600.630270La-0.63027000:00:01
\n", "
" ], "text/plain": [ " ID name x y initials z \\\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv -0.288303 \n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La -0.630270 \n", "\n", " times \n", "timestamp \n", "2000-01-01 00:00:00 00:00:00 \n", "2000-01-01 00:00:01 00:00:01 " ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask or Pandas\n", "ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))\n", "# or ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )\n", "ddf['times'] = ddf['times'].dt.time\n", "ddf =client.persist(ddf)\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Drop NA on column" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:28.892238Z", "iopub.status.busy": "2022-05-16T13:56:28.891935Z", "iopub.status.idle": "2022-05-16T13:56:29.612104Z", "shell.execute_reply": "2022-05-16T13:56:29.611547Z" } }, "outputs": [], "source": [ "# no issue with regular drop columns\n", "pdf = pdf.drop(labels=['initials'],axis=1)\n", "ddf = ddf.drop(labels=['initials','z'],axis=1) " ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:29.615543Z", "iopub.status.busy": "2022-05-16T13:56:29.615236Z", "iopub.status.idle": "2022-05-16T13:56:29.976770Z", "shell.execute_reply": "2022-05-16T13:56:29.976203Z" } }, "outputs": [], "source": [ "# Pandas\n", "pdf = pdf.assign(colna = None)\n", "# Dask\n", "ddf = ddf.assign(colna = None)" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:29.980194Z", "iopub.status.busy": "2022-05-16T13:56:29.979816Z", "iopub.status.idle": "2022-05-16T13:56:32.373324Z", "shell.execute_reply": "2022-05-16T13:56:32.372574Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.28830300:00:00
2000-01-01 00:00:011007Laura-0.7944600.63027000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 00:00:00\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 00:00:01" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf = pdf.dropna(axis=1, how='all')\n", "pdf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In odrer for `Dask` to drop a column with all `na` it must check all the partitions with `compute()`" ] }, { "cell_type": "code", "execution_count": 28, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:32.376833Z", "iopub.status.busy": "2022-05-16T13:56:32.376616Z", "iopub.status.idle": "2022-05-16T13:56:48.031592Z", "shell.execute_reply": "2022-05-16T13:56:48.030852Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
timestamp
2000-01-01 00:00:00995Yvonne-0.2571970.28830300:00:00
2000-01-01 00:00:011007Laura-0.7944600.63027000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "timestamp \n", "2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 00:00:00\n", "2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 00:00:01" ] }, "execution_count": 28, "metadata": {}, "output_type": "execute_result" } ], "source": [ "if ddf.colna.isnull().all().compute() == True: # check if all values in column are Null - expensive\n", " ddf = ddf.drop(labels=['colna'],axis=1)\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.4 Reset Index" ] }, { "cell_type": "code", "execution_count": 29, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:48.034840Z", "iopub.status.busy": "2022-05-16T13:56:48.034402Z", "iopub.status.idle": "2022-05-16T13:56:48.178332Z", "shell.execute_reply": "2022-05-16T13:56:48.177125Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
0995Yvonne-0.2571970.28830300:00:00
11007Laura-0.7944600.63027000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "0 995 Yvonne -0.257197 0.288303 00:00:00\n", "1 1007 Laura -0.794460 0.630270 00:00:01" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Pandas\n", "pdf =pdf.reset_index(drop=True)\n", "pdf.head(2)" ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:48.181375Z", "iopub.status.busy": "2022-05-16T13:56:48.180896Z", "iopub.status.idle": "2022-05-16T13:56:48.244382Z", "shell.execute_reply": "2022-05-16T13:56:48.243751Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
0995Yvonne-0.2571970.28830300:00:00
11007Laura-0.7944600.63027000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "0 995 Yvonne -0.257197 0.288303 00:00:00\n", "1 1007 Laura -0.794460 0.630270 00:00:01" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask\n", "ddf = ddf.reset_index()\n", "ddf = ddf.drop(labels=['timestamp'], axis=1 )\n", "ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Read / Save files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* When working with `pandas` and `dask` preferable use [parquet format](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#store-data-in-apache-parquet-format). \n", "* When working with `Dask` - files can be read with multiple workers . \n", "* Most `kwargs` are applicable for reading and writing files \n", "e.g. \n", "ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False). \n", "* However some are not available such as `nrows`.\n", "\n", "[see documentaion](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.to_csv.html) (including the option for output file naming)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save files" ] }, { "cell_type": "code", "execution_count": 31, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:48.247934Z", "iopub.status.busy": "2022-05-16T13:56:48.247468Z", "iopub.status.idle": "2022-05-16T13:56:48.252342Z", "shell.execute_reply": "2022-05-16T13:56:48.251792Z" } }, "outputs": [], "source": [ "from pathlib import Path\n", "output_dir_file = Path('data/pdf_single_file.csv')\n", "output_dir_file.parent.mkdir(parents=True, exist_ok=True)" ] }, { "cell_type": "code", "execution_count": 32, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:56:48.255334Z", "iopub.status.busy": "2022-05-16T13:56:48.254892Z", "iopub.status.idle": "2022-05-16T13:57:03.526168Z", "shell.execute_reply": "2022-05-16T13:57:03.525355Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 15.1 s, sys: 343 ms, total: 15.5 s\n", "Wall time: 15.3 s\n" ] } ], "source": [ "%%time\n", "# Pandas\n", "pdf.to_csv(output_dir_file)" ] }, { "cell_type": "code", "execution_count": 33, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:03.529550Z", "iopub.status.busy": "2022-05-16T13:57:03.529124Z", "iopub.status.idle": "2022-05-16T13:57:03.534892Z", "shell.execute_reply": "2022-05-16T13:57:03.534220Z" } }, "outputs": [ { "data": { "text/plain": [ "[PosixPath('data/2000-01-26.csv'),\n", " PosixPath('data/2000-01-09.csv'),\n", " PosixPath('data/2000-01-01.csv'),\n", " PosixPath('data/2000-01-11.csv'),\n", " PosixPath('data/2000-01-02.csv'),\n", " PosixPath('data/2000-01-22.csv'),\n", " PosixPath('data/2000-01-08.csv'),\n", " PosixPath('data/2000-01-07.csv'),\n", " PosixPath('data/2000-01-03.csv'),\n", " PosixPath('data/2000-01-30.csv'),\n", " PosixPath('data/2000-01-29.csv'),\n", " PosixPath('data/2000-01-12.csv'),\n", " PosixPath('data/2000-01-19.csv'),\n", " PosixPath('data/2000-01-20.csv'),\n", " PosixPath('data/2000-01-23.csv'),\n", " PosixPath('data/2000-01-04.csv'),\n", " PosixPath('data/2000-01-13.csv'),\n", " PosixPath('data/2000-01-06.csv'),\n", " PosixPath('data/2000-01-21.csv'),\n", " PosixPath('data/2000-01-10.csv'),\n", " PosixPath('data/2000-01-17.csv'),\n", " PosixPath('data/pdf_single_file.csv'),\n", " PosixPath('data/2000-01-14.csv'),\n", " PosixPath('data/2000-01-05.csv'),\n", " PosixPath('data/2000-01-16.csv'),\n", " 
PosixPath('data/2000-01-28.csv'),\n", " PosixPath('data/2000-01-25.csv'),\n", " PosixPath('data/2000-01-27.csv'),\n", " PosixPath('data/2000-01-18.csv'),\n", " PosixPath('data/2000-01-15.csv'),\n", " PosixPath('data/2000-01-24.csv')]" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(output_dir_file.parent.glob('*.csv'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice the `'*'` in the path below, which allows `Dask` to name the multiple output files. " ] }, { "cell_type": "code", "execution_count": 34, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:03.537638Z", "iopub.status.busy": "2022-05-16T13:57:03.537242Z", "iopub.status.idle": "2022-05-16T13:57:03.542635Z", "shell.execute_reply": "2022-05-16T13:57:03.542089Z" } }, "outputs": [], "source": [ "output_dask_dir = Path('data/dask_multi_files/')\n", "output_dask_dir.mkdir(parents=True, exist_ok=True)" ] }, { "cell_type": "code", "execution_count": 35, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:03.545496Z", "iopub.status.busy": "2022-05-16T13:57:03.545044Z", "iopub.status.idle": "2022-05-16T13:57:12.827508Z", "shell.execute_reply": "2022-05-16T13:57:12.826952Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 380 ms, sys: 31.3 ms, total: 412 ms\n", "Wall time: 9.27 s\n" ] }, { "data": { "text/plain": [ "['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf10.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf11.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf12.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf13.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf14.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf15.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf16.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf17.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf18.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf19.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf20.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf21.csv',\n", " 
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf22.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf23.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf24.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf25.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf26.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf27.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf28.csv',\n", " '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf29.csv']" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "# Dask\n", "ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To find the number of partitions, which will determine the number of output files, use [dask.dataframe.npartitions](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.npartitions.html). \n", "\n", "To change the number of output files use [repartition](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.repartition.html), which is an expensive operation." ] }, { "cell_type": "code", "execution_count": 36, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:12.831310Z", "iopub.status.busy": "2022-05-16T13:57:12.831096Z", "iopub.status.idle": "2022-05-16T13:57:12.838088Z", "shell.execute_reply": "2022-05-16T13:57:12.837209Z" } }, "outputs": [ { "data": { "text/plain": [ "30" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf.npartitions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read Multiple files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For `pandas` it is possible to iterate over and concatenate the files; [see this answer from stack overflow](https://stackoverflow.com/questions/20906474/import-multiple-csv-files-into-pandas-and-concatenate-into-one-dataframe)." ] }, { "cell_type": "code", "execution_count": 37, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:12.840939Z", "iopub.status.busy": "2022-05-16T13:57:12.840490Z", "iopub.status.idle": "2022-05-16T13:57:15.729069Z", "shell.execute_reply": "2022-05-16T13:57:15.728444Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.6 s, sys: 350 ms, total: 2.95 s\n", "Wall time: 2.88 s\n" ] }, { "data": { "text/plain": [ "2592000" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "# Pandas\n", "concat_df = pd.concat([pd.read_csv(f) \n", " for f in list(output_dask_dir.iterdir())])\n", "len(concat_df)" ] }, { "cell_type": "code", "execution_count": 38, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:15.732102Z", "iopub.status.busy": "2022-05-16T13:57:15.731761Z", "iopub.status.idle": "2022-05-16T13:57:15.755401Z", "shell.execute_reply": "2022-05-16T13:57:15.754720Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 9.84 ms, sys: 0 ns, total: 9.84 ms\n", "Wall time: 9.3 ms\n" ] }, { "data": { "text/html": [ "
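{ "cell_type": "markdown", "metadata": {}, "source": [ "Following the parquet best-practice note above, a minimal sketch of a parquet round trip (the directory path is illustrative, and a parquet engine such as `pyarrow` must be installed):\n", "\n", "```python\n", "# one parquet file is written per partition; dtypes are preserved\n", "ddf.to_parquet('data/dask_parquet/')\n", "ddf_parquet = dd.read_parquet('data/dask_parquet/')\n", "```" ] },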
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
npartitions=30
int64objectfloat64float64object
...............
..................
...............
...............
\n", "
\n", "
Dask Name: read-csv, 30 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " ID name x y times\n", "npartitions=30 \n", " int64 object float64 float64 object\n", " ... ... ... ... ...\n", "... ... ... ... ... ...\n", " ... ... ... ... ...\n", " ... ... ... ... ...\n", "Dask Name: read-csv, 30 tasks" ] }, "execution_count": 38, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "# Dask\n", "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n", "_ddf" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Remember that `Dask` is lazy - thus it does not *realy* read the file until it needs to..." ] }, { "cell_type": "code", "execution_count": 39, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:15.758365Z", "iopub.status.busy": "2022-05-16T13:57:15.757874Z", "iopub.status.idle": "2022-05-16T13:57:16.534838Z", "shell.execute_reply": "2022-05-16T13:57:16.534220Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 70.5 ms, sys: 6.65 ms, total: 77.2 ms\n", "Wall time: 769 ms\n" ] }, { "data": { "text/plain": [ "2592000" ] }, "execution_count": 39, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n", "len(_ddf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " ## Consider using client.persist()\n", " Since Dask is lazy - it may run the **entire** graph/DAG (again) even if it already run part of the calculation in a previous cell. Thus use [persist](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#persist-intelligently) to keep the results in memory \n", "Additional information can be read in this [stackoverflow issue](https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array/45941529#45941529) or see an example in [this post](http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes) \n", "This concept should also be used when running a code within a script (rather then a jupyter notebook) which incoperates loops within the code.\n" ] }, { "cell_type": "code", "execution_count": 40, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:16.538103Z", "iopub.status.busy": "2022-05-16T13:57:16.537656Z", "iopub.status.idle": "2022-05-16T13:57:16.792233Z", "shell.execute_reply": "2022-05-16T13:57:16.791710Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
IDnamexytimes
0995Yvonne-0.2571970.28830300:00:00
11007Laura-0.7944600.63027000:00:01
\n", "
" ], "text/plain": [ " ID name x y times\n", "0 995 Yvonne -0.257197 0.288303 00:00:00\n", "1 1007 Laura -0.794460 0.630270 00:00:01" ] }, "execution_count": 40, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# e.g.\n", "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n", "# do some filter\n", "_ddf = client.persist(_ddf)\n", "# do some computations\n", "_ddf.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Group By - custom aggregations\n", "In addition to the [groupby notebook example](https://github.com/dask/dask-examples/blob/main/dataframes/02-groupby.ipynb) that is in the repository - \n", "This is another example how to try to eliminate the use of `groupby.apply`. \n", "In this example we are grouping columns into unique lists." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Pandas" ] }, { "cell_type": "code", "execution_count": 41, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:16.804633Z", "iopub.status.busy": "2022-05-16T13:57:16.798082Z", "iopub.status.idle": "2022-05-16T13:57:23.549303Z", "shell.execute_reply": "2022-05-16T13:57:23.548734Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
nameIDseconds
0Yvonne99500
1Laura100700
2Ursula101200
3Yvonne96000
4Kevin98800
\n", "
" ], "text/plain": [ " name ID seconds\n", "0 Yvonne 995 00\n", "1 Laura 1007 00\n", "2 Ursula 1012 00\n", "3 Yvonne 960 00\n", "4 Kevin 988 00" ] }, "execution_count": 41, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# prepare pandas dataframe\n", "pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)\n", "pdf['seconds'] = pdf.time.astype(str).str[-2:]\n", "cols_for_demo =['name', 'ID','seconds']\n", "pdf[cols_for_demo].head()" ] }, { "cell_type": "code", "execution_count": 42, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:23.552722Z", "iopub.status.busy": "2022-05-16T13:57:23.551882Z", "iopub.status.idle": "2022-05-16T13:57:25.003337Z", "shell.execute_reply": "2022-05-16T13:57:25.002563Z" } }, "outputs": [], "source": [ "pdf_gb = pdf.groupby(pdf.name)\n", "gp_col = ['ID', 'seconds']\n", "list_ser_gb = [pdf_gb[att_col_gr].apply\n", " (lambda x: list(set(x.to_list()))) \n", " for att_col_gr in gp_col]" ] }, { "cell_type": "code", "execution_count": 43, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:25.007938Z", "iopub.status.busy": "2022-05-16T13:57:25.007435Z", "iopub.status.idle": "2022-05-16T13:57:25.032666Z", "shell.execute_reply": "2022-05-16T13:57:25.031650Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " Weight ID \\\n", "name \n", "Alice 99633 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n", "Bob 99782 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n", "\n", " seconds \n", "name \n", "Alice [60, 29, 11, 09, 44, 05, 77, 54, 50, 01, 23, 7... \n", "Bob [60, 29, 11, 09, 44, 05, 77, 54, 50, 01, 23, 7... \n", "CPU times: user 22.4 ms, sys: 0 ns, total: 22.4 ms\n", "Wall time: 20.1 ms\n" ] } ], "source": [ "%%time\n", "df_edge_att = pdf_gb.size().to_frame(name=\"Weight\")\n", "for ser in list_ser_gb:\n", " df_edge_att = df_edge_att.join(ser.to_frame(), how='left') \n", "print(df_edge_att.head(2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Remeber that in any some cases `Pandas` is more efficiante (assuming that you can load all the data into the RAM). " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Dask" ] }, { "cell_type": "code", "execution_count": 44, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:25.035924Z", "iopub.status.busy": "2022-05-16T13:57:25.035273Z", "iopub.status.idle": "2022-05-16T13:57:25.565456Z", "shell.execute_reply": "2022-05-16T13:57:25.564604Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
nameIDseconds
0Yvonne99500
1Laura100701
\n", "
" ], "text/plain": [ " name ID seconds\n", "0 Yvonne 995 00\n", "1 Laura 1007 01" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def set_list_att(x: dd.Series):\n", " return list(set([item for item in x.values]))\n", "ddf['seconds'] = ddf.times.astype(str).str[-2:]\n", "ddf = client.persist(ddf)\n", "ddf[cols_for_demo].head(2)" ] }, { "cell_type": "code", "execution_count": 45, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:25.569519Z", "iopub.status.busy": "2022-05-16T13:57:25.569199Z", "iopub.status.idle": "2022-05-16T13:57:25.575913Z", "shell.execute_reply": "2022-05-16T13:57:25.575221Z" } }, "outputs": [ { "data": { "text/plain": [ "Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf.columns" ] }, { "cell_type": "code", "execution_count": 46, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:25.578714Z", "iopub.status.busy": "2022-05-16T13:57:25.578352Z", "iopub.status.idle": "2022-05-16T13:57:25.639312Z", "shell.execute_reply": "2022-05-16T13:57:25.638671Z" } }, "outputs": [], "source": [ "df_gb = ddf.groupby(ddf.name)\n", "gp_col = ['ID', 'seconds']\n", "list_ser_gb = [df_gb[att_col_gr].apply(set_list_att\n", " ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att')) \n", " for att_col_gr in gp_col]" ] }, { "cell_type": "code", "execution_count": 47, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:25.643019Z", "iopub.status.busy": "2022-05-16T13:57:25.642694Z", "iopub.status.idle": "2022-05-16T13:57:35.119074Z", "shell.execute_reply": "2022-05-16T13:57:35.118412Z" } }, "outputs": [ { "ename": "ValueError", "evalue": "The columns in the computed data do not match the columns in the provided metadata\n Extra: ['name']\n Missing: [0]", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)", "File \u001b[0;32m:4\u001b[0m, in \u001b[0;36m\u001b[0;34m\u001b[0m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1208\u001b[0m, in \u001b[0;36m_Frame.head\u001b[0;34m(self, n, npartitions, compute)\u001b[0m\n\u001b[1;32m 1206\u001b[0m \u001b[38;5;66;03m# No need to warn if we're already looking at all partitions\u001b[39;00m\n\u001b[1;32m 1207\u001b[0m safe \u001b[38;5;241m=\u001b[39m npartitions \u001b[38;5;241m!=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mnpartitions\n\u001b[0;32m-> 1208\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_head\u001b[49m\u001b[43m(\u001b[49m\u001b[43mn\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mnpartitions\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mnpartitions\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcompute\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msafe\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43msafe\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1242\u001b[0m, in \u001b[0;36m_Frame._head\u001b[0;34m(self, n, npartitions, compute, 
safe)\u001b[0m\n\u001b[1;32m 1237\u001b[0m result \u001b[38;5;241m=\u001b[39m new_dd_object(\n\u001b[1;32m 1238\u001b[0m graph, name, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_meta, [\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdivisions[\u001b[38;5;241m0\u001b[39m], \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdivisions[npartitions]]\n\u001b[1;32m 1239\u001b[0m )\n\u001b[1;32m 1241\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m compute:\n\u001b[0;32m-> 1242\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43mresult\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1243\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 268\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 269\u001b[0m \u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 270\u001b[0m \n\u001b[1;32m 271\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 290\u001b[0m \u001b[38;5;124;03m dask.base.compute\u001b[39;00m\n\u001b[1;32m 291\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 292\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 293\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 572\u001b[0m keys\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_keys__())\n\u001b[1;32m 573\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[0;32m--> 575\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 576\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3018\u001b[0m, in \u001b[0;36mClient.get\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 3016\u001b[0m should_rejoin \u001b[38;5;241m=\u001b[39m 
\u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 3017\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 3018\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgather\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpacked\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3019\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 3020\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m futures\u001b[38;5;241m.\u001b[39mvalues():\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2171\u001b[0m, in \u001b[0;36mClient.gather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 2169\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 2170\u001b[0m local_worker \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[0;32m-> 2171\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2172\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_gather\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2173\u001b[0m \u001b[43m \u001b[49m\u001b[43mfutures\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2174\u001b[0m \u001b[43m \u001b[49m\u001b[43merrors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43merrors\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2175\u001b[0m \u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2176\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocal_worker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocal_worker\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2177\u001b[0m \u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2178\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:309\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 307\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 308\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 309\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 310\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 311\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n", "File 
\u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:376\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 374\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 375\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n\u001b[0;32m--> 376\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\u001b[38;5;241m.\u001b[39mwith_traceback(tb)\n\u001b[1;32m 377\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 378\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:349\u001b[0m, in \u001b[0;36msync..f\u001b[0;34m()\u001b[0m\n\u001b[1;32m 347\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mwait_for(future, callback_timeout)\n\u001b[1;32m 348\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mensure_future(future)\n\u001b[0;32m--> 349\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m future\n\u001b[1;32m 350\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 351\u001b[0m error \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762\u001b[0m, in \u001b[0;36mRunner.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 759\u001b[0m exc_info \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[1;32m 761\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 762\u001b[0m value \u001b[38;5;241m=\u001b[39m \u001b[43mfuture\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 763\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 764\u001b[0m exc_info \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2034\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2032\u001b[0m exc \u001b[38;5;241m=\u001b[39m CancelledError(key)\n\u001b[1;32m 2033\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 2034\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2035\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\n\u001b[1;32m 2036\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990\u001b[0m, in \u001b[0;36m__call__\u001b[0;34m()\u001b[0m\n\u001b[1;32m 988\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(args) \u001b[38;5;241m==\u001b[39m \u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys):\n\u001b[1;32m 989\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mExpected \u001b[39m\u001b[38;5;132;01m%d\u001b[39;00m\u001b[38;5;124m args, got 
\u001b[39m\u001b[38;5;132;01m%d\u001b[39;00m\u001b[38;5;124m\"\u001b[39m \u001b[38;5;241m%\u001b[39m (\u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys), \u001b[38;5;28mlen\u001b[39m(args)))\n\u001b[0;32m--> 990\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m core\u001b[38;5;241m.\u001b[39mget(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdsk, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39moutkey, \u001b[38;5;28mdict\u001b[39m(\u001b[38;5;28mzip\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys, args)))\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149\u001b[0m, in \u001b[0;36mget\u001b[0;34m()\u001b[0m\n\u001b[1;32m 147\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m key \u001b[38;5;129;01min\u001b[39;00m toposort(dsk):\n\u001b[1;32m 148\u001b[0m task \u001b[38;5;241m=\u001b[39m dsk[key]\n\u001b[0;32m--> 149\u001b[0m result \u001b[38;5;241m=\u001b[39m _execute_task(task, cache)\n\u001b[1;32m 150\u001b[0m cache[key] \u001b[38;5;241m=\u001b[39m result\n\u001b[1;32m 151\u001b[0m result \u001b[38;5;241m=\u001b[39m _execute_task(out, cache)\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. 
numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. 
numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 83\u001b[0m \u001b[38;5;124;03m\"\"\"Do the actual work of collecting data and executing a function\u001b[39;00m\n\u001b[1;32m 84\u001b[0m \n\u001b[1;32m 85\u001b[0m \u001b[38;5;124;03mExamples\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 110\u001b[0m \u001b[38;5;124;03m'foo'\u001b[39;00m\n\u001b[1;32m 111\u001b[0m \u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 112\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(arg, \u001b[38;5;28mlist\u001b[39m):\n\u001b[0;32m--> 113\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m [_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m arg]\n\u001b[1;32m 114\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m istask(arg):\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 83\u001b[0m \u001b[38;5;124;03m\"\"\"Do the actual work of collecting data and executing a function\u001b[39;00m\n\u001b[1;32m 84\u001b[0m \n\u001b[1;32m 85\u001b[0m \u001b[38;5;124;03mExamples\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 110\u001b[0m \u001b[38;5;124;03m'foo'\u001b[39;00m\n\u001b[1;32m 111\u001b[0m \u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 112\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(arg, \u001b[38;5;28mlist\u001b[39m):\n\u001b[0;32m--> 113\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m [_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m arg]\n\u001b[1;32m 114\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m istask(arg):\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. 
numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:39\u001b[0m, in \u001b[0;36mapply\u001b[0;34m()\u001b[0m\n\u001b[1;32m 37\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mapply\u001b[39m(func, args, kwargs\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mNone\u001b[39;00m):\n\u001b[1;32m 38\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m kwargs:\n\u001b[0;32m---> 39\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39margs, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs)\n\u001b[1;32m 40\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 41\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39margs)\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6355\u001b[0m, in \u001b[0;36mapply_and_enforce\u001b[0;34m()\u001b[0m\n\u001b[1;32m 6353\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m meta\n\u001b[1;32m 6354\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m is_dataframe_like(df):\n\u001b[0;32m-> 6355\u001b[0m check_matching_columns(meta, df)\n\u001b[1;32m 6356\u001b[0m c \u001b[38;5;241m=\u001b[39m meta\u001b[38;5;241m.\u001b[39mcolumns\n\u001b[1;32m 6357\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415\u001b[0m, in \u001b[0;36mcheck_matching_columns\u001b[0;34m()\u001b[0m\n\u001b[1;32m 413\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 414\u001b[0m extra_info \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mOrder of columns does not match\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[0;32m--> 415\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\n\u001b[1;32m 416\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mThe columns in the computed data do not match\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 417\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m the columns in the provided metadata\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 418\u001b[0m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mextra_info\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 419\u001b[0m )\n", "\u001b[0;31mValueError\u001b[0m: The columns in the computed data do not match the columns in the provided metadata\n Extra: ['name']\n Missing: [0]" ] } ], "source": [ "%%time\n", "df_edge_att = df_gb.size().to_frame(name=\"Weight\")\n", "for ser in list_ser_gb:\n", " df_edge_att = df_edge_att.join(ser.to_frame(), how='left')\n", "df_edge_att.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can do better... 
\n", "Using [dask custom aggregation](https://docs.dask.org/en/latest/generated/dask.dataframe.groupby.Aggregation.html) is consideribly better" ] }, { "cell_type": "code", "execution_count": 48, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:35.122312Z", "iopub.status.busy": "2022-05-16T13:57:35.121882Z", "iopub.status.idle": "2022-05-16T13:57:35.129015Z", "shell.execute_reply": "2022-05-16T13:57:35.128349Z" } }, "outputs": [], "source": [ "import itertools\n", "custom_agg = dd.Aggregation(\n", " 'custom_agg', \n", " lambda s: s.apply(set), \n", " lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)" ] }, { "cell_type": "code", "execution_count": 49, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:35.131910Z", "iopub.status.busy": "2022-05-16T13:57:35.131520Z", "iopub.status.idle": "2022-05-16T13:57:36.354206Z", "shell.execute_reply": "2022-05-16T13:57:36.353602Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 171 ms, sys: 4.06 ms, total: 175 ms\n", "Wall time: 1.21 s\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
WeightIDseconds
name
Alice99633[1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...[23, 55, 51, 21, 28, 58, 35, 06, 53, 11, 39, 3...
Bob99782[1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...[23, 55, 51, 21, 28, 58, 06, 35, 53, 39, 11, 3...
\n", "
" ], "text/plain": [ " Weight ID \\\n", "name \n", "Alice 99633 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n", "Bob 99782 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n", "\n", " seconds \n", "name \n", "Alice [23, 55, 51, 21, 28, 58, 35, 06, 53, 11, 39, 3... \n", "Bob [23, 55, 51, 21, 28, 58, 06, 35, 53, 39, 11, 3... " ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "df_gb = ddf.groupby(ddf.name)\n", "gp_col = ['ID', 'seconds']\n", "list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]\n", "df_edge_att = df_gb.size().to_frame(name=\"Weight\")\n", "for ser in list_ser_gb:\n", " df_edge_att = df_edge_att.join(ser.to_frame(), how='left')\n", "df_edge_att.head(2) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## [Debugging](https://docs.dask.org/en/latest/debugging.html)\n", "Debugging may be challenging...\n", "1. Run code without client \n", "2. Use Dashboard profiler\n", "3. Verify integrity of DAG" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Corrupted DAG \n", "In this example we show that once the DAG is currupted you may need to reset the calculation" ] }, { "cell_type": "code", "execution_count": 50, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:36.357153Z", "iopub.status.busy": "2022-05-16T13:57:36.356950Z", "iopub.status.idle": "2022-05-16T13:57:36.413946Z", "shell.execute_reply": "2022-05-16T13:57:36.413409Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
timestamp
2000-01-011011Xavier-0.4090160.331355
\n", "
" ], "text/plain": [ " id name x y\n", "timestamp \n", "2000-01-01 1011 Xavier -0.409016 0.331355" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# reset dataframe\n", "ddf = dask.datasets.timeseries()\n", "ddf.head(1)" ] }, { "cell_type": "code", "execution_count": 51, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:36.417249Z", "iopub.status.busy": "2022-05-16T13:57:36.417060Z", "iopub.status.idle": "2022-05-16T13:57:36.429837Z", "shell.execute_reply": "2022-05-16T13:57:36.429044Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6254: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.\n", " warnings.warn(\n" ] } ], "source": [ "def func_dist2(df, coor_x, coor_y):\n", " dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 # `^` <-- wrong syntax\n", " + (df[coor_y] - df[coor_y].shift())^2 ) # `^` <-- wrong syntax\n", " return dist\n", "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n", " , meta=('float'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Is everything OK?" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "# Results in error\n", "ddf.head()\n", "\n", "---------------------------------------------------------------------------\n", "TypeError Traceback (most recent call last)\n", " in \n", " 1 # returns an error because of ^2 (needs to be **2)\n", "----> 2 ddf.head()\n", "\n", "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\dataframe\\core.py in head(self, n, npartitions, compute)\n", " 898 \n", " 899 if compute:\n", "--> 900 result = result.compute()\n", " 901 return result\n", " 902 \n", "\n", "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\base.py in compute(self, **kwargs)\n", " 154 dask.base.compute\n", " 155 \"\"\"\n", "--> 156 (result,) = compute(self, traverse=False, **kwargs)\n", " 157 return result\n", " 158 \n", "\n", "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n", "\n", "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n", "\n", "TypeError: unsupported operand type(s) for ^: 'float' and 'bool'\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Even if the function is corrected the DAG is corrupted" ] }, { "cell_type": "code", "execution_count": 52, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:36.432507Z", "iopub.status.busy": "2022-05-16T13:57:36.432331Z", "iopub.status.idle": "2022-05-16T13:57:36.445606Z", "shell.execute_reply": "2022-05-16T13:57:36.444762Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6254: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. 
In the future the meta you passed will not work.\n", " warnings.warn(\n" ] } ], "source": [ "# Still results with an error\n", "def func_dist2(df, coor_x, coor_y):\n", " dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 # `**` <-- correct syntax\n", " + (df[coor_y] - df[coor_y].shift())**2 ) # `**` <-- correct syntax\n", " return dist\n", "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n", " , meta=('float'))" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "# Still Results in error\n", "ddf.head()\n", "\n", "---------------------------------------------------------------------------\n", "TypeError Traceback (most recent call last)\n", " in \n", " 1 # returns an error because of ^2 (needs to be **2)\n", "----> 2 ddf.head()\n", "\n", "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\dataframe\\core.py in head(self, n, npartitions, compute)\n", " 898 \n", " 899 if compute:\n", "--> 900 result = result.compute()\n", " 901 return result\n", " 902 \n", "\n", "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\base.py in compute(self, **kwargs)\n", " 154 dask.base.compute\n", " 155 \"\"\"\n", "--> 156 (result,) = compute(self, traverse=False, **kwargs)\n", " 157 return result\n", " 158 \n", "\n", "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n", "\n", "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n", "\n", "TypeError: unsupported operand type(s) for ^: 'float' and 'bool'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We need to reset the dataframe" ] }, { "cell_type": "code", "execution_count": 53, "metadata": { "execution": { "iopub.execute_input": "2022-05-16T13:57:36.448158Z", "iopub.status.busy": "2022-05-16T13:57:36.447851Z", "iopub.status.idle": "2022-05-16T13:57:36.515331Z", "shell.execute_reply": "2022-05-16T13:57:36.514886Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6254: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.\n", " warnings.warn(\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexycol
timestamp
2000-01-01 00:00:001000Alice-0.805901-0.690125NaN
2000-01-01 00:00:01974Quinn0.3419080.0127091.345898
\n", "
" ], "text/plain": [ " id name x y col\n", "timestamp \n", "2000-01-01 00:00:00 1000 Alice -0.805901 -0.690125 NaN\n", "2000-01-01 00:00:01 974 Quinn 0.341908 0.012709 1.345898" ] }, "execution_count": 53, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf = dask.datasets.timeseries()\n", "def func_dist2(df, coor_x, coor_y):\n", " dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 #corrected math function\n", " + (df[coor_y] - df[coor_y].shift())**2 )\n", " return dist\n", "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n", " , meta=('float'))\n", "ddf.head(2)" ] } ], "metadata": { "file_extension": ".py", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "mimetype": "text/x-python", "name": "python", "npconvert_exporter": "python", "pygments_lexer": "ipython3", "version": 3 }, "nbformat": 4, "nbformat_minor": 4 }