Resilience against hardware failures¶
Scenario: We have a cluster that partially consists of preemptible resources. That is, we’ll have to deal with workers suddenly being shut down during computation. While demonstrated here with a LocalCluster, Dask’s resilience against preempted resources is most useful with, e.g., Dask Kubernetes or Dask Jobqueue.
Relevant docs: http://distributed.dask.org/en/latest/resilience.html#hardware-failures
Increase resilience¶
Whenever a worker shuts down, the scheduler increments the suspiciousness counter of all tasks that were assigned to that worker (not only the tasks it was actively computing). Once the suspiciousness of a task exceeds a certain threshold (3 by default), the task is considered broken. Since we want to compute many tasks on only a few workers while workers are shut down at random, we expect the suspiciousness of all tasks to grow rapidly. Let’s increase the threshold:
[1]:
import dask
dask.config.set({'distributed.scheduler.allowed-failures': 100});
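The override can be verified with `dask.config.get`. A minimal sketch (run it before creating the cluster, since the scheduler picks the value up when it starts):

```python
import dask

# Raise the per-task failure threshold from the default of 3 to 100.
dask.config.set({'distributed.scheduler.allowed-failures': 100})

# Read the effective value back from the merged configuration.
print(dask.config.get('distributed.scheduler.allowed-failures'))
```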
All other imports¶
[2]:
from dask.distributed import Client, LocalCluster
from dask import bag as db
import os
import random
from time import sleep
A cluster¶
[3]:
cluster = LocalCluster(threads_per_worker=1, n_workers=4, memory_limit=400e6)
client = Client(cluster)
client
[3]:
Client
Client-24950cb3-0de0-11ed-9bcc-000d3a8f7959
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
22f50347
Dashboard: http://127.0.0.1:8787/status | Workers: 4 |
Total threads: 4 | Total memory: 1.49 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-0999d4fd-0cdb-4600-8ce3-c52d841558f8
Comm: tcp://127.0.0.1:35393 | Workers: 4 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 4 |
Started: Just now | Total memory: 1.49 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:46159 | Total threads: 1 |
Dashboard: http://127.0.0.1:36825/status | Memory: 381.47 MiB |
Nanny: tcp://127.0.0.1:39639 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-icrn9gq_ |
Worker: 1
Comm: tcp://127.0.0.1:44753 | Total threads: 1 |
Dashboard: http://127.0.0.1:46199/status | Memory: 381.47 MiB |
Nanny: tcp://127.0.0.1:34707 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-ke1b20m1 |
Worker: 2
Comm: tcp://127.0.0.1:44591 | Total threads: 1 |
Dashboard: http://127.0.0.1:37637/status | Memory: 381.47 MiB |
Nanny: tcp://127.0.0.1:42199 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-t8f4iili |
Worker: 3
Comm: tcp://127.0.0.1:41479 | Total threads: 1 |
Dashboard: http://127.0.0.1:38249/status | Memory: 381.47 MiB |
Nanny: tcp://127.0.0.1:38963 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-qmr65by3 |
A simple workload¶
We’ll multiply a range of numbers by two, add some sleep to simulate some real work, and then reduce the whole sequence of doubled numbers by summing them.
[4]:
def multiply_by_two(x):
sleep(0.02)
return 2 * x
[5]:
N = 400
x = db.from_sequence(range(N), npartitions=N // 2)
mults = x.map(multiply_by_two)
summed = mults.sum()
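As a sanity check on what this graph should produce: doubling every element of `range(N)` and summing gives `2 * N * (N - 1) / 2 = N * (N - 1)`. A plain-Python sketch of the same computation, with no cluster involved:

```python
N = 400

# The same workload without Dask: double each value, then sum.
expected = sum(2 * x for x in range(N))

# Closed form: sum(range(N)) == N * (N - 1) // 2, so doubling gives N * (N - 1).
assert expected == N * (N - 1)
print(expected)  # 159600
```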
Suddenly shutting down workers¶
Let’s mark two worker process IDs as non-preemptible.
[6]:
all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]
non_preemptible_workers = all_current_workers[:2]
[7]:
def kill_a_worker():
    preemptible_workers = [
        w.pid for w in cluster.scheduler.workers.values()
        if w.pid not in non_preemptible_workers]
    if preemptible_workers:
        # Signal 15 is SIGTERM; the nanny will restart the killed worker.
        os.kill(random.choice(preemptible_workers), 15)
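Signal `15` is SIGTERM, the same "polite" shutdown request a preemptible cloud instance typically receives; the Dask nanny catches it and restarts the worker. A standalone sketch of the mechanism on a throwaway child process (illustrative only, not part of the cluster):

```python
import os
import signal
import subprocess
import sys

# Spawn a throwaway child process that would otherwise sleep for a minute.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

# Send SIGTERM (numeric value 15), asking the process to shut down.
os.kill(child.pid, signal.SIGTERM)

# On POSIX, a negative return code of -15 means the child died from SIGTERM.
print(child.wait())
```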
Start the computation and keep shutting down workers while it’s running¶
[8]:
summed = client.compute(summed)
while not summed.done():
kill_a_worker()
sleep(3.0)
2022-07-27 19:13:06,023 - distributed.nanny - WARNING - Restarting worker
Check if results match¶
[9]:
print(f"`sum(range({N}))` on cluster: {summed.result()}\t(should be {N * (N-1)})")
`sum(range(400))` on cluster: 159600 (should be 159600)