
Embarrassingly Parallel Workloads

This notebook shows how to use Dask to parallelize embarrassingly parallel workloads where you want to apply one function to many pieces of data independently. It will show three different ways of doing this with Dask:

  1. dask.delayed
  2. the Futures API (modeled on concurrent.futures)
  3. dask.bag

This example focuses on using Dask to build large embarrassingly parallel computations, as often seen in scientific communities and on High Performance Computing facilities, for example with Monte Carlo methods. This kind of simulation assumes the following:

  • We have a function that runs a heavy computation given some parameters.
  • We need to compute this function on many different input parameters, each function call being independent.
  • We want to gather all the results in one place for further analysis.
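To make these three assumptions concrete, here is a small sketch that does not use Dask yet. The estimate_pi function and its parameters are hypothetical stand-ins for a heavy Monte Carlo simulation. Giving each run its own seed, spawned from one root seed with NumPy's SeedSequence, is our own assumption: it keeps the random streams of independent runs separate and makes the whole set reproducible.

```python
import numpy as np

def estimate_pi(n_samples, seed):
    """Hypothetical 'heavy' simulation: Monte Carlo estimate of pi."""
    rng = np.random.default_rng(seed)
    # Draw points uniformly in the unit square and count those inside the quarter circle
    x = rng.random(n_samples)
    y = rng.random(n_samples)
    inside = np.count_nonzero(x * x + y * y <= 1.0)
    return 4.0 * inside / n_samples

# One independent child seed per simulation, all derived from a single root seed
seeds = np.random.SeedSequence(12345).spawn(20)

# Many independent calls, each with its own parameters...
results = [estimate_pi(10_000, seed) for seed in seeds]

# ...gathered in one place for further analysis
print(np.mean(results))
```

Every call is independent of the others, which is the property the parallel versions in this notebook rely on.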

Start Dask Client for Dashboard

Starting the Dask Client will provide a dashboard, which is useful to gain insight into the computation. We will also need it for the Futures API part of this example. Moreover, as this kind of computation is often launched on supercomputers or in the Cloud, you will probably end up having to start a cluster and connect a client to it in order to scale. See dask-jobqueue, dask-kubernetes or dask-yarn for easy ways to achieve this on HPC, Cloud or Big Data infrastructure, respectively.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning.

In [1]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
Out[1]:

Client

Cluster

  • Workers: 1
  • Cores: 4
  • Memory: 7.84 GB

Define the function that runs your computation

This function does a simple operation: it adds all the numbers of a list/array together, but it also sleeps for a random amount of time (up to one second) to simulate real work. In real use cases, this could call another Python module, or even run an executable using the subprocess module.

In [2]:
import time
import random

def costly_simulation(list_param):
    time.sleep(random.random())
    return sum(list_param)
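As mentioned above, the simulation function could instead drive an external executable. A minimal sketch, assuming a hypothetical command-line simulator that takes its parameters as arguments and prints a single number to stdout; here the Python interpreter itself (sys.executable -c ...) plays that role so the example runs anywhere, and external_simulation is an illustrative name, not part of the notebook.

```python
import subprocess
import sys

# Stand-in for an external simulation executable: sums its command-line arguments
FAKE_SIMULATOR = [sys.executable, "-c",
                  "import sys; print(sum(float(a) for a in sys.argv[1:]))"]

def external_simulation(list_param):
    """Run the (hypothetical) external simulator and parse its stdout."""
    completed = subprocess.run(
        FAKE_SIMULATOR + [str(p) for p in list_param],
        capture_output=True, text=True, check=True,
    )
    return float(completed.stdout.strip())

print(external_simulation([1, 2, 3, 4]))  # 10.0
```

Because check=True is set, a failing simulator raises subprocess.CalledProcessError; when the function runs as a Dask task, that exception surfaces when the results are computed or gathered.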

Let's try costly_simulation locally:

In [3]:
%time costly_simulation([1, 2, 3, 4])
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 191 ms
Out[3]:
10

Define the set of input parameters to call the function

We will generate a set of inputs on which we want to run our simulation function. Here we use a pandas DataFrame, but we could also use a simple list. Let's say that our simulation is run with four parameters called param_a through param_d.

In [4]:
import pandas as pd
import numpy as np

input_params = pd.DataFrame(np.random.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()
Out[4]:
    param_a   param_b   param_c   param_d
0  0.831499  0.012514  0.883766  0.237989
1  0.792196  0.529474  0.449769  0.860705
2  0.136540  0.256082  0.879215  0.595899
3  0.939097  0.887667  0.001532  0.991466
4  0.440915  0.977699  0.504589  0.453680
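As noted above, a plain Python list works just as well as a DataFrame for holding the inputs. A minimal standard-library sketch; input_list and input_dicts are hypothetical names chosen so that the notebook's input_params is left untouched.

```python
import random

PARAM_NAMES = ['param_a', 'param_b', 'param_c', 'param_d']

# 500 simulations, each described by four random parameters in [0, 1)
input_list = [[random.random() for _ in PARAM_NAMES] for _ in range(500)]

# Optionally keep the parameter names alongside the values
input_dicts = [dict(zip(PARAM_NAMES, row)) for row in input_list]
print(input_dicts[0])
```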

Without using Dask, we could call our simulation on all of these parameters using normal Python for loops.

Let's only do this on a sample of 10 parameter sets, as running all 500 sequentially would take several minutes.

In [5]:
%%time
results = []
for parameters in input_params.values[:10]:
    result = costly_simulation(parameters)
    results.append(result)
CPU times: user 184 ms, sys: 16 ms, total: 200 ms
Wall time: 4.31 s
In [6]:
results
Out[6]:
[1.9657675620121111,
 2.632143177141037,
 1.8677364842775241,
 2.819762139463247,
 2.376883771291495,
 0.949525352654632,
 2.683537988595758,
 1.5278240566637113,
 1.7156633836443302,
 2.9229733363112924]

Note that this is not very efficient: every call is independent of the others, so this loop is easy to parallelize.

There are many ways to parallelize this function in Python with libraries like multiprocessing, concurrent.futures, joblib or others. These are good first steps. Dask is a good second step, especially when you want to scale across many machines.
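For comparison, here is a sketch of the standard-library concurrent.futures approach mentioned above, applied to a copy of costly_simulation. The thread pool is an assumption that fits this toy function only: time.sleep releases the GIL, so threads can overlap the waiting, whereas genuinely CPU-bound Python code would usually need ProcessPoolExecutor instead. Either way, this stays on the cores of a single machine.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def costly_simulation(list_param):
    time.sleep(random.random())  # simulated work; sleeping releases the GIL
    return sum(list_param)

sample_params = [[random.random() for _ in range(4)] for _ in range(10)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map returns results in the same order as the inputs
    thread_results = list(executor.map(costly_simulation, sample_params))
print(f"{len(thread_results)} results in {time.perf_counter() - start:.2f} s")
```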

Use Dask Delayed to make our function lazy

We can call dask.delayed on our function to make it lazy. Rather than compute its result immediately, it records what we want to compute as a task into a graph that we'll run later on parallel hardware. Using dask.delayed is a relatively straightforward way to parallelize an existing code base, even if the computation isn't embarrassingly parallel like this one.
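To illustrate the remark about computations that are not embarrassingly parallel, here is a minimal sketch in which delayed results feed a final reduction task. The simulate and total functions are hypothetical, the sleep is tiny, and the local threaded scheduler is used so that no Client is required; nothing runs until compute is called.

```python
import time
import dask

@dask.delayed
def simulate(x):
    time.sleep(0.01)  # stand-in for real work
    return x * x

@dask.delayed
def total(values):
    return sum(values)

# Ten independent tasks plus one task that depends on all of them
partials = [simulate(i) for i in range(10)]
grand_total = total(partials)  # still lazy: only the graph has been built

print(grand_total.compute(scheduler="threads"))  # 285
```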

Calling these lazy functions is now almost free. In the cell below we only construct a simple graph.

In [7]:
%%time
import dask
lazy_results = []

for parameters in input_params.values[:10]:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 2.41 ms
In [8]:
lazy_results[0]
Out[8]:
Delayed('costly_simulation-3c08bbb4-e3f0-4d82-b8b7-5ce7ebec186e')

Run in parallel

The lazy_results list contains information about ten calls to costly_simulation that have not yet been run. Call .compute() on a delayed object, or dask.compute() on several of them at once, when you want your results as normal Python objects.

If you started Client() above, you may want to watch the status page of the dashboard during the computation.

In [9]:
%time dask.compute(*lazy_results)
CPU times: user 132 ms, sys: 4 ms, total: 136 ms
Wall time: 1.87 s
Out[9]:
(1.9657675620121111,
 2.632143177141037,
 1.8677364842775241,
 2.819762139463247,
 2.376883771291495,
 0.949525352654632,
 2.683537988595758,
 1.5278240566637113,
 1.7156633836443302,
 2.9229733363112924)

Notice that this was faster (1.87 s of wall time instead of 4.31 s) than running these same computations sequentially with a for loop.
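The difference between .compute() on a single object and dask.compute() on many can be sketched with a hypothetical double function and the local threaded scheduler (no Client needed). Calling .compute() on each object separately would run a separate graph every time, while a single dask.compute(...) call hands all the tasks to the scheduler together.

```python
import dask

@dask.delayed
def double(x):
    return 2 * x

lazy = [double(i) for i in range(5)]

# One object: .compute() returns a plain Python value
print(lazy[0].compute(scheduler="threads"))      # 0

# Many objects: one dask.compute call schedules them together and returns a tuple
print(dask.compute(*lazy, scheduler="threads"))  # (0, 2, 4, 6, 8)

# Passing the list itself also works; the result comes back wrapped in a 1-tuple
(as_list,) = dask.compute(lazy, scheduler="threads")
print(as_list)                                   # [0, 2, 4, 6, 8]
```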

We can now run this on all of our input parameters:

In [10]:
import dask
lazy_results = []

for parameters in input_params.values:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)

futures = dask.persist(*lazy_results)  # trigger computation in the background

To make this go faster, we can add more workers. (Although we are still only working on our local machine, this is more practical when using an actual cluster.)

In [11]:
# Scale the local cluster from 1 worker to 11 (10 new workers, 4 threads each)
client.cluster.scale(11)

By looking at the Dask dashboard, we can see that Dask spreads this work around our cluster, managing load balancing, dependencies, etc.

Then get the results:

In [12]:
results = dask.compute(*futures)
results[:5]
Out[12]:
(1.9657675620121111,
 2.632143177141037,
 1.8677364842775241,
 2.819762139463247,
 2.376883771291495)

Using the Futures API

The same example can be implemented with Dask's Futures API by using the client object itself. For our use case of applying a function across many inputs, Dask delayed and Dask Futures are equally useful. The Futures API is a little different because it starts work immediately rather than being completely lazy.

For example, notice that work starts immediately in the cell below as we submit work to the cluster:

In [13]:
futures = []
for parameters in input_params.values:
    future = client.submit(costly_simulation, parameters)
    futures.append(future)

We can explicitly wait until this work is done and gather the results to our local process by calling client.gather:

In [14]:
results = client.gather(futures)
results[:5]
Out[14]:
[1.9657675620121111,
 2.632143177141037,
 1.8677364842775241,
 2.819762139463247,
 2.376883771291495]

The code above can be written more concisely with client.map(), which calls a given function on each element of a list of parameters.

As with delayed, we can start the computation without waiting for the results, simply by not calling client.gather() yet.

Note that because the cluster has already run costly_simulation on these same input parameters through client.submit(), the call to client.map() below won't trigger any new computation: it just retrieves the already computed results.

In [15]:
futures = client.map(costly_simulation, input_params.values)

Then just get the results later:

In [16]:
results = client.gather(futures)
len(results)
Out[16]:
500
In [17]:
print(results[0])
1.9657675620121111

We encourage you to keep the dashboard's status page open to follow the ongoing computation.
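Why does client.map() reuse the earlier results? As we understand Dask's default behaviour (pure=True), client.submit and client.map give each task a key made from the function name plus a hash of the function and its arguments, computed with dask.base.tokenize, so identical calls share a key and reuse the result held by the cluster. The sketch below only checks the hashing part locally, using a hypothetical simulation_stub function; it does not start a cluster.

```python
import numpy as np
from dask.base import tokenize

def simulation_stub(list_param):
    # Hypothetical stand-in for costly_simulation (no sleep)
    return sum(list_param)

row = np.array([0.1, 0.2, 0.3, 0.4])

# Same function and equal argument values -> same token (and thus the same task key)
same = tokenize(simulation_stub, row) == tokenize(simulation_stub, row.copy())

# Different arguments -> different token, so Dask would schedule a new task
different = tokenize(simulation_stub, row) != tokenize(simulation_stub, row + 1)

print(same, different)  # True True
```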

Doing some analysis on the results

Besides the simple API, one benefit of Dask here is that you can gather the results of all your simulations in one call. There is no need to implement a complex mechanism or to write individual results to a shared file system or object store.

Just get your results and analyze them.

Here, we will just get the results and expand our initial DataFrame to get a clear view of parameters versus results for our computation.

In [18]:
output = input_params.copy()
output['result'] = pd.Series(results, index=output.index)
output.sample(5)
Out[18]:
      param_a   param_b   param_c   param_d    result
390  0.425560  0.329860  0.449316  0.530277  1.735013
338  0.876114  0.542568  0.850862  0.191994  2.461538
471  0.255616  0.556455  0.148708  0.904605  1.865384
484  0.300414  0.464684  0.200835  0.610515  1.576447
284  0.779851  0.143986  0.231381  0.775128  1.930345

Then we can make some statistical plots, or save the results locally, using the pandas interface:

In [19]:
%matplotlib inline
output['result'].plot()
Out[19]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f5d893140f0>
[Figure: line plot of output['result'] against the DataFrame index]
In [20]:
output['result'].mean()
Out[20]:
2.023481737783638
In [21]:
filtered_output = output[output['result'] > 2]
print(len(filtered_output))
filtered_output.to_csv('/tmp/simulation_result.csv')
257
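Other pandas summaries work the same way. The sketch below is self-contained: it regenerates a seeded stand-in called demo_output (a hypothetical name, so the notebook's output is not overwritten) in which result is simply the row sum, as costly_simulation returns. describe() summarizes the results, and corr() shows how strongly each parameter drives them.

```python
import numpy as np
import pandas as pd

param_columns = ['param_a', 'param_b', 'param_c', 'param_d']
rng = np.random.default_rng(0)

# Stand-in for the notebook's output DataFrame: parameters plus the simulated result
demo_output = pd.DataFrame(rng.random((500, 4)), columns=param_columns)
demo_output['result'] = demo_output[param_columns].sum(axis=1)  # what costly_simulation computes

print(demo_output['result'].describe())                  # count, mean, std, quartiles, ...
print(demo_output.corr()['result'].drop('result'))       # correlation of each parameter with the result
```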

Handling very large simulations with Bags

The methods above work well for up to about 100,000 sets of input parameters. Above that, the Dask scheduler has trouble handling the number of tasks to schedule to workers. The solution to this problem is to bundle many parameters into a single task. You could do this either by writing a new function that operates on a batch of parameters and using the delayed or Futures APIs on that function, or by using the Dask Bag API. This is described further in the documentation about avoiding too many tasks.

Dask Bags hold onto large sequences in a few partitions, and each operation on a bag runs as one task per partition. We can convert our input_params sequence into a dask.bag collection, asking for far fewer partitions than items (the number of partitions should stay well below about 100,000, which is already huge), and apply our function to every item of the bag.
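The first option, a function that works on a batch of parameters, could look like the sketch below. quick_simulation, simulate_batch and batches are hypothetical helpers, the sleep is shortened, and the local threaded scheduler stands in for the cluster; the point is that 500 parameter sets become 10 tasks of 50 calls each. With the Futures API, the same helper could be passed to client.map(simulate_batch, batches(batch_params, 50)).

```python
import random
import time
import dask

def quick_simulation(list_param):
    time.sleep(random.random() * 0.001)  # shortened simulated work
    return sum(list_param)

def simulate_batch(batch):
    """Run the simulation on every parameter set of a batch, inside one task."""
    return [quick_simulation(params) for params in batch]

def batches(seq, size):
    """Split seq into consecutive chunks of at most `size` items."""
    return [seq[i:i + size] for i in range(0, len(seq), size)]

batch_params = [[random.random() for _ in range(4)] for _ in range(500)]

# 10 delayed tasks instead of 500
lazy_batches = [dask.delayed(simulate_batch)(chunk) for chunk in batches(batch_params, 50)]
per_batch = dask.compute(*lazy_batches, scheduler="threads")

# Flatten back to one result per parameter set, in input order
batched_results = [r for batch in per_batch for r in batch]
print(len(lazy_batches), len(batched_results))  # 10 500
```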

In [22]:
import dask.bag as db
b = db.from_sequence(list(input_params.values), npartitions=100)
b = b.map(costly_simulation)
In [23]:
%time results_bag = b.compute()
CPU times: user 1.46 s, sys: 160 ms, total: 1.62 s
Wall time: 8.17 s

Looking at the dashboard here, you should see only 100 tasks to run instead of 500, each taking about 5x longer on average, because each one calls our function 5 times.

In [24]:
np.allclose(results, results_bag)  # element-wise comparison of both result lists
Out[24]:
True
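To see the partitioning described above without a cluster, here is a small sketch on made-up integer parameters (demo_params and demo_bag are hypothetical names), run with the synchronous scheduler: 500 items in 100 partitions give 5 items per partition, and map still returns one result per item, in input order.

```python
import dask.bag as db

demo_params = [[i, i + 1, i + 2, i + 3] for i in range(500)]

# 500 items split into 100 partitions: each partition is processed as one task of 5 items
demo_bag = db.from_sequence(demo_params, npartitions=100)
print(demo_bag.npartitions)  # 100

# Count the items each partition holds (one value per partition)
sizes = demo_bag.map_partitions(lambda part: [len(list(part))]).compute(scheduler="synchronous")
print(set(sizes))  # {5}

# map applies the function item by item inside each partition, preserving order
sums = demo_bag.map(sum).compute(scheduler="synchronous")
print(sums[:3])  # [6, 10, 14]
```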