weekly misc items: October 5, 2020

diary of a codelovingyogi
1 min readJun 19, 2021
  1. noting here for reference — moto for mocking aws secrets manager
from moto import mock_secretsmanager@pytest.fixture
@mock_secretsmanager
def mock_response():
mock_secretsmanager().start()
sm_client = boto3.client('secretsmanager')
response = sm_client.put_secret_value(
SecretId='<aws-secrets-name>',
SecretString='<secrets-string>'
)

2. import pendulum

3. mocking aws athena

we usually use moto to mock aws services, but athena is currently not supported in the library

code:

import boto3athena = boto3.client("athena")query_exec_param = {
'QueryString': query,
'ResultConfiguration': {
'OutputLocation': output_location,
'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3', 'KmsKey': ''}
}
}
response = athena.start_query_execution(**query_exec_param)

test code:

ATHENA_RESPONSE = {
'QueryExecutionId': '123',
'ResponseMetadata': {
'RequestId': ANY,
'HTTPStatusCode': 200,
'HTTPHeaders': {
'content-type': 'application/x-amz-json-1.1',
'date': ANY,
'x-amzn-requestid': ANY,
'content-length': ANY,
'connection': 'keep-alive'
},
'RetryAttempts': 0
}
}
def setup_mock_athena():
def mock_client(client_name, **kwargs):
class MockAthenaClient:
def __init__(self, region_name):
self.region_name = region_name
def start_query_execution(self, **kwargs):
expected_params = {
'QueryString': 'SELECT * FROM TABLE',
'ResultConfiguration': {
'OutputLocation': 's3://<bucket>/',
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_S3',
'KmsKey': ''
}
}
}
athena = botocore.session.get_session().create_client('athena')
stubber = Stubber(athena)
stubber.add_response('start_query_execution', ATHENA_RESPONSE, expected_params)
stubber.activate()
service_response = athena.start_query_execution(
QueryString='SELECT * FROM TABLE',
ResultConfiguration={
'OutputLocation': 's3://<bucket>/',
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_S3', 'KmsKey': ''
}
}
)
return service_response
if client_name == 'athena':
return MockAthenaClient(region_name="us-east-1")
else:
raise Exception('Unsupported client_name')
return mock_clientdef test_exec_query(monkeypatch):
os.environ['region'] = 'us-east-1'
monkeypatch.setattr(boto3, 'client', setup_mock_athena())
query_id, output_location = exec_query('select now()')
assert isinstance(query_id, str)

4. even if you aren’t using a fixture, and it includes a teardown method you need — you should include in your params or teardown will happen before you run the test method

--

--