Delta Lake + offline compaction and Structured Streaming consumption order

Yuri Oleynikov

Jan 31, 2021, 10:28:32 AM
to Delta Lake Users and Developers
Hi all,
I need some clarification regarding the order in which Spark Structured Streaming consumes events from Delta Lake.
Consider the following scenario:
  1. Generate events to Delta Lake for some time
    1. Save events to HDFS, partitioned by event time (YYYY-MM-DD-hh-mm)
  2. Stop generating events
  3. Compact the oldest events partition: coalesce -> write with replaceWhere set to that partition, dataChange set to false, and write mode overwrite (see the sketch after this list)
  4. Start consuming the generated events with Structured Streaming, with ignoreChanges and ignoreDeletes set to true
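A minimal sketch of steps 3 and 4 (the table path and the partition column name `ts_part` are hypothetical, not taken from the gist linked below):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val path = "hdfs:///data/events"          // hypothetical table location
val part = "ts_part = '2021-1-31-15-6'"   // oldest event-time partition

// Step 3: compact the partition in place without changing its data.
spark.read.format("delta").load(path)
  .where(part)
  .coalesce(1)                            // merge into fewer, larger files
  .write.format("delta")
  .mode("overwrite")
  .option("replaceWhere", part)           // overwrite only this partition
  .option("dataChange", "false")          // mark the rewrite as not changing data
  .save(path)

// Step 4: consume the table as a stream, tolerating the rewrite.
val events = spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .option("ignoreDeletes", "true")
  .load(path)
```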
What I'm expecting is that the events of the compacted partition should be consumed in the first micro-batch of step 4 (since dataChange is set to false).

However, the compacted events are consumed last.
That is, if the generated events are partitioned as follows:
  • 2021-1-31-15-6
  • 2021-1-31-15-9
  • 2021-1-31-15-7
  • 2021-1-31-15-8
and the partition 2021-1-31-15-6 is compacted, events should still be consumed in chronological order regardless of compaction:
  1. 2021-1-31-15-6  <--- compacted partition
  2. 2021-1-31-15-9
  3. 2021-1-31-15-7
  4. 2021-1-31-15-8
However, the actual consumed order by Spark Structured Streaming is
  • 2021-1-31-15-7
  • 2021-1-31-15-8
  • 2021-1-31-15-9
  • 2021-1-31-15-6 <--- compacted partition


Delta: delta-core_2.12:0.7.0
Spark: 3.0.1 (Scala 2.12)

Sample code that reproduces the issue: https://gist.github.com/yurkao/858fe4d8b36c3f31328beebdb96215d4

Thanks and best regards,
Yurii

Yuri Oleynikov

Jan 31, 2021, 10:32:19 AM
to Delta Lake Users and Developers
Sorry, my mistake in the mail.
The expected result:
  1. 2021-1-31-15-6  <--- compacted partition
  2. 2021-1-31-15-7
  3. 2021-1-31-15-8
  4. 2021-1-31-15-9
The actual result:
  1. 2021-1-31-15-7
  2. 2021-1-31-15-8
  3. 2021-1-31-15-9
  4. 2021-1-31-15-6 <--- compacted partition
On Sunday, January 31, 2021 at 17:28:32 UTC+2, Yuri Oleynikov wrote:

Shixiong(Ryan) Zhu

Feb 2, 2021, 1:02:42 PM
to Yuri Oleynikov, Delta Lake Users and Developers
Hey Yurii,

There is not an explicit order. When you start a streaming query, we will process all of the existing files at the same time. I saw you were using the `maxFilesPerTrigger` option. In this case, Delta will load the files in a specific order and take the first `maxFilesPerTrigger` files. That internal order exists only to avoid processing the same set of files in the next micro-batch; it's not a user-facing order. The order in which the files of a table are processed is not guaranteed.
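For illustration, a minimal sketch of such a reader (assuming a SparkSession `spark` in scope; the table path is hypothetical). `maxFilesPerTrigger` only caps how many files each micro-batch takes; it does not define a consumption order:

```scala
// Sketch only: maxFilesPerTrigger bounds files per micro-batch; which
// files are picked first is internal to Delta and not guaranteed.
val events = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "1")   // at most one file per micro-batch
  .option("ignoreChanges", "true")     // tolerate dataChange=false rewrites
  .load("hdfs:///data/events")         // hypothetical path

events.writeStream.format("console").start()
```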

Best Regards,

Ryan



Jacek Laskowski

Mar 23, 2021, 8:52:06 AM
to Shixiong(Ryan) Zhu, Yuri Oleynikov, Delta Lake Users and Developers
Hi Ryan,

I've been taking a look at this part of Delta Lake and got a question.

I'm fine with "There is not an explicit order", but why does "dataChange is set to false" add a new micro-batch? Shouldn't it somehow be disregarded by the Delta data source as a no-change micro-batch?

Liqiang Guo

Mar 23, 2022, 9:19:48 AM
to Delta Lake Users and Developers
Looks like my post failed.
Hi Ryan, I am interested in the internal order. Could you elaborate more? For example, if 2 files are added in one transaction, which one will streaming read first? Thanks

Shixiong(Ryan) Zhu

Mar 25, 2022, 1:41:21 AM
to Liqiang Guo, Delta Lake Users and Developers
Hey Liqiang,

If your `maxFilesPerTrigger` is 1, then it will process the 2 files in two different micro-batches, in the same order they appear in the transaction log. Otherwise, they are likely to be processed in the same micro-batch. However, I don't recommend relying on this, because:

- When two files are written in the same transaction, they are likely generated in a random order.
- When Spark reads a large file, it launches multiple tasks to process the file in parallel, which means the data within the file won't be processed in order.
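If you want to see the order the streaming source works from, one debugging option is to look at the order of the `add` actions inside a commit file under `_delta_log` (a sketch only, assuming a SparkSession `spark` in scope; reading the log JSON directly is not a supported API, and the path is hypothetical):

```scala
// List the data files recorded by the first commit, in the order they
// appear in the transaction log. Not a supported API; path hypothetical.
val commit = spark.read.json(
  "hdfs:///data/events/_delta_log/00000000000000000000.json")
commit.where("add IS NOT NULL")
  .select("add.path")
  .show(truncate = false)
```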

Best Regards,

Ryan

