{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# DataFrames: Reading in messy data\n", " \n", "In the [01-data-access](./01-data-access.ipynb) example we show how Dask Dataframes can read and store data in many of the same formats as Pandas dataframes. One key difference, when using Dask Dataframes is that instead of opening a single file with a function like [pandas.read_csv](https://docs.dask.org/en/latest/generated/dask.dataframe.read_csv.html), we typically open many files at once with [dask.dataframe.read_csv](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.read_csv). This enables us to treat a collection of files as a single dataset. Most of the time this works really well. But real data is messy and in this notebook we will explore a more advanced technique to bring messy datasets into a dask dataframe." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Start Dask Client for Dashboard\n", "\n", "Starting the Dask Client is optional. It will provide a dashboard which \n", "is useful to gain insight on the computation. \n", "\n", "The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. This can take some effort to arrange your windows, but seeing them both at the same is very useful when learning." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:51.021112Z", "iopub.status.busy": "2022-07-27T19:20:51.020744Z", "iopub.status.idle": "2022-07-27T19:20:53.134670Z", "shell.execute_reply": "2022-07-27T19:20:53.133799Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-3a3f7d3e-0de1-11ed-a27f-000d3a8f7959

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://127.0.0.1:8787/status\n", "
\n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

df920051

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Workers: 1\n", "
\n", " Total threads: 4\n", " \n", " Total memory: 1.86 GiB\n", "
Status: runningUsing processes: True
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-0c7591d1-fd03-435a-bfcf-c75404389145

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://127.0.0.1:43981\n", " \n", " Workers: 1\n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Total threads: 4\n", "
\n", " Started: Just now\n", " \n", " Total memory: 1.86 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:45589\n", " \n", " Total threads: 4\n", "
\n", " Dashboard: http://127.0.0.1:37755/status\n", " \n", " Memory: 1.86 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:44473\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-pz_o1b0r\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create artificial dataset\n", "\n", "First we create an artificial dataset and write it to many CSV files.\n", "\n", "You don't need to understand this section, we're just creating a dataset for the rest of the notebook." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:53.138396Z", "iopub.status.busy": "2022-07-27T19:20:53.137795Z", "iopub.status.idle": "2022-07-27T19:20:53.522636Z", "shell.execute_reply": "2022-07-27T19:20:53.521943Z" } }, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
npartitions=30
2000-01-01int64objectfloat64float64
2000-01-02............
...............
2000-01-30............
2000-01-31............
\n", "
\n", "
Dask Name: make-timeseries, 30 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=30 \n", "2000-01-01 int64 object float64 float64\n", "2000-01-02 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-30 ... ... ... ...\n", "2000-01-31 ... ... ... ...\n", "Dask Name: make-timeseries, 30 tasks" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask\n", "df = dask.datasets.timeseries()\n", "df" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:20:53.526214Z", "iopub.status.busy": "2022-07-27T19:20:53.525705Z", "iopub.status.idle": "2022-07-27T19:21:10.621708Z", "shell.execute_reply": "2022-07-27T19:21:10.621022Z" } }, "outputs": [], "source": [ "import os\n", "import datetime\n", "\n", "if not os.path.exists('data'):\n", " os.mkdir('data')\n", "\n", "def name(i):\n", " \"\"\" Provide date for filename given index\n", " \n", " Examples\n", " --------\n", " >>> name(0)\n", " '2000-01-01'\n", " >>> name(10)\n", " '2000-01-11'\n", " \"\"\"\n", " return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))\n", " \n", "df.to_csv('data/*.csv', name_function=name, index=False);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read CSV files\n", "\n", "We now have many CSV files in our data directory, one for each day in the month of January 2000. Each CSV file holds timeseries data for that day. We can read all of them as one logical dataframe using the `dd.read_csv` function with a glob string." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:10.625892Z", "iopub.status.busy": "2022-07-27T19:21:10.625337Z", "iopub.status.idle": "2022-07-27T19:21:10.799697Z", "shell.execute_reply": "2022-07-27T19:21:10.798538Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "data/2000-01-01.csv\r\n", "data/2000-01-02.csv\r\n", "data/2000-01-03.csv\r\n", "data/2000-01-04.csv\r\n", "data/2000-01-05.csv\r\n", "data/2000-01-06.csv\r\n", "data/2000-01-07.csv\r\n", "data/2000-01-08.csv\r\n", "data/2000-01-09.csv\r\n", "data/2000-01-10.csv\r\n" ] } ], "source": [ "!ls data/*.csv | head" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:10.805660Z", "iopub.status.busy": "2022-07-27T19:21:10.805309Z", "iopub.status.idle": "2022-07-27T19:21:10.838190Z", "shell.execute_reply": "2022-07-27T19:21:10.837591Z" } }, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
npartitions=30
int64objectfloat64float64
............
...............
............
............
\n", "
\n", "
Dask Name: read-csv, 30 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=30 \n", " int64 object float64 float64\n", " ... ... ... ...\n", "... ... ... ... ...\n", " ... ... ... ...\n", " ... ... ... ...\n", "Dask Name: read-csv, 30 tasks" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask.dataframe as dd\n", "\n", "df = dd.read_csv('data/2000-*-*.csv')\n", "df" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:10.841300Z", "iopub.status.busy": "2022-07-27T19:21:10.841075Z", "iopub.status.idle": "2022-07-27T19:21:10.951521Z", "shell.execute_reply": "2022-07-27T19:21:10.950370Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
0988Norbert-0.742721-0.277954
11025Bob0.603313-0.161292
2992Alice-0.0494080.573142
31029Bob-0.1225660.533852
41032Patricia0.476066-0.006417
\n", "
" ], "text/plain": [ " id name x y\n", "0 988 Norbert -0.742721 -0.277954\n", "1 1025 Bob 0.603313 -0.161292\n", "2 992 Alice -0.049408 0.573142\n", "3 1029 Bob -0.122566 0.533852\n", "4 1032 Patricia 0.476066 -0.006417" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's look at some statistics on the data" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:10.954638Z", "iopub.status.busy": "2022-07-27T19:21:10.954129Z", "iopub.status.idle": "2022-07-27T19:21:13.109253Z", "shell.execute_reply": "2022-07-27T19:21:13.108393Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idxy
count2.592000e+062.592000e+062.592000e+06
mean9.999909e+02-1.752288e-041.272128e-04
std3.163993e+015.772766e-015.773655e-01
min8.370000e+02-9.999999e-01-9.999995e-01
25%9.790000e+02-4.924166e-01-4.938494e-01
50%1.000000e+039.977439e-034.362202e-03
75%1.022000e+035.070134e-015.083363e-01
max1.160000e+039.999979e-019.999995e-01
\n", "
" ], "text/plain": [ " id x y\n", "count 2.592000e+06 2.592000e+06 2.592000e+06\n", "mean 9.999909e+02 -1.752288e-04 1.272128e-04\n", "std 3.163993e+01 5.772766e-01 5.773655e-01\n", "min 8.370000e+02 -9.999999e-01 -9.999995e-01\n", "25% 9.790000e+02 -4.924166e-01 -4.938494e-01\n", "50% 1.000000e+03 9.977439e-03 4.362202e-03\n", "75% 1.022000e+03 5.070134e-01 5.083363e-01\n", "max 1.160000e+03 9.999979e-01 9.999995e-01" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.describe().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Make some messy data\n", "\n", "Now this works great, and in most cases dd.read_csv or dd.read_parquet etc are the preferred way to read in large collections of data files into a dask dataframe, but real world data is often very messy and some files may be broken or badly formatted. To simulate this we are going to create some fake messy data by tweaking our example csv files. For the file `data/2000-01-05.csv` we will replace with no data and for the file `data/2000-01-07.csv` we will remove the `y` column " ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:13.113489Z", "iopub.status.busy": "2022-07-27T19:21:13.112475Z", "iopub.status.idle": "2022-07-27T19:21:13.118129Z", "shell.execute_reply": "2022-07-27T19:21:13.117560Z" } }, "outputs": [], "source": [ "# corrupt the data in data/2000-01-05.csv\n", "with open('data/2000-01-05.csv', 'w') as f:\n", " f.write('')" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:13.120889Z", "iopub.status.busy": "2022-07-27T19:21:13.120474Z", "iopub.status.idle": "2022-07-27T19:21:13.442551Z", "shell.execute_reply": "2022-07-27T19:21:13.441897Z" } }, "outputs": [], "source": [ "# remove y column from data/2000-01-07.csv\n", "import pandas as pd\n", "df = pd.read_csv('data/2000-01-07.csv')\n", "del df['y']\n", "df.to_csv('data/2000-01-07.csv', index=False)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:13.446643Z", "iopub.status.busy": "2022-07-27T19:21:13.446198Z", "iopub.status.idle": "2022-07-27T19:21:13.610473Z", "shell.execute_reply": "2022-07-27T19:21:13.609155Z" } }, "outputs": [], "source": [ "!head data/2000-01-05.csv" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:13.616143Z", "iopub.status.busy": "2022-07-27T19:21:13.615580Z", "iopub.status.idle": "2022-07-27T19:21:13.776653Z", "shell.execute_reply": "2022-07-27T19:21:13.775958Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "id,name,x\r\n", "1032,Edith,0.341158963292153\r\n", "1025,Yvonne,-0.0596561961788608\r\n", "996,Hannah,-0.4598038238105364\r\n", "1015,Norbert,-0.6893967021653444\r\n", "976,Hannah,0.4339578272105588\r\n", "1002,Dan,0.3519233500902228\r\n", "917,Xavier,-0.928241343897473\r\n", "1036,Hannah,-0.5115504865546654\r\n", "972,Oliver,-0.3808144336718926\r\n" ] } ], "source": [ "!head data/2000-01-07.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Reading the messy data\n", "\n", "Let's try reading in the collection of files again" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:13.780400Z", "iopub.status.busy": "2022-07-27T19:21:13.780180Z", "iopub.status.idle": "2022-07-27T19:21:13.798485Z", "shell.execute_reply": "2022-07-27T19:21:13.797845Z" } }, "outputs": [], "source": [ "df = dd.read_csv('data/2000-*-*.csv')" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:13.801585Z", "iopub.status.busy": "2022-07-27T19:21:13.800983Z", "iopub.status.idle": "2022-07-27T19:21:13.901064Z", "shell.execute_reply": "2022-07-27T19:21:13.900486Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
0988Norbert-0.742721-0.277954
11025Bob0.603313-0.161292
2992Alice-0.0494080.573142
31029Bob-0.1225660.533852
41032Patricia0.476066-0.006417
\n", "
" ], "text/plain": [ " id name x y\n", "0 988 Norbert -0.742721 -0.277954\n", "1 1025 Bob 0.603313 -0.161292\n", "2 992 Alice -0.049408 0.573142\n", "3 1029 Bob -0.122566 0.533852\n", "4 1032 Patricia 0.476066 -0.006417" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ok this looks like it worked, let us calculate the dataset statistics again" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:13.904633Z", "iopub.status.busy": "2022-07-27T19:21:13.904220Z", "iopub.status.idle": "2022-07-27T19:21:18.316080Z", "shell.execute_reply": "2022-07-27T19:21:18.315272Z" }, "tags": [ "raises-exception" ] }, "outputs": [ { "ename": "ValueError", "evalue": "The columns in the computed data do not match the columns in the provided metadata\n Extra: []\n Missing: ['y']", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)", "Input \u001b[0;32mIn [14]\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdescribe\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 268\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 269\u001b[0m \u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 270\u001b[0m \n\u001b[1;32m 271\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 290\u001b[0m \u001b[38;5;124;03m dask.base.compute\u001b[39;00m\n\u001b[1;32m 291\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 292\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 293\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 572\u001b[0m keys\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_keys__())\n\u001b[1;32m 573\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[0;32m--> 575\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 576\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004\u001b[0m, in \u001b[0;36mClient.get\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 3002\u001b[0m should_rejoin \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 3003\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 3004\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgather\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpacked\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3005\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 3006\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m futures\u001b[38;5;241m.\u001b[39mvalues():\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178\u001b[0m, in \u001b[0;36mClient.gather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 2176\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 2177\u001b[0m local_worker \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[0;32m-> 2178\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2179\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_gather\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2180\u001b[0m \u001b[43m \u001b[49m\u001b[43mfutures\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2181\u001b[0m \u001b[43m \u001b[49m\u001b[43merrors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43merrors\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2182\u001b[0m \u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2183\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocal_worker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocal_worker\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2184\u001b[0m \u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2185\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 316\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 317\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 318\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 319\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 320\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 383\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 384\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n\u001b[0;32m--> 385\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\u001b[38;5;241m.\u001b[39mwith_traceback(tb)\n\u001b[1;32m 386\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 387\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358\u001b[0m, in \u001b[0;36msync..f\u001b[0;34m()\u001b[0m\n\u001b[1;32m 356\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mwait_for(future, callback_timeout)\n\u001b[1;32m 357\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mensure_future(future)\n\u001b[0;32m--> 358\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m future\n\u001b[1;32m 359\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 360\u001b[0m error \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762\u001b[0m, in \u001b[0;36mRunner.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 759\u001b[0m exc_info \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[1;32m 761\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 762\u001b[0m value \u001b[38;5;241m=\u001b[39m \u001b[43mfuture\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 763\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 764\u001b[0m exc_info \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2039\u001b[0m exc \u001b[38;5;241m=\u001b[39m CancelledError(key)\n\u001b[1;32m 2040\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 2041\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2042\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\n\u001b[1;32m 2043\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990\u001b[0m, in \u001b[0;36m__call__\u001b[0;34m()\u001b[0m\n\u001b[1;32m 988\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(args) \u001b[38;5;241m==\u001b[39m \u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys):\n\u001b[1;32m 989\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mExpected \u001b[39m\u001b[38;5;132;01m%d\u001b[39;00m\u001b[38;5;124m args, got \u001b[39m\u001b[38;5;132;01m%d\u001b[39;00m\u001b[38;5;124m\"\u001b[39m \u001b[38;5;241m%\u001b[39m (\u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys), \u001b[38;5;28mlen\u001b[39m(args)))\n\u001b[0;32m--> 990\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m core\u001b[38;5;241m.\u001b[39mget(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdsk, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39moutkey, \u001b[38;5;28mdict\u001b[39m(\u001b[38;5;28mzip\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39minkeys, args)))\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149\u001b[0m, in \u001b[0;36mget\u001b[0;34m()\u001b[0m\n\u001b[1;32m 147\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m key \u001b[38;5;129;01min\u001b[39;00m toposort(dsk):\n\u001b[1;32m 148\u001b[0m task \u001b[38;5;241m=\u001b[39m dsk[key]\n\u001b[0;32m--> 149\u001b[0m result \u001b[38;5;241m=\u001b[39m _execute_task(task, cache)\n\u001b[1;32m 150\u001b[0m cache[key] \u001b[38;5;241m=\u001b[39m result\n\u001b[1;32m 151\u001b[0m result \u001b[38;5;241m=\u001b[39m _execute_task(out, cache)\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 115\u001b[0m func, args \u001b[38;5;241m=\u001b[39m arg[\u001b[38;5;241m0\u001b[39m], arg[\u001b[38;5;241m1\u001b[39m:]\n\u001b[1;32m 116\u001b[0m \u001b[38;5;66;03m# Note: Don't assign the subtask results to a variable. numpy detects\u001b[39;00m\n\u001b[1;32m 117\u001b[0m \u001b[38;5;66;03m# temporaries by their reference count and can execute certain\u001b[39;00m\n\u001b[1;32m 118\u001b[0m \u001b[38;5;66;03m# operations in-place.\u001b[39;00m\n\u001b[0;32m--> 119\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39m(_execute_task(a, cache) \u001b[38;5;28;01mfor\u001b[39;00m a \u001b[38;5;129;01min\u001b[39;00m args))\n\u001b[1;32m 120\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m ishashable(arg):\n\u001b[1;32m 121\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m arg\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40\u001b[0m, in \u001b[0;36mapply\u001b[0;34m()\u001b[0m\n\u001b[1;32m 38\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mapply\u001b[39m(func, args, kwargs\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mNone\u001b[39;00m):\n\u001b[1;32m 39\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m kwargs:\n\u001b[0;32m---> 40\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39margs, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs)\n\u001b[1;32m 41\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 42\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m func(\u001b[38;5;241m*\u001b[39margs)\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436\u001b[0m, in \u001b[0;36mapply_and_enforce\u001b[0;34m()\u001b[0m\n\u001b[1;32m 6434\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m meta\n\u001b[1;32m 6435\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m is_dataframe_like(df):\n\u001b[0;32m-> 6436\u001b[0m check_matching_columns(meta, df)\n\u001b[1;32m 6437\u001b[0m c \u001b[38;5;241m=\u001b[39m meta\u001b[38;5;241m.\u001b[39mcolumns\n\u001b[1;32m 6438\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n", "File \u001b[0;32m/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415\u001b[0m, in \u001b[0;36mcheck_matching_columns\u001b[0;34m()\u001b[0m\n\u001b[1;32m 413\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 414\u001b[0m extra_info \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mOrder of columns does not match\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[0;32m--> 415\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\n\u001b[1;32m 416\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mThe columns in the computed data do not match\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 417\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m the columns in the provided metadata\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 418\u001b[0m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mextra_info\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 419\u001b[0m )\n", "\u001b[0;31mValueError\u001b[0m: The columns in the computed data do not match the columns in the provided metadata\n Extra: []\n Missing: ['y']" ] } ], "source": [ "df.describe().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So what happened? \n", "\n", "When creating a dask dataframe from a collection of files, dd.read_csv samples the first few files in the dataset to determine the datatypes and columns available. Since it has not opened all the files it does not now if some of them are corrupt. Hence, `df.head()` works since it is only looking at the first file. `df.describe.compute()` fails because of the corrupt data in `data/2000-01-05.csv`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Building a delayed reader\n", "\n", "To get around this problem we are going to use a more advanced technique to build our dask dataframe. This method can also be used any time some custom logic is required when reading each file. Essentially, we are going to build a function that uses pandas and some error checking and returns a pandas dataframe. If we find a bad data file we will either find a way to fix/clean the data or we will return and empty pandas dataframe with the same structure as the good data." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.319680Z", "iopub.status.busy": "2022-07-27T19:21:18.319225Z", "iopub.status.idle": "2022-07-27T19:21:18.327783Z", "shell.execute_reply": "2022-07-27T19:21:18.327095Z" } }, "outputs": [], "source": [ "import numpy as np\n", "import io\n", "\n", "def read_data(filename):\n", " \n", " # for this to work we need to explicitly set the datatypes of our pandas dataframe \n", " dtypes = {'id': int, 'name': str, 'x': float, 'y': float}\n", " try:\n", " # try reading in the data with pandas \n", " df = pd.read_csv(filename, dtype=dtypes)\n", " except:\n", " # if this fails create an empty pandas dataframe with the same dtypes as the good data\n", " df = pd.read_csv(io.StringIO(''), names=dtypes.keys(), dtype=dtypes)\n", " \n", " # for the case with the missing column, add a column of data with NaN's\n", " if 'y' not in df.columns:\n", " df['y'] = np.NaN\n", " \n", " return df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's test this function on a good file and the two bad files" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.330721Z", "iopub.status.busy": "2022-07-27T19:21:18.330313Z", "iopub.status.idle": "2022-07-27T19:21:18.401958Z", "shell.execute_reply": "2022-07-27T19:21:18.401315Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
0988Norbert-0.742721-0.277954
11025Bob0.603313-0.161292
2992Alice-0.0494080.573142
31029Bob-0.1225660.533852
41032Patricia0.476066-0.006417
\n", "
" ], "text/plain": [ " id name x y\n", "0 988 Norbert -0.742721 -0.277954\n", "1 1025 Bob 0.603313 -0.161292\n", "2 992 Alice -0.049408 0.573142\n", "3 1029 Bob -0.122566 0.533852\n", "4 1032 Patricia 0.476066 -0.006417" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# test function on a normal file\n", "read_data('data/2000-01-01.csv').head()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.405441Z", "iopub.status.busy": "2022-07-27T19:21:18.404972Z", "iopub.status.idle": "2022-07-27T19:21:18.414507Z", "shell.execute_reply": "2022-07-27T19:21:18.413805Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
\n", "
" ], "text/plain": [ "Empty DataFrame\n", "Columns: [id, name, x, y]\n", "Index: []" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# test function on the empty file\n", "read_data('data/2000-01-05.csv').head()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.417982Z", "iopub.status.busy": "2022-07-27T19:21:18.417491Z", "iopub.status.idle": "2022-07-27T19:21:18.465956Z", "shell.execute_reply": "2022-07-27T19:21:18.465471Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
01032Edith0.341159NaN
11025Yvonne-0.059656NaN
2996Hannah-0.459804NaN
31015Norbert-0.689397NaN
4976Hannah0.433958NaN
\n", "
" ], "text/plain": [ " id name x y\n", "0 1032 Edith 0.341159 NaN\n", "1 1025 Yvonne -0.059656 NaN\n", "2 996 Hannah -0.459804 NaN\n", "3 1015 Norbert -0.689397 NaN\n", "4 976 Hannah 0.433958 NaN" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# test function on the file missing the y column\n", "read_data('data/2000-01-07.csv').head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Assembling the dask dataframe\n", "\n", "First we take our `read_data` function and convert it to a dask delayed function" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.468938Z", "iopub.status.busy": "2022-07-27T19:21:18.468476Z", "iopub.status.idle": "2022-07-27T19:21:18.473083Z", "shell.execute_reply": "2022-07-27T19:21:18.471411Z" } }, "outputs": [], "source": [ "from dask import delayed\n", "read_data = delayed(read_data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us look at what the function does now" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.476336Z", "iopub.status.busy": "2022-07-27T19:21:18.475774Z", "iopub.status.idle": "2022-07-27T19:21:18.480395Z", "shell.execute_reply": "2022-07-27T19:21:18.479788Z" } }, "outputs": [ { "data": { "text/plain": [ "Delayed('read_data-604fd047-a660-4c67-87ad-60569554e79e')" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = read_data('data/2000-01-01.csv')\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It creates a delayed object, to actually run read the file we need to run `.compute()`" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.483225Z", "iopub.status.busy": "2022-07-27T19:21:18.482910Z", "iopub.status.idle": "2022-07-27T19:21:18.580952Z", "shell.execute_reply": "2022-07-27T19:21:18.580315Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
0988Norbert-0.742721-0.277954
11025Bob0.603313-0.161292
2992Alice-0.0494080.573142
31029Bob-0.1225660.533852
41032Patricia0.476066-0.006417
...............
86395927Alice0.0510350.051330
86396968George-0.3891810.096867
863971039Alice0.3967510.688604
86398996Patricia-0.042164-0.924152
86399956Tim0.8542120.858070
\n", "

86400 rows × 4 columns

\n", "
" ], "text/plain": [ " id name x y\n", "0 988 Norbert -0.742721 -0.277954\n", "1 1025 Bob 0.603313 -0.161292\n", "2 992 Alice -0.049408 0.573142\n", "3 1029 Bob -0.122566 0.533852\n", "4 1032 Patricia 0.476066 -0.006417\n", "... ... ... ... ...\n", "86395 927 Alice 0.051035 0.051330\n", "86396 968 George -0.389181 0.096867\n", "86397 1039 Alice 0.396751 0.688604\n", "86398 996 Patricia -0.042164 -0.924152\n", "86399 956 Tim 0.854212 0.858070\n", "\n", "[86400 rows x 4 columns]" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's build a list of all the available csv files" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.585199Z", "iopub.status.busy": "2022-07-27T19:21:18.584035Z", "iopub.status.idle": "2022-07-27T19:21:18.592027Z", "shell.execute_reply": "2022-07-27T19:21:18.591268Z" } }, "outputs": [ { "data": { "text/plain": [ "['data/2000-01-25.csv',\n", " 'data/2000-01-20.csv',\n", " 'data/2000-01-29.csv',\n", " 'data/2000-01-02.csv',\n", " 'data/2000-01-19.csv',\n", " 'data/2000-01-23.csv',\n", " 'data/2000-01-10.csv',\n", " 'data/2000-01-21.csv',\n", " 'data/2000-01-17.csv',\n", " 'data/2000-01-04.csv',\n", " 'data/2000-01-27.csv',\n", " 'data/2000-01-22.csv',\n", " 'data/2000-01-14.csv',\n", " 'data/2000-01-11.csv',\n", " 'data/2000-01-13.csv',\n", " 'data/2000-01-08.csv',\n", " 'data/2000-01-09.csv',\n", " 'data/2000-01-06.csv',\n", " 'data/2000-01-01.csv',\n", " 'data/2000-01-07.csv',\n", " 'data/2000-01-12.csv',\n", " 'data/2000-01-16.csv',\n", " 'data/2000-01-26.csv',\n", " 'data/2000-01-24.csv',\n", " 'data/2000-01-18.csv',\n", " 'data/2000-01-15.csv',\n", " 'data/2000-01-03.csv',\n", " 'data/2000-01-30.csv',\n", " 'data/2000-01-28.csv',\n", " 'data/2000-01-05.csv']" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# loop over all the files\n", "from glob import glob\n", "files = glob('data/2000-*-*.csv')\n", "files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we run the delayed read_data function on each file in the list" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.596619Z", "iopub.status.busy": "2022-07-27T19:21:18.595064Z", "iopub.status.idle": "2022-07-27T19:21:18.603491Z", "shell.execute_reply": "2022-07-27T19:21:18.602886Z" } }, "outputs": [ { "data": { "text/plain": [ "[Delayed('read_data-6958e08a-f47f-4da2-9ceb-cb995eab99bf'),\n", " Delayed('read_data-30f33e21-323e-49c0-ae36-a7c6289d8ada'),\n", " Delayed('read_data-2fdfe85e-79e1-417d-af2d-3a577fe15975'),\n", " Delayed('read_data-72b35641-90b5-4518-bbac-fa9c9024c756'),\n", " Delayed('read_data-e3adc855-9df0-4985-87fd-f95e3f2d10b7'),\n", " Delayed('read_data-f070f86c-bff6-448e-abe0-50baaf9282b0'),\n", " Delayed('read_data-f4ed9f6d-c5ae-44aa-ba0e-a2eaf2cd749a'),\n", " Delayed('read_data-2a55c497-9a5a-4474-8dca-fc243ee5a5bf'),\n", " Delayed('read_data-1fb346a5-4a27-4772-ab6b-94419f328ae0'),\n", " Delayed('read_data-72610d5f-2847-4afb-9c86-af08217797d2'),\n", " Delayed('read_data-f3bbcc5b-c2f6-4a5f-8d3e-1d50fb30dc69'),\n", " Delayed('read_data-52b113b8-1692-4ff9-86b4-cb65e066e1c3'),\n", " Delayed('read_data-ff401421-8ccf-4e29-bf70-8b63ed4e8b90'),\n", " Delayed('read_data-ebe81647-e84f-4377-ba1c-26f220aed7e3'),\n", " Delayed('read_data-dabf5c6c-e459-4f89-9a02-4b4a11879708'),\n", " Delayed('read_data-c7b3408b-2cec-41e1-9553-fb9a24a338b0'),\n", " Delayed('read_data-fbd802e1-f886-4035-a285-1d657e1074e5'),\n", " Delayed('read_data-fc2fb366-2ef9-4eaf-bfee-6679420f4080'),\n", " Delayed('read_data-9f4b137b-6dd0-491c-bf55-6cb40a502918'),\n", " Delayed('read_data-3d109e18-3e32-495d-940b-1882b33ab6dd'),\n", " Delayed('read_data-8915acd4-a325-48fd-b147-a0c7a238f0df'),\n", " Delayed('read_data-ec5de8ae-f438-4b65-9214-3dab09f1e05a'),\n", " Delayed('read_data-9b519672-8a00-4c53-a1ff-c1f960272d4c'),\n", " Delayed('read_data-6594c4c0-d33e-4f13-8fcc-ae39a840b3f9'),\n", " Delayed('read_data-80c1be62-beeb-4317-91ba-6363d6f8eee5'),\n", " Delayed('read_data-cf7ac988-9874-4b62-91f8-148c60c670c0'),\n", " Delayed('read_data-2175062e-82b5-4d46-b1c7-31d301e26ba3'),\n", " Delayed('read_data-6a97a8fc-a3df-494e-8870-0ba7b6638444'),\n", " Delayed('read_data-14d8926a-674d-4e17-b603-9f2da75bd25c'),\n", " Delayed('read_data-d6be5de8-1b74-4a12-bc45-7d7f4e7bd190')]" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = [read_data(file) for file in files]\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then we use [dask.dataframe.from_delayed](https://docs.dask.org/en/latest/generated/dask.dataframe.from_delayed.html). This function creates a Dask DataFrame from a list of delayed objects as long as each delayed object returns a pandas dataframe. The structure of each individual dataframe returned must also be the same." ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.606147Z", "iopub.status.busy": "2022-07-27T19:21:18.605810Z", "iopub.status.idle": "2022-07-27T19:21:18.621821Z", "shell.execute_reply": "2022-07-27T19:21:18.620816Z" } }, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
npartitions=30
int64objectfloat64float64
............
...............
............
............
\n", "
\n", "
Dask Name: from-delayed, 60 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=30 \n", " int64 object float64 float64\n", " ... ... ... ...\n", "... ... ... ... ...\n", " ... ... ... ...\n", " ... ... ... ...\n", "Dask Name: from-delayed, 60 tasks" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = dd.from_delayed(df, meta={'id': int, 'name': str, 'x': float, 'y': float})\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: we provided the dtypes in the `meta` keyword to explicitly tell Dask Dataframe what kind of dataframe to expect. If we did not do this Dask would infer this from the first delayed object which could be slow if it was a large csv file" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Now let's see if this works" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.627407Z", "iopub.status.busy": "2022-07-27T19:21:18.627099Z", "iopub.status.idle": "2022-07-27T19:21:18.723178Z", "shell.execute_reply": "2022-07-27T19:21:18.722611Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
0976Oliver0.6287670.765093
11053Sarah-0.047006-0.955109
21049Quinn-0.032074-0.099608
31005Frank-0.2559200.963524
4993Ursula0.980263-0.875488
\n", "
" ], "text/plain": [ " id name x y\n", "0 976 Oliver 0.628767 0.765093\n", "1 1053 Sarah -0.047006 -0.955109\n", "2 1049 Quinn -0.032074 -0.099608\n", "3 1005 Frank -0.255920 0.963524\n", "4 993 Ursula 0.980263 -0.875488" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:21:18.726417Z", "iopub.status.busy": "2022-07-27T19:21:18.725895Z", "iopub.status.idle": "2022-07-27T19:21:20.889677Z", "shell.execute_reply": "2022-07-27T19:21:20.889090Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idxy
count2.505600e+062.505600e+062.419200e+06
mean9.999870e+02-1.615087e-047.238821e-05
std3.163868e+015.772576e-015.774155e-01
min8.370000e+02-9.999999e-01-9.999995e-01
25%9.790000e+02-4.924166e-01-4.938494e-01
50%1.000000e+039.977439e-034.362202e-03
75%1.022000e+035.070134e-015.083363e-01
max1.160000e+039.999979e-019.999995e-01
\n", "
" ], "text/plain": [ " id x y\n", "count 2.505600e+06 2.505600e+06 2.419200e+06\n", "mean 9.999870e+02 -1.615087e-04 7.238821e-05\n", "std 3.163868e+01 5.772576e-01 5.774155e-01\n", "min 8.370000e+02 -9.999999e-01 -9.999995e-01\n", "25% 9.790000e+02 -4.924166e-01 -4.938494e-01\n", "50% 1.000000e+03 9.977439e-03 4.362202e-03\n", "75% 1.022000e+03 5.070134e-01 5.083363e-01\n", "max 1.160000e+03 9.999979e-01 9.999995e-01" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.describe().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Success!\n", "\n", "To recap, in this example, we looked at an approach to create a Dask Dataframe from a collection of many data files. Typically you would use built-in functions like `dd.read_csv` or `dd.read_parquet` to do this. Sometimes, this is not possible because of messy/corrupted files in your dataset or some custom processing that might need to be done. \n", "\n", "In these cases, you can build a Dask DataFrame with the following steps.\n", "\n", "1. Create a regular python function that reads the data, performs any transformations, error checking etc and always returns a Pandas dataframe with the same structure\n", "2. Convert this read function to a delayed object using the `dask.delayed` function\n", "3. Call each file in your dataset with the delayed data reader and assemble the output as a list of delayed objects\n", "4. Used `dd.from_delayed` to covert the list of delayed objects to a Dask Dataframe \n", "\n", "This same technique can be used in other situations as well. Another example might be data files that require using a specialized reader, or several transformations before they can be converted to a pandas dataframe." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }