Hello Deltas,
I have a problem restarting a Delta stream after a period of inactivity longer than delta.logRetentionDuration. There is one more condition: the pause must start right after a batch whose start offset, i.e. the second latest offset in the stream checkpoint, has isStartingVersion = true.
Example:
Delta's Log state:
...
00000000000000949888.json
Stream checkpoint state:
_checkpoint/offsets/0 -> "reservoirVersion" : 949888, isStartingVersion = true
// an update rolls in
Delta's Log state:
...
00000000000000949889.json
Stream checkpoint state:
_checkpoint/offsets/0 -> "reservoirVersion" : 949888, isStartingVersion = true
_checkpoint/offsets/1 -> "reservoirVersion" : 949890, isStartingVersion = false
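Below is a minimal Python model of the two offsets above. It shows which pair of offsets is passed to getBatch on restart, as this report describes: the second latest offset is the start and the latest offset is the end. Offset and batch_range_on_restart are hypothetical stand-ins, not Spark or Delta APIs.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class Offset:
    # Hypothetical stand-in for one entry in _checkpoint/offsets
    reservoir_version: int
    is_starting_version: bool


def batch_range_on_restart(
    offset_log: Dict[int, Offset],
) -> Tuple[Optional[Offset], Offset]:
    # Return (start, end) as described in the report: start is the second
    # latest offset, end is the latest one; batch 0 has no start offset.
    latest_id = max(offset_log)
    return offset_log.get(latest_id - 1), offset_log[latest_id]


offset_log = {
    0: Offset(reservoir_version=949888, is_starting_version=True),
    1: Offset(reservoir_version=949890, is_starting_version=False),
}
start, end = batch_range_on_restart(offset_log)
print(start.reservoir_version, start.is_starting_version, end.reservoir_version)
```

With this state, the start offset is the one with isStartingVersion = true, so the restart goes down the snapshot path.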
// now the Delta table is inactive for longer than delta.logRetentionDuration
// after that, an update rolls in
Delta's Log state:
// all previous log files are deleted, because writing a checkpoint triggers cleanup of log files older than delta.logRetentionDuration
00000000000000949890.checkpoint.parquet
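The cleanup step can be illustrated with a simplified Python model of the rule stated in the comment above. When a checkpoint is written, commits below the checkpoint version and older than the retention window are deleted. This is an assumption-based sketch, not Delta's actual cleanup code. retained_commits is hypothetical, and the timestamps and 30-day retention are illustrative values.

```python
from datetime import datetime, timedelta
from typing import Dict, Set


def retained_commits(
    commit_times: Dict[int, datetime],
    checkpoint_version: int,
    now: datetime,
    retention: timedelta,
) -> Set[int]:
    # Simplified model: keep a commit if it is at or after the new checkpoint,
    # or if it is still inside the retention window; delete everything else.
    cutoff = now - retention
    return {
        v for v, ts in commit_times.items()
        if v >= checkpoint_version or ts >= cutoff
    }


t0 = datetime(2021, 1, 1)  # illustrative timestamps
commit_times = {
    949888: t0,
    949889: t0 + timedelta(minutes=5),
    949890: t0 + timedelta(days=31),  # first update after the long pause
}
kept = retained_commits(
    commit_times,
    checkpoint_version=949890,
    now=t0 + timedelta(days=31),
    retention=timedelta(days=30),  # assumed retention
)
print(sorted(kept))  # [949890]
```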
A stream started in this state fails, because:
- getBatch is called with startOffsetOption = 949888, end = 949890
- the call stack reaches getFileChanges with fromVersion = 949888, isStartingVersion = true
- this tries to getSnapshot at version 949888, but the log for that version has already been deleted, so the call fails.
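The failure comes down to whether a snapshot at version 949888 can still be reconstructed from the remaining log. The sketch below models that check in Python. It assumes a snapshot is rebuilt from the newest checkpoint at or below the requested version plus the JSON commits after it. can_build_snapshot is a hypothetical helper, not Delta's getSnapshot, and the surviving files are assumed to be only those for version 949890.

```python
from typing import Set


def can_build_snapshot(
    version: int, json_versions: Set[int], checkpoint_versions: Set[int]
) -> bool:
    # Start from the newest checkpoint at or below `version`, then require
    # every JSON commit after it up to `version`. Without such a checkpoint,
    # every commit from version 0 is needed.
    usable = [c for c in checkpoint_versions if c <= version]
    base = max(usable) if usable else -1
    return all(v in json_versions for v in range(base + 1, version + 1))


# Assumed surviving files: the commit and checkpoint for version 949890 only.
json_versions = {949890}
checkpoint_versions = {949890}

print(can_build_snapshot(949888, json_versions, checkpoint_versions))  # False
print(can_build_snapshot(949890, json_versions, checkpoint_versions))  # True
```

Under this model the table is still readable at 949890, the end offset. Only the snapshot request at the start offset's version fails.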
The error occurs only when the secondLatestOffset has isStartingVersion = true.
I know this is a special case, but I think the restart should succeed: the log files needed to continue streaming are still present, and when the second latest offset has isStartingVersion = false, the restart does succeed.
Regards,
Tomas