DataFrames: Groupby
This notebook uses the Pandas groupby-aggregate and groupby-apply patterns on scalable Dask dataframes. It discusses both common use and best practices.
Start Dask Client for Dashboard
Starting the Dask Client is optional. It provides a dashboard that is useful for gaining insight into the computation.
The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. It can take some effort to arrange your windows, but seeing them both at the same time is very useful when learning.
In [1]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client
Artificial dataset
We create an artificial timeseries dataset to help us work with groupby operations.
In [2]:
import dask
df = dask.datasets.timeseries()
df
Out[2]:
                   id    name        x        y
npartitions=30
2000-01-01      int64  object  float64  float64
2000-01-02        ...     ...      ...      ...
...               ...     ...      ...      ...
2000-01-30        ...     ...      ...      ...
2000-01-31        ...     ...      ...      ...
This dataset is small enough to fit in the cluster’s memory, so we persist it now.
Skip this step if your dataset is too large to fit into memory.
In [3]:
df = df.persist()
Groupby Aggregations
Dask dataframes implement a commonly used subset of the Pandas groupby API (see the Pandas Groupby Documentation).
We start with groupby aggregations. These are generally fairly efficient, assuming that the number of groups is small (fewer than a million).
In [4]:
df.groupby('name').x.mean().compute()
Out[4]:
name
Alice 0.003574
Bob 0.004333
Charlie 0.002944
Dan 0.000717
Edith 0.000791
Frank 0.000112
George 0.002299
Hannah 0.001146
Ingrid 0.000887
Jerry 0.000448
Kevin 0.000860
Laura 0.001426
Michael 0.000779
Norbert 0.003489
Oliver 0.000035
Patricia 0.002386
Quinn 0.001182
Ray 0.003122
Sarah 0.000034
Tim 0.000152
Ursula 0.001579
Victor 0.001319
Wendy 0.001101
Xavier 0.000267
Yvonne 0.000073
Zelda 0.000965
Name: x, dtype: float64
Performance will depend on the aggregation you do (mean vs std), the key on which you group (name vs id), and the total number of groups.
In [5]:
%time _ = df.groupby('id').x.mean().compute()
CPU times: user 380 ms, sys: 12 ms, total: 392 ms
Wall time: 293 ms
In [6]:
%time _ = df.groupby('name').x.mean().compute()
CPU times: user 708 ms, sys: 32 ms, total: 740 ms
Wall time: 524 ms
In [7]:
%time df.groupby('name').agg({'x': ['mean', 'std'], 'y': ['mean', 'count']}).compute().head()
CPU times: user 1.2 s, sys: 64 ms, total: 1.27 s
Wall time: 966 ms
Out[7]:
                x                   y
             mean       std      mean   count
name
Alice    0.003574  0.577825  0.000265  100187
Bob      0.004333  0.577542  0.000859   99430
Charlie  0.002944  0.577864  0.000018   99684
Dan      0.000717  0.576092  0.001146   99802
Edith    0.000791  0.577220  0.001763   99987
This works the same way as in Pandas. Generally speaking, Dask dataframe groupby-aggregations perform roughly the same as Pandas groupby-aggregations, while being more scalable.
You can read more about Pandas’ common aggregations in the Pandas documentation.
Many groups
By default, groupby-aggregations (like groupby-mean or groupby-sum) return the result as a single-partition Dask dataframe. Their results are usually quite small, so this is usually a good choice.
However, sometimes people want to do groupby aggregations on many groups (millions or more). In these cases the full result may not fit into a single Pandas dataframe output, and you may need to split your output into multiple partitions. You can control this with the split_out= parameter.
In [8]:
# Computational graph of a single output aggregation (for a small number of groups, like 1000)
df.groupby('name').x.mean().visualize(node_attr={'penwidth': '6'})
Out[8]:
In [9]:
# Computational graph of an aggregation to four outputs (for a larger number of groups, like 1000000)
df.groupby('id').x.mean(split_out=4).visualize(node_attr={'penwidth': '6'})
dot: graph is too large for cairo-renderer bitmaps. Scaling by 0.717677 to fit
Out[9]:
Groupby Apply
Groupby-aggregations are generally quite fast because they can be broken down easily into well-known operations. The data doesn’t have to move around too much, and we can just pass small intermediate values across the network.
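To illustrate the idea of small intermediate values, here is a minimal sketch in plain Pandas of how a groupby-mean can be split into a per-partition step, a combine step, and a finalize step. It is an illustration of the decomposition under made-up data, not Dask's actual implementation; the names `part1`, `part2`, and `chunk` are hypothetical.

```python
import pandas as pd

# Two "partitions" of one logical table (made-up values for illustration).
part1 = pd.DataFrame({"name": ["Alice", "Bob", "Alice"], "x": [1.0, 2.0, 3.0]})
part2 = pd.DataFrame({"name": ["Bob", "Alice", "Bob"], "x": [4.0, 5.0, 6.0]})

def chunk(part):
    # Per-partition step: only a small (sum, count) table per group leaves the partition.
    return part.groupby("name")["x"].agg(["sum", "count"])

# Combine step: add up the small intermediates from every partition.
combined = pd.concat([chunk(part1), chunk(part2)]).groupby(level=0).sum()

# Finalize step: the mean is the total sum divided by the total count.
mean_from_parts = combined["sum"] / combined["count"]

# Reference: the same mean computed on the full table at once.
expected = pd.concat([part1, part2]).groupby("name")["x"].mean()
print(mean_from_parts)
```

Each partition contributes one row per group it contains, regardless of how many records it holds, which is why little data needs to move.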
For some operations, however, the function to be applied requires all data from a given group (like every record of someone named “Alice”). This forces a great deal of communication and is more expensive, but it is still possible with the groupby-apply method. Avoid it if a groupby-aggregation works.
In the following example we train a simple Scikit-Learn machine learning model for each person’s name.
In [10]:
from sklearn.linear_model import LinearRegression

def train(partition):
    # Fit one linear model per group: predict y from x and id
    est = LinearRegression()
    est.fit(partition[['x', 'id']].values, partition.y.values)
    return est
In [11]:
%time df.groupby('name').apply(train, meta=object).compute().sort_index()
CPU times: user 5.88 s, sys: 680 ms, total: 6.56 s
Wall time: 5.27 s
Out[11]:
name
Alice LinearRegression(copy_X=True, fit_intercept=Tr...
Bob LinearRegression(copy_X=True, fit_intercept=Tr...
Charlie LinearRegression(copy_X=True, fit_intercept=Tr...
Dan LinearRegression(copy_X=True, fit_intercept=Tr...
Edith LinearRegression(copy_X=True, fit_intercept=Tr...
Frank LinearRegression(copy_X=True, fit_intercept=Tr...
George LinearRegression(copy_X=True, fit_intercept=Tr...
Hannah LinearRegression(copy_X=True, fit_intercept=Tr...
Ingrid LinearRegression(copy_X=True, fit_intercept=Tr...
Jerry LinearRegression(copy_X=True, fit_intercept=Tr...
Kevin LinearRegression(copy_X=True, fit_intercept=Tr...
Laura LinearRegression(copy_X=True, fit_intercept=Tr...
Michael LinearRegression(copy_X=True, fit_intercept=Tr...
Norbert LinearRegression(copy_X=True, fit_intercept=Tr...
Oliver LinearRegression(copy_X=True, fit_intercept=Tr...
Patricia LinearRegression(copy_X=True, fit_intercept=Tr...
Quinn LinearRegression(copy_X=True, fit_intercept=Tr...
Ray LinearRegression(copy_X=True, fit_intercept=Tr...
Sarah LinearRegression(copy_X=True, fit_intercept=Tr...
Tim LinearRegression(copy_X=True, fit_intercept=Tr...
Ursula LinearRegression(copy_X=True, fit_intercept=Tr...
Victor LinearRegression(copy_X=True, fit_intercept=Tr...
Wendy LinearRegression(copy_X=True, fit_intercept=Tr...
Xavier LinearRegression(copy_X=True, fit_intercept=Tr...
Yvonne LinearRegression(copy_X=True, fit_intercept=Tr...
Zelda LinearRegression(copy_X=True, fit_intercept=Tr...
dtype: object
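The result is a Series holding one fitted estimator per name. As a minimal sketch of how such a Series might be turned into a table of coefficients, the following uses plain Pandas and Scikit-Learn on a small made-up frame with a single feature. The data, the single-feature model, and the names `models` and `coefs` are assumptions for illustration.

```python
import pandas as pd
from sklearn.linear_model import LinearRegression

# Made-up data: Alice follows y = 2x + 1, Bob follows y = -x.
pdf = pd.DataFrame({
    "name": ["Alice"] * 4 + ["Bob"] * 4,
    "x": [0.0, 1.0, 2.0, 3.0, 0.0, 1.0, 2.0, 3.0],
    "y": [1.0, 3.0, 5.0, 7.0, 0.0, -1.0, -2.0, -3.0],
})

def train(group):
    # Fit one linear model per group: predict y from x
    est = LinearRegression()
    est.fit(group[["x"]].values, group["y"].values)
    return est

# One fitted estimator per name, mirroring the shape of the result above.
models = pd.Series({name: train(group) for name, group in pdf.groupby("name")})

# Pull the fitted parameters out of each estimator into a regular table.
coefs = pd.DataFrame({
    "slope": models.map(lambda m: m.coef_[0]),
    "intercept": models.map(lambda m: m.intercept_),
})
print(coefs)
```

Extracting plain numbers this way gives a result that is easier to inspect and store than a Series of estimator objects.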