prefect dag scheduling — backend server, tasks, flow setup

diary of a codelovingyogi

Apr 5, 2021

September 10, 2020

working on a dag scheduler proof-of-concept (POC) using prefect

docker run -it prefecthq/prefect:latest

i did not have the image locally, so docker pulled the latest one for me

(note: this image drops you directly into a python interpreter)

run docker images to see the prefect image:

to use the local backend server, first switch prefect's backend to server (instead of prefect cloud):

prefect backend server

i got this error:

subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.

as noted in the docs:

Please note the server requires Docker and Docker Compose to be running.

so let’s run:

pip install docker-compose
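the server needs docker running and docker-compose installed, and the traceback above doesn't say which one is the problem. here's a minimal preflight sketch (stdlib only; missing_tools and docker_daemon_running are my own hypothetical helpers, not part of prefect) that checks both tools are on PATH and that docker info succeeds (it exits non-zero when it can't reach the docker daemon):

```python
import shutil
import subprocess


def missing_tools(names=("docker", "docker-compose")):
    """Return whichever of `names` can't be found on PATH."""
    return [name for name in names if shutil.which(name) is None]


def docker_daemon_running(timeout_seconds=30):
    """True if `docker info` succeeds, i.e. the docker daemon is reachable."""
    if shutil.which("docker") is None:
        return False
    try:
        result = subprocess.run(
            ["docker", "info"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        return False
    return result.returncode == 0
```

if missing_tools() returns anything, or docker_daemon_running() returns False, sort that out before running prefect server start.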

then run:

prefect server start

you should see this before navigating to

http://localhost:8080

it will bring you to the dashboard:

(note: https brought me to a page saying "This site can't provide a secure connection"; i tried http and it worked, since the local server serves the UI over plain http)
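to check the dashboard from a script instead of a browser, here's a small stdlib sketch (ui_reachable is a hypothetical helper; localhost:8080 is the address used above). it ignores proxy environment variables since the target is local, and it treats any HTTP response as "up". pointing it at https:// on a plain-http server fails the TLS handshake, which is presumably what the browser ran into:

```python
import http.client
import urllib.error
import urllib.request

# talk to localhost directly, ignoring any *_proxy environment variables
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def ui_reachable(url="http://localhost:8080", timeout=5.0):
    """True if an HTTP server answers at `url`; any status code counts as 'up'."""
    try:
        with _opener.open(url, timeout=timeout):
            return True
    except urllib.error.HTTPError:
        # the server answered, just with a 4xx/5xx status
        return True
    except (OSError, http.client.HTTPException):
        # refused connection, timeout, TLS handshake failure
        # (e.g. https:// against a plain-http server), or a malformed response
        return False
```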

to see anything in the dashboard, we need to register our flow, and to register a flow we first need a project to register it to.

based on the Concepts section of the documentation, you can create a project like this:

from prefect import Client

client = Client()
client.create_project(project_name="Hello, World!")

working through the Deployment Tutorial section, i found that the following worked for creating tasks and a flow:

import prefect
from prefect import task, Flow, Parameter

@task
def hello_task(person: str) -> None:
    logger = prefect.context.get("logger")
    logger.info(f"Hello, {person}!")

# hello_task needs a value for `person`, so feed it from a Parameter
# (named "name" so it matches the --parameters-string example further down)
with Flow("hello-flow") as flow:
    name = Parameter("name", default="Cloud")
    hello_task(name)

# to take advantage of features from the prefect API, register the flow to a project
flow.register(project_name="Hello, World!")

# use an agent to watch for flow runs scheduled by the prefect API
flow.run_agent()

if you save this to a file and run it (make sure your server is still running), you will see the flow in the dashboard:

to run the flow you can also use the CLI:

prefect run flow --name hello-flow --project 'Hello, World!'
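if you'd rather trigger this from a python script, one option is to build the same command as an argument list and hand it to subprocess. a minimal sketch (build_run_command is a hypothetical helper of mine; the flags are only the ones used in this post, including --parameters-string from the parameters example further down):

```python
import json
import shlex


def build_run_command(flow_name, project, parameters=None):
    """Build the `prefect run flow` command as an argv list.

    `parameters` is an optional dict, serialized to JSON for --parameters-string."""
    cmd = ["prefect", "run", "flow", "--name", flow_name, "--project", project]
    if parameters:
        cmd += ["--parameters-string", json.dumps(parameters)]
    return cmd


cmd = build_run_command("hello-flow", "Hello, World!")
# shlex.join only renders the equivalent shell command for display
print(shlex.join(cmd))
# prefect run flow --name hello-flow --project 'Hello, World!'
```

subprocess.run(cmd, check=True) would then run it. passing a list means no shell quoting is needed, so a project name with spaces and punctuation like 'Hello, World!' goes through as a single argument.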

the terminal where i was running the agent shows the flow i ran:

i can also see a run history in the UI:

if you create a second flow, you must register it before you can run it:

import prefect
from prefect import task, Flow

@task
def another_task():
    logger = prefect.context.get("logger")
    logger.info("Our second Flow!")

flow = Flow("second-flow", tasks=[another_task])
flow.register(project_name="Hello, World!")

you can then run both flows whenever you want, including at the same time:

standalone agent vs. cli agent

above we are monitoring flows via:

(the agent must be running for flow runs to get processed; otherwise they are just scheduled and never run)

if you have a flow set up, you can include:

flow.run_agent()

or you can run

prefect agent start

or you can install a supervised agent, i.e. a cli agent kept running in the background by supervisor, with the following commands:

pip install supervisor
prefect agent install local > supervisord.conf
supervisord
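supervisor's job here is basically to keep the agent process running and start it again if it dies. to make that idea concrete, here's a toy restart loop in python; it's only an illustration (keep_alive is my own hypothetical helper, not how supervisord works internally and no replacement for it):

```python
import subprocess
import time


def keep_alive(cmd, max_restarts=3, delay_seconds=1.0):
    """Run cmd; whenever it exits, start it again, at most max_restarts times.

    Returns the exit code of every run, in order."""
    exit_codes = []
    for attempt in range(max_restarts + 1):
        # blocks until the process exits, then records its exit code
        exit_codes.append(subprocess.run(cmd).returncode)
        if attempt < max_restarts:
            time.sleep(delay_seconds)  # short pause so a crash loop doesn't spin
    return exit_codes
```

e.g. keep_alive(["prefect", "agent", "start"], max_restarts=5) would restart the local agent up to five times after it exits. supervisord adds what this toy lacks: daemonizing, logging, and control via supervisorctl.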

using the docker agent to execute flow runs within a docker container:

i added a new flow and wanted to restart the agent, but got this error when trying to run the cli agent:

Another program is already listening on a port that one of our HTTP servers is configured to use. Shut this program down first before starting supervisord.

to shut it down, i ran:

ps -ef | grep supervisord

i found the PID in that output:

then

kill -s SIGTERM 69800

and restarted agent:

supervisord
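the manual steps above (see what's holding the port, find the supervisord PID, send SIGTERM) can be scripted. a sketch using only the stdlib; the helper names are hypothetical, the ps -ef parsing assumes the usual UID PID PPID C STIME TTY TIME CMD column layout, and the port to check is whatever your supervisord.conf configures (i'm not assuming a specific number):

```python
import os
import signal
import socket
import subprocess


def port_in_use(port, host="127.0.0.1"):
    """True if something is already accepting TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(1.0)
        return s.connect_ex((host, port)) == 0


def ps_output():
    """Text of `ps -ef`, the same listing the grep above filters."""
    return subprocess.run(["ps", "-ef"], capture_output=True, text=True, check=True).stdout


def supervisord_pids(ps_text):
    """PIDs of supervisord processes in `ps -ef` output, ignoring the grep itself."""
    pids = []
    for line in ps_text.splitlines():
        # assumed columns: UID PID PPID C STIME TTY TIME CMD (CMD may contain spaces)
        fields = line.split(None, 7)
        if len(fields) < 8 or not fields[1].isdigit():
            continue  # header row or a line we can't parse
        cmd = fields[7]
        program = os.path.basename(cmd.split()[0])
        if "supervisord" in cmd and program != "grep":
            pids.append(int(fields[1]))
    return pids


def stop(pids):
    """Send SIGTERM to each PID, like `kill -s SIGTERM <pid>`."""
    for pid in pids:
        os.kill(pid, signal.SIGTERM)
```

e.g. stop(supervisord_pids(ps_output())) and then start supervisord again. if port_in_use still reports the configured port as taken after that, some other program is holding it.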

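to get flows running, i could not seem to get it working via the python client. for example, one of the tutorials suggests that flow.run() triggers a run. when i run my python script, i see this:

but the agent doesn't pick it up, nor does it appear in my dashboard. (as far as i can tell, flow.run() executes the flow locally inside the python process itself, so the run never goes through the server or the agent.)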

i was able to get my job running in these ways:

  • via the cli command (as listed above)
prefect run flow --name hello-flow --project 'Hello, World!'
  • via the cli command, with parameters
prefect run flow --name hello-flow --project 'Hello, World!' --parameters-string '{"name": "yogi"}'
  • via the UI / dashboard, which has a Quick Run button:
  • if your flow has a parameter, you can enter its value during a manual run in the Parameters section:

to set task dependencies within a flow, you can use either the set_upstream or the set_dependencies method:

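# daily_schedule and the tasks flow1..flow4 are defined elsewhere in the script
with Flow("parent-flow", schedule=daily_schedule) as flow:
    # flow4 only runs after flow1, flow2 and flow3 finish (by default, only if all three succeed)
    flow4.set_upstream(flow1)
    flow4.set_upstream(flow2)
    flow4.set_upstream(flow3)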

or

# the same dependencies, declared on the flow object directly
flow.set_dependencies(
    task=flow4,
    upstream_tasks=[flow1, flow2, flow3],
)
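prefect works out the execution order from these edges on its own. to make concrete what the three set_upstream calls imply, here's a stdlib-only sketch using python's graphlib (3.9+); it's a plain topological sort for illustration, not prefect's scheduler, and the strings just stand in for the tasks:

```python
import graphlib

# upstream edges mirroring the set_upstream calls above:
# flow4 waits on flow1, flow2 and flow3 (plain strings standing in for the tasks)
upstream = {"flow4": {"flow1", "flow2", "flow3"}}


def run_order(graph):
    """Group nodes into waves; each wave depends only on earlier waves.

    Raises graphlib.CycleError if the edges form a cycle."""
    sorter = graphlib.TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(run_order(upstream))
# [['flow1', 'flow2', 'flow3'], ['flow4']]
```

flow1, flow2 and flow3 land in the same wave because nothing orders them relative to each other, so they're free to run concurrently; flow4 has to wait for all three.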
