Prefect is a platform for automating data workflows. Data engineers and data scientists can build, test and deploy production pipelines without worrying about all of the "negative engineering" aspects of production. For example, Prefect makes it easy to deploy a workflow that runs on a complicated schedule, requires task retries in the event of failures, and sends notifications when certain tasks are complete. Prefect was built on top of Dask, and relies on Dask to schedule and manage the execution of a Prefect workflow in a distributed environment.
This example demonstrates running a Prefect ETL Flow on Dask which ultimately creates a GIF. While this is a somewhat unconventional use case of Prefect, we're no strangers to unconventional use cases.
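In the world of workflow engines, Prefect supports many unique features; in this particular example we will see:
- parametrization of workflows
- dynamic runtime "mapping" of workflow tasks
- customizable execution logic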
You wouldn't get this from any other engine.
To demonstrate how Prefect and Dask work together, we are going to build and execute a standard "Extract / Transform / Load" (ETL) workflow for processing some basic image data. Most ETL workflows involve a scheduled migration of data from one database to another. In our case, we will be moving data from a file located at a known URL to our local hard disk, converting the individual file into a series of frames, and compiling those frames into a GIF. The URL references a file containing raw bytes such as:
b"""aÙw˜ ≠•∆≠≠fi#!≠≠÷≠•Ω≠úΩ••µú•µîúµ•úΩ••Ω3&µ•Ω!µ≠∆≠•¥4(%µú∑≠≠Œ≠î≠≠≠∆≠îµúî≠úîµE5.≠ú≠≠•Œµµfi••∆•≠ŒµµŒúúΩ62&)1&623µ•∆Ωµ÷úî•ßjxΩΩÁú•Ωµ≠Œ••≠ú•≠Ω≠∆≠µÁâUV≠µ‹ΩµŒîî•NC5µ≠Ÿôãô•î•µ•µîú≠#VHCuhl≠≠ΩôchâRIoc]™≠Á≠î•™ú»öis•ú•f7,íYfL9?îî≠≠•÷∑ò™gWVxGEΩ≠–))1qB5µ≠Ω81R,´tÜñWV!HCDBB5;5?"""
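The steps of our workflow will be as follows:
- Extract: download the data file from a URL (specified by a Parameter) to disk
- Transform: split the file into a series of frames
- Load: write each frame to disk individually, and compile the frames together into a GIF

Once we have built our Flow, we can execute it with different values for the Parameters or even run it on a nightly schedule.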
NOTE: If we planned on executing this Flow in a truly distributed environment, writing the images to the local filesystem would not be appropriate. We would instead use an external datastore such as Google Cloud Storage, or a proper database.
First, we will define our tasks for extracting the image data file from a given URL and saving it to a given file location. To do so, we will utilize two methods for creating Prefect Tasks:
- the ShellTask from Prefect's Task library, which runs a shell command in a subprocess
- the task decorator for converting any Python function into a task

Additionally, we will utilize the following Prefect concepts:
- the SKIP signal, for marking a task and its downstream dependents as "Skipped" if the image data file is already present on the local filesystem
- retry semantics: if, for whatever reason, our curl command fails to connect, we want it to retry up to 2 times with a 10 second delay. This way, if we run this workflow on a schedule we won't need to concern ourselves with temporary intermittent connection issues.

Right now we are simply defining our individual tasks; we won't actually set up our dependency structure until we create the full Flow.
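To make the retry semantics concrete, the sketch below spells out in plain Python what "retry up to 2 times with a 10 second delay" amounts to: at most three attempts in total, with a pause between them. The helper `run_with_retries` is hypothetical and is not how Prefect implements retries; it assumes, as the name suggests, that `max_retries` counts retries after the first attempt.

```python
import time
from datetime import timedelta
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    max_retries: int = 2,
    retry_delay: timedelta = timedelta(seconds=10),
) -> T:
    """Call fn(); if it raises, try again up to `max_retries` more times,
    sleeping `retry_delay` between attempts. The last error is re-raised."""
    retries_used = 0
    while True:
        try:
            return fn()
        except Exception:
            if retries_used >= max_retries:
                raise  # out of retries: the "task" fails
            retries_used += 1
            time.sleep(retry_delay.total_seconds())


# Usage: a hypothetical "connection" that fails twice, then succeeds.
attempts = []


def flaky_download() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary network hiccup")
    return "downloaded"


# retry_delay=0 keeps the demo fast; the Flow above uses 10 seconds.
print(run_with_retries(flaky_download, max_retries=2, retry_delay=timedelta(0)))
# downloaded
```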
import datetime
import os
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.shell import ShellTask
@task
def curl_cmd(url: str, fname: str) -> str:
    """
    The curl command we wish to execute.
    """
    if os.path.exists(fname):
        raise SKIP("Image data file already exists.")
    return "curl -fL -o {fname} {url}".format(fname=fname, url=url)
# ShellTask is a task from the Task library which will execute a given command in a subprocess
# and fail if the command returns a non-zero exit code
download = ShellTask(name="curl_task", max_retries=2, retry_delay=datetime.timedelta(seconds=10))
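ShellTask's contract, as described in the comment above (run a command in a subprocess and fail on a non-zero exit code), has a close standard-library analogue in `subprocess.run(..., check=True)`. The sketch below shows that analogue only; `run_command` is a hypothetical helper, not ShellTask's implementation, and it passes an argument list instead of a shell string.

```python
import subprocess
import sys
from typing import List


def run_command(args: List[str]) -> str:
    """Run a command in a subprocess and return its stdout.

    check=True makes a non-zero exit code raise CalledProcessError,
    so the caller sees a failure instead of silently continuing."""
    completed = subprocess.run(args, check=True, capture_output=True, text=True)
    return completed.stdout


# A command that succeeds (uses the current interpreter so it runs anywhere).
print(run_command([sys.executable, "-c", "print('hello')"]).strip())  # hello

# A command that exits with status 1 raises instead of returning.
try:
    run_command([sys.executable, "-c", "import sys; sys.exit(1)"])
except subprocess.CalledProcessError as exc:
    print("failed with exit code", exc.returncode)  # failed with exit code 1
```

This is also why the command built by curl_cmd uses curl's -f flag: it makes curl exit with a non-zero status on HTTP errors (for example a 404) instead of saving the error page, so a failed download surfaces as a failed task and can be retried. The -L flag makes curl follow redirects.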
Next up, we need to define our task which loads the image data file and splits it into multiple frames. In this case, each frame is delimited by 4 newlines. Note that, in the event the previous two tasks are "Skipped", the default behavior in Prefect is to skip downstream dependencies as well. However, as with most things in Prefect, this behavior is customizable. In this case, we want this task to run regardless of whether the upstream tasks were skipped, so we set the skip_on_upstream_skip flag to False.
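The skipping rule can be modeled with a toy runner. Everything here (`Skip`, `run_task`, the string states) is hypothetical and only mirrors the behavior described above, not Prefect's engine: by default a skipped upstream task makes its downstream task skip too, unless that task sets skip_on_upstream_skip to False.

```python
from typing import Callable, List


class Skip(Exception):
    """Hypothetical stand-in for Prefect's SKIP signal."""


def run_task(
    fn: Callable[[], object],
    upstream_states: List[str],
    skip_on_upstream_skip: bool = True,
) -> str:
    """Return the final state of a single task: "Skipped" or "Success"."""
    # Default rule: a skipped upstream task causes this task to skip too.
    if skip_on_upstream_skip and "Skipped" in upstream_states:
        return "Skipped"
    try:
        fn()
    except Skip:
        return "Skipped"  # the task itself chose to skip
    return "Success"


def curl_cmd_stub() -> None:
    # Pretend the image data file is already on disk.
    raise Skip("Image data file already exists.")


cmd_state = run_task(curl_cmd_stub, upstream_states=[])
download_state = run_task(lambda: None, upstream_states=[cmd_state])
load_state = run_task(
    lambda: None, upstream_states=[download_state], skip_on_upstream_skip=False
)
print(cmd_state, download_state, load_state)  # Skipped Skipped Success
```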
@task(skip_on_upstream_skip=False)
def load_and_split(fname: str) -> list:
    """
    Loads image data file at `fname` and splits it into
    multiple frames. Returns a list of bytes, one element
    for each frame.
    """
    with open(fname, "rb") as f:
        images = f.read()
    return [img for img in images.split(b"\n" * 4) if img]
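The splitting logic in load_and_split can be checked on in-memory bytes. The sample data below is made up for illustration; the function body is the same list comprehension used above, and `split_frames` is a hypothetical name for it.

```python
from typing import List


def split_frames(data: bytes) -> List[bytes]:
    """Split raw bytes into frames delimited by four consecutive newlines,
    dropping empty chunks such as the one after a trailing delimiter."""
    return [img for img in data.split(b"\n" * 4) if img]


sample = b"FRAME-A\n\n\n\nFRAME-B\n\n\n\nFRAME-C\n\n\n\n"
print(split_frames(sample))  # [b'FRAME-A', b'FRAME-B', b'FRAME-C']

# The delimiter is matched literally, left to right: a run of five newlines
# leaves the fifth one attached to the start of the next frame.
print(split_frames(b"A\n\n\n\n\nB"))  # [b'A', b'\nB']
```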
Next, we want to write our frames to disk as well as combine the frames into a single GIF. In order to achieve this goal, we are going to utilize Prefect's task "mapping" feature, which conveniently spawns new tasks in response to upstream outputs. In this case, we will write a single task for writing an image to disk, and "map" this task over all the image frames returned by load_and_split above! To infer which frame we are on, we look up map_index in prefect.context.

Additionally, we can "reduce" over a mapped task: in this case, we will take the outputs of all the mapped tasks and pass them into our combine_to_gif task for creating and saving our GIF.
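As a rough analogy (not Prefect's mapping machinery), mapping and reducing resemble a parallel `map` followed by a function that consumes all the results. In this sketch `enumerate` plays the role of `map_index`, and the hypothetical `write_frame` only computes the filename rather than writing a file. `ThreadPoolExecutor.map` returns results in input order, mirroring the ordered list that combine_to_gif's docstring expects.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def write_frame(indexed: Tuple[int, bytes]) -> Tuple[str, bytes]:
    """'Mapped' step: derive a zero-padded filename from the map index.
    (Writing to disk is omitted to keep the sketch side-effect free.)"""
    index, image = indexed
    return "frame_{0:0=2d}.gif".format(index), image


def combine(results: List[Tuple[str, bytes]]) -> bytes:
    """'Reduce' step: receives every mapped output, in input order."""
    return b"".join(image for _, image in results)


frames = [b"a", b"b", b"c"]
with ThreadPoolExecutor(max_workers=4) as pool:
    mapped = list(pool.map(write_frame, enumerate(frames)))

print([name for name, _ in mapped])  # ['frame_00.gif', 'frame_01.gif', 'frame_02.gif']
print(combine(mapped))  # b'abc'
```

The `{0:0=2d}` format pads the index to two digits, so frame files sort correctly by name for up to 100 frames.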
@task
def write_to_disk(image: bytes) -> bytes:
    """
    Given a single image represented as bytes, writes the image
    to the present working directory with a filename determined
    by `map_index`. Returns the image bytes.
    """
    frame_no = prefect.context.get("map_index")
    with open("frame_{0:0=2d}.gif".format(frame_no), "wb") as f:
        f.write(image)
    return image
import imageio
from io import BytesIO
@task
def combine_to_gif(image_bytes: list) -> None:
    """
    Given a list of ordered images represented as bytes,
    combines them into a single GIF stored in the present working directory.
    """
    images = [imageio.imread(BytesIO(image)) for image in image_bytes]
    imageio.mimsave('./clip.gif', images)
Finally, we need to put our tasks together into a Prefect "Flow". Similar to Dask's delayed interface, all computation is deferred and no Task code will be executed in this step. Because Prefect maintains a stricter contract between tasks and additionally needs the ability to run in non-Dask execution environments, the mechanism for deferring execution is independent of Dask.
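To illustrate what "deferred" means here, the toy `Deferred` class below records a function and its inputs when the graph is built and only runs anything when `compute()` is called. It is a hypothetical simplification, not the API of dask.delayed or of Prefect's Flow; for example, it does not cache results, so a node shared by two consumers would run twice.

```python
from typing import Any, Callable, List


class Deferred:
    """Toy lazily-evaluated graph node: stores a function and its inputs
    without calling it. Hypothetical; not Prefect's or Dask's API."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn = fn
        self.args = args

    def compute(self) -> Any:
        # Evaluate upstream nodes first, then apply this node's function.
        # (No caching: a node shared by two consumers is evaluated twice.)
        values = [a.compute() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*values)


calls: List[str] = []


def double(x: int) -> int:
    calls.append("double")
    return 2 * x


def add(a: int, b: int) -> int:
    calls.append("add")
    return a + b


graph = Deferred(add, Deferred(double, 1), Deferred(double, 2))
print(calls)            # [] : building the graph executed nothing
print(graph.compute())  # 6
print(calls)            # ['double', 'double', 'add']
```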
In addition to the tasks we have already defined, we introduce two "Parameters" for specifying the URL and local file location of our data. At runtime, we can optionally override these Parameters to supply different values.
from prefect import Parameter, Flow
DATA_URL = Parameter("DATA_URL",
                     default="https://github.com/cicdw/image-data/blob/master/all-images.img?raw=true")
DATA_FILE = Parameter("DATA_FILE", default="image-data.img")
with Flow("Image ETL") as flow:
    # Extract
    command = curl_cmd(DATA_URL, DATA_FILE)
    curl = download(command=command)
    # Transform
    # we use the `upstream_tasks` keyword to specify non-data dependencies
    images = load_and_split(fname=DATA_FILE, upstream_tasks=[curl])
    # Load
    frames = write_to_disk.map(images)
    result = combine_to_gif(frames)
flow.visualize()
Now we have built our Flow, independently of Dask. We could execute this Flow sequentially, Task after Task, but there is inherent parallelism in our mapping of the images to files that we want to exploit. Luckily, Dask makes this easy to achieve.
First, we will start a local Dask cluster. Then, we will run our Flow against Prefect's DaskExecutor, which will submit each Task to our Dask cluster and use Dask's distributed scheduler for determining when and where each Task should run. Essentially, we have built a Directed Acyclic Graph (DAG) and are simply "submitting" that DAG to Dask, which handles its execution in a distributed way.
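The DAG for this Flow can be written down explicitly and grouped into "waves" of tasks whose upstream dependencies are all finished; tasks in the same wave could run in parallel. This sketch uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) and assumes three frames for illustration, since the real number of mapped write_to_disk tasks is only known at runtime. It shows the dependency structure only; Dask's distributed scheduler makes its own, more dynamic decisions about when and where each task runs.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List, Set

N_FRAMES = 3  # assumption for illustration; the real count is known only at runtime

# Each task maps to the set of tasks it depends on, mirroring the Flow above.
graph: Dict[str, Set[str]] = {
    "curl_cmd": {"DATA_URL", "DATA_FILE"},
    "download": {"curl_cmd"},
    "load_and_split": {"DATA_FILE", "download"},
    "combine_to_gif": {f"write_to_disk[{i}]" for i in range(N_FRAMES)},
}
for i in range(N_FRAMES):
    graph[f"write_to_disk[{i}]"] = {"load_and_split"}


def execution_waves(graph: Dict[str, Set[str]]) -> List[Set[str]]:
    """Group tasks into waves; every task in a wave has all of its
    upstream dependencies in earlier waves, so a wave can run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorter.get_ready()
        waves.append(set(ready))
        sorter.done(*ready)  # pretend every ready task finished
    return waves


for wave in execution_waves(graph):
    print(sorted(wave))
# ['DATA_FILE', 'DATA_URL']
# ['curl_cmd']
# ['download']
# ['load_and_split']
# ['write_to_disk[0]', 'write_to_disk[1]', 'write_to_disk[2]']
# ['combine_to_gif']
```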
# start our Dask cluster
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
# point Prefect's DaskExecutor to our Dask cluster
from prefect.executors import DaskExecutor
executor = DaskExecutor(address=client.scheduler.address)
flow.run(executor=executor)
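Now that we've built our workflow, what next? The interested reader should try to:
- run the Flow a second time to see how the SKIP signal behaves
- execute the Flow with different values for the DATA_URL and DATA_FILE Parameters using flow.run()

Finally, we can display the resulting GIF:
from IPython.display import Image
Image(filename="clip.gif", alt="Rick Daskley")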