Running into delta.exceptions.ConcurrentAppendException even after setting up S3 Multi-Cluster Writes environment via S3 Dynamo DB LogStore

Kiran Reddy

May 31, 2023, 9:11:19 PM
to Delta Lake Users and Developers

My use case is to process a dataset of hundreds of partitions concurrently. The data is partitioned, and the partitions are disjoint. I was facing ConcurrentAppendException due to S3 not supporting the "put-if-absent" consistency guarantee. Since Delta Lake 1.2, with the help of the S3DynamoDBLogStore API, all writers across multiple clusters and/or Spark drivers can write concurrently to Delta Lake on S3 while ensuring that only one writer succeeds with each transaction. My Delta Lake version is 2.1. I created a DynamoDB table with auto-scaling enabled for the number of reads/writes and passed the configuration to the Delta job. Please find the configuration below (some Spark-related config omitted).

    spark = SparkSession \
        .builder \
        .appName("Delta Operations") \
        .config("spark.driver.memory", args["spark_driver_memory"]) \
        .config("spark.executor.memory", args["spark_executor_memory"]) \
        .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", args["log_table_name"]) \
        .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", args["log_region"]) \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')
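For reference, Delta Lake's storage configuration documentation describes registering the LogStore implementation for the path scheme in addition to the DynamoDB table settings shown above. A minimal sketch of collecting those settings in one place, assuming the table lives under an s3a:// path; the helper names and the `delta_log` / `us-west-2` values are hypothetical, and the `spark.delta.logStore.<scheme>.impl` key should be checked against the docs for the Delta version in use:

```python
def s3_dynamodb_logstore_conf(table_name, region, scheme="s3a"):
    """Return Spark conf entries for the S3 multi-cluster LogStore.

    Assumption: the LogStore is registered per path scheme via
    spark.delta.logStore.<scheme>.impl; verify the key for your Delta version.
    """
    prefix = "spark.io.delta.storage.S3DynamoDBLogStore.ddb"
    return {
        f"spark.delta.logStore.{scheme}.impl": "io.delta.storage.S3DynamoDBLogStore",
        f"{prefix}.tableName": table_name,
        f"{prefix}.region": region,
    }


def apply_conf(builder, conf):
    """Apply each key/value pair to a SparkSession builder and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Hypothetical table name and region, for illustration only.
conf = s3_dynamodb_logstore_conf("delta_log", "us-west-2")
for key, value in sorted(conf.items()):
    print(key, "=", value)
```

With a real session this would be used as `apply_conf(SparkSession.builder.appName("Delta Operations"), conf).getOrCreate()`.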

Please find the actual logic below:

    delta_table.alias("old").merge(
            input_df.alias("new"),
            f"old.{primary_key} = new.{primary_key}") \
        .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(deletes_df)) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

delta_table is the destination table in delta lake.

input_df is a combined DataFrame of all the inserts and updates.

deletes_df is the DataFrame that contains only the deletes.

I am still running into delta.exceptions.ConcurrentAppendException despite these settings. Am I doing something wrong?
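One thing that may be relevant here: Delta Lake's concurrency control documentation describes ConcurrentAppendException as a conflict raised by Delta's own optimistic concurrency checks, and suggests making the partition separation explicit in the merge condition when concurrent jobs work on disjoint partitions. The merge above matches on the primary key only. A minimal sketch of building a partition-scoped condition, assuming each concurrent job handles exactly one partition; the helper names, the `event_date` partition column, and the sample values are hypothetical:

```python
def sql_literal(value):
    """Render a Python value as a SQL literal; strings are single-quoted."""
    if isinstance(value, str):
        # Keep the sketch simple: refuse values that would need escaping.
        if "'" in value or "\\" in value:
            raise ValueError("value needs escaping; not handled in this sketch")
        return "'" + value + "'"
    return str(value)


def partition_scoped_condition(primary_key, partition_col, partition_value,
                               target="old", source="new"):
    """Build a merge condition that pins the target side to one partition."""
    return (
        f"{target}.{primary_key} = {source}.{primary_key} "
        f"AND {target}.{partition_col} = {sql_literal(partition_value)}"
    )


# Hypothetical key, partition column and value, for illustration only.
condition = partition_scoped_condition("id", "event_date", "2023-05-31")
print(condition)
```

The resulting string would be passed as the second argument to `delta_table.alias("old").merge(input_df.alias("new"), condition)`. Whether this removes the conflict depends on how the table is actually partitioned and on what the other concurrent jobs read and write.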


rameshkumar subramanian

Apr 26, 2024, 3:58:37 PM
to Kiran Reddy, Delta Lake Users and Developers
Hi Kiran Reddy,

  Has the issue been resolved? If yes, could you please share how you managed to handle it?

Thanks,
Rameshkumar S
