I am writing a sample Sink Connector.
I have a requirement to commit offsets after processing a set of sink records in the put() method:
    public void put(Collection<SinkRecord> sinkRecords) {
        TopicPartition topicPartition;
        SinkRecord lastProcessedRecord = null;
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (SinkRecord record : sinkRecords) {
            boolean isPublished;
            do {
                isPublished = // processing status
                lastProcessedRecord = record;
            } while (!isPublished);
        }
        if (sinkRecords.size() > 0) {
            topicPartition = new TopicPartition(lastProcessedRecord.topic(), lastProcessedRecord.kafkaPartition());
            offsets.put(topicPartition, lastProcessedRecord.kafkaOffset());
            resetConsumerOffset(offsets);
        }
    }

    private void resetConsumerOffset(Map<TopicPartition, Long> offset) {
        this.context.offset(offset);
    }
Based on my sample runs, the code above does not actually commit the offsets.
Is there another way to manage and commit the offsets correctly?
Thank you,
Hemchand
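
One possible direction, offered as a sketch rather than a confirmed fix: SinkTaskContext.offset(...) resets the position the consumer reads from, it does not commit anything. Committing is usually influenced by overriding SinkTask.preCommit(...) and returning the offsets that are safe to commit. The bookkeeping for that could look like the block below. It uses hypothetical stand-in classes (Partition, Rec, OffsetTracker) instead of the real Kafka types so it runs on its own, and it assumes the offset to commit is the last processed offset plus one, tracked per partition, because a single put() call can contain records from several partitions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: tracks, per partition, the next offset to commit.
class OffsetTracker {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch).
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) return false;
            Partition other = (Partition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Stand-in for the three SinkRecord accessors used in the question.
    static final class Rec {
        final String topic;
        final int kafkaPartition;
        final long kafkaOffset;

        Rec(String topic, int kafkaPartition, long kafkaOffset) {
            this.topic = topic;
            this.kafkaPartition = kafkaPartition;
            this.kafkaOffset = kafkaOffset;
        }
    }

    private final Map<Partition, Long> nextOffsets = new HashMap<>();

    // Call once a record has been processed successfully.
    void markProcessed(Rec record) {
        Partition key = new Partition(record.topic, record.kafkaPartition);
        // The committed offset is the offset of the NEXT record to read,
        // so store kafkaOffset + 1 and never move backwards.
        nextOffsets.merge(key, record.kafkaOffset + 1, Math::max);
    }

    // Snapshot of the offsets that are safe to commit.
    Map<Partition, Long> offsetsToCommit() {
        return new HashMap<>(nextOffsets);
    }
}
```

In a real task, put() would call markProcessed(record) after each successful publish, and a preCommit(...) override would convert the snapshot into a Map<TopicPartition, OffsetAndMetadata> and return it. Whether that matches the behaviour you need depends on your Connect version and on how flush() is used in your task.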