EventPoller for external consumer to RingBuffer


Cory E Adams

Nov 19, 2015, 1:28:58 PM
to Disruptor
We have a use case in which we pull a seed/counter from a datastore and then use the RingBuffer to post-process Events using the counter.  In addition, an external thread would consume messages after all of the consumers/Handlers have finished.  This external thread would consume messages on demand, perhaps via user interaction.

Optimizations might consist of using RingBuffer.hasAvailableCapacity to cause the Producer to add more seed values to the RingBuffer.

I expect the producer to wait once the EventHandlers have finished and the external caller has not yet consumed the Events.  So far, however, I have been unable to get a copy of the Event only after the collator has fired.

Is the EventPoller the correct tool?  Is there a way to chain the EventPoller to the Events via the DSL?  

We have this thus far:

        final EventPoller<DataEvent> poller = disruptor.getRingBuffer().newPoller();
        disruptor.getRingBuffer().addGatingSequences(poller.getSequence());

        // would like to chain the poller here
        disruptor.handleEventsWith(aHandler).then(bHandler, cHandler).then(collator);

        Thread pollerThread = new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println("--------------------------> thread started");
                    while (!isInterrupted()) {
                        DataEvent eventCopy = (DataEvent) getNextValue(poller);
                        System.out.println("------ eventCopy; " + eventCopy.counter);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        pollerThread.start();

   private static Object getNextValue(EventPoller<DataEvent> poller) throws Exception
    {
        final Object[] out = new Object[1];

        poller.poll(
                new EventPoller.Handler<DataEvent>()
                {
                    @Override
                    public boolean onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception
                    {
                        out[0] = event.copyOfData();
                        // Return false so that only one event is processed at a time.
                        return false;
                    }
                });
        return out[0];
    }
...
    public static class CollatingPollingHandler implements EventPoller.Handler<DataEvent>
    {
        @Override
        public boolean onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception
        {





Michael Barker

Nov 19, 2015, 7:41:59 PM
to lmax-di...@googlegroups.com
Hi,

The poller is probably the right tool to use in your use case.  Unfortunately it is still experimental and not well integrated into the DSL.  The best bet is to set up the RingBuffer manually:

        RingBuffer<DataEvent<Object>> ringBuffer = RingBuffer.createMultiProducer(DataEvent.factory(), 1024);

        // Create processor a.
        EventProcessor aProcessor = new BatchEventProcessor<>(
            ringBuffer, ringBuffer.newBarrier(), new DataEventHandler());

        // Create b&c with dependency on a.
        EventProcessor bProcessor = new BatchEventProcessor<>(
            ringBuffer, ringBuffer.newBarrier(aProcessor.getSequence()), new DataEventHandler());
        EventProcessor cProcessor = new BatchEventProcessor<>(
            ringBuffer, ringBuffer.newBarrier(aProcessor.getSequence()), new DataEventHandler());
        
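        // Create poller with dependency on b&c.
        EventPoller<DataEvent<Object>> poller = ringBuffer.newPoller(
            bProcessor.getSequence(), cProcessor.getSequence());

        // Gate the producer on the poller (the last consumer in the chain) so that
        // events the poller has not yet read are not overwritten.
        ringBuffer.addGatingSequences(poller.getSequence());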
        
        DaemonThreadFactory.INSTANCE.newThread(aProcessor).start();
        DaemonThreadFactory.INSTANCE.newThread(bProcessor).start();
        DaemonThreadFactory.INSTANCE.newThread(cProcessor).start();

Mike.


--
You received this message because you are subscribed to the Google Groups "Disruptor" group.
To unsubscribe from this group and stop receiving emails from it, send an email to lmax-disrupto...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Patrick Julien

Sep 13, 2016, 3:32:04 PM
to Disruptor
Is it possible to poll without starting any new threads?  I have several single writer disruptors I want to loop over from a single thread and send to another single writer disruptor.

Michael Barker

Nov 24, 2016, 2:34:31 PM
to lmax-di...@googlegroups.com
Yes, it is possible, but you need to make sure that you don't deadlock.  When running in a single thread for both poll and publish, you need to schedule the poll calls with the publish calls appropriately.  E.g. if my Disruptor has a buffer size of 64 and I try to publish 100 events to the Disruptor before polling it, then the ring buffer will fill up and never get a chance to drain.  One possible strategy would be to use tryPublish and fall back to polling if you fail to publish a message.
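
A rough sketch of the "try to publish, fall back to polling" idea, using only the JDK so that it runs on its own. It is a stand-in, not Disruptor code: an ArrayBlockingQueue plays the ring buffer, offer() plays a non-blocking try-publish, and poll() plays draining through the poller. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class SingleThreadPublishAndPoll {

    // Publishes `total` events through a bounded buffer from a single thread,
    // draining whenever the buffer is full, and returns what was consumed.
    static List<Integer> run(int capacity, int total) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        for (int i = 0; i < total; i++) {
            // offer() never blocks; false means the buffer is full,
            // so switch to the consumer role before retrying.
            while (!buffer.offer(i)) {
                drain(buffer, consumed);
            }
        }
        drain(buffer, consumed); // pick up whatever is still queued
        return consumed;
    }

    // Removes everything currently in the buffer, preserving order.
    static void drain(ArrayBlockingQueue<Integer> buffer, List<Integer> out) {
        Integer next;
        while ((next = buffer.poll()) != null) {
            out.add(next);
        }
    }

    public static void main(String[] args) {
        List<Integer> consumed = run(64, 100);
        System.out.println("consumed " + consumed.size() + " events");
    }
}
```

With a capacity of 64 and 100 events, a blocking publish on the 65th event would wait forever because the only thread that could drain the buffer is the one that is blocked; the non-blocking attempt gives it the chance to drain first.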

Mike.


Patrick Julien

Nov 24, 2016, 3:13:59 PM
to Disruptor
well hello :)

It wouldn't be a single thread though.  What I want is many 1:1 disruptors (something like 12 total).  Then a single consumer thread polls them all in a round-robin fashion.

I didn't understand how to use the poll API to do something like this.

massimiliano.masi

Dec 6, 2016, 10:57:02 AM
to lmax-di...@googlegroups.com
Hi All, 

I’ve just updated issue 130. I’ve a very dumb question.

If I create a ring buffer of size 8 and send 10 messages, the 9th waits until all of the other 8 have completed before it gets a sequence. I would have expected a slot to be released as soon as one event had been processed. It hangs on ringBuffer.get(sequence). I think I am not releasing the event after processing as required. The code of the consumer can be found here: 




I also have an application where the disruptor event handlers are getting a lot of CPU time. Can it be related to the SleepingWaitStrategy?

Thanks in advance, 

Massimiliano

--
Anger is a gift, http://www.mascanc.net/

Michael Barker

Dec 8, 2016, 5:39:44 PM
to lmax-di...@googlegroups.com
Hi,

> If I create a ring buffer of size 8 and send 10 messages, the 9th waits until all of the other 8 have completed before it gets a sequence. I would have expected a slot to be released as soon as one event had been processed. It hangs on ringBuffer.get(sequence). I think I am not releasing the event after processing as required. The code of the consumer can be found here: 

This is because the Disruptor releases the resources at the end of the batch.  This avoids a write (and likely cache miss) on the sequence until the batch completes.  If you need to release resources early, you can use the SequenceReportingEventHandler interface instead.  This will pass the Sequence into your handler so that you can update it as frequently as you like.

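import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;

public class MyHandler implements SequenceReportingEventHandler<Event>
{
    // Sequence handed to this handler by the event processor.
    private Sequence sequence;

    @Override
    public void setSequenceCallback(Sequence sequence)
    {
        this.sequence = sequence;
    }

    @Override
    public void onEvent(Event e, long sequence, boolean endOfBatch)
    {
        // do stuff.
        // Report progress after every event rather than at the end of the batch.
        this.sequence.set(sequence);
    }
}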
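
To make the effect of end-of-batch release concrete, here is a small toy model in plain Java; it is not the Disruptor's code. It assumes the simplified rule that the producer may claim sequence n only once n - bufferSize is no greater than the last sequence the consumer has reported. The class and method names are invented for the illustration.

```java
public class BatchReleaseModel {

    // Returns how many events of the current batch the consumer has handled
    // at the moment the producer is first allowed to claim `wanted`.
    static int eventsHandledBeforeClaim(int bufferSize, long wanted,
                                        long batchStart, long batchEnd,
                                        boolean reportEachEvent) {
        long consumerSequence = batchStart - 1; // last sequence reported as done
        int handled = 0;
        for (long s = batchStart; s <= batchEnd; s++) {
            // Simplified wrap check: the slot for `wanted` is free once the
            // consumer has reported the sequence that previously occupied it.
            if (wanted - bufferSize <= consumerSequence) {
                return handled;
            }
            handled++; // "process" event s
            if (reportEachEvent || s == batchEnd) {
                consumerSequence = s; // publish progress to the producer
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        // Buffer of 8, events 0..7 form one batch, producer wants sequence 8.
        System.out.println("end of batch: " + eventsHandledBeforeClaim(8, 8, 0, 7, false));
        System.out.println("per event:    " + eventsHandledBeforeClaim(8, 8, 0, 7, true));
    }
}
```

Under these assumptions the model prints 8 for end-of-batch reporting and 1 for per-event reporting, which matches the behaviour described in the question: the 9th message waits for the whole batch unless the handler reports its sequence earlier.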
 
> I also have an application where the disruptor event handlers are getting a lot of CPU time. Can it be related to the SleepingWaitStrategy?

Possibly, the amount of CPU used by the SleepingWaitStrategy is dependent on OS and hardware.  Using a profiler should be able to provide some insight.

Mike.

massimiliano.masi

Dec 16, 2016, 4:23:15 AM
to lmax-di...@googlegroups.com
Hi, 

On 8 Dec 2016, at 23:39, Michael Barker <mik...@gmail.com> wrote:

> This is because the Disruptor releases the resources at the end of the batch.  This avoids a write (and likely cache miss) on the sequence until the batch completes.  If you need to release resources early, you can use the SequenceReportingEventHandler interface instead.  This will pass the Sequence into your handler so that you can update it as frequently as you like.


Thanks for the hint! It seems that I have a design issue: the creator is too fast for the event handler. Either I add an additional event handler, or I try to make it faster.

 
> I also have an application where the disruptor event handlers are getting a lot of CPU time. Can it be related to the SleepingWaitStrategy?
>
> Possibly, the amount of CPU used by the SleepingWaitStrategy is dependent on OS and hardware.  Using a profiler should be able to provide some insight.

JProfiler gave good results, and yes, it was consuming a lot of CPU (I don't have the hardware details, but I assume some kind of Xeon), so I'll change the application to make the wait strategy configurable by the user.


Thanks again! 

Michael Barker

Dec 30, 2016, 4:46:23 AM
to lmax-di...@googlegroups.com
Sorry about the delayed reply.  You need to create an EventPoller for each Disruptor and put them in an array; the consumer thread then loops through the array, polling each one for events.  The implementation of EventPoller.Handler can determine how many events you consume from each ring buffer by returning false once the handler has consumed enough (you can use your own definition of enough here, e.g. max 20 per Disruptor).
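
A minimal sketch of that round-robin loop, written against JDK-only stand-ins so it can be run as is. ToyPoller and its Handler interface are hypothetical; they only mirror the shape of EventPoller.poll and EventPoller.Handler.onEvent (return false to stop the current poll) and are not the real classes. A List is used instead of an array purely for convenience.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class RoundRobinPolling {

    // Same shape as EventPoller.Handler: return false to stop the current poll.
    interface Handler<T> {
        boolean onEvent(T event, long sequence, boolean endOfBatch);
    }

    // Hypothetical stand-in for an EventPoller attached to one ring buffer.
    static final class ToyPoller<T> {
        private final Queue<T> pending = new ArrayDeque<>();
        private long nextSequence = 0;

        void publish(T event) { pending.add(event); }

        // Feeds pending events to the handler until it returns false or none
        // remain; returns true if at least one event was delivered.
        boolean poll(Handler<T> handler) {
            boolean sawEvent = false;
            while (!pending.isEmpty()) {
                T event = pending.remove();
                sawEvent = true;
                if (!handler.onEvent(event, nextSequence++, pending.isEmpty())) break;
            }
            return sawEvent;
        }
    }

    // Handler that accepts at most `limit` events per poll call.
    static final class CappedHandler implements Handler<String> {
        private final int limit;
        private final List<String> sink;
        private int taken;

        CappedHandler(int limit, List<String> sink) { this.limit = limit; this.sink = sink; }

        void reset() { taken = 0; }

        @Override
        public boolean onEvent(String event, long sequence, boolean endOfBatch) {
            sink.add(event);
            return ++taken < limit; // false once this buffer has had its share
        }
    }

    // Visits each poller in turn until a full pass finds nothing to consume.
    // A long-running consumer would keep looping instead of returning.
    static List<String> drainAll(List<ToyPoller<String>> pollers, int maxPerPoll) {
        List<String> out = new ArrayList<>();
        CappedHandler handler = new CappedHandler(maxPerPoll, out);
        boolean anyWork = true;
        while (anyWork) {
            anyWork = false;
            for (ToyPoller<String> poller : pollers) {
                handler.reset();
                anyWork |= poller.poll(handler);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ToyPoller<String> a = new ToyPoller<>();
        ToyPoller<String> b = new ToyPoller<>();
        for (int i = 0; i < 3; i++) a.publish("a" + i);
        b.publish("b0");
        System.out.println(drainAll(Arrays.asList(a, b), 2)); // [a0, a1, b0, a2]
    }
}
```

The cap keeps one busy buffer from starving the others: with a limit of 2, the third event from the first buffer is only consumed after the second buffer has had its turn.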

Mike.


Patrick Julien

Dec 30, 2016, 9:13:27 AM
to Disruptor
thank you