
# ETL Pipelines with Prefect¶

Prefect is a platform for automating data workflows. Data engineers and data scientists can build, test and deploy production pipelines without worrying about all of the “negative engineering” aspects of production. For example, Prefect makes it easy to deploy a workflow that runs on a complicated schedule, requires task retries in the event of failures, and sends notifications when certain tasks are complete. Prefect was built on top of Dask, and relies on Dask to schedule and manage the execution of a Prefect workflow in a distributed environment.

This example demonstrates running a Prefect ETL Flow on Dask which ultimately creates a GIF. While this is a somewhat unconventional use case of Prefect, we’re no strangers to unconventional use cases.

In the world of workflow engines, Prefect supports many unique features; in this particular example we will see:

• parametrization of workflows

• dynamic runtime “mapping” of workflow tasks

• customizable execution logic

Together, these features go beyond what most other workflow engines offer out of the box.


## Goal¶

To demonstrate how Prefect and Dask work together, we are going to build and execute a standard “Extract / Transform / Load” (ETL) workflow for processing some basic image data. Most ETL workflows involve a scheduled migration of data from one database to another. In our case, we will be moving data from a file located at a known URL to our local hard disk, converting the individual file into a series of frames, and compiling those frames into a GIF. The URL references a file containing raw bytes such as:

b"""aÙw˜≠•∆≠≠ﬁ#!≠≠÷≠•Ω≠úΩ••µú•µîúµ•úΩ••Ω3&µ•Ω! µ≠∆≠•¥4(%µú∑≠≠Œ≠î≠≠≠∆≠îµúî≠úîµE5.≠ú≠≠•Œµµﬁ••∆•≠ŒµµŒúúΩ62&)1&623µ•∆Ωµ÷úî•ßjxΩΩÁú•Ωµ≠Œ••≠ú•≠Ω≠∆≠µÁâUV≠µ‹ΩµŒîî•NC5µ≠Ÿôãô•î•µ•µîú≠#VHCuhl≠≠ΩôchâRIoc]™≠Á≠î•™ú»öis•ú•f7,íYfL9?îî≠≠•÷∑ò™gWVxGEΩ≠–))1qB5µ≠Ω81R,´tÜñWV

! HCDBB5;5?

"""

The steps of our workflow will be as follows:

• Extract: pull the data file from a URL (specified by a Parameter) to disk

• Transform: split the file into multiple files, each corresponding to a single frame

• Load: store each frame individually, and compile the frames together into a GIF

Once we have built our Flow, we can execute it with different values for the Parameter or even run it on a nightly schedule.

NOTE: If we planned on executing this Flow in a truly distributed environment, writing the images to the local filesystem would not be appropriate. We would instead use an external datastore such as Google Cloud Storage, or a proper database.
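
For illustration only (not part of this example's Flow), writing a frame to object storage rather than to local disk could look roughly like the sketch below. It relies on fsspec; the bucket name is a hypothetical placeholder, and writing to "gs://" paths additionally assumes the gcsfs package and valid Google Cloud credentials.

import fsspec


def save_frame_to_gcs(frame_no: int, image: bytes) -> None:
    """
    Hypothetical helper: persist a single frame to Google Cloud Storage
    instead of the local filesystem ("my-bucket" is a placeholder).
    """
    with fsspec.open("gs://my-bucket/frames/frame_{0:0=2d}.gif".format(frame_no), "wb") as f:
        f.write(image)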

## Setting up our environment¶

Before proceeding, we need to install both the [prefect](https://pypi.org/project/prefect/) and [imageio](https://pypi.org/project/imageio/) packages.

[1]:

!pip install imageio prefect[viz]

Requirement already satisfied: imageio in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (2.9.0)
Collecting prefect[viz]

Requirement already satisfied: numpy in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from imageio) (1.18.5)
Requirement already satisfied: pillow in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from imageio) (7.2.0)
Collecting toml<1.0,>=0.9.4
Collecting marshmallow<3.6.2,>=3.0.0b19

Requirement already satisfied: pyyaml<5.4,>=3.13 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (5.3.1)
Requirement already satisfied: urllib3>=1.24.3 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (1.25.9)
Collecting mypy-extensions<1.0,>=0.4.0
Requirement already satisfied: python-dateutil~=2.7 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (2.8.1)
Collecting croniter<1.0,>=0.3.24
Collecting docker<5.0,>=3.4.1

Requirement already satisfied: pytz>=2018.7 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (2020.1)
Requirement already satisfied: distributed<3.0,>=2.8.0 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (2.20.0)
Collecting pendulum<3.0,>=2.0.4

Requirement already satisfied: requests<3.0,>=2.20 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (2.24.0)
Collecting python-box<5.0,>=3.4.4
Collecting tabulate<1.0,>=0.8.0
Requirement already satisfied: click<8.0,>=7.0 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (7.1.2)
Collecting marshmallow-oneofschema<3.0,>=2.0.0b2
Collecting python-slugify<5.0,>=1.2.6
Requirement already satisfied: cloudpickle<2.0,>=0.6.0 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (1.5.0)
Requirement already satisfied: graphviz>=0.8.3; extra == "viz" in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from prefect[viz]) (0.14.1)
Requirement already satisfied: six>=1.5 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from python-dateutil~=2.7->prefect[viz]) (1.15.0)
Collecting natsort
Collecting websocket-client>=0.32.0

Requirement already satisfied: msgpack>=0.6.0 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from distributed<3.0,>=2.8.0->prefect[viz]) (1.0.0)
Requirement already satisfied: setuptools in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from distributed<3.0,>=2.8.0->prefect[viz]) (49.2.0.post20200712)
Requirement already satisfied: tblib>=1.6.0 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from distributed<3.0,>=2.8.0->prefect[viz]) (1.6.0)
Requirement already satisfied: zict>=0.1.3 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from distributed<3.0,>=2.8.0->prefect[viz]) (2.0.0)
Requirement already satisfied: toolz>=0.8.2 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from distributed<3.0,>=2.8.0->prefect[viz]) (0.10.0)
Requirement already satisfied: sortedcontainers!=2.0.0,!=2.0.1 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from distributed<3.0,>=2.8.0->prefect[viz]) (2.2.2)
Requirement already satisfied: tornado>=6.0.3; python_version >= "3.8" in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from distributed<3.0,>=2.8.0->prefect[viz]) (6.0.4)
Requirement already satisfied: psutil>=5.0 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from distributed<3.0,>=2.8.0->prefect[viz]) (5.7.0)
Collecting pytzdata>=2020.1

Requirement already satisfied: chardet<4,>=3.0.2 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from requests<3.0,>=2.20->prefect[viz]) (3.0.4)
Requirement already satisfied: idna<3,>=2.5 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from requests<3.0,>=2.20->prefect[viz]) (2.10)
Requirement already satisfied: certifi>=2017.4.17 in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from requests<3.0,>=2.20->prefect[viz]) (2020.6.20)
Collecting ruamel.yaml

Collecting text-unidecode>=1.3

Requirement already satisfied: heapdict in /home/travis/miniconda/envs/test/lib/python3.8/site-packages (from zict>=0.1.3->distributed<3.0,>=2.8.0->prefect[viz]) (1.0.1)
Collecting ruamel.yaml.clib>=0.1.2; platform_python_implementation == "CPython" and python_version < "3.9"

Building wheels for collected packages: python-slugify
Building wheel for python-slugify (setup.py) ... - done
Created wheel for python-slugify: filename=python_slugify-4.0.1-py2.py3-none-any.whl size=6767 sha256=2339414d7964ffc97b340b99fd936c236b7919ea4956e20b7935136516c0f2f5
Stored in directory: /home/travis/.cache/pip/wheels/91/4d/4f/e740a68c215791688c46c4d6251770a570e8dfea91af1acb5c
Successfully built python-slugify
Installing collected packages: toml, marshmallow, mypy-extensions, natsort, croniter, websocket-client, docker, pytzdata, pendulum, ruamel.yaml.clib, ruamel.yaml, python-box, tabulate, marshmallow-oneofschema, text-unidecode, python-slugify, prefect
Successfully installed croniter-0.3.34 docker-4.2.2 marshmallow-3.6.1 marshmallow-oneofschema-2.0.1 mypy-extensions-0.4.3 natsort-7.0.1 pendulum-2.1.1 prefect-0.12.3 python-box-4.2.3 python-slugify-4.0.1 pytzdata-2020.1 ruamel.yaml-0.16.10 ruamel.yaml.clib-0.2.0 tabulate-0.8.7 text-unidecode-1.3 toml-0.10.1 websocket-client-0.57.0


### Extract¶

First, we will define our tasks for extracting the image data file from a given URL and saving it to a given file location. To do so, we will utilize two methods for creating Prefect Tasks:

• the task decorator for converting any Python function into a task

• a pre-written, configurable Task from the Prefect “Task Library” which helps us abstract some standard boilerplate

Additionally, we will utilize the following Prefect concepts:

• a Prefect signal for marking this task and its downstream dependencies as successfully “Skipped” if the file is already present in our local filesystem

• retry semantics: if, for whatever reason, our curl command fails to connect, we want it to retry up to 2 times with a 10 second delay. This way, if we run this workflow on a schedule we won’t need to concern ourselves with temporary intermittent connection issues.

Right now we are simply defining our individual tasks - we won’t actually set up our dependency structure until we create the full Flow.

[2]:

import datetime
import os

import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.shell import ShellTask


@task(max_retries=2, retry_delay=datetime.timedelta(seconds=10))
def curl_cmd(url: str, fname: str) -> str:
    """
    The curl command we wish to execute.
    """
    if os.path.exists(fname):
        raise SKIP("Image data file already exists.")
    return "curl -fL -o {fname} {url}".format(fname=fname, url=url)


# ShellTask is a task from the Task library which will execute a given command in a subprocess
# and fail if the command returns a non-zero exit code
download = ShellTask(name="curl_task", max_retries=2, retry_delay=datetime.timedelta(seconds=10))



### Transform¶

Next up, we need to define our task which loads the image data file and splits it into multiple frames. In this case, each frame is delimited by 4 newlines. Note that, in the event the previous two tasks are “Skipped”, the default behavior in Prefect is to skip downstream dependencies as well. However, as with most things in Prefect, this behavior is customizable. In this case, we want this task to run regardless of whether the upstreams skipped or not, so we set the skip_on_upstream_skip flag to False.

[3]:

@task(skip_on_upstream_skip=False)
def load_and_split(fname: str) -> list:
    """
    Loads image data file at fname and splits it into
    multiple frames.  Returns a list of bytes, one element
    for each frame.
    """
    with open(fname, "rb") as f:
        images = f.read()

    return [img for img in images.split(b"\n" * 4) if img]


### Load¶

Finally, we want to write our frames to disk as well as combine the frames into a single GIF. In order to achieve this goal, we are going to utilize Prefect’s task “mapping” feature which conveniently spawns new tasks in response to upstream outputs. In this case, we will write a single task for writing an image to disk, and “map” this task over all the image frames returned by load_and_split above! To infer which frame we are on, we look in [prefect.context](https://docs.prefect.io/guide/core_concepts/execution.html#context).

Additionally, we can “reduce” over a mapped task - in this case, we will take the collection of mapped tasks and pass them into our combine_to_gif task for creating and saving our GIF.

[4]:

@task
def write_to_disk(image: bytes) -> bytes:
    """
    Given a single image represented as bytes, writes the image
    to the present working directory with a filename determined
    by map_index.  Returns the image bytes.
    """
    frame_no = prefect.context.get("map_index")
    with open("frame_{0:0=2d}.gif".format(frame_no), "wb") as f:
        f.write(image)
    return image

[5]:

import imageio
from io import BytesIO

@task
def combine_to_gif(image_bytes: list) -> None:
    """
    Given a list of ordered images represented as bytes,
    combines them into a single GIF stored in the present working directory.
    """
    images = [imageio.imread(BytesIO(image)) for image in image_bytes]
    imageio.mimsave('./clip.gif', images)


## Build the Flow¶

Finally, we need to put our tasks together into a Prefect “Flow”. Similar to Dask’s delayed interface, all computation is deferred and no Task code will be executed in this step. Because Prefect maintains a stricter contract between tasks and additionally needs the ability to run in non-Dask execution environments, the mechanism for deferring execution is independent of Dask.

In addition to the tasks we have already defined, we introduce two “Parameters” for specifying the URL and local file location of our data. At runtime, we can optionally override these tasks to return different values.

[6]:

from prefect import Parameter, Flow

DATA_URL = Parameter("DATA_URL",
                     default="https://github.com/cicdw/image-data/blob/master/all-images.img?raw=true")

DATA_FILE = Parameter("DATA_FILE", default="image-data.img")


with Flow("Image ETL") as flow:

    # Extract
    command = curl_cmd(DATA_URL, DATA_FILE)
    curl = download(command=command)

    # Transform
    # we use the upstream_tasks keyword to specify non-data dependencies
    images = load_and_split(fname=DATA_FILE, upstream_tasks=[curl])

    # Load
    frames = write_to_disk.map(images)
    result = combine_to_gif(frames)

flow.visualize()

[6]:

(Output: a Graphviz rendering of the Flow’s task DAG.)

### Running the Flow on Dask¶

Now we have built our Flow, independently of Dask. We could execute this Flow sequentially, Task after Task, but there is inherent parallelism in our mapping of the images to files that we want to exploit. Luckily, Dask makes this easy to achieve.

First, we will start a local Dask cluster. Then, we will run our Flow against Prefect’s DaskExecutor, which will submit each Task to our Dask cluster and use Dask’s distributed scheduler for determining when and where each Task should run. Essentially, we built a Directed Acyclic Graph (DAG) and are simply “submitting” that DAG to Dask for handling its execution in a distributed way.

[7]:

from dask.distributed import Client
from prefect.engine.executors import DaskExecutor

# start our Dask cluster
client = Client(n_workers=4, threads_per_worker=1)

# point Prefect's DaskExecutor at the cluster's scheduler and run the Flow
executor = DaskExecutor(address=client.scheduler.address)
flow.run(executor=executor)

[2020-07-14 21:51:01] INFO - prefect.FlowRunner | Beginning Flow run for 'Image ETL'
[2020-07-14 21:51:01] INFO - prefect.FlowRunner | Starting flow run.

/home/travis/miniconda/envs/test/lib/python3.8/site-packages/distributed/worker.py:3376: UserWarning: Large object of size 2.03 MB detected in task graph:
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

future = client.submit(func, big_data)    # bad

big_future = client.scatter(big_data)     # good
future = client.submit(func, big_future)  # good
warnings.warn(

[2020-07-14 21:51:05] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

[7]:

<Success: "All reference tasks succeeded.">


## Next Steps¶

Now that we’ve built our workflow, what next? The interested reader should try to:

• run the Flow again to see how the SKIP signal behaves

• use different parameters for both the URL and the file location (Parameter values can be overridden by simply passing their names as keyword arguments to flow.run(); see the sketch after this list)

• introduce a new Parameter for the filename of the final GIF

• use Prefect’s scheduler interface to run our workflow on a schedule
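
As a rough sketch of the last two ideas, assuming Prefect 0.12’s core API (the alternate URL and filename below are hypothetical placeholders): Parameter values can be passed as keyword arguments to flow.run(), and an IntervalSchedule can be attached to the Flow so that flow.run() executes it on a recurring cadence.

import datetime

from prefect.schedules import IntervalSchedule

# override the Parameters for a single run (hypothetical alternate URL / filename)
state = flow.run(
    DATA_URL="https://example.com/other-images.img",
    DATA_FILE="other-image-data.img",
)

# attach a nightly schedule; with a schedule set, flow.run() waits and
# executes the Flow once per interval instead of returning immediately
flow.schedule = IntervalSchedule(
    start_date=datetime.datetime.utcnow(),
    interval=datetime.timedelta(days=1),
)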

## Play¶

Finally, let’s watch our creation!

[8]:

from IPython.display import HTML

HTML('<img src="./clip.gif">')
[8]:

(Output: the assembled clip.gif animation.)