prefect dag scheduling — backend server, tasks, flow setup
September 10, 2020
working on a dag scheduler proof-of-concept (POC) using prefect
docker run -it prefecthq/prefect:latest
i did not have the image locally, so docker pulled the latest for me
(note: this image drops you directly into a python interpreter)
to see the prefect image:
docker images
to switch prefect to the server backend:
prefect backend server
getting error:
subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.
noted in docs:
Please note the server requires Docker and Docker Compose to be running.
so let’s run:
pip install docker-compose
then run:
prefect server start
you should see this before navigating to
http://localhost:8080
which will bring you to the dashboard:
(note: https brought me to a page indicating "This site can't provide a secure connection", so i tried http and it worked)
to see anything in the dashboard, we need to register our flow. to register our flow, we also need a project to register it to.
based on the Concepts section of the documentation, you can create a project like this:
from prefect import Client

client = Client()
client.create_project(project_name="Hello, World!")
working through the Deployment Tutorial section, i found the below worked for creating tasks and a flow:
import prefect
from prefect import task, Flow

@task
def hello_task(person: str = "world") -> None:  # default needed since the task isn't wired to a Parameter here
    print(f"Hello, {person}!")
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")

flow = Flow("hello-flow", tasks=[hello_task])

# to take advantage of features from prefect API, need to register flow to a project
flow.register(project_name="Hello, World!")

# use agent to watch for flow runs scheduled by prefect api
flow.run_agent()
if you save this to a file and run it (make sure your server is still running), you will see the flow:
to run the flow you can also use the CLI:
prefect run flow --name hello-flow --project 'Hello, World!'
the terminal where i was running the agent shows the flow i ran:
i can also see a run history in the UI:
if you create a second flow, you must register it before you can run it:
import prefect
from prefect import task, Flow

@task
def another_task():
    logger = prefect.context.get("logger")
    logger.info("Our second Flow!")

flow = Flow("second-flow", tasks=[another_task])
flow.register(project_name="Hello, World!")
you can then run both flows whenever you want, even at the same time:
standalone agent vs. cli agent
above we are monitoring flows via an agent (the agent must be running for flows to get processed; otherwise they will just be scheduled but not run)
if you have a flow set up, you can include:
flow.run_agent()
or you can run
prefect agent start
or you can install a supervised agent, a cli agent that is always running in the background, by entering the following commands:
pip install supervisor
prefect agent install local > supervisord.conf
supervisord
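(not something i tried above, just a sketch) you can also start an agent from python instead of the cli — assuming the 0.13-era agent classes, something like:

# a minimal sketch, assuming prefect 0.13-era agent classes:
# start a local agent from python instead of `prefect agent start`
from prefect.agent.local import LocalAgent

# this blocks and polls the prefect api for scheduled flow runs
LocalAgent().start()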
using the docker agent to execute flow runs within a docker container:
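(a sketch i haven't verified — assuming the 0.13-era DockerAgent class; the flow itself would also need docker storage for its runs to actually execute inside containers):

# a sketch, assuming prefect 0.13-era agents; the registered flow would also need
# docker storage for its runs to actually execute inside containers
from prefect.agent.docker import DockerAgent

# polls the api and launches each picked-up flow run in its own docker container
DockerAgent().start()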
added a new flow and wanted to restart the agent, but got an error trying to run the cli agent:
Another program is already listening on a port that one of our HTTP servers is configured to use. Shut this program down first before starting supervisord.
to shut it down, i ran:
ps -ef | grep supervisord
i fetched the PID, then ran:
kill -s SIGTERM 69800
and restarted the agent:
supervisord
to get flows running, i could not seem to get it working via the python client. for example, in one of the tutorials, it appears that flow.run()
would trigger a run. when i run my python script, i see this:
but the agent doesn't pick it up, nor does it appear in my dashboard (it looks like flow.run() executes the flow locally in-process rather than creating a run through the prefect API).
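(a sketch i haven't verified here: the Client can also create a run through the prefect API — which is what the agent actually picks up — instead of flow.run(), which only runs the flow locally. this assumes the 0.13-era Client.create_flow_run method and a flow id from flow.register() or the UI.)

# a sketch, not verified in this post: create a flow run through the prefect API
# so the agent picks it up, instead of flow.run(), which only runs locally.
# assumes the 0.13-era Client.create_flow_run method.
from prefect import Client

client = Client()
flow_id = "<flow id>"  # flow.register(...) returns this; it's also shown in the UI
flow_run_id = client.create_flow_run(flow_id=flow_id)
print(flow_run_id)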
to get my job running, i was able to:
- via cli command (as listed above)
prefect run flow --name hello-flow --project 'Hello, World!'
- via cli command with parameters
prefect run flow --name hello-flow --project 'Hello, World!' --parameters-string '{"name": "yogi"}'
- via the UI / dashboard, there is a Quick Run button:
- if you have a parameter, you can input it during a manual run via the Parameters section (see the parameter sketch at the end of this post):
- to set task dependencies within a flow, you can use either the set_upstream or set_dependencies methods:
with Flow("parent-flow", schedule=daily_schedule) as flow:
flow4.set_upstream(flow1)
flow4.set_upstream(flow2)
flow4.set_upstream(flow3)
or
flow.set_dependencies(
    task=hello_task,
    upstream_tasks=[another_task],  # hello_task will only run after another_task
)
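as referenced in the parameters bullets above, here is a sketch (my reconstruction, not the exact flow from the run above) of a flow that actually declares a Parameter named name, so --parameters-string '{"name": "yogi"}' or the Parameters section in the UI has something to bind to:

import prefect
from prefect import task, Flow, Parameter

@task
def hello_task(person: str) -> None:
    logger = prefect.context.get("logger")
    logger.info(f"Hello, {person}!")

with Flow("hello-flow") as flow:
    # the parameter name ("name") is what --parameters-string and the UI bind to
    name = Parameter("name", default="world")
    hello_task(name)

flow.register(project_name="Hello, World!")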