How to get the last consumed offset per <consumer group, topic, partition> triplet


Nguyen Cao

Jan 6, 2016, 7:37:19 AM
to Confluent Platform
Hi,


One part of this blog post describes how Kafka keeps track of which messages have been consumed: "Internally the implementation of the offset storage is just a compacted Kafka topic (__consumer_offsets) keyed on the consumer’s group, topic, and partition. The offset commit request writes the offset to the compacted Kafka topic using the highest level of durability guarantee that Kafka provides (acks=-1) so that offsets are never lost in the presence of uncorrelated failures. Kafka maintains an in-memory view of the latest offset per <consumer group, topic, partition> triplet, so offset fetch requests can be served quickly without requiring a full scan of the compacted offsets topic. With this feature, consumers can checkpoint offsets very often, possibly per message."
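
To make the quoted description concrete, here is a toy Java model of the idea: every commit is appended to a log (standing in for the compacted topic), and a map keeps the latest offset per triplet so a fetch never scans the log. The class and method names (OffsetStoreSketch, commit, fetch, logSize) are invented for illustration and are not Kafka's internals or API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Toy model only: NOT Kafka code. All names here are hypothetical. */
public class OffsetStoreSketch {

    /** Key of the offsets log: one entry per <group, topic, partition>. */
    private static final class Key {
        final String group;
        final String topic;
        final int partition;

        Key(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return partition == k.partition
                    && group.equals(k.group)
                    && topic.equals(k.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, topic, partition);
        }
    }

    // Stand-in for the compacted topic: every commit is appended here.
    private final List<Map.Entry<Key, Long>> log = new ArrayList<>();

    // In-memory view: latest committed offset per triplet.
    private final Map<Key, Long> latest = new HashMap<>();

    /** Records a commit in the log and updates the in-memory view. */
    public void commit(String group, String topic, int partition, long offset) {
        Key key = new Key(group, topic, partition);
        log.add(new AbstractMap.SimpleImmutableEntry<>(key, offset));
        latest.put(key, offset);
    }

    /** Returns the latest committed offset, or null if none was committed. */
    public Long fetch(String group, String topic, int partition) {
        return latest.get(new Key(group, topic, partition));
    }

    /** Number of commit records appended so far. */
    public int logSize() {
        return log.size();
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.commit("billing", "orders", 0, 41L);
        store.commit("billing", "orders", 0, 42L);
        store.commit("audit", "orders", 0, 7L);

        System.out.println(store.fetch("billing", "orders", 0)); // prints 42
        System.out.println(store.fetch("audit", "orders", 1));   // prints null
    }
}
```

The lookup is a single map access regardless of how many commits were appended, which is the property the blog post attributes to the in-memory view.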

I am looking for a way to get the latest offset consumed per <consumer group, topic, partition>, but I have found no clues in the current documentation, especially for the REST API: http://docs.confluent.io/2.0.0/kafka-rest/docs/index.html

Can anyone who has worked on this give me some hints?

Thanks a lot,
Nguyen 

Ewen Cheslack-Postava

Jan 7, 2016, 11:38:34 PM
to Confluent Platform
The REST API doesn't expose this directly -- it'd be great to add support for it, but this currently falls under "admin" APIs that haven't been added yet.

However, you can use the ConsumerGroupCommand (or ConsumerOffsetChecker in older versions) to get information about consumer offsets. There is a wrapper for the ConsumerGroupCommand class in bin/kafka-consumer-groups.sh. Run it with no parameters to see information about how to use it.

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/a2e005a8-f744-4bf8-b3ec-e43fe2ea40bb%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Nguyen Cao

Jan 8, 2016, 4:44:58 AM
to Confluent Platform
Hi Ewen,

Actually, my question is about starting a consumer (not a simple consumer) that consumes messages from a specific offset (in my case, the last consumed offset for a <consumer group, topic, partitionId>) and then continues with incoming messages in real time. If that is possible, with either the REST API or the Java API, then I could consume the unprocessed messages after restarting our processing jobs. I am sure it should be possible, since instead of consuming from the beginning of the messages retained in Kafka, we just want to consume from a specific offset. Is that feature available (in either the Java or REST API), or do you plan to add it?

Many thanks for your guidance.
Nguyen

Ewen Cheslack-Postava

Jan 8, 2016, 12:27:55 PM
to Confluent Platform
Usually you would use the offset commit support in Kafka to track this for you. The consumer (both old and new) will automatically resume from the last committed offset, so as long as you're committing offsets (or using auto commit), this should just work. Both the Java client and REST API support this (e.g. in the REST API: http://docs.confluent.io/2.0.0/kafka-rest/docs/api.html#post--consumers-%28string-group_name%29-instances-%28string-instance%29-offsets)

In the new Java consumer, you can also use the seek() method to set the position in a TopicPartition (when you start up, or at any other time); see http://docs.confluent.io/2.0.0/clients/javadocs/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek%28org.apache.kafka.common.TopicPartition,%20long%29 for details. The REST API does not expose this functionality yet.
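
As a rough illustration of the two options above, the following sketch reads the group's committed offset for one partition and seeks to it explicitly before polling. It is a sketch under assumptions, not a tested recipe: it assumes a recent kafka-clients library (2.4 or later, where committed(Set) and poll(Duration) exist; the 0.9 client discussed in this thread has committed(TopicPartition) and poll(long) instead), and the broker address, group name, and topic name are placeholders. Without the explicit seek() the consumer would resume from the committed offset anyway; the call is shown only to mark where a different position could be supplied.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ResumeFromCommitted {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder values: adjust to your own cluster, group, and topic.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("my-topic", 0);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment, so this instance owns the partition directly.
            consumer.assign(Collections.singletonList(tp));

            // Committed offset for <group, topic, partition>; the map value
            // is null if the group has never committed for this partition.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp));
            OffsetAndMetadata last = committed.get(tp);

            if (last != null) {
                // The committed offset is the next record to read.
                consumer.seek(tp, last.offset());
            } else {
                consumer.seekToBeginning(Collections.singletonList(tp));
            }

            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n",
                            record.offset(), record.value());
                }
                if (!records.isEmpty()) {
                    // Commit the positions reached by the last poll.
                    consumer.commitSync();
                }
            }
        }
    }
}
```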


rajiv gandhi

Feb 18, 2016, 7:25:43 AM
to Confluent Platform
Ewen,

Any idea when this will be available over the REST API? In our module we need to manage offsets ourselves.

As you said, the seek API works perfectly with the Java consumer, but not with the REST consumer: even after I give both of them the same consumer group, the change in offset is not reflected on the REST consumer.

My understanding was that I could have two consumers belonging to the same group, so that one (the Java one) manages the offset and the other serves the data over the REST API.

Let me know if I am missing something.