[Akka Streams] Sink vs Flow (or: committing offsets after writing to Kafka)


hbf

Oct 11, 2015, 6:55:21 PM
to Akka User List
Hi,

I'm using Akka Streams to read (= consume) messages from a Kafka topic, transform them, and write them to another Kafka topic. I am looking for a way to commit the consumer offset of a message after it has been written.

Example: if I've read message m, I'd like to first process it and write it out to the destination topic. Only then do I want to tell Kafka "OK, I've read m; if I crash and restart, position me after m, please!"

Here are a few ways to realize this:
  • Make the writer a Sink and give it knowledge about the consumer so it can commit the latter's offset after writing. Not nice: the sink shouldn't have to know about a consumer.
  • Make the writer a Flow that writes to Kafka as a side effect, then connect this flow to a CommitSink that commits the offsets (sketched below). That doesn't sound nice either, as conceptually both are sinks.
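
To make the second option concrete, here's roughly what I mean (Message, writeToKafka, commitOffset, and source are placeholders, not a real API; an implicit materializer is assumed to be in scope):

import akka.stream.scaladsl.{Flow, Sink}

val writerFlow = Flow[Message].map { m =>
  writeToKafka(m) // side effect: produce m to the destination topic
  m               // pass the element on so its offset can still be committed
}
val commitSink = Sink.foreach[Message](commitOffset) // commit only after the write
source.via(writerFlow).to(commitSink).run()
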
Any suggestions on how to do this in The Akka Streams Way®?

Thanks,
K

Julian Howarth

Oct 12, 2015, 1:03:06 PM
to Akka User List
Have you looked at reactive-kafka: https://github.com/softwaremill/reactive-kafka ? We use the method documented in the Manual Commit section of that page, and it works well for us. Basically, it automatically commits a configurable period of time after your processing of a message is complete. So as long as your handling of incoming messages is idempotent, you can never lose messages.
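
For reference, the commit interval is set on the consumer properties. Going from memory of the README, it looks roughly like this (treat the parameter names as approximate, and the broker/topic values as placeholders):

import scala.concurrent.duration._
import com.softwaremill.react.kafka.ConsumerProperties
import kafka.serializer.StringDecoder

val consumerProperties = ConsumerProperties(
  brokerList = "localhost:9092",
  zooKeeperHost = "localhost:2181",
  topic = "input-topic",
  groupId = "my-group",
  decoder = new StringDecoder()
).commitInterval(5.seconds) // offsets are committed at most this long after processing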

HTH,

Julian

hbf

Oct 12, 2015, 1:55:52 PM
to Akka User List

On Monday, 12 October 2015 10:03:06 UTC-7, Julian Howarth wrote:
Have you looked at reactive-kafka: https://github.com/softwaremill/reactive-kafka ?

I have looked at reactive-kafka and am following some discussions on Gitter on the subject.
 
We use the method documented in the Manual Commit section of that page, and it works well for us. Basically, it automatically commits a configurable period of time after your processing of a message is complete. So as long as your handling of incoming messages is idempotent, you can never lose messages.

Can you elaborate on how that works exactly? My main question is this: an Akka Streams flow processes messages in parallel, so the Kafka consumer may have read two messages that are both still somewhere in the flow and have not yet reached the sink (= the Kafka writer that writes to the output topic). Suppose the auto-commit happens now and we crash. In that case, I will have lost two messages. Correct?

If so, then auto-commit doesn't work, and we need to commit offsets only after the messages have been written out.

Looking at reactive-kafka, I think they use approach two from my original mail:

val kafka = new ReactiveKafka() // from com.softwaremill.react.kafka
val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source(consumerWithOffsetSink.publisher)
  .map(processMessage(_)) // your message processing
  .to(consumerWithOffsetSink.offsetCommitSink) // stream offsets back for commit
  .run()

You'd write your message out in processMessage.
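
For what it's worth, here's a rough sketch of what that could look like using the plain Kafka producer (producer, transform, and Msg are placeholders: Msg stands for whatever element type the publisher emits, since it carries the offset, and producer is an already-configured KafkaProducer[String, String]):

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

def processMessage(msg: Msg): Msg = {
  val out = transform(msg) // placeholder for the actual payload transformation
  // Block until the broker acks the write, so the commit sink downstream
  // only ever commits offsets of messages that reached the output topic.
  producer.send(new ProducerRecord[String, String]("output-topic", out)).get()
  msg // hand the original element on to offsetCommitSink
}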

– Kaspar