prefect dag scheduling — invoke lambda, logging

pip install "prefect[aws]"
export PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4
import prefect
# Read the default task retry count from the loaded Prefect configuration.
# NOTE(review): presumably this reflects the PREFECT__TASKS__DEFAULTS__MAX_RETRIES
# environment variable exported above — confirm against the Prefect config docs.
max_retries = prefect.config.tasks.defaults.max_retries
export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<access_key>", "SECRET_ACCESS_KEY": "<secret_access_key>"}'
import boto3

# Build an STS client from the long-lived access key pair (placeholders here).
sts_client = boto3.client(
    service_name='sts',
    aws_access_key_id='<access-key>',
    aws_secret_access_key='<secret-key>',
)

# Assume the target role; the response carries a set of temporary credentials.
assumed_role_object = sts_client.assume_role(
    RoleArn='arn:aws:iam::<account-id>:role/<role-name>',
    RoleSessionName='<session-name>',
)

# Pull out the three pieces of the temporary credentials so they can be
# supplied wherever an access key, secret key and session token are needed.
credentials = assumed_role_object['Credentials']
temp_access_key = credentials['AccessKeyId']
temp_secret_key = credentials['SecretAccessKey']
temp_session_token = credentials['SessionToken']
export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<access_key>", "SECRET_ACCESS_KEY": "<secret_access_key>", "SESSION_TOKEN": "<session-token>"}'
import json
from prefect.tasks.aws import LambdaInvoke
# Request body for the Lambda function, serialized to UTF-8 encoded JSON bytes.
payload = dict(data_type="<data-type>")
payloadBytes = json.dumps(payload).encode('utf8')

# Configure the invoke task: target function, payload, alias/version
# qualifier, and a synchronous ("RequestResponse") invocation.
invoke_settings = {
    "function_name": "<lambda-name>",
    "payload": payloadBytes,
    "qualifier": "dev",
    "invocation_type": "RequestResponse",
}
lambda_conn = LambdaInvoke(**invoke_settings)

# Run the task directly, then read the payload stream from the response
# and print its raw contents.
response = lambda_conn.run()
lambda_payload = response['Payload']
lambda_text = lambda_payload.read()
print(lambda_text)
  • State handlers to update the state of my task at the end of my Lambda code
  • Looking through the Lambda execution logs to verify a successful run; I can do this within the same task that invokes my Lambda function (since, for the PoC, I don’t want to modify our existing Lambda code)
# Look up the logger stored under the "logger" key of Prefect's context.
# NOTE(review): presumably this is only populated while code runs inside a
# Prefect task/flow run — confirm before using it at import time.
logger = prefect.context.get("logger")
# Record an INFO-level message marking the Lambda invocation.
logger.info("Invoked lambda")
# Record a WARNING-level message (example of a higher-severity log line).
logger.warning("A warning message.")
fields @timestamp, @message
| filter @requestId = '<lambda request id>'
| filter @message like /(?i)(batchy_job:)/
| sort @timestamp desc
| limit 20
import time
from datetime import datetime, timedelta
import boto3
# CloudWatch Logs client used to run a Logs Insights query.
logs_conn = boto3.client('logs')

# Logs Insights query: the 20 most recent messages for one Lambda request id
# that mention "batchy_job:" (case-insensitive).
query = """fields @timestamp, @message
| filter @requestId = '<lambda-request-id>'
| filter @message like /(?i)(batchy_job:)/
| sort @timestamp desc
| limit 20
"""
log_group = '/aws/lambda/<log-group-name>'

print('starting query')
# Take one clock reading so the start and end of the 36-hour search window
# are derived from the same instant.
window_end = datetime.now()
window_start = window_end - timedelta(hours=36)
start_query_response = logs_conn.start_query(
    logGroupName=log_group,
    startTime=int(window_start.timestamp()),
    endTime=int(window_end.timestamp()),
    queryString=query,
)
query_id = start_query_response['queryId']

# Poll once per second until the query leaves a non-terminal state.
# 'Scheduled' must be treated like 'Running'; otherwise a query that has not
# started yet ends the loop immediately with no results.
response = None
while response is None or response['status'] in ('Scheduled', 'Running'):
    print('Waiting for query to complete ...')
    time.sleep(1)
    response = logs_conn.get_query_results(
        queryId=query_id
    )

# Final response: status, statistics and any matched log rows.
print(response)

--

--

--

quick notes, snippets, bugs, and fixes

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Micro-components Architecture

3 Ways of Passing Arguments in C++

Cover Image for the article

Why should you do testing

Using flexbox in CSS for positioning

git greenbase means never pulling a broken build

What is Mobile Application Development?

Why I love Jenkins so much

How to Add Collision to Quixel Assets in UE5

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
diary of a codelovingyogi

diary of a codelovingyogi

quick notes, snippets, bugs, and fixes

More from Medium

Automating AWS deployments with CloudFormation

How to Generate and Store Reports to Amazon S3 Triggering Lambda Function at Scheduled Intervals

Analyzing HMS Kits in OVO Application

Running 💨💨💨 Apache Kafka on Mac 💻