Gotchas from Pandas to Dask
This notebook highlights gotchas to watch for when moving code from Pandas to run in a Dask environment.
[1]:
# Dask is actively being developed; this example was run with the versions below
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')
Dask version: 2022.04.0
Pandas version: 1.4.2
Start Dask Client for Dashboard
Starting a Dask Client here runs the computation on a LocalCluster; this also provides a dashboard, which is useful for gaining insight into the computation.
When running within Jupyter Lab, an extension can be installed to view the various dashboard widgets.
[2]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client
[2]:
Client
Client-f3a78d75-d51f-11ec-a19b-000d3aeabb7a
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
24dc48f5
Dashboard: http://127.0.0.1:8787/status | Workers: 2 |
Total threads: 2 | Total memory: 6.78 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-b2e4d2b9-7c39-4aac-9130-ad0f3ecc6639
Comm: tcp://127.0.0.1:39211 | Workers: 2 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 2 |
Started: Just now | Total memory: 6.78 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:37931 | Total threads: 1 |
Dashboard: http://127.0.0.1:42685/status | Memory: 3.39 GiB |
Nanny: tcp://127.0.0.1:41003 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-8dvc6orb |
Worker: 1
Comm: tcp://127.0.0.1:40067 | Total threads: 1 |
Dashboard: http://127.0.0.1:40229/status | Memory: 3.39 GiB |
Nanny: tcp://127.0.0.1:40143 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dataframes/dask-worker-space/worker-7x38qz2h |
Create 2 DataFrames for comparison:
1. for Dask
2. for Pandas
Dask comes with built-in dataset samples; we will use one of them for our example.
[3]:
ddf = dask.datasets.timeseries()
ddf
[3]:
id | name | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | int64 | object | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
Remember that the Dask framework is lazy, so in order to see the result we need to run compute() (or head(), which runs compute() under the hood).
[4]:
ddf.head(2)
[4]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 |
In order to create a Pandas dataframe we can use the compute() method of a Dask dataframe.
[5]:
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)
<class 'pandas.core.frame.DataFrame'>
[5]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 |
We can also see dask laziness when using the shape attribute
[6]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')
Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-457670a7-0118-48ff-b5cd-08db2a126430'), 4)
We cannot get the full shape before accessing all the partitions; running len will do so.
[7]:
print(f'Dask computed shape: {len(ddf.index):,}') # expensive
Dask computed shape: 2,592,000
Creating a Dask dataframe from Pandas
In order to utilize Dask capabilities on an existing Pandas dataframe (pdf) we need to convert the Pandas dataframe into a Dask dataframe (ddf) with the from_pandas method. You must supply the number of partitions or the chunksize that will be used to generate the Dask dataframe.
[8]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2
[8]:
id | name | x | y | |
---|---|---|---|---|
npartitions=10 | ||||
2000-01-01 00:00:00 | int64 | object | float64 | float64 |
2000-01-04 00:00:00 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-28 00:00:00 | ... | ... | ... | ... |
2000-01-30 23:59:59 | ... | ... | ... | ... |
Partitions in Dask Dataframes
Notice that when we created a Dask dataframe we needed to supply the npartitions argument.
The number of partitions tells Dask how to break up the Pandas dataframe and parallelize the computation. Each partition is a separate dataframe.
An example of this can be seen when examining the reset_index() method:
[9]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
[9]:
timestamp 2000-01-01 00:00:00
id 995
name Yvonne
x -0.257197
y 0.288303
Name: 0, dtype: object
[10]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()
[10]:
timestamp | id | name | x | y | |
---|---|---|---|---|---|
0 | 2000-01-01 | 995 | Yvonne | -0.257197 | 0.288303 |
0 | 2000-01-04 | 1040 | Kevin | 0.071874 | 0.998809 |
0 | 2000-01-07 | 950 | Dan | -0.860903 | -0.366538 |
0 | 2000-01-10 | 1012 | Norbert | -0.688807 | -0.845508 |
0 | 2000-01-13 | 970 | Kevin | 0.964429 | -0.470225 |
0 | 2000-01-16 | 1017 | Ray | -0.858035 | -0.123796 |
0 | 2000-01-19 | 973 | Norbert | -0.595604 | 0.726321 |
0 | 2000-01-22 | 1024 | Bob | 0.272160 | 0.549340 |
0 | 2000-01-25 | 1026 | Bob | -0.942005 | -0.867345 |
0 | 2000-01-28 | 996 | Norbert | 0.885387 | -0.840972 |
Dask Dataframe vs Pandas Dataframe
Now that we have a Dask (ddf) and a Pandas (pdf) dataframe we can start to compare the interactions with them.
Conceptual shift - from Update to Insert/Delete
Dask does not update data in place, so there are no arguments such as inplace=True, which exist in Pandas.
Rename Columns
Using inplace=True is not considered to be best practice.
[11]:
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[11]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
[12]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[12]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
Data manipulations
There are several differences when manipulating data.
loc - Pandas
[13]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)
[13]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:17 | 1000 | Norbert | 0.727163 | 89.682962 |
2000-01-01 00:00:20 | 1060 | Ursula | 0.700307 | -75.613821 |
Error
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [14], in <cell line: 2>()
1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
3 ddf[cond_ddf].head(2)
TypeError: '_LocIndexer' object does not support item assignment
Dask - use mask/where
[14]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
[14]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 |
[15]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)
[15]:
ID | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 |
For more information see dask mask documentation
Meta argument
A key difference when working with Dask is the introduction of the meta argument.
meta is the prescription of the names/types of the output from the computation.
Since Dask creates a DAG for the computation, it needs to know what the outputs of each calculation stage are.
[16]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
[16]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | La |
[17]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:3930: UserWarning:
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
Before: .apply(func)
After: .apply(func, meta=('name', 'object'))
warnings.warn(meta_warning(meta))
[17]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | La |
[18]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(dtype='object', name='initials')
[19]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
[19]:
ID | name | x | y | initials | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | La |
[20]:
# similar when using a function
def func(row):
    if row['x'] > 0:
        return row['x'] * 1000
    else:
        return row['y'] * -1
[21]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)
[21]:
ID | name | x | y | initials | z | |
---|---|---|---|---|---|---|
timestamp | ||||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv | -0.288303 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | La | -0.630270 |
Map partitions
We can supply an ad-hoc function to run on each partition using the map_partitions method. This is mainly useful for functions that are not implemented in Dask or Pandas.
Finally, we can return a new dataframe, which needs to be described in the meta argument.
The function can also take arguments.
[22]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
    # distance between each row and the previous row of the same partition
    df['dist'] = np.sqrt((df[coor_x] - df[coor_x].shift())**2
                         + (df[coor_y] - df[coor_y].shift())**2)
    return df.drop(drop_cols, axis=1)

# meta maps each output column name to its dtype
ddf2 = ddf.map_partitions(func2,
                          coor_x='x',
                          coor_y='y',
                          drop_cols=['initials', 'z'],
                          meta={'ID': 'i8',
                                'name': 'object',
                                'x': 'f8',
                                'y': 'f8',
                                'dist': 'f8'})
ddf2.head()
[22]:
ID | name | x | y | dist | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | NaN |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | 0.636862 |
2000-01-01 00:00:02 | 1012 | Ursula | -0.013075 | 0.732633 | 0.788061 |
2000-01-01 00:00:03 | 960 | Yvonne | -0.361644 | -0.202956 | 0.998412 |
2000-01-01 00:00:04 | 988 | Kevin | 0.918343 | -0.933568 | 1.473825 |
Convert index into Time column
[23]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
[23]:
ID | name | x | y | initials | times | |
---|---|---|---|---|---|---|
timestamp | ||||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv | 00:00:00 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | La | 00:00:01 |
[24]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or ddf = ddf.assign(times=dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'))
ddf['times'] = ddf['times'].dt.time
ddf = client.persist(ddf)
ddf.head(2)
[24]:
ID | name | x | y | initials | z | times | |
---|---|---|---|---|---|---|---|
timestamp | |||||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv | -0.288303 | 00:00:00 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | La | -0.630270 | 00:00:01 |
Drop NA on column
[25]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
[26]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
[27]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)
[27]:
ID | name | x | y | times | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | 00:00:01 |
In order for Dask to drop a column in which all values are NA, it must check all the partitions with compute().
[28]:
# check whether all values in the column are null - expensive
if ddf.colna.isnull().all().compute():
    ddf = ddf.drop(labels=['colna'], axis=1)
ddf.head(2)
[28]:
ID | name | x | y | times | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
2000-01-01 00:00:01 | 1007 | Laura | -0.794460 | 0.630270 | 00:00:01 |
Reset Index
[29]:
# Pandas
pdf = pdf.reset_index(drop=True)
pdf.head(2)
[29]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
1 | 1007 | Laura | -0.794460 | 0.630270 | 00:00:01 |
[30]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)
[30]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
1 | 1007 | Laura | -0.794460 | 0.630270 | 00:00:01 |
Read / Save files
When working with pandas and dask, prefer the parquet format.
When working with Dask, files can be read with multiple workers.
Most kwargs are applicable for reading and writing files, e.g. ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=None).
However, some are not available, such as nrows.
See the documentation (including the option for output file naming).
Save files
[31]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
[32]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
CPU times: user 15.1 s, sys: 343 ms, total: 15.5 s
Wall time: 15.3 s
[33]:
list(output_dir_file.parent.glob('*.csv'))
[33]:
[PosixPath('data/2000-01-26.csv'),
PosixPath('data/2000-01-09.csv'),
PosixPath('data/2000-01-01.csv'),
PosixPath('data/2000-01-11.csv'),
PosixPath('data/2000-01-02.csv'),
PosixPath('data/2000-01-22.csv'),
PosixPath('data/2000-01-08.csv'),
PosixPath('data/2000-01-07.csv'),
PosixPath('data/2000-01-03.csv'),
PosixPath('data/2000-01-30.csv'),
PosixPath('data/2000-01-29.csv'),
PosixPath('data/2000-01-12.csv'),
PosixPath('data/2000-01-19.csv'),
PosixPath('data/2000-01-20.csv'),
PosixPath('data/2000-01-23.csv'),
PosixPath('data/2000-01-04.csv'),
PosixPath('data/2000-01-13.csv'),
PosixPath('data/2000-01-06.csv'),
PosixPath('data/2000-01-21.csv'),
PosixPath('data/2000-01-10.csv'),
PosixPath('data/2000-01-17.csv'),
PosixPath('data/pdf_single_file.csv'),
PosixPath('data/2000-01-14.csv'),
PosixPath('data/2000-01-05.csv'),
PosixPath('data/2000-01-16.csv'),
PosixPath('data/2000-01-28.csv'),
PosixPath('data/2000-01-25.csv'),
PosixPath('data/2000-01-27.csv'),
PosixPath('data/2000-01-18.csv'),
PosixPath('data/2000-01-15.csv'),
PosixPath('data/2000-01-24.csv')]
Notice the '*' in the output path below; Dask replaces it with the partition number so that each partition is written to its own file.
[34]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
[35]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)
CPU times: user 380 ms, sys: 31.3 ms, total: 412 ms
Wall time: 9.27 s
[35]:
['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf10.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf11.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf12.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf13.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf14.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf15.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf16.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf17.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf18.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf19.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf20.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf21.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf22.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf23.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf24.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf25.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf26.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf27.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf28.csv',
'/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf29.csv']
To find the number of partitions, which determines the number of output files, use the npartitions attribute of the Dask dataframe.
To change the number of output files use repartition, which is an expensive operation.
[36]:
ddf.npartitions
[36]:
30
Read Multiple files
For pandas it is possible to iterate over the files and concatenate them (see the answer from Stack Overflow).
[37]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
                       for f in output_dask_dir.iterdir()])
len(concat_df)
CPU times: user 2.6 s, sys: 350 ms, total: 2.95 s
Wall time: 2.88 s
[37]:
2592000
[38]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf
CPU times: user 9.84 ms, sys: 0 ns, total: 9.84 ms
Wall time: 9.3 ms
[38]:
ID | name | x | y | times | |
---|---|---|---|---|---|
npartitions=30 | |||||
int64 | object | float64 | float64 | object | |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | |
... | ... | ... | ... | ... |
Remember that Dask is lazy, so it does not really read the files until it needs to…
[39]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
CPU times: user 70.5 ms, sys: 6.65 ms, total: 77.2 ms
Wall time: 769 ms
[39]:
2592000
[40]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)
[40]:
ID | name | x | y | times | |
---|---|---|---|---|---|
0 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
1 | 1007 | Laura | -0.794460 | 0.630270 | 00:00:01 |
Group By - custom aggregations
This example shows how to avoid groupby.apply by using a custom aggregation instead.
[41]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
[41]:
name | ID | seconds | |
---|---|---|---|
0 | Yvonne | 995 | 00 |
1 | Laura | 1007 | 00 |
2 | Ursula | 1012 | 00 |
3 | Yvonne | 960 | 00 |
4 | Kevin | 988 | 00 |
[42]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply(lambda x: list(set(x.to_list())))
               for att_col_gr in gp_col]
[43]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
Weight ID \
name
Alice 99633 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
Bob 99782 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
seconds
name
Alice [60, 29, 11, 09, 44, 05, 77, 54, 50, 01, 23, 7...
Bob [60, 29, 11, 09, 44, 05, 77, 54, 50, 01, 23, 7...
CPU times: user 22.4 ms, sys: 0 ns, total: 22.4 ms
Wall time: 20.1 ms
Remember that in some cases Pandas is more efficient (assuming that you can load all the data into RAM).
[44]:
def set_list_att(x: pd.Series):
    return list(set(x.values))

ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
[44]:
name | ID | seconds | |
---|---|---|---|
0 | Yvonne | 995 | 00 |
1 | Laura | 1007 | 01 |
[45]:
ddf.columns
[45]:
Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')
[46]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att,
                                       meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
               for att_col_gr in gp_col]
[47]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
File <timed exec>:4, in <module>
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1208, in _Frame.head(self, n, npartitions, compute)
1206 # No need to warn if we're already looking at all partitions
1207 safe = npartitions != self.npartitions
-> 1208 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1242, in _Frame._head(self, n, npartitions, compute, safe)
1237 result = new_dd_object(
1238 graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
1239 )
1241 if compute:
-> 1242 result = result.compute()
1243 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
268 def compute(self, **kwargs):
269 """Compute this dask collection
270
271 This turns a lazy Dask collection into its in-memory equivalent.
(...)
290 dask.base.compute
291 """
--> 292 (result,) = compute(self, traverse=False, **kwargs)
293 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
572 keys.append(x.__dask_keys__())
573 postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3018, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3016 should_rejoin = False
3017 try:
-> 3018 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3019 finally:
3020 for f in futures.values():
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2171, in Client.gather(self, futures, errors, direct, asynchronous)
2169 else:
2170 local_worker = None
-> 2171 return self.sync(
2172 self._gather,
2173 futures,
2174 errors=errors,
2175 direct=direct,
2176 local_worker=local_worker,
2177 asynchronous=asynchronous,
2178 )
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:309, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
307 return future
308 else:
--> 309 return sync(
310 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
311 )
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:376, in sync(loop, func, callback_timeout, *args, **kwargs)
374 if error:
375 typ, exc, tb = error
--> 376 raise exc.with_traceback(tb)
377 else:
378 return result
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:349, in sync.<locals>.f()
347 future = asyncio.wait_for(future, callback_timeout)
348 future = asyncio.ensure_future(future)
--> 349 result = yield future
350 except Exception:
351 error = sys.exc_info()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
759 exc_info = None
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2034, in Client._gather(self, futures, errors, direct, local_worker)
2032 exc = CancelledError(key)
2033 else:
-> 2034 raise exception.with_traceback(traceback)
2035 raise exc
2036 if errors == "skip":
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in _execute_task()
83 """Do the actual work of collecting data and executing a function
84
85 Examples
(...)
110 'foo'
111 """
112 if isinstance(arg, list):
--> 113 return [_execute_task(a, cache) for a in arg]
114 elif istask(arg):
115 func, args = arg[0], arg[1:]
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in <listcomp>()
83 """Do the actual work of collecting data and executing a function
84
85 Examples
(...)
110 'foo'
111 """
112 if isinstance(arg, list):
--> 113 return [_execute_task(a, cache) for a in arg]
114 elif istask(arg):
115 func, args = arg[0], arg[1:]
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:39, in apply()
37 def apply(func, args, kwargs=None):
38 if kwargs:
---> 39 return func(*args, **kwargs)
40 else:
41 return func(*args)
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6355, in apply_and_enforce()
6353 return meta
6354 if is_dataframe_like(df):
-> 6355 check_matching_columns(meta, df)
6356 c = meta.columns
6357 else:
File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
413 else:
414 extra_info = "Order of columns does not match"
--> 415 raise ValueError(
416 "The columns in the computed data do not match"
417 " the columns in the provided metadata\n"
418 f"{extra_info}"
419 )
ValueError: The columns in the computed data do not match the columns in the provided metadata
Extra: ['name']
Missing: [0]
[48]:
import itertools
custom_agg = dd.Aggregation(
    'custom_agg',
    lambda s: s.apply(set),
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),
)
[49]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
CPU times: user 171 ms, sys: 4.06 ms, total: 175 ms
Wall time: 1.21 s
[49]:
Weight | ID | seconds | |
---|---|---|---|
name | |||
Alice | 99633 | [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | [23, 55, 51, 21, 28, 58, 35, 06, 53, 11, 39, 3... |
Bob | 99782 | [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | [23, 55, 51, 21, 28, 58, 06, 35, 53, 39, 11, 3... |
Debugging
Debugging may be challenging…
1. Run code without client
2. Use Dashboard profiler
3. Verify integrity of DAG
Corrupted DAG
In this example we show that once the DAG is corrupted you may need to reset the calculation.
[50]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
[50]:
id | name | x | y | |
---|---|---|---|---|
timestamp | ||||
2000-01-01 | 1011 | Xavier | -0.409016 | 0.331355 |
[51]:
def func_dist2(df, coor_x, coor_y):
    dist = np.sqrt((df[coor_x] - df[coor_x].shift())^2    # `^` is bitwise XOR, not a power <-- wrong operator
                   + (df[coor_y] - df[coor_y].shift())^2)  # `^` is bitwise XOR, not a power <-- wrong operator
    return dist

ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y',
                                meta=('col', 'float'))
Is everything OK?
# Results in error
ddf.head()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Even if the function is corrected, the DAG is corrupted.
[52]:
# Still results in an error
def func_dist2(df, coor_x, coor_y):
    dist = np.sqrt((df[coor_x] - df[coor_x].shift())**2    # `**` <-- correct operator
                   + (df[coor_y] - df[coor_y].shift())**2)  # `**` <-- correct operator
    return dist

ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y',
                                meta=('col', 'float'))
We need to reset the dataframe
[53]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist = np.sqrt((df[coor_x] - df[coor_x].shift())**2    # corrected math function
                   + (df[coor_y] - df[coor_y].shift())**2)
    return dist

ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y',
                                meta=('col', 'float'))
ddf.head(2)
[53]:
id | name | x | y | col | |
---|---|---|---|---|---|
timestamp | |||||
2000-01-01 00:00:00 | 1000 | Alice | -0.805901 | -0.690125 | NaN |
2000-01-01 00:00:01 | 974 | Quinn | 0.341908 | 0.012709 | 1.345898 |