I'm using the KafkaClientService approach because my set of topics is dynamic, so it doesn't fit the model where all topics are defined in application.yml and consumed via annotations.
The problem is that the KafkaConsumer I get back is always null. Here is my setup:
@ApplicationScoped
public class AppLifecycle {

    @Inject
    KafkaClientService clientService;

    void onStartup(@Observes StartupEvent startupEvent) {
        final List<String> topics = getTopics();
        for (String topic : topics) {
            // getConsumer always returns null here
            KafkaConsumer<String, String> consumer = clientService.getConsumer(topic);
            consumer.runOnPollingThread(stringStringConsumer -> {
                // do something with message
            });
        }
    }
}
That call ends up in the following method in KafkaConnector. At that point sources is empty, so the lookup finds nothing and returns a null KafkaConsumer.
public <K, V> KafkaConsumer<K, V> getConsumer(String channel) {
    return (KafkaConsumer<K, V>) sources.stream()
            .filter(ks -> ks.getChannel().equals(channel))
            .map(KafkaSource::getConsumer)
            .findFirst().orElse(null);
}
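To isolate the behaviour I'm seeing, here is a minimal stand-alone sketch of the same lookup pattern. It does not use any SmallRye classes: Source, findConsumer and the channel names are hypothetical stand-ins I made up for illustration. It only shows that this kind of stream pipeline returns null whenever no entry matches, which includes the case where the list is empty.

```java
import java.util.ArrayList;
import java.util.List;

public class ChannelLookupSketch {

    // Hypothetical stand-in for KafkaSource: a channel name and the consumer bound to it.
    static final class Source {
        final String channel;
        final String consumer;

        Source(String channel, String consumer) {
            this.channel = channel;
            this.consumer = consumer;
        }

        String getChannel() {
            return channel;
        }

        String getConsumer() {
            return consumer;
        }
    }

    // Same pipeline shape as the getConsumer method quoted above (assumed equivalent).
    static String findConsumer(List<Source> sources, String channel) {
        return sources.stream()
                .filter(s -> s.getChannel().equals(channel))
                .map(Source::getConsumer)
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        List<Source> sources = new ArrayList<>();

        // Nothing registered yet: the lookup falls through to orElse(null).
        System.out.println(findConsumer(sources, "orders"));

        // Once an entry exists for the channel, the same lookup succeeds.
        sources.add(new Source("orders", "orders-consumer"));
        System.out.println(findConsumer(sources, "orders"));
    }
}
```

So the null itself is expected given an empty list; what I can't work out is what is supposed to register entries in the first place.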
However, none of the code paths that populate sources is ever called. How do I configure things so that sources gets populated?
-Dave