How to call AdminClient.describeConsumerGroup from Java

313 views
Skip to first unread message

Craig Hooper

unread,
Dec 4, 2016, 5:27:59 PM12/4/16
to Confluent Platform
This commit - https://apache.googlesource.com/kafka/+/0aff450961a8dd14cc7820ee8d1c9eea855439b0%5E%21/#F0 - changed the signature of AdminClient.describeConsumerGroup so that it now returns a scala Option, so code like this

      List<AdminClient.ConsumerSummary> groupSummaries = scala.collection.JavaConversions.(
          adminClient.describeConsumerGroup(groupId));

no longer works. Can someone tell me how to call it from Java now?

Ewen Cheslack-Postava

unread,
Dec 4, 2016, 11:43:12 PM12/4/16
to Confluent Platform
Craig,

First, it's worth pointing out that this is *not* public API, which is why the signature is changing without any warning. In general you should not rely on internal APIs like this. If you're willing to accept the burden of having to update your code with every new release you want to compile against it can be ok, but we don't even guarantee the same functionality will be exposed in each release. In fact, the API you're referring to has changed yet again on trunk since the version you're referring to.

That said, if you're going to rely on this, Optional types just need to be checked for if they are empty or not and then use their value, e.g. something like

Optional<List<AdminClient.ConsumerSummary>> groupSummariesOptional = adminClient.describeConsumerGroup(groupId);
if (adminClient.isEmpty()) {
 // there's no value, handle this error
} else {
 List<AdminClient.ConsumerSummary> groupSummaries = groupSummariesOptional.get();
 // now you know the value exists, use groupSummaries as previously
}

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/c34d0931-7b79-456e-bd21-10fb9dc310fc%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Matthias J. Sax

unread,
Dec 5, 2016, 12:31:21 PM12/5/16
to confluent...@googlegroups.com
I think like this:

> List<AdminClient.ConsumerSummary> groupSummaries =
> scala.collection.JavaConversions.(
> adminClient.describeConsumerGroup(groupId).consumers().get()
> );
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> <https://groups.google.com/d/msgid/confluent-platform/c34d0931-7b79-456e-bd21-10fb9dc310fc%40googlegroups.com?utm_medium=email&utm_source=footer>.
signature.asc

Craig Hooper

unread,
Dec 5, 2016, 5:26:08 PM12/5/16
to Confluent Platform
Thanks guys. The problem I'm trying to solve is working out consumer offsets if we need to failover to a backup cluster. I tried a few different ways, starting with just a Streams app reading __consumer_offsets (we're a brand new implementation, so all consumers will be new), which had mixed results. Deserialising using 

OffsetAndMetadata metadata = GroupMetadataManager.readOffsetMessageValue(valBuffer);

throws errors for some messages, and the metadata seems to be always empty, so even though you can get offsets and timestamps you can't tell what topic or partition they are for.

I also looked at this - https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka - but haven't tried it yet, I wasn't sure whether it was going to work for 0.10.x.

Is there a recommended way to do what I need that won't be too brittle? I haven't even gotten to the fun part where we try to figure out the corresponding offsets in the failover cluster from timestamps.

Craig Hooper

unread,
Dec 7, 2016, 9:21:21 PM12/7/16
to Confluent Platform
So I managed to get this to work by using Streams and basically copying the logic from GroupMetadataManager$OffsetMessageFormatter, which basically calls GroupMetadataManager.readMessageKey() and GroupMetadataManager.readOffsetMessageValue().

Those two functions look like they are written to be forward compatible, is it reasonably safe to use them?


On Monday, 5 December 2016 09:27:59 UTC+11, Craig Hooper wrote:
Reply all
Reply to author
Forward
0 new messages