Using EventBus with Processor


Aviad Hadida

Jun 10, 2015, 6:40:34 AM
to reactor-...@googlegroups.com
Hi,

I have read through the docs and investigated this issue at length. I also watched this nice presentation: http://www.slideshare.net/StphaneMaldini/spring-one2gx-2014reactivestreams
There are several things I am having trouble understanding. I work at a leading forex company that deals with the major players in the banking world. I posted a question here before; after more reading and experimenting I have gotten several things working, and I still have some questions:

The basic thing I am trying to do: use the event bus with a Processor (so that several threads can handle the same message for several subscribers). Stephane and Jon pointed me in this direction and suggested the RingBufferWorkProcessor, which on its own worked fine following the example in the docs.

What I don't know:

1. A practical question: how do I attach a Processor to an eventBus event? For example: eventBus.on(Selectors.$("quotes"), processor);
2. What is the difference between eventBus and Streams? When should I use one or the other? I am trying to understand the differences in depth so that I can make better choices in how I use the framework and in my implementation.
3. Do I need to use several different event buses/streams in one app? Or can I make the event handler act differently in different cases, and if so, how?

Thanks for your help, guys. Great job, great framework!
Aviad

Jon Brisbin

Jun 10, 2015, 12:12:48 PM
to Aviad Hadida, Reactor Framework
There’s not really a good answer for when to use EventBus over Stream other than the general guidance that EventBus is really meant to be a single, special kind of micro message bus that doesn’t easily support pipelines (by that I mean it’s not easy to send an event back to the EventBus under a different topic key). The EventBus was the primary component of Reactor 1.x because the Stream was just being introduced. By Reactor 2.0 and its implementation of Reactive Streams, the Stream has pretty well infected/taken over everything with just a few exceptions.

If you don’t have access to the “head” of a Stream onto which you want to publish values, you have the benefit of declaring only a single Consumer to handle that event (with no subsequent, dependent steps), and you need the flexibility of routing events to given Consumers based on possibly dynamic criteria, then an EventBus is probably still your best choice. If your processing can have multiple starting points, then the Selector is your friend. For the majority of other use cases, however, we’re finding that the Stream/Processor model actually works better and is more flexible. Rather than sending data to an EventBus, you would create an ad hoc Stream using the input value as the starting point:

Streams.just(inputValue).flatMap(o -> explodeIntoAnotherStream(o)).consume(out -> handleOutputObject(out));

This uses lambdas efficiently to scope tasks to the input data rather than creating a long-lived Consumer singleton that is assigned to the EventBus under a given Selector registration. That may be good or bad depending on your throughput and memory pressure requirements. But it also introduces pipeline composition which the EventBus lacks. Being able to do retries, reduce, transform, and any number of combinatory or transformative operations is more flexible than the EventBus by its nature. These benefits may outweigh the efficiency of the EventBus in some circumstances. Note that this doesn’t require the lookup and caching of a Selector mapping. If you’re not careful, that can really burn you, especially if you’re publishing lots and lots of unique keys which will be cached.
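
The caching concern in the last two sentences can be pictured with a toy registry. This is a plain-JDK sketch, not Reactor's EventBus: ToyBus, on, publish and cachedKeys are hypothetical names, and the idea that resolved consumer lists are cached per published key is an assumption taken from the description above, not from Reactor's source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model only: NOT Reactor's EventBus. All names here are hypothetical.
class ToyBus {
    private static final class Registration {
        final Predicate<String> selector;
        final Consumer<Object> consumer;

        Registration(Predicate<String> selector, Consumer<Object> consumer) {
            this.selector = selector;
            this.consumer = consumer;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();
    // Assumed behavior: one cache entry per distinct key that has been published.
    private final Map<String, List<Consumer<Object>>> cache = new HashMap<>();

    void on(Predicate<String> selector, Consumer<Object> consumer) {
        registrations.add(new Registration(selector, consumer));
        cache.clear(); // cached lookups may be stale after a new registration
    }

    void publish(String key, Object payload) {
        // Resolve matching consumers once per key, then reuse the cached list.
        List<Consumer<Object>> matched = cache.computeIfAbsent(key, k -> {
            List<Consumer<Object>> found = new ArrayList<>();
            for (Registration r : registrations) {
                if (r.selector.test(k)) {
                    found.add(r.consumer);
                }
            }
            return found;
        });
        for (Consumer<Object> c : matched) {
            c.accept(payload);
        }
    }

    int cachedKeys() {
        return cache.size();
    }

    public static void main(String[] args) {
        ToyBus bus = new ToyBus();
        AtomicInteger hits = new AtomicInteger();
        // A single long-lived consumer registered under a prefix selector.
        bus.on(key -> key.startsWith("quotes."), payload -> hits.incrementAndGet());
        for (int userId = 0; userId < 1000; userId++) {
            bus.publish("quotes." + userId, "EUR/USD");
        }
        System.out.println(hits.get());       // prints 1000
        System.out.println(bus.cachedKeys()); // prints 1000
    }
}
```

In this toy model a single registered consumer still leaves 1000 cache entries behind, one per unique key, which is the kind of growth the warning above is about.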

I hope that gives you some idea of the differences between the two. As a general rule, I’d say: go with Stream for everything unless it’s clear you need the EventBus for a specific situation (the flexibility of Selectors and the efficiency of singleton Consumers).
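
As a rough illustration of the pipeline composition described above, here is the same shape written against the JDK's java.util.stream. This is an analogy only: java.util.stream is synchronous and is not Reactor's Stream, and AdHocPipeline, explode and process are hypothetical names, with explode standing in for explodeIntoAnotherStream from the one-liner earlier in this message.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: uses java.util.stream, not Reactor's Stream.
class AdHocPipeline {
    // Hypothetical stand-in for explodeIntoAnotherStream(o).
    static Stream<String> explode(String batch) {
        return Arrays.stream(batch.split(","));
    }

    // The pipeline is scoped to one input value and built from small, composable steps.
    static List<String> process(String inputValue) {
        return Stream.of(inputValue)
                .flatMap(AdHocPipeline::explode) // one value becomes many
                .map(String::trim)               // transform each element
                .filter(s -> !s.isEmpty())       // drop empty entries
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [EUR/USD, GBP/USD, USD/JPY]
        System.out.println(process("EUR/USD, GBP/USD, ,USD/JPY"));
    }
}
```

Each step can be added, removed, or reordered without touching a long-lived handler, which is the flexibility a single Consumer registered on a bus does not give you.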


Thanks!

Jon Brisbin
Reactor Project Lead

about.me/jonbrisbin | @j_brisbin | @ProjectReactor


Aviad Hadida

Jun 11, 2015, 8:24:46 AM
to reactor-...@googlegroups.com, aviad...@gmail.com
Well, thanks a lot for the detailed answer. It certainly makes sense.

Based on your answer I have more questions:
1. A Processor basically handles only one event type for different subscribers, so it would be used for one event. For example, would I have a Processor just for EUR/USD? Or would I need a single Processor for all prices? Or do you mean one Processor per module, for example one Processor for all prices, and on that stream I would filter out what I need and send it to whatever will handle it?

2. Would the following be good practice? I would use the event bus and wrap eventBus.on with something like this:

private <T> void subscribeToEvent(Selector<String> selector, Consumer<Event<T>> handler, int numberOfProcessors) {
    // Ring-buffer work processor named "test" with a buffer size of 32.
    final Processor<Event<T>, Event<T>> p = RingBufferWorkProcessor.create("test", 32);
    Stream<Event<T>> stream = Streams.wrap(p);

    // Subscribe the same handler numberOfProcessors times.
    for (int i = 0; i < numberOfProcessors; i++) {
        stream.consume(handler);
    }

    // Forward every event matched by the selector into the processor.
    eventBus.on(selector, new Consumer<Event<T>>() {
        @Override
        public void accept(Event<T> t) {
            p.onNext(t);
        }
    });
}

In this case I subscribe to an event and ask for X processors to be the handlers. This is the general idea I would like to use, but if I can't, then maybe this approach won't work for me.
In my case I need the event bus because every user subscribes with his user ID and a few other things, so I definitely need the dynamic routing for this kind of action.
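
The hand-off idea in the wrapper above (the bus callback only forwards the event, and a fixed number of handlers do the work) can be sketched with plain JDK classes. This is not Reactor code: WorkQueueSketch, onNext, shutdown and demo are hypothetical names, and the sketch assumes work-queue semantics, where each event goes to exactly one handler, which is how RingBufferWorkProcessor is generally described, as opposed to broadcasting every event to every subscriber.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Plain-JDK sketch of work-queue distribution; hypothetical names, not Reactor API.
class WorkQueueSketch<T> {
    private final BlockingQueue<T> queue;
    private final Thread[] workers;

    WorkQueueSketch(int capacity, int workerCount, Consumer<T> handler) {
        BlockingQueue<T> q = new ArrayBlockingQueue<>(capacity);
        this.queue = q;
        this.workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        handler.accept(q.take()); // each event is taken by exactly one worker
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdown requested
                }
            }, "worker-" + i);
            worker.setDaemon(true);
            worker.start();
            workers[i] = worker;
        }
    }

    // What a bus-side Consumer would call; it blocks only while the queue is full.
    void onNext(T event) {
        try {
            queue.put(event);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while publishing", e);
        }
    }

    void shutdown() {
        for (Thread w : workers) {
            w.interrupt();
        }
    }

    // Publishes `events` items to `workerCount` workers and returns how many were handled.
    static int demo(int events, int workerCount) {
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(events);
        WorkQueueSketch<Integer> sketch = new WorkQueueSketch<>(32, workerCount, event -> {
            handled.incrementAndGet();
            done.countDown();
        });
        for (int i = 0; i < events; i++) {
            sketch.onNext(i);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sketch.shutdown();
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(100, 4)); // prints 100, not 400: events are shared, not broadcast
    }
}
```

Because the publishing side only enqueues, a handler that blocks ties up one worker thread and not the caller, at least until the bounded queue fills up.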

In general, if I use blocking actions, maybe I can't use the event bus at all.


Thanks a lot
Aviad