DataFrames: Groupby
This notebook uses Pandas-style groupby-aggregate and groupby-apply operations on scalable Dask dataframes, and discusses both common usage and best practices.
Start Dask Client for Dashboard
Starting the Dask Client is optional. It provides a dashboard, which is useful for gaining insight into the computation.
The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. It can take some effort to arrange your windows, but seeing both at the same time is very useful while learning.
[1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
[1]:
Client: Client-f66fcc00-0de0-11ed-a18c-000d3a8f7959
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

LocalCluster: 40f50cf8
Dashboard: http://127.0.0.1:8787/status | Workers: 1
Total threads: 4 | Total memory: 1.86 GiB
Status: running | Using processes: True

Scheduler: Scheduler-c95eb614-e548-45f8-bbbb-9867335584b4
Comm: tcp://127.0.0.1:37903 | Workers: 1
Dashboard: http://127.0.0.1:8787/status | Total threads: 4
Started: Just now | Total memory: 1.86 GiB

Worker: 0
Comm: tcp://127.0.0.1:34411 | Total threads: 4
Dashboard: http://127.0.0.1:44441/status | Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:36749
Local directory: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-8cgbsa8t
Artificial dataset
We create an artificial timeseries dataset to help us work with groupby operations.
[2]:
import dask
df = dask.datasets.timeseries()
df
[2]:
|                | id    | name   | x       | y       |
|----------------|-------|--------|---------|---------|
| npartitions=30 |       |        |         |         |
| 2000-01-01     | int64 | object | float64 | float64 |
| 2000-01-02     | ...   | ...    | ...     | ...     |
| ...            | ...   | ...    | ...     | ...     |
| 2000-01-30     | ...   | ...    | ...     | ...     |
| 2000-01-31     | ...   | ...    | ...     | ...     |
This dataset is small enough to fit in the cluster’s memory, so we persist it now.
You would skip this step if your dataset becomes too large to fit into memory.
[3]:
df = df.persist()
Groupby Aggregations
Dask dataframes implement a commonly used subset of the Pandas groupby API (see the Pandas Groupby Documentation).
We start with groupby aggregations. These are generally fairly efficient, assuming that the number of groups is small (less than a million).
[4]:
df.groupby('name').x.mean().compute()
[4]:
name
Alice -0.000260
Bob -0.000536
Charlie 0.001401
Dan 0.000685
Edith -0.000263
Frank -0.000048
George -0.001241
Hannah -0.002285
Ingrid -0.002062
Jerry 0.000779
Kevin 0.002219
Laura -0.000472
Michael -0.003100
Norbert -0.001520
Oliver 0.001986
Patricia 0.000934
Quinn -0.000233
Ray -0.001600
Sarah -0.000426
Tim 0.001102
Ursula 0.002923
Victor -0.000821
Wendy -0.001977
Xavier -0.002330
Yvonne -0.001805
Zelda -0.004697
Name: x, dtype: float64
Performance will depend on the aggregation you do (mean vs std), the key on which you group (name vs id), and the total number of groups.
[5]:
%time _ = df.groupby('id').x.mean().compute()
CPU times: user 63.9 ms, sys: 3.83 ms, total: 67.7 ms
Wall time: 235 ms
[6]:
%time _ = df.groupby('name').x.mean().compute()
CPU times: user 87.4 ms, sys: 0 ns, total: 87.4 ms
Wall time: 452 ms
[7]:
%time df.groupby('name').agg({'x': ['mean', 'std'], 'y': ['mean', 'count']}).compute().head()
CPU times: user 66.1 ms, sys: 4.25 ms, total: 70.4 ms
Wall time: 355 ms
[7]:
| name    | x mean    | x std    | y mean    | y count |
|---------|-----------|----------|-----------|---------|
| Alice   | -0.000260 | 0.577759 | 0.001977  | 99471   |
| Bob     | -0.000536 | 0.577571 | -0.000872 | 99283   |
| Charlie | 0.001401  | 0.577126 | -0.001523 | 99326   |
| Dan     | 0.000685  | 0.577608 | -0.002160 | 99687   |
| Edith   | -0.000263 | 0.577649 | 0.001361  | 100010  |
This is the same as with Pandas. Generally speaking, Dask dataframe groupby-aggregations have roughly the same performance as Pandas groupby-aggregations, just more scalable.
You can read more about Pandas’ common aggregations in the Pandas documentation.
Custom Aggregations
Dask dataframe Aggregate is available for custom aggregations (see the Dask dataframe Aggregate documentation). A custom aggregation is built from three functions, as in the sketch below.
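As a minimal sketch following the pattern in the Dask docstring, here is a custom mean: chunk runs on each group within each partition, agg combines the per-partition results, and finalize computes the final value. The name custom_mean is ours, not part of the API.

```python
import dask.dataframe as dd

# A custom mean built from three steps (a sketch; `custom_mean` is our name):
#   chunk:    per-partition partial results (count and sum for each group)
#   agg:      combine the partial results from all partitions
#   finalize: compute the final value from the combined results
custom_mean = dd.Aggregation(
    'custom_mean',
    chunk=lambda s: (s.count(), s.sum()),
    agg=lambda count, sum: (count.sum(), sum.sum()),
    finalize=lambda count, sum: sum / count,
)

df.groupby('name').x.agg(custom_mean).compute()
```

This should match the output of the built-in groupby-mean above, while showing where each piece of a custom aggregation runs.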
Many groups
By default groupby-aggregations (like groupby-mean or groupby-sum) return the result as a single-partition Dask dataframe. Their results are usually quite small, so this is usually a good choice.
However, sometimes people want to do groupby aggregations on many groups (millions or more). In these cases the full result may not fit into a single Pandas dataframe output, and you may need to split your output into multiple partitions. You can control this with the split_out= parameter.
[8]:
# Computational graph of a single output aggregation (for a small number of groups, like 1000)
df.groupby('name').x.mean().visualize(node_attr={'penwidth': '6'})
[8]:
[9]:
# Computational graph of an aggregation to four outputs (for a larger number of groups, like 1000000)
df.groupby('id').x.mean(split_out=4).visualize(node_attr={'penwidth': '6'})
[9]:
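As a quick check (a small sketch), the split_out result is itself a Dask series spread across multiple partitions rather than a single one:

```python
# With split_out=4 the aggregated result is spread over four partitions,
# instead of being collected into a single output partition.
result = df.groupby('id').x.mean(split_out=4)
print(result.npartitions)  # 4
result.compute().head()
```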
Groupby Apply
Groupby-aggregations are generally quite fast because they can be broken down easily into well-known operations. The data doesn't have to move around much, and we can just pass small intermediate values across the network.
For some operations, however, the function to be applied requires all of the data from a given group (like every record of someone named "Alice"). This forces a great deal of communication and is more expensive, but it is still possible with the groupby-apply method. It should be avoided if a groupby-aggregation works.
In the following example we train a simple scikit-learn machine learning model for each name in the dataset.
[10]:
from sklearn.linear_model import LinearRegression

def train(partition):
    # Empty partitions can occur; skip them rather than fitting on no data
    if partition.empty:
        return
    # Fit a linear model predicting y from the x and id columns of this group
    est = LinearRegression()
    est.fit(partition[['x', 'id']].values, partition.y.values)
    return est
[11]:
%time df.groupby('name').apply(train, meta=object).compute().sort_index()
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
warnings.warn(
CPU times: user 676 ms, sys: 46.7 ms, total: 723 ms
Wall time: 4.25 s
[11]:
name
Alice LinearRegression()
Bob LinearRegression()
Charlie LinearRegression()
Dan LinearRegression()
Edith LinearRegression()
Frank LinearRegression()
George LinearRegression()
Hannah LinearRegression()
Ingrid LinearRegression()
Jerry LinearRegression()
Kevin LinearRegression()
Laura LinearRegression()
Michael LinearRegression()
Norbert LinearRegression()
Oliver LinearRegression()
Patricia LinearRegression()
Quinn LinearRegression()
Ray LinearRegression()
Sarah LinearRegression()
Tim LinearRegression()
Ursula LinearRegression()
Victor LinearRegression()
Wendy LinearRegression()
Xavier LinearRegression()
Yvonne LinearRegression()
Zelda LinearRegression()
dtype: object
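The result is a pandas Series of fitted estimators, one per name, so individual models can be pulled out and used directly. A small sketch (the feature values below are made up for illustration):

```python
# The groupby-apply returns a pandas Series of fitted models indexed by name;
# grab one and predict. Features are [x, id]; these values are illustrative.
models = df.groupby('name').apply(train, meta=object).compute()
alice_model = models.loc['Alice']
alice_model.predict([[0.5, 1000]])
```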