Async/Await and Non-Blocking Execution

Dask integrates natively with concurrent applications built on the Tornado or asyncio frameworks, and can use Python's async and await keywords.

This notebook shows a small example of how to start a Dask Client in asynchronous mode.

The asynchronous=True parameter

Dask LocalCluster and Client objects can operate in async-await mode if you pass the asynchronous=True parameter.
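Both objects accept the parameter, so a cluster and a client can be created together inside one coroutine. The sketch below runs as a standalone script rather than in Jupyter. The settings processes=False (run the worker as threads in this process) and dashboard_address=None (skip the web dashboard) are assumptions chosen only to keep the example lightweight.

```python
import asyncio

from dask.distributed import Client, LocalCluster


def inc(x: int) -> int:
    return x + 1


async def main() -> int:
    # Assumption: processes=False and dashboard_address=None only keep the
    # sketch lightweight; they are not required for asynchronous mode.
    async with LocalCluster(
        processes=False, asynchronous=True, dashboard_address=None
    ) as cluster:
        # The client must also be asynchronous to be used with await.
        async with Client(cluster, asynchronous=True) as client:
            return await client.submit(inc, 10)


# asyncio.run starts an event loop, which a plain script does not have.
result = asyncio.run(main())
print(result)
```

Using async with on both objects ensures the client and cluster are closed when the coroutine exits.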

[1]:
from dask.distributed import Client
client = await Client(asynchronous=True)
[2]:
def inc(x: int) -> int:
    return x + 1

future = client.submit(inc, 10)
future
[2]:
Future: inc status: pending, type: NoneType, key: inc-cf183ed345225f36fe30323c57a93811
[3]:
await future
[3]:
11
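Awaiting one future at a time is not required. As a sketch, client.map can submit many tasks and client.gather, which returns an awaitable in asynchronous mode, collects all of their results. The script form, processes=False and dashboard_address=None (forwarded by Client to the LocalCluster it creates) are assumptions for illustration.

```python
import asyncio

from dask.distributed import Client


def inc(x: int) -> int:
    return x + 1


async def main() -> list:
    # Assumption: lightweight in-process cluster settings for the sketch.
    async with Client(
        asynchronous=True, processes=False, dashboard_address=None
    ) as client:
        futures = client.map(inc, range(5))  # one future per input
        # In asynchronous mode gather returns an awaitable, not the results.
        return await client.gather(futures)


results = asyncio.run(main())
print(results)
```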

Collections

Blocking operations such as the .compute() method should not be used in asynchronous mode, because they stop the event loop until the result is ready. Use the Client.compute method instead: it returns a Future immediately, which you can then await.
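Why blocking matters can be seen without Dask at all. This standard-library sketch is an illustration, not Dask code: time.sleep stands in for a blocking call such as .compute(), and await asyncio.sleep stands in for awaiting a future. A "heartbeat" coroutine records timestamps, and the largest gap between them shows how long the event loop was stalled.

```python
import asyncio
import time


async def heartbeat(ticks: list) -> None:
    # Records a timestamp roughly every 0.05 s, whenever the loop lets it run.
    for _ in range(4):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def blocking_work() -> None:
    time.sleep(0.2)  # holds the event loop, like a blocking .compute()


async def non_blocking_work() -> None:
    await asyncio.sleep(0.2)  # yields to the loop, like awaiting a Future


async def max_gap(work) -> float:
    ticks: list = []
    await asyncio.gather(heartbeat(ticks), work())
    # Largest interval between consecutive heartbeats.
    return max(b - a for a, b in zip(ticks, ticks[1:]))


blocking_gap = asyncio.run(max_gap(blocking_work))
non_blocking_gap = asyncio.run(max_gap(non_blocking_work))
print(f"blocking: {blocking_gap:.2f} s, non-blocking: {non_blocking_gap:.2f} s")
```

With the blocking call, the heartbeat cannot run for the full 0.2 s; with the awaited call, it keeps ticking on schedule.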

[4]:
import dask
df = dask.datasets.timeseries()
df
[4]:
Dask DataFrame Structure:
                  id    name        x        y
npartitions=30
2000-01-01     int64  object  float64  float64
2000-01-02       ...     ...      ...      ...
...              ...     ...      ...      ...
2000-01-30       ...     ...      ...      ...
2000-01-31       ...     ...      ...      ...
Dask Name: make-timeseries, 30 tasks
[5]:
df = df.persist()             # persist is non-blocking, so it's ok
[6]:
total = df[['x', 'y']].sum()  # lazy computations are also ok
[7]:
# total.compute()             # but compute is bad, because compute blocks until done
[8]:
future = client.compute(total)
future
[8]:
Future: finalize status: pending, type: NoneType, key: finalize-c005c96bb73e558555c311bfabfc6ba2
[9]:
await future
[9]:
x    239.172407
y    762.941155
dtype: float64

Within a script

Running async/await code in Jupyter is a bit atypical. Jupyter already runs an event loop, so you can use await directly at the top level of a cell. A normal Python script has no running event loop, so it has to start one itself. Here is an example script that runs in a normal Python interpreter.

import asyncio
from dask.distributed import Client


def inc(x: int) -> int:
    return x + 1


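async def f():
    # asynchronous=True makes the client's blocking methods return awaitables.
    async with Client(asynchronous=True) as client:
        future = client.submit(inc, 10)
        result = await future  # wait for the result without blocking the loop
        print(result)


if __name__ == '__main__':
    # asyncio.run creates an event loop, runs f() to completion, then closes it.
    # (asyncio.get_event_loop() is deprecated for this use in recent Python versions.)
    asyncio.run(f())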
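Because the client does not block, other asyncio work can make progress while Dask computes. In this sketch a hypothetical slow_inc task sleeps for half a second in a worker thread while the coroutine keeps looping on the event loop, checking Future.done(). The polling loop is only a stand-in for "other work"; the task and cluster settings are assumptions for illustration.

```python
import asyncio
import time

from dask.distributed import Client


def slow_inc(x: int) -> int:
    # Hypothetical slow task; it runs in a worker thread, not on the event loop.
    time.sleep(0.5)
    return x + 1


async def main():
    # Assumption: lightweight in-process cluster settings for the sketch.
    async with Client(
        asynchronous=True, processes=False, dashboard_address=None
    ) as client:
        future = client.submit(slow_inc, 10)
        ticks = 0
        # The event loop stays free while the task runs, so this loop
        # (standing in for any other coroutine) keeps making progress.
        while not future.done():
            ticks += 1
            await asyncio.sleep(0.05)
        return await future, ticks


result, ticks = asyncio.run(main())
print(f"result={result}, loop iterations while waiting={ticks}")
```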