Kafka Connect - SinkTask at least once delivery

Ziliang Chen

Oct 4, 2017, 1:29:02 PM
to Confluent Platform
Hi,

May I ask how to achieve at-least-once delivery for a Kafka Connect sink task without introducing external offset management?
It appears to me that topic/partition offsets are managed automatically by the Kafka Connect framework, and clients have no chance to commit offsets manually.
For example, the current workflow is, in general, something like this:
1) Kafka Connect calls SinkTask.put
2) SinkTask writes the records to the sink system (asynchronously)
3) Kafka Connect calls "flush(Map<TopicPartition, OffsetAndMetadata> meta)" with the offsets passed in, which gives the sink task a chance to commit the offsets to external offset management storage. If there is no external offset management storage such as a KVStore, a SinkTask usually doesn't override this function
4) Kafka Connect periodically commits the offsets to the offset topic

The problem is that when a SinkTask writes records to the sink system, in quite a few cases the write is asynchronous. So even after step 2) returns successfully, the records are not necessarily in the sink system, which in turn means it is not safe to commit in step 4). The key point is that we don't want to block until a callback reports that the async write has fully completed before moving forward, since that would hurt throughput a lot.

If Kafka Connect could expose an offset commit API such as "commitSync" to SinkTask and provide an API to disable the periodic commits in the Kafka Connect framework, a sink task would have a way to achieve at-least-once delivery, I guess.
The new flow would look like this:

1) Kafka Connect calls SinkTask.put
2) SinkTask writes the records to the sink system (asynchronously)
3) Kafka Connect calls "flush" (we don't need to care about this)
4) At some point in the future, a callback is invoked in the SinkTask indicating that the records are completely in the sink system, so it is now safe to commit these offsets.
5) "commitSync" is called in the callback, which commits the offsets to the offset topic

Thank you!

Randall Hauch

Oct 4, 2017, 1:45:52 PM
to confluent...@googlegroups.com
The Connect framework by default provides at least once delivery to sink connectors that work in a synchronous manner. Sink connectors can optionally track/manage offsets on their own to send records to the sink system exactly once, but any sink connector that *asynchronously* sends records to the external sink system should also track/manage offsets based upon the acknowledgements from the sink system. 

The trick with this, though, is to use the newer `preCommit(offsets)` method that is similar to and replaces the `flush(offsets)` method, except that unlike `flush` the `preCommit` method returns the latest offsets per topic partition that the task knows have been successfully written/flushed to the sink. The Connect worker will then commit to Kafka *those* task-specified offsets (rather than the offsets that Connect passed to `preCommit`) so that, if the task were to crash, upon restart it would begin at those offsets.
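
As a rough illustration of that contract, here is a minimal sketch in plain Java. It is not Connect code: the real method is `SinkTask.preCommit(Map<TopicPartition, OffsetAndMetadata>)`, whereas the `AckedOffsets` class, the string partition keys such as "orders-0", and the `recordAck` method are hypothetical stand-ins so the example runs without the Kafka libraries. It also assumes the sink acknowledges records of a partition in order. Note the `+ 1`: by Kafka convention a committed offset is the position of the next record to read.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a sink task; partitions are plain strings such as "orders-0".
class AckedOffsets {
    // Next offset to consume per partition, i.e. last acknowledged offset + 1.
    private final Map<String, Long> acked = new ConcurrentHashMap<>();

    // Called when the sink acknowledges a record; keeps the highest acknowledged position.
    void recordAck(String partition, long offset) {
        acked.merge(partition, offset + 1, Math::max);
    }

    // Mirrors the shape of preCommit: ignore the consumed positions passed in and
    // return only what the sink has acknowledged, for the partitions being committed.
    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        Map<String, Long> safe = new HashMap<>();
        for (String partition : currentOffsets.keySet()) {
            Long offset = acked.get(partition);
            if (offset != null) {
                safe.put(partition, offset);
            }
        }
        return safe;
    }
}
```

If the framework has consumed up to offset 100 of "orders-0" but the sink has only acknowledged offset 41, this returns 42 for that partition, so a restart would replay from 42 rather than skip unacknowledged records.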

Hope this helps,

Randall

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/6a8315b5-dd57-45e8-88df-b474fa1f63a8%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ziliang Chen

Oct 4, 2017, 3:33:34 PM
to Confluent Platform
Thanks Randall, this is very good to know. To confirm the behavior of "preCommit": basically there are 2 flows in my code:
1. The Kafka Connect framework keeps calling `put`, and my sink task writes the records to the sink system asynchronously
2. When there are acks from the sink system, a callback is called, and I plan to do "preCommit" in the callback.

The key question is: after the offset commit in step 2), will Kafka Connect rewind to that committed offset on the next `put`? If it does, that will cause duplicates.

Randall Hauch

Oct 5, 2017, 2:26:35 PM
to confluent...@googlegroups.com
Regarding your #3 item, just to be clear, the Connect framework will call `preCommit` whenever the framework wants to record offsets. I don't know much about your system, but you might consider having the callback record the offsets per topic partition that have been committed to the sink system, and then have `preCommit` just return those when Connect asks for them.
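
One possible shape for that callback bookkeeping, sketched in plain Java under stated assumptions: `InFlightTracker` and `AsyncWriter` are hypothetical names, partitions are plain strings instead of `TopicPartition`, and the sink's async write is modeled as a `CompletableFuture<Void>`. Unlike a simple "highest acked offset" map, this version tolerates acks that arrive out of order by reporting the oldest record still in flight.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: remembers which offsets were sent but not yet acknowledged.
class InFlightTracker {
    private final Map<String, TreeSet<Long>> inFlight = new HashMap<>();
    private final Map<String, Long> lastSent = new HashMap<>();

    synchronized void sent(String partition, long offset) {
        inFlight.computeIfAbsent(partition, p -> new TreeSet<>()).add(offset);
        lastSent.merge(partition, offset, Math::max);
    }

    synchronized void acked(String partition, long offset) {
        TreeSet<Long> pending = inFlight.get(partition);
        if (pending != null) {
            pending.remove(offset);
        }
    }

    // Safe position per partition: the oldest unacknowledged offset,
    // or last sent offset + 1 when nothing is pending.
    synchronized Map<String, Long> safeOffsets() {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : lastSent.entrySet()) {
            TreeSet<Long> pending = inFlight.get(e.getKey());
            boolean idle = pending == null || pending.isEmpty();
            result.put(e.getKey(), idle ? e.getValue() + 1 : pending.first());
        }
        return result;
    }
}

// Hypothetical stand-in for the task: write() plays the role of put() for one record.
class AsyncWriter {
    private final InFlightTracker tracker = new InFlightTracker();

    void write(String partition, long offset, CompletableFuture<Void> sinkWrite) {
        tracker.sent(partition, offset);
        // The callback only records the ack; it does not commit anything itself.
        // thenRun fires only on success, so a failed write keeps its offset pending.
        sinkWrite.thenRun(() -> tracker.acked(partition, offset));
    }

    // What a real preCommit override would translate into its return value.
    Map<String, Long> preCommit() {
        return tracker.safeOffsets();
    }
}
```

With this design `put` never blocks on the sink, and a record acknowledged ahead of an older pending one may be redelivered after a restart, which is consistent with at-least-once delivery.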

Ziliang Chen

Oct 5, 2017, 11:39:46 PM
to Confluent Platform
Thanks!

Ziliang Chen

Oct 8, 2017, 1:06:59 AM
to Confluent Platform
Hi Randall,

I followed your suggestion to override `preCommit`, but I have encountered a weird class cast problem. More details: https://groups.google.com/forum/#!topic/confluent-platform/cmg-4qJH9YE

Could you please help shed some light here?