Initialization of channel streams and concurrency


Scott Stark

Jun 25, 2019, 1:18:44 AM
to Eclipse MicroProfile
I have been looking at the current Reactive Messaging and the SmallRye 0.10-SNAPSHOT version, and I don't see any discussion of how a bean with multiple independent incoming channels is handled. For example, I have this test bean:

@ApplicationScoped
public class TestBean {
    static final List<String> COLLECTOR = new ArrayList<>();

    @Incoming("sink")
    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    public void sink(String input) {
        System.out.printf("TestBean.incoming(sink), input=%s\n", input);
    }

    @Incoming("counter")
    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    public void counted(Integer input) {
        System.out.printf("TestBean.incoming(counter), input=%s\n", input);
    }

    @Incoming("fibonacci")
    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    public void fibonacci(Integer input) {
        System.out.printf("TestBean.incoming(fibonacci), input=%s\n", input);
    }

}

where each of the sink, counter and fibonacci channels has an independent source. The "sink" channel source is a custom connector that pushes invocations on a bean to the channel, the "counter" channel source is a PublisherBuilder that counts to 100, and the "fibonacci" channel source is an infinite stream of the Fibonacci sequence, which blows up as soon as it reaches an F(n) whose integer sum overflows.
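The fibonacci source itself is not shown here. As a minimal sketch of a supplier with the described behavior, assuming the overflow is detected with Math.addExact (the class name OverflowingFibonacci and the use of addExact are illustrative assumptions, not the code from the test application):

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the "fibonacci" source: an endless supplier
// that fails as soon as the next sum no longer fits in an int.
final class OverflowingFibonacci implements Supplier<Integer> {
    private int previous = 0; // F(0)
    private int current = 1;  // F(1)

    /** Returns F(1), F(2), ... and throws ArithmeticException when computing the next term overflows. */
    @Override
    public Integer get() {
        int result = current;
        int next = Math.addExact(previous, current); // throws ArithmeticException on int overflow
        previous = current;
        current = next;
        return result;
    }

    /** Pulls values until the supplier throws and reports how many were delivered. */
    static int countValuesBeforeOverflow() {
        OverflowingFibonacci fib = new OverflowingFibonacci();
        int emitted = 0;
        try {
            while (true) {
                fib.get();
                emitted++;
            }
        } catch (ArithmeticException overflow) {
            return emitted;
        }
    }

    public static void main(String[] args) {
        System.out.println("values delivered before overflow: " + countValuesBeforeOverflow());
    }
}
```

A supplier like this, passed to something such as ReactiveStreams.generate, would surface the ArithmeticException to whichever thread is pulling items, which is consistent with the failure described next.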

The behavior I see is that the "counter" channel is sourced first, to completion, and then the "fibonacci" channel is sourced next but blows up. This ends the application, because an io.smallrye.reactive.messaging.ProcessingException is thrown during initialization of the CDI container.

It seems that producer streams sourced from CDI bean methods are consumed to completion during initialization of the CDI container. Control never returns to the application code that is initializing the CDI container. Is this expected?

Is there a way to enable concurrent processing of all of the channels that exist in an application rather than the serial processing I'm currently seeing?

clement escoffier

Jun 25, 2019, 3:32:48 AM
to Eclipse MicroProfile
Hello,

I need to check, but I believe it’s because your sources are synchronous. 

So the first source (let’s say sink) starts, gets a subscription and immediately emits an item. The item is immediately processed by the “sink” method, which is also synchronous. You should be able to “break” this synchronous cycle by emitting the items on different threads.
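To make the difference concrete, here is a rough sketch using only the JDK. It is not how SmallRye wires subscriptions; the class and method names are hypothetical, and a plain Consumer stands in for the @Incoming method. It only shows which thread ends up running the consumer in each case:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class EmissionThreadDemo {

    /** Synchronous source: every item is consumed on the caller's thread before this returns. */
    static void emitSynchronously(int items, Consumer<Integer> consumer) {
        for (int i = 1; i <= items; i++) {
            consumer.accept(i);
        }
    }

    /** Asynchronous source: items are handed to the emitter thread and this returns immediately. */
    static CountDownLatch emitAsynchronously(int items, Consumer<Integer> consumer, ExecutorService emitter) {
        CountDownLatch done = new CountDownLatch(items);
        for (int i = 1; i <= items; i++) {
            final int item = i;
            emitter.execute(() -> {
                consumer.accept(item);
                done.countDown();
            });
        }
        return done;
    }

    /** Emits three items and returns the names of the threads the consumer ran on. */
    static List<String> consumerThreads(boolean async) {
        List<String> threads = new CopyOnWriteArrayList<>();
        Consumer<Integer> consumer = item -> threads.add(Thread.currentThread().getName());
        if (!async) {
            emitSynchronously(3, consumer);
            return threads;
        }
        ExecutorService emitter = Executors.newSingleThreadExecutor(r -> new Thread(r, "emitter"));
        try {
            // The caller is free to do other work here; it waits only so the result can be returned.
            emitAsynchronously(3, consumer, emitter).await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            emitter.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("caller thread:          " + Thread.currentThread().getName());
        System.out.println("sync consumer threads:  " + consumerThreads(false));
        System.out.println("async consumer threads: " + consumerThreads(true));
    }
}
```

In the synchronous case the consumer runs on whatever thread subscribed, so that thread cannot move on to the next channel until the source is exhausted. In the asynchronous case the consumer runs on the "emitter" thread instead.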

Clement

--
You received this message because you are subscribed to the Google Groups "Eclipse MicroProfile" group.
To unsubscribe from this group and stop receiving emails from it, send an email to microprofile...@googlegroups.com.
To post to this group, send email to microp...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/microprofile/0d3ed6c1-4ffe-4a89-8978-7073c7518002%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Scott Stark

Jun 25, 2019, 3:01:59 PM
to Eclipse MicroProfile
It is likely my lack of familiarity with the Reactive Streams APIs on which RM depends, but there is a disconnect for me: when writing an RM producer, depending on which APIs I'm using, I don't see the subscription/subscriber calls. Take for example this "counter" channel producer:

@ApplicationScoped
public class CounterSource {
    @Outgoing("counter")
    public PublisherBuilder<Integer> nextCount() {
        final AtomicInteger count = new AtomicInteger(0);
        Supplier<Integer> counter = () -> {
            try {
                Thread.sleep(100);
            } catch (Exception e) {
            }
            return count.incrementAndGet();
        };
        PublisherBuilder<Integer> publisherBuilder = ReactiveStreams.generate(counter).limit(100);
        return publisherBuilder;
    }
}

So, I don't know if this is asynchronous or not. Is that something that is obvious to someone who knows RS?

clement escoffier

Jun 26, 2019, 4:17:21 AM
to Eclipse MicroProfile

It’s not async; it’s actually blocking (Thread.sleep).
The supplier is called on the caller thread and blocks before every “get”.
An async version that does not block the caller thread would use a thread pool with a single thread and delay each emission by 100 ms.
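A minimal sketch of that single-thread idea, using only the JDK's ScheduledExecutorService. The class ScheduledCounter and its methods are hypothetical names, an IntConsumer stands in for the downstream subscriber, and no Reactive Streams backpressure is implemented, so it illustrates the threading only:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class ScheduledCounter {

    /**
     * Emits 1..limit to the consumer, one value per period, from a dedicated
     * single-thread scheduler. Returns immediately; the latch opens after the last value.
     */
    static CountDownLatch start(int limit, long periodMillis, IntConsumer consumer) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be at least 1");
        }
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger count = new AtomicInteger(0);
        CountDownLatch completed = new CountDownLatch(1);
        scheduler.scheduleAtFixedRate(() -> {
            int value = count.incrementAndGet();
            consumer.accept(value);
            if (value >= limit) {
                completed.countDown();
                scheduler.shutdown(); // cancels further periodic runs once the limit is reached
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return completed;
    }

    /** Convenience for trying it out: starts the counter and waits for all values. */
    static List<Integer> collect(int limit, long periodMillis) {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch completed = start(limit, periodMillis, value -> seen.add(value));
        try {
            completed.await(limit * periodMillis + 5_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(5, 100));
    }
}
```

The caller of start(...) gets control back right away, and the sleep-free periodic task does the counting on the scheduler thread. Turning this into a real Publisher would still require honoring subscriber demand, which this sketch leaves out.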

Alternatively, you can use something like:

ReactiveStreams.fromPublisher(Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureDrop()
    .map(x -> count.incrementAndGet())).limit(100);

Clement




