A Delta table named raw is loaded from Kinesis or Kafka by a structured streaming job, so a new version is committed roughly every few seconds.
A downstream job consumes from raw. It is also a structured streaming job: it reads raw with trigger=availableNow and runs once a day, so each run may have to catch up on many versions of raw (say, 20,000). After readStream-ing from raw, I use foreachBatch to MERGE each micro-batch into a target Delta table.
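For reference, a simplified PySpark sketch of what the downstream job does (the paths, checkpoint location, and the merge key "id" are placeholders, not the real names):

from delta.tables import DeltaTable

# Placeholder paths and merge key, for illustration only.
RAW_PATH = "s3://bucket/raw"
TARGET_PATH = "s3://bucket/target"
CHECKPOINT = "s3://bucket/checkpoints/raw_to_target"

def merge_batch(batch_df, batch_id):
    # MERGE the micro-batch into the target Delta table.
    target = DeltaTable.forPath(spark, TARGET_PATH)
    (target.alias("t")
           .merge(batch_df.alias("s"), "t.id = s.id")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

(spark.readStream
      .format("delta")
      .load(RAW_PATH)
      .writeStream
      .foreachBatch(merge_batch)
      .option("checkpointLocation", CHECKPOINT)
      .trigger(availableNow=True)
      .start())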
The MERGE itself seems fast (less than a minute), but the batch reports a long time in durationMs, for example:
[batchId = 320] Streaming query made progress: {
  "id" : "...",
  "runId" : "...",
  "name" : null,
  "timestamp" : "2024-10-21T02:52:57.029Z",
  "batchId" : 320,
  "batchDuration" : 182297,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 51186,
    "commitOffsets" : 66,
    "getBatch" : 37802,
    "latestOffset" : 91964,
    "queryPlanning" : 783,
    "triggerExecution" : 182287,
    "walCommit" : 317
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[s3://...]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "...",
      "reservoirVersion" : 3292332,
      "index" : -1,
      "isStartingVersion" : false
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "...",
      "reservoirVersion" : 3294059,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "705134",
      "numFilesOutstanding" : "28"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}
As you can see, latestOffset and addBatch take a long time, and that time appears roughly linear in how many versions the source has advanced since the last committed offset. Does that mean a Delta table is not well suited to being consumed across a large version interval? Is there a benchmark on this?
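For what it's worth, this is roughly how the version gap and the reported durations can be pulled from the progress objects to eyeball that relationship (a rough sketch, assuming query is the StreamingQuery handle returned by start(); the field names match the JSON above):

# Compare the Delta source version gap with the reported durations
# for each micro-batch of the current run.
for p in query.recentProgress:
    src = p["sources"][0]
    version_gap = (src["endOffset"]["reservoirVersion"]
                   - src["startOffset"]["reservoirVersion"])
    print(p["batchId"],
          version_gap,
          p["durationMs"].get("latestOffset"),
          p["durationMs"].get("addBatch"))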
Thanks