Asynchronous Computation: Web Servers + Dask
Let's imagine a simple web server that serves fast-loading pages and also performs some computation on slower-loading pages. In our case this will be a simple Fibonacci-serving application, but you can imagine replacing the fib function with running a machine learning model on some input data, fetching results from a database, etc.
[1]:
import tornado.ioloop
import tornado.web

def fib(n):
    if n < 2:
        return n
    else:
        return fib(n - 1) + fib(n - 2)

class FibHandler(tornado.web.RequestHandler):
    def get(self, n):
        result = fib(int(n))
        self.write(str(result))

class FastHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello!")

def make_app():
    return tornado.web.Application([
        (r"/fast", FastHandler),
        (r"/fib/(\d+)", FibHandler),
    ])

app = make_app()
app.listen(8000)
[1]:
<tornado.httpserver.HTTPServer at 0x7f82ccab3040>
Speed
We know that users associate fast response times with authoritative content and trust, so we want to measure how fast our pages load. We’re particularly interested in doing this during many simultaneous loads, simulating how our web server will respond when serving many users.
[2]:
import asyncio
from time import time

import tornado.httpclient

client = tornado.httpclient.AsyncHTTPClient()

async def measure(url, n=100):
    """Get url n times concurrently. Print duration."""
    start = time()
    futures = [client.fetch(url) for i in range(n)]
    results = await asyncio.gather(*futures)
    end = time()
    print(url, ', %d simultaneous requests, ' % n, 'total time: ', (end - start))
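To see why creating all requests before awaiting them measures concurrent behavior, here is a minimal sketch that uses only the standard library. `fake_fetch` is a hypothetical stand-in for `client.fetch`: it sleeps for a fixed delay instead of contacting a server, so the numbers illustrate the pattern rather than real network timings.

```python
import asyncio
from time import perf_counter


async def fake_fetch(delay):
    """Hypothetical stand-in for client.fetch: waits without blocking the loop."""
    await asyncio.sleep(delay)
    return delay


async def measure_fake(n=100, delay=0.05):
    """Start n waits together and return (count, total wall-clock seconds)."""
    start = perf_counter()
    # All coroutines are created first, then awaited together with gather.
    results = await asyncio.gather(*[fake_fetch(delay) for _ in range(n)])
    return len(results), perf_counter() - start


count, elapsed = asyncio.run(measure_fake())
print("%d waits of 50 ms finished in %.3f s" % (count, elapsed))
```

Run one after another, the same waits would take about n * delay = 5 seconds. Inside a notebook, where an event loop is already running, `await measure_fake()` would replace the `asyncio.run` call.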
Timings
We see that:
- Tornado has a roundtrip time of about 3-5 ms
- It can run 100 such queries in around 100 ms, so there is some nice concurrency happening
- Calling fib takes a while
- Calling fib 100 times takes around 100 times as long, so there is not as much parallelism
[3]:
await measure('http://localhost:8000/fast', n=1)
http://localhost:8000/fast , 1 simultaneous requests, total time: 0.005836009979248047
[4]:
await measure('http://localhost:8000/fast', n=100)
http://localhost:8000/fast , 100 simultaneous requests, total time: 0.13756465911865234
[5]:
await measure('http://localhost:8000/fib/28', n=1)
http://localhost:8000/fib/28 , 1 simultaneous requests, total time: 0.11277055740356445
[6]:
await measure('http://localhost:8000/fib/28', n=100)
http://localhost:8000/fib/28 , 100 simultaneous requests, total time: 11.173585653305054
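The scaling claims in the Timings list can be checked against the recorded outputs. This short calculation uses only the four timings printed by the cells above (rounded to six decimals); the variable names are illustrative.

```python
# Timings copied (rounded) from the cell outputs, in seconds.
fast_1, fast_100 = 0.005836, 0.137565
fib_1, fib_100 = 0.112771, 11.173586

# Ratio of the 100-request time to the single-request time.
fast_ratio = fast_100 / fast_1
fib_ratio = fib_100 / fib_1

print("fast: 100 requests cost %.1fx one request" % fast_ratio)
print("fib:  100 requests cost %.1fx one request" % fib_ratio)
```

The fast route serves 100 requests for roughly 24 times the cost of one, while the fib route pays roughly 99 times the cost of one, which is what serial execution looks like.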
Blocking async
In the example below we see that one call to the slow fib/ route will unfortunately block other, much faster requests:
[7]:
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
http://localhost:8000/fib/35 , 1 simultaneous requests, total time: 3.2470662593841553
http://localhost:8000/fast , 1 simultaneous requests, total time: 3.246363639831543
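This blocking is not specific to Tornado: any synchronous call inside a coroutine holds the event loop until it returns. The following is a minimal sketch with plain asyncio, where `blocking_work` is a hypothetical stand-in for `fib(35)` that uses `time.sleep` to occupy the thread for a fixed time. The handler names are illustrative and are not Tornado handlers.

```python
import asyncio
import time


def blocking_work(seconds):
    """Hypothetical stand-in for fib(35): holds the calling thread."""
    time.sleep(seconds)
    return "slow done"


async def slow_handler(seconds):
    # A synchronous call inside a coroutine: the event loop cannot switch away.
    return blocking_work(seconds)


async def fast_handler():
    return "Hello!"


async def timed(coro, start):
    """Await coro and report how long after `start` it finished."""
    result = await coro
    return result, time.perf_counter() - start


async def main(block_seconds=0.2):
    start = time.perf_counter()
    slow = asyncio.ensure_future(timed(slow_handler(block_seconds), start))
    fast = asyncio.ensure_future(timed(fast_handler(), start))
    (_, slow_elapsed), (_, fast_elapsed) = await asyncio.gather(slow, fast)
    return slow_elapsed, fast_elapsed


slow_elapsed, fast_elapsed = asyncio.run(main())
print("slow: %.3f s, fast: %.3f s" % (slow_elapsed, fast_elapsed))
```

Both tasks finish at about the same time because the fast one cannot run until the blocking call returns, mirroring the two nearly identical timings printed by the notebook cell.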
Discussion
There are two problems/opportunities here:
1. All of our fib calls are independent, so we would like to run these computations in parallel on multiple cores or a nearby cluster.
2. Our slow, computationally intense fib requests can get in the way of our fast requests. One slow user can affect everyone else.
Asynchronous off-process computation with Dask
To resolve both of these problems, we will offload computation to other processes or computers using Dask. Because Dask is an async framework, it can integrate nicely with Tornado or asyncio.
[8]:
from dask.distributed import Client
dask_client = await Client(asynchronous=True) # use local processes for now
dask_client
[8]:
Client
Client-62e54cd7-0de0-11ed-9dd9-000d3a8f7959
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
aada52c9
Dashboard: http://127.0.0.1:8787/status | Workers: 2 |
Total threads: 2 | Total memory: 6.78 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-f94995f6-d115-4423-8a61-bc03a4ba55f2
Comm: tcp://127.0.0.1:42751 | Workers: 2 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 2 |
Started: Just now | Total memory: 6.78 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:39891 | Total threads: 1 |
Dashboard: http://127.0.0.1:43005/status | Memory: 3.39 GiB |
Nanny: tcp://127.0.0.1:46657 | |
Local directory: /home/runner/work/dask-examples/dask-examples/applications/dask-worker-space/worker-589y0ffl |
Worker: 1
Comm: tcp://127.0.0.1:35639 | Total threads: 1 |
Dashboard: http://127.0.0.1:33063/status | Memory: 3.39 GiB |
Nanny: tcp://127.0.0.1:43917 | |
Local directory: /home/runner/work/dask-examples/dask-examples/applications/dask-worker-space/worker-27symtet |
[9]:
def fib(n):
    if n < 2:
        return n
    else:
        return fib(n - 1) + fib(n - 2)

class FibHandler(tornado.web.RequestHandler):
    async def get(self, n):
        future = dask_client.submit(fib, int(n))  # submit work to happen elsewhere
        result = await future
        self.write(str(result))

class MainHandler(tornado.web.RequestHandler):
    async def get(self):
        self.write("Hello, world")

def make_app():
    return tornado.web.Application([
        (r"/fast", MainHandler),
        (r"/fib/(\d+)", FibHandler),
    ])

app = make_app()
app.listen(9000)
[9]:
<tornado.httpserver.HTTPServer at 0x7f82ccab3070>
Performance changes
By offloading the fib computation to Dask we achieve two things:
Parallel computing
We can now support more requests in less time. The following experiment asks for fib(28) simultaneously from 20 requests. In the old version we computed these sequentially over a few seconds (the last person to request waited a few seconds for their browser to finish). In the new one, many of these may be computed in parallel, so everyone gets an answer in a few hundred milliseconds.
[10]:
# Before parallelism
await measure('http://localhost:8000/fib/28', n=20)
http://localhost:8000/fib/28 , 20 simultaneous requests, total time: 2.282106399536133
[11]:
# After parallelism
await measure('http://localhost:9000/fib/28', n=20)
http://localhost:9000/fib/28 , 20 simultaneous requests, total time: 0.44431519508361816
Asynchronous computing
Previously, while one request was busy evaluating fib(...), Tornado was blocked and unable to handle any other request. This is particularly problematic when our server provides both expensive computations and cheap ones. The cheap requests get hung up needlessly.
Because Dask is able to integrate with asynchronous systems like Tornado or Asyncio, our web server can freely jump between many requests, even while computation is going on in the background. In the example below we see that even though the slow computation started first, the fast computation returned in just a few milliseconds.
[12]:
# Before async
a = asyncio.ensure_future(measure('http://localhost:8000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:8000/fast', n=1))
await b
await a
http://localhost:8000/fib/35 , 1 simultaneous requests, total time: 3.244072437286377
http://localhost:8000/fast , 1 simultaneous requests, total time: 3.244060516357422
[13]:
# After async
a = asyncio.ensure_future(measure('http://localhost:9000/fib/35', n=1))
b = asyncio.ensure_future(measure('http://localhost:9000/fast', n=1))
await b
await a
http://localhost:9000/fast , 1 simultaneous requests, total time: 0.009461402893066406
http://localhost:9000/fib/35 , 1 simultaneous requests, total time: 3.4255080223083496
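The "after" behavior can also be reproduced with the standard library alone. In this sketch, `loop.run_in_executor` plays the role that `dask_client.submit` plays in the handler: it hands the blocking call to another thread and returns something awaitable. `blocking_work` is a hypothetical stand-in for `fib(35)`. Unlike Dask worker processes, a thread pool does not give CPU-bound Python code true parallelism, so this only illustrates the event loop staying responsive.

```python
import asyncio
import time


def blocking_work(seconds):
    """Hypothetical stand-in for fib(35): occupies its thread for `seconds`."""
    time.sleep(seconds)
    return "slow done"


async def slow_handler(seconds):
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor.
    return await loop.run_in_executor(None, blocking_work, seconds)


async def fast_handler():
    return "Hello!"


async def timed(coro, start):
    """Await coro and report how long after `start` it finished."""
    result = await coro
    return result, time.perf_counter() - start


async def main(block_seconds=0.2):
    start = time.perf_counter()
    slow = asyncio.ensure_future(timed(slow_handler(block_seconds), start))
    fast = asyncio.ensure_future(timed(fast_handler(), start))
    (_, slow_elapsed), (_, fast_elapsed) = await asyncio.gather(slow, fast)
    return slow_elapsed, fast_elapsed


slow_elapsed, fast_elapsed = asyncio.run(main())
print("slow: %.3f s, fast: %.3f s" % (slow_elapsed, fast_elapsed))
```

Here the fast handler returns almost immediately while the stand-in work is still running on another thread, which is the same shape as the timings printed for port 9000.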
Other options
In these situations people today tend to use concurrent.futures or Celery.
- concurrent.futures allows easy parallelism on a single machine and integrates well into async frameworks. The API is exactly what we showed above (Dask implements the concurrent.futures API). However, concurrent.futures doesn’t easily scale out to a cluster.
- Celery scales out more easily to multiple machines, but has higher latencies, doesn’t scale down as nicely, and needs a bit of effort to integrate into async frameworks (or at least this is my understanding; my experience here is shallow).
In this context Dask provides some of the benefits of both. It is easy to set up and use in the common single-machine case, but can also scale out to a cluster. It integrates nicely with async frameworks and adds only very small latencies.
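For comparison, here is a minimal sketch of the concurrent.futures option using only the standard library. The `submit` call has the same shape as the `dask_client.submit` call used in this notebook, and `asyncio.wrap_future` is one way to await the resulting future from a coroutine. The `inc` helper and the pool size are illustrative choices; a `ProcessPoolExecutor` could be swapped in for CPU-bound work.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    """Illustrative task: a trivial x + 1 computation."""
    return x + 1


async def main():
    with ThreadPoolExecutor(max_workers=2) as pool:
        future = pool.submit(inc, 10)  # returns a concurrent.futures.Future
        # Wrap it so the coroutine can await it without blocking the loop.
        return await asyncio.wrap_future(future)


result = asyncio.run(main())
print(result)
```

The pool here lives on one machine; moving the same pattern to a cluster is the part that Dask adds.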
[14]:
async def f():
    start = time()
    result = await dask_client.submit(lambda x: x + 1, 10)
    end = time()
    print('Roundtrip latency: %.2f ms' % ((end - start) * 1000))

await f()
Roundtrip latency: 13.69 ms