You can run this notebook in a live session or view it on GitHub.
Gotchas from Pandas to Dask¶
This notebook highlights some key differences when transferring code from Pandas to run in a Dask environment.
[1]:
# since Dask is actively being developed - the current example is running with the below versions
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')
Dask version: 2022.05.0
Pandas version: 1.4.2
Start Dask Client for Dashboard¶
Starting a LocalCluster Client is optional; it will provide a dashboard, which is useful to gain insight into the computation. When working in Jupyter Lab, an extension can be installed to view the various dashboard widgets.
[2]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client
[2]:
Client: Client-03e84462-0de1-11ed-a1e8-000d3a8f7959
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
LocalCluster (45bdf0f8): Workers: 2 | Total threads: 2 | Total memory: 6.78 GiB | Status: running | Using processes: True
Create 2 DataFrames for comparison:¶
1. for Dask
2. for Pandas
Dask comes with built-in dataset samples; we will use one of these samples for our example.
[3]:
ddf = dask.datasets.timeseries()
ddf
[3]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | int64 | object | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
Remember: the Dask framework is lazy, thus in order to see the result we need to run compute() (or head(), which runs compute() under the hood).
[4]:
ddf.head(2)
[4]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 |
In order to create a Pandas dataframe we can use the compute() method on a Dask dataframe.
[5]:
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)
<class 'pandas.core.frame.DataFrame'>
[5]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 |
We can also see Dask's laziness when using the shape attribute:
[6]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')
Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-cfb1b2b5-09dd-494e-b2d2-f875ace2562d'), 4)
We cannot get the full shape before accessing all the partitions; running len will do so.
[7]:
print(f'Dask computed shape: {len(ddf.index):,}') # expensive
Dask computed shape: 2,592,000
Creating a Dask dataframe from Pandas¶
In order to utilize Dask capabilities on an existing Pandas dataframe (pdf) we need to convert the Pandas dataframe into a Dask dataframe (ddf) with the from_pandas method. You must supply the number of partitions or the chunksize that will be used to generate the dask dataframe.
[8]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2
[8]:
id | name | x | y | |
---|---|---|---|---|
npartitions=10 | ||||
2000-01-01 00:00:00 | int64 | object | float64 | float64 |
2000-01-04 00:00:00 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-28 00:00:00 | ... | ... | ... | ... |
2000-01-30 23:59:59 | ... | ... | ... | ... |
Partitions in Dask Dataframes¶
Notice that when we created the Dask dataframe we needed to supply the npartitions argument. The number of partitions instructs Dask on how to break up the Pandas DataFrame and parallelize the computation. An example of this can be seen when examining the reset_index() method:
[9]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
[9]:
timestamp 2000-01-01 00:00:00
id 983
name Wendy
x -0.303374
y -0.423744
Name: 0, dtype: object
[10]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()
[10]:
timestamp | id | name | x | y | |
---|---|---|---|---|---|
0 | 2000-01-01 | 983 | Wendy | -0.303374 | -0.423744 |
0 | 2000-01-04 | 1002 | Kevin | -0.825578 | -0.584699 |
0 | 2000-01-07 | 963 | Oliver | 0.024036 | -0.692546 |
0 | 2000-01-10 | 1023 | Yvonne | 0.897486 | 0.958034 |
0 | 2000-01-13 | 1088 | Quinn | -0.721954 | 0.261693 |
0 | 2000-01-16 | 994 | George | 0.463023 | -0.166976 |
0 | 2000-01-19 | 932 | Frank | 0.272315 | -0.585240 |
0 | 2000-01-22 | 1007 | Ursula | -0.919138 | -0.173157 |
0 | 2000-01-25 | 983 | Patricia | -0.893431 | -0.892484 |
0 | 2000-01-28 | 1043 | Oliver | -0.979336 | -0.581927 |
Dask Dataframe vs Pandas Dataframe¶
Now that we have a dask (ddf) and a pandas (pdf) dataframe we can start to compare the interactions with them.
Conceptual shift - from Update to Insert/Delete¶
Dask does not update in place; thus there are no arguments such as inplace=True, which exist in Pandas.
Rename Columns¶
Using inplace=True is not considered to be best practice.
[11]:
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[11]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
[12]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[12]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
Data manipulations¶
There are several differences when manipulating data.
loc - Pandas¶
[13]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)
[13]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:06 | 1019 | Xavier | 0.634802 | 45.051214 |
2000-01-01 00:00:08 | 1013 | Charlie | 0.627523 | -8.101142 |
Error¶
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [14], in <cell line: 2>()
1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
3 ddf[cond_ddf].head(2)
TypeError: '_LocIndexer' object does not support item assignment
Dask - use mask/where¶
[14]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
[14]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 |
[15]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)
[15]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 |
For more information see dask mask documentation
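The complementary where method works the same way; here is a minimal sketch (reusing ddf and the condition from above), since mask(cond, other) is equivalent to where(~cond, other):
# Dask - where keeps values where the condition holds and
# replaces the rest (the inverse of mask); sketch only
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].where(~cond_ddf, other=ddf['y']* 100)
ddf.head(2)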
Meta argument¶
One key feature in Dask is the introduction of the meta argument. meta is the prescription of the names/types of the output from the computation. Since Dask creates a DAG for the computation, it needs to understand what the outputs of each calculation stage are.
[16]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
[16]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je |
[17]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:3946: UserWarning:
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
Before: .apply(func)
After: .apply(func, meta=('name', 'object'))
warnings.warn(meta_warning(meta))
[17]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je |
[18]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(object, name='initials')
[19]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
[19]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je |
[20]:
# similar when using a function
def func(row):
if (row['x']> 0):
return row['x'] * 1000
else:
return row['y'] * -1
[21]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)
[21]:
ID | name | x | y | initials | z | |
---|---|---|---|---|---|---|
timestamp | ||||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We | 0.423744 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je | 21.914646 |
Map partitions¶
We can supply an ad-hoc function to run on each partition using the map_partitions method. This is mainly useful for functions that are not implemented in Dask or Pandas. Finally, we can return a new dataframe, which needs to be described in the meta argument. The function can also take additional arguments.
[22]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
df['dist'] = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2
+ (df[coor_y] - df[coor_y].shift())**2 )
return df.drop(drop_cols, axis=1)
ddf2 = ddf.map_partitions(func2
, coor_x='x'
, coor_y='y'
, drop_cols=['initials', 'z']
, meta=pd.DataFrame({'ID':'i8'
, 'name':str
, 'x':'f8'
, 'y':'f8'
, 'dist':'f8'}, index=[0]))
ddf2.head()
[22]:
ID | name | x | y | dist | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | NaN |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | 1.063636 |
2000-01-01 00:00:02 | 996 | Kevin | 0.336184 | 0.150478 | 0.539449 |
2000-01-01 00:00:03 | 1035 | Quinn | 0.853655 | 0.031222 | 0.531035 |
2000-01-01 00:00:04 | 1039 | Ingrid | 0.890711 | -0.992794 | 1.024686 |
Convert index into Time column¶
[23]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
[23]:
ID | name | x | y | initials | times | |
---|---|---|---|---|---|---|
timestamp | ||||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We | 00:00:00 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je | 00:00:01 |
[24]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or: ddf = ddf.assign(times=dd.to_datetime(ddf.index, format='%Y-%m-%d'))
ddf['times'] = ddf['times'].dt.time
ddf = client.persist(ddf)
ddf.head(2)
[24]:
ID | name | x | y | initials | z | times | |
---|---|---|---|---|---|---|---|
timestamp | |||||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | We | 0.423744 | 00:00:00 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | Je | 21.914646 | 00:00:01 |
Drop NA on column¶
[25]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
[26]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
[27]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)
[27]:
ID | name | x | y | times | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
In order for Dask to drop a column in which all values are na, it must check all the partitions with compute().
[28]:
if ddf.colna.isnull().all().compute() == True: # check if all values in column are Null - expensive
ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)
[28]:
ID | name | x | y | times | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
2000-01-01 00:00:01 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
Reset Index¶
[29]:
# Pandas
pdf = pdf.reset_index(drop=True)
pdf.head(2)
[29]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
1 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
[30]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)
[30]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
1 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
Read / Save files¶
- When working with pandas and dask it is preferable to use the parquet format.
- When working with Dask, files can be read by multiple workers.
- Most kwargs are applicable for reading and writing files, e.g. ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False).
- However, some are not available, such as nrows.
See the documentation (including the option for output file naming).
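As a sketch of the preferred parquet round trip (the 'data/ddf_parquet' directory name is only an illustration, and a parquet engine such as pyarrow or fastparquet must be installed):
# Parquet preserves dtypes and is read in parallel by the workers
# 'data/ddf_parquet' is a hypothetical output directory
ddf.to_parquet('data/ddf_parquet')
ddf_parquet = dd.read_parquet('data/ddf_parquet')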
Save files¶
[31]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
[32]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
CPU times: user 16.4 s, sys: 433 ms, total: 16.8 s
Wall time: 16.6 s
[33]:
list(output_dir_file.parent.glob('*.csv'))
[33]:
[PosixPath('data/2000-01-25.csv'),
PosixPath('data/2000-01-20.csv'),
PosixPath('data/2000-01-29.csv'),
PosixPath('data/2000-01-02.csv'),
PosixPath('data/2000-01-19.csv'),
PosixPath('data/2000-01-23.csv'),
PosixPath('data/2000-01-10.csv'),
PosixPath('data/2000-01-21.csv'),
PosixPath('data/2000-01-17.csv'),
PosixPath('data/2000-01-04.csv'),
PosixPath('data/2000-01-27.csv'),
PosixPath('data/2000-01-22.csv'),
PosixPath('data/2000-01-14.csv'),
PosixPath('data/2000-01-11.csv'),
PosixPath('data/pdf_single_file.csv'),
PosixPath('data/2000-01-13.csv'),
PosixPath('data/2000-01-08.csv'),
PosixPath('data/2000-01-09.csv'),
PosixPath('data/2000-01-06.csv'),
PosixPath('data/2000-01-01.csv'),
PosixPath('data/2000-01-07.csv'),
PosixPath('data/2000-01-12.csv'),
PosixPath('data/2000-01-16.csv'),
PosixPath('data/2000-01-26.csv'),
PosixPath('data/2000-01-24.csv'),
PosixPath('data/2000-01-18.csv'),
PosixPath('data/2000-01-15.csv'),
PosixPath('data/2000-01-03.csv'),
PosixPath('data/2000-01-30.csv'),
PosixPath('data/2000-01-28.csv'),
PosixPath('data/2000-01-05.csv')]
Notice the '*', which allows for naming multiple output files.
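If a single output file is required, to_csv also accepts single_file=True; a sketch (note that this serializes the write and gives up the parallel speedup, and the file name is only an illustration):
# Dask - write all partitions into one CSV file (sequential write)
ddf.to_csv('data/ddf_single.csv', single_file=True, index=False)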
[34]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
[35]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)
CPU times: user 454 ms, sys: 46.5 ms, total: 500 ms
Wall time: 10.4 s
[35]:
['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf10.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf11.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf12.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf13.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf14.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf15.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf16.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf17.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf18.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf19.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf20.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf21.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf22.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf23.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf24.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf25.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf26.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf27.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf28.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf29.csv']
To find the number of partitions, which will determine the number of output files, use dask.dataframe.npartitions.
To change the number of output files use repartition, which is an expensive operation.
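A minimal sketch of changing the number of output files (the new partition count of 5 is arbitrary here; output_dask_dir is reused from above):
# repartition shuffles data between partitions - expensive, so do it once
ddf_repart = ddf.repartition(npartitions=5)
ddf_repart.to_csv(f'{output_dask_dir}/ddf_repart*.csv', index=False)  # writes 5 files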
[36]:
ddf.npartitions
[36]:
30
Read Multiple files¶
For pandas it is possible to iterate over the files and concat them (see this answer from Stack Overflow).
[37]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
for f in list(output_dask_dir.iterdir())])
len(concat_df)
CPU times: user 2.75 s, sys: 318 ms, total: 3.07 s
Wall time: 3 s
[37]:
2592000
[38]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf
CPU times: user 11.9 ms, sys: 0 ns, total: 11.9 ms
Wall time: 12.4 ms
[38]:
ID | name | x | y | times | |
---|---|---|---|---|---|
npartitions=30 | |||||
int64 | object | float64 | float64 | object | |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... |
Remember that Dask is lazy; thus it does not really read the file until it needs to…
[39]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
CPU times: user 69.6 ms, sys: 11.3 ms, total: 81 ms
Wall time: 818 ms
[39]:
2592000
[40]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)
[40]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 983 | Wendy | -0.303374 | -0.423744 | 00:00:00 |
1 | 964 | Jerry | 0.021915 | 0.588930 | 00:00:01 |
Group By - custom aggregations¶
In this example we will compare a custom aggregation performed with groupby.apply in Pandas against the Dask equivalent.
[41]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
[41]:
name | ID | seconds | |
---|---|---|---|
0 | Wendy | 983 | 00 |
1 | Jerry | 964 | 00 |
2 | Kevin | 996 | 00 |
3 | Quinn | 1035 | 00 |
4 | Ingrid | 1039 | 00 |
[42]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
(lambda x: list(set(x.to_list())))
for att_col_gr in gp_col]
[43]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
Weight ID \
name
Alice 99833 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
Bob 99508 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
seconds
name
Alice [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6...
Bob [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6...
CPU times: user 23.1 ms, sys: 169 µs, total: 23.3 ms
Wall time: 22.1 ms
Remember that in some cases Pandas is more efficient (assuming that you can load all the data into RAM).
[44]:
def set_list_att(x: dd.Series):
return list(set([item for item in x.values]))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
[44]:
name | ID | seconds | |
---|---|---|---|
0 | Wendy | 983 | 00 |
1 | Jerry | 964 | 01 |
[45]:
ddf.columns
[45]:
Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')
[46]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
for att_col_gr in gp_col]
[47]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
File <timed exec>:4, in <module>
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1213, in _Frame.head(self, n, npartitions, compute)
1211 # No need to warn if we're already looking at all partitions
1212 safe = npartitions != self.npartitions
-> 1213 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1247, in _Frame._head(self, n, npartitions, compute, safe)
1242 result = new_dd_object(
1243 graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
1244 )
1246 if compute:
-> 1247 result = result.compute()
1248 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
268 def compute(self, **kwargs):
269 """Compute this dask collection
270
271 This turns a lazy Dask collection into its in-memory equivalent.
(...)
290 dask.base.compute
291 """
--> 292 (result,) = compute(self, traverse=False, **kwargs)
293 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
572 keys.append(x.__dask_keys__())
573 postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3002 should_rejoin = False
3003 try:
-> 3004 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3005 finally:
3006 for f in futures.values():
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178, in Client.gather(self, futures, errors, direct, asynchronous)
2176 else:
2177 local_worker = None
-> 2178 return self.sync(
2179 self._gather,
2180 futures,
2181 errors=errors,
2182 direct=direct,
2183 local_worker=local_worker,
2184 asynchronous=asynchronous,
2185 )
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
316 return future
317 else:
--> 318 return sync(
319 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
320 )
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385, in sync(loop, func, callback_timeout, *args, **kwargs)
383 if error:
384 typ, exc, tb = error
--> 385 raise exc.with_traceback(tb)
386 else:
387 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358, in sync.<locals>.f()
356 future = asyncio.wait_for(future, callback_timeout)
357 future = asyncio.ensure_future(future)
--> 358 result = yield future
359 except Exception:
360 error = sys.exc_info()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
759 exc_info = None
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041, in Client._gather(self, futures, errors, direct, local_worker)
2039 exc = CancelledError(key)
2040 else:
-> 2041 raise exception.with_traceback(traceback)
2042 raise exc
2043 if errors == "skip":
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in _execute_task()
83 """Do the actual work of collecting data and executing a function
84
85 Examples
(...)
110 'foo'
111 """
112 if isinstance(arg, list):
--> 113 return [_execute_task(a, cache) for a in arg]
114 elif istask(arg):
115 func, args = arg[0], arg[1:]
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in <listcomp>()
83 """Do the actual work of collecting data and executing a function
84
85 Examples
(...)
110 'foo'
111 """
112 if isinstance(arg, list):
--> 113 return [_execute_task(a, cache) for a in arg]
114 elif istask(arg):
115 func, args = arg[0], arg[1:]
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40, in apply()
38 def apply(func, args, kwargs=None):
39 if kwargs:
---> 40 return func(*args, **kwargs)
41 else:
42 return func(*args)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436, in apply_and_enforce()
6434 return meta
6435 if is_dataframe_like(df):
-> 6436 check_matching_columns(meta, df)
6437 c = meta.columns
6438 else:
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
413 else:
414 extra_info = "Order of columns does not match"
--> 415 raise ValueError(
416 "The columns in the computed data do not match"
417 " the columns in the provided metadata\n"
418 f"{extra_info}"
419 )
ValueError: The columns in the computed data do not match the columns in the provided metadata
Extra: ['name']
Missing: [0]
Instead, we can use a Dask custom Aggregation:
[48]:
import itertools
custom_agg = dd.Aggregation(
'custom_agg',
lambda s: s.apply(set),
lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)
[49]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
CPU times: user 185 ms, sys: 11.3 ms, total: 196 ms
Wall time: 1.33 s
[49]:
Weight | ID | seconds | |
---|---|---|---|
name | |||
Alice | 99833 | [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | [21, 28, 03, 20, 52, 02, 43, 38, 32, 49, 09, 0... |
Bob | 99508 | [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | [28, 21, 03, 20, 02, 52, 43, 32, 38, 49, 09, 0... |
Debugging¶
Debugging may be challenging…
1. Run code without client (see the sketch below)
2. Use Dashboard profiler
3. Verify integrity of DAG
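For step 1, a sketch of running without the distributed client: the synchronous scheduler executes the graph in the main thread, so standard tools such as pdb or %debug can step into user functions.
# Sketch - compute a single partition with the synchronous scheduler;
# everything runs in the main thread, so breakpoints and pdb work
ddf.partitions[0].compute(scheduler='single-threaded')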
Corrupted DAG¶
In this example we show that once the DAG is corrupted you may need to reset the calculation.
[50]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
[50]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 | 996 | Ingrid | -0.932092 | 0.477965 |
[51]:
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 # `^` <-- wrong syntax
+ (df[coor_y] - df[coor_y].shift())^2 ) # `^` <-- wrong syntax
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
warnings.warn(
Is everything OK?
# Results in error
ddf.head()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Even if the function is corrected, the DAG is corrupted.
[52]:
# Still results with an error
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 # `**` <-- correct syntax
+ (df[coor_y] - df[coor_y].shift())**2 ) # `**` <-- correct syntax
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
warnings.warn(
We need to reset the dataframe:
[53]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 #corrected math function
+ (df[coor_y] - df[coor_y].shift())**2 )
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
warnings.warn(
[53]:
id | name | x | y | col | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 979 | Frank | 0.820777 | 0.098616 | NaN |
2000-01-01 00:00:01 | 990 | Laura | -0.851323 | -0.501678 | 1.77659 |