Async/Await and Non-Blocking Execution
Dask integrates natively with concurrent applications using the Tornado or Asyncio frameworks, and can make use of Python’s async and await keywords.
This example shows how to start a Dask Client in asynchronous mode.
The asynchronous=True parameter
Dask LocalCluster and Client objects can operate in async-await mode if you pass the asynchronous=True parameter.
from dask.distributed import Client
# Jupyter already runs an event loop, so await can be used at the top level of a cell
client = await Client(asynchronous=True)
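Outside a notebook, the same asynchronous=True parameter can be passed to LocalCluster and Client explicitly. This is a minimal sketch, assuming a recent dask.distributed; main is a hypothetical name, and processes=False with n_workers=1 are choices made here only so the cluster starts quickly inside a single process.

```python
import asyncio

from dask.distributed import Client, LocalCluster


async def main() -> int:
    # Both objects accept asynchronous=True; "async with" starts them and
    # shuts them down cleanly when the block exits.
    # processes=False and n_workers=1 are assumptions for a fast, in-process cluster.
    async with LocalCluster(asynchronous=True, processes=False, n_workers=1) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            future = client.submit(sum, [1, 2, 3])
            return await future  # waits without blocking the event loop


total = asyncio.run(main())
print(total)  # 6
```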
def inc(x: int) -> int:
    return x + 1

future = client.submit(inc, 10)
future  # submit returns a Future immediately, without waiting for the result
await future  # returns 11
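Many futures can be awaited together. This is a minimal sketch, assuming client.map and client.gather behave as in recent dask.distributed releases, where gather returns an awaitable when the client is asynchronous; processes=False is an assumption made so the example starts quickly.

```python
import asyncio

from dask.distributed import Client


def inc(x: int) -> int:
    return x + 1


async def main() -> list[int]:
    # processes=False (an assumption here) keeps the workers in this process
    async with Client(asynchronous=True, processes=False) as client:
        # client.map returns one Future per input without waiting for results
        futures = client.map(inc, range(5))
        # In asynchronous mode, client.gather must be awaited
        return await client.gather(futures)


results = asyncio.run(main())
print(results)  # [1, 2, 3, 4, 5]
```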
Collections
Blocking operations such as the .compute() method should not be used in asynchronous mode. Instead, use the Client.compute method, which immediately returns a Future that you can then await.
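Why a blocking call is a problem can be shown with the standard library alone. This sketch is not Dask code: time.sleep stands in for a blocking call such as .compute(), asyncio.sleep stands in for awaiting a Future, and heartbeat and run are hypothetical helpers. A background task records a timestamp each time the event loop gives it a turn.

```python
import asyncio
import time


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Records a timestamp roughly every 10 ms, but only while the loop is free
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def run(blocking: bool) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # give the heartbeat its first turn
    if blocking:
        time.sleep(0.2)  # stands in for a blocking call like .compute()
    else:
        await asyncio.sleep(0.2)  # stands in for awaiting a Future
    stop.set()
    await task
    return len(ticks)


blocked = asyncio.run(run(blocking=True))
free = asyncio.run(run(blocking=False))
print(blocked, free)
```

With the blocking call, the heartbeat records only its first tick, because the event loop cannot switch tasks until time.sleep returns; with the awaited call it keeps ticking throughout.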
import dask
df = dask.datasets.timeseries()
df
| | id | name | x | y |
|---|---|---|---|---|
| npartitions=30 | | | | |
| 2000-01-01 | int64 | object | float64 | float64 |
| 2000-01-02 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 2000-01-30 | ... | ... | ... | ... |
| 2000-01-31 | ... | ... | ... | ... |
df = df.persist() # persist is non-blocking, so it's ok
total = df[['x', 'y']].sum() # lazy computations are also ok
# total.compute()  # avoid this: compute blocks until the result is ready
future = client.compute(total)
future
await future
x 239.172407
y 762.941155
dtype: float64
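The same Client.compute pattern works in a script. This is a minimal sketch that swaps the timeseries DataFrame for two hypothetical dask.delayed functions, inc and add, so it needs only dask core; processes=False is an assumption made so the cluster starts quickly in one process.

```python
import asyncio

import dask
from dask.distributed import Client


@dask.delayed
def inc(x: int) -> int:
    return x + 1


@dask.delayed
def add(x: int, y: int) -> int:
    return x + y


async def main() -> int:
    async with Client(asynchronous=True, processes=False) as client:
        total = add(inc(1), inc(2))  # lazy: nothing has run yet
        future = client.compute(total)  # returns a Future immediately
        return await future  # (1 + 1) + (2 + 1)


result = asyncio.run(main())
print(result)  # 5
```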
Within a script
Running async/await code in Jupyter is a bit atypical. Jupyter already has an event loop running, so you can use await directly at the top level of a cell. A normal Python script has no running event loop, so you need to wrap your code in a coroutine and start the loop yourself. Here is an example script that you can run from a normal Python interpreter or as a standalone script:
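import asyncio

from dask.distributed import Client


def inc(x: int) -> int:
    return x + 1


async def f():
    # Start a local cluster and an asynchronous client inside the running event loop
    async with Client(asynchronous=True) as client:
        future = client.submit(inc, 10)
        result = await future  # waits without blocking the event loop
        print(result)


if __name__ == '__main__':
    # asyncio.run creates an event loop, runs f() to completion, then closes the loop
    asyncio.run(f())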
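Because a Dask Future is awaitable, it can be combined with other coroutines in the same event loop. This is a minimal sketch, assuming processes=False for a quick in-process cluster and a hypothetical slow_inc function that sleeps to simulate work: asyncio.gather waits for the Dask task and an unrelated coroutine at the same time.

```python
import asyncio
import time

from dask.distributed import Client


def slow_inc(x: int) -> int:
    time.sleep(0.2)  # simulated work; runs on a worker thread, not the event loop
    return x + 1


async def main() -> tuple[str, int]:
    async with Client(asynchronous=True, processes=False) as client:
        future = client.submit(slow_inc, 10)
        # asyncio.gather accepts any awaitable, including a Dask Future,
        # so the other coroutine keeps running while the task computes
        other, result = await asyncio.gather(
            asyncio.sleep(0.1, result="done"), future
        )
        return other, result


other, result = asyncio.run(main())
print(other, result)  # done 11
```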