Live Notebook

You can run this notebook in a live session on Binder or view it on GitHub.

Gotchas from Pandas to Dask

This notebook highlights some key differences when transferring code from Pandas to run in a Dask environment.
Most issues have a link to the Dask documentation for additional information.
[1]:
# since Dask is actively being developed - the current example is running with the below version
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')
Dask version: 2022.05.0
Pandas version: 1.4.2

Start Dask Client for Dashboard

Starting the Dask Client is optional. In this example we are running on a LocalCluster, which will also provide a dashboard that is useful to gain insight into the computation.
For additional information on the Dask Client see documentation.
The link to the dashboard will become visible when you create a client (as shown below).
When running within JupyterLab, an extension can be installed to view the various dashboard widgets.
[2]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client
[2]:

Client

Client-03e84462-0de1-11ed-a1e8-000d3a8f7959

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

See documentation for additional cluster configuration

Create 2 DataFrames for comparison:

  1. for Dask

  2. for Pandas

Dask comes with built-in dataset samples; we will use one of these samples for our example.

[3]:
ddf = dask.datasets.timeseries()
ddf
[3]:
Dask DataFrame Structure:
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: make-timeseries, 30 tasks
  • Remember that Dask is lazy, thus in order to see the result we need to run compute() (or head(), which runs compute() under the hood)

[4]:
ddf.head(2)
[4]:
id name x y
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930

In order to create a Pandas dataframe we can use the compute() method on a Dask dataframe.

[5]:
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)
<class 'pandas.core.frame.DataFrame'>
[5]:
id name x y
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930

We can also see Dask's laziness when using the shape attribute

[6]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')
Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-cfb1b2b5-09dd-494e-b2d2-f875ace2562d'), 4)

We cannot get the full shape before accessing all the partitions - running len will do so

[7]:
print(f'Dask computed shape: {len(ddf.index):,}')  # expensive
Dask computed shape: 2,592,000

Creating a Dask dataframe from Pandas

In order to utilize Dask capabilities on an existing Pandas dataframe (pdf) we need to convert the Pandas dataframe into a Dask dataframe (ddf) with the from_pandas method. You must supply the number of partitions or a chunksize that will be used to generate the Dask dataframe.

[8]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2
[8]:
Dask DataFrame Structure:
id name x y
npartitions=10
2000-01-01 00:00:00 int64 object float64 float64
2000-01-04 00:00:00 ... ... ... ...
... ... ... ... ...
2000-01-28 00:00:00 ... ... ... ...
2000-01-30 23:59:59 ... ... ... ...
Dask Name: from_pandas, 10 tasks

Partitions in Dask Dataframes

Notice that when we created a Dask dataframe we needed to supply an argument of npartitions.
The number of partitions will assist Dask in deciding how to break up the Pandas DataFrame and parallelize the computation.
Each partition is a separate dataframe. For additional information see the partition documentation
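
As a quick illustration (a minimal sketch, not part of the original notebook), the partition layout can be inspected directly: divisions shows the index values at the partition boundaries, and get_partition returns a single partition as its own Dask dataframe.

# inspect how ddf2 was split into partitions
print(ddf2.npartitions)            # number of partitions (10 in this example)
print(ddf2.divisions)              # index values at the partition boundaries
print(len(ddf2.get_partition(0)))  # row count of the first partition only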

An example of this can be seen when examining the reset_index() method:

[9]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
[9]:
timestamp    2000-01-01 00:00:00
id                           983
name                       Wendy
x                      -0.303374
y                      -0.423744
Name: 0, dtype: object
[10]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()
[10]:
timestamp id name x y
0 2000-01-01 983 Wendy -0.303374 -0.423744
0 2000-01-04 1002 Kevin -0.825578 -0.584699
0 2000-01-07 963 Oliver 0.024036 -0.692546
0 2000-01-10 1023 Yvonne 0.897486 0.958034
0 2000-01-13 1088 Quinn -0.721954 0.261693
0 2000-01-16 994 George 0.463023 -0.166976
0 2000-01-19 932 Frank 0.272315 -0.585240
0 2000-01-22 1007 Ursula -0.919138 -0.173157
0 2000-01-25 983 Patricia -0.893431 -0.892484
0 2000-01-28 1043 Oliver -0.979336 -0.581927

Dask Dataframe vs Pandas Dataframe

Now that we have a Dask (ddf) and a Pandas (pdf) dataframe we can start to compare the interactions with them.

Conceptual shift - from Update to Insert/Delete

Dask does not update in place - thus there are no arguments such as inplace=True, which exist in Pandas.
For more details see issue#653 on GitHub

Rename Columns

  • Using inplace=True is not considered to be best practice.

[11]:
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[11]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
# Dask - Error
# ddf.rename(columns={'id':'ID'}, inplace=True)
# ddf.columns
'''
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>
      1 # Dask - Error
----> 2 ddf.rename(columns={'id':'ID'}, inplace=True)
      3 ddf.columns

TypeError: rename() got an unexpected keyword argument 'inplace'
'''
[12]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[12]:
Index(['ID', 'name', 'x', 'y'], dtype='object')

Data manipulations

There are several differences when manipulating data.

loc - Pandas

[13]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)
[13]:
ID name x y
timestamp
2000-01-01 00:00:06 1019 Xavier 0.634802 45.051214
2000-01-01 00:00:08 1013 Charlie 0.627523 -8.101142

Error

cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [14], in <cell line: 2>()
      1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
      3 ddf[cond_ddf].head(2)

TypeError: '_LocIndexer' object does not support item assignment

Dask - use mask/where

[14]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
[14]:
ID name x y
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930
[15]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)
[15]:
ID name x y
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930

For more information see dask mask documentation
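
As a side note (a sketch, not from the original notebook), where is the mirror image of mask: mask replaces values where the condition is True, while where replaces values where the condition is False. The assignment above could therefore also be written as:

# equivalent to the mask() call above: keep y where the condition is False,
# otherwise replace it with y * 100
ddf['y'] = ddf['y'].where(cond=~cond_ddf, other=ddf['y'] * 100)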

Meta argument

One key feature in Dask is the introduction of the meta argument.
> meta is the prescription of the names/types of the output from the computation
Since Dask creates a DAG for the computation, it needs to know what the outputs of each calculation stage are.
For additional information see the meta documentation
[16]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
[16]:
ID name x y initials
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je
[17]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:3946: UserWarning:
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('name', 'object'))

  warnings.warn(meta_warning(meta))
[17]:
ID name x y initials
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je
[18]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(object, name='initials')
[19]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
[19]:
ID name x y initials
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je
[20]:
# similar when using a function
def func(row):
    if (row['x']> 0):
        return row['x'] * 1000
    else:
        return row['y'] * -1
[21]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)
[21]:
ID name x y initials z
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 0.423744
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 21.914646

Map partitions

  • We can supply an ad-hoc function to run on each partition using the map_partitions method. This is mainly useful for functions that are not implemented in Dask or Pandas.

  • Finally we can return a new dataframe, which needs to be described in the meta argument. The function could also include additional arguments.

[22]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
    df['dist'] =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2
                           +  (df[coor_y] - df[coor_y].shift())**2 )
    return df.drop(drop_cols, axis=1)

ddf2 = ddf.map_partitions(func2
                          , coor_x='x'
                          , coor_y='y'
                          , drop_cols=['initials', 'z']
                          , meta=pd.DataFrame({'ID':'i8'
                                              , 'name':str
                                              , 'x':'f8'
                                              , 'y':'f8'
                                              , 'dist':'f8'}, index=[0]))
ddf2.head()
[22]:
ID name x y dist
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 NaN
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 1.063636
2000-01-01 00:00:02 996 Kevin 0.336184 0.150478 0.539449
2000-01-01 00:00:03 1035 Quinn 0.853655 0.031222 0.531035
2000-01-01 00:00:04 1039 Ingrid 0.890711 -0.992794 1.024686

Convert index into Time column

[23]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
[23]:
ID name x y initials times
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 00:00:00
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 00:00:01
[24]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or ddf = ddf.assign(Time=dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'))
ddf['times'] = ddf['times'].dt.time
ddf = client.persist(ddf)
ddf.head(2)
[24]:
ID name x y initials z times
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 We 0.423744 00:00:00
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 Je 21.914646 00:00:01

Drop NA on column

[25]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
[26]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
[27]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)
[27]:
ID name x y times
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 00:00:00
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 00:00:01

In order for Dask to drop a column in which all values are NA it must check all the partitions with compute()

[28]:
if ddf.colna.isnull().all().compute() == True:   # check if all values in column are Null -  expensive
    ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)
[28]:
ID name x y times
timestamp
2000-01-01 00:00:00 983 Wendy -0.303374 -0.423744 00:00:00
2000-01-01 00:00:01 964 Jerry 0.021915 0.588930 00:00:01

Reset Index

[29]:
# Pandas
pdf =pdf.reset_index(drop=True)
pdf.head(2)
[29]:
ID name x y times
0 983 Wendy -0.303374 -0.423744 00:00:00
1 964 Jerry 0.021915 0.588930 00:00:01
[30]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)
[30]:
ID name x y times
0 983 Wendy -0.303374 -0.423744 00:00:00
1 964 Jerry 0.021915 0.588930 00:00:01

Read / Save files

  • When working with Pandas and Dask, it is preferable to use the parquet format.

  • When working with Dask - files can be read with multiple workers.

  • Most kwargs are applicable for reading and writing files, e.g. ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False).

  • However some are not available, such as nrows.

See the documentation (including the option for output file naming).
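
As a sketch of the file-naming option (the output path and zero-padding pattern here are hypothetical, not from the original notebook), to_csv accepts a name_function that controls what replaces the '*' in the path for each partition:

# write one file per partition, replacing '*' with a zero-padded partition number
# (part-000.csv, part-001.csv, ...)
ddf.to_csv('data/dask_named_files/part-*.csv',
           index=False,
           name_function=lambda i: f'{i:03d}')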

Save files

[31]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
[32]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
CPU times: user 16.4 s, sys: 433 ms, total: 16.8 s
Wall time: 16.6 s
[33]:
list(output_dir_file.parent.glob('*.csv'))
[33]:
[PosixPath('data/2000-01-25.csv'),
 PosixPath('data/2000-01-20.csv'),
 PosixPath('data/2000-01-29.csv'),
 PosixPath('data/2000-01-02.csv'),
 PosixPath('data/2000-01-19.csv'),
 PosixPath('data/2000-01-23.csv'),
 PosixPath('data/2000-01-10.csv'),
 PosixPath('data/2000-01-21.csv'),
 PosixPath('data/2000-01-17.csv'),
 PosixPath('data/2000-01-04.csv'),
 PosixPath('data/2000-01-27.csv'),
 PosixPath('data/2000-01-22.csv'),
 PosixPath('data/2000-01-14.csv'),
 PosixPath('data/2000-01-11.csv'),
 PosixPath('data/pdf_single_file.csv'),
 PosixPath('data/2000-01-13.csv'),
 PosixPath('data/2000-01-08.csv'),
 PosixPath('data/2000-01-09.csv'),
 PosixPath('data/2000-01-06.csv'),
 PosixPath('data/2000-01-01.csv'),
 PosixPath('data/2000-01-07.csv'),
 PosixPath('data/2000-01-12.csv'),
 PosixPath('data/2000-01-16.csv'),
 PosixPath('data/2000-01-26.csv'),
 PosixPath('data/2000-01-24.csv'),
 PosixPath('data/2000-01-18.csv'),
 PosixPath('data/2000-01-15.csv'),
 PosixPath('data/2000-01-03.csv'),
 PosixPath('data/2000-01-30.csv'),
 PosixPath('data/2000-01-28.csv'),
 PosixPath('data/2000-01-05.csv')]

Notice the '*' in the path used below, which allows Dask to write multiple output files (one per partition).

[34]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
[35]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)
CPU times: user 454 ms, sys: 46.5 ms, total: 500 ms
Wall time: 10.4 s
[35]:
['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf10.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf11.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf12.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf13.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf14.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf15.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf16.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf17.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf18.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf19.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf20.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf21.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf22.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf23.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf24.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf25.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf26.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf27.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf28.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf29.csv']

To find the number of partitions, which will determine the number of output files, use dask.dataframe.npartitions.

To change the number of output files use repartition, which is an expensive operation.
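
A minimal sketch (the target of 5 partitions is arbitrary): repartition returns a new lazy dataframe, so only computations and writes done on the repartitioned object are affected.

# reduce the number of partitions (and therefore the number of output files)
ddf_5 = ddf.repartition(npartitions=5)
print(ddf_5.npartitions)
# ddf_5.to_csv(f'{output_dask_dir}/ddf*.csv', index=False)  # would now write 5 files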

[36]:
ddf.npartitions
[36]:
30

Read Multiple files

For Pandas it is possible to iterate over the files and concat them; see this answer from stack overflow.

[37]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
                       for f in list(output_dask_dir.iterdir())])
len(concat_df)
CPU times: user 2.75 s, sys: 318 ms, total: 3.07 s
Wall time: 3 s
[37]:
2592000
[38]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf
CPU times: user 11.9 ms, sys: 0 ns, total: 11.9 ms
Wall time: 12.4 ms
[38]:
Dask DataFrame Structure:
ID name x y times
npartitions=30
int64 object float64 float64 object
... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ...
... ... ... ... ...
Dask Name: read-csv, 30 tasks

Remember that Dask is lazy - thus it does not really read the file until it needs to…

[39]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
CPU times: user 69.6 ms, sys: 11.3 ms, total: 81 ms
Wall time: 818 ms
[39]:
2592000

Consider using client.persist()

Since Dask is lazy, it may run the entire graph/DAG (again) even if it already ran part of the calculation in a previous cell. Thus use persist to keep the results in memory.
Additional information can be read in this stackoverflow issue or see an example in this post.
This concept should also be used when running code within a script (rather than a Jupyter notebook) which incorporates loops within the code.
[40]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)
[40]:
ID name x y times
0 983 Wendy -0.303374 -0.423744 00:00:00
1 964 Jerry 0.021915 0.588930 00:00:01

Group By - custom aggregations

In addition to the groupby notebook example that is in the repository,
this is another example of how to eliminate the use of groupby.apply.
In this example we are grouping columns into unique lists.
[41]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
[41]:
name ID seconds
0 Wendy 983 00
1 Jerry 964 00
2 Kevin 996 00
3 Quinn 1035 00
4 Ingrid 1039 00
[42]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
               (lambda x: list(set(x.to_list())))
               for att_col_gr in gp_col]
[43]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
       Weight                                                 ID  \
name
Alice   99833  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
Bob     99508  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...

                                                 seconds
name
Alice  [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6...
Bob    [32, 05, 12, 42, 69, 34, 23, 24, 60, 72, 98, 6...
CPU times: user 23.1 ms, sys: 169 µs, total: 23.3 ms
Wall time: 22.1 ms
  • Remember that in some cases Pandas is more efficient (assuming that you can load all the data into RAM).

[44]:
def set_list_att(x: dd.Series):
        return list(set([item for item in x.values]))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
[44]:
name ID seconds
0 Wendy 983 00
1 Jerry 964 01
[45]:
ddf.columns
[45]:
Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')
[46]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
               for att_col_gr in gp_col]
[47]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File <timed exec>:4, in <module>

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1213, in _Frame.head(self, n, npartitions, compute)
   1211 # No need to warn if we're already looking at all partitions
   1212 safe = npartitions != self.npartitions
-> 1213 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1247, in _Frame._head(self, n, npartitions, compute, safe)
   1242 result = new_dd_object(
   1243     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1244 )
   1246 if compute:
-> 1247     result = result.compute()
   1248 return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
    268 def compute(self, **kwargs):
    269     """Compute this dask collection
    270
    271     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    290     dask.base.compute
    291     """
--> 292     (result,) = compute(self, traverse=False, **kwargs)
    293     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    572     keys.append(x.__dask_keys__())
    573     postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
    576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3004, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3002         should_rejoin = False
   3003 try:
-> 3004     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3005 finally:
   3006     for f in futures.values():

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2178, in Client.gather(self, futures, errors, direct, asynchronous)
   2176 else:
   2177     local_worker = None
-> 2178 return self.sync(
   2179     self._gather,
   2180     futures,
   2181     errors=errors,
   2182     direct=direct,
   2183     local_worker=local_worker,
   2184     asynchronous=asynchronous,
   2185 )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:318, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    316     return future
    317 else:
--> 318     return sync(
    319         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    320     )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:385, in sync(loop, func, callback_timeout, *args, **kwargs)
    383 if error:
    384     typ, exc, tb = error
--> 385     raise exc.with_traceback(tb)
    386 else:
    387     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:358, in sync.<locals>.f()
    356         future = asyncio.wait_for(future, callback_timeout)
    357     future = asyncio.ensure_future(future)
--> 358     result = yield future
    359 except Exception:
    360     error = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2041, in Client._gather(self, futures, errors, direct, local_worker)
   2039         exc = CancelledError(key)
   2040     else:
-> 2041         raise exception.with_traceback(traceback)
   2042     raise exc
   2043 if errors == "skip":

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in _execute_task()
     83 """Do the actual work of collecting data and executing a function
     84
     85 Examples
   (...)
    110 'foo'
    111 """
    112 if isinstance(arg, list):
--> 113     return [_execute_task(a, cache) for a in arg]
    114 elif istask(arg):
    115     func, args = arg[0], arg[1:]

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in <listcomp>()
     83 """Do the actual work of collecting data and executing a function
     84
     85 Examples
   (...)
    110 'foo'
    111 """
    112 if isinstance(arg, list):
--> 113     return [_execute_task(a, cache) for a in arg]
    114 elif istask(arg):
    115     func, args = arg[0], arg[1:]

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:40, in apply()
     38 def apply(func, args, kwargs=None):
     39     if kwargs:
---> 40         return func(*args, **kwargs)
     41     else:
     42         return func(*args)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6436, in apply_and_enforce()
   6434     return meta
   6435 if is_dataframe_like(df):
-> 6436     check_matching_columns(meta, df)
   6437     c = meta.columns
   6438 else:

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
    413 else:
    414     extra_info = "Order of columns does not match"
--> 415 raise ValueError(
    416     "The columns in the computed data do not match"
    417     " the columns in the provided metadata\n"
    418     f"{extra_info}"
    419 )

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['name']
  Missing: [0]
We can do better…
Using Dask custom aggregation is considerably better
[48]:
import itertools
custom_agg = dd.Aggregation(
    'custom_agg',
    # chunk step: within each partition, reduce the grouped values to a set
    lambda s: s.apply(set),
    # agg step: combine the per-partition sets into a single list of unique values
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)
[49]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
CPU times: user 185 ms, sys: 11.3 ms, total: 196 ms
Wall time: 1.33 s
[49]:
Weight ID seconds
name
Alice 99833 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... [21, 28, 03, 20, 52, 02, 43, 38, 32, 49, 09, 0...
Bob 99508 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... [28, 21, 03, 20, 02, 52, 43, 32, 38, 49, 09, 0...

Debugging

Debugging may be challenging…

  1. Run code without the client (see the sketch below)

  2. Use the Dashboard profiler

  3. Verify integrity of the DAG
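
For the first item, one common approach (a sketch, not part of the original notebook) is to run a small piece of the computation on the single-threaded synchronous scheduler; the scheduler= keyword passed to compute overrides the distributed client, so errors surface as a plain Python traceback and pdb can be used.

# compute only the first partition with the single-threaded scheduler
ddf.get_partition(0).compute(scheduler='synchronous')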

Corrupted DAG

In this example we show that once the DAG is corrupted you may need to reset the calculation

[50]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
[50]:
id name x y
timestamp
2000-01-01 996 Ingrid -0.932092 0.477965
[51]:
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2    # `^` <-- wrong syntax
                     +  (df[coor_y] - df[coor_y].shift())^2 )  # `^` <-- wrong syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(

Is everything OK?

# Results in error
ddf.head()

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>
      1 # returns an error because of ^2 (needs to be **2)
----> 2 ddf.head()

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
    898
    899     if compute:
--> 900         result = result.compute()
    901     return result
    902

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154     dask.base.compute
    155     """
--> 156     (result,) = compute(self, traverse=False, **kwargs)
    157     return result
    158

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

TypeError: unsupported operand type(s) for ^: 'float' and 'bool'
  • Even if the function is corrected, the DAG is corrupted

[52]:
# Still results with an error
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  # `**` <-- correct syntax
                     +  (df[coor_y] - df[coor_y].shift())**2 )  # `**` <-- correct syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(
# Still results in error
ddf.head()

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>
      1 # returns an error because of ^2 (needs to be **2)
----> 2 ddf.head()

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
    898
    899     if compute:
--> 900         result = result.compute()
    901     return result
    902

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154     dask.base.compute
    155     """
--> 156     (result,) = compute(self, traverse=False, **kwargs)
    157     return result
    158

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

TypeError: unsupported operand type(s) for ^: 'float' and 'bool'

We need to reset the dataframe

[53]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2    #corrected math function
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6335: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(
[53]:
id name x y col
timestamp
2000-01-01 00:00:00 979 Frank 0.820777 0.098616 NaN
2000-01-01 00:00:01 990 Laura -0.851323 -0.501678 1.77659