Hi Pawel,
I understand we do this to protect against rebalancing, but I fail to see a scenario in which trimming of files is required. Could you please shed some light on this?
The way I see it, suppose there are 2 consumer threads (C1 and C2) and 1 topic with 2 partitions (P1 and P2):
T0) In a stable system, let's say C1 -> P1 and C2 -> P2.
T1) C1 dies or GC-pauses, and the resulting rebalance gives C2 -> P1, P2.
In this case, C2 does not have any state for P1, so it creates a new file; all is good.
Meanwhile, C1 could still hold a local offset "x" in memory if the rebalance was caused by a GC pause rather than the thread dying. If the thread died, all of its local offset state is lost.
T2) Case 1 - C1 died at T1
At T2, when C1 comes back into the system, there is another rebalance and C1 -> P1, C2 -> P2.
Now C1 starts consuming P1 from the ZK offset, and nothing special needs to be done.
T2) Case 2 - C1 GC-paused, C2 did not update the ZK offset
At T2, when the GC pause ends, the consumer is recognized again and a rebalance kicks in: C1 -> P1, C2 -> P2.
Here again C1 starts consuming P1 from the ZK offset, so the (ZK offset != local offset) condition in MessageWriter.adjustOffset() kicks in, deletes the earlier file, and starts writing a new file from the last ZK offset.
All is well.
T2) Case 3 - C1 GC-paused, C2 did update the ZK offset
Again, the same (ZK offset != local offset) condition in MessageWriter.adjustOffset() kicks in and deletes the earlier file, and consumption resumes from the new ZK offset (which, again, is the right thing to do).
So it seems to me that all consumer rebalance cases are already handled by the condition in MessageWriter.adjustOffset().
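To make the three cases concrete, here is a minimal sketch of the continuity check as I understand it. It is a hypothetical model (OffsetContinuityModel, onMessage, localFile and discards are names I made up), not Secor's actual MessageWriter code, and it assumes that after a rebalance the first message a consumer receives is the one at the ZK offset.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of one consumer's local state for one partition.
 * This is NOT Secor's MessageWriter; it only mimics the behaviour described
 * above: if the next offset does not follow the locally tracked one, the
 * local file is considered stale and is thrown away.
 */
public class OffsetContinuityModel {
    // Offset of the last message written locally; null means no local state
    // (a fresh thread, or one that died and was restarted).
    private Long lastSeenOffset = null;
    // Offsets sitting in the local, not yet uploaded, file.
    private final List<Long> localFile = new ArrayList<>();
    // How many times the local file was discarded.
    private int discards = 0;

    /** Called for every message the consumer receives for this partition. */
    public void onMessage(long offset) {
        if (lastSeenOffset != null && offset != lastSeenOffset + 1) {
            // After a rebalance the consumer resumes from the ZK offset, which
            // does not follow the local offset: drop the stale local file.
            localFile.clear();
            discards++;
        }
        localFile.add(offset);
        lastSeenOffset = offset;
    }

    public List<Long> localFile() { return localFile; }

    public int discards() { return discards; }

    public static void main(String[] args) {
        // Case 2: C1 wrote offsets 10..14, then GC-paused. C2 never committed,
        // so after the rebalance C1 resumes P1 from ZK offset 10.
        OffsetContinuityModel case2 = new OffsetContinuityModel();
        for (long o = 10; o <= 14; o++) case2.onMessage(o);
        for (long o = 10; o <= 12; o++) case2.onMessage(o);
        System.out.println(case2.localFile() + " discards=" + case2.discards()); // prints [10, 11, 12] discards=1

        // Case 3: same, but C2 committed offset 20 while C1 was paused,
        // so C1 resumes P1 from ZK offset 20.
        OffsetContinuityModel case3 = new OffsetContinuityModel();
        for (long o = 10; o <= 14; o++) case3.onMessage(o);
        for (long o = 20; o <= 21; o++) case3.onMessage(o);
        System.out.println(case3.localFile() + " discards=" + case3.discards()); // prints [20, 21] discards=1
    }
}
```

Case 1 needs no extra handling in this model: a restarted thread begins with lastSeenOffset == null, so its first message from the ZK offset never triggers a discard.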
In the end-to-end test move_offset_back_test, I do see the trim code kick in, but that can be explained by the fact that there is no real rebalance in the test; all the test does is randomly move the ZK offset back. Since no real rebalance is triggered, the consumer's local offset isn't updated (re-fetched from ZK by the Kafka consumer), so the consumer doesn't know that the ZK state has changed. It happily consumes messages until the upload logic kicks in, at which point we detect that the ZK value is different and do a trim. But does this correspond to any situation that can occur in practice?
In the test, if we restart Secor after resetting the ZK offset, the trim doesn't happen.
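Here is a similarly hypothetical sketch of the upload-time check described above. UploadCheckModel, checkBeforeUpload and Action are invented names, not Secor's Uploader API, and I am assuming that "trim" means dropping buffered messages below the offset now stored in ZK. What it tries to show is that the mismatch is only visible to a consumer whose remembered ZK offset went stale without a rebalance; a restarted consumer reads the current ZK value at startup, so there is nothing to detect. I used a forward move of the ZK offset in the example only so the trim has a visible effect; the detection itself is the same in either direction.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of an upload-time check; the names are made up for
 * illustration and are not Secor's Uploader API.
 */
public class UploadCheckModel {
    public enum Action { UPLOAD, TRIM }

    // ZK committed offset as this consumer last read it.
    private long knownCommittedOffset;
    // Offsets buffered in the local file, in consumption order.
    private final List<Long> localFile = new ArrayList<>();

    public UploadCheckModel(long committedOffsetAtStart) {
        // On (re)start the consumer reads the current ZK value, so its view
        // of the committed offset is up to date.
        this.knownCommittedOffset = committedOffsetAtStart;
    }

    public void consume(long offset) {
        localFile.add(offset);
    }

    /** Compares the ZK value read now with the one this consumer remembers. */
    public Action checkBeforeUpload(long zkCommittedOffset) {
        if (zkCommittedOffset == knownCommittedOffset) {
            return Action.UPLOAD;
        }
        // ZK changed without a rebalance telling us. Assumption: "trim" keeps
        // only buffered messages at or above the offset now stored in ZK.
        localFile.removeIf(o -> o < zkCommittedOffset);
        knownCommittedOffset = zkCommittedOffset;
        return Action.TRIM;
    }

    public List<Long> localFile() {
        return localFile;
    }

    public static void main(String[] args) {
        // ZK value changed underneath a running consumer (no rebalance).
        UploadCheckModel running = new UploadCheckModel(100);
        for (long o = 100; o <= 104; o++) running.consume(o);
        Action a = running.checkBeforeUpload(102);
        System.out.println(a + " " + running.localFile()); // prints TRIM [102, 103, 104]

        // Same ZK value, but the consumer was restarted and re-read it.
        UploadCheckModel restarted = new UploadCheckModel(102);
        for (long o = 102; o <= 104; o++) restarted.consume(o);
        System.out.println(restarted.checkBeforeUpload(102)); // prints UPLOAD
    }
}
```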
Am I missing something obvious?
I would really appreciate it if you could explain the specific scenarios in which trimming happens.
Thanks a lot,
Praveen