How to use Quarkus Kafka client via quarkus-smallrye-reactive-messaging-kafka?

2,027 views
Skip to first unread message

David Hoffer

unread,
Sep 23, 2021, 3:39:04 PM9/23/21
to Quarkus Development mailing list
How to use the Quarkus Kafka client where my list of Kafka topics is dynamic?  I'm following the guides here: https://quarkus.io/guides/kafka

I'm using the KafkaClientService approach because my set of topics is dynamic so it does not fit the model where all topics are defined in application.yml and read via annotations.

The problem is the KafkaConsumer is always null.  Here is my setup...

@ApplicationScoped
public class AppLifecycle {

@Inject
KafkaClientService clientService;

void onStartup(@Observes StartupEvent startupEvent) {

final List<String> topics = getTopics();

for (String topic : getTopics()) {
KafkaConsumer<String, String> consumer = clientService.getConsumer(topic);
consumer.runOnPollingThread(stringStringConsumer -> {
// do something with message
});
}
}
}

Which calls...the following in KafkaConnector...sources is empty...which results in the null KafkaConsumer.

public <K, V> KafkaConsumer<K, V> getConsumer(String channel) {
        return (KafkaConsumer<K, V>) sources.stream()
                .filter(ks -> ks.getChannel().equals(channel))
                .map(KafkaSource::getConsumer)
                .findFirst().orElse(null);
    }

However all code paths that populate sources are never called.  How do I configure things so that sources is configured?

-Dave

David Hoffer

unread,
Sep 23, 2021, 6:30:55 PM9/23/21
to Quarkus Development mailing list
I think one of the problems with sources not getting populated is that I don't have a mp.messaging.incoming.XXX.connector=smallrye-kafka configured.  Where XXX is a statically defined topic.

And that is because I am using the dynamic topics approach where the application.yml doesn't know what topics I have.  Seems Quarkus has a chicken-and-egg problem here?  They say you can use KafkaClientService to create the consumer for each topic dynamically but you still have to statically define them all?  None of this makes sense to me.  Is it possible to use the dynamic topic name approach or not?

Also I find the KafkaClient docs extremely hard to follow as it relates to what properties need to be set for this to work we have mp, kafka prefixes. And in section '20.1. Incoming channel configuration (polling from Kafka)' in the https://quarkus.io/guides/kafka#configuring-smallrye-kafka-connector docs they don't even say what the fully qualified Quarkus property is, instead they call it an alias, what is that? Can someone clarify what the full property name is?

-Dave

Ladislav Thon

unread,
Sep 24, 2021, 4:17:43 AM9/24/21
to David Hoffer, Quarkus Development mailing list
The `KafkaClientService.getConsumer` method doesn't accept a topic name. It accepts a _channel_ name, and the set of channels is statically configured in your application.{properties,yaml}.

There are some ways how to make SmallRye Reactive Messaging a bit more dynamic (from top of my head, I remember that on the producer side, you can configure one channel and set the target topic through message metadata; not sure if there's something similar on consumer side), but you can always just use the Apache Kafka client directly (`io.quarkus:quarkus-kafka-client`) and just ignore SmallRye Reactive Messaging. Not sure if that's the best thing to do, but it's possible.

LT

Dne pá 24. 9. 2021 0:30 uživatel David Hoffer <dhof...@gmail.com> napsal:
--
You received this message because you are subscribed to the Google Groups "Quarkus Development mailing list" group.
To unsubscribe from this group and stop receiving emails from it, send an email to quarkus-dev...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/quarkus-dev/7795e42c-9cdb-4374-a83b-db4e64113b45n%40googlegroups.com.
Reply all
Reply to author
Forward
0 new messages