{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Async/Await and Non-Blocking Execution\n", "=======================================\n", "\n", "Dask integrates natively with concurrent applications that use the [Tornado](https://www.tornadoweb.org/en/stable/) or [asyncio](https://docs.python.org/3/library/asyncio.html) frameworks, and it supports Python's `async` and `await` keywords.\n", "\n", "This notebook is a short example of how to start a Dask `Client` in asynchronous mode." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `asynchronous=True` parameter\n", "---------------------------------\n", "\n", "Dask `LocalCluster` and `Client` objects operate in async/await mode when you pass them the `asynchronous=True` parameter. In this mode, you `await` operations such as starting the client or retrieving a result instead of blocking on them." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:14:27.576512Z", "iopub.status.busy": "2022-07-27T19:14:27.576165Z", "iopub.status.idle": "2022-07-27T19:14:29.751351Z", "shell.execute_reply": "2022-07-27T19:14:29.750248Z" } }, "outputs": [], "source": [ "from dask.distributed import Client\n", "client = await Client(asynchronous=True)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:14:29.755547Z", "iopub.status.busy": "2022-07-27T19:14:29.755050Z", "iopub.status.idle": "2022-07-27T19:14:29.921776Z", "shell.execute_reply": "2022-07-27T19:14:29.920652Z" } }, "outputs": [ { "data": { "text/html": [ "Future: inc\n", " status: \n", "\n", "\n", "pending,\n", "\n", "\n", "\n", " type: NoneType,\n", "\n", "\n", " key: inc-cf183ed345225f36fe30323c57a93811" ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def inc(x: int) -> int:\n", "    return x + 1\n", "\n", "future = client.submit(inc, 10)\n", "future" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": 
"2022-07-27T19:14:29.926427Z", "iopub.status.busy": "2022-07-27T19:14:29.925594Z", "iopub.status.idle": "2022-07-27T19:14:29.939065Z", "shell.execute_reply": "2022-07-27T19:14:29.938178Z" } }, "outputs": [ { "data": { "text/plain": [ "11" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "await future" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Collections\n", "-----------\n", "\n", "Blocking operations, such as the `.compute()` method on Dask collections, aren't safe to use in asynchronous mode because they wait until the result is ready. Use the `Client.compute` method instead: it returns a future immediately, and you `await` that future to get the result." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:14:29.942194Z", "iopub.status.busy": "2022-07-27T19:14:29.941780Z", "iopub.status.idle": "2022-07-27T19:14:30.298646Z", "shell.execute_reply": "2022-07-27T19:14:30.297952Z" } }, "outputs": [ { "data": { "text/plain": [ "Dask DataFrame Structure:\n", "                   id    name        x        y\n", "npartitions=30\n", "2000-01-01      int64  object  float64  float64\n", "2000-01-02        ...     ...      ...      ...\n", "...               ...     ...      ...      ...\n", "2000-01-30        ...     ...      ...      ...\n", "2000-01-31        ...     ...      ...      ...\n", "Dask Name: make-timeseries, 30 tasks" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask\n", "df = dask.datasets.timeseries()\n", "df" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:14:30.302926Z", "iopub.status.busy": "2022-07-27T19:14:30.302514Z", "iopub.status.idle": "2022-07-27T19:14:30.329883Z", "shell.execute_reply": "2022-07-27T19:14:30.329331Z" } }, "outputs": [], "source": [ "df = df.persist()  # persist is non-blocking: it starts the computation and returns right away, so it's safe here" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:14:30.333306Z", "iopub.status.busy": "2022-07-27T19:14:30.332799Z", "iopub.status.idle": "2022-07-27T19:14:30.341749Z", "shell.execute_reply": "2022-07-27T19:14:30.341195Z" } }, "outputs": [], "source": [ "total = df[['x', 'y']].sum()  # building a lazy computation doesn't block either" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:14:30.347194Z", "iopub.status.busy": "2022-07-27T19:14:30.346708Z", "iopub.status.idle": "2022-07-27T19:14:30.350518Z", "shell.execute_reply": "2022-07-27T19:14:30.349930Z" } }, "outputs": [], "source": [ "# total.compute()  # avoid this: compute() blocks until the result is ready" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:14:30.363398Z", "iopub.status.busy": "2022-07-27T19:14:30.363198Z", "iopub.status.idle": "2022-07-27T19:14:30.401604Z", "shell.execute_reply": "2022-07-27T19:14:30.400873Z" } }, "outputs": [ { "data": { "text/html": [ "Future: finalize\n", " status: \n", "\n", "\n", 
"pending,\n", "\n", "\n", "\n", " type: NoneType,\n", "\n", "\n", " key: finalize-c005c96bb73e558555c311bfabfc6ba2" ], "text/plain": [ "" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "future = client.compute(total)\n", "future" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:14:30.405415Z", "iopub.status.busy": "2022-07-27T19:14:30.405220Z", "iopub.status.idle": "2022-07-27T19:14:31.247222Z", "shell.execute_reply": "2022-07-27T19:14:31.246606Z" } }, "outputs": [ { "data": { "text/plain": [ "x    239.172407\n", "y    762.941155\n", "dtype: float64" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "await future" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Within a script\n", "---------------\n", "\n", "Running async/await code in Jupyter is somewhat atypical. Jupyter already has an event loop running, so you can use `await` directly at the top level of a cell. A normal Python script has no running event loop, so you need to put the `await` calls inside a coroutine and start the loop yourself, for example with `asyncio.run`. Here is an example script that should run within a normal Python interpreter or as a script.\n", "\n", "```python\n", "import asyncio\n", "from dask.distributed import Client\n", "\n", "\n", "def inc(x: int) -> int:\n", "    return x + 1\n", "\n", "\n", "async def f():\n", "    async with Client(asynchronous=True) as client:\n", "        future = client.submit(inc, 10)\n", "        result = await future\n", "        print(result)\n", "\n", "\n", "if __name__ == '__main__':\n", "    asyncio.run(f())\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }