DataFrames: Reading in messy data

In the 01-data-access example we show how Dask DataFrames can read and store data in many of the same formats as pandas dataframes. One key difference when using Dask DataFrames is that instead of opening a single file with a function like pandas.read_csv, we typically open many files at once with dask.dataframe.read_csv. This enables us to treat a collection of files as a single dataset. Most of the time this works really well, but real data is messy, and in this notebook we will explore a more advanced technique for bringing messy datasets into a Dask DataFrame.
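
As a quick illustration of the difference, here is a minimal sketch (the paths are only illustrative; the data/ CSV files used in this notebook are created in the sections below):

import pandas as pd
import dask.dataframe as dd

# pandas: one call reads one file eagerly into memory
pandas_df = pd.read_csv('data/2000-01-01.csv')

# dask: one call with a glob pattern lazily wraps many files as a single logical dataframe
dask_df = dd.read_csv('data/2000-*.csv')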

Start Dask Client for Dashboard

Starting the Dask Client is optional. It will provide a dashboard, which is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. It can take some effort to arrange your windows, but seeing them both at the same time is very useful when learning.

[1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
[1]:

Client

Client-3a3f7d3e-0de1-11ed-a27f-000d3a8f7959

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

Create artificial dataset

First we create an artificial dataset and write it to many CSV files.

You don’t need to understand this section; we’re just creating a dataset for the rest of the notebook.

[2]:
import dask
df = dask.datasets.timeseries()
df
[2]:
Dask DataFrame Structure:
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: make-timeseries, 30 tasks
[3]:
import os
import datetime

if not os.path.exists('data'):
    os.mkdir('data')

def name(i):
    """ Provide date for filename given index

    Examples
    --------
    >>> name(0)
    '2000-01-01'
    >>> name(10)
    '2000-01-11'
    """
    return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))

df.to_csv('data/*.csv', name_function=name, index=False);

Read CSV files

We now have many CSV files in our data directory, one for each day in the month of January 2000. Each CSV file holds timeseries data for that day. We can read all of them as one logical dataframe using the dd.read_csv function with a glob string.

[4]:
!ls data/*.csv | head
data/2000-01-01.csv
data/2000-01-02.csv
data/2000-01-03.csv
data/2000-01-04.csv
data/2000-01-05.csv
data/2000-01-06.csv
data/2000-01-07.csv
data/2000-01-08.csv
data/2000-01-09.csv
data/2000-01-10.csv
[5]:
import dask.dataframe as dd

df = dd.read_csv('data/2000-*-*.csv')
df
[5]:
Dask DataFrame Structure:
id name x y
npartitions=30
int64 object float64 float64
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: read-csv, 30 tasks
[6]:
df.head()
[6]:
id name x y
0 988 Norbert -0.742721 -0.277954
1 1025 Bob 0.603313 -0.161292
2 992 Alice -0.049408 0.573142
3 1029 Bob -0.122566 0.533852
4 1032 Patricia 0.476066 -0.006417

Let’s look at some statistics on the data

[7]:
df.describe().compute()
[7]:
id x y
count 2.592000e+06 2.592000e+06 2.592000e+06
mean 9.999909e+02 -1.752288e-04 1.272128e-04
std 3.163993e+01 5.772766e-01 5.773655e-01
min 8.370000e+02 -9.999999e-01 -9.999995e-01
25% 9.790000e+02 -4.924166e-01 -4.938494e-01
50% 1.000000e+03 9.977439e-03 4.362202e-03
75% 1.022000e+03 5.070134e-01 5.083363e-01
max 1.160000e+03 9.999979e-01 9.999995e-01

Make some messy data

Now this works great, and in most cases dd.read_csv or dd.read_parquet etc. are the preferred way to read large collections of data files into a Dask DataFrame, but real-world data is often very messy and some files may be broken or badly formatted. To simulate this we are going to create some fake messy data by tweaking our example CSV files. We will replace the contents of data/2000-01-05.csv with nothing (an empty file), and we will remove the y column from data/2000-01-07.csv.

[8]:
# corrupt the data in data/2000-01-05.csv
with open('data/2000-01-05.csv', 'w') as f:
    f.write('')
[9]:
# remove y column from data/2000-01-07.csv
import pandas as pd
df = pd.read_csv('data/2000-01-07.csv')
del df['y']
df.to_csv('data/2000-01-07.csv', index=False)
[10]:
!head data/2000-01-05.csv
[11]:
!head data/2000-01-07.csv
id,name,x
1032,Edith,0.341158963292153
1025,Yvonne,-0.0596561961788608
996,Hannah,-0.4598038238105364
1015,Norbert,-0.6893967021653444
976,Hannah,0.4339578272105588
1002,Dan,0.3519233500902228
917,Xavier,-0.928241343897473
1036,Hannah,-0.5115504865546654
972,Oliver,-0.3808144336718926

Reading the messy data

Let’s try reading in the collection of files again

[12]:
df = dd.read_csv('data/2000-*-*.csv')
[13]:
df.head()
[13]:
id name x y
0 988 Norbert -0.742721 -0.277954
1 1025 Bob 0.603313 -0.161292
2 992 Alice -0.049408 0.573142
3 1029 Bob -0.122566 0.533852
4 1032 Patricia 0.476066 -0.006417

OK, this looks like it worked. Let us calculate the dataset statistics again.

[14]:
df.describe().compute()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [14], in <cell line: 1>()
----> 1 df.describe().compute()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
    268 def compute(self, **kwargs):
    269     """Compute this dask collection
    270
    271     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    290     dask.base.compute
    291     """
--> 292     (result,) = compute(self, traverse=False, **kwargs)
    293     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    572     keys.append(x.__dask_keys__())
    573     postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
    576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3002         should_rejoin = False
   3003 try:
-> 3004     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3005 finally:
   3006     for f in futures.values():

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178, in Client.gather(self, futures, errors, direct, asynchronous)
   2176 else:
   2177     local_worker = None
-> 2178 return self.sync(
   2179     self._gather,
   2180     futures,
   2181     errors=errors,
   2182     direct=direct,
   2183     local_worker=local_worker,
   2184     asynchronous=asynchronous,
   2185 )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    316     return future
    317 else:
--> 318     return sync(
    319         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    320     )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385, in sync(loop, func, callback_timeout, *args, **kwargs)
    383 if error:
    384     typ, exc, tb = error
--> 385     raise exc.with_traceback(tb)
    386 else:
    387     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358, in sync.<locals>.f()
    356         future = asyncio.wait_for(future, callback_timeout)
    357     future = asyncio.ensure_future(future)
--> 358     result = yield future
    359 except Exception:
    360     error = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041, in Client._gather(self, futures, errors, direct, local_worker)
   2039         exc = CancelledError(key)
   2040     else:
-> 2041         raise exception.with_traceback(traceback)
   2042     raise exc
   2043 if errors == "skip":

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40, in apply()
     38 def apply(func, args, kwargs=None):
     39     if kwargs:
---> 40         return func(*args, **kwargs)
     41     else:
     42         return func(*args)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436, in apply_and_enforce()
   6434     return meta
   6435 if is_dataframe_like(df):
-> 6436     check_matching_columns(meta, df)
   6437     c = meta.columns
   6438 else:

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
    413 else:
    414     extra_info = "Order of columns does not match"
--> 415 raise ValueError(
    416     "The columns in the computed data do not match"
    417     " the columns in the provided metadata\n"
    418     f"{extra_info}"
    419 )

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   []
  Missing: ['y']

So what happened?

When creating a Dask DataFrame from a collection of files, dd.read_csv samples only the first few files in the dataset to determine the datatypes and columns available. Since it has not opened all the files, it does not know that some of them are corrupt. Hence, df.head() works since it only looks at the first file. df.describe().compute() fails because of the corrupted files, data/2000-01-05.csv and data/2000-01-07.csv (the traceback above complains about the missing y column).
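
If you want to track down which files are causing problems, one option is to compute each partition separately. This is a small sketch (not part of the original workflow), and the exact exception raised for a given bad file may vary with your pandas and Dask versions; it relies on dd.read_csv having created one partition per file here, which we can see from npartitions=30 above.

# probe each partition individually to find the files that fail to load
for i in range(df.npartitions):
    try:
        df.partitions[i].compute()
    except Exception as e:
        print(f'partition {i} failed: {type(e).__name__}: {e}')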

Building a delayed reader

To get around this problem we are going to use a more advanced technique to build our Dask DataFrame. This method can also be used any time some custom logic is required when reading each file. Essentially, we are going to build a function that uses pandas, does some error checking, and returns a pandas dataframe. If we find a bad data file we will either find a way to fix/clean the data, or we will return an empty pandas dataframe with the same structure as the good data.

[15]:
import numpy as np
import io

def read_data(filename):

    # for this to work we need to explicitly set the datatypes of our pandas dataframe
    dtypes = {'id': int, 'name': str, 'x': float, 'y': float}
    try:
        # try reading in the data with pandas
        df = pd.read_csv(filename, dtype=dtypes)
    except Exception:
        # if this fails create an empty pandas dataframe with the same dtypes as the good data
        df = pd.read_csv(io.StringIO(''), names=dtypes.keys(), dtype=dtypes)

    # for the case with the missing column, add a column of data with NaN's
    if 'y' not in df.columns:
        df['y'] = np.nan

    return df

Let’s test this function on a good file and the two bad files

[16]:
# test function on a normal file
read_data('data/2000-01-01.csv').head()
[16]:
id name x y
0 988 Norbert -0.742721 -0.277954
1 1025 Bob 0.603313 -0.161292
2 992 Alice -0.049408 0.573142
3 1029 Bob -0.122566 0.533852
4 1032 Patricia 0.476066 -0.006417
[17]:
# test function on the empty file
read_data('data/2000-01-05.csv').head()
[17]:
id name x y
[18]:
# test function on the file missing the y column
read_data('data/2000-01-07.csv').head()
[18]:
id name x y
0 1032 Edith 0.341159 NaN
1 1025 Yvonne -0.059656 NaN
2 996 Hannah -0.459804 NaN
3 1015 Norbert -0.689397 NaN
4 976 Hannah 0.433958 NaN

Assembling the dask dataframe

First we take our read_data function and convert it to a dask delayed function

[19]:
from dask import delayed
read_data = delayed(read_data)

Let us look at what the function does now

[20]:
df = read_data('data/2000-01-01.csv')
df
[20]:
Delayed('read_data-604fd047-a660-4c67-87ad-60569554e79e')

It creates a delayed object. To actually read the file we need to call .compute()

[21]:
df.compute()
[21]:
id name x y
0 988 Norbert -0.742721 -0.277954
1 1025 Bob 0.603313 -0.161292
2 992 Alice -0.049408 0.573142
3 1029 Bob -0.122566 0.533852
4 1032 Patricia 0.476066 -0.006417
... ... ... ... ...
86395 927 Alice 0.051035 0.051330
86396 968 George -0.389181 0.096867
86397 1039 Alice 0.396751 0.688604
86398 996 Patricia -0.042164 -0.924152
86399 956 Tim 0.854212 0.858070

86400 rows × 4 columns

Now let’s build a list of all the available csv files

[22]:
# loop over all the files
from glob import glob
files = glob('data/2000-*-*.csv')
files
[22]:
['data/2000-01-25.csv',
 'data/2000-01-20.csv',
 'data/2000-01-29.csv',
 'data/2000-01-02.csv',
 'data/2000-01-19.csv',
 'data/2000-01-23.csv',
 'data/2000-01-10.csv',
 'data/2000-01-21.csv',
 'data/2000-01-17.csv',
 'data/2000-01-04.csv',
 'data/2000-01-27.csv',
 'data/2000-01-22.csv',
 'data/2000-01-14.csv',
 'data/2000-01-11.csv',
 'data/2000-01-13.csv',
 'data/2000-01-08.csv',
 'data/2000-01-09.csv',
 'data/2000-01-06.csv',
 'data/2000-01-01.csv',
 'data/2000-01-07.csv',
 'data/2000-01-12.csv',
 'data/2000-01-16.csv',
 'data/2000-01-26.csv',
 'data/2000-01-24.csv',
 'data/2000-01-18.csv',
 'data/2000-01-15.csv',
 'data/2000-01-03.csv',
 'data/2000-01-30.csv',
 'data/2000-01-28.csv',
 'data/2000-01-05.csv']

Now we run the delayed read_data function on each file in the list

[23]:
df = [read_data(file) for file in files]
df
[23]:
[Delayed('read_data-6958e08a-f47f-4da2-9ceb-cb995eab99bf'),
 Delayed('read_data-30f33e21-323e-49c0-ae36-a7c6289d8ada'),
 Delayed('read_data-2fdfe85e-79e1-417d-af2d-3a577fe15975'),
 Delayed('read_data-72b35641-90b5-4518-bbac-fa9c9024c756'),
 Delayed('read_data-e3adc855-9df0-4985-87fd-f95e3f2d10b7'),
 Delayed('read_data-f070f86c-bff6-448e-abe0-50baaf9282b0'),
 Delayed('read_data-f4ed9f6d-c5ae-44aa-ba0e-a2eaf2cd749a'),
 Delayed('read_data-2a55c497-9a5a-4474-8dca-fc243ee5a5bf'),
 Delayed('read_data-1fb346a5-4a27-4772-ab6b-94419f328ae0'),
 Delayed('read_data-72610d5f-2847-4afb-9c86-af08217797d2'),
 Delayed('read_data-f3bbcc5b-c2f6-4a5f-8d3e-1d50fb30dc69'),
 Delayed('read_data-52b113b8-1692-4ff9-86b4-cb65e066e1c3'),
 Delayed('read_data-ff401421-8ccf-4e29-bf70-8b63ed4e8b90'),
 Delayed('read_data-ebe81647-e84f-4377-ba1c-26f220aed7e3'),
 Delayed('read_data-dabf5c6c-e459-4f89-9a02-4b4a11879708'),
 Delayed('read_data-c7b3408b-2cec-41e1-9553-fb9a24a338b0'),
 Delayed('read_data-fbd802e1-f886-4035-a285-1d657e1074e5'),
 Delayed('read_data-fc2fb366-2ef9-4eaf-bfee-6679420f4080'),
 Delayed('read_data-9f4b137b-6dd0-491c-bf55-6cb40a502918'),
 Delayed('read_data-3d109e18-3e32-495d-940b-1882b33ab6dd'),
 Delayed('read_data-8915acd4-a325-48fd-b147-a0c7a238f0df'),
 Delayed('read_data-ec5de8ae-f438-4b65-9214-3dab09f1e05a'),
 Delayed('read_data-9b519672-8a00-4c53-a1ff-c1f960272d4c'),
 Delayed('read_data-6594c4c0-d33e-4f13-8fcc-ae39a840b3f9'),
 Delayed('read_data-80c1be62-beeb-4317-91ba-6363d6f8eee5'),
 Delayed('read_data-cf7ac988-9874-4b62-91f8-148c60c670c0'),
 Delayed('read_data-2175062e-82b5-4d46-b1c7-31d301e26ba3'),
 Delayed('read_data-6a97a8fc-a3df-494e-8870-0ba7b6638444'),
 Delayed('read_data-14d8926a-674d-4e17-b603-9f2da75bd25c'),
 Delayed('read_data-d6be5de8-1b74-4a12-bc45-7d7f4e7bd190')]

Then we use dask.dataframe.from_delayed. This function creates a Dask DataFrame from a list of delayed objects as long as each delayed object returns a pandas dataframe. The structure of each individual dataframe returned must also be the same.

[24]:
df = dd.from_delayed(df, meta={'id': int, 'name': str, 'x': float, 'y': float})
df
[24]:
Dask DataFrame Structure:
id name x y
npartitions=30
int64 object float64 float64
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: from-delayed, 60 tasks

Note: we provided the dtypes in the meta keyword to explicitly tell Dask DataFrame what kind of dataframe to expect. If we did not do this, Dask would infer the dtypes from the first delayed object, which could be slow if it were a large CSV file.
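
As an aside, meta can also be supplied as an empty pandas dataframe with the expected dtypes rather than a dict. A small sketch, where delayed_dfs stands for the list of delayed objects built above (the name is only illustrative):

# build an empty pandas dataframe carrying the expected column names and dtypes
meta = pd.DataFrame({
    'id': pd.Series(dtype=int),
    'name': pd.Series(dtype=str),
    'x': pd.Series(dtype=float),
    'y': pd.Series(dtype=float),
})
# df = dd.from_delayed(delayed_dfs, meta=meta)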

Now let’s see if this works

[25]:
df.head()
[25]:
id name x y
0 976 Oliver 0.628767 0.765093
1 1053 Sarah -0.047006 -0.955109
2 1049 Quinn -0.032074 -0.099608
3 1005 Frank -0.255920 0.963524
4 993 Ursula 0.980263 -0.875488
[26]:
df.describe().compute()
[26]:
id x y
count 2.505600e+06 2.505600e+06 2.419200e+06
mean 9.999870e+02 -1.615087e-04 7.238821e-05
std 3.163868e+01 5.772576e-01 5.774155e-01
min 8.370000e+02 -9.999999e-01 -9.999995e-01
25% 9.790000e+02 -4.924166e-01 -4.938494e-01
50% 1.000000e+03 9.977439e-03 4.362202e-03
75% 1.022000e+03 5.070134e-01 5.083363e-01
max 1.160000e+03 9.999979e-01 9.999995e-01

Success!

To recap, in this example we looked at an approach for creating a Dask DataFrame from a collection of many data files. Typically you would use built-in functions like dd.read_csv or dd.read_parquet to do this. Sometimes that is not possible, because of messy/corrupted files in your dataset or because some custom processing needs to be done.

In these cases, you can build a Dask DataFrame with the following steps.

  1. Create a regular Python function that reads the data, performs any transformations, error checking, etc., and always returns a pandas dataframe with the same structure

  2. Convert this read function to a delayed object using the dask.delayed function

  3. Call the delayed data reader on each file in your dataset and assemble the output as a list of delayed objects

  4. Use dd.from_delayed to convert the list of delayed objects to a Dask DataFrame

This same technique can be used in other situations as well. Another example might be data files that require using a specialized reader, or several transformations before they can be converted to a pandas dataframe.
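
To close, here is a condensed sketch of the whole pattern in one place. The error handling and the fallback structure are placeholders that you would adapt to your own data; the function and variable names are only illustrative.

from glob import glob

import pandas as pd
import dask
import dask.dataframe as dd

dtypes = {'id': int, 'name': str, 'x': float, 'y': float}

@dask.delayed
def read_one(path):
    """Read one CSV file, falling back to an empty frame with the expected structure."""
    try:
        df = pd.read_csv(path, dtype=dtypes)
    except Exception:
        # placeholder fallback: an empty dataframe with the expected columns and dtypes
        df = pd.DataFrame({col: pd.Series(dtype=dt) for col, dt in dtypes.items()})
    # make sure every expected column is present, filling missing ones with NaN
    for col in dtypes:
        if col not in df.columns:
            df[col] = pd.Series(dtype=dtypes[col])
    return df

delayed_parts = [read_one(path) for path in sorted(glob('data/2000-*-*.csv'))]
ddf = dd.from_delayed(delayed_parts, meta=dtypes)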