My Experience with Prefect so Far (and a quick getting started guide)
Airflow is widely recognized as the leading orchestration tool for scheduling workflows, and its open-source framework has been starred on GitHub 25k times with 15k commits. Recently I've seen buzz rising around Prefect and wanted to check it out. So for the past few weeks I've been learning, testing, and setting up Prefect, and I wanted to share my thoughts so far.
Initial Impressions: the Pros of Prefect
- Not only is Prefect's UI pleasing to the eye, it provides useful information at a glance that may not be easily available otherwise. Airflow's UI provides an overview of DAG statuses and some additional detail on the functions within a DAG, but in production workflows, monitoring logging stats generally requires other tools or a BI layer. Basic items in the Prefect UI like run time, upcoming runs, and current status are nice to see at a glance and can help from an admin perspective. There is also a decent amount of configuration within the UI that I think can be useful over time.
- In general, data engineers seem to be all in on the "everything as code" movement. And sure, I love Terraform, LookML, and other code-based frameworks that make things repeatable and git-able. But even with these code-managed processes, there are certain elements where adjustable variables in a UI, with no code changes, are a huge plus. For example, having the ability to set up, change, or adjust schedules in the UI rather than editing the DAG code. Or changing the number of days back to ingest data: you may by default only look for changes from the past few days, but what if there was an issue and you need to reload the last 30 days?
Ease of Use and Development Workflow
- If you have used Airflow before and understand the concept of a DAG, then Prefect will be a piece of cake. However, if you're new to orchestration, I think Prefect's workflow is easier to understand. The concepts of tasks and flows are pretty logical and easy to grasp with a bit of testing. And the process of developing and testing locally before registering a flow into production is great, not to mention the versioning within Prefect that doesn't exist for Airflow jobs.
Getting Started with Prefect
So you have a python script getting data from an API, doing some sort of transformation, and landing it in your data warehouse of choice. Or you have a python script that takes data from your data warehouse and sends it somewhere else. It works locally, but how do you set this up on a schedule that’s not dependent on your system being online?
Prefect has a free Cloud offering for one developer, and it's a good way to get started. Also of note: Prefect Cloud does not mean your integration jobs run on Prefect's servers; it just means that Agents send updates to Prefect Cloud and the front end is hosted by Prefect.
The first step is to get a Prefect job running locally before scheduling and hosting it. If you have a working Python script, turning it into a Prefect flow is pretty straightforward.
1. Install Prefect with your installer of choice
sudo pip install prefect
2. Nest your job inside the Prefect statements
import pandas as pd  # plus whatever else your script uses
from prefect import task, Flow

@task  # this is the prefect identifier
def function2(y):
    ...  # your existing logic goes here

with Flow(name='Test Flow') as flow:
    function1_results = function1(conn)  # function1 is decorated the same way
3. Try to run your Prefect flow locally
You can do this either by executing the Python script directly (if you call flow.run() at the end of the script) or by running it through the Prefect CLI:
prefect run -p prefect_test.py
Now that we have the Prefect flow running locally, we need to host it somewhere.
- Start by spinning up a GCP VM with an Ubuntu image. This can be done by searching for Compute Engine in the GCP portal and clicking "Create Instance". Select the CPU and boot disk based on your requirements. I generally prefer GCP, but you can set this up on any cloud service of your choice.
- SSH into the new machine. This can be done by clicking SSH within the GCP console or by authing in via terminal using the gcloud sdk.
Once logged into the machine, we need to set it up to be able to execute the prefect jobs.
- Install Docker and Docker Compose (see the official setup docs)
- Install pip
sudo apt update
sudo apt install python3-pip
- Install the necessary Python packages leveraged in your script via pip
- Install Prefect via pip
- Load the script onto the VM
- Test running the script on the VM. This may take some trial and error depending on how thorough you were with installing the Python packages via pip above.
Once you have the script running on the VM, let’s get things set up for the Cloud and scheduling.
First, set up the prefect config to use cloud:
prefect backend cloud
Next, go into your Prefect Cloud account and create an API Key (Prefect now calls these Service Accounts).
Set up your VM user to auth into the Cloud:
prefect auth login --key <YOUR-KEY>
In order for Prefect Cloud to get updates from the VM, you need to run an Agent that checks for and runs Prefect jobs. I chose to use supervisord to keep the agent process running.
You’ll need to install it:
pip install supervisor
Then you can use the Prefect CLI to generate a .conf file via prefect agent local install. I saved the following as supervisord.conf and loaded it to the local VM directory.
[unix_http_server]
file=/tmp/supervisor.sock ; the path to the socket file

[supervisord]
loglevel=debug ; log level; default info; others: debug,warn,trace

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket

[program:prefect-agent]
command=prefect agent local start
You can check the agent's status using the supervisorctl command.
I prefer not to save my API secrets or database credentials in the Python file, so leveraging the Cloud Secrets vault within Prefect to handle these as variables is a good option if you're not using another key vault.
In order to set this up, save the secrets in a JSON format in the Cloud UI:
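For example, a single secret could hold all of your warehouse connection fields in one JSON object (the secret name and values here are placeholders; match the keys to whatever your script expects):

```json
{
  "user": "MY_USER",
  "password": "MY_PASSWORD",
  "account": "abc12345.us-east-1",
  "warehouse": "MY_WH",
  "database": "MY_DB",
  "schema": "PUBLIC",
  "role": "MY_ROLE"
}
```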
And in your python script, reference these secrets as variables:
from prefect.tasks.secrets import PrefectSecret

@task  # task and Flow imported as before
def use_key(test_key):
    someapi.api_key = test_key['key']

with Flow(name="test_flow") as flow:
    use_key(PrefectSecret("Test"))
In particular, I found it helpful to save Snowflake connection credentials this way:
conn = snowflake.connector.connect(
    user=snowflake_creds['user'],
    password=snowflake_creds['password'],
    account=snowflake_creds['account'],
    warehouse=snowflake_creds['warehouse'],
    database=snowflake_creds['database'],
    schema=snowflake_creds['schema'],
    role=snowflake_creds['role'])

cur = conn.cursor()
cur.execute("...")  # your query goes here
** In order for this to work, you need to change the Prefect configuration file to set use_local_secrets = false. I did this by installing locate to find the config file and using nano to edit it.
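For reference, on Prefect 1.x the user config typically lives at ~/.prefect/config.toml (path may vary by install), and the setting sits under the cloud section:

```toml
# ~/.prefect/config.toml
[cloud]
use_local_secrets = false
```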
Once you have everything configured within your Flow, it’s time to register the flow so it’s recognized within the Prefect environment.
To register a flow, it’s as simple as:
prefect register -p test_script.py --project TestProject
You should now see the flow in the UI!
Schedules can be configured within the Flow code just like with an Airflow DAG, or you can set up and adjust schedules within the UI after a flow is registered.
Navigate to your flow and click on Settings, the Schedule option allows you to set up your cadence:
Also make sure you enable your schedule by hitting the toggle on the top right.
Finally, let’s set up some alerting.
To set this up as a Slack notification to a channel, create a Slack app (instructions here) and insert the Slack webhook into the Cloud Hook configuration section.
In more advanced setups and configurations for production workflows, these things should also be set up or considered:
- Running jobs in docker containers to protect the package dependencies
- Setting up git for file management of Prefect flows when moving to production
Hopefully this basic walk-through was helpful in understanding a bit more about Prefect. If this was interesting, I encourage you to go set up a flow yourself! If you enjoyed this article, go ahead and add me on LinkedIn; I hope to find time to continue publishing more content in 2022.
If you are looking for analytics help, consider us at Summit Advisory Team.
James leads the Analytics consulting practice at Summit Advisory Team and is CTO for Elevate, a supply chain visibility data platform. James prefers working with Looker, BigQuery & GCP, Snowflake, DBT, and whatever integration method works and is the easiest to maintain. James lives in California and is a new dad.