Streaming doesn't fail when log files are missing


Tomas Bartalos

Jun 9, 2021, 6:20:28 PM
to Delta Lake Users and Developers
Hello,

As a follow-up to my previous email: when the Delta log is cleaned up, the streaming query doesn't realize it is missing increments and simply continues with the log file increments that are still available.

Example:
Delta's Log state:
...
00000000000000949885.json
Stream checkpoint state:
_checkpoint/offsets/150 -> "reservoirVersion" : 949886, isStartingVersion: false

//updates roll in
Delta's Log state:
...
00000000000000949885.json
00000000000000949886.json
00000000000000949887.json
00000000000000949888.json
00000000000000949889.json
00000000000000949890.checkpoint.parquet
00000000000000949890.json
00000000000000949891.json

Delta's Log state after log cleanup:
00000000000000949890.checkpoint.parquet
00000000000000949890.json
00000000000000949891.json

When the streaming is started in this state, the data appended in versions 949885-949889 is ignored and only the appends from versions 949890-949891 are streamed downstream.
This behaviour can cause data loss without any notice.
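For reference, this is roughly how the stream is resumed (a minimal sketch, not my actual job; the paths are illustrative and spark is the active SparkSession):

// Sketch only: resume the stream from the existing checkpoint after
// the log cleanup shown above. Paths are illustrative.
val query = spark.readStream
  .format("delta")
  .load("/data/events")                         // table whose log was cleaned up
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/chk/events")  // holds offsets/150 from above
  .start("/data/events_sink")
// This starts successfully and silently skips the appends from 949885-949889.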

I think it should be possible to check in getBatch whether contiguous Delta log files exist in the range startOffsetOption - end. The only remaining problem would be on stream start, when getBatch is called with a possibly deleted log version.
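Something along these lines, as a rough sketch (the helper and its parameters are my own names, not Delta's internals):

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: before serving a batch, verify that every commit file in
// [startVersion, endVersion] still exists, so a cleaned-up gap fails
// loudly instead of being skipped silently.
def assertContiguousLog(fs: FileSystem, logPath: Path,
                        startVersion: Long, endVersion: Long): Unit = {
  (startVersion to endVersion).foreach { v =>
    val commitFile = new Path(logPath, f"$v%020d.json") // e.g. 00000000000000949886.json
    if (!fs.exists(commitFile)) {
      throw new IllegalStateException(
        s"Delta log file $commitFile was cleaned up; streaming past it would lose data.")
    }
  }
}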

Regards,
Tomas

Shixiong(Ryan) Zhu

Jun 9, 2021, 6:28:14 PM
to Tomas Bartalos, Delta Lake Users and Developers
Hey Tomas,

What's your Delta Lake version? This should be fixed in 0.8.0 by https://github.com/delta-io/delta/commit/3a726e62757748f20cd194e8113b5c972165495c

Best Regards,

Ryan



Burak Yavuz

Jun 9, 2021, 6:28:30 PM
to Tomas Bartalos, Delta Lake Users and Developers
Hi Tomas,

Which version of Delta Lake are you using? This behavior has been fixed in recent versions.

Best,

Burak Yavuz

Software Engineer

Databricks Inc.

bu...@databricks.com

databricks.com




Tomas Bartalos

Jun 9, 2021, 7:19:56 PM
to Burak Yavuz, Delta Lake Users and Developers
Hello Ryan, Burak, 

Thank you for such an accurate and quick response. I'm using Delta 0.7.0.
You're right, the issue is fixed in 0.8.0; I have just tried the fix.

However, now I have another problem: in 0.8.0 I'm not able to start a streaming query that should be able to run without data loss.

My Delta log state (after log cleanup):
00000000000000000180.json
00000000000000000180.checkpoint.parquet
00000000000000000181.json
00000000000000000182.json
00000000000000000183.json
Stream checkpoint state:
_checkpoint/offsets/4 -> "reservoirVersion" : 172, isStartingVersion: false
_checkpoint/offsets/5 -> "reservoirVersion" : 182, isStartingVersion: false (committed)

I'm getting exception:
org.apache.spark.sql.streaming.StreamingQueryException: The stream from your Delta table was expecting process data from version 172,
but the earliest available version in the _delta_log directory is 180

Since all data up to version 181 has already been streamed downstream and I have the Delta logs needed to continue, the stream should be able to start.
It is the same initialization symptom as the one from my previous mail, however this time it does not require isStartingVersion: false.
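For now, if I read the 0.8.0 release notes right, my only workaround is to discard the checkpoint and restart the stream with the startingVersion option, which I'd like to avoid (sketch below; 182 is the reservoirVersion of the committed offsets/5):

// Workaround sketch, assuming the startingVersion option from 0.8.0:
// throw away the old checkpoint and restart from the last committed version.
val restarted = spark.readStream
  .format("delta")
  .option("startingVersion", "182")
  .load("/data/events")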

The problem is that on streaming start, Spark calls the getBatch method twice. The first call is only for initialization, with checkpoint offsets/4 (start = 172) and offsets/5 (end = 182); it happens in MicroBatchExecution.populateStartOffsets() and the returned DataFrame is not used.
The second call uses the correct offsets (start = 182, end = 184), and that DataFrame is used for the query.
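In other words (just a sketch with my own names, not the real DeltaSource code), the source could recognize that first call and skip the failing validation:

// Sketch only: a getBatch call whose end offset is at or below the last
// committed offset is the initialization replay from populateStartOffsets();
// its DataFrame is discarded, so it could skip the earliest-version check.
def isInitializationReplay(requestedEndVersion: Long,
                           lastCommittedVersion: Long): Boolean =
  requestedEndVersion <= lastCommittedVersion

// With my checkpoint state: the first call (end = 182, committed = 182) is a
// replay and could be exempted; the second call (end = 184) is validated.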

Do you think it is possible to fix this initialization glitch?

Thank you,
Tomas

