weekly misc items: September 21, 2020

diary of a codelovingyogi
Jun 13, 2021
1. reading multiple s3 files into pyspark

file_paths = '<bucket-name>/<filepath>/<object-name>1.json,<bucket-name>/<filepath>/<object-name>2.json'
file_paths_list = [f"s3a://{file}" for file in file_paths.split(',')]
df = spark.read.format("json").load(file_paths_list)
# or
df_merged_schema = spark.read.option("mergeSchema", "true").format('json').load(file_paths_list)
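side note (my own minimal sketch, not from the original snippet): spark's path loading also accepts glob patterns, so if you want every json file under the prefix you can skip building the list entirely:

df_all = spark.read.format("json").load("s3a://<bucket-name>/<filepath>/*.json")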

2. display spark sql

display(spark.sql("""<sql statement>"""))
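display() is a databricks notebook helper; outside databricks, a rough equivalent (assuming you just want to eyeball the rows) is:

spark.sql("""<sql statement>""").show(truncate=False)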

3. empty spark dataframe

from pyspark.sql.types import StructType

schema = StructType([])
df_concat = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df_concat.schema
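a sketch of the same thing without touching the RDD API (assuming a SparkSession named spark):

from pyspark.sql.types import StructType

schema = StructType([])
df_concat = spark.createDataFrame([], schema)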

4. spark — join on multiple conditions

join_conditions = [
    product_variants.productVariantParentId == products.id,
    product_variants.productVariantFileBatchId == products.file_batch_id
]

joined_products_df = product_variants.join(products, join_conditions, "left").select('products.*', 'product_variants.*')

or

joined_products_df = product_variants.join(
    products,
    (product_variants.productVariantParentId == products.id)
    & (product_variants.productVariantFileBatchId == products.file_batch_id),
    "left"
).select('products.*', 'product_variants.*')

note: unlike a join on a shared column name as shown below, the join methods above keep the id columns you’re joining on from both tables:

joined_products_df = product_variants.join(products, "id", "left").select('products.*', 'product_variants.*')

i had to rename the ids in the product_variants table to avoid duplicate column errors when writing delta files.
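a minimal sketch of that rename (the new column name productVariantId is my own placeholder, not necessarily what was used):

product_variants = product_variants.withColumnRenamed("id", "productVariantId")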

5. window function on spark dataframe:

from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

silver_products_delta = silver_products_delta.withColumn("row_num", row_number() \
    .over(Window.partitionBy('productVariantId') \
    .orderBy(col('productVariantUpdatedAt').desc())))

i had to partition at the most granular level (product variant) and not by product, since our table includes product variant detail.

to get the final result set, you can select where row_num == 1:

silver_products_delta = silver_products_delta.filter(col("row_num") == 1).drop("row_num")
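putting it together as a runnable sketch (the sample rows and values here are made up purely for illustration):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import row_number, col

spark = SparkSession.builder.getOrCreate()

data = [
    ("v1", "2020-09-01"),  # older version of variant v1
    ("v1", "2020-09-20"),  # latest version of variant v1
    ("v2", "2020-09-10"),
]
df = spark.createDataFrame(data, ["productVariantId", "productVariantUpdatedAt"])

w = Window.partitionBy("productVariantId").orderBy(col("productVariantUpdatedAt").desc())
latest = df.withColumn("row_num", row_number().over(w)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")
latest.show()  # one row per productVariantId, keeping the most recently updated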
