Kafka consumerGroupNamingStrategy and overriddenConsumerGroup features

16 views
Skip to first unread message

mathieu...@diginext.fr

unread,
Jul 22, 2021, 8:41:52 AM7/22/21
to Nussknacker
Hi everyone,

I'm currently working with NK0.2.2, and I write my API in Java.

I would like to manage the Kafka consumer groupfor my processes, and I've got some questions on this subject :
-> I tried the consumerGroupNamingStrategy configutration. So far, I've tested the processId and the processId-nodeId strategies. is there any other strategy natively available in nussknacker ?
-> Is it possible to implements custom consumerGroupNamingStrategies as part of the API ?
-> How can I use the overridenConsumerGroup property ? how does it work with the Java API (My sources class extends the KafkaSourceFactory class) ? I try to have an eye on the source code, but it's not easy for a Scala neophyte such as me

Thanks a lot, and best regards
--

Arek Burdach

unread,
Jul 22, 2021, 9:16:10 AM7/22/21
to mathieu...@diginext.fr, Nussknacker
Hi Mathieu,

Nice to hear you again.

I've left answers in the text:


On 22.07.2021 14:41, 'mathieu...@diginext.fr' via Nussknacker wrote:
Hi everyone,

I'm currently working with NK0.2.2, and I write my API in Java.

I would like to manage the Kafka consumer groupfor my processes, and I've got some questions on this subject :
-> I tried the consumerGroupNamingStrategy configutration. So far, I've tested the processId and the processId-nodeId strategies. is there any other strategy natively available in nussknacker ?
Unfortunately those two are the only options available.

 
-> Is it possible to implements custom consumerGroupNamingStrategies as part of the API ?
It's depend what you mean by the api? You can't just add some strategy next to those by writing some provider, but you can write own / extend existing sources by determining consumer group on your own.

-> How can I use the overridenConsumerGroup property ? how does it work with the Java API (My sources class extends the KafkaSourceFactory class) ? I try to have an eye on the source code, but it's not easy for a Scala neophyte such as me
You can write your own KafkaSourceFactory similar to existing one and add it to your ConfigCreator. During creation of KafkaSource in this factory, you should pass Some(consumerGroup) as an overriddenConsumerGroup - after that it should work for you. In KafkaSourceFactory.create method you have access to configuration and to parameters that are presented to the end user.

Cheers,
Arek

Thanks a lot, and best regards
--

--
You received this message because you are subscribed to the Google Groups "Nussknacker" group.
To unsubscribe from this group and stop receiving emails from it, send an email to nussknacker...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/nussknacker/82996c48-031c-4253-9ef4-53a52f04d776n%40googlegroups.com.

Reply all
Reply to author
Forward
0 new messages