Hello,
I am trying to process messages in a blocking manner using a combination of @Blocking and an Emitter injected with @Channel. Processing works fine for a couple of hours, and then Emitter.send suddenly starts throwing "Insufficient downstream requests to emit item".
On its own that would look normal, since it is what backpressure is supposed to do. The strange thing is that the Emitter never accepts messages again afterwards; it seems to have stopped publishing to the downstream channel entirely. No logs are produced at all.
I'd like to know what I did wrong in the setup and how to properly implement blocking code with SmallRye Reactive Messaging. Here's an overview of how this is implemented:
@Blocking
@Incoming("orders")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public void consume(Object payload) {
    DomainModel order = parse(payload);
    // blocking call to the DB
    repository.save(order);
    // wrap the order in a Message and attach metadata attributes
    Message<DomainModel> downstreamMessage = generateDownstreamMessage(order);
    // emitter is the @Channel-injected Emitter for the downstream channel
    emitter.send(downstreamMessage);
}
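For context, this is how I understand the error itself. The sketch below is a toy model in plain Java and not SmallRye's actual implementation: ToyEmitter, request, sendAll and deliveredCount are hypothetical names I made up for illustration. The assumption is that the emitter may only emit while the downstream has outstanding demand, and that send fails otherwise.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of demand-based backpressure. NOT SmallRye's implementation. */
public class DemandSketch {

    /** Hypothetical emitter that may only emit while downstream demand is outstanding. */
    static final class ToyEmitter<T> {
        private final Queue<T> delivered = new ArrayDeque<>();
        private long requested; // items the downstream asked for but has not received yet

        /** The downstream signals that it can accept n more items. */
        synchronized void request(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n must be positive");
            }
            requested += n;
        }

        /** Emits the item if there is demand, otherwise fails like the error in my logs. */
        synchronized void send(T item) {
            if (requested == 0) {
                throw new IllegalStateException("Insufficient downstream requests to emit item");
            }
            requested--;
            delivered.add(item);
        }

        synchronized int deliveredCount() {
            return delivered.size();
        }
    }

    /** Tries to send count items and returns how many were rejected for lack of demand. */
    static int sendAll(ToyEmitter<String> emitter, int count) {
        int rejected = 0;
        for (int i = 0; i < count; i++) {
            try {
                emitter.send("order-" + i);
            } catch (IllegalStateException e) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        ToyEmitter<String> emitter = new ToyEmitter<>();
        emitter.request(2);                 // downstream asks for two items
        int rejected = sendAll(emitter, 5); // upstream tries to push five
        System.out.println("delivered=" + emitter.deliveredCount() + " rejected=" + rejected);

        emitter.request(1);                 // demand returns, so sending works again
        emitter.send("order-late");
        System.out.println("delivered=" + emitter.deliveredCount());
    }
}
```

In this model sending recovers as soon as the downstream requests more items. What I see in my application is that it never recovers, as if the downstream had stopped requesting altogether.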
Kind regards