Embarrassingly parallel Workloads¶
This notebook shows how to use Dask to parallelize embarrassingly parallel workloads where you want to apply one function to many pieces of data independently. It shows three different ways of doing this with Dask: dask.delayed, the Futures API, and dask.bag.
This example focuses on using Dask to build large embarrassingly parallel computations, as often seen in scientific communities and on High Performance Computing facilities, for example with Monte Carlo methods. This kind of simulation assumes the following:
We have a function that runs a heavy computation given some parameters.
We need to compute this function on many different input parameters, each function call being independent.
We want to gather all the results in one place for further analysis.
Start Dask Client for Dashboard¶
Starting the Dask Client provides a dashboard, which is useful for gaining insight into the computation. We will also need the client for the Futures API part of this example. Moreover, as this kind of computation is often launched on a supercomputer or in the cloud, you will probably end up having to start a cluster and connect a client to it in order to scale. See dask-jobqueue, dask-kubernetes or dask-yarn for easy ways to achieve this on HPC, cloud or Big Data infrastructure, respectively.
The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning.
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
/usr/share/miniconda3/envs/dask-examples/lib/python3.8/site-packages/distributed/node.py:151: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 46611 instead warnings.warn(
Define your computation calling function¶
This function does a simple operation: it adds all the numbers of a list or array together, but it also sleeps for a random amount of time to simulate real work. In real use cases, it could call another Python module, or even run an executable using the subprocess module.
import time
import random

def costly_simulation(list_param):
    time.sleep(random.random())
    return sum(list_param)
We try it locally below
%time costly_simulation([1, 2, 3, 4])
CPU times: user 13.3 ms, sys: 0 ns, total: 13.3 ms Wall time: 674 ms
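As mentioned above, a real simulation function might wrap an external executable rather than compute in Python. The following is a minimal sketch of that idea, not part of the original notebook: the function name `run_external_simulation` is hypothetical, and a Python one-liner launched through `sys.executable` stands in for a real simulation binary.

```python
import subprocess
import sys


def run_external_simulation(list_param):
    """Run a stand-in 'executable' on the parameters and parse its output."""
    # Hypothetical stand-in: a Python one-liner that sums its arguments.
    # A real workload would call its own simulation binary here instead.
    command = [
        sys.executable,
        "-c",
        "import sys; print(sum(float(a) for a in sys.argv[1:]))",
    ]
    command += [str(p) for p in list_param]
    # check=True raises CalledProcessError if the program exits with an error.
    completed = subprocess.run(command, capture_output=True, text=True, check=True)
    return float(completed.stdout.strip())


external_result = run_external_simulation([1, 2, 3, 4])
print(external_result)  # 10.0
```

Because the function takes a list of parameters and returns a number, it could be passed to Dask in the same way as costly_simulation.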
Define the set of input parameters to call the function¶
We will generate a set of inputs on which we want to run our simulation function. Here we use a pandas DataFrame, but we could also use a simple list. Let's say that our simulation is run with four parameters called param_[a-d].
import pandas as pd
import numpy as np

input_params = pd.DataFrame(np.random.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()
Without using Dask, we could call our simulation on all of these parameters using normal Python for loops.
Let’s only do this on a sample of our parameters as it would be quite long otherwise.
results = []
%%time
for parameters in input_params.values[:10]:
    result = costly_simulation(parameters)
    results.append(result)
CPU times: user 106 ms, sys: 12.7 ms, total: 118 ms Wall time: 5.28 s
[1.5819703016190254, 1.8895814885666007, 0.7465606931938433, 2.094161800385659, 1.4885176062223044, 1.3538389971930949, 2.2433180424900443, 1.323109777429773, 0.6577320146757253, 0.8289558037253644]
Note that running these calls one after another is wasteful: they are independent of each other, so the loop is easy to parallelize.
There are many ways to parallelize this function in Python with libraries like joblib or others. These are good first steps. Dask is a good second step, especially when you want to scale across many machines.
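As an illustration of such a first step, here is a minimal sketch using the standard library's `concurrent.futures` (chosen here instead of joblib only because it needs no extra install). The `parameter_sets` list and the shortened sleep are assumptions made for the demo, not part of the notebook.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def costly_simulation(list_param):
    # Same shape as the notebook's function, with a shorter sleep for the demo.
    time.sleep(random.random() / 10)
    return sum(list_param)


# Hypothetical inputs: ten small parameter lists.
parameter_sets = [[i, i + 1, i + 2, i + 3] for i in range(10)]

# Threads work here because the function mostly sleeps; CPU-bound
# pure-Python work would usually need processes instead.
with ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map returns results in the same order as the inputs.
    pool_results = list(executor.map(costly_simulation, parameter_sets))

print(pool_results[:3])  # [6, 10, 14]
```

This runs on a single machine only, which is where Dask's ability to spread the same pattern over a cluster becomes useful.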
Use Dask Delayed to make our function lazy¶
We can call dask.delayed on our function to make it lazy. Rather than compute its result immediately, it records what we want to compute as a task in a graph that we'll run later on parallel hardware. Using dask.delayed is a relatively straightforward way to parallelize an existing code base, even if the computation isn't embarrassingly parallel like this one.
Calling these lazy functions is now almost free. In the cell below we only construct a simple graph.
import dask
lazy_results = []
%%time
for parameters in input_params.values[:10]:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)
CPU times: user 1.57 ms, sys: 0 ns, total: 1.57 ms Wall time: 1.37 ms
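dask.delayed is also described above as suitable for computations that are not embarrassingly parallel. The sketch below illustrates that case with a small graph whose tasks depend on each other. The three stage functions (`load`, `process`, `combine`) are hypothetical and exist only for this example.

```python
import dask


def load(i):
    # Hypothetical first stage: produce some data for input i.
    return list(range(i + 1))


def process(data):
    # Hypothetical second stage: depends on the output of load().
    return sum(data)


def combine(values):
    # Final stage: depends on every process() task.
    return max(values)


# Each process() task depends on one load() task; nothing runs yet.
partials = [dask.delayed(process)(dask.delayed(load)(i)) for i in range(5)]
total = dask.delayed(combine)(partials)

# scheduler="threads" runs the graph on a local thread pool, so this
# sketch does not need a Client.
answer = total.compute(scheduler="threads")
print(answer)  # 10
```

Dask works out from the graph that the five load/process chains can run in parallel, while combine has to wait for all of them.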
Run in parallel¶
The lazy_results list contains information about ten calls to costly_simulation that have not yet been run. Call .compute() when you want your result as normal Python objects.
If you started Client() above then you may want to watch the status page during computation.
%time dask.compute(*lazy_results)
CPU times: user 47.5 ms, sys: 2.4 ms, total: 49.9 ms Wall time: 1.5 s
(1.5819703016190254, 1.8895814885666007, 0.7465606931938433, 2.094161800385659, 1.4885176062223044, 1.3538389971930949, 2.2433180424900443, 1.323109777429773, 0.6577320146757253, 0.8289558037253644)
Notice that this was faster than running these same computations sequentially with a for loop.
We can now run this on all of our input parameters:
import dask
lazy_results = []
for parameters in input_params.values:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)
futures = dask.persist(*lazy_results)  # trigger computation in the background
To make this go faster, we can add more workers. We are still only working on our local machine here; scaling like this is more practical when using an actual cluster.
client.cluster.scale(10) # ask for ten 4-thread workers
By looking at the Dask dashboard we can see that Dask spreads this work around our cluster, managing load balancing, dependencies, etc.
Then get the result:
results = dask.compute(*futures)
results[:5]
(1.5819703016190254, 1.8895814885666007, 0.7465606931938433, 2.094161800385659, 1.4885176062223044)
Using the Futures API¶
The same example can be implemented using Dask's Futures API by using the client object itself. For our use case of applying a function across many inputs, both Dask delayed and Dask Futures are equally useful. The Futures API is a little bit different because it starts work immediately rather than being completely lazy.
For example, notice that work starts immediately in the cell below as we submit work to the cluster:
futures = []
for parameters in input_params.values:
    future = client.submit(costly_simulation, parameters)
    futures.append(future)
We can explicitly wait until this work is done and gather the results to our local process by calling client.gather:
results = client.gather(futures)
results[:5]
[1.5819703016190254, 1.8895814885666007, 0.7465606931938433, 2.094161800385659, 1.4885176062223044]
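To make the "starts work immediately" behaviour concrete, here is a small sketch with a single future. It creates its own in-process client (`demo_client`, a name chosen for this example) so that it is self-contained; in the notebook, the existing `client` could be used instead. The `slow_double` function is hypothetical.

```python
import time

from dask.distributed import Client, wait


def slow_double(x):
    time.sleep(0.2)
    return 2 * x


# In-process local cluster with the dashboard disabled, so the sketch
# does not depend on the notebook's cluster.
demo_client = Client(processes=False, n_workers=1, threads_per_worker=2,
                     dashboard_address=None)

# submit() returns at once with a Future; the task is already scheduled.
future = demo_client.submit(slow_double, 21)

wait([future])                 # block until the task has finished
status_after = future.status   # "finished" once the task has completed
doubled = future.result()      # fetch the value into the local process
print(status_after, doubled)

demo_client.close()
```

With dask.delayed, by contrast, nothing would be scheduled until compute() or persist() is called.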
The submission loop above can be written in fewer lines with the client.map() function, which calls a given function on each item of a list of parameters.
As with delayed, we can start the computation without waiting for the results by not calling client.gather() right away.
Note that because the Dask cluster has already run costly_simulation on these input parameters through the Futures API, the call to client.map() won't actually trigger any computation; it just retrieves the already computed results.
futures = client.map(costly_simulation, input_params.values)
Then just get the results later:
results = client.gather(futures)
len(results)
We encourage you to keep an eye on the dashboard's status page to follow ongoing computations.
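The reuse of already computed results described above relies on Dask treating submitted functions as pure by default: the task key is derived from the function and its arguments, so identical submissions map to the same task. The sketch below shows this in the simplest case, two identical client.submit calls, using a separate in-process client and a hypothetical `add_one` function.

```python
from dask.distributed import Client


def add_one(x):
    return x + 1


# In-process local cluster with the dashboard disabled, for a
# self-contained demo.
demo_client = Client(processes=False, n_workers=1, threads_per_worker=2,
                     dashboard_address=None)

first = demo_client.submit(add_one, 1)
second = demo_client.submit(add_one, 1)              # same function, same input
impure = demo_client.submit(add_one, 1, pure=False)  # always a fresh task

same_key = first.key == second.key    # identical submissions share a key
fresh_key = first.key != impure.key   # pure=False forces a new key
values = demo_client.gather([first, second, impure])
print(same_key, fresh_key, values)

demo_client.close()
```

For functions whose output is not determined by their inputs, such as random number generators, passing pure=False avoids this deduplication.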
Doing some analysis on the results¶
One benefit of Dask here, besides its API simplicity, is that you can gather the results of all your simulations in one call. There is no need to implement a complex mechanism or to write individual results to a shared file system or object store.
Just get your result, and do some computation.
Here, we will just get the results and expand our initial dataframe to get a nice view of parameters versus results for our computation.
output = input_params.copy()
output['result'] = pd.Series(results, index=output.index)
output.sample(5)
Then we can make some statistical plots or save the results locally using the pandas interface.
%matplotlib inline
output['result'].plot()
filtered_output = output[output['result'] > 2]
print(len(filtered_output))
filtered_output.to_csv('/tmp/simulation_result.csv')
The methods above work well for up to about 100,000 input parameters. Above that, the Dask scheduler has trouble handling the number of tasks it must schedule to workers. The solution to this problem is to bundle many parameters into a single task. You could do this by writing a new function that operates on a batch of parameters and using the delayed or futures APIs on that function. You could also use the Dask Bag API. This is described in more detail in the documentation about avoiding too many tasks.
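The batching approach can be sketched as follows. The helper names `simulate_batch` and `chunked`, the batch size of 100, and the sleep-free stand-in for costly_simulation are all assumptions made for this example.

```python
import dask


def costly_simulation(list_param):
    return sum(list_param)  # stand-in without the sleep


def simulate_batch(batch):
    # One task runs the simulation on a whole batch of parameter sets.
    return [costly_simulation(params) for params in batch]


def chunked(items, size):
    # Split a list into consecutive chunks of at most `size` items.
    return [items[i:i + size] for i in range(0, len(items), size)]


parameter_sets = [[i, i, i, i] for i in range(1000)]
batches = chunked(parameter_sets, 100)  # 10 tasks instead of 1000

lazy_batches = [dask.delayed(simulate_batch)(batch) for batch in batches]
# scheduler="threads" keeps the sketch independent of any running Client.
nested = dask.compute(*lazy_batches, scheduler="threads")

# Flatten the per-batch lists back into one result per parameter set.
batched_results = [r for batch_result in nested for r in batch_result]
print(len(batches), len(batched_results))  # 10 1000
```

The scheduler now tracks ten tasks rather than a thousand, at the cost of coarser load balancing, so the batch size is a trade-off to tune for the workload.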
Dask Bags hold large sequences in a few partitions. We can convert our input_params sequence into a dask.bag collection, asking for fewer partitions than there are items (at most 100,000, which is already huge), and apply our function to every item of the bag.
import dask.bag as db
b = db.from_sequence(list(input_params.values), npartitions=100)
b = b.map(costly_simulation)
%time results_bag = b.compute()
CPU times: user 779 ms, sys: 87.3 ms, total: 867 ms Wall time: 8.89 s
Looking at the dashboard here, you should see only 100 tasks to run instead of 500, each taking about five times longer on average, because each one actually calls our function five times.
np.allclose(results, results_bag)  # compare the Futures and Bag results element by element
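To see how a bag groups items into partitions without running the full simulation, the sketch below builds a bag of 500 small hypothetical items and sums each one. The `items` list is made up for the example, and the synchronous scheduler is used only so that no cluster is needed.

```python
import dask.bag as db

# Hypothetical inputs: 500 two-element parameter lists.
items = [[i, i + 1] for i in range(500)]
bag = db.from_sequence(items, npartitions=100)

n_parts = bag.npartitions  # 500 items in 100 partitions, 5 items each
print(n_parts)             # 100

# Each partition is processed as one task; results keep the input order.
bag_sums = bag.map(sum).compute(scheduler="synchronous")
print(bag_sums[:3])        # [1, 3, 5]
```

Choosing npartitions therefore sets the number of tasks directly, which is what keeps the scheduler's load manageable for very large inputs.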