{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Resilience against hardware failures\n", "\n", "Scenario: We have a cluster that partially consists of preemptible resources. That is, we'll have to deal with workers suddenly being shut down during computation. While demonstrated here with a `LocalCluster`, Dask's resilience against preempted resources is most useful with, e.g., [Dask Kubernetes](https://kubernetes.dask.org/) or [Dask Jobqueue](https://jobqueue.dask.org).\n", "\n", "Relevant docs: " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Increase resilience\n", "\n", "Whenever a worker shuts down, the scheduler increments the suspiciousness counter of _all_ tasks that were assigned to (but not necessarily running on) the worker in question. Whenever the suspiciousness of a task exceeds a certain threshold (3 by default), the task is considered bad and is marked as failed. We want to compute many tasks on only a few workers while workers shut down randomly, so we expect the suspiciousness of all tasks to grow rapidly. 
Let's increase the threshold:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:02.450063Z", "iopub.status.busy": "2022-07-27T19:13:02.449369Z", "iopub.status.idle": "2022-07-27T19:13:02.586449Z", "shell.execute_reply": "2022-07-27T19:13:02.585692Z" } }, "outputs": [], "source": [ "import dask\n", "\n", "dask.config.set({'distributed.scheduler.allowed-failures': 100});" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## All other imports" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:02.591120Z", "iopub.status.busy": "2022-07-27T19:13:02.589825Z", "iopub.status.idle": "2022-07-27T19:13:02.882769Z", "shell.execute_reply": "2022-07-27T19:13:02.882217Z" } }, "outputs": [], "source": [ "from dask.distributed import Client, LocalCluster\n", "from dask import bag as db\n", "import os\n", "import random\n", "from time import sleep" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A cluster" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:02.886251Z", "iopub.status.busy": "2022-07-27T19:13:02.886039Z", "iopub.status.idle": "2022-07-27T19:13:05.734651Z", "shell.execute_reply": "2022-07-27T19:13:05.733981Z" } }, "outputs": [ { "data": { "text/html": [ "
Client: Client-24950cb3-0de0-11ed-9bcc-000d3a8f7959\n", "Connection method: Cluster object\n", "Cluster type: distributed.LocalCluster (22f50347)\n", "Dashboard: http://127.0.0.1:8787/status\n", "Status: running, using processes: True\n", "Scheduler: Scheduler-0999d4fd-0cdb-4600-8ce3-c52d841558f8 at tcp://127.0.0.1:35393\n", "Workers: 4\n", "Total threads: 4\n", "Total memory: 1.49 GiB\n", "Worker 0: tcp://127.0.0.1:46159, 1 thread, 381.47 MiB\n", "Worker 1: tcp://127.0.0.1:44753, 1 thread, 381.47 MiB\n", "Worker 2: tcp://127.0.0.1:44591, 1 thread, 381.47 MiB\n", "Worker 3: tcp://127.0.0.1:41479, 1 thread, 381.47 MiB" ], "text/plain": [ "" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cluster = LocalCluster(threads_per_worker=1, n_workers=4, memory_limit=400e6)\n", "client = Client(cluster)\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A simple workload\n", "\n", "We'll multiply a range of numbers by two, add a short sleep to simulate real work, and then reduce the whole sequence of doubled numbers by summing them." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:05.737638Z", "iopub.status.busy": "2022-07-27T19:13:05.737362Z", "iopub.status.idle": "2022-07-27T19:13:05.740996Z", "shell.execute_reply": "2022-07-27T19:13:05.740431Z" } }, "outputs": [], "source": [ "def multiply_by_two(x):\n", "    sleep(0.02)\n", "    return 2 * x" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:05.744199Z", "iopub.status.busy": "2022-07-27T19:13:05.743802Z", "iopub.status.idle": "2022-07-27T19:13:05.750425Z", "shell.execute_reply": "2022-07-27T19:13:05.749500Z" } }, "outputs": [], "source": [ "N = 400\n", "\n", "x = db.from_sequence(range(N), npartitions=N // 2)\n", "\n", "mults = x.map(multiply_by_two)\n", "\n", "summed = mults.sum()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Suddenly shutting down workers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's mark two worker process IDs as non-preemptible." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:05.757782Z", "iopub.status.busy": "2022-07-27T19:13:05.757272Z", "iopub.status.idle": "2022-07-27T19:13:05.761043Z", "shell.execute_reply": "2022-07-27T19:13:05.760448Z" } }, "outputs": [], "source": [ "all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]\n", "non_preemptible_workers = all_current_workers[:2]" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:05.765244Z", "iopub.status.busy": "2022-07-27T19:13:05.763772Z", "iopub.status.idle": "2022-07-27T19:13:05.769426Z", "shell.execute_reply": "2022-07-27T19:13:05.768724Z" } }, "outputs": [], "source": [ "def kill_a_worker():\n", "    preemptible_workers = [\n", "        w.pid for w in cluster.scheduler.workers.values()\n", "        if w.pid not in non_preemptible_workers]\n", "    if preemptible_workers:\n", "        os.kill(random.choice(preemptible_workers), 15)  # signal 15 is SIGTERM" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Start the computation and keep shutting down workers while it's running" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:05.772546Z", "iopub.status.busy": "2022-07-27T19:13:05.772002Z", "iopub.status.idle": "2022-07-27T19:13:08.945965Z", "shell.execute_reply": "2022-07-27T19:13:08.945162Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2022-07-27 19:13:06,023 - distributed.nanny - WARNING - Restarting worker\n" ] } ], "source": [ "summed = client.compute(summed)\n", "\n", "while not summed.done():\n", "    kill_a_worker()\n", "    sleep(3.0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Check if results match" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:13:08.950451Z", "iopub.status.busy": "2022-07-27T19:13:08.949868Z", 
"iopub.status.idle": "2022-07-27T19:13:08.960537Z", "shell.execute_reply": "2022-07-27T19:13:08.959989Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "`2 * sum(range(400))` on cluster: 159600\t(should be 159600)\n" ] } ], "source": [ "print(f\"`2 * sum(range({N}))` on cluster: {summed.result()}\\t(should be {N * (N-1)})\")" ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }