For whatever reason, my thumb drive got partitioned into several pieces, making it hard to use. In windows you can do the following:
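a minimal sketch of the usual diskpart fix, assuming the goal is one clean FAT32 partition (the disk number is hypothetical, so check the list disk output first; clean wipes the drive):

diskpart
list disk
select disk 2
clean
create partition primary
format fs=fat32 quick
assign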


October 20, 2020

trying out prefect cloud features to determine a deployment strategy for productionizing workflows


1. mocking aws lambda

import boto3
from botocore.exceptions import ClientError
from moto import mock_iam, mock_lambda

_lambda_region = "us-east-1"  # assumed region for the mocked clients

def get_role_name():
    # create (or fetch) a fake iam role for the mocked lambda to use
    with mock_iam():
        iam = boto3.client("iam", region_name=_lambda_region)
        try:
            return iam.get_role(RoleName="my-role")["Role"]["Arn"]
        except ClientError:
            return iam.create_role(
                RoleName="my-role",
                AssumeRolePolicyDocument="some policy",
                Path="/my-path/",
            )["Role"]["Arn"]

@mock_lambda
def setup_mock_client():
    # the decorator already activates the mock, so no .start() call is needed
    lambda_client = boto3.client("lambda", region_name=_lambda_region)
    lambda_client.create_function(…
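for reference, a hypothetical minimal set of arguments that moto accepts for create_function (every value here is made up for illustration; moto expects Code to contain real zip bytes):

import io, zipfile

def make_fake_zip():
    # build an in-memory zip with a trivial handler, since moto unzips the payload
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("handler.py", "def handler(event, context):\n    return event\n")
    return buf.getvalue()

lambda_client.create_function(
    FunctionName="my-function",      # hypothetical name
    Runtime="python3.8",
    Role=get_role_name(),            # role arn from the helper above
    Handler="handler.handler",
    Code={"ZipFile": make_fake_zip()},
)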

import boto3
import pytest
from moto import mock_secretsmanager

@pytest.fixture
@mock_secretsmanager
def mock_response():
    sm_client = boto3.client('secretsmanager', region_name='us-east-1')  # assumed region
    # the secret must exist before a value can be put on it
    sm_client.create_secret(Name='<aws-secrets-name>')
    response = sm_client.put_secret_value(
        SecretId='<aws-secrets-name>',
        SecretString='<secrets-string>'
    )
    return response
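a hypothetical test consuming the fixture (the assertions just echo the placeholder values above):

def test_put_secret(mock_response):
    assert mock_response['Name'] == '<aws-secrets-name>'
    assert 'VersionId' in mock_response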

2. import pendulum
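a minimal sketch of the pendulum calls this note presumably refers to (the timezone is just an example):

import pendulum

now = pendulum.now("America/New_York")      # timezone-aware current time
utc = now.in_timezone("UTC")                # convert between zones
later = now.add(days=1, hours=2)            # readable date arithmetic
dt = pendulum.parse("2020-10-20T08:00:00")  # iso-8601 parsing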

3. mocking aws athena

we usually use moto to mock aws services, but athena is currently not supported…
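one workaround (my suggestion, not from the original note) is botocore's built-in Stubber, which can fake any client call without moto:

import boto3
from botocore.stub import Stubber

athena = boto3.client("athena", region_name="us-east-1")  # assumed region
stubber = Stubber(athena)
# queue a canned response for the next start_query_execution call (id is made up)
stubber.add_response("start_query_execution", {"QueryExecutionId": "fake-query-id"})
with stubber:
    resp = athena.start_query_execution(
        QueryString="SELECT 1",
        ResultConfiguration={"OutputLocation": "s3://<bucket-name>/results/"},
    )
    assert resp["QueryExecutionId"] == "fake-query-id"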


October 6, 2020

thinking about creating a separate task for each repo we have. each repo usually stores its own lambda execution code. …


October 5, 2020

upon reading documentation, adding a schedule for your flow is as easy as:

import datetime
from prefect import Flow
from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(interval=datetime.timedelta(minutes=2))
flow = Flow('workflow-name', schedule=schedule)

similarly adding a cron schedule would be:

from prefect.schedules import CronSchedule
daily_schedule = CronSchedule("0 8 * * *")
flow = Flow('workflow-name'…
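either way, a quick local check (in prefect 0.x a scheduled flow's run() blocks and fires at each scheduled start; run_on_schedule=False skips that):

flow = Flow('workflow-name', schedule=daily_schedule)
flow.run()                        # waits and runs at each scheduled time
flow.run(run_on_schedule=False)   # one-off run that ignores the schedule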

1. read multiple json files from s3 into a spark dataframe:

file_paths = '<bucket-name>/<filepath>/<object-name>1.json,<bucket-name>/<filepath>/<object-name>2.json'
file_paths_list = [f"s3a://{file}" for file in file_paths.split(',')]
df = spark.read.format("json").load(file_paths_list)
# or, to reconcile differing schemas across the files:
df_merged_schema = spark.read.option("mergeSchema", "true").format('json').load(file_paths_list)

2. display spark sql

display(spark.sql("""<sql statement>"""))

3. empty spark dataframe

from pyspark.sql.types import StructType

schema = StructType([])
df_concat = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df_concat.schema…
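on spark 2.x+ the session entry point does the same without the rdd detour (sqlContext and sc are the older entry points):

from pyspark.sql.types import StructType

df_empty = spark.createDataFrame([], StructType([]))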

1. switch aws profiles via env var:

https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html

export AWS_PROFILE=default-prod

2. you can unset the env var:

unset AWS_PROFILE

3. invoke lambda function via cli

aws lambda invoke --function-name <lambda-function-name> --payload '{"key": "value"}'  out
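(aside: on aws cli v2 a json --payload has to be passed with --cli-binary-format raw-in-base64-out, i.e.:)

aws lambda invoke --function-name <lambda-function-name> --cli-binary-format raw-in-base64-out --payload '{"key": "value"}' out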

get logs (this assumes the function returns its own log stream name, which lands in out; the sed strips the surrounding quotes so the name can be fed to get-log-events):

sed -i'' -e 's/"//g' out
sleep 15
aws logs get-log-events --log-group-name /aws/lambda/my-function --log-stream-name $(cat out) --limit 5

4. assume role for local invoke of lambda function:

https://docs.aws.amazon.com/code-samples/latest/catalog/python-sts-sts_temporary_credentials-assume_role_mfa.py.html
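the gist of that sample as a hedged sketch (the linked version adds MFA; the role arn, session name, and payload below are placeholders):

import boto3

sts = boto3.client("sts")
resp = sts.assume_role(
    RoleArn="arn:aws:iam::<account-id>:role/<role-name>",
    RoleSessionName="local-lambda-invoke",
)
creds = resp["Credentials"]
# build a lambda client on the temporary credentials and invoke locally
lambda_client = boto3.client(
    "lambda",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
lambda_client.invoke(FunctionName="<lambda-function-name>", Payload=b'{"key": "value"}')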


September 28, 2020

continuing with work i’m doing to complete a proof of concept (poc) using prefect as our dag scheduler

install prefect extras:

pip install "prefect[aws]"

setting config via environment variables (prefect picks up PREFECT__-prefixed vars):

export PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4

read config values from code:

import prefect
max_retries = prefect.config.tasks.defaults.max_retries

adding aws credentials:

export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<access_key>", "SECRET_ACCESS_KEY": "<secret_access_key>"}'
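that secret can then be read back with prefect's Secret client (0.x api; depending on version the value may come back as a raw string, hence the json fallback):

import json
from prefect.client import Secret

creds = Secret("AWS_CREDENTIALS").get()
if isinstance(creds, str):
    creds = json.loads(creds)  # parse the json string into a dict
access_key = creds["ACCESS_KEY"]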

if you…


September 10, 2020

working on a dag scheduler proof-of-concept (POC) using prefect

docker run -it prefecthq/prefect:latest

i did not have the image locally, so docker pulled the latest for me

(note this image drops you directly into a python interpreter)

run docker images to see the prefect image:

to run backend server:
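a sketch assuming the prefect 0.x cli (first switch to the server backend, then start the docker-compose based server):

prefect backend server
prefect server start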

