Multiple producers single consumer batching scenario


Girish Sharma

Jan 8, 2016, 4:41:48 PM1/8/16
to Disruptor
Hi all,

I have a system where multiple simultaneous threads (acting as producers) will be publishing individual events and there will be a single consumer for these events. Now the job of this consumer is to publish these events to downstream components (like Kafka). This needs to be done in batches to improve network performance.

So, for instance, I might have 1000 concurrent threads generating events at 10,000 events/sec, and I want my consumer to consume these events and batch them in groups of 500.

From what I have read in this group, I understood that the endOfBatch flag in the onEvent handler is useful only when the producer produces a batch of events using publishEvents. So if my producer threads produced 10 events in a batch, 9 of them would get the flag as false and the last one would get it as true. But in my case, all the producer threads are producing individual events, so that flag is always true.

From similar posts, I figured that the suggested way is something like:


final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()
{
    private static final int BATCH_SIZE = 500;

    private int count = 0;
    private final List<ValueEvent> BATCH = new ArrayList<>();

    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        // process a new event.
        BATCH.add(event);
        if (++count >= BATCH_SIZE)
        {
            processBatch();
            BATCH.clear();
            count = 0;
        }
    }
};


Now it's fine if the use case is as simple as this and I have only a single consumer running. But I have a few questions here:
  1. Is there a way that I can make use of the ring buffer and have this behavior automatically? i.e. Producers keep producing events, but consumers only get them when the ring buffer is full. At that point, disruptor drains all events to the consumer.
  2. Can the sequence of the incoming events be reset back to 0?
  3. What is the difference between EventHandler and BatchEventProcessor, use case wise?
  4. Is each call of the onEvent method made in the same thread?

I hope my questions are clear and they make sense.


PS: Please disregard my previously posted question (which might still be waiting for mod approval)


Regards

Danil Suits

Jan 8, 2016, 6:11:55 PM1/8/16
to Disruptor
From what I have read in this group, I understood that the endOfBatch flag in the onEvent handler is useful only when the producer produces a batch of events using publishEvents.

Yikes, that doesn't sound right...
 
https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

sequence.set(availableSequence);


If the consumer is using BEP, then it doesn't care how the events are going into the buffer. availableSequence marks the highest event that can be reached without reloading memory. So the end of batch flag really means "this is the last event I can deliver before I need to check for more writes from the publishers".
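For what it's worth, that behavior can be demonstrated without the library at all. The sketch below uses a simplified stand-in for the Disruptor's EventHandler interface (not the real com.lmax.disruptor types, and without the checked exception the real one declares) and simulates a processor delivering four available events, so only the last one carries endOfBatch == true:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for com.lmax.disruptor.EventHandler, for illustration only.
interface Handler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
}

// Accumulates events and flushes whenever the processor signals endOfBatch,
// no matter how the producers originally published them.
class FlushingHandler implements Handler<String> {
    final List<String> batch = new ArrayList<>();
    final List<List<String>> flushed = new ArrayList<>();

    public void onEvent(String event, long sequence, boolean endOfBatch) {
        batch.add(event);
        if (endOfBatch) {
            flushed.add(new ArrayList<>(batch)); // hand the whole batch downstream
            batch.clear();
        }
    }
}

class EndOfBatchDemo {
    public static void main(String[] args) {
        FlushingHandler h = new FlushingHandler();
        // Simulate what BatchEventProcessor does when sequences 0..3 are
        // available together: only sequence 3 carries endOfBatch == true.
        for (long s = 0; s <= 3; s++) {
            h.onEvent("e" + s, s, s == 3);
        }
        System.out.println(h.flushed); // one batch of four events
    }
}
```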



Producers keep producing events, but consumers only get them when the ring buffer is full.

Oh, I'd be very surprised if that is desirable behavior. When the ring buffer is full, the publishers cannot write to it, so they either block, drop data on the floor, or create backpressure, depending on your design. Likewise, the consumer would be blocked while waiting on the publishers. When the consumer is waiting on the network, your producers still can't make progress.

There was a really good talk on using disruptor for batching to smooth out network performance.  I believe I'm remembering "Mechanical Sympathetic Networking", by Todd Montgomery
http://www.infoq.com/presentations/Messaging-TCP-IP

Executive summary being you get nice networking behavior out of the box by letting the producers enqueue more work while the consumer is dealing with the network.


Can the sequence of the incoming events be reset back to 0?

I can't guess what you think that means.  Once the event has been overwritten in the ring buffer, the previous contents are gone, so I'm not sure what you are hoping to gain with a reset.  

What is the difference between EventHandler and BatchEventProcessor, use case wise?

Primarily separation of concerns.  You can easily test the EventHandler in isolation, for example in a unit test by calling onEvent.  You don't actually need to have a ring buffer, dedicated threads, and so on behind it.  You might swap out the BatchEventProcessor if you decided that you wanted different cancellation semantics (choosing an idea at random).

Is each call of the onEvent method made in the same thread?

Yes, unless you deliberately shoot yourself in the foot. For example, you could assign one EventHandler to multiple copies of the EventProcessor, run them at the same time, and make a mess that way (how bad that would get depends on whether the event handler was mutating its state while running).

Girish Sharma

Jan 9, 2016, 3:11:32 PM1/9/16
to Disruptor


On Saturday, January 9, 2016 at 4:41:55 AM UTC+5:30, Danil Suits wrote:
From what I have read in this group, I understood that the endOfBatch flag in the onEvent handler is useful only when the producer produces a batch of events using publishEvents.

Yikes, that doesn't sound right...
 
https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

sequence.set(availableSequence);


If the consumer is using BEP, then it doesn't care how the events are going into the buffer. availableSequence marks the highest event that can be reached without reloading memory. So the end of batch flag really means "this is the last event I can deliver before I need to check for more writes from the publishers".


Ah, so (correct me if I am wrong) this basically means that while the consumer was busy processing the previously consumed events, many other events were added by the producers, and when the consumer finished that processing, the barrier delivers all the events received in the meantime one after another. In this scenario, the endOfBatch variable will be true only for the latest sequence.


Producers keep producing events, but consumers only get them when the ring buffer is full.

Oh, I'd be very surprised if that is desirable behavior. When the ring buffer is full, the publishers cannot write to it, so they either block, drop data on the floor, or create backpressure, depending on your design. Likewise, the consumer would be blocked while waiting on the publishers. When the consumer is waiting on the network, your producers still can't make progress.


Well, my requirement is that my single consumer always has to batch the events and then process them. This batch size can very well be smaller than the ring buffer size (so as to not choke the consumers). I was simply wondering if the disruptor can provide me such functionality out of the box. So if I provide the batch size, it would tell me when the batch size has been reached so that I can then work on the batch. I don't mind saving the events (like in the original example) until that "batch size reached" indication has been given.

 
There was a really good talk on using disruptor for batching to smooth out network performance.  I believe I'm remembering "Mechanical Sympathetic Networking", by Todd Montgomery
http://www.infoq.com/presentations/Messaging-TCP-IP

Executive summary being you get nice networking behavior out of the box by letting the producers enqueue more work while the consumer is dealing with the network.

As said above, I have a compulsion to batch the events to some number (roughly). Only then can I do something with them. Because of this, I am left with many things that cannot be used in my case:

  • I cannot use multiple consumers to individually handle the events in parallel.
  • I cannot make the network call in the same consumer thread, as the network call actually finishes much later than the next batch would be ready.



Can the sequence of the incoming events be reset back to 0?

I can't guess what you think that means.  Once the event has been overwritten in the ring buffer, the previous contents are gone, so I'm not sure what you are hoping to gain with a reset.  


Each event sent to the consumer has an auto-incremented number attached to it. Now if I have to manually decide when a batch of events is ready, one approach I can see is doing a modulo on that sequence number. For instance, if my batch size limit is 100 then I would have code something like:

final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()
{
    private static final int BATCH_SIZE = 100;

    private final List<ValueEvent> BATCH = new ArrayList<>();

    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        // process a new event.
        BATCH.add(event);

        if (sequence % BATCH_SIZE == 0)
        {
            processBatch();
            BATCH.clear();
        }
    }
};

This is really handy, but it cannot be used if I have slightly more complex batch-draining logic (which I have). So, for instance, I want to cap the latency of processing any event at 500ms. This means that even if the batch size limit has not been reached, any event sitting in the BATCH queue has to be drained out. That breaks the whole "check sequence modulo batch size" logic once the queue has been drained with fewer than BATCH_SIZE events.
If I had a way to reset the sequence when I drain events before they reach the batch limit, then I could always rely on the modulo logic. I know that by design a sequence reset might not be possible. I'll see if something similar works using a static count variable.

 
What is the difference between EventHandler and BatchEventProcessor, use case wise?

Primarily separation of concerns.  You can easily test the EventHandler in isolation, for example in a unit test by calling onEvent.  You don't actually need to have a ring buffer, dedicated threads, and so on behind it.  You might swap out the BatchEventProcessor if you decided that you wanted different cancellation semantics (choosing an idea at random).


So the batch in BEP does not actually refer to consuming events, by default, in batches?
 

Danil Suits

Jan 13, 2016, 4:47:40 PM1/13/16
to Disruptor
So the batch in BEP does not actually refer to consuming events, by default, in batches?

I wouldn't phrase it that way. It's closer to say that the batch event processor loads events in batches. What it is really doing is figuring out which events are available in memory (remember, you are working in multiple threads - the processor running the producer thread writes the event locally, eventually the write gets flushed to a common area, then the processor running the consumer thread loads that memory, so that the events can be used). The Java memory model offers certain guarantees about which writes happen before other writes. The sequence barrier does the bookkeeping to let you know how many events you can read before the next load from memory is necessary to progress. The batch event processor is consuming the events in batches, in that sense, by passing the batch of events (one at a time) to the EventHandler.

If you wanted to enforce "at most 100 per batch", then you have a few choices. You can put the batch detection in the EventHandler as you have done here (though I wonder why not use BATCH.size() instead of sequence?). You could write an EventHandler that flushes the BATCH every time endOfBatch is set, and call that handler from another that keeps the running count going:

public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
{
    target.onEvent(event, sequence, (sequence % BATCH_SIZE == 0) || endOfBatch);
}
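To make that wrapping idea concrete, here is a self-contained sketch (using a stand-in Handler interface rather than the real Disruptor types; the decorator keeps its own counter instead of using the sequence, which also sidesteps the sequence-reset question raised earlier):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for com.lmax.disruptor.EventHandler, for illustration only.
interface Handler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
}

// Decorator: forces endOfBatch at least every batchSize events, while still
// honoring the processor's own endOfBatch signal.
class BatchCappingHandler<T> implements Handler<T> {
    private final Handler<T> target;
    private final int batchSize;
    private long count = 0;

    BatchCappingHandler(Handler<T> target, int batchSize) {
        this.target = target;
        this.batchSize = batchSize;
    }

    public void onEvent(T event, long sequence, boolean endOfBatch) {
        count++;
        boolean capped = (count % batchSize == 0);
        target.onEvent(event, sequence, capped || endOfBatch);
    }
}
```

The inner (target) handler then only needs to flush whenever its endOfBatch argument is true, as in the earlier examples.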

You could replace the batch event processor with one that caps the availableSequence number on the batch size

final long availableSequence = Math.min(sequenceBarrier.waitFor(nextSequence), nextSequence + BATCH_SIZE - 1);

Similarly, you could wrap the SequenceBarrier, so that waitFor is capped.

The first two choices, where you are implementing the batching in EventHandler, are the "right" way to do it.


 So for instance, I want to maintain a maximum latency in processing any event to 500ms

What's supposed to happen when things get slower than that?

 

Girish Sharma

Jan 14, 2016, 4:09:02 AM1/14/16
to Disruptor


On Thursday, January 14, 2016 at 3:17:40 AM UTC+5:30, Danil Suits wrote:
So the batch in BEP does not actually refer to consuming events, by default, in batches?

I wouldn't phrase it that way. It's closer to say that the batch event processor loads events in batches. What it is really doing is figuring out which events are available in memory (remember, you are working in multiple threads - the processor running the producer thread writes the event locally, eventually the write gets flushed to a common area, then the processor running the consumer thread loads that memory, so that the events can be used). The Java memory model offers certain guarantees about which writes happen before other writes. The sequence barrier does the bookkeeping to let you know how many events you can read before the next load from memory is necessary to progress. The batch event processor is consuming the events in batches, in that sense, by passing the batch of events (one at a time) to the EventHandler.
 
Thanks a lot. This makes sense now. But then, I have this doubt: how do things work in case I do not use BEP and simply provide a handler? Who does the bookkeeping? Who takes the Java memory model's writes and reads into consideration before reading events, etc.?
 
If you wanted to enforce "at most 100 per batch", then you have a few choices. You can put the batch detection in the EventHandler as you have done here (though I wonder why not use BATCH.size() instead of sequence?). You could write an EventHandler that flushes the BATCH every time endOfBatch is set, and call that handler from another that keeps the running count going:
 
public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
{
    target.onEvent(event, sequence, (sequence % BATCH_SIZE == 0) || endOfBatch);
}

You could replace the batch event processor with one that caps the availableSequence number on the batch size

final long availableSequence = Math.min(sequenceBarrier.waitFor(nextSequence), nextSequence + BATCH_SIZE - 1);

Similarly, you could wrap the SequenceBarrier, so that waitFor is capped.

I see. Thanks for explaining that with an example. Now I get the gist of it.
 
The first two choices, where you are implementing the batching in EventHandler, are the "right" way to do it.

Which first two choices are you referring here?
 
 So for instance, I want to maintain a maximum latency in processing any event to 500ms

What's supposed to happen when things get slower than that?

By slower, if you mean that my consumer is taking time to process the batch: I handle that in another thread, as processing of the batch is actually supposed to take time. Otherwise, if you mean what happens if my batch is not full within 500ms: I have a separate scheduled task running at a 500ms interval which looks at the last drain time to drain any pending events in the batch even if the batch is not full.

Danil Suits

Jan 14, 2016, 12:19:29 PM1/14/16
to Disruptor
But then, I have this doubt: how do things work in case I do not use BEP and simply provide a handler? Who does the bookkeeping?

Right.  That's a good doubt to have.  If none of the existing implementations in the library fit your use case, you can provide your own implementation of com.lmax.disruptor.EventProcessor, and use that.  But my feeling is that even copying the prior art correctly requires a good understanding of what the library code is doing.


Which first two choices are you referring here?

Sorry; I had sketched 4 different ways you might meet your "batch every N events" requirement. 

I showed two different ways of doing it in the EventHandler itself.   Either of those approaches could be suitable, and keep you well away from the bookkeeping logic.  So my recommendation is to restrict your search to implementations like them.

But for completeness, I also explained that you produce a similar effect by getting closer to the protocol (ie, writing an EventProcessor, or implementing a customized SequenceBarrier).  I discourage those approaches, because they require a more detailed understanding of what's actually happening.  But if you have to have "smarter" batching, and you can't get it to work using the event handlers alone, then maybe you look where the monsters live.


By slower, if you mean that my consumer is taking time to process the batch: I handle that in another thread, as processing of the batch is actually supposed to take time. Otherwise, if you mean what happens if my batch is not full within 500ms: I have a separate scheduled task running at a 500ms interval which looks at the last drain time to drain any pending events in the batch even if the batch is not full.

Some thoughts: batching up a bunch of work, and then publishing the result somewhere that can be seen by other threads: that's a normal disruptor pattern.  For instance, the event handler could be holding a local copy of a state machine.  Each processed event advances the machine, but the "current state" of the machine is only published at the end of a batch of events.  Very reasonable.

Having some other thread reach into the event handler to write to the batch: that sounds like a violation of the single writer principle. I haven't really explored the implications of that for a long time -- I filed it away under Bad Ideas, and haven't looked back.

The more idiomatic approach would be to add clock tick events to the event stream; a publisher thread puts a "the time is now" message into the event stream, and when the consumer sees that event, it compares the time value to the latest time value it had seen when it fired the batch.  If the interval is too large, it flushes the batch "immediately" and updates its tracking of the "currentTime".
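A rough sketch of that clock-tick idiom, with stand-in types (the 500ms threshold and the data/tick event split are assumptions drawn from this discussion, not Disruptor API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the clock-tick idea: a separate publisher thread would
// put tick events into the same ring buffer; here we just model the handler's
// reaction to the two event kinds arriving on one single-threaded stream.
class TimedBatchHandler {
    static final long MAX_LATENCY_MS = 500; // assumed SLA from the discussion

    private final int batchSize;
    private final List<String> batch = new ArrayList<>();
    private long lastFlushTime;
    final List<List<String>> flushed = new ArrayList<>();

    TimedBatchHandler(int batchSize, long startTime) {
        this.batchSize = batchSize;
        this.lastFlushTime = startTime;
    }

    // Data event: accumulate, flush when the batch is full.
    void onData(String event) {
        batch.add(event);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    // Clock-tick event: flush a non-empty batch that has waited too long.
    void onTick(long now) {
        if (batch.isEmpty()) {
            lastFlushTime = now;           // nothing pending; reset the reference
        } else if (now - lastFlushTime >= MAX_LATENCY_MS) {
            flush();
            lastFlushTime = now;
        }
    }

    private void flush() {
        flushed.add(new ArrayList<>(batch)); // hand the batch downstream
        batch.clear();
    }
}
```

Because the ticks arrive through the same single-threaded event stream as the data, the handler never shares its batch with another thread, which is the point of the idiom.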

You might look at that, and see if meeting your SLA is easier to implement that way.


Girish Sharma

Jan 15, 2016, 4:40:45 AM1/15/16
to Disruptor


On Thursday, January 14, 2016 at 10:49:29 PM UTC+5:30, Danil Suits wrote:
But then, I have this doubt: how do things work in case I do not use BEP and simply provide a handler? Who does the bookkeeping?

Right.  That's a good doubt to have.  If none of the existing implementations in the library fit your use case, you can provide your own implementation of com.lmax.disruptor.EventProcessor, and use that.  But my feeling is that even copying the prior art correctly requires a good understanding of what the library code is doing.

:) Aren't you going to clear the doubt? 
 
Which first two choices are you referring here?

Sorry; I had sketched 4 different ways you might meet your "batch every N events" requirement. 

I showed two different ways of doing it in the EventHandler itself.   Either of those approaches could be suitable, and keep you well away from the bookkeeping logic.  So my recommendation is to restrict your search to implementations like them.

But for completeness, I also explained that you produce a similar effect by getting closer to the protocol (ie, writing an EventProcessor, or implementing a customized SequenceBarrier).  I discourage those approaches, because they require a more detailed understanding of what's actually happening.  But if you have to have "smarter" batching, and you can't get it to work using the event handlers alone, then maybe you look where the monsters live.
 
Ah, I see.

By slower, if you mean that my consumer is taking time to process the batch: I handle that in another thread, as processing of the batch is actually supposed to take time. Otherwise, if you mean what happens if my batch is not full within 500ms: I have a separate scheduled task running at a 500ms interval which looks at the last drain time to drain any pending events in the batch even if the batch is not full.

Some thoughts: batching up a bunch of work, and then publishing the result somewhere that can be seen by other threads: that's a normal disruptor pattern.  For instance, the event handler could be holding a local copy of a state machine.  Each processed event advances the machine, but the "current state" of the machine is only published at the end of a batch of events.  Very reasonable.

Having some other thread reach into the event handler to write to the batch: that sounds like a violation of the single writer principle. I haven't really explored the implications of that for a long time -- I filed it away under Bad Ideas, and haven't looked back.

Well, I do not have a single writer to begin with. I have a single reader. That reader accumulates the events until a size is reached and then does the final, very slow processing over the batch. This processing requires a batch of roughly a certain size.
 
The more idiomatic approach would be to add clock tick events to the event stream; a publisher thread puts a "the time is now" message into the event stream, and when the consumer sees that event, it compares the time value to the latest time value it had seen when it fired the batch.  If the interval is too large, it flushes the batch "immediately" and updates its tracking of the "currentTime".

Well, in my case, I cannot let the producer do heavy computations. So the producer just produces the event. Also, since the producer is multi-threaded, it cannot really keep track of time or of the count of produced events.

Danil Suits

Jan 15, 2016, 10:53:14 AM1/15/16
to Disruptor
Well, I do not have a single writer to begin with. I have a single reader.

We're talking about two different contexts.  Your design has a single reader of the disruptor.  Does it have a single writer of BATCH?

This is what you wrote earlier, that got my attention:


I have a separate scheduled task running at a 500ms interval which looks at the last drain time to drain any pending events in the batch

The scheduled task is running in a separate thread, no?  Is it writing to BATCH?  In the EventHandler you offered, I would expect the handler to have exclusive access to the batch.  But you also wrote that the variable is static, which is curious, and you are writing as though a task running in a separate thread has access to the batch.  So I'm not sure it's clear what is going on.

since the producer is multi-threaded, it cannot really keep track of time or of the count of produced events

You are right, so far as this goes.  But the producer can notice that the buffer is full, and then (for example) decide to invoke congestion control rather than adding to the backlog.  If that kind of behavior is acceptable, you may be able to size the ring buffer so that the producers switch into congestion control before the SLA is at risk.
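In the real library, the producer-side check is RingBuffer.tryNext(), which throws InsufficientCapacityException when the buffer is full. The sketch below models only the shape of that behavior with a bounded-queue stand-in (tryPublish and the rejection counter are illustrative, not Disruptor API):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Stand-in for a full ring buffer: a bounded queue whose offer() fails when
// capacity is exhausted, standing in for RingBuffer.tryNext() throwing
// InsufficientCapacityException. The back-off policy itself is an assumption.
class BackpressureProducer {
    private final ArrayBlockingQueue<String> buffer;
    long rejected = 0; // events that triggered congestion control

    BackpressureProducer(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    boolean tryPublish(String event) {
        if (buffer.offer(event)) {
            return true;       // claimed a slot; would publish the event here
        }
        rejected++;            // invoke congestion control: back off, shed load,
        return false;          // or signal upstream instead of blocking
    }
}
```

Sizing the buffer so this branch fires before the latency SLA is at risk is the design choice being suggested above.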

 
