How to post events back to Disruptor/RingBuffer without using MultiProducer?


Sam Goldberg

Mar 3, 2015, 4:55:54 PM
to lmax-di...@googlegroups.com
In a test where the only change was switching the Disruptor from ProducerType.SINGLE to ProducerType.MULTI (without even attaching a second producer to the Disruptor queue), we saw the following difference:

SINGLE (2 disruptors, both single producer)

% events <= value
50%      1.81u
90%      3.02u
98.7%    9.055u

Simply changing the producer type in the Disruptor constructor from SINGLE to MULTI in one of the 2 disruptors gives the following times:

MULTI (1 of the 2 disruptors switched to multi producer)

% events <= value
50%      6.04u
90%      9.06u
98.7%    17.82u

We have various reasons to want to marshal messages back to one of the consumers from another thread. One of these needs would produce only very occasional additional messages, so we were wondering whether an alternative might be a modified BusySpinWaitStrategy that reads another queue while it waits for the next sequence number:

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();

            // do some work here
            handleAdditionalWork();
        }

        return availableSequence;
    }



Is this a reasonable approach, or is there a simpler way to accomplish the same thing without the drop in performance that the multi-producer mode causes?
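
The spin-and-do-side-work idea in the question can be tried out without the Disruptor on the classpath. What follows is a minimal sketch using only java.util.concurrent. The class name SpinWithSideWorkSketch, the sideWork queue, and the plain AtomicLong standing in for the Disruptor Sequence are all assumptions made for illustration; none of them is Disruptor API.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class SpinWithSideWorkSketch {

    /**
     * Spins until cursor >= sequence. On every spin where the cursor has not
     * yet reached the requested sequence, at most one queued side task is run.
     * Returns the cursor value that ended the wait.
     * (Hypothetical stand-in for a wait strategy; not Disruptor code.)
     */
    public static long waitFor(long sequence, AtomicLong cursor, Queue<Runnable> sideWork) {
        long available;
        while ((available = cursor.get()) < sequence) {
            Runnable task = sideWork.poll(); // non-blocking, null when the queue is empty
            if (task != null) {
                task.run();
            }
        }
        return available;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(-1);
        Queue<Runnable> sideWork = new ConcurrentLinkedQueue<>();
        AtomicLong sideTasksRun = new AtomicLong();

        // First task is ordinary side work; the second simulates the producer
        // publishing up to sequence 7 so that this single-threaded demo terminates.
        sideWork.add(sideTasksRun::incrementAndGet);
        sideWork.add(() -> cursor.set(7));

        long seen = waitFor(5, cursor, sideWork);
        System.out.println("seen=" + seen + " sideTasksRun=" + sideTasksRun.get());
    }
}
```

One thing the sketch makes visible: every side task runs between two reads of the cursor, so a long task directly delays the moment the consumer notices a newly published event. Running at most one short task per spin keeps that delay bounded.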











Michael Barker

Mar 3, 2015, 5:08:20 PM
to lmax-di...@googlegroups.com
Have a look at the EventPoller[0] and the Sequencer.newPoller[1].  Create a poller for each RingBuffer, and cycle through them.  Using this approach you won't use a WaitStrategy, but instead do the polling in your own code.


Mike.
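
A rough illustration of the arrangement described above (one single-producer buffer per producing thread, and one consumer thread cycling through them), written against the JDK only. SpscBuffer and pollOnce are hypothetical stand-ins invented for this sketch; with the Disruptor the corresponding pieces would be one RingBuffer per producer and one EventPoller per RingBuffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class RoundRobinPollingSketch {

    /** Hypothetical single-producer, single-consumer buffer with a non-blocking poll. */
    public static final class SpscBuffer {
        private final String[] slots;
        private final int mask;
        private final AtomicLong published = new AtomicLong(-1); // written by the producer only
        private final AtomicLong consumed = new AtomicLong(-1);  // written by the consumer only

        public SpscBuffer(int sizePowerOfTwo) {
            slots = new String[sizePowerOfTwo];
            mask = sizePowerOfTwo - 1;
        }

        /** Producer side: returns false instead of overwriting when the buffer is full. */
        public boolean offer(String event) {
            long next = published.get() + 1;
            if (next - consumed.get() > slots.length) {
                return false;
            }
            slots[(int) (next & mask)] = event;
            published.set(next); // volatile write makes the slot visible to the consumer
            return true;
        }

        /** Consumer side: hands every event published so far to the sink, returns the count. */
        public int poll(List<String> sink) {
            long available = published.get();
            long next = consumed.get() + 1;
            int handled = 0;
            for (; next <= available; next++, handled++) {
                sink.add(slots[(int) (next & mask)]);
            }
            if (handled > 0) {
                consumed.set(next - 1);
            }
            return handled;
        }
    }

    /** One pass over all buffers on the calling thread; returns how many events were handled. */
    public static int pollOnce(List<SpscBuffer> buffers, List<String> sink) {
        int handled = 0;
        for (SpscBuffer buffer : buffers) {
            handled += buffer.poll(sink);
        }
        return handled;
    }

    public static void main(String[] args) {
        SpscBuffer mainFeed = new SpscBuffer(8);
        SpscBuffer backChannel = new SpscBuffer(8);
        mainFeed.offer("order-1");
        mainFeed.offer("order-2");
        backChannel.offer("timer-tick");

        List<String> handled = new ArrayList<>();
        // A real consumer would loop until shutdown; the demo stops once a pass finds nothing.
        while (pollOnce(Arrays.asList(mainFeed, backChannel), handled) > 0) {
            // keep cycling
        }
        System.out.println(handled);
    }
}
```

Because each buffer still has exactly one writer, no producer has to coordinate with another; the cost moves to the consumer, which pays for one extra poll per pass for each additional buffer.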


Sam Goldberg

Mar 3, 2015, 5:31:32 PM
to lmax-di...@googlegroups.com
Thanks for the pointer!

Sam Goldberg

Mar 5, 2015, 1:18:18 PM
to lmax-di...@googlegroups.com
Hi Mike,

A simple test with the EventPoller looks pretty good. Just a couple of follow-up questions. Does this setup code for the test look correct?

    AtomicInteger handledEventCounter = new AtomicInteger();
    int bufferSize = 2048;
    RingBuffer<LongEvent> buffer = RingBuffer.createSingleProducer(factory, bufferSize, strategy);
    final EventPoller<LongEvent> poller = buffer.newPoller(new Sequence[0]);
    final PollerLongEventHandler handler = new PollerLongEventHandler(STATS, handledEventCounter);

    Thread pollerThread = new Thread() {

        @Override
        public void run() {
            try {
                LOG.info("thread started");
                while (!isInterrupted()) {
                    PollState state = poller.poll(handler);
                    // if state is something appropriate, do some periodic work
                    // if (state == IDLE) {
                    //   readBackChannelQueue();
                    // }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    };

    pollerThread.start();


Also, I'm a little confused about what to do with the different PollState return values. Does the PollState really matter in deciding whether the polling thread should go off and do additional work? For example, I can see from the source code that if you want your handler to consume all available events before the poll method exits, it should return true. So if it returns true, then whenever the poll method returns you know there are no more consumable events. In that case, I am guessing you can assume it is safe to do other work? (On the other hand, if your handler returns false, then I am guessing PROCESSING is a signal that there may be more events to consume.)

Relating to this, what does the GATING state mean, and is there any reason you shouldn't do extra work when poll returns that state?

Thanks again and Best Regards,

Sam







Michael Barker

Mar 5, 2015, 6:25:35 PM
to lmax-di...@googlegroups.com
What you have is essentially correct. When you create the poller, the sequence passed into the newPoller call should be the sequence of another handler that you want to gate on. So in your case the method arguments should be empty.

PollStates
IDLE: Buffer is empty/all processed
PROCESSING: More messages available in the buffer
GATING: More messages are available, but can't yet be processed as a consumer that is being gated on has not completed its processing.

Mike.
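
To make the three states and the handler's return value concrete, here is a simplified, single-threaded model of a poll call, written against the JDK only. It is a sketch under stated assumptions and not the Disruptor source: ModelPoller, its public fields, and the gate value of Long.MAX_VALUE meaning "not gated on anything" are all invented for illustration. In this model PROCESSING is returned whenever the call handled at least one event.

```java
import java.util.ArrayList;
import java.util.List;

public class PollStateSketch {

    public enum PollState { PROCESSING, GATING, IDLE }

    /** Return true to keep consuming within the same poll call, false to stop after this event. */
    public interface Handler {
        boolean onEvent(long value, long sequence, boolean endOfBatch);
    }

    /** Hypothetical model of a poller; single-threaded, no wrap-around protection. */
    public static final class ModelPoller {
        private final long[] slots = new long[16];
        public long cursor = -1;           // last sequence published by the producer
        public long gate = Long.MAX_VALUE; // last sequence finished by the consumer gated on
        public long consumed = -1;         // last sequence this poller has handled

        public void publish(long value) {
            cursor++;
            slots[(int) (cursor % slots.length)] = value;
        }

        public PollState poll(Handler handler) {
            long next = consumed + 1;
            long available = Math.min(cursor, gate);
            if (next <= available) {
                boolean keepGoing;
                do {
                    keepGoing = handler.onEvent(slots[(int) (next % slots.length)], next, next == available);
                    consumed = next;
                    next++;
                } while (next <= available && keepGoing);
                return PollState.PROCESSING;
            }
            // Published but not yet released by the gate: GATING. Nothing published: IDLE.
            return cursor >= next ? PollState.GATING : PollState.IDLE;
        }
    }

    public static void main(String[] args) {
        ModelPoller poller = new ModelPoller();
        List<PollState> seen = new ArrayList<>();

        seen.add(poller.poll((v, s, end) -> true));  // nothing published yet
        poller.publish(10);
        poller.publish(20);
        poller.publish(30);
        poller.gate = -1;                            // gated-on consumer has finished nothing
        seen.add(poller.poll((v, s, end) -> true));
        poller.gate = 0;                             // gated-on consumer finished sequence 0
        seen.add(poller.poll((v, s, end) -> true));  // handles sequence 0 only
        poller.gate = Long.MAX_VALUE;
        seen.add(poller.poll((v, s, end) -> false)); // handler stops after sequence 1
        seen.add(poller.poll((v, s, end) -> true));  // handles sequence 2
        seen.add(poller.poll((v, s, end) -> true));  // nothing left
        System.out.println(seen);
    }
}
```

Under this model, IDLE and GATING are both points where the poller cannot make progress, so either looks like a reasonable moment for back-channel work. After PROCESSING with a handler that returned false, events may remain, so polling again first is the safer choice.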