Kafka Connect: committing offsets after processing a set of SinkRecords in CustomSinkTask.put()


Hemchand R

Jul 18, 2016, 3:35:13 PM
to Confluent Platform
Hello,

I am writing a sample Sink Connector.

I have a requirement where I need to commit offsets after processing a set of sink records in the put() method.

Currently I have the following code:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public void initialize(SinkTaskContext context) {
    super.initialize(context);
    this.context = context;
}

public void put(Collection<SinkRecord> sinkRecords) {
    SinkRecord lastProcessedRecord = null;
    Map<TopicPartition, Long> offsets = new HashMap<>();

    for (SinkRecord record : sinkRecords) {
        boolean isPublished;
        do {
            isPublished = publishToSink(record); // placeholder for the actual delivery call; retried until it succeeds
            lastProcessedRecord = record;
        } while (!isPublished);
    }

    if (!sinkRecords.isEmpty()) {
        TopicPartition topicPartition = new TopicPartition(
                lastProcessedRecord.topic(), lastProcessedRecord.kafkaPartition());
        offsets.put(topicPartition, lastProcessedRecord.kafkaOffset());
        resetConsumerOffset(offsets);
    }
}

private void resetConsumerOffset(Map<TopicPartition, Long> offsets) {
    // Asks the framework to reset consumption to the given offsets.
    this.context.offset(offsets);
}

Based on my sample runs, I can see that the above code does not actually commit the offsets.

Is there any other way that I can manage / commit the offsets correctly?


Thank you,
Hemchand

Saravanan Tirugnanum

Jul 19, 2016, 10:56:51 AM
to Confluent Platform
I believe you do not have to explicitly track offsets for sink connectors, since Connect relies on the Kafka consumer offsets per partition for those as well (unless you have a specific use case). This is not the case for source connectors, where you may have to maintain/track the source offsets manually. Confluent folks can clarify this as well.
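
For illustration, a source task hands the framework its own offsets on every record it emits, roughly like the sketch below (the "filename"/"position" keys, the topic name, and the currentLine/readNextLine() helpers are all made up for the example):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

// Inside a hypothetical SourceTask reading lines from a file.
@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Which "partition" of the source this task is reading.
    Map<String, Object> sourcePartition = Collections.singletonMap("filename", "input.txt");
    // Whatever the task needs in order to resume after a restart; Connect
    // persists this map for you, but you decide what goes into it.
    Map<String, Object> sourceOffset = Collections.singletonMap("position", currentLine);

    List<SourceRecord> records = new ArrayList<>();
    records.add(new SourceRecord(sourcePartition, sourceOffset,
            "my-topic", Schema.STRING_SCHEMA, readNextLine()));
    currentLine++;
    return records;
}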

Regards
Saravanan

Ewen Cheslack-Postava

Jul 20, 2016, 12:57:03 AM
to Confluent Platform
That's correct: you don't manage the offsets yourself. The framework will commit offsets periodically, relying on flush() to ensure all the data has been delivered.

Generally you shouldn't need to commit offsets at the end of put() because you can't control what chunk of data will be put() anyway (so there's no guarantee it would be semantically meaningful). If you have a sink system that has something like transactional semantics, you might want to make a transaction last over multiple put() calls (i.e. however many calls there are between subsequent flush() calls) to minimize the likelihood of duplicated data in the case of failure.
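
A rough sketch of that pattern (SinkSystemClient and its begin/commit/write methods are stand-ins for whatever transactional API your sink system actually exposes):

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class TransactionalSinkTask extends SinkTask {
    private SinkSystemClient client; // hypothetical client for the sink system

    @Override
    public void start(Map<String, String> props) {
        client = new SinkSystemClient(props); // made-up constructor
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Open a transaction lazily and leave it open across put() calls.
        if (!client.inTransaction()) {
            client.beginTransaction();
        }
        for (SinkRecord record : records) {
            client.write(record);
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // The framework calls flush() just before it commits consumer offsets,
        // so committing the sink transaction here keeps data and offsets in step.
        if (client.inTransaction()) {
            client.commitTransaction();
        }
    }

    @Override
    public void stop() {
        client.close();
    }

    @Override
    public String version() {
        return "1.0";
    }
}

If the task dies mid-interval, the open transaction rolls back and Connect re-delivers from the last committed offsets; duplicates are only possible in the small window between the sink commit and the offset commit.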

-Ewen
