You can run this notebook in a live session on Binder or view it on GitHub.

Dask DataFrames

Dask DataFrames coordinate many Pandas DataFrames, partitioned along an index. They support a large subset of the Pandas API.

Start Dask Client for Dashboard

Starting the Dask Client is optional. It will provide a dashboard, which is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. This can take some effort to arrange your windows, but seeing them both at the same time is very useful when learning.

In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client
Out[1]:

Client

Cluster

  • Workers: 2
  • Cores: 4
  • Memory: 2.00 GB
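
If the dashboard link does not render in your environment, recent versions of distributed also expose the address programmatically on the client:

print(client.dashboard_link)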

Create Random Dataframe

We create a random timeseries of data with the following attributes:

  1. It stores a record for every second of the month of January 2000
  2. It splits that month by day, keeping each day as a separate Pandas dataframe
  3. Along with a datetime index it has columns for names, ids, and numeric values

This is a small dataset of about 240 MB. Increase the number of days or reduce the frequency to practice with a larger dataset.
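
For example, a larger variant might look like the following sketch. The start, end, freq, and partition_freq keywords are part of dask.datasets.timeseries; the specific values here are purely illustrative.

import dask

# Three months at 1-second frequency, one partition per week (illustrative values)
df_big = dask.datasets.timeseries(
    start='2000-01-01',
    end='2000-04-01',
    freq='1s',
    partition_freq='7d',
)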

In [2]:
import dask
import dask.dataframe as dd
df = dask.datasets.timeseries()

Unlike Pandas, Dask DataFrames are lazy and so no data is printed here.

In [3]:
df
Out[3]:
Dask DataFrame Structure:
                   id    name        x        y
npartitions=30
2000-01-01      int64  object  float64  float64
2000-01-02        ...     ...      ...      ...
...               ...     ...      ...      ...
2000-01-30        ...     ...      ...      ...
2000-01-31        ...     ...      ...      ...
Dask Name: make-timeseries, 30 tasks
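
You can confirm the partitioning without triggering any computation; npartitions is a plain attribute on the collection:

df.npartitions  # 30 -- one Pandas dataframe per day, nothing is computed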

But the column names and dtypes are known.

In [4]:
df.dtypes
Out[4]:
id        int64
name     object
x       float64
y       float64
dtype: object

Some operations will automatically display the data.

In [5]:
import pandas as pd
pd.options.display.precision = 2
pd.options.display.max_rows = 10
In [6]:
df.head(3)
Out[6]:
                       id      name     x     y
timestamp
2000-01-01 00:00:00  1007     Zelda  0.81  0.71
2000-01-01 00:00:01   974  Patricia -0.07  0.34
2000-01-01 00:00:02   984    George -0.67 -0.93

Use Standard Pandas Operations

Most common Pandas operations work identically on Dask DataFrames.

In [7]:
df2 = df[df.y > 0]
df3 = df2.groupby('name').x.std()
df3
Out[7]:
Dask Series Structure:
npartitions=1
    float64
        ...
Name: x, dtype: float64
Dask Name: sqrt, 157 tasks

Call .compute() when you want your result as an in-memory Pandas object (here, a Series).

If you started the Client above, you may want to watch the dashboard's status page during computation.

In [8]:
computed_df = df3.compute()
type(computed_df)
Out[8]:
pandas.core.series.Series
In [9]:
computed_df
Out[9]:
name
Alice      0.58
Bob        0.58
Charlie    0.58
Dan        0.58
Edith      0.58
           ...
Victor     0.58
Wendy      0.58
Xavier     0.58
Yvonne     0.58
Zelda      0.58
Name: x, Length: 26, dtype: float64
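
As an aside, if you need several related results, handing them to dask.compute together lets shared upstream work (here the filter on df.y) run only once. A minimal sketch:

import dask

# Both results reuse the df2 = df[df.y > 0] filter in a single graph traversal
mean_x, std_x = dask.compute(df2.x.mean(), df2.x.std())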

Persist data in memory

If you have the available RAM for your dataset then you can persist data in memory.

This allows future computations to be much faster.

In [10]:
df = df.persist()
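
Since progress was imported from dask.distributed above, you can optionally watch the persisted partitions arrive in worker memory:

progress(df)  # displays a progress bar while partitions materialize on the workers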

Time Series Operations

Because we have a datetime index, time-series operations work efficiently.

In [11]:
%matplotlib inline
In [12]:
df[['x', 'y']].resample('1h').mean().head()
Out[12]:
                            x         y
timestamp
2000-01-01 00:00:00 -1.66e-03 -5.39e-03
2000-01-01 01:00:00  6.29e-03  1.20e-02
2000-01-01 02:00:00  3.46e-03 -1.08e-03
2000-01-01 03:00:00  7.77e-04  2.15e-03
2000-01-01 04:00:00 -2.05e-02 -3.36e-03
In [13]:
df[['x', 'y']].resample('24h').mean().compute().plot()
Out[13]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f84c4f88f28>
[image: line plot of the 24-hour resampled means of x and y]
In [14]:
df[['x', 'y']].rolling(window='24h').mean().head()
Out[14]:
                        x     y
timestamp
2000-01-01 00:00:00  0.81  0.71
2000-01-01 00:00:01  0.37  0.53
2000-01-01 00:00:02  0.02  0.04
2000-01-01 00:00:03 -0.04  0.17
2000-01-01 00:00:04 -0.14  0.03

Random access is cheap along the index, but must still be computed.

In [15]:
df.loc['2000-01-05']
Out[15]:
Dask DataFrame Structure:
                                  id    name        x        y
npartitions=1
2000-01-05 00:00:00.000000000  int64  object  float64  float64
2000-01-05 23:59:59.999999999    ...     ...      ...      ...
Dask Name: loc, 31 tasks
In [16]:
%time df.loc['2000-01-05'].compute()
CPU times: user 44 ms, sys: 12 ms, total: 56 ms
Wall time: 76.6 ms
Out[16]:
                       id      name     x     y
timestamp
2000-01-05 00:00:00  1045    George -0.15  0.73
2000-01-05 00:00:01  1043  Patricia -0.94 -0.54
2000-01-05 00:00:02   972    Ursula  0.80  0.21
2000-01-05 00:00:03  1031     Wendy -0.98 -0.27
2000-01-05 00:00:04   998     Laura -0.72 -0.18
...                   ...       ...   ...   ...
2000-01-05 23:59:55   965     Sarah -0.56  0.28
2000-01-05 23:59:56  1019       Ray -0.47 -0.59
2000-01-05 23:59:57  1001    Xavier  0.59 -0.89
2000-01-05 23:59:58   963    Hannah  0.81  0.45
2000-01-05 23:59:59  1021     Zelda -0.09  0.16

86400 rows × 4 columns

Set Index

Data is sorted by the index column. This allows for faster access, joins, groupby-apply operations, and so on. However, sorting data can be costly to do in parallel, so setting the index is important, but should be done infrequently.
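
If you already know the layout of your data, you can tell Dask and avoid part of that cost; both the sorted and divisions keywords exist on set_index, though the boundary values below are hypothetical:

# If the column is already sorted, skip the shuffle entirely:
#   df.set_index('timestamp', sorted=True)

# If you know the partition boundaries up front, pass them and Dask skips the
# extra pass over the data that it otherwise needs to compute balanced divisions
# (these letter boundaries are made up for illustration):
df_by_name = df.set_index('name', divisions=['Alice', 'Laura', 'Zelda'])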

In [17]:
df = df.set_index('name')
df
Out[17]:
Dask DataFrame Structure:
                   id        x        y
npartitions=30
Alice           int64  float64  float64
Alice             ...      ...      ...
...               ...      ...      ...
Zelda             ...      ...      ...
Zelda             ...      ...      ...
Dask Name: sort_index, 1200 tasks

Because computing this dataset is expensive and we can fit it in our available RAM, we persist the dataset to memory.

In [18]:
df = df.persist()

Dask now knows where all data lives, indexed cleanly by name. As a result, operations like random access are cheap and efficient.

In [19]:
%time df.loc['Alice'].compute()
CPU times: user 800 ms, sys: 32 ms, total: 832 ms
Wall time: 3.58 s
Out[19]:
         id     x     y
name
Alice  1017 -0.62 -0.71
Alice   978  0.45 -0.96
Alice  1057 -0.35 -0.59
Alice  1020 -0.56  0.11
Alice  1045  0.27  0.10
...     ...   ...   ...
Alice  1018  0.47 -0.12
Alice   996  0.32  0.14
Alice   987  0.84 -0.97
Alice   981  0.37  0.21
Alice   981 -0.56 -0.43

99942 rows × 3 columns
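
Range queries along the sorted index are similarly cheap, since the known divisions tell Dask which partitions to touch; a small sketch:

# Only partitions whose divisions overlap ['Alice', 'Charlie'] are read
df.loc['Alice':'Charlie'].compute()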

Groupby Apply with Scikit-Learn

Now that our data is sorted by name, we can easily do operations like random access on name, or groupby-apply with custom functions.

Here we train a different Scikit-Learn linear regression model on each name.

In [20]:
from sklearn.linear_model import LinearRegression

def train(partition):
    # partition is a Pandas dataframe holding all rows for a single name
    est = LinearRegression()
    est.fit(partition[['x']].values, partition.y.values)
    return est
In [21]:
df.groupby('name').apply(train, meta=object).compute()
Out[21]:
name
Alice      LinearRegression(copy_X=True, fit_intercept=Tr...
Bob        LinearRegression(copy_X=True, fit_intercept=Tr...
Charlie    LinearRegression(copy_X=True, fit_intercept=Tr...
Dan        LinearRegression(copy_X=True, fit_intercept=Tr...
Edith      LinearRegression(copy_X=True, fit_intercept=Tr...
                                 ...
Victor     LinearRegression(copy_X=True, fit_intercept=Tr...
Wendy      LinearRegression(copy_X=True, fit_intercept=Tr...
Xavier     LinearRegression(copy_X=True, fit_intercept=Tr...
Yvonne     LinearRegression(copy_X=True, fit_intercept=Tr...
Zelda      LinearRegression(copy_X=True, fit_intercept=Tr...
Length: 26, dtype: object
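
The result is an ordinary Pandas Series of fitted estimators, so using one afterwards is plain Scikit-Learn; for instance (a hypothetical follow-up):

models = df.groupby('name').apply(train, meta=object).compute()
models['Alice'].predict([[0.5]])  # predict y for x = 0.5 with Alice's model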