Option to configure producer to overwrite ringbuffer in case of slow consumer


Maanasa Srinivasulu

Feb 25, 2020, 5:07:46 AM
to Disruptor
Hi,
Contrary to normal usage, I want my producers to overwrite the data in the ring buffer when the consumer is lagging behind.
Is there any way to achieve this?

My use case is a log/metric aggregator, so there are many producers and a single consumer that just pushes the data to a Kinesis stream for further analysis. In case my consumer lags behind for whatever reason, I want to be sure that the producers are not waiting on it. In such an eventuality, I do not mind losing the data, but the producer threads should not wait.

From experimentation I see that if there is no consumer defined at all (no event handler added), then the buffer is overwritten. However, when there is an event handler, the producers wait on the consumer. I am using Disruptor version 3.4.2 in Java.
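
For context, a minimal sketch of that observation (illustrative only, not the code from this thread): the blocking comes from gating sequences. Registering an event handler adds its sequence as a gating sequence on the ring buffer, and next() will not wrap past it; with no gating sequence registered, next() never waits and old slots are simply overwritten.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;

public class NoGatingSequenceDemo
{
  public static void main(String[] args)
  {
    // No EventHandler registered, so there is no gating sequence on this ring buffer.
    RingBuffer<JRequestTracker> rb =
        RingBuffer.createMultiProducer(JRequestTracker::new, 8, new SleepingWaitStrategy());

    // Claiming 100 slots on a buffer of 8 never blocks here: without a gating
    // sequence the producer is free to wrap over unconsumed slots.
    for (int i = 0; i < 100; i++)
    {
      long seq = rb.next();
      rb.get(seq).setDynamoDBReadsCount(i);
      rb.publish(seq);
    }
  }
}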
 

mns

Feb 26, 2020, 3:24:20 AM
to Disruptor
I found the methods tryNext() and tryPublishEvent() on RingBuffer. Though this is not exactly what I wanted (I would prefer wrapping with new data rather than discarding the new and retaining the old), it at least seems to serve the purpose of not blocking the producers.
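
For reference, a minimal sketch of that non-blocking path using tryPublishEvent (illustrative only; JRequestTracker and makeCopyFrom are the type and method from the snippets later in this post). tryPublishEvent wraps the tryNext()/get()/publish() sequence and returns false instead of throwing InsufficientCapacityException when the buffer is full:

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class NonBlockingPublisher
{
  private static final EventTranslatorOneArg<JRequestTracker, JRequestTracker> COPY_TRANSLATOR =
      (event, sequence, source) -> event.makeCopyFrom(source);

  private final RingBuffer<JRequestTracker> ringBuffer;

  public NonBlockingPublisher(RingBuffer<JRequestTracker> ringBuffer)
  {
    this.ringBuffer = ringBuffer;
  }

  public boolean tryAdd(JRequestTracker reqObj)
  {
    // Returns false (the event is dropped) when there is insufficient capacity.
    return ringBuffer.tryPublishEvent(COPY_TRANSLATOR, reqObj);
  }
}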
However, I see very odd behavior. Most of the time, the producer discards events until the whole RingBuffer has been consumed. That is, if the RingBuffer is of size 8, the producer pushes 8 events, and unless the consumer consumes all 8 events, the producer does not start populating the buffer again.

Pasting below some logs from my output. Buffer size is 8. As observed below, the producer pushes 8 events and then keeps reporting that the buffer is full until all 8 of those events have been processed by the consumer.

producer::seqId: 0 event num: 0 time: 1582703940089
producer::seqId: 1 event num: 1 time: 1582703940090
producer::seqId: 2 event num: 2 time: 1582703940090
producer::seqId: 3 event num: 3 time: 1582703940090
producer::seqId: 4 event num: 4 time: 1582703940090
producer::seqId: 5 event num: 5 time: 1582703940090
producer::seqId: 6 event num: 6 time: 1582703940090
producer::seqId: 7 event num: 7 time: 1582703940090
consumer::seqId: 0 time: 1582703940091
 buffer full event num: 8, time: 1582703940091
 buffer full event num: 9, time: 1582703940091
 buffer full event num: 10, time: 1582703940091
 buffer full event num: 11, time: 1582703940091
 buffer full event num: 12, time: 1582703940092
 buffer full event num: 13, time: 1582703940092
 buffer full event num: 14, time: 1582703940092
 buffer full event num: 15, time: 1582703940092
 buffer full event num: 16, time: 1582703940092
 buffer full event num: 17, time: 1582703940093
 buffer full event num: 18, time: 1582703940093
 buffer full event num: 19, time: 1582703940093
 buffer full event num: 20, time: 1582703940093
 buffer full event num: 21, time: 1582703940093
 buffer full event num: 22, time: 1582703940093
 buffer full event num: 23, time: 1582703940094
 buffer full event num: 24, time: 1582703940094
 buffer full event num: 25, time: 1582703940094
 buffer full event num: 26, time: 1582703940094
 buffer full event num: 27, time: 1582703940094
 buffer full event num: 28, time: 1582703940095
 buffer full event num: 29, time: 1582703940095
 buffer full event num: 30, time: 1582703940095
 buffer full event num: 31, time: 1582703940095
 buffer full event num: 32, time: 1582703940095
 buffer full event num: 33, time: 1582703940096
 buffer full event num: 34, time: 1582703940096
 buffer full event num: 35, time: 1582703940096
 buffer full event num: 36, time: 1582703940096
 buffer full event num: 37, time: 1582703940096
 buffer full event num: 38, time: 1582703940097
 buffer full event num: 39, time: 1582703940097
 buffer full event num: 40, time: 1582703940097
 buffer full event num: 41, time: 1582703940097
 buffer full event num: 42, time: 1582703940097
 buffer full event num: 43, time: 1582703940098
 buffer full event num: 44, time: 1582703940098
consumer::seqId: 1 time: 1582703940098
 buffer full event num: 45, time: 1582703940098
 buffer full event num: 46, time: 1582703940098
consumer::seqId: 2 time: 1582703940098
 buffer full event num: 47, time: 1582703940098
 buffer full event num: 48, time: 1582703940098
consumer::seqId: 3 time: 1582703940099
 buffer full event num: 49, time: 1582703940099
 buffer full event num: 50, time: 1582703940099
 buffer full event num: 51, time: 1582703940099
consumer::seqId: 4 time: 1582703940099
 buffer full event num: 52, time: 1582703940099
 buffer full event num: 53, time: 1582703940099
consumer::seqId: 5 time: 1582703940099
 buffer full event num: 54, time: 1582703940099
 buffer full event num: 55, time: 1582703940100
consumer::seqId: 6 time: 1582703940100
 buffer full event num: 56, time: 1582703940100
 buffer full event num: 57, time: 1582703940100
consumer::seqId: 7 time: 1582703940100
 buffer full event num: 58, time: 1582703940100
 buffer full event num: 59, time: 1582703940100
 buffer full event num: 60, time: 1582703940100
 buffer full event num: 61, time: 1582703940101
 buffer full event num: 62, time: 1582703940101
 buffer full event num: 63, time: 1582703940101
 buffer full event num: 64, time: 1582703940101
 buffer full event num: 65, time: 1582703940101
 buffer full event num: 66, time: 1582703940101
 buffer full event num: 67, time: 1582703940102
producer::seqId: 8 event num: 68 time: 1582703940102
producer::seqId: 9 event num: 69 time: 1582703940102
producer::seqId: 10 event num: 70 time: 1582703940102
producer::seqId: 11 event num: 71 time: 1582703940102
producer::seqId: 12 event num: 72 time: 1582703940102
producer::seqId: 13 event num: 73 time: 1582703940102
producer::seqId: 14 event num: 74 time: 1582703940102
producer::seqId: 15 event num: 75 time: 1582703940102
 buffer full event num: 76, time: 1582703940103
 buffer full event num: 77, time: 1582703940103
 buffer full event num: 78, time: 1582703940103
 buffer full event num: 79, time: 1582703940103
 buffer full event num: 80, time: 1582703940103
 buffer full event num: 81, time: 1582703940103
 buffer full event num: 82, time: 1582703940103
consumer::seqId: 8 time: 1582703940104
 buffer full event num: 83, time: 1582703940104
 buffer full event num: 84, time: 1582703940104
consumer::seqId: 9 time: 1582703940104
 buffer full event num: 85, time: 1582703940104
 buffer full event num: 86, time: 1582703940104
consumer::seqId: 10 time: 1582703940104
 buffer full event num: 87, time: 1582703940104
 buffer full event num: 88, time: 1582703940105
consumer::seqId: 11 time: 1582703940105
 buffer full event num: 89, time: 1582703940105
 buffer full event num: 90, time: 1582703940105
consumer::seqId: 12 time: 1582703940105
 buffer full event num: 91, time: 1582703940105
 buffer full event num: 92, time: 1582703940105
consumer::seqId: 13 time: 1582703940105
 buffer full event num: 93, time: 1582703940105
 buffer full event num: 94, time: 1582703940106
 buffer full event num: 95, time: 1582703940106
 buffer full event num: 96, time: 1582703940106
consumer::seqId: 14 time: 1582703940106
 buffer full event num: 97, time: 1582703940106
 buffer full event num: 98, time: 1582703940106
 buffer full event num: 99, time: 1582703940106
consumer::seqId: 15 time: 1582703940106


Code snippets:
RingBuffer: 
 
@Bean
public RingBuffer<JRequestTracker> ringBuffer()
{
  final WaitStrategy waitStrategy = new SleepingWaitStrategy();
  final Disruptor<JRequestTracker> disruptor = new Disruptor<>(JRequestTracker::new,
      getPropOrDefaulValue(IConfig.ConfigurationConstants.BUFFER_SIZE, 8),
      DaemonThreadFactory.INSTANCE,
      ProducerType.MULTI,
      waitStrategy);

  final EventHandler<JRequestTracker> handler = new ReqEventHandler();
  disruptor.handleEventsWith(handler).then((event, sequence, endOfBatch) -> event.clearData());

  return disruptor.start();
}



producer: 

for (int i = 0; i < 100; i++)
{
  final JRequestTracker reqTracker = new JRequestTracker();
  reqTracker.setDynamoDBReadsCount(i);
  addRequestObject(reqTracker);
}

public void addRequestObject(JRequestTracker reqObj)
{
  long seqId = -1;
  try
  {
    seqId = ringBuffer.tryNext();
  } catch (final InsufficientCapacityException e)
  {
    System.out.println("buffer full for " + reqObj.getDynamoDBReadsCount() + ", time: " + System.currentTimeMillis());
    return;
  }

  if (seqId != -1)
  {
    try
    {
      final JRequestTracker newReq = ringBuffer.get(seqId);
      newReq.makeCopyFrom(reqObj);
    } finally
    {
      ringBuffer.publish(seqId);
      System.out.println("producer::seqId: " + seqId + " event num: " + reqObj.getDynamoDBReadsCount() + " time: " + System.currentTimeMillis());
    }
  }
}



consumer:
 
@Override
public void onEvent(JRequestTracker event, long sequence, boolean endOfBatch) throws Exception
{
  System.out.println("consumer::seqId: " + sequence + " time: " + System.currentTimeMillis());

  // code to push to Kinesis
}


What is the reason for this behavior? 

Tom Lee

Feb 26, 2020, 10:34:24 AM
to lmax-di...@googlegroups.com
I think your publish operations are very frequent relative to the rate at which the messages get processed by your EventHandlers, so a pile of messages arrives all at once and the Disruptor processes them as a single batch. The sequence won't necessarily get incremented by one each time a message is consumed; instead, a batch will be processed all at once and the sequence will bump by the batch size on completion.

You'll probably find that your successful writes to the ring buffer roughly coincide with endOfBatch == true in your EventHandler.onEvent method -- which just happens to be the last event in the buffer because writes are frequent and the buffer is small. To put it another way, it seems likely to me that the behavior you're seeing is tied to batching in the Disruptor rather than the remaining capacity of the ring buffer.
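
A minimal sketch (not the code from this thread) of how to make that visible: log endOfBatch alongside the sequence in the handler and compare it against the producer's successful writes.

import com.lmax.disruptor.EventHandler;

public class BatchLoggingHandler implements EventHandler<JRequestTracker>
{
  @Override
  public void onEvent(JRequestTracker event, long sequence, boolean endOfBatch) throws Exception
  {
    // endOfBatch == true marks the last event of the run the consumer claimed in one pass.
    // The consumer's gating sequence only advances once that whole run completes, which is
    // why the producer keeps seeing "buffer full" until the entire batch has been processed.
    System.out.println("consumer::seqId: " + sequence
        + " endOfBatch: " + endOfBatch
        + " time: " + System.currentTimeMillis());
  }
}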

My memory's awful and I'm not at my keyboard, but I think some of the details of this are in the Sequencer implementations in the Disruptor source code.




mns

Feb 27, 2020, 12:29:59 AM
to Disruptor
Hi Tom,
Thanks for your response. I verified, and as you rightly mentioned, this behavior is because of batching. However, I do not see this batching with the same code when using next() instead of tryNext(). So it seems that even the minimal delay in producing caused by blocking is enough to prevent batching.

This, however, was an experiment in a for loop. I am wondering how this would translate to the real world with many threads writing. My guess is that the consumer would always be processing in batches. Probably the only way to know is to try it out, but I think batching records when not explicitly asked to is undesirable behavior. It would result either in a lot of data loss with tryNext() or in long delays because of the blocking next() call.
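
If batching on the consumer side is acceptable for this use case, one sketch is to lean into it rather than fight it: accumulate events in the handler and flush to Kinesis once per Disruptor batch, so a lagging consumer catches up with fewer, larger writes. This is illustrative only; putRecords() below is a hypothetical stand-in for whatever Kinesis client call is actually used, and 500 reflects the PutRecords per-request record limit.

import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.List;

public class BatchingKinesisHandler implements EventHandler<JRequestTracker>
{
  private final List<JRequestTracker> pending = new ArrayList<>();

  @Override
  public void onEvent(JRequestTracker event, long sequence, boolean endOfBatch) throws Exception
  {
    // Copy the event: the ring buffer slot is reused once the sequence advances.
    final JRequestTracker copy = new JRequestTracker();
    copy.makeCopyFrom(event);
    pending.add(copy);

    if (endOfBatch || pending.size() >= 500)
    {
      putRecords(pending); // hypothetical stand-in for the real Kinesis PutRecords call
      pending.clear();
    }
  }

  private void putRecords(List<JRequestTracker> records)
  {
    // push the accumulated batch to Kinesis here
  }
}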