Reactive messaging with Kafka


Maciej Swiderski

Jul 11, 2019, 11:12:32 AM
to Quarkus Development mailing list
Hi,

I have been working with reactive messaging + Kafka. Overall it works very well and provides a really easy way to work with Kafka :)
I have a few questions, though:

1. To use custom types as payloads for both the Kafka consumer and producer, we can use the Kafka client API to serialise to JSON. That is completely fine and works as expected, but to build a native image for such an app you need to:
  - create not only a deserialiser that extends JsonbDeserializer but also a serialiser extending JsonbSerializer, as otherwise it fails at runtime with a class-not-found error for JsonbSerializer
  - mark the serialiser and deserialiser with @RegisterForReflection, otherwise they will be excluded, even though they are defined in application.properties. Is this expected? Just wondering if they really need to be registered for reflection…
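
For context on why a class that is only named in application.properties may still need reflection registration, here is a minimal sketch using only the JDK. It assumes, as a simplification, that the configured serialiser is resolved from its class name and created through its no-arg constructor; `PayloadSerializer`, `Utf8Serializer` and `load` are hypothetical stand-ins, not Kafka or Quarkus APIs. Because the class appears only as a string, a closed-world native build cannot see that it is used unless it is registered for reflection.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class ReflectiveSerializerLoading {

    /** Hypothetical stand-in for a serializer contract (not the Kafka interface). */
    public interface PayloadSerializer {
        byte[] serialize(String payload);
    }

    /** Hypothetical serializer that is referenced only by name in configuration. */
    public static class Utf8Serializer implements PayloadSerializer {
        public Utf8Serializer() {
        }

        @Override
        public byte[] serialize(String payload) {
            return payload.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Resolves the configured class name and invokes its no-arg constructor reflectively. */
    public static PayloadSerializer load(Properties config) throws ReflectiveOperationException {
        String className = config.getProperty("value.serializer");
        Class<?> type = Class.forName(className);
        return (PayloadSerializer) type.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        // The only link between the application and the serializer is this string.
        config.setProperty("value.serializer", Utf8Serializer.class.getName());
        PayloadSerializer serializer = load(config);
        System.out.println(serializer.getClass().getSimpleName()
                + " produced " + serializer.serialize("hello").length + " bytes");
    }
}
```

On the JVM this lookup succeeds because every class on the classpath is available at runtime. In a native image the same lookup only works for classes whose reflective use was declared at build time.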

2. Is there already support for Kafka-triggered hot reload? I recall seeing some discussion about it but can’t find the current state.

3. I ran into an issue on shutdown of the native image process that manifests as this exception:

2019-07-11 11:55:05,711 WARN  [io.qua.arc] (main) An error occured during delivery of the @BeforeDestroyed(ApplicationScoped.class) event: java.util.concurrent.RejectedExecutionException: Task io.vertx.core.impl.TaskQueue$$Lambda$76ad1f053e61859eedb59b3e5ac366287a291713@114bab6b8 rejected from java.util.concurrent.ThreadPoolExecutor@110bfd260[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at io.vertx.core.impl.TaskQueue.execute(TaskQueue.java:93)
at io.vertx.core.impl.ContextImpl.executeBlocking(ContextImpl.java:288)
at io.vertx.core.impl.ContextImpl.executeBlocking(ContextImpl.java:242)
at io.vertx.core.impl.EventLoopContext.executeBlocking(EventLoopContext.java:23)
at io.vertx.core.impl.ContextImpl.executeBlocking(ContextImpl.java:247)
at io.vertx.core.impl.EventLoopContext.executeBlocking(EventLoopContext.java:23)
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.close(KafkaWriteStreamImpl.java:225)
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.close(KafkaWriteStreamImpl.java:220)
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.close(KafkaWriteStreamImpl.java:215)
at io.smallrye.reactive.messaging.kafka.KafkaSink.close(KafkaSink.java:146)
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891)
at io.smallrye.reactive.messaging.kafka.KafkaConnector.terminate(KafkaConnector.java:45)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Observer_terminate_01ca61423b802db2b2ec45ffe53148a5efa33e27.notify(KafkaConnector_Observer_terminate_01ca61423b802db2b2ec45ffe53148a5efa33e27.zig:125)
at io.quarkus.arc.EventImpl$Notifier.notify(EventImpl.java:228)
at io.quarkus.arc.EventImpl$Notifier.notify(EventImpl.java:218)
at io.quarkus.arc.ArcContainerImpl.shutdown(ArcContainerImpl.java:263)
at io.quarkus.arc.Arc.shutdown(Arc.java:37)
at io.quarkus.arc.runtime.ArcDeploymentTemplate$1.run(ArcDeploymentTemplate.java:37)
at io.quarkus.runtime.StartupContext.close(StartupContext.java:43)
at io.quarkus.runner.ApplicationImpl1.doStop(ApplicationImpl1.zig:140)
at io.quarkus.runtime.Application.stop(Application.java:151)
at io.quarkus.runtime.Application.run(Application.java:203)
at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:34)
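
The trace reports the ThreadPoolExecutor as Terminated at the moment KafkaWriteStreamImpl.close calls executeBlocking, which suggests the worker pool was already shut down when the connector's terminate observer ran. That is a reading of the trace, not a confirmed diagnosis. The JDK-only sketch below reproduces the same failure mode in isolation; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class RejectedAfterShutdown {

    /** Returns true if a task handed to an already terminated pool is rejected. */
    public static boolean submitAfterShutdownIsRejected() throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(1);
        // Shut the pool down first, mimicking a worker pool that stops
        // before a later shutdown hook tries to use it.
        workers.shutdown();
        workers.awaitTermination(1, TimeUnit.SECONDS);
        try {
            // The default AbortPolicy throws instead of running the task.
            workers.execute(() -> System.out.println("closing producer"));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected: " + submitAfterShutdownIsRejected());
    }
}
```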

Thanks
Maciej

clement escoffier

Jul 12, 2019, 10:18:08 AM
to swidersk...@gmail.com, Quarkus Development mailing list
Hello,


On 11 Jul 2019, at 17:12, Maciej Swiderski <swidersk...@gmail.com> wrote:

1. To use custom types as payloads for both the Kafka consumer and producer, we can use the Kafka client API to serialise to JSON. That is completely fine and works as expected, but to build a native image for such an app you need to:
  - create not only a deserialiser that extends JsonbDeserializer but also a serialiser extending JsonbSerializer, as otherwise it fails at runtime with a class-not-found error for JsonbSerializer
  - mark the serialiser and deserialiser with @RegisterForReflection, otherwise they will be excluded, even though they are defined in application.properties. Is this expected? Just wondering if they really need to be registered for reflection…

I’m working on a PR that would remove this requirement, so you would be able to use the business object directly.
Unfortunately, my priority list got reshuffled lately; hopefully it will make the next release.


2. Is there already support for Kafka-triggered hot reload? I recall seeing some discussion about it but can’t find the current state.

You mean when receiving a Kafka record? No, not yet. Can you open an issue?

3. I ran into an issue on shutdown of the native image process that manifests with this exception

That looks like a bug. Can you open an issue with this stack trace on https://github.com/smallrye/smallrye-reactive-messaging?

Clement


Thanks
Maciej

--
You received this message because you are subscribed to the Google Groups "Quarkus Development mailing list" group.
To unsubscribe from this group and stop receiving emails from it, send an email to quarkus-dev...@googlegroups.com.
Visit this group at https://groups.google.com/group/quarkus-dev.
To view this discussion on the web visit https://groups.google.com/d/msgid/quarkus-dev/5F5FFC95-3C70-4756-8191-13847D63D540%40gmail.com.
For more options, visit https://groups.google.com/d/optout.

Maciej Swiderski

Jul 13, 2019, 10:35:27 PM
to clement....@gmail.com, Quarkus Development mailing list
Clement,

On 13 Jul 2019, at 00:17, clement escoffier <clement....@gmail.com> wrote:

1. To use custom types as payloads for both the Kafka consumer and producer, we can use the Kafka client API to serialise to JSON. That is completely fine and works as expected, but to build a native image for such an app you need to:
  - create not only a deserialiser that extends JsonbDeserializer but also a serialiser extending JsonbSerializer, as otherwise it fails at runtime with a class-not-found error for JsonbSerializer
  - mark the serialiser and deserialiser with @RegisterForReflection, otherwise they will be excluded, even though they are defined in application.properties. Is this expected? Just wondering if they really need to be registered for reflection…

I’m working on a PR that would remove this requirement, so you would be able to use the business object directly.
Unfortunately, my priority list got reshuffled lately; hopefully it will make the next release.

Great to hear it’s coming.



2. Is there already support for Kafka-triggered hot reload? I recall seeing some discussion about it but can’t find the current state.

You mean when receiving a Kafka record? No, not yet. Can you open an issue?

Yes, that’s exactly what I meant. Issue created: https://github.com/quarkusio/quarkus/issues/3217

Clement


Thanks
Maciej

--
You received this message because you are subscribed to the Google Groups "Quarkus Development mailing list" group.
To unsubscribe from this group and stop receiving emails from it, send an email to quarkus-dev...@googlegroups.com.
Visit this group at https://groups.google.com/group/quarkus-dev.
To view this discussion on the web visit https://groups.google.com/d/msgid/quarkus-dev/5F5FFC95-3C70-4756-8191-13847D63D540%40gmail.com.
For more options, visit https://groups.google.com/d/optout.


--
You received this message because you are subscribed to the Google Groups "Quarkus Development mailing list" group.
To unsubscribe from this group and stop receiving emails from it, send an email to quarkus-dev...@googlegroups.com.
Visit this group at https://groups.google.com/group/quarkus-dev.
Reply all
Reply to author
Forward
0 new messages