Live Notebook

You can run this notebook in a live session on Binder or view it on GitHub.

ETL Pipelines with Prefect

Prefect is a platform for automating data workflows. Data engineers and data scientists can build, test and deploy production pipelines without worrying about all of the “negative engineering” aspects of production. For example, Prefect makes it easy to deploy a workflow that runs on a complicated schedule, requires task retries in the event of failures, and sends notifications when certain tasks are complete. Prefect was built on top of Dask, and relies on Dask to schedule and manage the execution of a Prefect workflow in a distributed environment.

This example demonstrates running a Prefect ETL Flow on Dask which ultimately creates a GIF. While this is a somewhat unconventional use case of Prefect, we’re no strangers to unconventional use cases.

In the world of workflow engines, Prefect supports many unique features; in this particular example we will see:

  • parametrization of workflows

  • dynamic runtime “mapping” of workflow tasks

  • customizable execution logic

You wouldn’t get this from any other engine.

Contents

  1. Goal

  2. Setting up our environment

  3. Extract

  4. Transform

  5. Load

  6. Build the Flow

  7. Running the Flow on Dask

  8. Next Steps

  9. Play

Goal

To demonstrate how Prefect and Dask work together, we are going to build and execute a standard “Extract / Transform / Load” (ETL) workflow for processing some basic image data. Most ETL workflows involve a scheduled migration of data from one database to another. In our case, we will be moving data from a file located at a known URL to our local hard disk, converting the individual file into a series of frames, and compiling those frames into a GIF. The URL references a file containing raw bytes such as:

b"""aÙw˜≠•∆≠≠fi#!≠≠÷≠•Ω≠úΩ••µú•µîúµ•úΩ••Ω3&µ•Ω! µ≠∆≠•¥4(%µú∑≠≠Œ≠î≠≠≠∆≠îµúî≠úîµE5.≠ú≠≠•Œµµfi••∆•≠ŒµµŒúúΩ62&)1&623µ•∆Ωµ÷úî•ßjxΩΩÁú•Ωµ≠Œ••≠ú•≠Ω≠∆≠µÁâUV≠µ‹ΩµŒîî•NC5µ≠Ÿôãô•î•µ•µîú≠#VHCuhl≠≠ΩôchâRIoc]™≠Á≠î•™ú»öis•ú•f7,íYfL9?îî≠≠•÷∑ò™gWVxGEΩ≠–))1qB5µ≠Ω81R,´tÜñWV

! HCDBB5;5?

"""

The steps of our workflow will be as follows:

  • Extract: pull the data file from a URL (specified by a Parameter) to disk

  • Transform: split the file into multiple files, each corresponding to a single frame

  • Load: store each frame individually, and compile the frames together into a GIF

Once we have built our Flow, we can execute it with different values for the Parameter or even run it on a nightly schedule.

NOTE: If we planned on executing this Flow in a truly distributed environment, writing the images to the local filesystem would not be appropriate. We would instead use an external datastore such as Google Cloud Storage, or a proper database.

Setting up our environment

Before proceeding, we need to install both the `prefect <https://pypi.org/project/prefect/>`__ and `imageio <https://pypi.org/project/imageio/>`__ packages.

[1]:
!pip install imageio prefect[viz]
Requirement already satisfied: imageio in /home/travis/miniconda/envs/test/lib/python3.7/site-packages (2.6.1)
Collecting prefect[viz]
  Downloading https://files.pythonhosted.org/packages/01/70/b3b455f31cd229de01bcbcf6346c8b604586960aded8a84a2d3286430da5/prefect-0.6.7-py3-none-any.whl (461kB)
... (dependency resolution output truncated) ...
Successfully installed croniter-0.3.30 docker-4.1.0 marshmallow-3.0.5 marshmallow-oneofschema-2.0.1 mypy-extensions-0.4.2 pendulum-2.0.5 prefect-0.6.7 python-box-3.4.5 python-slugify-3.0.6 pytzdata-2019.3 tabulate-0.8.5 text-unidecode-1.3 toml-0.10.0 typing-3.7.4.1 typing-extensions-3.7.4 websocket-client-0.56.0

Extract

First, we will define our tasks for extracting the image data file from a given URL and saving it to a given file location. To do so, we will use two methods for creating Prefect Tasks:

  • the task decorator for converting any Python function into a task

  • a pre-written, configurable Task from the Prefect “Task Library” which abstracts away some standard boilerplate

Additionally, we will use the following Prefect concepts:

  • a Prefect signal for marking this task and its downstream dependencies as successfully “Skipped” if the file is already present in our local filesystem

  • retry semantics: if, for whatever reason, our curl command fails to connect, we want it to retry up to 2 times with a 10-second delay. This way, if we run this workflow on a schedule, we won’t need to concern ourselves with intermittent connection issues.

Right now we are simply defining our individual tasks - we won’t actually set up our dependency structure until we create the full Flow.

[2]:
import datetime
import os

import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.shell import ShellTask


@task
def curl_cmd(url: str, fname: str) -> str:
    """
    The curl command we wish to execute.
    """
    if os.path.exists(fname):
        raise SKIP("Image data file already exists.")
    return "curl -fL -o {fname} {url}".format(fname=fname, url=url)


# ShellTask is a task from the Task library which will execute a given command in a subprocess
# and fail if the command returns a non-zero exit code

download = ShellTask(name="curl_task", max_retries=2, retry_delay=datetime.timedelta(seconds=10))
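
The command string that `curl_cmd` returns is plain string formatting, so it can be previewed outside of Prefect. A quick sketch with illustrative inputs (the URL here is a stand-in, not the real data URL):

```python
# Reproduce the command string `curl_cmd` builds, outside of Prefect
url = "https://example.com/all-images.img"  # illustrative URL
fname = "image-data.img"
cmd = "curl -fL -o {fname} {url}".format(fname=fname, url=url)
print(cmd)
# curl -fL -o image-data.img https://example.com/all-images.img
```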

Transform

Next up, we need to define our task which loads the image data file and splits it into multiple frames. In this case, each frame is delimited by 4 newlines. Note that, in the event the previous two tasks are “Skipped”, the default behavior in Prefect is to skip downstream dependencies as well. However, as with most things in Prefect, this behavior is customizable. In this case, we want this task to run regardless of whether the upstreams skipped or not, so we set the skip_on_upstream_skip flag to False.

[3]:
@task(skip_on_upstream_skip=False)
def load_and_split(fname: str) -> list:
    """
    Loads image data file at `fname` and splits it into
    multiple frames.  Returns a list of bytes, one element
    for each frame.
    """
    with open(fname, "rb") as f:
        images = f.read()

    return [img for img in images.split(b"\n" * 4) if img]
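
The splitting logic can be seen in isolation with a tiny synthetic byte string; the two “frames” below are made-up data that share only the four-newline delimiter with the real file:

```python
# Two fake "frames" separated by the 4-newline delimiter used above;
# the trailing delimiter produces an empty element that the filter drops
sample_data = b"frame-one" + b"\n" * 4 + b"frame-two" + b"\n" * 4
sample_frames = [img for img in sample_data.split(b"\n" * 4) if img]
print(sample_frames)  # [b'frame-one', b'frame-two']
```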

Load

Finally, we want to write our frames to disk as well as combine the frames into a single GIF. In order to achieve this goal, we are going to utilize Prefect’s task “mapping” feature which conveniently spawns new tasks in response to upstream outputs. In this case, we will write a single task for writing an image to disk, and “map” this task over all the image frames returned by load_and_split above! To infer which frame we are on, we look in `prefect.context <https://docs.prefect.io/guide/core_concepts/execution.html#context>`__.

Additionally, we can “reduce” over a mapped task - in this case, we will take the collection of mapped tasks and pass them into our combine_to_gif task for creating and saving our GIF.

[4]:
@task
def write_to_disk(image: bytes) -> bytes:
    """
    Given a single image represented as bytes, writes the image
    to the present working directory with a filename determined
    by `map_index`.  Returns the image bytes.
    """
    frame_no = prefect.context.get("map_index")
    with open("frame_{0:0=2d}.gif".format(frame_no), "wb") as f:
        f.write(image)
    return image
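
The `{0:0=2d}` format specifier zero-pads the map index to two digits, which keeps the frame files in lexicographic order on disk:

```python
# Zero-padded filenames, as write_to_disk produces for each map index
names = ["frame_{0:0=2d}.gif".format(i) for i in (0, 1, 10)]
print(names)  # ['frame_00.gif', 'frame_01.gif', 'frame_10.gif']
```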
[5]:
import imageio
from io import BytesIO


@task
def combine_to_gif(image_bytes: list) -> None:
    """
    Given a list of ordered images represented as bytes,
    combines them into a single GIF stored in the present working directory.
    """
    images = [imageio.imread(BytesIO(image)) for image in image_bytes]
    imageio.mimsave('./clip.gif', images)
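
In plain Python terms, this map-then-reduce pattern amounts to calling a function on each element and then passing the whole collection to a combining function. A sketch with stand-in functions (not the Prefect tasks themselves):

```python
# Conceptual equivalent of mapping write_to_disk then reducing with combine_to_gif
def write_frame(frame):            # stand-in for write_to_disk
    return frame

def combine(frames):               # stand-in for combine_to_gif
    return b"".join(frames)

demo_images = [b"a", b"b", b"c"]
demo_frames = [write_frame(img) for img in demo_images]  # the "map" step
demo_result = combine(demo_frames)                       # the "reduce" step
print(demo_result)  # b'abc'
```

Prefect performs the same two steps, but as first-class tasks whose parallelism Dask can exploit.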

Build the Flow

Finally, we need to put our tasks together into a Prefect “Flow”. Similar to Dask’s delayed interface, all computation is deferred and no Task code will be executed in this step. Because Prefect maintains a stricter contract between tasks and additionally needs the ability to run in non-Dask execution environments, the mechanism for deferring execution is independent of Dask.

In addition to the tasks we have already defined, we introduce two “Parameters” for specifying the URL and local file location of our data. At runtime, we can optionally override these tasks to return different values.

[6]:
from prefect import Parameter, Flow


DATA_URL = Parameter("DATA_URL",
                     default="https://github.com/cicdw/image-data/blob/master/all-images.img?raw=true")

DATA_FILE = Parameter("DATA_FILE", default="image-data.img")


with Flow("Image ETL") as flow:

    # Extract
    command = curl_cmd(DATA_URL, DATA_FILE)
    curl = download(command=command)

    # Transform
    # we use the `upstream_tasks` keyword to specify non-data dependencies
    images = load_and_split(fname=DATA_FILE, upstream_tasks=[curl])

    # Load
    frames = write_to_disk.map(images)
    result = combine_to_gif(frames)


flow.visualize()
[6]:
[flow graph: ../_images/applications_prefect-etl_12_0.svg — DATA_URL and DATA_FILE feed curl_cmd, whose command feeds curl_task; load_and_split follows, then the mapped write_to_disk tasks, then combine_to_gif]

Running the Flow on Dask

We have now built our Flow, independently of Dask. We could execute it sequentially, Task after Task, but there is inherent parallelism in our mapping of the images to files that we want to exploit. Luckily, Dask makes this easy to achieve.

First, we will start a local Dask cluster. Then, we will run our Flow with Prefect’s DaskExecutor, which will submit each Task to our Dask cluster and use Dask’s distributed scheduler to determine when and where each Task should run. Essentially, we built a Directed Acyclic Graph (DAG) and are simply “submitting” that DAG to Dask, which handles its execution in a distributed way.

[7]:
# start our Dask cluster
from dask.distributed import Client


client = Client(n_workers=4, threads_per_worker=1)

# point Prefect's DaskExecutor to our Dask cluster

from prefect.engine.executors import DaskExecutor

executor = DaskExecutor(address=client.scheduler.address)
flow.run(executor=executor)
[2019-10-16 21:14:51,061] INFO - prefect.FlowRunner | Beginning Flow run for 'Image ETL'
[2019-10-16 21:14:51,065] INFO - prefect.FlowRunner | Starting flow run.
[2019-10-16 21:14:56,897] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[7]:
<Success: "All reference tasks succeeded.">

Next Steps

Now that we’ve built our workflow, what next? The interested reader should try to:

  • run the Flow again to see how the SKIP signal behaves

  • use different parameters for both the URL and the file location (Parameter values can be overridden by simply passing their names as keyword arguments to flow.run())

  • introduce a new Parameter for the filename of the final GIF

  • use Prefect’s scheduler interface to run our workflow on a schedule

Play

Finally, let’s watch our creation!

[8]:
from IPython.display import HTML

HTML('<img src="./clip.gif" alt="Rick Daskley">')
[8]:
Rick Daskley