My Experience with Prefect so Far (and a quick getting started guide)

Overall initial impressions: the pros of Prefect

The UI

  • Not only is Prefect’s UI pleasing to the eye, it surfaces useful information at a glance that isn’t easily available otherwise. Airflow’s UI gives an overview of DAG statuses and some detail on the tasks within a DAG, but in production you generally end up bolting on other tools or BI dashboards to monitor run stats and logging. Basics like run time, upcoming runs, and current status are right there in the Prefect UI, which helps from an admin perspective. There is also a decent amount of configuration available in the UI that I think will prove useful over time.
  • Data engineers seem to be all in on the “everything as code” movement, and sure, I love Terraform, LookML, and other code-based frameworks that make things repeatable and git-able. But even with these code-management processes, there are elements where adjusting a variable in a UI without a code change is a huge plus: for example, setting up or changing schedules in the UI rather than editing DAG code, or changing the number of days back to ingest data. You may by default only look for changes from the past few days, but what if there was an issue and you need to reload the past 30 days? (See the sketch after this list.)
  • If you have used Airflow before and understand the concept of a DAG, then Prefect will be a piece of cake. However, if you’re new to orchestration, I think Prefect’s workflow is easier to understand: the concepts of tasks and flows are logical and easy to grasp with a bit of testing. The process of developing and testing locally before registering a flow into production is great, not to mention the flow versioning within Prefect that doesn’t exist for Airflow jobs.
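To make that “days back” point concrete, here’s a minimal sketch of a run-adjustable variable using Prefect 1.x Parameters (the days_back name and defaults are my own illustration). A run kicked off from the UI can override the default without any code change:

from datetime import date, timedelta
from prefect import task, Flow, Parameter

@task
def load_window(days_back):
    # compute the ingestion window from the run-adjustable parameter
    start = date.today() - timedelta(days=days_back)
    print(f"loading data changed since {start}")

with Flow("parameter-demo") as flow:
    # defaults to 3; set it to 30 from the UI for a backfill, no code change
    days_back = Parameter("days_back", default=3)
    load_window(days_back)

A normal run uses the default of 3; a backfill run just overrides days_back to 30 when triggering the run.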

Getting Started with Prefect

Prefect’s docs are a good reference to keep open as you go.

So you have a Python script getting data from an API, doing some sort of transformation, and landing it in your data warehouse of choice. Or you have a Python script that takes data from your data warehouse and sends it somewhere else. It works locally, but how do you set this up on a schedule that isn’t dependent on your machine being online?

The first step is to get a Prefect job running locally before scheduling and hosting it. If you have a working Python script, turning it into a Prefect flow is pretty straightforward.

  1. Install Prefect with your installer of choice:

sudo pip install prefect

  2. In your script, import Prefect’s task and Flow objects alongside your existing imports:

import pandas as pd  # or whatever your script already imports
from prefect import task
from prefect import Flow

  3. Decorate each function you want Prefect to manage as a task:

@task  # this is the Prefect identifier
def function1(conn):
    ...

@task
def function2(results):
    ...

  4. Tie the tasks together in a Flow context and run it:

with Flow(name='Test Flow') as flow:
    # conn: whatever connection/config your script already builds
    function1results = function1(conn)
    function2(function1results)

flow.run()

  5. Test it locally, either directly or through the Prefect CLI:

python prefect_test.py
prefect run -p prefect_test.py
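As an aside, if you’d rather pin a schedule in code than set it in the UI (per my point above, I lean toward the UI), Prefect 1.x supports attaching one to the flow. A minimal sketch, assuming an hourly interval:

from datetime import timedelta
from prefect import task, Flow
from prefect.schedules import IntervalSchedule

@task
def say_hello():
    print("hello")

# run every hour; with flow.run(), Prefect sleeps between scheduled runs
schedule = IntervalSchedule(interval=timedelta(hours=1))

with Flow('Test Flow', schedule=schedule) as flow:
    say_hello()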

Now that we have a Prefect flow running locally, we need to host it somewhere.

  • Start by spinning up a GCP VM with an Ubuntu image. This can be done by searching for Compute Engine in the GCP console and clicking “Create Instance”; select the CPU and boot disk based on your requirements. I generally prefer GCP, but you can set this up on any cloud provider of your choice.
  • SSH into the new machine, either by clicking SSH within the GCP console or by authenticating from your terminal with the gcloud SDK.
  • Update apt and install pip:

sudo apt update
sudo apt install python3-pip

  • Install the Python packages your script leverages via pip.
  • Install Prefect via pip.
  • Load the script onto the VM.
  • Test running the script on the VM. This may take some trial and error depending on how thorough you were with the pip installs above; a sketch of these last few steps follows.
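A rough sketch of those last few steps, assuming your script is prefect_test.py, it depends on pandas, and the instance is named prefect-vm (all placeholders; adjust to your setup):

# from your local machine: copy the script up to the VM
gcloud compute scp prefect_test.py prefect-vm:~/ --zone=us-central1-a

# on the VM: install the script's dependencies plus Prefect
pip3 install pandas prefect

# smoke-test it
python3 prefect_test.py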

Once you have the script running on the VM, let’s get things set up for Prefect Cloud and scheduling.

First, point the Prefect config at the Cloud backend and authenticate with your API key:

prefect backend cloud
prefect auth login --key <YOUR-KEY>
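One small step that’s easy to miss: flows are registered into a project, so create one now (TestProject matches the register command used later):

prefect create project TestProject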
Next, you’ll want the Prefect agent running in the background so flows can execute even when you’re not SSH’d in. Supervisor handles that nicely:

pip install supervisor

Then create a supervisord.conf along these lines:
[unix_http_server]
file=/tmp/supervisor.sock       ; the path to the socket file

[supervisord]
loglevel=debug                  ; log level; default info; others: debug,warn,trace

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock   ; use a unix:// URL for a unix socket

[program:prefect-agent]
command=prefect agent local start
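With the config in place, start Supervisor against it; supervisorctl lets you verify the agent is up (or restart it after changes):

supervisord -c supervisord.conf
supervisorctl -c supervisord.conf status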
The last piece is credentials. Rather than hardcoding keys in the script, store them as Secrets in Prefect Cloud and pull them in at runtime with PrefectSecret. For example, an API key plus a set of Snowflake credentials (this assumes Secrets named "Test" and "snowflake_creds" exist in Prefect Cloud; the second name is my own placeholder):

import snowflake.connector
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret

@task
def function1(test_key):
    # someapi stands in for whatever client your script uses
    someapi.api_key = test_key['key']

@task
def query_snowflake(snowflake_creds):
    # connection code wrapped in a task so it runs at flow runtime
    conn = snowflake.connector.connect(
        user=snowflake_creds['user'],
        password=snowflake_creds['password'],
        account=snowflake_creds['account'],
        warehouse=snowflake_creds['warehouse'],
        database=snowflake_creds['database'],
        schema=snowflake_creds['schema'],
        role=snowflake_creds['role'])
    cur = conn.cursor()
    cur.execute(...)  # your SQL here

with Flow(name="test_flow") as flow:
    test_key = PrefectSecret("Test")
    snowflake_creds = PrefectSecret("snowflake_creds")
    function1(test_key)
    query_snowflake(snowflake_creds)
Finally, register the flow with Prefect Cloud so the agent can pick it up:

prefect register -p test_script.py --project TestProject
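With the flow registered and the agent running under Supervisor, you can trigger a run from the UI, or from the CLI with something like (flow name and project from above):

prefect run --name "test_flow" --project TestProject --watch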
Beyond that, a couple of things are still on my list to explore:

  • Running jobs in Docker containers to protect the package dependencies
  • Setting up git for file management of Prefect flows when moving to production
