"
],
"text/plain": [
""
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client\n",
"\n",
"dask_client = await Client(asynchronous=True) # use local processes for now\n",
"dask_client"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false,
"execution": {
"iopub.execute_input": "2021-01-14T10:43:51.769262Z",
"iopub.status.busy": "2021-01-14T10:43:51.768103Z",
"iopub.status.idle": "2021-01-14T10:43:51.773515Z",
"shell.execute_reply": "2021-01-14T10:43:51.773161Z"
},
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import tornado.web\n",
"\n",
"\n",
"def fib(n):\n",
"    if n < 2:\n",
"        return n\n",
"    else:\n",
"        return fib(n - 1) + fib(n - 2)\n",
"\n",
"\n",
"class FibHandler(tornado.web.RequestHandler):\n",
"    async def get(self, n):\n",
"        future = dask_client.submit(fib, int(n))  # submit work to happen elsewhere\n",
"        result = await future  # yield to Tornado's event loop until the result arrives\n",
"        self.write(str(result))\n",
"\n",
"\n",
"class MainHandler(tornado.web.RequestHandler):\n",
"    async def get(self):\n",
"        self.write(\"Hello, world\")\n",
"\n",
"\n",
"def make_app():\n",
"    return tornado.web.Application([\n",
"        (r\"/fast\", MainHandler),\n",
"        (r\"/fib/(\\d+)\", FibHandler),\n",
"    ])\n",
"\n",
"\n",
"app = make_app()\n",
"app.listen(9000)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Performance changes\n",
"\n",
"By offloading the `fib` computation to Dask, we achieve two things:"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parallel computing\n",
"\n",
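"We can now serve more requests in less time. The following experiment requests `fib(28)` 20 times simultaneously. The old server (port 8000) computed these sequentially, so the last client waited almost two seconds for its answer. The new server (port 9000) hands the work to Dask, which can compute many of these in parallel, so everyone gets an answer in a few hundred milliseconds. (One caveat: `Client.submit` treats functions as pure by default and gives identical calls the same key, so these 20 identical requests may share a single Dask task; passing `pure=False` would force separate computations.)"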
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false,
"execution": {
"iopub.execute_input": "2021-01-14T10:43:51.776083Z",
"iopub.status.busy": "2021-01-14T10:43:51.775681Z",
"iopub.status.idle": "2021-01-14T10:43:53.607524Z",
"shell.execute_reply": "2021-01-14T10:43:53.605713Z"
},
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"http://localhost:8000/fib/28 , 20 simultaneous requests, total time: 1.8247489929199219\n"
]
}
],
"source": [
"# Before parallelism\n",
"await measure('http://localhost:8000/fib/28', n=20)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false,
"execution": {
"iopub.execute_input": "2021-01-14T10:43:53.612645Z",
"iopub.status.busy": "2021-01-14T10:43:53.612226Z",
"iopub.status.idle": "2021-01-14T10:43:53.844069Z",
"shell.execute_reply": "2021-01-14T10:43:53.843346Z"
},
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"http://localhost:9000/fib/28 , 20 simultaneous requests, total time: 0.23087596893310547\n"
]
}
],
"source": [
"# After parallelism\n",
"await measure('http://localhost:9000/fib/28', n=20)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Asynchronous computing\n",
"\n",
"Previously, while one request was busy evaluating `fib(...)`, Tornado was blocked and could not handle any other request. This is particularly problematic when our server provides both expensive computations and cheap ones: the cheap requests get held up needlessly.\n",
"\n",
"Because Dask integrates with asynchronous systems like Tornado and asyncio, our web server can switch freely between many requests while computation continues in the background. In the example below, even though the slow computation starts first, the fast request returns in about five milliseconds."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false,
"execution": {
"iopub.execute_input": "2021-01-14T10:43:53.850586Z",
"iopub.status.busy": "2021-01-14T10:43:53.850178Z",
"iopub.status.idle": "2021-01-14T10:43:56.750451Z",
"shell.execute_reply": "2021-01-14T10:43:56.749888Z"
},
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"http://localhost:8000/fib/35 , 1 simultaneous requests, total time: 2.889683246612549\n",
"http://localhost:8000/fast , 1 simultaneous requests, total time: 2.8900492191314697\n"
]
}
],
"source": [
"# Before async\n",
"a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))\n",
"b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))\n",
"await b\n",
"await a"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false,
"execution": {
"iopub.execute_input": "2021-01-14T10:43:56.753046Z",
"iopub.status.busy": "2021-01-14T10:43:56.752155Z",
"iopub.status.idle": "2021-01-14T10:43:59.837825Z",
"shell.execute_reply": "2021-01-14T10:43:59.836227Z"
},
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"http://localhost:9000/fast , 1 simultaneous requests, total time: 0.004990577697753906\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"http://localhost:9000/fib/35 , 1 simultaneous requests, total time: 3.0786397457122803\n"
]
}
],
"source": [
"# After async\n",
"a = asyncio.ensure_future(measure('http://localhost:9000/fib/35', n=1))\n",
"b = asyncio.ensure_future(measure('http://localhost:9000/fast', n=1))\n",
"await b\n",
"await a"
]
},
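{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same ordering effect can be reproduced without a web server. What follows is a minimal, standard-library-only sketch, assuming a `ThreadPoolExecutor` stands in for Dask; `slow_blocking`, `slow_offloaded`, `fast` and `race` are hypothetical names. Each run schedules a slow `fib` job first and a cheap job second, then records which one finishes first.\n",
"\n",
"```python\n",
"import asyncio\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"\n",
"def fib(n):\n",
"    # Deliberately slow, CPU-bound recursion, as in the server above.\n",
"    if n < 2:\n",
"        return n\n",
"    return fib(n - 1) + fib(n - 2)\n",
"\n",
"\n",
"async def slow_blocking(n):\n",
"    # Runs fib on the event-loop thread: nothing else can run until it returns.\n",
"    return fib(n)\n",
"\n",
"\n",
"async def slow_offloaded(executor, n):\n",
"    # Hands fib to a worker thread; the event loop stays free meanwhile.\n",
"    loop = asyncio.get_running_loop()\n",
"    return await loop.run_in_executor(executor, fib, n)\n",
"\n",
"\n",
"async def fast():\n",
"    # The cheap request, analogous to the /fast endpoint.\n",
"    return 'Hello, world'\n",
"\n",
"\n",
"async def race(make_slow):\n",
"    # Schedule the slow job first and the cheap one second; record finish order.\n",
"    order = []\n",
"\n",
"    async def tracked(name, coro):\n",
"        await coro\n",
"        order.append(name)\n",
"\n",
"    slow = asyncio.ensure_future(tracked('slow', make_slow()))\n",
"    quick = asyncio.ensure_future(tracked('fast', fast()))\n",
"    await asyncio.gather(slow, quick)\n",
"    return order\n",
"\n",
"\n",
"async def main(n=25):\n",
"    with ThreadPoolExecutor(max_workers=1) as executor:\n",
"        blocked = await race(lambda: slow_blocking(n))\n",
"        offloaded = await race(lambda: slow_offloaded(executor, n))\n",
"    return blocked, offloaded\n",
"\n",
"\n",
"blocked, offloaded = asyncio.run(main())\n",
"print('blocking: ', blocked)\n",
"print('offloaded:', offloaded)\n",
"```\n",
"\n",
"When `fib` runs directly inside a coroutine it holds the event loop until it returns, so the cheap job finishes second. With `loop.run_in_executor` the loop stays free, so the cheap job finishes first, mirroring the Dask-backed server above. A thread suffices here despite the GIL, because the loop only needs to stay responsive, not to compute faster. In a notebook that already runs an event loop, replace `asyncio.run(main())` with `await main()`."
]
},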
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Other options\n",
"\n",
"In these situations people commonly use [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) or [Celery](http://www.celeryproject.org/).\n",
"\n",
"- `concurrent.futures` allows easy parallelism on a single machine and integrates well with async frameworks. Its API is essentially the one used above, because Dask's futures interface mirrors `concurrent.futures`. However, `concurrent.futures` doesn't easily scale out to a cluster.\n",
"- Celery scales out more easily to multiple machines, but has higher latencies, doesn't scale down as nicely, and needs some effort to integrate with async frameworks (at least, that is my understanding; my experience here is shallow).\n",
"\n",
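"For comparison, here is a minimal sketch of the same submit-then-await pattern using only the standard library, assuming a `ThreadPoolExecutor`; `fib_handler` is a hypothetical stand-in for `FibHandler.get`. One difference from Dask is that a `concurrent.futures.Future` is not awaitable, so it has to be wrapped with `asyncio.wrap_future` first.\n",
"\n",
"```python\n",
"import asyncio\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"\n",
"def fib(n):\n",
"    if n < 2:\n",
"        return n\n",
"    return fib(n - 1) + fib(n - 2)\n",
"\n",
"\n",
"async def fib_handler(executor, n):\n",
"    # Same shape as FibHandler.get: submit work elsewhere, then await the result.\n",
"    future = executor.submit(fib, int(n))     # a concurrent.futures.Future\n",
"    return await asyncio.wrap_future(future)  # wrap it so the event loop can await it\n",
"\n",
"\n",
"async def main():\n",
"    with ThreadPoolExecutor(max_workers=4) as executor:\n",
"        # Five simultaneous requests, like the measure(...) experiments above.\n",
"        return await asyncio.gather(*(fib_handler(executor, '20') for _ in range(5)))\n",
"\n",
"\n",
"results = asyncio.run(main())\n",
"print(results)\n",
"```\n",
"\n",
"A thread pool keeps the sketch portable, but because of the GIL it does not speed up pure-Python work like `fib`; a `ProcessPoolExecutor` would be needed for real parallelism. In a notebook with a running event loop, replace `asyncio.run(main())` with `await main()`.\n",
"\n",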
"In this context Dask provides some of the benefits of both. It is easy to set up and use in the common single-machine case, but can also [scale out to a cluster](http://distributed.readthedocs.io/en/latest/setup.html). It integrates nicely with async frameworks and adds little latency: the measurement below shows a round trip of a few milliseconds."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false,
"execution": {
"iopub.execute_input": "2021-01-14T10:43:59.844258Z",
"iopub.status.busy": "2021-01-14T10:43:59.839676Z",
"iopub.status.idle": "2021-01-14T10:43:59.851104Z",
"shell.execute_reply": "2021-01-14T10:43:59.849847Z"
},
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Roundtrip latency: 5.70 ms\n"
]
}
],
"source": [
"from time import time\n",
"\n",
"\n",
"async def f():\n",
"    start = time()\n",
"    await dask_client.submit(lambda x: x + 1, 10)  # trivial task: measures overhead only\n",
"    end = time()\n",
"    print('Roundtrip latency: %.2f ms' % ((end - start) * 1000))\n",
"\n",
"\n",
"await f()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}