Multiple executions of flatMapGroupsWithState when DeltaTable.merge


Jacek Laskowski

Sep 29, 2020, 8:49:36 AM
to Delta Lake Users and Developers
Hi,

I've been trying to pinpoint the root cause of multiple executions of flatMapGroupsWithState's stateUpdateFunc when used with DeltaTable.merge for a few weeks already, and I still have no clue why it works this way. Has anyone noticed or heard of a similar case before (and perhaps fixed it)?

I use the latest and greatest: Spark 3.0.1 and Delta Lake 0.7.0 (but I've confirmed the behaviour also occurs with Spark 2.4.7 and Delta Lake 0.6.1).

I initially thought it only happened with streaming queries, but it turns out it shows up with batch queries too.

A small reproducible sample application is available at https://github.com/jaceklaskowski/yurii-double-metrics. Clone it, import it into IDEA, and run it. You should see the following logs:

>>> Using Spark 3.0.1
Create an empty Delta table at /tmp/yurii-delta-double-metrics
>>> >>> stateUpdateFunc executed -> key=1, values: WrappedArray(1, 2)
>>> >>> >>> 3. no earlier state
>>> >>> stateUpdateFunc executed -> key=1, values: WrappedArray(1, 2)
>>> >>> >>> 3. no earlier state

Why is "3. no earlier state" printed twice for key 1? How can I narrow it down? Is this a known issue in Delta? Please help.
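
For reference, here's a minimal sketch of the pattern that triggers it (not the exact sample app: Event, the inline data, the table path and the merge condition are illustrative, spark is an in-scope SparkSession, and the Delta table is assumed to already exist):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: Int, value: Int)

// Prints mimic the sample app's logs; state never exists in a batch query.
def stateUpdateFunc(
    key: Int,
    values: Iterator[Event],
    state: GroupState[Long]): Iterator[(Int, Int)] = {
  val vs = values.toSeq
  println(s">>> >>> stateUpdateFunc executed -> key=$key, values: ${vs.map(_.value)}")
  if (!state.exists) println(">>> >>> >>> 3. no earlier state")
  vs.map(e => (e.key, e.value)).iterator
}

import spark.implicits._
val updates = Seq(Event(1, 1), Event(1, 2)).toDS()
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(stateUpdateFunc)
  .toDF("key", "value")

// The merge source is the (uncached) flatMapGroupsWithState output.
// stateUpdateFunc's println fires twice per key when this merge runs.
DeltaTable.forPath(spark, "/tmp/yurii-delta-double-metrics").as("t")
  .merge(updates.as("s"), expr("t.key = s.key"))
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()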

Tathagata Das

Sep 29, 2020, 12:38:20 PM
to Jacek Laskowski, Delta Lake Users and Developers
Merge does two passes on the source data, so the source data computation gets re-executed. One way to avoid this is to cache the source data and then uncache it after the merge.
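
Something like this for a batch query (a sketch; computeUpdates stands for however you build the merge source, and the path/condition match the sample app):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.expr

val updates = computeUpdates().cache()  // hypothetical helper producing the source Dataset
updates.count()                         // materialize the cache so both merge passes reuse it

DeltaTable.forPath(spark, "/tmp/yurii-delta-double-metrics").as("t")
  .merge(updates.as("s"), expr("t.key = s.key"))
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

updates.unpersist()  // uncache once the merge is done

In a streaming query, the same idea applies inside foreachBatch: cache each micro-batch DataFrame before the merge and unpersist it afterwards.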

TD


Jacek Laskowski

Sep 29, 2020, 2:10:40 PM
to Tathagata Das, Delta Lake Users and Developers
Hi,

Thanks, TD, for such a prompt response!

> Merge does two passes on the source data. 

Shouldn't merge do that itself, rather than an end user / dev, who shouldn't have to know what happens under the covers of the merge operation? Since merge knows it's going to make two passes over a Dataset, the Spark "idiom" would be to cache it to avoid recomputation.

Why doesn't merge do this?

Denny Lee

Sep 29, 2020, 3:10:42 PM
to Jacek Laskowski, Tathagata Das, Delta Lake Users and Developers
I think the issue with merge doing this automatically is that if the size of your table is larger than available memory, you would take a performance hit.
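
If the source is too big to cache in memory, persisting it to disk could be a middle ground (a sketch; computeUpdates is a hypothetical stand-in for however the source Dataset is built):

import org.apache.spark.storage.StorageLevel

// Spill to local disk instead of recomputing on merge's second pass.
val source = computeUpdates().persist(StorageLevel.DISK_ONLY)
// ... run the merge with `source` as the merge source ...
source.unpersist()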

BTW, as timing would have it, just published today is Diving Into Delta Lake: DML Internals (Update, Delete, Merge), which may provide some additional context on what is happening under the covers of merge.

HTH!  

Jacek Laskowski

Sep 29, 2020, 3:51:42 PM
to Denny Lee, Tathagata Das, Delta Lake Users and Developers
Hey Denny,

Glad to e-meet you here! :)


> if the size of your table is larger than available memory, you would take a performance hit.

Caching or not, merge would have a hard time anyway, no? I consider this need for caching an implementation "leak" that devs shouldn't have to deal with. The less burden on devs' shoulders, the better. There could also be an option to turn it on and off. In the current 0.7.0 days there's no option but to know about it or deal with issues like doubled metrics due to these two passes (which I've yet to describe in another post, once I know more).

> as timing would have it, just published today

Can't wait to read it!

I've published a small "blurb" on the internals of the merge operation too! [1]
