delta lake and athena external tables
staging data lake files to aws s3, using delta lake tables to track changes for daily upserts of data, then making them queryable in athena by creating an external table schema. i am running my notebook in databricks.
the process for my current data job is: land json data from the source into an s3 folder, read it into a spark df, convert the df to a delta table and write it in append mode to the stage/silver s3 path, then load from the stage/silver path for any needed processing and merge/upsert into the final data lake/gold s3 location.
sourced_landing_file = f"s3a://{bucket}/{s3_file_path}"
silver_file = f"s3a://{bucket}/stg/{s3_file_path}"
gold_file = f"s3a://{bucket}/lake/{s3_file_path}"
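(the bucket and s3_file_path variables are assumed to be defined earlier in the notebook; hypothetical values might look like this:)
# hypothetical example values -- substitute your own bucket and key layout
bucket = "my-data-bucket"
s3_file_path = "shopify/orders/2021-05-04"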
general steps:
read source into spark df
write delta table to silver file location (append)
read silver file into spark df
write delta table to gold file location (merge)
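steps 2 and 3 as a minimal sketch with the paths defined above (df_input_raw being the dataframe produced by step 1, shown below):
# step 2: append the incoming batch to the silver delta table
df_input_raw.write.format("delta").mode("append").save(silver_file)

# step 3: read the silver delta table back for any needed processing
df_silver = spark.read.format("delta").load(silver_file)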
functions used in spark:
reading my json data into spark df, adding a column to each row with my ETL timestamp in UTC:
import datetime
from delta.tables import *
from pyspark.sql.functions import lit

etl_ts_utc = f'{datetime.datetime.utcnow().isoformat(timespec="microseconds")}Z'
df_input_raw = spark.read.json(sourced_landing_file).withColumn("etl_ts_utc", lit(etl_ts_utc))
check to see the row count of data that was read into df:
df_input_raw.count()
for any processing you may need (for example, i need to split the raw file content into two separate files), create a temp view on your df so you can run sql queries against it:
df_input_raw.createOrReplaceTempView('my_table')
show contents your table:
spark.sql("""SELECT * FROM my_table""").show()
write query for how you need to process file:
my_query = """
select id,name,createdAt, <etc> from my_table
where someId is null
"""
create a new df for subset of data:
part_of_data_df = spark.sql(my_query)
you can verify that data:
part_of_data_df.show()
part_of_data_df.count()
check out the schema of your df:
part_of_data_df.printSchema()
this is how you would create delta table files to start tracking changes:
part_of_data_df.write.format("delta").save(<s3-path-to-save-delta-files>)
if you are getting this error when writing in delta table format:
Py4JJavaError: An error occurred while calling o1006.save. : org.apache.spark.sql.AnalysisException: Incompatible format detected. You are trying to write to `s3a://<s3-bucket>/<s3-path>/` using Databricks Delta, but there is no transaction log present. Check the upstream job to make sure that it is writing using format("delta") and that you are trying to write to the table base path.
it’s because the path already contains files that are not in delta format, so you should either choose a new path or delete the files in that path.
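the final merge/upsert into the gold location isn’t shown as its own snippet above, so here is a minimal sketch using the DeltaTable api, assuming `id` is the unique key to merge on (swap in whatever key fits your data):
from delta.tables import DeltaTable

if DeltaTable.isDeltaTable(spark, gold_file):
    # gold table already exists: upsert the new batch into it
    gold_table = DeltaTable.forPath(spark, gold_file)
    (gold_table.alias("t")
        .merge(part_of_data_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
else:
    # first run: create the gold delta table from the processed df
    part_of_data_df.write.format("delta").save(gold_file)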
once you have your delta tables created in your desired data lake location, you will need to set up the integration between delta lake and athena to make the delta tables queryable in athena. this is made possible through manifest files, which you can create by running the following in databricks:
%sql
GENERATE symlink_format_manifest FOR TABLE delta.`s3a://<data-lake-file-path>`
then create the external table required to read the generated manifest file.
i found more info about the creation of manifest files for delta lake/presto/athena integration here
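as a side note, the same manifest can also be generated from python instead of a %sql cell, via the DeltaTable api (a minimal sketch, assuming your table lives at gold_file):
from delta.tables import DeltaTable

# writes/refreshes the _symlink_format_manifest folder at the table root
DeltaTable.forPath(spark, gold_file).generate("symlink_format_manifest")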
it’s important to note that if you want to enable spark access to your aws services, including aws glue, you need to configure this access in your databricks cluster. in your cluster, go to Advanced Options, then the Spark tab, and add:

spark.databricks.hive.metastore.glueCatalog.enabled true
first, select the database you are creating the external table for:
%sql
use `<database-name>`
then define the schema for your external table (the schema below is for data coming from a shopify graphql order query):
%sql
CREATE EXTERNAL TABLE `<table-name>` (
id string,
name string,
createdAt string,
processedAt string,
updatedAt string,
email string,
subtotalLineItemsQuantity BIGINT,
customer struct<`firstName`:string,id:string,lastName:string,note:string,ordersCount:string>,
cancelReason string,
cancelledAt string,
displayFinancialStatus string,
displayFulfillmentStatus string,
originalTotalPriceSet struct <`shopMoney`:struct<`amount`:string,currencyCode:string>>,
netPaymentSet struct <`shopMoney`:struct<`amount`:string,currencyCode:string>>,
totalTaxSet struct <`shopMoney`:struct<`amount`:string,currencyCode:string>>,
refunds array<struct<`createdAt`:string,id:string,note:string,totalRefundedSet:struct <`shopMoney`:struct<`amount`:string, currencyCode:string>>,updatedAt:string>>,
transactions array<struct<`accountNumber`:string,gateway:string,kind:string,processedAt:string,status:string>>,
tags array<string>,
note string,
customAttributes array<struct<`key`:string,value:string>>,
billingAddress struct<`city`:string,country:string,provinceCode:string,zip:string>,
shippingAddress struct<`city`:string,country:string,provinceCode:string,zip:string>
)
PARTITIONED BY (<partition-field-name> string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://s3-path-to-save-delta-files/_symlink_format_manifest/'
supported athena datatypes are here
i found sample syntax for creating a nested json data schema here
note that any field you are including as a partition key should not also be listed in the column definitions above it, or you will encounter an error:
Error in SQL statement: AnalysisException: Found duplicate column(s) in the table definition of `database_name`.`table_name`: `field_name`;
also, if you are using partitions in spark, make sure to include the partition key in your table schema (the PARTITIONED BY clause), or athena will complain about a missing key when you query (it is the partition key).
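for completeness, this is roughly how a partitioned delta write might look on the spark side, using a hypothetical etl_date partition column derived from the etl timestamp added earlier:
from pyspark.sql.functions import col, substring

# hypothetical partition column: the date portion of the etl timestamp string
(part_of_data_df
    .withColumn("etl_date", substring(col("etl_ts_utc"), 1, 10))
    .write.format("delta")
    .mode("append")
    .partitionBy("etl_date")
    .save(gold_file))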
after you create the external table, run the following to add your data/partitions:
spark.sql(f'MSCK REPAIR TABLE `{database_name}`.`{table_name}`')
you should be able to run queries on your delta tables after this!
for reference, in your data lake folder on s3 you will see the parquet data files along with a `_delta_log/` folder (the delta transaction log) and, after running GENERATE, a `_symlink_format_manifest/` folder.
when you have new delta table files, i.e. daily partitions to upsert/merge, you will need to rerun the GENERATE symlink_format_manifest command to reference the new files, as well as the MSCK REPAIR TABLE command to add the new partitions.
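in practice these two steps can live in one small cell at the end of the daily job, e.g. (assuming database_name and table_name are defined in the notebook):
# refresh the symlink manifest so athena sees the new delta files
spark.sql(f"GENERATE symlink_format_manifest FOR TABLE delta.`{gold_file}`")

# register any new partitions with the glue catalog / athena
spark.sql(f"MSCK REPAIR TABLE `{database_name}`.`{table_name}`")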