Overwriting of Delta Lake failing with Spark 3.2.1, Delta Lake 1.1.0 and Scala 2.13

amars...@gmail.com

Mar 1, 2022, 9:27:45 PM
to Delta Lake Users and Developers
Hi Team,

Can you please take a look at the following issue?
  • I am getting the following error with Spark 3.2.1 and Delta Lake 1.1.0 with Scala 2.13.
  • It was working with Spark 3.0.1 and Delta Lake 0.7.0 with Scala 2.12.
  • Code:
Dataset<Row> df = spark.read().format("delta").load(hdfsLocation).where(partition);
df.repartition(numberOfPartitions).write().option("dataChange", "false").format("delta").mode("overwrite")
.option("replaceWhere", partition).save(hdfsLocationInner);
  • Exception:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:63)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassCastException: class org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable cannot be cast to class org.apache.spark.sql.delta.commands.DeleteCommand (org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable and org.apache.spark.sql.delta.commands.DeleteCommand are in unnamed module of loader 'app')
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.removeFiles(WriteIntoDelta.scala:232)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:178)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:80)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:78)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:198)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:78)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:154)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)

Thanks,
Amar

amars...@gmail.com

Mar 1, 2022, 9:32:59 PM
to Delta Lake Users and Developers
To clarify, I am seeing the above error only with a partitioned table. Tables without partitions are working fine with Delta Lake 1.1.0.

John O'Dwyer

Mar 3, 2022, 6:26:27 PM
to Delta Lake Users and Developers
Hi Amar,

NOTE: I'm on Spark 3.2.0 and not 3.2.1 at the moment, but I can switch to 3.2.1 if needed. My Delta Lake and Scala versions are the same as yours.
The code I'm using to recreate this issue is at the bottom for reference.

I tried to recreate your issue, but I'm actually running into a different error. Here is the error I'm getting:

"org.apache.spark.sql.AnalysisException: Attempting to change metadata when 'dataChange' option is set to false during Create a Delta table"

This indicates that it is trying to create a new Delta table instead of updating the existing one, and updating the existing table is what you want here. I noticed that in your code you are using hdfsLocationInner instead of hdfsLocation when writing back. You should write to the Delta Lake table as a whole (hdfsLocation in your code, theLocation in mine) and let replaceWhere scope the overwrite. Here are two articles on the subject of using replaceWhere to change values in a Delta Lake table. Both access the table as a whole, and the second is specific to partitions, which I think is what you are doing.


Could you try the following statement and see if you get the expected result? It solved the issue for me in the simple case I built. I believe you can also remove the dataChange option if you like.

df.repartition(numberOfPartitions).write.option("dataChange", "false").format("delta").mode("overwrite").option("replaceWhere", partition).save(hdfsLocation)

If this doesn't solve your issue, please let me know. Thanks!

John

The code I'm using to recreate the issue:

import spark.implicits._ // needed for rdd.toDF outside of spark-shell

val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()

val theLocation = "/my/delta/path"
val theLocationInner = "/my/delta/path/language=Scala"

dfFromRDD1.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("language").save(theLocation)

val partition = "language = 'Scala'"
val numberOfPartitions = 1

val df = spark.read.format("delta").load(theLocation).where(partition)
df.repartition(numberOfPartitions).write.option("dataChange", "false").format("delta").mode("overwrite").option("replaceWhere", partition).save(theLocationInner)

Shixiong(Ryan) Zhu

Mar 3, 2022, 8:13:08 PM
to amars...@gmail.com, Delta Lake Users and Developers
A ClassCastException like this is usually caused by incompatible libraries. Could you run the following two lines and provide the output?

println(classOf[org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable].getResource("DeleteFromTable.class"))
println(classOf[org.apache.spark.sql.delta.commands.DeleteCommand].getResource("DeleteCommand.class"))

They will show which Spark version and Delta version are used in your environment.

Best Regards,

Ryan


Michael Nacey

May 19, 2022, 2:14:30 PM
to Delta Lake Users and Developers
I get the same with Spark 3.2.0 (EMR 6.6.0):

jar:file:/Users/mike-nacey/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-catalyst_2.12/3.2.0/spark-catalyst_2.12-3.2.0.jar!/org/apache/spark/sql/catalyst/plans/logical/DeleteFromTable.class
jar:file:/Users/mike-nacey/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/io/delta/delta-core_2.12/1.2.0/delta-core_2.12-1.2.0.jar!/org/apache/spark/sql/delta/commands/DeleteCommand.class


org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable cannot be cast to org.apache.spark.sql.delta.commands.DeleteCommand
java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable cannot be cast to org.apache.spark.sql.delta.commands.DeleteCommand

The code that triggers it is:

// Overwrite the table with specified number of files per partition
newData.repartition(numFilesPerPartition)
.write
.format("delta")
.mode("overwrite")
.option("mergeSchema", value = true)
.option("dataChange", "false")
.option("replaceWhere", partitions)
.save(tableFullPath)

Shixiong(Ryan) Zhu

May 19, 2022, 2:31:01 PM
to Michael Nacey, Delta Lake Users and Developers
Did you set the following configs when starting Spark? (See https://docs.delta.io/latest/quick-start.html#set-up-apache-spark-with-delta-lake.)

--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
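
For reference, a minimal sketch of setting the same two configs programmatically when constructing the SparkSession (the app name here is just a placeholder, not something from this thread):

import org.apache.spark.sql.SparkSession

// Register Delta's SQL extension and catalog on the session itself.
// Without them, Delta's analyzer rules never rewrite plan nodes such as
// DeleteFromTable into Delta commands, which appears to surface as the
// ClassCastException in WriteIntoDelta.removeFiles above.
val spark = SparkSession.builder()
  .appName("delta-replace-where-example") // placeholder
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()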

Best Regards,

Ryan


Michael Nacey

May 19, 2022, 2:55:20 PM
to Delta Lake Users and Developers
Just tested now and adding:

.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

solved this problem. Thanks!
