Hello, I have a problem using the Delta Lake format on Azure Synapse Analytics when doing an upsert. I hope you can help me.
Here are the steps:
First I write 4,000,000 records to a new folder on Data Lake Gen2 using
df.write.partitionBy("year").mode("overwrite").format("delta").save(fileName)
and it takes about 45 minutes.
Then, in another session, I try to write the same 4,000,000 records as an upsert, using the example code:
deltaTable.alias("dimension") \
.merge(df_after_etl.alias("updates"), updateCondition) \
.whenMatchedUpdate(set=updateMap) \
.whenNotMatchedInsertAll() \
.execute()
The merge ran for more than 40 hours and never finished (and never threw an error).
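The values of updateCondition and updateMap are not shown above. As a minimal sketch only, assuming hypothetical key columns `id` and `year` and hypothetical data columns `name` and `amount` (none of these names come from my real table), a string condition and a column map of this kind could be built as follows. The helper names `build_merge_condition` and `build_update_map` are made up for illustration.

```python
def build_merge_condition(key_columns, target="dimension", source="updates"):
    """Build a SQL-style merge condition string that matches rows on every key column."""
    return " AND ".join(f"{target}.{c} = {source}.{c}" for c in key_columns)


def build_update_map(columns, key_columns, source="updates"):
    """Map every non-key column to the matching column of the source alias."""
    return {c: f"{source}.{c}" for c in columns if c not in key_columns}


# Hypothetical column names, for illustration only.
key_columns = ["id", "year"]
all_columns = ["id", "year", "name", "amount"]

updateCondition = build_merge_condition(key_columns)
updateMap = build_update_map(all_columns, key_columns)

print(updateCondition)
print(updateMap)
```

The aliases `dimension` and `updates` match the ones used in the merge call above. Whether the partition column `year` belongs in the condition depends on the data, so treat it as an assumption of this sketch.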
I attach images showing the situation and the DAG:
This is the big picture:
The job has 24 stages (as you can see, it shows 100% but never ends):
Overview of the 24 stages:
The problem is in task 99.
Its DAG is:
As you can see, they are all related to the line
SynapseLoggingShim.scala:86
When I open the details:
You can see that this is related to a Microsoft telemetry package:
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:86)
com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:72)
org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:201)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:107)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:92)
org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:201)
org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:253)
io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:223)
org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:103)
org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:89)
io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:120)
io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:204)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
Do you know if there is something I can do to avoid this and be able to use the upsert/merge pattern?
I know that Microsoft's code is a private package and we cannot debug it, but maybe we could avoid that telemetry step or "inject" some kind of timeout.
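To make the timeout idea concrete, here is a minimal driver-side sketch in plain Python. The helper `run_with_timeout` is hypothetical (it is not part of Spark, Delta Lake, or Synapse): it runs a callable in a background thread, waits a bounded time, and calls an optional cleanup hook if the deadline passes. I have not verified that cancelling would actually interrupt a merge stuck at this step, so that part is an assumption.

```python
import threading


def run_with_timeout(fn, timeout_s, on_timeout=None):
    """Run fn() in a daemon thread and wait at most timeout_s seconds.

    Returns fn's result, re-raises fn's exception, or raises TimeoutError
    (after calling on_timeout, if given) when the deadline passes.
    """
    outcome = {}

    def target():
        try:
            outcome["value"] = fn()
        except BaseException as exc:  # hand any failure back to the caller
            outcome["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout_s)

    if worker.is_alive():
        # The work is still running; give the caller a chance to cancel it.
        if on_timeout is not None:
            on_timeout()
        raise TimeoutError(f"operation did not finish within {timeout_s} s")
    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("value")


# Hypothetical usage with the merge above (assumes `spark`, `deltaTable`, etc. exist):
#
# def do_merge():
#     spark.sparkContext.setJobGroup("upsert", "Delta merge", interruptOnCancel=True)
#     deltaTable.alias("dimension") \
#         .merge(df_after_etl.alias("updates"), updateCondition) \
#         .whenMatchedUpdate(set=updateMap) \
#         .whenNotMatchedInsertAll() \
#         .execute()
#
# run_with_timeout(do_merge, 2 * 3600,
#                  on_timeout=lambda: spark.sparkContext.cancelJobGroup("upsert"))
```

Note that this only stops the driver from waiting. The background thread keeps running unless the `on_timeout` hook really cancels the underlying Spark jobs.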
Thanks in advance!