spark functions
mergeSchema
spark.read.option("mergeSchema", "true")
handles merging schemas that might change across different data partitions (e.g. newer files/partitions with extra columns)
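a minimal sketch of how i'd use it, assuming the data is parquet under a hypothetical path where newer partitions have extra columns:
# read every partition and merge their schemas into one superset schema
# the path here is a made-up example, not from my actual job
df_merged = spark.read.option("mergeSchema", "true").parquet("/mnt/raw/shopify_orders/")
df_merged.printSchema()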
join
to join on a string column name, the column must exist in both dfs. i’ve tried joining two dfs that should link up but have different column names, and this ends up returning nulls from the right df.
for example, i run:
joined_orders_df = df_orders_detail.join(df_orders, df_orders_detail.__parentId == df_orders.id, "left")
if you get this error during a left join:
org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for LEFT OUTER join between logical plans
this is a known issue in spark where an empty right dataframe will throw this error. it may also be caused by joining two dfs on keys that are stored under different column names…
you can fix by adding:
spark.conf.set("spark.sql.crossJoin.enabled", "true")
doing this may have caused the right df i was joining with to show null data, so the real fix needs to happen at the source of the problem…
apparently if you use an alias for your dfs this will work; i ended up using aliases and adding a new join_id to link on.
this is because i noticed that if you create a df2 with a select query on an original df, and then join that df2 to another df1 (also a select query on the original df), df2’s columns are still referenced under the original table’s name.
for example, i have a table with all data:
df_input_raw.createOrReplaceTempView('shopify_orders')
i created a new df with just a portion of that data:
orders_query = """
select id,name,event_date,batch_id, etl_date,etl_ts_utc,createdAt,processedAt,updatedAt,email,subtotalLineItemsQuantity,customer,cancelReason,cancelledAt,displayFinancialStatus,displayFulfillmentStatus,originalTotalPriceSet,netPaymentSet,totalTaxSet,refunds,transactions,tags,note,customAttributes,billingAddress,shippingAddress from shopify_orders
where __parentId is null
"""df_orders = spark.sql(orders_query)
and joining it to another df:
joined_orders_df = df_orders_detail.join(df_orders, df_orders_detail.__parentId == df_orders.id, "left").select("df_orders.event_date")
the above will generate an error because it can’t find df_orders.event_date.
if i create an alias for each table:
orders = df_orders.alias('orders')
orders_detail = df_orders_detail.alias('orders_detail')
joined_orders_df = orders_detail.join(orders, orders_detail.__parentId == orders.id, "left").select("orders.event_date")
i am still seeing nulls from the right df that i’ve joined to.
therefore i’ve used the combo of alias and adding an additional join id to make it work:
df_orders = df_orders.withColumn("join_id", col("id"))
df_orders_detail = df_orders_detail.withColumn("join_id", col("__parentId"))
here is my new join statement (note the aliases have to be recreated after adding join_id, since the earlier ones don’t have that column):
orders = df_orders.alias('orders')
orders_detail = df_orders_detail.alias('orders_detail')
joined_orders_df = orders_detail.join(orders, "join_id", "left").select('orders_detail.*', 'orders.event_date')
i primarily wanted to use this method since i thought it was cool how you can select specific columns in your new df right after the join, without having to create new temp tables, run a sql select statement, and load the result into a different df.
if you are joining on keys with the same/common column names in both dfs but are including all fields, and you get a duplicate column error after the join, you can rename the duplicate columns:
df_orders = df_orders.withColumnRenamed("id","order_id") \
.withColumnRenamed("batch_id","order_batch_id") \
.withColumnRenamed("etl_date","order_etl_date") \
.withColumnRenamed("etl_ts_utc","order_etl_ts_utc") df_orders.printSchema()
i did not want to do this because i wanted to preserve the original column names from the source.
sql
a simple way to create a new df from an existing df is to run a sql query against a temp view of that df:
refunds_query = """
select id,batch_id,etl_date,etl_ts_utc,refunds
from shopify_orders
"""
df_refunds_raw = spark.sql(refunds_query)
querying fields within a struct:
spark.sql("""SELECT totalRefundedSet.shopMoney.amount FROM refund_test""").show()
size
for filtering out rows with empty arrays
if you want to remove rows where a column holds an empty array, you can use the size function to get the size of the array and then filter to rows where size > 0
from pyspark.sql.functions import col, size
df_refunds_raw = df_refunds_raw.withColumn("refund_count", size(col('refunds')))
df_refunds = df_refunds_raw.filter(col("refund_count") >= 1)
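the same thing also works in one step without the helper column; a small sketch:
from pyspark.sql.functions import col, size
# keep only rows whose refunds array has at least one element
df_refunds = df_refunds_raw.filter(size(col("refunds")) > 0)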
select
to view a single column in a df
df_refunds.select("refunds").show()
show
to show the full contents of a column instead of a truncated view, use truncate=False
df_refunds_line_detail.select('refund_line_items_array').show(truncate=False)
show one record
df_input.show(1, False)
if you want to split the properties of a struct column (e.g. parsed json) into multiple columns, you can use:
df_refunds_line_detail.select('id', 'refund_line_items.*')
selectExpr
variant of select but allows for sql expressions
df_refunds_list = df_refunds_list.selectExpr("refund.createdAt as event_date", "refund.id", "refund.note", "refund.totalRefundedSet", "refund.updatedAt")
printSchema
you can view the schema of the array stored in that single column
df_refunds.select("refunds").printSchema()
explode
split each element of an array into its own row
from pyspark.sql.functions import explode
if you know the schema of the array, you can do the following
df_refunds_list = df_refunds.select(explode("refunds").alias("refund"))
df_refunds_list = df_refunds_list.selectExpr("refund.createdAt", "refund.id", "refund.note", "refund.totalRefundedSet", "refund.updatedAt")
df_refunds_list.show()
if you have a large dataset and the schema may differ, you can dynamically infer the schema and explode the array based on that overall schema:
events_schema = spark.read.json(silver_table_raw.rdd.map(lambda row: row.events)).schema
silver_table = silver_table_raw.withColumn('event_data', explode(from_json(col('events'), ArrayType(events_schema))))
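for reference, the imports that snippet needs, plus a follow-up flatten of the exploded struct; a sketch reusing the names above:
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType
# once event_data is an exploded struct, flatten its fields into top-level columns
silver_table.select("event_data.*").show(truncate=False)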
if you have a json array column that is loaded as a string, be sure it is properly json encoded so that you don’t get a corrupt record error.
for example, i have a refund_line_items column that is an array of records.
when i generate the schema for this column:
line_items_schema = spark.read.json(df_refunds_detail.rdd.map(lambda row: row.refund_line_items)).schema
i see that it comes back as a _corrupt_record.
this message typically appears because spark is expecting one json record when using .json.
to solve this, i first made sure i loaded my string into proper json:
line_items_schema = spark.read.json(df_refunds_detail.rdd.map(lambda row: json.loads(row.refund_line_items))).schema
but i saw that we are getting json decoding issues:
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 3 (char 2)
to further fix this, i made sure that i’m passing a proper json string into the spark dataframe for refund_line_items via json.dumps, and this seems to give me the proper schema i need, which i can then further explode into multiple detail columns.
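a minimal sketch of the difference, with made-up field values just for illustration:
import json
line_items = [{"id": 1, "quantity": 2}]
str(line_items)         # "[{'id': 1, 'quantity': 2}]" -> single quotes, not valid json, triggers the decode error above
json.dumps(line_items)  # '[{"id": 1, "quantity": 2}]' -> proper json that spark.read.json can parse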
make sure to use ArrayType() around your schema like so:
df_refunds_detail = df_refunds_detail.withColumn('refund_line_items_array', from_json('refund_line_items', ArrayType(line_items_schema)))
or it will create the column as a struct and not an array type, giving you an error:
due to data type mismatch: input to function explode should be array or map type, not struct
to explode an array into multiple rows and columns:
df_refunds_list = df_refunds.select("name", "refunds")
df_refunds_list = df_refunds_list.withColumn('refund_struct', explode("refunds"))
df_refunds_list = df_refunds_list.selectExpr("name", "refund_struct.createdAt", "refund_struct.id", "refund_struct.note", "refund_struct.totalRefundedSet", "refund_struct.updatedAt")
df_refunds_list.show()
if you want to include additional field(s) and explode another column, you can use:
df_refunds_list = df_refunds.select("name", "refunds")
df_refunds_list = df_refunds_list.withColumn('refund_struct', explode("refunds"))
then select your columns:
df_refunds_list = df_refunds_list.selectExpr("name", "createdAt", "id", "note", "totalRefundedSet", "updatedAt")
df_refunds_list.show()
take
look at first 5 rows:
df_refunds_list.take(5)
dropDuplicates
dropDuplicates(['etl_date', 'batch_id', 'id'])
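a quick sketch of calling it on one of the dfs above; it keeps one row per combination of the listed keys:
df_orders = df_orders.dropDuplicates(['etl_date', 'batch_id', 'id'])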
createOrReplaceTempView
silver_refunds_detail_raw.createOrReplaceTempView('refunds_detail_raw')
isNotNull
df_refunds_detail.where(col('refund_line_items').isNotNull())
orderBy
from pyspark.sql.functions import asc, desc
silver_refunds_detail_delta.orderBy("id", desc("updatedAt"))
note: chaining .orderBy("id").orderBy(desc("updatedAt")) would only keep the last sort, so pass both columns in one call.
filter
df_name.filter(df_name.field_name == "field_value") \
.show(truncate=False)
df_orders.filter(col("name") == "name")