How to avoid producer from wraparound and overwriting the ring buffer before consumer reviews it

550 views
Skip to first unread message

R

unread,
May 27, 2018, 2:44:35 AM5/27/18
to Disruptor

I am recently introduced to the LMAX Disruptor and decided to give it a try. Thanks to the developers, the setup was quick and hassle free. But I think I am running into an issue if someone can help me with it.


The issue: I was told that when the producer publish the event, it should block until the consumer had a chance to retrieve it before wrapping around. I have a sequence barrier on the consumer side and I can confirm that if there is no data published by the producer, the consumer's waitFor call will block. But, producer doesn't seem to be regulated in any way and will just wraparound and overwrite unprocessed data in the ring buffer. The speed of the consumer varies based on the speed of the getData function call.


I have a producer as a runnable object running on separate thread.

public class Producer implements Runnable {
    private final RingBuffer<Event> ringbuffer;
    public Producer(RingBuffer<Event> rb) {
        ringbuffer = rb;
    }
    public void run() {
           long next = 0L;
           while(true) {
               try {
                   next = ringbuffer.next();
                   Event e = ringbuffer.get(next);
                   ... do stuff...
                   e.set(... stuff...);
               }
               finally {
                   ringbuffer.publish(next);
               }
           }
    }
}

I have a consumer running on the main thread.

public class Consumer {
     private final ExecutorService exec;
     private final Disruptor<Event> disruptor;
     private final RingBuffer<Event> ringbuffer;
     private final SequenceBarrier seqbar;
     private long seq = 0L;

     public Consumer() {
         exec = Executors.newCachedThreadPool();
         disruptor = new Disruptor<>(Event.EVENT_FACTORY, 1024, Executors.defaultThreadFactory());
         ringbuffer = disruptor.start();
         seqbar = ringbuffer.newBarrier();

         Producer producer = new Producer(ringbuffer);
         exec.submit(producer);
    }

    public Data getData() {
         seqbar.waitFor(seq);
         Event e = ringbuffer.get(seq);
         seq++;
         return e.get();
    }
}

Finally, I run the code like so:

public class DisruptorTest {
     public static void main(String[] args){
         Consumer c = new Consumer();
         while (true) {
             c.getData();
             ... Do stuff ...
         }
}

Danil Suits

unread,
May 27, 2018, 8:52:38 AM5/27/18
to Disruptor
Are you missing a gating sequence?



R

unread,
May 27, 2018, 1:07:44 PM5/27/18
to Disruptor
Hi Danil,

Thanks for the pointer!
Is there a way to achieve this without using event handler or batch event processor? (I'm assuming batch event processor is a version of event handler that is capable of getting event in batches mentioned during the lmax talks).

Michael Barker

unread,
May 27, 2018, 9:09:46 PM5/27/18
to lmax-di...@googlegroups.com
The BatchEventProcessor takes an EventHandler as an argument and does all of the heavy lifting around managing batches and passes the individual messages to the EventHandler.  This is the general approach used by most people.  You can simplify the setup by calling Disruptor.handleEventsWith(EventHandler), which will ensure that the sequences are tracked appropriately.

To use the RingBuffer without using an EventHandler, you should look at the EventPoller.  However, this interface is experimental (subject to change) and receives far less testing than the EventHandler.  You will also need to make sure that you add the gating sequences to the Sequencer in this case as well.

Mike.

--
You received this message because you are subscribed to the Google Groups "Disruptor" group.
To unsubscribe from this group and stop receiving emails from it, send an email to lmax-disruptor+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

R

unread,
May 29, 2018, 3:42:54 PM5/29/18
to Disruptor
Hi Mike,

Ah I see!
Is is possible to make the event handler wait until the "consumer" is ready to deal with the next event?

Since my event needs to be in a sequential order and no event can be dropped. I'm guessing buffering the excess load it into my own arrayblockingqueue would defeat the purpose.

Server --- sends data ---> producer                                      enqueue to my own consumer queue ---> consumer retrieves result at its own pace
                                               |                                                   |                                                                                         | 
                                            publish ---> Ring Buffer <--- event processor                                                                 human/user dictated pace



To unsubscribe from this group and stop receiving emails from it, send an email to lmax-disrupto...@googlegroups.com.

Z

unread,
Sep 11, 2018, 9:57:02 PM9/11/18
to Disruptor
Where you able to figure this one out R. I have this same criteria and haven't managed to get a perfectly working solution.

Jason Koch

unread,
Sep 13, 2018, 1:03:36 AM9/13/18
to lmax-di...@googlegroups.com
Z,

"Is is possible to make the event handler wait until the "consumer" is ready to deal with the next event?"

It is not clear what your problem is. Do you need a queue? What do you expect to happen when the consumer is busy or does not pick up? How many messages / day do you expect to process?

If you just need a producer to stop producing while a human operator is processing without any queuing, you can achieve this with classic locks, a short ABQ or SynchronousQueue, or many other solutions.
Reply all
Reply to author
Forward
0 new messages