Kafka Streams - at least once commit


Maria Abramiuc

May 12, 2016, 2:08:29 AM
to Confluent Platform
Hello,

I've been investigating Kafka Streams for a new application. The documentation states:

Kafka Streams currently supports at-least-once processing guarantees in the presence of failure. This means that if your stream processing application fails, no data records are lost and fail to be processed, but some data records may be re-read and therefore reprocessed.

  
I've been using a TopologyBuilder, with a source, some processors, and a sink.

I understand that all items will be processed; my question is: what happens if a write to the sink fails?

I don't have an explicit commit called on each item; as far as I can see, writes to the sink are done asynchronously, and the consumer offset is committed periodically.
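This behaviour can be modelled with plain Java (a toy model, no Kafka involved, written by the editor to illustrate the concern): writes complete asynchronously, a callback only records failures, and the offset commit happens periodically regardless of individual send outcomes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model (plain JDK, not Kafka code) of the behaviour described above:
// writes complete asynchronously, a callback only records failures, and the
// offset commit happens after a flush regardless of send outcomes.
public class AsyncSinkModel {
    final List<String> sinkLog = new ArrayList<>();       // records that reached the sink
    final List<Throwable> sendErrors = new ArrayList<>(); // failures seen by the callback
    long committedOffset = -1;

    // Simulated asynchronous write; 'fail' stands in for a broker-side error.
    CompletableFuture<Void> send(long offset, String record, boolean fail) {
        return CompletableFuture.runAsync(() -> {
            if (fail) throw new RuntimeException("send failed at offset " + offset);
            synchronized (sinkLog) { sinkLog.add(record); }
        }).whenComplete((v, e) -> {
            if (e != null) {
                synchronized (sendErrors) { sendErrors.add(e); } // only logged, never rethrown
            }
        });
    }

    // "Flush then commit": wait for all in-flight futures, then commit the
    // highest offset -- even when some futures completed with an error.
    void commit(long upToOffset, List<CompletableFuture<Void>> inFlight) {
        inFlight.forEach(f -> f.exceptionally(e -> null).join());
        committedOffset = upToOffset;
    }

    public static void main(String[] args) {
        AsyncSinkModel model = new AsyncSinkModel();
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        inFlight.add(model.send(0, "a", false));
        inFlight.add(model.send(1, "b", true)); // this write fails
        inFlight.add(model.send(2, "c", false));
        model.commit(2, inFlight);
        // Offset 2 is committed even though "b" never reached the sink:
        // after a restart "b" will not be re-read, so it is lost.
        System.out.println("committed=" + model.committedOffset
                + " reachedSink=" + model.sinkLog.size()
                + " errors=" + model.sendErrors.size());
    }
}
```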

The callback for the writes logs the errors, if any are received.

The offset commit also calls flush() on the producer, ensuring that every record's future is marked as done. But if a future completes with an error, the offset is still committed on the client, and the message is lost.

Is there any way to ensure at-least-once delivery to the sink while still keeping the performance gained from not committing explicitly on every message?

Thank you,
Maria

Guozhang Wang

May 12, 2016, 6:28:08 PM
to Confluent Platform
Hello Maria,

If a commit() call on the StreamTask fails, whether while flushing the state store, flushing the producer, or committing offsets, an exception is thrown, which can be caught by a user-specified Thread.UncaughtExceptionHandler. The only cases where it is only logged are:

1) the exception thrown is a CommitFailedException, which indicates that the commit failed because of a rebalance; in this case we only log a WARNING, since it may cause duplicates.
2) commit() is called during shutdown(), in which case we log it and let the shutdown process finish.
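The handler mechanism Guozhang refers to can be illustrated with plain JDK code (an editor's sketch, not Kafka Streams internals): an exception that escapes a thread's run() is delivered to that thread's Thread.UncaughtExceptionHandler. Kafka Streams exposes the same hook via KafkaStreams#setUncaughtExceptionHandler(...), set before start().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Plain-JDK illustration: an exception that escapes a thread's run() is
// delivered to that thread's Thread.UncaughtExceptionHandler, which is the
// hook a Kafka Streams application can use to observe commit failures.
public class HandlerDemo {
    static String runDemo() throws InterruptedException {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            // Stand-in for a stream thread whose commit() throws.
            throw new IllegalStateException("commit failed");
        });
        worker.setUncaughtExceptionHandler((t, e) -> {
            seen.set(e);      // the application decides: log, alert, shut down...
            done.countDown();
        });
        worker.start();
        done.await();
        return seen.get().getMessage();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handler saw: " + runDemo());
    }
}
```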

Guozhang

Maria Abramiuc

Jul 21, 2016, 10:23:00 AM
to Confluent Platform
Hello Guozhang,

Thank you for your answer. I've been investigating what you explained here in more depth; I understand this part, but what doesn't seem clear is:

- a call to StreamTask.commit() is made
- stateMgr.flush() succeeds
- recordCollector.flush(), which calls producer.flush(), succeeds; at this point all records have been sent, and we wait until every future is marked as done (completed successfully or with an error)
- offsets are committed

If the future from the recordCollector completes with an error, we have this:

 
private final Callback callback = new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
            offsets.put(tp, metadata.offset());
        } else {
            log.error("Error sending record: " + metadata, exception);
        }
    }
};

So I don't understand where and how this is handled: if the future completes with an error, the error is logged, but the offset gets committed anyway?

In the KafkaProducer javadocs I found that setting a large "retries" number should prevent losing data; is this configuration still needed when using Kafka Streams?
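For reference, a configuration sketch (an editor's assumption, not an answer from the thread): "retries" and "acks" are standard KafkaProducer settings, but whether a Kafka Streams application forwards them unprefixed depends on the version, so check StreamsConfig for your release.

```java
import java.util.Properties;

// Configuration sketch. "retries" and "acks" are standard KafkaProducer
// settings; "application.id" and "commit.interval.ms" are Kafka Streams
// settings. The broker address and app id are placeholders.
public class StreamsRetryConfig {
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // Streams application id (placeholder)
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("commit.interval.ms", "30000");         // how often offsets are committed
        props.put("retries", "2147483647"); // retry retriable send errors instead of failing fast
        props.put("acks", "all");           // require the full ISR to acknowledge each write
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig());
    }
}
```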

Thank you,
Maria

Guozhang Wang

Jul 26, 2016, 12:18:15 AM
to Confluent Platform
Hello Maria,

Thanks for the find. After looking at it a second time, I agree we should expose such exceptions to users in order to provide at-least-once semantics; I have a JIRA open in Apache Kafka


I will update it with your observations as well; we should be able to fix it as soon as possible.
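A sketch of the shape such a fix could take (an editor's illustration, not the actual Kafka Streams patch): the callback remembers the first failure instead of only logging it, and the commit path rethrows it before offsets are committed, so the failed records are re-read on restart rather than lost.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of one way to surface a send error at commit time (an illustration,
// not the actual Kafka Streams patch): the callback stores the first failure,
// and the commit path rethrows it before committing offsets.
public class FailFastCollector {
    private final AtomicReference<Exception> firstError = new AtomicReference<>();

    // What the producer callback could do on completion of each send.
    void onCompletion(Exception exception) {
        if (exception != null) {
            firstError.compareAndSet(null, exception); // keep the first failure
        }
    }

    // Called after producer.flush() and before committing offsets.
    void checkForException() {
        Exception e = firstError.get();
        if (e != null) {
            throw new RuntimeException("a send failed; aborting commit", e);
        }
    }

    public static void main(String[] args) {
        FailFastCollector collector = new FailFastCollector();
        collector.onCompletion(new RuntimeException("broker unavailable"));
        try {
            collector.checkForException(); // commit path: throws instead of losing data
            System.out.println("offsets committed");
        } catch (RuntimeException e) {
            System.out.println("commit aborted: " + e.getCause().getMessage());
        }
    }
}
```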


Also, we are currently working on exactly-once semantics for Kafka Streams, and in the current design such callback exceptions will be handled so that the producing can be "rolled back" to guarantee atomicity of the three operations above, i.e. flushing the state store, flushing producers, and committing consumer offsets. We are creating a new KIP to make that design public, so stay tuned for updates.

Guozhang