Can't restart stream after a longer pause


Tomas Bartalos

Jun 9, 2021, 5:18:56 PM
to Delta Lake Users and Developers
Hello Deltas,

I have a problem restarting a Delta stream after a period of inactivity longer than delta.logRetentionDuration. There is one more condition: the pause must happen while the second latest offset still has isStartingVersion = true, i.e. right after the first offset with isStartingVersion = false has been written.

Example:
Delta's Log state:
...
00000000000000949888.json
Stream checkpoint state:
_checkpoint/offsets/0 -> "reservoirVersion" : 949888, isStartingVersion = true

// an update rolls in

Delta's Log state:
...
00000000000000949889.json
Stream checkpoint state:
_checkpoint/offsets/0 -> "reservoirVersion" : 949888, isStartingVersion = true
_checkpoint/offsets/1 -> "reservoirVersion" : 949890, isStartingVersion = false

// now the Delta table is inactive for longer than delta.logRetentionDuration
// after that, an update rolls in

Delta's Log state:
// all previous logs are deleted, because checkpointing triggers cleanup of log files older than delta.logRetentionDuration
00000000000000949890.checkpoint.parquet
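To make the effect of the cleanup concrete, here is a toy Python model (not Delta's code) of which versions can still be loaded as a snapshot. It assumes the usual Delta log rule that a snapshot at version v needs the latest checkpoint at or before v plus every commit JSON after it up to v, and it models the post-cleanup log exactly as listed above, i.e. only 00000000000000949890.checkpoint.parquet. The class name DeltaLogState and its fields are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class DeltaLogState:
    """Toy model of the files left in _delta_log (not Delta's real API)."""
    commits: Set[int] = field(default_factory=set)      # versions with a NNN.json
    checkpoints: Set[int] = field(default_factory=set)  # versions with a NNN.checkpoint.parquet

    def can_reconstruct(self, version: int) -> bool:
        """True if a snapshot at `version` can be built from the remaining files."""
        # Start from the latest checkpoint at or before `version` (or from 0).
        usable = [c for c in self.checkpoints if c <= version]
        first_commit = max(usable) + 1 if usable else 0
        # Every commit after that checkpoint, up to `version`, must still exist.
        return all(v in self.commits for v in range(first_commit, version + 1))


if __name__ == "__main__":
    # Post-cleanup state from the example: only the 949890 checkpoint is left.
    after_cleanup = DeltaLogState(checkpoints={949890})
    print(after_cleanup.can_reconstruct(949888))  # False
    print(after_cleanup.can_reconstruct(949890))  # True
```

In this model, version 949890 (where the stream wants to continue) is still loadable, while version 949888 (referenced by the second latest offset) is not.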

A stream started in this state fails because:
  • getBatch is called with startOffsetOption = 949888, end = 949890
    • the call stack reaches getFileChanges with fromVersion = 949888, isStartingVersion = true
      • this tries to getSnapshot at version 949888, but since that version of the Delta log has already been deleted, the call fails.
The error occurs only when the secondLatestOffset has isStartingVersion = true. 
The result of the getBatch call is not even used; it is called only to initialize the stream source's internals from Spark's MicroBatchExecution.populateStartOffsets().
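The failing restart path can be sketched as a toy model in Python. Offset, SnapshotUnavailable, replay_last_batch and the can_reconstruct callback are hypothetical names, not Delta or Spark APIs. The only behaviour modelled is what is described above: on restart, getBatch(secondLatest, latest) is replayed, and when the start offset has isStartingVersion = true it needs a full snapshot at that version.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class Offset:
    """Toy stand-in for one _checkpoint/offsets/N entry (hypothetical)."""
    reservoir_version: int
    is_starting_version: bool


class SnapshotUnavailable(Exception):
    """Raised when the toy log cannot produce a snapshot at a version."""


def replay_last_batch(offsets: List[Offset],
                      can_reconstruct: Callable[[int], bool]) -> None:
    """Replay getBatch(secondLatest, latest) the way a restart does.

    Nothing is returned, mirroring populateStartOffsets(), which only
    calls getBatch to initialize the source and discards the result.
    """
    if len(offsets) < 2:
        return  # no previous batch to replay
    start = offsets[-2]
    if start.is_starting_version:
        # getFileChanges(fromVersion, isStartingVersion = true) first loads a
        # full snapshot at fromVersion, which needs the old log files.
        if not can_reconstruct(start.reservoir_version):
            raise SnapshotUnavailable(
                f"cannot load snapshot at version {start.reservoir_version}")
    # isStartingVersion = false: the post reports the restart succeeds,
    # so this toy model does not check anything on that branch.


if __name__ == "__main__":
    # Hypothetical post-cleanup log: only version 949890 can be loaded.
    only_949890 = lambda v: v == 949890
    try:
        replay_last_batch([Offset(949888, True), Offset(949890, False)], only_949890)
    except SnapshotUnavailable as err:
        print(err)  # cannot load snapshot at version 949888
    # Same pause, but the second latest offset has isStartingVersion = false.
    replay_last_batch([Offset(949889, False), Offset(949890, False)], only_949890)
    print("restart ok")
```

The model makes the asymmetry visible: the old snapshot is required only on the isStartingVersion = true branch, and only for a call whose result populateStartOffsets() discards.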

I know this is a corner case, but I think the restart should succeed: the logs needed to continue streaming are still present, and the same scenario already passes when the second latest offset has isStartingVersion = false.

Regards, 
Tomas