prefect dag scheduling — invoke lambda, logging

September 28, 2020

continuing with work i’m doing to complete a proof of concept (poc) using prefect as our dag scheduler

install prefect extras:

pip install "prefect[aws]"

setting config values through environment variables:

export PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4

read config values from code:

import prefect
max_retries = prefect.config.tasks.defaults.max_retries

adding aws credentials:

export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<access_key>", "SECRET_ACCESS_KEY": "<secret_access_key>"}'

if you use assumed roles, you need to run code like this to get temp credentials:

import boto3
sts_client = boto3.client(
    service_name='sts',
    aws_access_key_id='<access-key>',
    aws_secret_access_key='<secret-key>',
)
assumed_role_object = sts_client.assume_role(
    RoleArn='arn:aws:iam::<account-id>:role/<role-name>',
    RoleSessionName='<session-name>',
)
credentials = assumed_role_object['Credentials']
temp_access_key = credentials['AccessKeyId']
temp_secret_key = credentials['SecretAccessKey']
temp_session_token = credentials['SessionToken']

update your env var to include all temp credentials:

export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<access_key>", "SECRET_ACCESS_KEY": "<secret_access_key>", "SESSION_TOKEN": "<session-token>"}'
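to avoid hand-editing that json string, the temp credentials can be serialized with json.dumps. this is only a minimal sketch: build_aws_credentials_secret is a hypothetical helper name of mine, and the key names are simply the ones used in the env vars above.

```python
import json


def build_aws_credentials_secret(access_key, secret_key, session_token=None):
    """Return the JSON string for PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS."""
    secret = {"ACCESS_KEY": access_key, "SECRET_ACCESS_KEY": secret_key}
    # only add the token when temp (assumed role) credentials are used
    if session_token is not None:
        secret["SESSION_TOKEN"] = session_token
    return json.dumps(secret)
```

the returned string can be pasted into the export line above, or assigned to os.environ before the flow starts.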

to invoke lambda, you can run:

import json
from prefect.tasks.aws import LambdaInvoke
payload = {"data_type": "<data-type>"}
# the payload has to be sent as bytes
payload_bytes = bytes(json.dumps(payload), encoding='utf8')
lambda_conn = LambdaInvoke(
    function_name="<lambda-name>",
    payload=payload_bytes,
    qualifier="dev",
    invocation_type="RequestResponse",
)
response = lambda_conn.run()
# the handler's return value comes back as a stream of bytes
lambda_payload = response['Payload']
lambda_text = lambda_payload.read()
print(lambda_text)

now that i am able to invoke lambda, i need to figure out how to trigger tasks within my flow that depend on the status of the lambda. as you may have noticed in the code above, i pass invocation_type="RequestResponse". this arg makes the call synchronous: the lambda function has to finish executing before a response is returned, and the response includes whatever your lambda handler returned.
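one way to turn that synchronous response into a pass/fail signal is to inspect it and raise when the lambda reports an error. a rough sketch, with some assumptions: check_lambda_response is a hypothetical helper, the handler is assumed to return json, and the fake response below is made up so the snippet runs without aws access. StatusCode, FunctionError and Payload are the keys boto3 returns from invoke.

```python
import io
import json


def check_lambda_response(response):
    """Return the decoded handler result, or raise if the lambda reported an error."""
    raw = response["Payload"].read()  # Payload is a file-like stream of bytes
    body = json.loads(raw) if raw else None  # assumes the handler returns json
    # FunctionError is only present when the function itself failed
    if response.get("FunctionError") or response.get("StatusCode") != 200:
        raise RuntimeError(f"lambda failed: {body}")
    return body


# stand-in for a real invoke response (made-up data)
fake_response = {"StatusCode": 200, "Payload": io.BytesIO(b'{"batch_id": "abc"}')}
print(check_lambda_response(fake_response))
```

inside a prefect task, an uncaught exception like this should mark the task run as failed, so downstream tasks that use the default trigger would not run.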

before i learned about invocation_type i had considered using:

  • state handlers to update the state of my task at the end of my lambda code
  • looking through lambda execution logs to verify the successful run. i can do this within the same task that invokes my lambda function (since for the poc, i don’t want to modify our existing lambda code)

logging in prefect can be added in your task function:

logger = prefect.context.get("logger")
logger.info("Invoked lambda")
logger.warning("A warning message.")

these messages then show up in the prefect dashboard.

speaking of logs, for poc purposes, i am trying to query our cloudwatch logs to fetch an id that we use in our s3 path. the next step of our flow uses that id to verify that another task finished.

in our etl jobs, we have a service (batchy) that helps us track the status of our batch jobs. in a real implementation, ideally the batch_id from batchy and the lambda requestId would be returned from the first lambda function and passed into the next task via prefect’s data bindings/dependencies feature.

but for now, in cloudwatch logs, i can use a query like this in Logs Insights to parse through the logs and get the batch_id i need:

fields @timestamp, @message
| filter @requestId = '<lambda request id>'
| filter @message like /(?i)(batchy_job:)/
| sort @timestamp desc
| limit 20
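since the request id changes on every invocation, the query text can be generated instead of edited by hand. a small sketch, assuming the same query as above: build_batch_query and QUERY_TEMPLATE are hypothetical names, not part of any library.

```python
QUERY_TEMPLATE = """fields @timestamp, @message
| filter @requestId = '{request_id}'
| filter @message like /(?i)(batchy_job:)/
| sort @timestamp desc
| limit {limit}
"""


def build_batch_query(request_id, limit=20):
    """Fill the Logs Insights query with one lambda request id."""
    # a stray quote would break out of the quoted filter value
    if "'" in request_id:
        raise ValueError("request id must not contain a single quote")
    return QUERY_TEMPLATE.format(request_id=request_id, limit=limit)
```

i believe the request id of an invocation is available on the invoke response under response['ResponseMetadata']['RequestId'], but that is worth checking against your own logs.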

via boto3, you can run this query:

import time
from datetime import datetime, timedelta
import boto3
logs_conn = boto3.client('logs')
query = """fields @timestamp, @message
| filter @requestId = '<lambda-request-id>'
| filter @message like /(?i)(batchy_job:)/
| sort @timestamp desc
| limit 20
"""
log_group = '/aws/lambda/<log-group-name>'
print('starting query')
# search the last 36 hours; start_query takes epoch seconds
start_query_response = logs_conn.start_query(
    logGroupName=log_group,
    startTime=int((datetime.now() - timedelta(hours=36)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query,
)
query_id = start_query_response['queryId']
response = None
# poll until the query is no longer scheduled or running
while response is None or response['status'] in ('Scheduled', 'Running'):
    print('Waiting for query to complete ...')
    time.sleep(1)
    response = logs_conn.get_query_results(
        queryId=query_id
    )
print(response)
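the printed response nests every row as a list of field/value pairs, so pulling the batch_id out takes one more step. a hedged sketch: rows_to_dicts and find_batch_id are hypothetical helpers, the sample response is made up, and the regex assumes the log line looks like "batchy_job: <id>", which may not match the real format.

```python
import re

# assumption: the id follows "batchy_job:" in the message; adjust to the real format
BATCH_ID_PATTERN = re.compile(r"batchy_job:\s*(\S+)", re.IGNORECASE)


def rows_to_dicts(response):
    """Flatten each result row from a list of field/value pairs into one dict."""
    return [
        {cell["field"]: cell["value"] for cell in row}
        for row in response.get("results", [])
    ]


def find_batch_id(response):
    """Return the first batch id found in the @message fields, or None."""
    for row in rows_to_dicts(response):
        match = BATCH_ID_PATTERN.search(row.get("@message", ""))
        if match:
            return match.group(1)
    return None


# made-up response in the shape get_query_results returns
sample = {
    "status": "Complete",
    "results": [
        [
            {"field": "@timestamp", "value": "2020-09-28 12:00:00.000"},
            {"field": "@message", "value": "batchy_job: 1234"},
        ]
    ],
}
print(find_batch_id(sample))
```

because the query sorts by @timestamp desc, the first match is the most recent matching log line.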
