I have been looking at the current reactive messaging and the smallrye
0.10-SNAPSHOT version, and I don't see a discussion of how a bean that has multiple independent incoming channels is handled. For example, I have this test bean:
@ApplicationScoped
public class TestBean {
static final List<String> COLLECTOR = new ArrayList<>();
@Incoming("sink")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void sink(String input) {
System.out.printf("TestBean.incoming(sink), input=%s\n", input);
}
@Incoming("counter")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void counted(Integer input) {
System.out.printf("TestBean.incoming(counter), input=%s\n", input);
}
@Incoming("fibonacci")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void fibonacci(Integer input) {
System.out.printf("TestBean.incoming(fibonacci), input=%s\n", input);
}
}
where each of the sink, counter and fibonacci channels have independent sources. The "sink" channel source is a custom connector that pushes invocations on a bean to the channel, the "counter" channel source is a PublisherBuilder that counts to 100, and the "fibonacci" channel source is an infinite stream of the Fib sequence, but this blows up as soon as the F(n) is reached such that the integer sum overflows.
The behavior I see is that the counter channel is sourced first to completion, and then the "fibonacci" channel is sourced next, but blows up, and this ends the application as a io.smallrye.reactive.messaging.ProcessingException is thrown during initialization of the CDI container.
It seems that producer streams sourced from CDI bean methods are completed loaded during initialization of the CDI container. Control never returns to the application code that is initializing the CDI container. Is this expected?
Is there a way to enable concurrent processing of all of the channels that exist in an application rather than the serial processing I'm currently seeing?