Stream.broadcast() behavior

63 views
Skip to first unread message

Gesly George

unread,
Jul 10, 2015, 1:50:25 AM7/10/15
to reactor-...@googlegroups.com
Hi,

The following code is a slightly modified version (added calls to log()) of the broadcast example from the docs.


       
Stream<String> stream = Streams.just("a", "b", "c", "d", "e", "f", "g", "h");

       
//prepare a shared pipeline
       
Stream<String> sharedStream = stream.broadcast().log("SharedStream");

       
//prepare two unique pipelines
       
Stream<Long> actionChain2 = sharedStream.observe(LOG::info).count().log("actionChain2");

       
Stream<String> actionChain1 = sharedStream.map(String::toUpperCase).filter(w -> w.equals("C") || w.equals("B")).log("actionChain1");

        actionChain2
.consume(e -> LOG.info(String.valueOf(e))); //start chain2
        actionChain1
.consume(LOG::info); //start chain1


The log output of the two consumers was not what I had expected based on the figure in the doc. As you can see in the output below, actionChain1 did out log anything, as actionChain2 sent a cancel to the underlying Broadcaster. Was this what the example was trying to demonstrate - once you call broadcast, you get a shared stream so any consumer's suscribe-request from any consumer affects all consumers?

18:57:34.867 [main] INFO  actionChain2 - subscribe: ConsumerAction
18:57:34.871 [main] INFO  SharedStream - subscribe: CallbackAction
18:57:34.871 [main] INFO  SharedStream - onSubscribe: {push}
18:57:34.871 [main] INFO  actionChain2 - onSubscribe: {push}
18:57:34.872 [main] INFO  actionChain2 - request: 9223372036854775807
18:57:34.872 [main] INFO  SharedStream - request: 9223372036854775807
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - onNext: a
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - a
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - onNext: b
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - b
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - onNext: c
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - c
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - onNext: d
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - d
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - onNext: e
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - e
18:57:34.873 [main] INFO  actionChain1 - subscribe: ConsumerAction
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - onNext: f
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - f
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - onNext: g
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - g
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - onNext: h
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - h
18:57:34.873 [dispatcherGroup-1] INFO  SharedStream - complete: Broadcaster
18:57:34.873 [dispatcherGroup-1] INFO  actionChain2 - onNext: 8
18:57:34.873 [main] INFO  SharedStream - subscribe: MapAction
18:57:34.873 [dispatcherGroup-1] INFO  ReactorTest - 8
18:57:34.874 [dispatcherGroup-1] INFO  actionChain2 - complete: CountAction
18:57:34.874 [dispatcherGroup-1] INFO  actionChain2 - cancel: CountAction
18:57:34.874 [dispatcherGroup-1] INFO  SharedStream - cancel: Broadcaster
18:57:34.875 [main] INFO  SharedStream - onSubscribe: {push}
18:57:34.875 [main] INFO  actionChain1 - onSubscribe: {push}
18:57:34.875 [main] INFO  actionChain1 - request: 9223372036854775807
18:57:34.875 [main] INFO  SharedStream - request: 9223372036854775807


Stephane Maldini

unread,
Jul 10, 2015, 5:03:32 AM7/10/15
to Gesly George, reactor-framework
This is a limitation of the current broadcast. It starts with the first subscribe so chances are for a small source like this it will finish before broadcasting to the second one.  A workaround is to create a simple broadcaster and attach it after to the upstream source:

 Stream<String> stream = Streams.just("a", "b", "c", "d", "e", "f", "g", "h");
        
//prepare a shared pipeline
        Broadcaster<String> broadcaster = Broadcaster.create();
        
Stream<String> sharedStream = broadcaster.log("SharedStream");

        
//prepare two unique pipelines
        
Stream<Long> actionChain2 = sharedStream.observe(LOG::info).count().log("actionChain2");
        
Stream<String> actionChain1 = sharedStream.map(String::toUpperCase).filter(-> w.equals("C") || w.equals("B")).log("actionChain1");
        actionChain2
.consume(-> LOG.info(String.valueOf(e))); //start chain2
        actionChain1
.consume(LOG::info); //start chain1

stream.subscribe(broadcaster);


--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Stephane Maldini | Solutions Architect, CSO EMEA | London | Pivotal

Gesly George

unread,
Jul 10, 2015, 11:02:00 AM7/10/15
to reactor-...@googlegroups.com, gesly....@gmail.com
I see, once you do this, actionChain2 will receive some data before the cancel on the stream kicks in - actionchain2 counts 3 even though its takes 5.

Now, see the following code. Note that I am attaching a broadcaster after to the upstream. Also actionChain2 kicks off before actionChain1 and it only signal 3 elements [take{3)]. I got a ConcurrentModificationException



       
Stream<String> stream = Streams.just("a", "b", "c", "d", "e", "f", "g", "h");

       
//prepare a shared pipeline
//        Stream<String> sharedStream = stream.broadcast().log("Broadcast");
       
final Broadcaster<String> stringBroadcaster = Broadcaster.create();
       
Stream<String> sharedStream = stringBroadcaster;


       
//prepare two unique pipelines
       
Stream<String> actionChain1 = sharedStream.map(String::toUpperCase).filter(w -> w.equals("C")).log("ActionChain1");
       
Stream<Long> actionChain2 = sharedStream.take(3).observe(LOG::info).count().log("ActionChain2");

       
actionChain2.consume(e -> {
            LOG
.info("Printing ActionChain2 -> " + e);
       
});
//start chain2

        actionChain1
.consume(e -> LOG.info("Printing ActionChain1 -> " + e)); //start chain1


        stream
.subscribe(stringBroadcaster);


Output

07:52:51.678 [main] INFO  ActionChain2 - subscribe: ConsumerAction
07:52:51.684 [main] INFO  ActionChain2 - onSubscribe: {push}
07:52:51.687 [main] INFO  ActionChain2 - request: 9223372036854775807
07:52:51.687 [main] INFO  ActionChain1 - subscribe: ConsumerAction
07:52:51.688 [main] INFO  ActionChain1 - onSubscribe: {push}
07:52:51.689 [main] INFO  ActionChain1 - request: 9223372036854775807
07:52:51.690 [main] INFO  reactor.SimpleReactorExample - a
07:52:51.690 [main] INFO  reactor.SimpleReactorExample - b
07:52:51.690 [main] INFO  reactor.SimpleReactorExample - c
07:52:51.690 [main] INFO  ActionChain2 - onNext: 3
07:52:51.690 [main] INFO  reactor.SimpleReactorExample - Printing ActionChain2 -> 3
07:52:51.690 [main] INFO  ActionChain2 - complete: CountAction
07:52:51.691 [main] INFO  ActionChain2 - cancel: CountAction
07:52:51.691 [main] INFO  ActionChain1 - onNext: C
07:52:51.691 [main] INFO  reactor.SimpleReactorExample - Printing ActionChain1 -> C
07:52:51.707 [main] ERROR ActionChain1 - onError:
|____IterableStream[iterable=[a, b, c, d, e, f, g, h]]
|____Broadcaster[{{push}}]
|   |  
|   |____Map[{{push}}]
|   |____Filter[{{push}}]
|   |____Logger[{{push}}{logger=ActionChain1}]
|   |____Consumer[{{push}}{pending=0}]

java
.util.ConcurrentModificationException: null
    at java
.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) ~[na:1.8.0_45]
    at java
.util.ArrayList$Itr.next(ArrayList.java:851) ~[na:1.8.0_45]
    at reactor
.rx.subscription.FanOutSubscription.forEach(FanOutSubscription.java:135) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription.onNext(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.doNext(Broadcaster.java:136) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.onNext(Broadcaster.java:147) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.stream.IterableStream$1.request(IterableStream.java:72) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.requestUpstream(Broadcaster.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action$7.onRequest(Action.java:564) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.request(PushSubscription.java:84) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.requestUpstream(Action.java:572) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action$7.onRequest(Action.java:564) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.request(PushSubscription.java:84) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.filter.FilterAction.doNext(FilterAction.java:45) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.transformation.MapAction.doNext(MapAction.java:39) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription$2.accept(FanOutSubscription.java:67) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription$2.accept(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription.forEach(FanOutSubscription.java:137) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription.onNext(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.doNext(Broadcaster.java:136) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.onNext(Broadcaster.java:147) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.stream.IterableStream$1.request(IterableStream.java:72) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.onSubscribe(Broadcaster.java:159) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.stream.IterableStream.subscribe(IterableStream.java:64) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.SimpleReactorExample.testReactorPubSubDocExample(SimpleReactorExample.java:113) [main/:na]
    at reactor
.SimpleReactorExample.main(SimpleReactorExample.java:27) [main/:na]
    at sun
.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
    at sun
.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_45]
    at sun
.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
    at java
.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
    at com
.intellij.rt.execution.application.AppMain.main(AppMain.java:140) [idea_rt.jar:na]
Caused by: reactor.core.support.Exceptions$ValueCause: Exception while signaling value: b
    at reactor
.core.support.Exceptions.addValueAsLastCause(Exceptions.java:127) ~[reactor-core-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.broadcastNext(Action.java:271) [reactor-stream-2.0.3.RELEASE.jar:na]
   
... 40 common frames omitted
07:52:51.712 [main] INFO  ActionChain1 - cancel: FilterAction
07:52:51.713 [main] ERROR reactor.environment -
java
.util.ConcurrentModificationException: null
    at java
.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) ~[na:1.8.0_45]
    at java
.util.ArrayList$Itr.next(ArrayList.java:851) ~[na:1.8.0_45]
    at reactor
.rx.subscription.FanOutSubscription.forEach(FanOutSubscription.java:135) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription.onNext(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.doNext(Broadcaster.java:136) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.onNext(Broadcaster.java:147) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.stream.IterableStream$1.request(IterableStream.java:72) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.requestUpstream(Broadcaster.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action$7.onRequest(Action.java:564) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.request(PushSubscription.java:84) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.requestUpstream(Action.java:572) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action$7.onRequest(Action.java:564) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.request(PushSubscription.java:84) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.filter.FilterAction.doNext(FilterAction.java:45) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.transformation.MapAction.doNext(MapAction.java:39) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription$2.accept(FanOutSubscription.java:67) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription$2.accept(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription.forEach(FanOutSubscription.java:137) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.FanOutSubscription.onNext(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.doNext(Broadcaster.java:136) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.onNext(Broadcaster.java:147) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.stream.IterableStream$1.request(IterableStream.java:72) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.broadcast.Broadcaster.onSubscribe(Broadcaster.java:159) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.rx.stream.IterableStream.subscribe(IterableStream.java:64) [reactor-stream-2.0.3.RELEASE.jar:na]
    at reactor
.SimpleReactorExample.testReactorPubSubDocExample(SimpleReactorExample.java:113) [main/:na]
    at reactor
.SimpleReactorExample.main(SimpleReactorExample.java:27) [main/:na]
    at sun
.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
    at sun
.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_45]
    at sun
.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
    at java
.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
    at com
.intellij.rt.execution.application.AppMain.main(AppMain.java:140) [idea_rt.jar:na]
Caused by: reactor.core.support.Exceptions$ValueCause: Exception while signaling value: b
    at reactor
.core.support.Exceptions.addValueAsLastCause(Exceptions.java:127) ~[reactor-core-2.0.3.RELEASE.jar:na]
    at reactor
.rx.action.Action.broadcastNext(Action.java:271) [reactor-stream-2.0.3.RELEASE.jar:na]
   
... 40 common frames omitted

...

Stephane Maldini

unread,
Jul 10, 2015, 11:06:33 AM7/10/15
to Gesly George, reactor-framework
That's a critical blocker for a next version, just in case check if that still arises in 2.0.4.RELEASE. 

--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Reply all
Reply to author
Forward
0 new messages