Hi All,
I hope this is the right forum to talk about SmallRye Reactive Messaging.
I wrote a sample to understand how to produce messages continuously.
I have a method that continuously produces messages:
@Outgoing("from-producer-to-processor")
public Multi<ClassA> periodicallySendMessage() {
    return Multi.createFrom()
            .ticks()
            .every(Duration.ofMillis(50))
            .onItem().transform(t -> new ClassA("Hello " + t))
            .onFailure(mm -> {
                log.info("Producer NOT EMITTING");
                return true;
            })
            .retry()
            .withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10))
            .indefinitely()
            .onItem()
            .invoke(msg ->
                    log.info("Producer emitting " + msg));
}
What I realized is that if a failure happens, for example because the method that consumes messages through the @Incoming annotation is slow, the failure is raised and, after the retry, the ticks start over from the beginning. Am I right?
I have also noticed that the failure is raised only after a considerable number of messages have been sent (at least 256). I suppose this is the default behaviour, given that the default buffer size is 256. But even after changing the buffer size, I was unable to control how many messages are sent before a failure (caused by a slow consumer) is raised.
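To make the two observations above concrete, here is a minimal plain-Java model of what I think is happening. It is only a sketch and does not use Mutiny or SmallRye: the names TickRetryModel, Attempt, subscribeOnce and subscribeWithRetry are hypothetical, and it assumes that (a) each new subscription to the tick source gets its own counter, (b) a retry simply subscribes again, and (c) the failure is raised at the first tick that finds a bounded buffer full. Whether the real pipeline behaves this way is exactly what I am asking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model only; none of these names come from Mutiny or SmallRye.
final class TickRetryModel {

    /** Outcome of one (hypothetical) subscription attempt. */
    static final class Attempt {
        final long firstTick;
        final int bufferedBeforeFailure;

        Attempt(long firstTick, int bufferedBeforeFailure) {
            this.firstTick = firstTick;
            this.bufferedBeforeFailure = bufferedBeforeFailure;
        }
    }

    /**
     * One subscription: a fresh counter emits into a bounded buffer that is
     * never drained (an extremely slow consumer). The attempt fails at the
     * first tick that finds the buffer full.
     */
    static Attempt subscribeOnce(int bufferSize) {
        Deque<Long> buffer = new ArrayDeque<>();
        long counter = 0;               // assumption: every subscription starts its own counter
        long firstTick = counter;
        while (buffer.size() < bufferSize) {
            buffer.addLast(counter++);  // there is still room, so no failure yet
        }
        // The next tick would overflow the buffer: this is where the failure is raised.
        return new Attempt(firstTick, buffer.size());
    }

    /** Retry modelled as "subscribe again"; assumes the buffer is empty for each attempt. */
    static List<Attempt> subscribeWithRetry(int bufferSize, int attempts) {
        List<Attempt> result = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            result.add(subscribeOnce(bufferSize));
        }
        return result;
    }

    public static void main(String[] args) {
        for (Attempt a : subscribeWithRetry(256, 3)) {
            System.out.println("first tick " + a.firstTick
                    + ", buffered before failure " + a.bufferedBeforeFailure);
        }
    }
}
```

In this model every attempt begins again at the first tick, and the number of items accepted before the failure equals the buffer size, which matches what I see with the default of 256 but not what I see after changing the buffer size.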
Best regards,
Vincenzo