weekly misc items: September 21, 2020
1. reading multiple s3 files into pyspark
file_paths = '<bucket-name>/<filepath>/<object-name>1.json,<bucket-name>/<filepath>/<object-name>2.json'
file_paths_list = [f"s3a://{file}" for file in file_paths.split(',')]
df = spark.read.format("json").load(file_paths_list)
# or, to reconcile differing schemas across the files:
df_merged_schema = spark.read.option("mergeSchema", "true").format('json').load(file_paths_list)
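as a quick sanity check, here's roughly what the constructed list looks like with made-up bucket and object names (placeholders only, not the real paths):
file_paths = 'my-bucket/raw/products1.json,my-bucket/raw/products2.json'  # hypothetical names
file_paths_list = [f"s3a://{file}" for file in file_paths.split(',')]
print(file_paths_list)
# ['s3a://my-bucket/raw/products1.json', 's3a://my-bucket/raw/products2.json']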
2. display spark sql
display(spark.sql("""<sql statement>"""))
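a concrete (made-up) example, assuming a table named events is registered; note that display() is a Databricks notebook helper, so outside Databricks df.show() is the closest stand-in:
display(spark.sql("""
    SELECT event_type, COUNT(*) AS event_count
    FROM events
    GROUP BY event_type
"""))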
3. empty spark dataframe
from pyspark.sql.types import StructType

schema = StructType([])  # empty schema, no columns
df_concat = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df_concat.schema
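on Spark 2.0+ you can build the same empty frame straight from the SparkSession, without the sqlContext/sc handles (a minimal sketch, same result):
from pyspark.sql.types import StructType

df_concat = spark.createDataFrame([], StructType([]))
df_concat.printSchema()  # prints just "root" since there are no columns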
4. spark — join on multiple conditions
join_conditions = [
    product_variants.productVariantParentId == products.id,
    product_variants.productVariantFileBatchId == products.file_batch_id,
]
joined_products_df = product_variants.join(products, join_conditions, "left") \
    .select('products.*', 'product_variants.*')
or
joined_products_df = product_variants.join(
    products,
    (product_variants.productVariantParentId == products.id) &
    (product_variants.productVariantFileBatchId == products.file_batch_id),
    "left"
).select('products.*', 'product_variants.*')
note: unlike joining on a shared column name as below, the join methods above keep the join keys from both tables:
joined_products_df = product_variants.join(products, "id", "left").select('products.*', 'product_variants.*')
i had to rename the ids in the product_variants table to avoid duplicate column errors when writing delta files.
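a minimal sketch of that rename, assuming the clashing column is id (withColumnRenamed is the standard way to do it):
# hypothetical new column name; pick whatever avoids the clash
product_variants = product_variants.withColumnRenamed("id", "product_variant_id")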
5. window function on spark dataframe:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

silver_products_delta = silver_products_delta.withColumn("row_num", row_number() \
    .over(Window.partitionBy('productVariantId') \
    .orderBy(col('productVariantUpdatedAt').desc())))
had to partition at the most granular level (product variant) rather than by product, since our table includes product-variant detail.
to get the final result set, select the rows where row_num == 1:
silver_products_delta = silver_products_delta.filter(col("row_num") == 1).drop("row_num")
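a self-contained toy run of the same keep-the-latest-row pattern, with made-up variant ids and timestamps:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# hypothetical data: two updates for variant v1, one for v2
rows = [("v1", "2020-09-20"), ("v1", "2020-09-21"), ("v2", "2020-09-19")]
df = spark.createDataFrame(rows, ["productVariantId", "productVariantUpdatedAt"])

w = Window.partitionBy("productVariantId").orderBy(col("productVariantUpdatedAt").desc())
latest = df.withColumn("row_num", row_number().over(w)) \
    .filter(col("row_num") == 1).drop("row_num")
latest.show()  # one row per variant: the most recent update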