Want to use a multi-threaded consumer when an event is received

Aviad Hadida

Jun 2, 2015, 9:47:41 AM
to reactor-...@googlegroups.com
Hi.

I am in the middle of refactoring a project for a forex company that works with several major banks across the globe, and I really need help here.
I also asked this question on GitHub, but I couldn't get it working and there are several things I don't understand.
I wanted to use RING_BUFFER, but I saw that a single thread handles all events. That is a problem for blocking actions, or when I want each consumer to be handled by its own thread.

I have several questions:

1. On GitHub, someone told me to use RingBufferWorkProcessor. He even sent an example, but I am not sure why the syntax is that way. I started with version 1.x and changed the packages (EventBus instead of Reactor, and a few more). Now I would like to understand how to do that with the example from the Spring Reactor site.

2. What is the difference between a Stream (or what is called Reactive Streams) and the regular notify & on? Maybe I am using something the wrong way.

In general, I need this event bus for two reasons. One is to update in-memory state when something has changed for a logged-in user (update the session after a change from another module). The second is to stream prices into a Netty channel that is connected to users. I need really high throughput and would like to preserve ordering, though I am not sure whether that will be critical.

Please advise

Thanks
Aviad

Jon Brisbin

Jun 2, 2015, 10:17:06 AM
to Aviad Hadida, Reactor Framework
Just a couple clarifications and then some comments…


> I am in the middle of refactoring a project for a forex company that works with several major banks across the globe, and I really need help here.
> I also asked this question on GitHub, but I couldn't get it working and there are several things I don't understand.
> I wanted to use RING_BUFFER, but I saw that a single thread handles all events. That is a problem for blocking actions, or when I want each consumer to be handled by its own thread.


From the standard configuration, Reactor provides a single-threaded **shared** RingBufferDispatcher you can assign to an EventBus. It’s the highest throughput because there’s never any context-switching for tasks executed on it. That’s a standard pattern in the Disruptor world to use a single RingBuffer as an input. But it’s not a universal use-case and obviously not every application can deal with only a single thread for things.

There are also RingBufferDispatchers that are “pooled” (accessed by calling Environment.cachedDispatcher()). These are the same as the single-threaded RingBufferDispatcher except that, in the default configuration, there is one of them per CPU. Each time you call Environment.cachedDispatcher() you get one from the pool in round-robin order. This allows you to distribute load across your CPUs but still get the advantages of using a RingBuffer. It’s a good compromise between an “old school” thread pool and the high-throughput RingBuffer.
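To make the round-robin idea concrete, here is a minimal JDK-only sketch of a pool of single-threaded workers. It is an analogy, not Reactor's implementation: DispatcherPool, next() and shutdown() are hypothetical names, and plain single-thread executors stand in for RingBuffer-backed dispatchers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only analogy of a "pooled" dispatcher: N single-threaded workers
// handed out in round-robin order. All names here are hypothetical.
class DispatcherPool {
    private final List<ExecutorService> workers = new ArrayList<>();
    private final AtomicInteger cursor = new AtomicInteger();

    DispatcherPool(int size) {
        for (int i = 0; i < size; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    // Returns the next worker in round-robin order.
    ExecutorService next() {
        int index = Math.floorMod(cursor.getAndIncrement(), workers.size());
        return workers.get(index);
    }

    // Stops accepting tasks and waits briefly for queued tasks to finish.
    void shutdown() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        DispatcherPool pool = new DispatcherPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < 8; i++) {
            final int task = i;
            pool.next().execute(() ->
                System.out.println("task " + task + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}
```

Tasks handed to the same worker run one after another on a single thread, while different workers run in parallel, which is the load-spreading property described above.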



> I have several questions:
>
> 1. On GitHub, someone told me to use RingBufferWorkProcessor. He even sent an example, but I am not sure why the syntax is that way. I started with version 1.x and changed the packages (EventBus instead of Reactor, and a few more). Now I would like to understand how to do that with the example from the Spring Reactor site.


The Getting Started guides are being updated to Reactor 2.0. Until those have been fully updated, you should rely on the docs in the reference guide:




> 2. What is the difference between a Stream (or what is called Reactive Streams) and the regular notify & on? Maybe I am using something the wrong way.


The EventBus [1] is a specific component that is a bit like a message broker (think RabbitMQ et al). You “subscribe” to topics by placing a Selector + Consumer combo using the .on() method and then “publish” data to topics via the String key + Event data using .notify(). It is **strictly** pub/sub behavior and therefore its usefulness is limited to that use case. If your use case is a little more complicated than plain pub/sub you will run into problems trying to use it effectively. That’s why we always encourage using a Stream. 90% of the time people’s needs are quite beyond simply pub/sub (or they’re a combination of workflow processing and pub/sub).

The Reactive Streams **Specification** [2] is just a document describing how implementations should work. There are only 4 interfaces in the Reactive Streams artifact dependency. You must have an implementation of that standard to do anything meaningful. Reactor’s Stream subclasses are implementations of this standard.
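For illustration only: the four interfaces in the specification are Publisher, Subscriber, Subscription and Processor. JDK 9 and later ship equivalent interfaces as nested types of java.util.concurrent.Flow, along with SubmissionPublisher as a simple Publisher implementation. The sketch below uses those JDK types rather than Reactor's Stream classes, to show the contract a Subscriber follows. FlowDemo and CollectingSubscriber are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// JDK-only sketch of the Publisher/Subscriber/Subscription contract.
class FlowDemo {
    // A Subscriber that asks for one item at a time (explicit demand).
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the items to one subscriber and returns what it received.
    static List<String> run(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("tick-1", "tick-2", "tick-3")));
    }
}
```

The point of the contract is that the Subscriber controls the pace through request(n); the Publisher never pushes more than was asked for.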

It is possible to mix the “work queue” type use case with a stream processing type use case by using the RingBufferWorkProcessor [3], which Stephane mentioned on the GH issue. It is a little bit more advanced component but is the most flexible way to do a work queue. That said, it does come with the trade-off that each Subscriber gets its own processing thread. This is where the huge throughput efficiencies come from. You don’t want to create any more of these than you have to. You would typically create one as a “sink” for events, then subscribe to it the number of times you want to have threads consuming events. Using the Processor allows you to create a Stream from it and compose further actions in a way that’s easier to do than by using the fairly coarse-grained functionality of the EventBus. To publish data into a Processor, you can simply call the onNext(T) method. You can also set up a Broadcaster, which is just a Stream that allows you to publish data into it directly.
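As a rough picture of the "work queue" pattern described here, the JDK-only sketch below runs several consumer threads over one shared queue. It is not Reactor code and says nothing about how RingBufferWorkProcessor is implemented; WorkQueueDemo and run() are hypothetical names. The property it demonstrates is that, in a work queue, each published item is handled by exactly one of the consumers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// JDK-only sketch of work-queue semantics; none of this is Reactor API.
class WorkQueueDemo {
    // Sentinel that tells a worker to stop; compared by identity, never by value.
    private static final String STOP = new String("STOP");

    // Runs `workers` consumer threads over one shared queue and returns
    // how many times each (distinct) item was handled.
    static Map<String, Integer> run(int workers, List<String> items) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, Integer> deliveries = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    for (String item = queue.take(); item != STOP; item = queue.take()) {
                        deliveries.merge(item, 1, Integer::sum);
                        System.out.println(Thread.currentThread().getName() + " handled " + item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + i);
            threads.add(worker);
            worker.start();
        }
        try {
            for (String item : items) {
                queue.put(item);
            }
            for (int i = 0; i < workers; i++) {
                queue.put(STOP); // one sentinel per worker
            }
            for (Thread worker : threads) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println(run(3, List.of("0 next", "1 next", "2 next", "3 next", "4 next")));
    }
}
```

Which worker picks up which item, and the order in which the lines are printed, can differ from run to run; only the "handled exactly once" count is stable.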



> In general, I need this event bus for two reasons. One is to update in-memory state when something has changed for a logged-in user (update the session after a change from another module). The second is to stream prices into a Netty channel that is connected to users. I need really high throughput and would like to preserve ordering, though I am not sure whether that will be critical.


It’s a little confusing having more than one way to do things but Reactor tries to be as flexible as possible and there’s always more than one good way to accomplish a task using it. The most flexible way to deal with streaming data is to use the component designed to do that: the Stream and its subclasses. If you’re using Reactor’s Netty support [4], then it’s actually quite simple to set up an output Stream that sends data to clients.




Thanks!

Jon Brisbin
Reactor Project Lead

about.me/jonbrisbin | @j_brisbin | @ProjectReactor


Aviad Hadida

Jun 2, 2015, 2:52:44 PM
to reactor-...@googlegroups.com
Thanks for the detailed update. I need to look into it a bit and I will keep you posted.
Thanks a lot!

Aviad Hadida

Jun 7, 2015, 4:27:42 AM
to reactor-...@googlegroups.com
Hi.

I read up and investigated a bit, and I still have some issues implementing what I need, or something close to it.

After our last discussion I updated the framework dependencies and tried out EventBus (which was Reactor before).
It worked great for non-blocking actions, but according to our discussion it won't be the right choice for me.

I tried to use the RingBufferWorkProcessor as Stephane mentioned, starting from his example. I couldn't get it to work as-is because the method signatures seem to expect something else. I did the following:

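// Imports assumed for Reactor 2.0 (not shown in the original post)
import reactor.core.processor.RingBufferWorkProcessor;
import reactor.core.reactivestreams.SubscriberFactory;
import reactor.core.reactivestreams.SubscriptionWithContext;

// Processor (its creation was not shown in the original post; create() is assumed here)
RingBufferWorkProcessor<String> processor = RingBufferWorkProcessor.create();

// Ten subscribers, each served by its own thread
for (int i = 0; i < 10; i++) {
    processor.subscribe(SubscriberFactory.unbounded(
        new reactor.fn.BiConsumer<String, SubscriptionWithContext<Void>>() {
            @Override
            public void accept(String left, SubscriptionWithContext<Void> right) {
                try {
                    Thread.sleep(1000); // simulate a blocking action
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(left);
            }
        }));
}

// Publish 1000 items into the processor
for (int i = 0; i < 1000; i++) {
    processor.onNext(i + " next");
}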

Well, this worked as I wanted: a single thread for every subscriber. But from here I don't know how to build on it using a subscription mechanism like the one in EventBus (which is so intuitive and easy to use) while still getting a processor thread for each subscriber.

In addition, there is still something I am not sure I understand. Is using onNext the same as using notify? I guess not, and I would really like to understand the difference. I looked at the docs and couldn't understand why the processor is so different from the EventBus.

Two things I worry about:
1. The processor does not deliver messages in order. That is fine by me as long as a message does not arrive too far out of place. For example, if I send 1-100, this is OK: 2,3,4,1,5,6. But this is not: 23,24,1,27.
Is there any way to say that every message older than X seconds will be dropped, or to deliver messages in order to avoid this?

2. I worry that I don't fully understand the difference between those two ways, and maybe there is another way that would be better for me.
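On the first worry (dropping messages older than X seconds), one possible approach, sketched here with plain JDK code and not taken from Reactor, is to stamp each message when it is published and have the consumer skip anything older than a chosen threshold. Timestamped, isFresh and keepFresh are hypothetical names, and the 2-second limit is only an example value.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only sketch of an age filter for messages; all names are hypothetical.
class StaleFilterDemo {
    // Envelope carrying a payload plus the time it was published.
    static final class Timestamped<T> {
        final T payload;
        final long publishedAtMillis;

        Timestamped(T payload, long publishedAtMillis) {
            this.payload = payload;
            this.publishedAtMillis = publishedAtMillis;
        }
    }

    // True when the message is no older than maxAgeMillis at time nowMillis.
    static boolean isFresh(Timestamped<?> message, long nowMillis, long maxAgeMillis) {
        return nowMillis - message.publishedAtMillis <= maxAgeMillis;
    }

    // Returns the payloads of the messages that are still fresh, in input order.
    static <T> List<T> keepFresh(List<Timestamped<T>> messages, long nowMillis, long maxAgeMillis) {
        List<T> fresh = new ArrayList<>();
        for (Timestamped<T> message : messages) {
            if (isFresh(message, nowMillis, maxAgeMillis)) {
                fresh.add(message.payload);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Timestamped<String>> batch = List.of(
            new Timestamped<>("stale price", now - 5_000),
            new Timestamped<>("fresh price", now - 100));
        System.out.println(keepFresh(batch, now, 2_000)); // prints [fresh price]
    }
}
```

In a real consumer the isFresh check would run at the top of the handler, just before the blocking work starts. It bounds how stale a delivered message can be, but it does not restore ordering between the messages that pass the check.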

Small reminder: what I am trying to achieve is an event bus that has a different thread for every subscriber. Or maybe only some of the events in my system will need a thread for every event, and for the other group of events I can use the ring buffer event bus.

Thanks a lot for your help
Aviad


Aviad Hadida

Jun 7, 2015, 6:46:54 AM
to reactor-...@googlegroups.com
Sorry, but I think I was wrong. I probably wanted this to work so badly :)
I can see that only one subscriber gets each message, instead of all the subscribers.

I think the feature we need is different.

It should behave like the event bus: subscribe on an event URI, even if there are two subscribers (each of which can be a different part of the system that we would like to notify).

For example:
I get a stream of prices for USD/ILS. Every user subscribes to it; we add our fee (specific to each user) and send the result on the channel to the client.
So every subscriber to USD/ILS and a specific liquidity provider (which the user can choose) gets the price, and we apply our fee according to the subscribed user.

Another example:
After a trade, we would like to notify a few services that are unrelated to each other (like a microservices system). So when a trade is finished we would like to send to: Credit manager, Position manager, and Post trade.
I would like to notify every manager on a different thread, because the action might be blocking. In addition, if I have even hundreds of users on the same machine, it will be heavier.
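The behaviour described in these two examples (every subscriber on a topic receives every event, and each subscriber is called on its own thread so that a blocking handler only delays itself) can be sketched with plain JDK classes. This is an illustration of the requirement, not Reactor's EventBus: ThreadPerSubscriberBus, on(), publish() and the topic name "trade.finished" are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// JDK-only sketch: broadcast pub/sub with one dedicated thread per subscriber.
class ThreadPerSubscriberBus {
    // A handler together with the single thread that runs it.
    private static final class Registration {
        final Consumer<String> handler;
        final ExecutorService thread = Executors.newSingleThreadExecutor();

        Registration(Consumer<String> handler) {
            this.handler = handler;
        }
    }

    private final Map<String, List<Registration>> topics = new ConcurrentHashMap<>();

    // Registers a handler on a topic and gives it its own thread.
    void on(String topic, Consumer<String> handler) {
        topics.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>())
              .add(new Registration(handler));
    }

    // Hands the event to every subscriber of the topic, each on its own thread.
    void publish(String topic, String event) {
        for (Registration r : topics.getOrDefault(topic, List.of())) {
            r.thread.execute(() -> r.handler.accept(event));
        }
    }

    // Stops all subscriber threads after their queued events are processed.
    void shutdown() {
        for (List<Registration> registrations : topics.values()) {
            for (Registration r : registrations) {
                r.thread.shutdown();
                try {
                    r.thread.awaitTermination(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public static void main(String[] args) {
        ThreadPerSubscriberBus bus = new ThreadPerSubscriberBus();
        for (String manager : List.of("credit", "position", "post-trade")) {
            bus.on("trade.finished", event -> System.out.println(
                manager + " got " + event + " on " + Thread.currentThread().getName()));
        }
        bus.publish("trade.finished", "trade-1");
        bus.shutdown();
    }
}
```

Because each subscriber has a single thread with a FIFO queue, events published from one thread reach a given subscriber in publish order. The cost is one thread per subscriber, which is reasonable for a handful of managers but probably too heavy for hundreds of per-user price subscribers; sharing a small pool of such threads across users would be one way to limit that.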

Thanks, and sorry for the confusion.
Aviad
