{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Handle Evolving Workflows\n", "=========================\n", "\n", "For some workflows we don't know the extent of the computation at the outset. We need to do some computation in order to figure out the rest of the computation that we need to do. The computation grows and evolves as we do more work.\n", "\n", "As an example, consider a situation where you need to read many files and then, based on the contents of those files, fire off additional work. You would like to read the files in parallel, and then expose more parallelism within each file.\n", "\n", "This example goes through three ways to handle this situation using [Dask Futures](https://docs.dask.org/en/latest/futures.html):\n", "\n", "1. Using `as_completed`\n", "2. Using `async/await`\n", "3. Launching tasks from tasks\n", "\n", "But first, let's run our code sequentially." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "0: Sequential code\n", "------------------" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:44:41.140985Z", "iopub.status.busy": "2021-01-14T10:44:41.140433Z", "iopub.status.idle": "2021-01-14T10:44:41.146656Z", "shell.execute_reply": "2021-01-14T10:44:41.146981Z" } }, "outputs": [ { "data": { "text/plain": [ "['file.0.txt', 'file.1.txt', 'file.2.txt']" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "filenames = [\"file.{}.txt\".format(i) for i in range(10)]\n", "\n", "filenames[:3]" ] },
{ "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:44:41.149274Z", "iopub.status.busy": "2021-01-14T10:44:41.148755Z", "iopub.status.idle": "2021-01-14T10:44:41.160651Z", "shell.execute_reply": "2021-01-14T10:44:41.161147Z" } }, "outputs": [], "source": [ "import random, time\n", "\n", "\n", "def parse_file(fn: str) -> list:\n", "    \"\"\" Returns a list of work items of unknown length \"\"\"\n", "    time.sleep(random.random())\n", "    return [random.random() for _ in range(random.randint(1, 10))]\n", "\n", "\n", "def process_item(x: float):\n", "    \"\"\" Process each work item \"\"\"\n", "    time.sleep(random.random() / 4)\n", "    return x + 1" ] },
{ "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:44:41.163230Z", "iopub.status.busy": "2021-01-14T10:44:41.162712Z", "iopub.status.idle": "2021-01-14T10:44:55.804427Z", "shell.execute_reply": "2021-01-14T10:44:55.804057Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 3.63 ms, sys: 754 µs, total: 4.38 ms\n", "Wall time: 14.6 s\n" ] } ], "source": [ "%%time\n", "\n", "# This takes around 10-20s\n", "\n", "results = []\n", "\n", "for fn in filenames:\n", "    L = parse_file(fn)\n", "    for x in L:\n", "        out = process_item(x)\n", "        results.append(out)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Start Dask Client\n", "-----------------\n", "\n", "We'll need a Dask client in order to manage dynamic workloads." ] },
{ "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2021-01-14T10:44:55.808824Z", "iopub.status.busy": "2021-01-14T10:44:55.808412Z", "iopub.status.idle": "2021-01-14T10:44:58.065690Z", "shell.execute_reply": "2021-01-14T10:44:58.065203Z" } }, "outputs": [ { "data": { "text/html": [ "
Client\n", "Cluster\n", "