prefect dag scheduling — adding schedules
October 5, 2020
according to the documentation, adding a schedule to your flow is as easy as:
import datetime
from prefect import Flow
from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(interval=datetime.timedelta(minutes=2))
flow = Flow('workflow-name', schedule)
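to get a feel for what a two-minute interval schedule produces, here is a small sketch in plain python (no prefect needed). `upcoming_runs` is a hypothetical helper i made up for illustration; it is not part of prefect, and prefect's own handling of start dates may differ:

```python
import datetime


def upcoming_runs(start, interval, count):
    """Return the first `count` run times of a fixed-interval schedule."""
    # hypothetical helper, not a prefect API
    return [start + i * interval for i in range(count)]


start = datetime.datetime(2020, 10, 5, 8, 0)
runs = upcoming_runs(start, datetime.timedelta(minutes=2), 3)
for run in runs:
    print(run.isoformat())
```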
similarly adding a cron schedule would be:
from prefect.schedules import CronSchedule

daily_schedule = CronSchedule("0 8 * * *")
flow = Flow('workflow-name', daily_schedule)
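the cron string has five space-separated fields (minute, hour, day of month, month, day of week), so "0 8 * * *" means every day at 08:00. a tiny plain-python sketch that labels the fields; `describe_cron` is a made-up name for illustration and it only splits the string, it does not validate or evaluate it:

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Map each of the five cron fields to its name."""
    # hypothetical helper, not a prefect API
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("0 8 * * *")
print(fields)
```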
but if you have required parameters in any of your tasks, you will get an error:
"Flows with required parameters can not be scheduled automatically."
prefect.utilities.exceptions.ClientError: Flows with required parameters can not be scheduled automatically.
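my reading of this error is that a scheduled run has nobody around to supply values for required parameters, so every required parameter needs a value from somewhere. a rough sketch of that rule in plain python; `unscheduled_parameters` is a name i made up, and this is only my mental model, not how prefect implements the check:

```python
def unscheduled_parameters(required, defaults):
    """Return the required parameter names that have no default value."""
    # hypothetical helper illustrating the rule, not prefect's own check
    return sorted(name for name in required if name not in defaults)


required = {"param1", "param2"}
missing_before = unscheduled_parameters(required, {})
missing_after = unscheduled_parameters(required, {"param1": "value", "param2": 3})
print(missing_before)
print(missing_after)
```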
this simpler scheduler corresponds to what we see in the dashboard when we navigate to Settings, then Schedules, to create a new schedule.
if you want to schedule fixed times for a workflow and pass in different parameter values, you use clocks and then build your Schedule from them:
import datetime
from prefect import Flow
from prefect.schedules import clocks, Schedule

now = datetime.datetime.utcnow()
flow = Flow('workflow-name')

interval_clock = clocks.IntervalClock(
    start_date=now,
    interval=datetime.timedelta(minutes=5),
    parameter_defaults={"param1": "value"}
)

cron_clock = clocks.CronClock(
    cron="30 6 * * *",
    parameter_defaults={"param1": "value"}
)

schedule = Schedule(clocks=[interval_clock, cron_clock])

flow.schedule = schedule  # set the schedule on the Flow
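the way i picture it, each clock emits its own stream of run times together with its own parameter values, and the Schedule is the time-ordered union of those streams. below is a simplified plain-python model of that idea. `interval_events` and `merged_events` are names i invented for the sketch; this is not prefect's implementation:

```python
import datetime
import heapq
from itertools import islice


def interval_events(start, interval, params):
    """Yield (run_time, params) pairs forever at a fixed interval."""
    current = start
    while True:
        yield current, params
        current += interval


def merged_events(*clocks):
    """Lazily merge several time-ordered event streams into one."""
    return heapq.merge(*clocks, key=lambda event: event[0])


start = datetime.datetime(2020, 10, 5, 6, 0)
fast = interval_events(start, datetime.timedelta(minutes=5), {"param1": "a"})
slow = interval_events(
    start + datetime.timedelta(minutes=7),
    datetime.timedelta(minutes=10),
    {"param1": "b"},
)

# take only the first four events, since both streams are infinite
events = list(islice(merged_events(fast, slow), 4))
for run_time, params in events:
    print(run_time.isoformat(), params)
```

each event keeps the parameter values of the clock it came from, which is why one flow can run on several clocks with different values.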
when running the prefect server locally, my parameterized schedules did not show up in the prefect dashboard UI. to confirm that the schedules were active, i additionally ran the flow to trigger the agent running in my terminal:
flow.run()
my more complex schedules still don’t appear in the UI; i’m not sure whether this is something i would be able to see once i connect to prefect cloud
# needed to re-register flow to project
# when combined with full task definition got the same error