final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()
{
    private static final int BATCH_SIZE = 500;

    private int count = 0;
    private final List<ValueEvent> BATCH = new ArrayList<>();

    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        // Collect the new event and process the accumulated batch once it is full.
        BATCH.add(event);
        if (++count >= BATCH_SIZE)
        {
            processBatch();
            BATCH.clear();
            count = 0;
        }
    }
};
Now it's fine if the use case is as simple as this and I have only a single consumer running. But I have a few questions here:
Is there a way that I can make use of the ring buffer and have this behavior automatically? i.e. producers keep producing events, but consumers only get them when the ring buffer is full. At that point, the disruptor drains all events to the consumer.
Can the sequence of the incoming events be reset back to 0?
I hope my questions are clear and they make sense.
PS: Please disregard my previously posted question (which might still be waiting for mod approval)
Regards
From what I have read in this group, I understood that the endOfBatch flag in the onEvent handler is useful only when the producer produces a batch of events using publishEvents.
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
Producers keep producing events, but consumers only get them when the ring buffer is full.
Can the sequence of the incoming events be reset back to 0?
What is the difference between EventHandler and BatchEventProcessor, use case wise?
Is each call of the onEvent method made in the same thread?
From what I have read in this group, I understood that the endOfBatch flag in the onEvent handler is useful only when the producer produces a batch of events using publishEvents.
https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java
Yikes, that doesn't sound right...
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
If the consumer is using BEP, then it doesn't care how the events are going into the buffer. availableSequence marks the highest event that can be reached without reloading memory. So the end of batch flag really means "this is the last event I can deliver before I need to check for more writes from the publishers".
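To make that concrete (a throwaway probe I'm sketching here, not anything the library ships), a handler that just counts events between endOfBatch signals will show the batches growing whenever the consumer falls behind the publishers:

// Assumes the usual com.lmax.disruptor.EventHandler import.
final EventHandler<ValueEvent> batchSizeProbe = new EventHandler<ValueEvent>()
{
    private long eventsInCurrentBatch = 0;

    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        eventsInCurrentBatch++;
        if (endOfBatch)
        {
            // How many events were already available in memory before we had to wait again.
            System.out.println("batch of " + eventsInCurrentBatch + " ended at sequence " + sequence);
            eventsInCurrentBatch = 0;
        }
    }
};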
Producers keep producing events, but consumers only get them when the ring buffer is full.
Oh, I'd be very surprised if that is desirable behavior. When the ring buffer is full, the publishers cannot write to it, so they either block, drop data on the floor, or create backpressure, depending on your design. Likewise, the consumer would be blocked while waiting on the publishers, and when the consumer is waiting on the network, your producers still can't make progress.
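To spell out those publisher options (a sketch against the 3.x RingBuffer API; setValue() is assumed to exist on your ValueEvent, and the drop branch is only illustrative):

import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;

public final class Publishers
{
    // Blocking publisher: next() waits for a free slot, which is what stalls the
    // producers when the consumer cannot keep up.
    public static void publishBlocking(final RingBuffer<ValueEvent> ringBuffer, final long value)
    {
        final long seq = ringBuffer.next();
        try
        {
            ringBuffer.get(seq).setValue(value);
        }
        finally
        {
            ringBuffer.publish(seq);
        }
    }

    // Dropping publisher: tryNext() fails fast instead of waiting, so the caller can
    // shed load or push backpressure upstream.
    public static boolean publishOrDrop(final RingBuffer<ValueEvent> ringBuffer, final long value)
    {
        try
        {
            final long seq = ringBuffer.tryNext();
            try
            {
                ringBuffer.get(seq).setValue(value);
            }
            finally
            {
                ringBuffer.publish(seq);
            }
            return true;
        }
        catch (final InsufficientCapacityException e)
        {
            return false; // caller decides: drop, retry later, signal upstream, ...
        }
    }
}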
There was a really good talk on using disruptor for batching to smooth out network performance. I believe I'm remembering "Mechanical Sympathetic Networking", by Todd Montgomery
http://www.infoq.com/presentations/Messaging-TCP-IP
Executive summary being you get nice networking behavior out of the box by letting the producers enqueue more work while the consumer is dealing with the network.
Can the sequence of the incoming events be reset back to 0?
I can't guess what you think that means. Once the event has been overwritten in the ring buffer, the previous contents are gone, so I'm not sure what you are hoping to gain with a reset.
final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()
{
    private static final int BATCH_SIZE = 100;

    private final List<ValueEvent> BATCH = new ArrayList<>();

    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        // Collect the new event and process the accumulated batch every BATCH_SIZE sequences.
        BATCH.add(event);
        if (sequence % BATCH_SIZE == 0)
        {
            processBatch();
            BATCH.clear();
        }
    }
};
What is the difference between EventHandler and BatchEventProcessor, use case wise?
Primarily separation of concerns. You can easily test the EventHandler in isolation, for example in a unit test by calling onEvent. You don't actually need to have a ring buffer, dedicated threads, and so on behind it. You might swap out the BatchEventProcessor if you decided that you wanted different cancellation semantics (choosing an idea at random).
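For example (a rough sketch only; MyBatchingHandler, RecordingBatchSink and ValueEvent stand in for your own code, and I'm assuming JUnit 4):

import static org.junit.Assert.assertEquals;

import com.lmax.disruptor.EventHandler;
import org.junit.Test;

public class MyBatchingHandlerTest
{
    @Test
    public void flushesWhenEndOfBatchIsSignalled() throws Exception
    {
        // No ring buffer, no BatchEventProcessor, no extra threads: just call onEvent directly.
        final RecordingBatchSink sink = new RecordingBatchSink();
        final EventHandler<ValueEvent> handler = new MyBatchingHandler(sink);

        handler.onEvent(new ValueEvent(), 0L, false);
        handler.onEvent(new ValueEvent(), 1L, true); // endOfBatch = true

        assertEquals(1, sink.batchesFlushed());
    }
}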
So the batch in BEP does not actually refer to consuming events, by default, in batches?
public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
{
    target.onEvent(event, sequence, (sequence % BATCH_SIZE == 0) || endOfBatch);
}
final long availableSequence = Math.min(sequenceBarrier.waitFor(nextSequence), nextSequence + BATCH_SIZE - 1);
So for instance, I want to maintain a maximum latency in processing any event to 500ms
So the batch in BEP does not actually refer to consuming events, by default, in batches?
I wouldn't phrase it that way. It's closer to say that the batch event processor loads events in batches. What it is really doing is figuring out which events are available in memory (remember, you are working with multiple threads: the processor running the producer thread writes the event locally, eventually the write gets flushed to a common area, then the processor running the consumer thread loads that memory, so that the events can be used). The Java memory model offers certain guarantees about which writes happen before other writes. The sequence barrier does the bookkeeping to let you know how many events you can read before the next load from memory is necessary to progress. The batch event processor is consuming the events in batches, in that sense, by passing the batch of events (one at a time) to the EventHandler.
If you wanted to enforce "at most 100 per batch", then you have a few choices. You can put the batch-size detection in the EventHandler, as you have done here (though I wonder why not use BATCH.size() instead of sequence?). You could write an EventHandler that flushes the BATCH every time endOfBatch is set, and call that handler from another one that keeps the running count going:
public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
{
    target.onEvent(event, sequence, (sequence % BATCH_SIZE == 0) || endOfBatch);
}
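And the inner target handler could be as simple as this (processBatch() is a placeholder for your own code; the wrapper above forces endOfBatch to true every BATCH_SIZE events, so the inner handler only reacts to the flag):

// Assumes the usual imports (com.lmax.disruptor.EventHandler, java.util.ArrayList, java.util.List).
final EventHandler<ValueEvent> target = new EventHandler<ValueEvent>()
{
    private final List<ValueEvent> BATCH = new ArrayList<>();

    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        BATCH.add(event);
        if (endOfBatch)
        {
            processBatch();
            BATCH.clear();
        }
    }
};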
You could replace the batch event processor with one that caps the availableSequence number based on the batch size:
final long availableSequence = Math.min(sequenceBarrier.waitFor(nextSequence), nextSequence + BATCH_SIZE - 1);
Similarly, you could wrap the SequenceBarrier, so that waitFor is capped.
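Something along these lines, for instance (CappedSequenceBarrier is my name, not the library's; I'm assuming the com.lmax.disruptor.SequenceBarrier interface from the 3.x line):

import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutException;

// Delegates everything, but never reports more than batchSize sequences beyond
// the one the processor asked for.
public final class CappedSequenceBarrier implements SequenceBarrier
{
    private final SequenceBarrier delegate;
    private final long batchSize;

    public CappedSequenceBarrier(final SequenceBarrier delegate, final long batchSize)
    {
        this.delegate = delegate;
        this.batchSize = batchSize;
    }

    @Override
    public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException
    {
        return Math.min(delegate.waitFor(sequence), sequence + batchSize - 1);
    }

    @Override
    public long getCursor() { return delegate.getCursor(); }

    @Override
    public boolean isAlerted() { return delegate.isAlerted(); }

    @Override
    public void alert() { delegate.alert(); }

    @Override
    public void clearAlert() { delegate.clearAlert(); }

    @Override
    public void checkAlert() throws AlertException { delegate.checkAlert(); }
}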
The first two choices, where you are implementing the batching in EventHandler, are the "right" way to do it.
So for instance, I want to maintain a maximum latency in processing any event to 500ms
What's supposed to happen when things get slower than that?
But then, I come up with this doubt: how do things work in case I do not use BEP and simply provide a handler? Who does the bookkeeping?
Which first two choices are you referring to here?
By slower, if you mean that my consumer is taking time to process the batch, then I handle that in another thread, as processing of the batch is actually supposed to take time. Otherwise, if you mean what happens if my batch is not full within 500 ms, then I have a separate scheduled task running at a 500 ms interval which looks at the last drain time to drain any pending events in the batch even if the batch is not full.
But then, I come up with this doubt: how do things work in case I do not use BEP and simply provide a handler? Who does the bookkeeping?
Right. That's a good doubt to have. If none of the existing implementations in the library fit your use case, you can provide your own implementation of com.lmax.disruptor.EventProcessor, and use that. But my feeling is that even copying the prior art correctly requires a good understanding of what the library code is doing.
Which first two choices are you referring to here?
Sorry; I had sketched 4 different ways you might meet your "batch every N events" requirement.
I showed two different ways of doing it in the EventHandler itself. Either of those approaches could be suitable, and keep you well away from the bookkeeping logic. So my recommendation is to restrict your search to implementations like them.
But for completeness, I also explained that you can produce a similar effect by getting closer to the protocol (i.e., writing an EventProcessor, or implementing a customized SequenceBarrier). I discourage those approaches, because they require a more detailed understanding of what's actually happening. But if you have to have "smarter" batching, and you can't get it to work using the event handlers alone, then maybe you look where the monsters live.
By slower, if you mean that my consumer is taking time to process the batch, then I handle that in another thread, as processing of the batch is actually supposed to take time. Otherwise, if you mean what happens if my batch is not full within 500 ms, then I have a separate scheduled task running at a 500 ms interval which looks at the last drain time to drain any pending events in the batch even if the batch is not full.
Some thoughts: batching up a bunch of work, and then publishing the result somewhere that can be seen by other threads: that's a normal disruptor pattern. For instance, the event handler could be holding a local copy of a state machine. Each processed event advances the machine, but the "current state" of the machine is only published at the end of a batch of events. Very reasonable.
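A bare-bones sketch of that pattern (MyStateMachine and publishSnapshot() are made-up names, not Disruptor API):

// Assumes the usual com.lmax.disruptor.EventHandler import.
final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()
{
    // Owned exclusively by the consumer thread; never touched from outside.
    private final MyStateMachine state = new MyStateMachine();

    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        state.apply(event); // every event advances the machine
        if (endOfBatch)
        {
            publishSnapshot(state.copy()); // only now make the result visible elsewhere
        }
    }
};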
Having some other thread reach into the event handler to write to the batch: that sounds like a violation of the single writer principle. I haven't really explored the implications of that for a long time; I filed it away under Bad Ideas and haven't looked back.
The more idiomatic approach would be to add clock tick events to the event stream; a publisher thread puts a "the time is now" message into the event stream, and when the consumer sees that event, it compares the time value to the latest time value it had seen when it fired the batch. If the interval is too large, it flushes the batch "immediately" and updates its tracking of the "currentTime".
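Roughly like this (isTick(), timestampMillis(), processBatch() and MAX_LATENCY_MS are all made up for the sketch; the tick events themselves would be published by a separate timer thread):

// Assumes the usual imports (com.lmax.disruptor.EventHandler, java.util.ArrayList, java.util.List).
final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()
{
    private static final long MAX_LATENCY_MS = 500;

    private final List<ValueEvent> batch = new ArrayList<>();
    private long currentTime = 0;   // latest tick value seen
    private long lastFlushTime = 0; // tick value when the batch was last drained

    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        if (event.isTick())
        {
            currentTime = event.timestampMillis();
            if (!batch.isEmpty() && currentTime - lastFlushTime >= MAX_LATENCY_MS)
            {
                flush();
            }
            return;
        }

        batch.add(event);
        if (endOfBatch) // or your "every N events" rule
        {
            flush();
        }
    }

    private void flush()
    {
        processBatch(); // your own batch-processing code
        batch.clear();
        lastFlushTime = currentTime;
    }
};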
Well, I do not have a single writer to begin with. I have a single reader.
I have a separate scheduled task running at a 500 ms interval which looks at the last drain time to drain any pending events in the batch
Since the producer is multi-threaded, it cannot really keep track of time or of the count of produced events.