I found the method tryNext() and tryPublishEvent() on Ringbuffer. Though this is not exactly what I wanted (would prefer wrapping with new data than discarding new and retaining old), it at least seemed to be serving the purpose of not blocking the producers.
However I see a very odd behavior. Most of the times, the producer discards the event unless the whole RingBuffer is consumed. That is, if the RingBuffer is of size 8, then producer pushed 8 events and unless the consumer consumes all 8 events, the producer does not start populating the buffer again.
Pasting below some logs from my output. Buffer size is 8. As observed below, the producer pushed 8 events and then keeps sayins buffer is full until all those 8 events are processed by the consumer.
producer::seqId: 0 event num: 0 time: 1582703940089
producer::seqId: 1 event num: 1 time: 1582703940090
producer::seqId: 2 event num: 2 time: 1582703940090
producer::seqId: 3 event num: 3 time: 1582703940090
producer::seqId: 4 event num: 4 time: 1582703940090
producer::seqId: 5 event num: 5 time: 1582703940090
producer::seqId: 6 event num: 6 time: 1582703940090
producer::seqId: 7 event num: 7 time: 1582703940090
consumer::seqId: 0 time: 1582703940091
buffer full event num: 8, time: 1582703940091
buffer full event num: 9, time: 1582703940091
buffer full event num: 10, time: 1582703940091
buffer full event num: 11, time: 1582703940091
buffer full event num: 12, time: 1582703940092
buffer full event num: 13, time: 1582703940092
buffer full event num: 14, time: 1582703940092
buffer full event num: 15, time: 1582703940092
buffer full event num: 16, time: 1582703940092
buffer full event num: 17, time: 1582703940093
buffer full event num: 18, time: 1582703940093
buffer full event num: 19, time: 1582703940093
buffer full event num: 20, time: 1582703940093
buffer full event num: 21, time: 1582703940093
buffer full event num: 22, time: 1582703940093
buffer full event num: 23, time: 1582703940094
buffer full event num: 24, time: 1582703940094
buffer full event num: 25, time: 1582703940094
buffer full event num: 26, time: 1582703940094
buffer full event num: 27, time: 1582703940094
buffer full event num: 28, time: 1582703940095
buffer full event num: 29, time: 1582703940095
buffer full event num: 30, time: 1582703940095
buffer full event num: 31, time: 1582703940095
buffer full event num: 32, time: 1582703940095
buffer full event num: 33, time: 1582703940096
buffer full event num: 34, time: 1582703940096
buffer full event num: 35, time: 1582703940096
buffer full event num: 36, time: 1582703940096
buffer full event num: 37, time: 1582703940096
buffer full event num: 38, time: 1582703940097
buffer full event num: 39, time: 1582703940097
buffer full event num: 40, time: 1582703940097
buffer full event num: 41, time: 1582703940097
buffer full event num: 42, time: 1582703940097
buffer full event num: 43, time: 1582703940098
buffer full event num: 44, time: 1582703940098
consumer::seqId: 1 time: 1582703940098
buffer full event num: 45, time: 1582703940098
buffer full event num: 46, time: 1582703940098
consumer::seqId: 2 time: 1582703940098
buffer full event num: 47, time: 1582703940098
buffer full event num: 48, time: 1582703940098
consumer::seqId: 3 time: 1582703940099
buffer full event num: 49, time: 1582703940099
buffer full event num: 50, time: 1582703940099
buffer full event num: 51, time: 1582703940099
consumer::seqId: 4 time: 1582703940099
buffer full event num: 52, time: 1582703940099
buffer full event num: 53, time: 1582703940099
consumer::seqId: 5 time: 1582703940099
buffer full event num: 54, time: 1582703940099
buffer full event num: 55, time: 1582703940100
consumer::seqId: 6 time: 1582703940100
buffer full event num: 56, time: 1582703940100
buffer full event num: 57, time: 1582703940100
consumer::seqId: 7 time: 1582703940100
buffer full event num: 58, time: 1582703940100
buffer full event num: 59, time: 1582703940100
buffer full event num: 60, time: 1582703940100
buffer full event num: 61, time: 1582703940101
buffer full event num: 62, time: 1582703940101
buffer full event num: 63, time: 1582703940101
buffer full event num: 64, time: 1582703940101
buffer full event num: 65, time: 1582703940101
buffer full event num: 66, time: 1582703940101
buffer full event num: 67, time: 1582703940102
producer::seqId: 8 event num: 68 time: 1582703940102
producer::seqId: 9 event num: 69 time: 1582703940102
producer::seqId: 10 event num: 70 time: 1582703940102
producer::seqId: 11 event num: 71 time: 1582703940102
producer::seqId: 12 event num: 72 time: 1582703940102
producer::seqId: 13 event num: 73 time: 1582703940102
producer::seqId: 14 event num: 74 time: 1582703940102
producer::seqId: 15 event num: 75 time: 1582703940102
buffer full event num: 76, time: 1582703940103
buffer full event num: 77, time: 1582703940103
buffer full event num: 78, time: 1582703940103
buffer full event num: 79, time: 1582703940103
buffer full event num: 80, time: 1582703940103
buffer full event num: 81, time: 1582703940103
buffer full event num: 82, time: 1582703940103
consumer::seqId: 8 time: 1582703940104
buffer full event num: 83, time: 1582703940104
buffer full event num: 84, time: 1582703940104
consumer::seqId: 9 time: 1582703940104
buffer full event num: 85, time: 1582703940104
buffer full event num: 86, time: 1582703940104
consumer::seqId: 10 time: 1582703940104
buffer full event num: 87, time: 1582703940104
buffer full event num: 88, time: 1582703940105
consumer::seqId: 11 time: 1582703940105
buffer full event num: 89, time: 1582703940105
buffer full event num: 90, time: 1582703940105
consumer::seqId: 12 time: 1582703940105
buffer full event num: 91, time: 1582703940105
buffer full event num: 92, time: 1582703940105
consumer::seqId: 13 time: 1582703940105
buffer full event num: 93, time: 1582703940105
buffer full event num: 94, time: 1582703940106
buffer full event num: 95, time: 1582703940106
buffer full event num: 96, time: 1582703940106
consumer::seqId: 14 time: 1582703940106
buffer full event num: 97, time: 1582703940106
buffer full event num: 98, time: 1582703940106
buffer full event num: 99, time: 1582703940106
consumer::seqId: 15 time: 1582703940106
Code snippets:
RingBuffer:
@Bean
public RingBuffer<JRequestTracker> ringBuffer()
{
final WaitStrategy waitStrategy = new SleepingWaitStrategy();
final Disruptor<JRequestTracker> disruptor = new Disruptor<>(JRequestTracker::new,
getPropOrDefaulValue(IConfig.ConfigurationConstants.BUFFER_SIZE, 8),
DaemonThreadFactory.INSTANCE,
ProducerType.MULTI,
waitStrategy);
final EventHandler<JRequestTracker> handler = new ReqEventHandler();
disruptor.handleEventsWith(handler).then((event, sequence, endOfBatch) -> event.clearData());
return disruptor.start();
}
producer:
for (int i = 0; i < 100; i++)
{
final JRequestTracker reqTracker = new JRequestTracker();
reqTracker.setDynamoDBReadsCount(i);
addRequestObject(reqTracker);
}
public void addRequestObject(JRequestTracker reqObj)
{
long seqId = -1;
try
{
seqId = ringBuffer.tryNext();
} catch (final InsufficientCapacityException e)
{
System.out.println("buffer full for " + reqObj.getDynamoDBReadsCount() + ", time: " + System.currentTimeMillis());
return;
}
if (seqId != -1)
{
try
{
final JRequestTracker newReq = ringBuffer.get(seqId);
newReq.makeCopyFrom(reqObj);
} finally
{
ringBuffer.publish(seqId);
System.out.println("producer::seqId: " + seqId + " event num: " + reqObj.getDynamoDBReadsCount() + " time: " + System.currentTimeMillis());
}
}
}
consumer: