Stream<String> stream = Streams.just("a", "b", "c", "d", "e", "f", "g", "h");
//prepare a shared pipeline
Stream<String> sharedStream = stream.broadcast().log("SharedStream");
//prepare two unique pipelines
Stream<Long> actionChain2 = sharedStream.observe(LOG::info).count().log("actionChain2");
Stream<String> actionChain1 = sharedStream.map(String::toUpperCase).filter(w -> w.equals("C") || w.equals("B")).log("actionChain1");
actionChain2.consume(e -> LOG.info(String.valueOf(e))); //start chain2
actionChain1.consume(LOG::info); //start chain1
18:57:34.867 [main] INFO actionChain2 - subscribe: ConsumerAction
18:57:34.871 [main] INFO SharedStream - subscribe: CallbackAction
18:57:34.871 [main] INFO SharedStream - onSubscribe: {push}
18:57:34.871 [main] INFO actionChain2 - onSubscribe: {push}
18:57:34.872 [main] INFO actionChain2 - request: 9223372036854775807
18:57:34.872 [main] INFO SharedStream - request: 9223372036854775807
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - onNext: a
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - a
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - onNext: b
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - b
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - onNext: c
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - c
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - onNext: d
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - d
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - onNext: e
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - e
18:57:34.873 [main] INFO actionChain1 - subscribe: ConsumerAction
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - onNext: f
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - f
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - onNext: g
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - g
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - onNext: h
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - h
18:57:34.873 [dispatcherGroup-1] INFO SharedStream - complete: Broadcaster
18:57:34.873 [dispatcherGroup-1] INFO actionChain2 - onNext: 8
18:57:34.873 [main] INFO SharedStream - subscribe: MapAction
18:57:34.873 [dispatcherGroup-1] INFO ReactorTest - 8
18:57:34.874 [dispatcherGroup-1] INFO actionChain2 - complete: CountAction
18:57:34.874 [dispatcherGroup-1] INFO actionChain2 - cancel: CountAction
18:57:34.874 [dispatcherGroup-1] INFO SharedStream - cancel: Broadcaster
18:57:34.875 [main] INFO SharedStream - onSubscribe: {push}
18:57:34.875 [main] INFO actionChain1 - onSubscribe: {push}
18:57:34.875 [main] INFO actionChain1 - request: 9223372036854775807
18:57:34.875 [main] INFO SharedStream - request: 9223372036854775807
--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Stream<String> stream = Streams.just("a", "b", "c", "d", "e", "f", "g", "h");
//prepare a shared pipeline
// Stream<String> sharedStream = stream.broadcast().log("Broadcast");
final Broadcaster<String> stringBroadcaster = Broadcaster.create();
Stream<String> sharedStream = stringBroadcaster;
//prepare two unique pipelines
Stream<String> actionChain1 = sharedStream.map(String::toUpperCase).filter(w -> w.equals("C")).log("ActionChain1");
Stream<Long> actionChain2 = sharedStream.take(3).observe(LOG::info).count().log("ActionChain2");
actionChain2.consume(e -> {
LOG.info("Printing ActionChain2 -> " + e);
}); //start chain2
actionChain1.consume(e -> LOG.info("Printing ActionChain1 -> " + e)); //start chain1
stream.subscribe(stringBroadcaster);
07:52:51.678 [main] INFO ActionChain2 - subscribe: ConsumerAction
07:52:51.684 [main] INFO ActionChain2 - onSubscribe: {push}
07:52:51.687 [main] INFO ActionChain2 - request: 9223372036854775807
07:52:51.687 [main] INFO ActionChain1 - subscribe: ConsumerAction
07:52:51.688 [main] INFO ActionChain1 - onSubscribe: {push}
07:52:51.689 [main] INFO ActionChain1 - request: 9223372036854775807
07:52:51.690 [main] INFO reactor.SimpleReactorExample - a
07:52:51.690 [main] INFO reactor.SimpleReactorExample - b
07:52:51.690 [main] INFO reactor.SimpleReactorExample - c
07:52:51.690 [main] INFO ActionChain2 - onNext: 3
07:52:51.690 [main] INFO reactor.SimpleReactorExample - Printing ActionChain2 -> 3
07:52:51.690 [main] INFO ActionChain2 - complete: CountAction
07:52:51.691 [main] INFO ActionChain2 - cancel: CountAction
07:52:51.691 [main] INFO ActionChain1 - onNext: C
07:52:51.691 [main] INFO reactor.SimpleReactorExample - Printing ActionChain1 -> C
07:52:51.707 [main] ERROR ActionChain1 - onError:
|____IterableStream[iterable=[a, b, c, d, e, f, g, h]]
|____Broadcaster[{{push}}]
| |
| |____Map[{{push}}]
| |____Filter[{{push}}]
| |____Logger[{{push}}{logger=ActionChain1}]
| |____Consumer[{{push}}{pending=0}]
java.util.ConcurrentModificationException: null
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) ~[na:1.8.0_45]
at java.util.ArrayList$Itr.next(ArrayList.java:851) ~[na:1.8.0_45]
at reactor.rx.subscription.FanOutSubscription.forEach(FanOutSubscription.java:135) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription.onNext(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.doNext(Broadcaster.java:136) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.onNext(Broadcaster.java:147) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.stream.IterableStream$1.request(IterableStream.java:72) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.requestUpstream(Broadcaster.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action$7.onRequest(Action.java:564) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.request(PushSubscription.java:84) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.requestUpstream(Action.java:572) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action$7.onRequest(Action.java:564) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.request(PushSubscription.java:84) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.filter.FilterAction.doNext(FilterAction.java:45) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.transformation.MapAction.doNext(MapAction.java:39) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription$2.accept(FanOutSubscription.java:67) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription$2.accept(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription.forEach(FanOutSubscription.java:137) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription.onNext(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.doNext(Broadcaster.java:136) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.onNext(Broadcaster.java:147) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.stream.IterableStream$1.request(IterableStream.java:72) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.onSubscribe(Broadcaster.java:159) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.stream.IterableStream.subscribe(IterableStream.java:64) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.SimpleReactorExample.testReactorPubSubDocExample(SimpleReactorExample.java:113) [main/:na]
at reactor.SimpleReactorExample.main(SimpleReactorExample.java:27) [main/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) [idea_rt.jar:na]
Caused by: reactor.core.support.Exceptions$ValueCause: Exception while signaling value: b
at reactor.core.support.Exceptions.addValueAsLastCause(Exceptions.java:127) ~[reactor-core-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:271) [reactor-stream-2.0.3.RELEASE.jar:na]
... 40 common frames omitted
07:52:51.712 [main] INFO ActionChain1 - cancel: FilterAction
07:52:51.713 [main] ERROR reactor.environment -
java.util.ConcurrentModificationException: null
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) ~[na:1.8.0_45]
at java.util.ArrayList$Itr.next(ArrayList.java:851) ~[na:1.8.0_45]
at reactor.rx.subscription.FanOutSubscription.forEach(FanOutSubscription.java:135) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription.onNext(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.doNext(Broadcaster.java:136) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.onNext(Broadcaster.java:147) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.stream.IterableStream$1.request(IterableStream.java:72) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.requestUpstream(Broadcaster.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action$7.onRequest(Action.java:564) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.request(PushSubscription.java:84) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.requestUpstream(Action.java:572) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action$7.onRequest(Action.java:564) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.request(PushSubscription.java:84) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.requestMore(Action.java:619) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.filter.FilterAction.doNext(FilterAction.java:45) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.transformation.MapAction.doNext(MapAction.java:39) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription$2.accept(FanOutSubscription.java:67) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription$2.accept(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription.forEach(FanOutSubscription.java:137) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.FanOutSubscription.onNext(FanOutSubscription.java:63) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:267) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.doNext(Broadcaster.java:136) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.onNext(Action.java:202) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.onNext(Broadcaster.java:147) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.subscription.PushSubscription.onNext(PushSubscription.java:111) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.stream.IterableStream$1.request(IterableStream.java:72) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.broadcast.Broadcaster.onSubscribe(Broadcaster.java:159) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.rx.stream.IterableStream.subscribe(IterableStream.java:64) [reactor-stream-2.0.3.RELEASE.jar:na]
at reactor.SimpleReactorExample.testReactorPubSubDocExample(SimpleReactorExample.java:113) [main/:na]
at reactor.SimpleReactorExample.main(SimpleReactorExample.java:27) [main/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) [idea_rt.jar:na]
Caused by: reactor.core.support.Exceptions$ValueCause: Exception while signaling value: b
at reactor.core.support.Exceptions.addValueAsLastCause(Exceptions.java:127) ~[reactor-core-2.0.3.RELEASE.jar:na]
at reactor.rx.action.Action.broadcastNext(Action.java:271) [reactor-stream-2.0.3.RELEASE.jar:na]
... 40 common frames omitted
...
--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.