Gotchas from Pandas to Dask

This notebook highlights some key differences when transferring code from Pandas to run in a Dask environment.
Most issues have a link to the Dask documentation for additional information.
[1]:
# Dask is under active development, so this example records the versions it was run with
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')
Dask version: 2022.04.0
Pandas version: 1.4.2

Start Dask Client for Dashboard

Starting the Dask Client is optional. In this example we run on a LocalCluster, which also provides a dashboard that is useful for gaining insight into the computation.
For additional information on the Dask Client, see the documentation.
The link to the dashboard becomes visible when you create a client (as shown below).
When running within JupyterLab, an extension can be installed to view the various dashboard widgets.
[2]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client
[2]:

Client

Client-f3a78d75-d51f-11ec-a19b-000d3aeabb7a

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

See the documentation for additional cluster configuration.

Create 2 DataFrames for comparison:

  1. for Dask

  2. for Pandas

Dask comes with built-in dataset samples; we will use one of them for our example.

[3]:
ddf = dask.datasets.timeseries()
ddf
[3]:
Dask DataFrame Structure:
id name x y
npartitions=30
2000-01-01 int64 object float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: make-timeseries, 30 tasks
  • Remember that Dask is lazy: in order to see the result we need to run compute() (or head(), which runs compute() under the hood).

[4]:
ddf.head(2)
[4]:
id name x y
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270

In order to create a Pandas dataframe we can use the compute() method from a Dask dataframe

[5]:
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)
<class 'pandas.core.frame.DataFrame'>
[5]:
id name x y
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270

We can also see dask laziness when using the shape attribute

[6]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')
Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-457670a7-0118-48ff-b5cd-08db2a126430'), 4)

We cannot get the full shape before accessing all the partitions - running len will do so

[7]:
print(f'Dask computed shape: {len(ddf.index):,}')  # expensive
Dask computed shape: 2,592,000

Creating a Dask dataframe from Pandas

In order to utilize Dask capabilities on an existing Pandas dataframe (pdf) we need to convert the Pandas dataframe into a Dask dataframe (ddf) with the from_pandas method. You must supply either the number of partitions (npartitions) or the chunksize that will be used to generate the Dask dataframe.

[8]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2
[8]:
Dask DataFrame Structure:
id name x y
npartitions=10
2000-01-01 00:00:00 int64 object float64 float64
2000-01-04 00:00:00 ... ... ... ...
... ... ... ... ...
2000-01-28 00:00:00 ... ... ... ...
2000-01-30 23:59:59 ... ... ... ...
Dask Name: from_pandas, 10 tasks

Partitions in Dask Dataframes

Notice that when we created a Dask dataframe we needed to supply the npartitions argument.
The number of partitions tells Dask how to break up the Pandas dataframe and parallelize the computation.
Each partition is a separate dataframe. For additional information see the partition documentation.

An example of this can be seen when examining the reset_index() method:

[9]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
[9]:
timestamp    2000-01-01 00:00:00
id                           995
name                      Yvonne
x                      -0.257197
y                       0.288303
Name: 0, dtype: object
[10]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()
[10]:
timestamp id name x y
0 2000-01-01 995 Yvonne -0.257197 0.288303
0 2000-01-04 1040 Kevin 0.071874 0.998809
0 2000-01-07 950 Dan -0.860903 -0.366538
0 2000-01-10 1012 Norbert -0.688807 -0.845508
0 2000-01-13 970 Kevin 0.964429 -0.470225
0 2000-01-16 1017 Ray -0.858035 -0.123796
0 2000-01-19 973 Norbert -0.595604 0.726321
0 2000-01-22 1024 Bob 0.272160 0.549340
0 2000-01-25 1026 Bob -0.942005 -0.867345
0 2000-01-28 996 Norbert 0.885387 -0.840972

Dask Dataframe vs Pandas Dataframe

Now that we have a Dask (ddf) and a Pandas (pdf) dataframe we can start to compare the interactions with them.

Conceptual shift - from Update to Insert/Delete

Dask does not update in place, so arguments such as inplace=True, which exist in Pandas, are not available.
For more details see issue #653 on GitHub.

Rename Columns

  • Using inplace=True is not considered best practice.

[11]:
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[11]:
Index(['ID', 'name', 'x', 'y'], dtype='object')
# Dask - Error
# ddf.rename(columns={'id':'ID'}, inplace=True)
# ddf.columns

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
      1 # Dask - Error
----> 2 ddf.rename(columns={'id':'ID'}, inplace=True)
      3 ddf.columns

TypeError: rename() got an unexpected keyword argument 'inplace'
[12]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns
Index(['id', 'name', 'x', 'y'], dtype='object')
[12]:
Index(['ID', 'name', 'x', 'y'], dtype='object')

Data manipulations

There are several differences when manipulating data.

loc - Pandas

[13]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)
[13]:
ID name x y
timestamp
2000-01-01 00:00:17 1000 Norbert 0.727163 89.682962
2000-01-01 00:00:20 1060 Ursula 0.700307 -75.613821

Error

cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [14], in <cell line: 2>()
      1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
      3 ddf[cond_ddf].head(2)

TypeError: '_LocIndexer' object does not support item assignment

Dask - use mask/where

[14]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
[14]:
ID name x y
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270
[15]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)
[15]:
ID name x y
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270

For more information see dask mask documentation

Meta argument

One key feature in Dask is the introduction of the meta argument.
> meta is the prescription of the names/types of the output from the computation
Since Dask creates a DAG for the computation, it needs to know the output of each calculation stage.
For additional information see the meta documentation.
[16]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
[16]:
ID name x y initials
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La
[17]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:3930: UserWarning:
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('name', 'object'))

  warnings.warn(meta_warning(meta))
[17]:
ID name x y initials
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La
[18]:
# Describe the outcome type of the calculation with an empty Series of the right dtype and name
meta_arg = pd.Series(dtype='object', name='initials')
[19]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
[19]:
ID name x y initials
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La
[20]:
# similar when using a function
def func(row):
    if (row['x']> 0):
        return row['x'] * 1000
    else:
        return row['y'] * -1
[21]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)
[21]:
ID name x y initials z
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv -0.288303
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La -0.630270

Map partitions

  • We can supply an ad-hoc function to run on each partition using the map_partitions method. This is mainly useful for functions that are not implemented in Dask or Pandas.

  • The function can return a new dataframe, which needs to be described in the meta argument. The function can also take arguments.

[22]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
    # distance between consecutive rows; assign() returns a new frame
    # instead of mutating the partition that was passed in
    dist = np.sqrt((df[coor_x] - df[coor_x].shift())**2
                   + (df[coor_y] - df[coor_y].shift())**2)
    return df.assign(dist=dist).drop(drop_cols, axis=1)

# meta maps each output column to its dtype
ddf2 = ddf.map_partitions(func2,
                          coor_x='x',
                          coor_y='y',
                          drop_cols=['initials', 'z'],
                          meta={'ID': 'i8',
                                'name': 'object',
                                'x': 'f8',
                                'y': 'f8',
                                'dist': 'f8'})
ddf2.head()
[22]:
ID name x y dist
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 NaN
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 0.636862
2000-01-01 00:00:02 1012 Ursula -0.013075 0.732633 0.788061
2000-01-01 00:00:03 960 Yvonne -0.361644 -0.202956 0.998412
2000-01-01 00:00:04 988 Kevin 0.918343 -0.933568 1.473825

Convert index into Time column

[23]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
[23]:
ID name x y initials times
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv 00:00:00
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La 00:00:01
[24]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or  ddf = ddf.assign(times=dd.to_datetime(ddf.index, format='%Y-%m-%d'))
ddf['times'] = ddf['times'].dt.time
ddf = client.persist(ddf)
ddf.head(2)
[24]:
ID name x y initials z times
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 Yv -0.288303 00:00:00
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 La -0.630270 00:00:01

Drop NA on column

[25]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
[26]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
[27]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)
[27]:
ID name x y times
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 00:00:00
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 00:00:01

In order for Dask to drop a column in which all values are NA, it must check every partition with compute()

[28]:
if ddf.colna.isnull().all().compute():   # check if all values in the column are null - expensive
    ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)
[28]:
ID name x y times
timestamp
2000-01-01 00:00:00 995 Yvonne -0.257197 0.288303 00:00:00
2000-01-01 00:00:01 1007 Laura -0.794460 0.630270 00:00:01

Reset Index

[29]:
# Pandas
pdf =pdf.reset_index(drop=True)
pdf.head(2)
[29]:
ID name x y times
0 995 Yvonne -0.257197 0.288303 00:00:00
1 1007 Laura -0.794460 0.630270 00:00:01
[30]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)
[30]:
ID name x y times
0 995 Yvonne -0.257197 0.288303 00:00:00
1 1007 Laura -0.794460 0.630270 00:00:01

Read / Save files

  • When working with Pandas and Dask, prefer the parquet format.

  • When working with Dask, files can be read with multiple workers.

  • Most kwargs are applicable for reading and writing files, e.g. ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=None).

  • However, some are not available, such as nrows.

See the documentation (including the option for output file naming).

Save files

[31]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
[32]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
CPU times: user 15.1 s, sys: 343 ms, total: 15.5 s
Wall time: 15.3 s
[33]:
list(output_dir_file.parent.glob('*.csv'))
[33]:
[PosixPath('data/2000-01-26.csv'),
 PosixPath('data/2000-01-09.csv'),
 PosixPath('data/2000-01-01.csv'),
 PosixPath('data/2000-01-11.csv'),
 PosixPath('data/2000-01-02.csv'),
 PosixPath('data/2000-01-22.csv'),
 PosixPath('data/2000-01-08.csv'),
 PosixPath('data/2000-01-07.csv'),
 PosixPath('data/2000-01-03.csv'),
 PosixPath('data/2000-01-30.csv'),
 PosixPath('data/2000-01-29.csv'),
 PosixPath('data/2000-01-12.csv'),
 PosixPath('data/2000-01-19.csv'),
 PosixPath('data/2000-01-20.csv'),
 PosixPath('data/2000-01-23.csv'),
 PosixPath('data/2000-01-04.csv'),
 PosixPath('data/2000-01-13.csv'),
 PosixPath('data/2000-01-06.csv'),
 PosixPath('data/2000-01-21.csv'),
 PosixPath('data/2000-01-10.csv'),
 PosixPath('data/2000-01-17.csv'),
 PosixPath('data/pdf_single_file.csv'),
 PosixPath('data/2000-01-14.csv'),
 PosixPath('data/2000-01-05.csv'),
 PosixPath('data/2000-01-16.csv'),
 PosixPath('data/2000-01-28.csv'),
 PosixPath('data/2000-01-25.csv'),
 PosixPath('data/2000-01-27.csv'),
 PosixPath('data/2000-01-18.csv'),
 PosixPath('data/2000-01-15.csv'),
 PosixPath('data/2000-01-24.csv')]

Notice the '*' in the output path below: Dask replaces it with the partition number, so each partition is written to its own file.

[34]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
[35]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)
CPU times: user 380 ms, sys: 31.3 ms, total: 412 ms
Wall time: 9.27 s
[35]:
['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf10.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf11.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf12.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf13.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf14.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf15.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf16.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf17.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf18.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf19.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf20.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf21.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf22.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf23.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf24.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf25.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf26.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf27.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf28.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf29.csv']

To find the number of partitions, which determines the number of output files, use the npartitions attribute of the Dask dataframe.

To change the number of output files use repartition, which is an expensive operation.

[36]:
ddf.npartitions
[36]:
30

Read Multiple files

For Pandas it is possible to iterate over the files and concatenate them; see this answer from Stack Overflow.

[37]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
                       for f in list(output_dask_dir.iterdir())])
len(concat_df)
CPU times: user 2.6 s, sys: 350 ms, total: 2.95 s
Wall time: 2.88 s
[37]:
2592000
[38]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf
CPU times: user 9.84 ms, sys: 0 ns, total: 9.84 ms
Wall time: 9.3 ms
[38]:
Dask DataFrame Structure:
ID name x y times
npartitions=30
int64 object float64 float64 object
... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ...
... ... ... ... ...
Dask Name: read-csv, 30 tasks

Remember that Dask is lazy, so it does not really read the files until it needs to…

[39]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
CPU times: user 70.5 ms, sys: 6.65 ms, total: 77.2 ms
Wall time: 769 ms
[39]:
2592000
Consider using client.persist()

Since Dask is lazy, it may run the entire graph/DAG (again) even if it already ran part of the calculation in a previous cell. Use persist to keep the results in memory.
Additional information can be read in this Stack Overflow issue, or see an example in this post.
This concept should also be used when running code within a script (rather than a Jupyter notebook) that incorporates loops.
[40]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)
[40]:
ID name x y times
0 995 Yvonne -0.257197 0.288303 00:00:00
1 1007 Laura -0.794460 0.630270 00:00:01

Group By - custom aggregations

In addition to the groupby notebook example that is in the repository, this is another example of how to try to eliminate the use of groupby.apply.
In this example we are grouping columns into unique lists.
[41]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
[41]:
name ID seconds
0 Yvonne 995 00
1 Laura 1007 00
2 Ursula 1012 00
3 Yvonne 960 00
4 Kevin 988 00
[42]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
               (lambda x: list(set(x.to_list())))
               for att_col_gr in gp_col]
[43]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
       Weight                                                 ID  \
name
Alice   99633  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...
Bob     99782  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...

                                                 seconds
name
Alice  [60, 29, 11, 09, 44, 05, 77, 54, 50, 01, 23, 7...
Bob    [60, 29, 11, 09, 44, 05, 77, 54, 50, 01, 23, 7...
CPU times: user 22.4 ms, sys: 0 ns, total: 22.4 ms
Wall time: 20.1 ms
  • Remember that in some cases Pandas is more efficient (assuming that you can load all the data into RAM).

[44]:
def set_list_att(x: pd.Series):
    # groupby.apply passes each group to the function as a Pandas Series
    return list(set(x.values))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
[44]:
name ID seconds
0 Yvonne 995 00
1 Laura 1007 01
[45]:
ddf.columns
[45]:
Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')
[46]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
               for att_col_gr in gp_col]
[47]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File <timed exec>:4, in <module>

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1208, in _Frame.head(self, n, npartitions, compute)
   1206 # No need to warn if we're already looking at all partitions
   1207 safe = npartitions != self.npartitions
-> 1208 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:1242, in _Frame._head(self, n, npartitions, compute, safe)
   1237 result = new_dd_object(
   1238     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1239 )
   1241 if compute:
-> 1242     result = result.compute()
   1243 return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:292, in DaskMethodsMixin.compute(self, **kwargs)
    268 def compute(self, **kwargs):
    269     """Compute this dask collection
    270
    271     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    290     dask.base.compute
    291     """
--> 292     (result,) = compute(self, traverse=False, **kwargs)
    293     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    572     keys.append(x.__dask_keys__())
    573     postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
    576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:3018, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3016         should_rejoin = False
   3017 try:
-> 3018     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3019 finally:
   3020     for f in futures.values():

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2171, in Client.gather(self, futures, errors, direct, asynchronous)
   2169 else:
   2170     local_worker = None
-> 2171 return self.sync(
   2172     self._gather,
   2173     futures,
   2174     errors=errors,
   2175     direct=direct,
   2176     local_worker=local_worker,
   2177     asynchronous=asynchronous,
   2178 )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:309, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    307     return future
    308 else:
--> 309     return sync(
    310         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    311     )

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:376, in sync(loop, func, callback_timeout, *args, **kwargs)
    374 if error:
    375     typ, exc, tb = error
--> 376     raise exc.with_traceback(tb)
    377 else:
    378     return result

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/utils.py:349, in sync.<locals>.f()
    347         future = asyncio.wait_for(future, callback_timeout)
    348     future = asyncio.ensure_future(future)
--> 349     result = yield future
    350 except Exception:
    351     error = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/distributed/client.py:2034, in Client._gather(self, futures, errors, direct, local_worker)
   2032         exc = CancelledError(key)
   2033     else:
-> 2034         raise exception.with_traceback(traceback)
   2035     raise exc
   2036 if errors == "skip":

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/optimization.py:990, in __call__()
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:149, in get()
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in _execute_task()
     83 """Do the actual work of collecting data and executing a function
     84
     85 Examples
   (...)
    110 'foo'
    111 """
    112 if isinstance(arg, list):
--> 113     return [_execute_task(a, cache) for a in arg]
    114 elif istask(arg):
    115     func, args = arg[0], arg[1:]

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:113, in <listcomp>()
     83 """Do the actual work of collecting data and executing a function
     84
     85 Examples
   (...)
    110 'foo'
    111 """
    112 if isinstance(arg, list):
--> 113     return [_execute_task(a, cache) for a in arg]
    114 elif istask(arg):
    115     func, args = arg[0], arg[1:]

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/utils.py:39, in apply()
     37 def apply(func, args, kwargs=None):
     38     if kwargs:
---> 39         return func(*args, **kwargs)
     40     else:
     41         return func(*args)

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6355, in apply_and_enforce()
   6353     return meta
   6354 if is_dataframe_like(df):
-> 6355     check_matching_columns(meta, df)
   6356     c = meta.columns
   6357 else:

File /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/utils.py:415, in check_matching_columns()
    413 else:
    414     extra_info = "Order of columns does not match"
--> 415 raise ValueError(
    416     "The columns in the computed data do not match"
    417     " the columns in the provided metadata\n"
    418     f"{extra_info}"
    419 )

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['name']
  Missing: [0]
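The groupby.apply version fails: the computed result does not match the columns described in meta.
We can do better…
Using a Dask custom aggregation is considerably better.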
[48]:
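import itertools
custom_agg = dd.Aggregation(
    'custom_agg',
    # chunk: collect the unique values of each group within a partition
    lambda s: s.apply(set),
    # agg: merge the per-partition sets of each group into one list of unique values
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)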
[49]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
CPU times: user 171 ms, sys: 4.06 ms, total: 175 ms
Wall time: 1.21 s
[49]:
Weight ID seconds
name
Alice 99633 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... [23, 55, 51, 21, 28, 58, 35, 06, 53, 11, 39, 3...
Bob 99782 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... [23, 55, 51, 21, 28, 58, 06, 35, 53, 39, 11, 3...

Debugging

Debugging may be challenging…

  1. Run code without client

  2. Use Dashboard profiler

  3. Verify integrity of DAG

Corrupted DAG

In this example we show that once the DAG is corrupted you may need to reset the calculation

[50]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
[50]:
id name x y
timestamp
2000-01-01 1011 Xavier -0.409016 0.331355
[51]:
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2    # `^` <-- wrong syntax
                     +  (df[coor_y] - df[coor_y].shift())^2 )  # `^` <-- wrong syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6254: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(

Is everything OK?

# Results in error
ddf.head()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
in
      1 # returns an error because of ^2 (needs to be **2)
----> 2 ddf.head()

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
    898
    899         if compute:
--> 900             result = result.compute()
    901         return result
    902

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154     dask.base.compute
    155     """
--> 156     (result,) = compute(self, traverse=False, **kwargs)
    157     return result
    158

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

TypeError: unsupported operand type(s) for ^: 'float' and 'bool'

  • Even if the function is corrected, the DAG is still corrupted: the faulty task was already assigned to ddf, so every later operation on ddf is built on top of it

[52]:
# Still results in an error
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  # `**` <-- correct syntax
                     +  (df[coor_y] - df[coor_y].shift())**2 )  # `**` <-- correct syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6254: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(
# Still results in error
ddf.head()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
in
      1 # returns an error because of ^2 (needs to be **2)
----> 2 ddf.head()

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
    898
    899         if compute:
--> 900             result = result.compute()
    901         return result
    902

c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154     dask.base.compute
    155     """
--> 156     (result,) = compute(self, traverse=False, **kwargs)
    157     return result
    158

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop()

TypeError: unsupported operand type(s) for ^: 'float' and 'bool'

We need to reset the dataframe

[53]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2    #corrected math function
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
ddf.head(2)
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/core.py:6254: FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.
  warnings.warn(
[53]:
id name x y col
timestamp
2000-01-01 00:00:00 1000 Alice -0.805901 -0.690125 NaN
2000-01-01 00:00:01 974 Quinn 0.341908 0.012709 1.345898