# Resilience against hardware failures

Scenario: We have a cluster that partially consists of preemptible resources. That is, we’ll have to deal with workers suddenly being shut down during computation. While demonstrated here with a LocalCluster, Dask’s resilience against preempted resources is most useful with, e.g., Dask Kubernetes or Dask Jobqueue.

## Increase resilience

Whenever a worker shuts down, the scheduler increments the suspiciousness counter of every task that was assigned to that worker, whether or not the task was actually running. Once a task’s suspiciousness exceeds a threshold (3 by default), the scheduler considers the task broken. We want to compute many tasks on only a few workers while workers shut down at random, so we expect the suspiciousness of all tasks to grow rapidly. Let’s increase the threshold:

[1]:

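import dask

# Raise the threshold via the `distributed.scheduler.allowed-failures`
# config key (default: 3). The value 100 is an assumed, illustrative choice.
dask.config.set({"distributed.scheduler.allowed-failures": 100})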
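To make the counting rule described above concrete, here is a toy model in plain Python. It is not Dask's scheduler code: `record_worker_death`, `ALLOWED_FAILURES`, and the task names are hypothetical, and the sketch models only the rule stated in the text (increment on every worker death, broken once the count exceeds the threshold).

```python
from collections import defaultdict

ALLOWED_FAILURES = 3  # default threshold quoted in the text


def record_worker_death(suspicious, tasks_on_worker, allowed_failures=ALLOWED_FAILURES):
    """Bump the counter of every task assigned to the dead worker.

    Returns the tasks whose counter now exceeds the threshold.
    """
    broken = []
    for task in tasks_on_worker:
        suspicious[task] += 1
        if suspicious[task] > allowed_failures:
            broken.append(task)
    return broken


suspicious = defaultdict(int)

# "task-a" is unlucky: the worker it is assigned to dies four times in a row.
# Each death also hits one other task that is affected only once.
history = [record_worker_death(suspicious, ["task-a", f"task-{i}"]) for i in range(4)]
```

In this model the first three deaths leave every task intact, and the fourth pushes `task-a` over the default threshold. Passing a larger `allowed_failures` delays that point, which is the effect a higher threshold is meant to have when workers are expected to disappear often.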



## All other imports

[2]:

from dask.distributed import Client, LocalCluster
from dask import bag as db
import os
import random
from time import sleep


## A cluster

[3]:

cluster = LocalCluster(threads_per_worker=1, n_workers=4, memory_limit=400e6)
client = Client(cluster)
client

[3]:


Client: Client-24950cb3-0de0-11ed-9bcc-000d3a8f7959

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

We’ll multiply a range of numbers by two, sleep briefly in each call to simulate real work, and then reduce the sequence of doubled numbers by summing it.

[4]:

def multiply_by_two(x):
    sleep(0.02)  # simulate some real work
    return 2 * x

[5]:

N = 400

x = db.from_sequence(range(N), npartitions=N // 2)

mults = x.map(multiply_by_two)

summed = mults.sum()


## Suddenly shutting down workers

Let’s mark the process IDs of two workers as non-preemptible.

[6]:

all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]
non_preemptible_workers = all_current_workers[:2]

[7]:

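import signal

def kill_a_worker():
    # Candidates are all current workers except the two protected ones
    preemptible_workers = [
        w.pid for w in cluster.scheduler.workers.values()
        if w.pid not in non_preemptible_workers]
    if preemptible_workers:
        # SIGTERM (signal 15) asks the chosen worker process to terminate
        os.kill(random.choice(preemptible_workers), signal.SIGTERM)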
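The signal number 15 passed to `os.kill` is SIGTERM on POSIX systems. As a rough illustration of what that call does to a process, the sketch below sends the same signal to a throwaway child process instead of a Dask worker. The child command is hypothetical and stands in for a worker; the example assumes a POSIX system and that the child has no SIGTERM handler installed.

```python
import os
import signal
import subprocess
import sys

# Start a throwaway child process that would otherwise sleep for a minute.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

# Same kind of call as in kill_a_worker, with the signal named explicitly.
os.kill(child.pid, signal.SIGTERM)

# On POSIX, a negative return code -N means the child was ended by signal N.
return_code = child.wait(timeout=10)
```

Unlike this plain child process, a killed worker in the notebook is brought back, as the "Restarting worker" warning from `distributed.nanny` in the output further down indicates.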


## Start the computation and keep shutting down workers while it’s running

[8]:

summed = client.compute(summed)

while not summed.done():
    kill_a_worker()
    sleep(3.0)

2022-07-27 19:13:06,023 - distributed.nanny - WARNING - Restarting worker


## Check if results match

[9]:

print(f"sum(range({N})) on cluster: {summed.result()}\t(should be {N * (N-1)})")

sum(range(400)) on cluster: 159600    (should be 159600)
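The expected value `N * (N-1)` follows from the workload itself: `range(N)` sums to `N * (N-1) / 2`, and doubling every element doubles that sum. A minimal local check that needs no cluster is sketched below; `double` is a hypothetical stand-in for `multiply_by_two` without the sleep.

```python
N = 400


def double(x):
    # Same arithmetic as multiply_by_two, minus the sleep
    return 2 * x


# Plain sequential reduction, the single-process counterpart of mults.sum()
local_sum = sum(double(x) for x in range(N))

# Closed form: 2 * (N * (N - 1) / 2) == N * (N - 1)
closed_form = N * (N - 1)
```

Both values equal the 159600 reported by the cluster, so the killed workers did not cause any partition to be lost or counted twice.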