prefect dag scheduling — repo strategy

diary of a codelovingyogi
Jun 17, 2021

October 6, 2020

thinking about creating a separate task for each repo we have. each repo usually stores its lambda execution code. trying to see if we can iteratively update a flow once it’s been created, so that we can configure each flow’s tasks in their respective repositories.

  1. i create an initial flow with one task, a parameter for that task, and bind the parameter to my task in the flow
import datetime
import prefect
from prefect import Flow, Parameter, task

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=1))
def batch_fetch_coredb(obj_type: str) -> str:
    print('invoke lambda for data_batch_fetch_coredb')
    input = {"data_type": obj_type}
    logger = prefect.context.get("logger")
    logger.info("Invoked lambda")
    logger.warning("A warning message.")
    return 'status of lambda function that would trigger success'

flow = Flow('<flow-name>')
ingest_dtype = Parameter('ingest_dtype')
flow.add_task(batch_fetch_coredb)
# bind the parameter to the task so it shows up in the flow
batch_fetch_coredb.bind(obj_type=ingest_dtype, flow=flow)

flow.register(project_name="<project name>")

make sure that for any change to a flow you call .register with the project name, or you won’t see the update in the UI.

after you run the code, you should see output indicating that your flow was updated.

in my UI, i see my new flow with a single task and parameter.

2. add another task, add it to the flow, and add an edge between the tasks to set the dependency

from prefect.tasks.aws.s3 import S3List

@task
def batch_move_landing() -> str:
    today_date_str = datetime.datetime.today().strftime('%Y-%m-%d')
    s3_conn = S3List('<bucket-name>')
    files = s3_conn.run(f'<prefix>/dt={today_date_str}')
    logger = prefect.context.get("logger")
    logger.info(files)
    if not files:
        raise Exception('error: no files moved today')
    else:
        logger.info('confirmed that file moved')
    return 'return status that would trigger success'

flow = Flow('<flow-name>')
ingest_dtype = Parameter('ingest_dtype')
flow.add_task(batch_move_landing)
flow.add_edge(
    upstream_task=batch_fetch_coredb,
    downstream_task=batch_move_landing
)
# batch_move_landing.set_upstream(batch_fetch_coredb, flow=flow)

flow.register(project_name="e2e-poc")
print(flow.tasks)

unfortunately, when we do this, it overrides what we registered previously. setting the upstream task requires that you have the original task object. you also need the parameter defined and bound to a task, otherwise it doesn’t show up in your workflow. at a quick glance, there does not seem to be an easy way to isolate your task creation and build your flow iteratively.
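for reference, here is a minimal sketch of what a full re-registration would look like: both tasks imported into one script, the parameter bound, and the edge set, followed by a single .register call. the import paths are hypothetical; only the task and flow names come from the snippets above.

from prefect import Flow, Parameter

from repo_a.tasks import batch_fetch_coredb   # hypothetical module path
from repo_b.tasks import batch_move_landing   # hypothetical module path

flow = Flow('<flow-name>')
ingest_dtype = Parameter('ingest_dtype')

# the parameter must be bound to a task or it won't appear in the flow
flow.add_task(batch_fetch_coredb)
batch_fetch_coredb.bind(obj_type=ingest_dtype, flow=flow)

# setting the dependency needs the original upstream task object in scope
flow.add_task(batch_move_landing)
flow.add_edge(
    upstream_task=batch_fetch_coredb,
    downstream_task=batch_move_landing,
)

flow.register(project_name="<project name>")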

update: June 2021

  • we ended up including all tasks for a given flow within the same repo; any update to a task re-registers the entire flow
  • we have a separate orchestrator repo that connects all dependent flows to each other (see the sketch below)
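as a rough illustration of that orchestrator pattern in prefect 1.x, a flow-of-flows can be wired up with the built-in StartFlowRun task; the flow and project names below are placeholders, not our actual setup.

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# each StartFlowRun task kicks off an already-registered flow and,
# with wait=True, blocks until that flow run finishes
ingest = StartFlowRun(flow_name="<ingest-flow>", project_name="<project name>", wait=True)
transform = StartFlowRun(flow_name="<transform-flow>", project_name="<project name>", wait=True)

with Flow("orchestrator") as flow:
    # run the transform flow only after the ingest flow completes
    transform(upstream_tasks=[ingest()])

flow.register(project_name="<project name>")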
