Hi Amar,
NOTE: I'm on 3.2.0, not 3.2.1, at the moment, but I can switch to 3.2.1 if needed. My Delta Lake and Scala versions are the same as yours.
The code I'm using to recreate this issue is at the bottom for reference.
I tried to recreate your issue, but I'm actually running into a different error. Here is what I get:
"org.apache.spark.sql.AnalysisException: Attempting to change metadata when 'dataChange' option is set to false during Create a Delta table"
This indicates that Spark is trying to create a new Delta table instead of updating the existing one, and updating the existing table is what you want. I noticed your code uses hdfsLocationInner (the partition directory) instead of hdfsLocation (the table root) when updating the table. replaceWhere operates on the Delta table as a whole, so you should write to the table root and let the predicate select the partition. Here are two articles on using replaceWhere to change values in a Delta Lake table. Both access the table as a whole, and the second is specific to partitions, which I think is what you are doing.
Could you try the following statement and see if you get the expected result? It solved the issue for me in the simple case I built. I believe you can also remove the dataChange option if you like.
df.repartition(numberOfPartitions).write.option("dataChange", "false").format("delta").mode("overwrite").option("replaceWhere", partition).save(hdfsLocation)
If this doesn't solve your issue, please let me know. Thanks!
John
The code I'm using to recreate the issue:
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()
val theLocation = "/my/delta/path"
val theLocationInner = "/my/delta/path/language=Scala"
dfFromRDD1.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("language").save(theLocation)
val partition = "language = 'Scala'"
val numberOfPartitions = 1
val df = spark.read.format("delta").load(theLocation).where(partition)
// Writing to the partition directory (theLocationInner) is what triggers the error:
df.repartition(numberOfPartitions).write.option("dataChange", "false").format("delta").mode("overwrite").option("replaceWhere", partition).save(theLocationInner)
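For completeness, here is the corrected version of that last write, expressed with the repro's own variable names. It is a sketch that assumes the same Spark session and variables defined above; the only change is saving to the table root (theLocation) instead of the partition directory, with replaceWhere selecting the partition to overwrite:

```scala
// Corrected final write: target the table root and let the
// replaceWhere predicate pick out the partition to replace.
df.repartition(numberOfPartitions)
  .write
  .option("dataChange", "false")
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", partition)
  .save(theLocation)
```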