You can run this notebook in a live session on Binder or view it on GitHub.

DataFrames: Groupby

This notebook demonstrates Pandas-style groupby-aggregate and groupby-apply operations on scalable Dask dataframes. It discusses both common use and best practices.

Start Dask Client for Dashboard

Starting the Dask Client is optional. It will provide a dashboard, which is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning.

[1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client
[1]:

Client

Cluster

  • Workers: 1
  • Cores: 4
  • Memory: 2.00 GB

Artificial dataset

We create an artificial timeseries dataset to help us work with groupby operations.

[2]:
import dask
df = dask.datasets.timeseries()
df
[2]:
Dask DataFrame Structure:
                 id    name        x        y
npartitions=30
2000-01-01    int64  object  float64  float64
2000-01-02      ...     ...      ...      ...
...             ...     ...      ...      ...
2000-01-30      ...     ...      ...      ...
2000-01-31      ...     ...      ...      ...
Dask Name: make-timeseries, 30 tasks

This dataset is small enough to fit in the cluster’s memory, so we persist it now.

You would skip this step if your dataset were too large to fit in memory.

[3]:
df = df.persist()

Groupby Aggregations

Dask dataframes implement a commonly used subset of the Pandas groupby API (see the Pandas Groupby Documentation).

We start with groupby aggregations. These are generally fairly efficient, assuming that the number of groups is small (less than a million).

[4]:
df.groupby('name').x.mean().compute()
[4]:
name
Alice      -0.000419
Bob         0.000947
Charlie    -0.002959
Dan         0.000228
Edith       0.004281
Frank       0.000989
George     -0.000623
Hannah      0.000983
Ingrid     -0.002124
Jerry       0.001563
Kevin       0.001223
Laura       0.003199
Michael     0.002654
Norbert    -0.002670
Oliver     -0.001434
Patricia   -0.001516
Quinn      -0.003011
Ray         0.002881
Sarah      -0.002060
Tim         0.001920
Ursula     -0.000829
Victor     -0.000345
Wendy      -0.002388
Xavier     -0.001999
Yvonne      0.000217
Zelda      -0.001669
Name: x, dtype: float64

Performance will depend on the aggregation you do (mean vs. std), the key on which you group (name vs. id), and the total number of groups.

[5]:
%time _ = df.groupby('id').x.mean().compute()
CPU times: user 400 ms, sys: 36 ms, total: 436 ms
Wall time: 329 ms
[6]:
%time _ = df.groupby('name').x.mean().compute()
CPU times: user 804 ms, sys: 12 ms, total: 816 ms
Wall time: 578 ms
[7]:
%time df.groupby('name').agg({'x': ['mean', 'std'], 'y': ['mean', 'count']}).compute().head()
CPU times: user 1.47 s, sys: 80 ms, total: 1.55 s
Wall time: 1.2 s
[7]:
                    x                    y
                 mean       std       mean   count
name
Alice       -0.000419  0.576168  -0.000867   99262
Bob          0.000947  0.577386   0.000821   99860
Charlie     -0.002959  0.577185   0.001377   99713
Dan          0.000228  0.577567   0.003589  100290
Edith        0.004281  0.576125  -0.000770   99829

This is the same as with Pandas. Generally speaking, Dask dataframe groupby-aggregations have roughly the same performance as Pandas groupby-aggregations, just more scalable.

You can read more about Pandas’ common aggregations in the Pandas documentation.
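The built-in aggregations cover most needs, but Dask also lets you define your own with dask.dataframe.Aggregation, which makes the per-partition and combine steps explicit. Below is a minimal sketch that reimplements mean; the name custom_mean is illustrative:

import dask.dataframe as dd

# A custom mean built from explicit steps: chunk runs on each
# partition's groups, agg combines those per-partition results,
# and finalize produces the final answer per group.
custom_mean = dd.Aggregation(
    'custom_mean',
    chunk=lambda s: (s.count(), s.sum()),
    agg=lambda count, total: (count.sum(), total.sum()),
    finalize=lambda count, total: total / count,
)

df.groupby('name').x.agg(custom_mean).compute()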

Many groups

By default, groupby-aggregations (like groupby-mean or groupby-sum) return the result as a single-partition Dask dataframe. Their results are usually quite small, so this is typically a good choice.
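You can verify this on the result itself (a quick sketch):

# The default aggregation collapses everything to one output partition.
df.groupby('name').x.mean().npartitions  # 1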

However, sometimes people want to do groupby aggregations on many groups (millions or more). In these cases, the full result may not fit into a single Pandas dataframe output, and you may need to split your output into multiple partitions. You can control this with the split_out= parameter.

[8]:
# Computational graph of a single output aggregation (for a small number of groups, like 1000)
df.groupby('name').x.mean().visualize(node_attr={'penwidth': '6'})
[8]:
../_images/dataframes_02-groupby_15_0.png
[9]:
# Computational graph of an aggregation to four outputs (for a larger number of groups, like 1000000)
df.groupby('id').x.mean(split_out=4).visualize(node_attr={'penwidth': '6'})
[9]:
../_images/dataframes_02-groupby_16_0.png
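As a quick check (a sketch), the split_out result is a Dask series with several partitions rather than one:

# With split_out=4 the groups are spread across four output partitions,
# so no single partition has to hold the entire result.
result = df.groupby('id').x.mean(split_out=4)
result.npartitions  # 4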

Groupby Apply

Groupby-aggregations are generally quite fast because they can be broken down easily into well-known operations. The data doesn't have to move around much; we only pass small intermediate values across the network.
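To see why, here is a conceptual sketch in plain Pandas of how a groupby-mean decomposes; the chunk and combine helpers are illustrative, not Dask API:

import pandas as pd

def chunk(partition):
    # Each partition contributes only a small (sum, count) table per group.
    g = partition.groupby('name').x
    return pd.DataFrame({'sum': g.sum(), 'count': g.count()})

def combine(intermediates):
    # Combining needs only these small tables, never the raw rows.
    totals = pd.concat(intermediates).groupby(level=0).sum()
    return totals['sum'] / totals['count']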

For some operations, however, the function to be applied requires all data from a given group (like every record of someone named “Alice”). This forces a great deal of communication and is more expensive, but is still possible with the groupby-apply method. It should be avoided when a groupby-aggregation works.

In the following example we train a simple Scikit-Learn machine learning model on each name's group.

[10]:
from sklearn.linear_model import LinearRegression

def train(partition):
    # Fit a linear regression on this group's (x, id) features and y target.
    est = LinearRegression()
    est.fit(partition[['x', 'id']].values, partition.y.values)
    return est
[11]:
%time df.groupby('name').apply(train, meta=object).compute().sort_index()
CPU times: user 6.98 s, sys: 772 ms, total: 7.76 s
Wall time: 6.31 s
[11]:
name
Alice       LinearRegression(copy_X=True, fit_intercept=Tr...
Bob         LinearRegression(copy_X=True, fit_intercept=Tr...
Charlie     LinearRegression(copy_X=True, fit_intercept=Tr...
Dan         LinearRegression(copy_X=True, fit_intercept=Tr...
Edith       LinearRegression(copy_X=True, fit_intercept=Tr...
Frank       LinearRegression(copy_X=True, fit_intercept=Tr...
George      LinearRegression(copy_X=True, fit_intercept=Tr...
Hannah      LinearRegression(copy_X=True, fit_intercept=Tr...
Ingrid      LinearRegression(copy_X=True, fit_intercept=Tr...
Jerry       LinearRegression(copy_X=True, fit_intercept=Tr...
Kevin       LinearRegression(copy_X=True, fit_intercept=Tr...
Laura       LinearRegression(copy_X=True, fit_intercept=Tr...
Michael     LinearRegression(copy_X=True, fit_intercept=Tr...
Norbert     LinearRegression(copy_X=True, fit_intercept=Tr...
Oliver      LinearRegression(copy_X=True, fit_intercept=Tr...
Patricia    LinearRegression(copy_X=True, fit_intercept=Tr...
Quinn       LinearRegression(copy_X=True, fit_intercept=Tr...
Ray         LinearRegression(copy_X=True, fit_intercept=Tr...
Sarah       LinearRegression(copy_X=True, fit_intercept=Tr...
Tim         LinearRegression(copy_X=True, fit_intercept=Tr...
Ursula      LinearRegression(copy_X=True, fit_intercept=Tr...
Victor      LinearRegression(copy_X=True, fit_intercept=Tr...
Wendy       LinearRegression(copy_X=True, fit_intercept=Tr...
Xavier      LinearRegression(copy_X=True, fit_intercept=Tr...
Yvonne      LinearRegression(copy_X=True, fit_intercept=Tr...
Zelda       LinearRegression(copy_X=True, fit_intercept=Tr...
dtype: object
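
The result is an ordinary Pandas Series of fitted models indexed by name, so individual estimators can be pulled out and used directly. A sketch, with arbitrary feature values:

models = df.groupby('name').apply(train, meta=object).compute()
models.loc['Alice'].predict([[0.5, 1000]])  # predict y from an (x, id) pair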