UPSERT never ending, related to logging/telemetry issue on Datalake Gen2


Juan Ignacio Bagnato

Feb 14, 2022, 5:43:23 PM
to Delta Lake Users and Developers
Hello, I have a problem using the Delta Lake format on Azure Synapse Analytics when doing an upsert; I hope you can help me.

These are the steps:

First I write 4,000,000 records to a new folder on Data Lake Gen2 using
df.write.partitionBy("year").mode("overwrite").format("delta").save(fileName)
which takes about 45 minutes.

Then, in another session, I try to write the same 4,000,000 records as an UPSERT using the example code:
deltaTable.alias("dimension") \
    .merge(df_after_etl.alias("updates"), updateCondition) \
    .whenMatchedUpdate(set = updateMap) \
    .whenNotMatchedInsertAll() \
    .execute()
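For completeness, this is roughly how the updateCondition is built. The helper and the column names (`id` as the key, `year` as the partition column) are illustrative, not my real schema; I understand that including the partition column in the match condition should let Delta prune partitions instead of scanning the whole target table:

```python
# Sketch: build a MERGE condition that includes the partition column so
# Delta can prune partitions rather than touch the entire target table.
# The helper and the column names (`id`, `year`) are illustrative only.
def build_merge_condition(key_cols, partition_col, partition_values):
    keys = " AND ".join(f"dimension.{c} = updates.{c}" for c in key_cols)
    values = ", ".join(str(v) for v in partition_values)
    return f"{keys} AND dimension.{partition_col} IN ({values})"

updateCondition = build_merge_condition(["id"], "year", [2020, 2021, 2022])
# e.g. "dimension.id = updates.id AND dimension.year IN (2020, 2021, 2022)"
```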

It ran for more than 40 hours and never finished (and threw no error).

Here I attach images with the situation and the DAG:
This is the big picture:
issue01.JPG
The job has 24 stages (as you can see, it shows 100% but never ends):
issue03.JPG
The 24 stages overview:
issue04.JPG
The problem is in task 99:
issue05.JPG
And its DAG is:
issue06.JPG
As you can see, they are all related to the line SynapseLoggingShim.scala:86.
When I open the detail:
issue07.JPG
So you can see that this is related to a Microsoft telemetry package:
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:86)
com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:72)
org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:201)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:107)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:92)
org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:201)
org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:253)
io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:223)
org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:103)
org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:89)
io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:120)
io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:204)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)


Do you know if there is anything I can do to avoid this and be able to use the upsert/merge pattern?
I know that Microsoft's code is a private package and we cannot debug it, but maybe we could skip that telemetry step, or "inject" some kind of timeout.


Thanks in advance!
