For whatever reason, my thumb drive got partitioned into several pieces, making it hard to use. In Windows you can do the following:

Virtual Disk Service error:
Cannot delete a protected partition without the force protected parameter set.
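the note is cut off here, but the usual diskpart fix for this error (my assumption: these are standard diskpart commands, not steps copied from the original post, and the disk/partition numbers are placeholders) is to force-delete the protected partition from an elevated command prompt:

diskpart
list disk
select disk 1
list partition
select partition 1
delete partition override

delete partition override is the "force protected" switch the error message is hinting at; alternatively, clean wipes every partition on the selected disk, after which create partition primary, format fs=fat32 quick, and assign give you a single usable volume again.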

October 20, 2020

trying out prefect cloud features to determine a deployment strategy for productionizing workflows


import boto3
from botocore.exceptions import ClientError
from moto import mock_iam, mock_lambda

_lambda_region = "us-east-1"  # region assumed here; not defined in the original snippet


def get_role_name():
    # create (or fetch) a fake IAM role for the mocked lambda to reference
    with mock_iam():
        iam = boto3.client("iam", region_name=_lambda_region)
        try:
            return iam.get_role(RoleName="my-role")["Role"]["Arn"]
        except ClientError:
            return iam.create_role(
                RoleName="my-role",
                AssumeRolePolicyDocument="some policy",
                Path="/my-path/",
            )["Role"]["Arn"]


@mock_lambda  # the decorator already activates the mock, so no separate .start() is needed
def setup_mock_client():
    lambda_client = boto3.client("lambda", region_name=_lambda_region)
    lambda_client.create_function(
        FunctionName="testFunction",
        Runtime="python3.7",
        Role=get_role_name(),
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": get_test_zip_file1()},  # helper that zips a dummy handler (not shown here)
        Description="test lambda function",
        Timeout=3,
        MemorySize=128,
        Publish=True,
    )
    return lambda_client
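
a rough sketch of how a test might use this setup (my addition, not from the original note; list_functions is just a plain boto3 call against moto's in-memory backend):

@mock_lambda
def test_function_is_registered():
    # setup_mock_client() registers "testFunction" against the mocked backend
    client = setup_mock_client()
    names = [f["FunctionName"] for f in client.list_functions()["Functions"]]
    assert "testFunction" in names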

2. mocking s3fs

in some of our code we use the s3fs python library to navigate aws s3 like a filesystem. i came across code that uses it and needed to mock the functionality…
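
the rest of that note didn't survive the excerpt, but a minimal sketch of one way to do it (my assumption; read_first_line is a made-up stand-in for the real code) is to patch s3fs.S3FileSystem with unittest.mock so the code under test never touches real s3:

import io
from unittest import mock

import s3fs


def read_first_line(path: str) -> str:
    # hypothetical code under test
    fs = s3fs.S3FileSystem()
    with fs.open(path, "rb") as f:
        return f.readline().decode().strip()


def test_read_first_line():
    fake_file = io.BytesIO(b"hello from fake s3\nsecond line")
    with mock.patch("s3fs.S3FileSystem") as fs_cls:
        # whatever the code opens, it gets our in-memory bytes back
        fs_cls.return_value.open.return_value.__enter__.return_value = fake_file
        assert read_first_line("s3://some-bucket/some-key.txt") == "hello from fake s3"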


import boto3
import pytest
from moto import mock_secretsmanager


@pytest.fixture
def mock_response():
    with mock_secretsmanager():
        sm_client = boto3.client('secretsmanager', region_name='us-east-1')  # region assumed
        # the secret has to exist before a value can be put on it
        sm_client.create_secret(Name='<aws-secrets-name>')
        yield sm_client.put_secret_value(
            SecretId='<aws-secrets-name>',
            SecretString='<secrets-string>'
        )
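
and a quick sketch of a test using that fixture (my assumption; the excerpt doesn't show the actual test):

def test_reads_mocked_secret(mock_response):
    # the moto mock stays active while the fixture's yield is pending
    sm_client = boto3.client('secretsmanager', region_name='us-east-1')
    secret = sm_client.get_secret_value(SecretId='<aws-secrets-name>')
    assert secret['SecretString'] == '<secrets-string>'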

2. import pendulum

3. mocking aws athena

we usually use moto to mock aws services, but athena is currently not supported in the library

code:

import boto3

athena = boto3.client("athena")

# query and output_location come from the surrounding code (not shown here)
query_exec_param = {
    'QueryString': query,
    'ResultConfiguration': {
        'OutputLocation': output_location,
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3', 'KmsKey': ''}
    }
}
response = athena.start_query_execution(**query_exec_param)

test code:

ATHENA_RESPONSE = {
    'QueryExecutionId': '123',
    'ResponseMetadata': {…
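
the rest of the test is cut off above, but the shape of it (my sketch, assuming the boto3 client is created inside a hypothetical run_query(query, output_location) wrapper around the code block above) is to stub the client with unittest.mock, since moto can't stand in for athena:

from unittest import mock


def test_start_query_execution():
    with mock.patch("boto3.client") as client_factory:
        athena = client_factory.return_value
        athena.start_query_execution.return_value = ATHENA_RESPONSE

        response = run_query(query="SELECT 1", output_location="s3://<bucket-name>/results/")

        athena.start_query_execution.assert_called_once()
        assert response['QueryExecutionId'] == '123'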

October 6, 2020

thinking about creating a separate task for each repo we have. each repo usually stores the code for its lambda function. trying to see if we can iteratively update a flow once it's been created, so that each flow's task can be configured in its respective repository.

import datetime
import prefect
from prefect import task

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=1))
def batch_fetch_coredb(obj_type: str) -> str:
    print('invoke lambda for data_batch_fetch_coredb')
    input = {"data_type": obj_type}
    logger = prefect.context.get("logger")
    logger.info("Invoked lambda")
    logger.warning("A warning message.") …
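
the note trails off there; the direction i read it as (my sketch of prefect 0.x api, with the flow name, parameter default, and project name as placeholders) is to build the flow around the task in each repo and re-register it, which bumps the flow's version on the backend:

from prefect import Flow, Parameter

with Flow("coredb-batch-fetch") as flow:
    obj_type = Parameter("obj_type", default="products")
    batch_fetch_coredb(obj_type)

# re-running register() from a repo's ci pipeline creates a new version of the same flow
flow.register(project_name="<prefect-project>")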

October 5, 2020

upon reading documentation, adding a schedule for your flow is as easy as:

import datetime
from prefect import Flow
from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(interval=datetime.timedelta(minutes=2))
flow = Flow('workflow-name', schedule=schedule)

similarly adding a cron schedule would be:

from prefect.schedules import CronSchedule

daily_schedule = CronSchedule("0 8 * * *")
flow = Flow('workflow-name', schedule=daily_schedule)

but if you have required parameters in any of your tasks, you will get an error:

"Flows with required parameters can not be scheduled automatically."
prefect.utilities.exceptions.ClientError: Flows with required parameters can not be scheduled automatically.

this simple schedule corresponds to what we see in the dashboard when we try…
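
the workaround i'd reach for (my assumption, not something spelled out in the note; say_hello is a made-up task) is to give each Parameter a default, so the scheduler always has a value to run with:

from prefect import Flow, Parameter, task

@task
def say_hello(name):
    print(f"hello {name}")

with Flow('workflow-name', schedule=schedule) as flow:
    # a parameter with a default is no longer "required", so scheduling works again
    name = Parameter("name", default="world")
    say_hello(name)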


file_paths = '<bucket-name>/<filepath>/<object-name>1.json,<bucket-name>/<filepath>/<object-name>2.json'
file_paths_list = [f"s3a://{file}" for file in file_paths.split(',')]
df = spark.read.format("json").load(file_paths_list)
# or, to merge differing schemas across the files:
df_merged_schema = spark.read.option("mergeSchema", "true").format('json').load(file_paths_list)

2. display spark sql

display(spark.sql("""<sql statement>"""))

3. empty spark dataframe

from pyspark.sql.types import StructType

schema = StructType([])
df_concat = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df_concat.schema

4. spark — join on multiple conditions

join_conditions = [
    product_variants.productVariantParentId == products.id,
    product_variants.productVariantFileBatchId == products.file_batch_id,
]

joined_products_df = product_variants.join(products, join_conditions, "left").select('products.*', 'product_variants.*')

or

joined_products_df = product_variants.join(
    products,
    (product_variants.productVariantParentId == products.id)
    & (product_variants.productVariantFileBatchId == products.file_batch_id),
    "left",
).select('products.*', 'product_variants.*')…

https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html

export AWS_PROFILE=default-prod

2. you can unset env var:

unset AWS_PROFILE

3. invoke lambda function via cli

aws lambda invoke --function-name <lambda-function-name> --payload '{"key": "value"}'  out

get logs:

sed -i'' -e 's/"//g' out
sleep 15
aws logs get-log-events --log-group-name /aws/lambda/my-function --log-stream-name $(cat out) --limit 5

4. assume role for local invoke of lambda function:

https://docs.aws.amazon.com/code-samples/latest/catalog/python-sts-sts_temporary_credentials-assume_role_mfa.py.html


September 28, 2020

continuing with work i’m doing to complete a proof of concept (poc) using prefect as our dag scheduler

install prefect extras:

pip install "prefect[aws]"

setting config values via environment variables:

export PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4

read config values from code:

import prefect
max_retries = prefect.config.tasks.defaults.max_retries

adding aws credentials:

export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<access_key>", "SECRET_ACCESS_KEY": "<secret_access_key>"}'

if you use assumed roles, you need to run code like this to get temp credentials:

import boto3

sts_client = boto3.client(
    service_name='sts',
    aws_access_key_id='<access-key>',
    aws_secret_access_key='<secret-key>',
)
assumed_role_object = sts_client.assume_role(
    RoleArn='arn:aws:iam::<account-id>:role/<role-name>',
    RoleSessionName='<session-name>'
)
credentials = assumed_role_object['Credentials']
temp_access_key = credentials['AccessKeyId']
temp_secret_key = credentials['SecretAccessKey']…
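
the snippet is truncated before the session token, but the usual next step (my sketch, not part of the original note) is to hand all three temporary values to a boto3 session; presumably these are also the values that feed the AWS_CREDENTIALS secret above:

temp_session_token = credentials['SessionToken']

session = boto3.Session(
    aws_access_key_id=temp_access_key,
    aws_secret_access_key=temp_secret_key,
    aws_session_token=temp_session_token,
)
# any client made from this session acts as the assumed role
lambda_client = session.client('lambda')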

September 10, 2020

working on a dag scheduler proof-of-concept (POC) using prefect

docker run -it prefecthq/prefect:latest

i did not have the image locally, so docker pulled the latest one for me

(note this image will bring you directly into a python interpreter)

run docker images to see the prefect image.

to run backend server:

prefect backend server

getting error:

subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.

noted in docs:

Please note the server requires Docker and Docker Compose to be running.

so let’s run:

pip install docker-compose

then run:

prefect server start

you should see the server start up before navigating to

http://localhost:8080

diary of a codelovingyogi

quick notes, snippets, bugs, and fixes
