{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Stencil Computations with Numba\n", "===============================\n", "\n", "\"Numba\n", "\n", "This notebook combines [Numba](https://numba.pydata.org/), a high performance Python compiler, with Dask Arrays.\n", "\n", "In particular we show off two Numba features, and how they compose with Dask:\n", "\n", "1. Numba's [stencil decorator](https://numba.pydata.org/numba-doc/dev/user/stencil.html)\n", "2. NumPy's [Generalized Universal Functions](https://numpy.org/doc/stable/reference/c-api/generalized-ufuncs.html)\n", "\n", "*This was originally published as a blogpost [here](https://blog.dask.org/2019/04/09/numba-stencil)*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction to Numba Stencils\n", "\n", "Many array computing functions operate only on a local region of the array. This is common in image processing, signals processing, simulation, the solution of differential equations, anomaly detection, time series analysis, and more. Typically we write code that looks like the following:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:13.217150Z", "iopub.status.busy": "2022-07-27T19:17:13.216643Z", "iopub.status.idle": "2022-07-27T19:17:13.226587Z", "shell.execute_reply": "2022-07-27T19:17:13.226051Z" } }, "outputs": [], "source": [ "def _smooth(x):\n", " out = np.empty_like(x)\n", " for i in range(1, x.shape[0] - 1):\n", " for j in range(1, x.shape[1] - 1):\n", " out[i, j] = (x[i + -1, j + -1] + x[i + -1, j + 0] + x[i + -1, j + 1] +\n", " x[i + 0, j + -1] + x[i + 0, j + 0] + x[i + 0, j + 1] +\n", " x[i + 1, j + -1] + x[i + 1, j + 0] + x[i + 1, j + 1]) // 9\n", "\n", " return out" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Or something similar. The `numba.stencil` decorator makes this a bit easier to write down. You just write down what happens on every element, and Numba handles the rest.\n", "\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:13.229734Z", "iopub.status.busy": "2022-07-27T19:17:13.229350Z", "iopub.status.idle": "2022-07-27T19:17:13.726508Z", "shell.execute_reply": "2022-07-27T19:17:13.725749Z" } }, "outputs": [], "source": [ "import numba\n", "\n", "@numba.stencil\n", "def _smooth(x):\n", " return (x[-1, -1] + x[-1, 0] + x[-1, 1] +\n", " x[ 0, -1] + x[ 0, 0] + x[ 0, 1] +\n", " x[ 1, -1] + x[ 1, 0] + x[ 1, 1]) // 9" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When we run this function on a NumPy array, we find that it is slow, operating at Python speeds." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:13.730835Z", "iopub.status.busy": "2022-07-27T19:17:13.730264Z", "iopub.status.idle": "2022-07-27T19:17:16.786054Z", "shell.execute_reply": "2022-07-27T19:17:16.785503Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "368 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "import numpy as np\n", "x = np.ones((100, 100))\n", "\n", "%timeit _smooth(x)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But if we JIT compile this function with Numba, then it runs more quickly." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:16.789631Z", "iopub.status.busy": "2022-07-27T19:17:16.789221Z", "iopub.status.idle": "2022-07-27T19:17:17.286562Z", "shell.execute_reply": "2022-07-27T19:17:17.285613Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "175 µs ± 25.4 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "@numba.njit\n", "def smooth(x):\n", " return _smooth(x)\n", "\n", "%timeit smooth(x)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For those counting, that’s over 1000x faster!\n", "\n", "*Note: this function already exists as `scipy.ndimage.uniform_filter`, which operates at the same speed.*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Array\n", "\n", "In these applications people often have many such arrays and they want to apply this function over all of them. In principle they could do this with a for loop.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "from glob import glob\n", "import skimage.io\n", "\n", "for fn in glob('/path/to/*.png'):\n", " img = skimage.io.imread(fn)\n", " out = smooth(img)\n", " skimage.io.imsave(fn.replace('.png', '.out.png'), out)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If they wanted to then do this in parallel they would maybe use the multiprocessing or concurrent.futures modules. If they wanted to do this across a cluster then they could rewrite their code with PySpark or some other system.\n", "\n", "Or, they could use Dask array, which will handle both the pipelining and the parallelism (single machine or on a cluster) all while still looking mostly like a NumPy array.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "import dask_image\n", "x = dask_image.imread('/path/to/*.png') # a large lazy array of all of our images\n", "y = x.map_blocks(smooth, dtype='int8')\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And then because each of the chunks of a Dask array are just NumPy arrays, we can use the [map_blocks](https://docs.dask.org/en/latest/generated/dask.array.map_blocks.html) function to apply this function across all of our images, and then save them out.\n", "\n", "This is fine, but lets go a bit further, and discuss generalized universal functions from NumPy." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because we don't have a stack of images nearby, we're going to make a random array with similar structure." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:17.290327Z", "iopub.status.busy": "2022-07-27T19:17:17.290118Z", "iopub.status.idle": "2022-07-27T19:17:17.578607Z", "shell.execute_reply": "2022-07-27T19:17:17.577955Z" } }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes 9.31 GiB 47.68 MiB
Shape (10000, 1000, 1000) (50, 1000, 1000)
Count 200 Tasks 200 Chunks
Type int8 numpy.ndarray
\n", "
\n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " 1000\n", " 1000\n", " 10000\n", "\n", "
" ], "text/plain": [ "dask.array" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask.array as da\n", "x = da.random.randint(0, 127, size=(10000, 1000, 1000), chunks=('64 MB', None, None), dtype='int8')\n", "x" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Generalized Universal Functions\n", "-------------------------------\n", "\n", "**Numba Docs:** https://numba.pydata.org/numba-doc/dev/user/vectorize.html\n", "\n", "**NumPy Docs:** https://numpy.org/doc/stable/reference/c-api/generalized-ufuncs.html\n", "\n", "A generalized universal function (gufunc) is a normal function that has been\n", "annotated with typing and dimension information. For example we can redefine\n", "our `smooth` function as a gufunc as follows:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:17.581865Z", "iopub.status.busy": "2022-07-27T19:17:17.581368Z", "iopub.status.idle": "2022-07-27T19:17:18.350031Z", "shell.execute_reply": "2022-07-27T19:17:18.349228Z" } }, "outputs": [], "source": [ "@numba.guvectorize(\n", " [(numba.int8[:, :], numba.int8[:, :])],\n", " '(n, m) -> (n, m)'\n", ")\n", "def smooth(x, out):\n", " out[:] = _smooth(x)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This function knows that it consumes a 2d array of int8's and produces a 2d\n", "array of int8's of the same dimensions.\n", "\n", "This sort of annotation is a small change, but it gives other systems like Dask\n", "enough information to use it intelligently. Rather than call functions like\n", "`map_blocks`, we can just use the function directly, as if our Dask Array was\n", "just a very large NumPy array." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:18.353924Z", "iopub.status.busy": "2022-07-27T19:17:18.353293Z", "iopub.status.idle": "2022-07-27T19:17:18.374788Z", "shell.execute_reply": "2022-07-27T19:17:18.374158Z" } }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes 9.31 GiB 47.68 MiB
Shape (10000, 1000, 1000) (50, 1000, 1000)
Count 800 Tasks 200 Chunks
Type int8 numpy.ndarray
\n", "
\n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " 1000\n", " 1000\n", " 10000\n", "\n", "
" ], "text/plain": [ "dask.array" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Before gufuncs\n", "y = x.map_blocks(smooth, dtype='int8')\n", "\n", "# After gufuncs\n", "y = smooth(x)\n", "y" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is nice. If you write library code with gufunc semantics then that code\n", "just works with systems like Dask, without you having to build in explicit\n", "support for parallel computing. This makes the lives of users much easier." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Start Dask Client for Dashboard\n", "\n", "Starting the Dask Client is optional. It will start the dashboard which \n", "is useful to gain insight on the computation. " ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:18.377921Z", "iopub.status.busy": "2022-07-27T19:17:18.377494Z", "iopub.status.idle": "2022-07-27T19:17:19.321587Z", "shell.execute_reply": "2022-07-27T19:17:19.320997Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-bb5807ea-0de0-11ed-a0f1-000d3a8f7959

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://10.1.1.64:8787/status\n", "
\n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

9d67a1da

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://10.1.1.64:8787/status\n", " \n", " Workers: 1\n", "
\n", " Total threads: 4\n", " \n", " Total memory: 3.73 GiB\n", "
Status: runningUsing processes: False
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-0774d2ab-c891-4a84-bc8b-4727c390aa3c

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: inproc://10.1.1.64/8433/1\n", " \n", " Workers: 1\n", "
\n", " Dashboard: http://10.1.1.64:8787/status\n", " \n", " Total threads: 4\n", "
\n", " Started: Just now\n", " \n", " Total memory: 3.73 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: inproc://10.1.1.64/8433/4\n", " \n", " Total threads: 4\n", "
\n", " Dashboard: http://10.1.1.64:46077/status\n", " \n", " Memory: 3.73 GiB\n", "
\n", " Nanny: None\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/applications/dask-worker-space/worker-ebsbpsit\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client, progress\n", "client = Client(threads_per_worker=4,\n", " n_workers=1,\n", " processes=False,\n", " memory_limit='4GB')\n", "client" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-07-27T19:17:19.324515Z", "iopub.status.busy": "2022-07-27T19:17:19.324109Z", "iopub.status.idle": "2022-07-27T19:18:20.268163Z", "shell.execute_reply": "2022-07-27T19:18:20.267584Z" } }, "outputs": [ { "data": { "text/plain": [ "122" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "y.max().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## GPU Version\n", "\n", "Numba also supports a JIT compilation with CUDA on compatible GPU devices. \n", "\n", "This gives about a 200x speedup over a CPU on a single V100 GPU using [numba.cuda.jit](https://numba.pydata.org/numba-doc/dev/cuda/index.html).\n", "\n", "```python\n", "import numba.cuda\n", "\n", "@numba.cuda.jit\n", "def smooth_gpu(x, out):\n", " i, j = cuda.grid(2)\n", " n, m = x.shape\n", " if 1 <= i < n - 1 and 1 <= j < m - 1:\n", " out[i, j] = (x[i - 1, j - 1] + x[i - 1, j] + x[i - 1, j + 1] +\n", " x[i , j - 1] + x[i , j] + x[i , j + 1] +\n", " x[i + 1, j - 1] + x[i + 1, j] + x[i + 1, j + 1]) // 9\n", " \n", "import cupy, math\n", "\n", "x_gpu = cupy.ones((10000, 10000), dtype='int8')\n", "out_gpu = cupy.zeros((10000, 10000), dtype='int8')\n", "\n", "# I copied the four lines below from the Numba docs\n", "threadsperblock = (16, 16)\n", "blockspergrid_x = math.ceil(x_gpu.shape[0] / threadsperblock[0])\n", "blockspergrid_y = math.ceil(x_gpu.shape[1] / threadsperblock[1])\n", "blockspergrid = (blockspergrid_x, blockspergrid_y)\n", "\n", "smooth_gpu[blockspergrid, threadsperblock](x_gpu, out_gpu)\n", "```\n", "\n", "*Full notebook [here](https://gist.github.com/mrocklin/9272bf84a8faffdbbe2cd44b4bc4ce3c)*" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }