We are using Disruptor 3.4. We have the following setup:
______________________________
disruptor = new Disruptor<>(
        GenericEvent::new,
        8192,
        createThreadFactory("processor-thread"),
        ProducerType.SINGLE,
        new BlockingWaitStrategy());
ringBuffer = disruptor.getRingBuffer();
// setup handlers (e.g. consumers)
consumers = createPool(128);
disruptor.setDefaultExceptionHandler(new ExceptionEventHandler<>(lastExceptionReference));
disruptor.handleEventsWithWorkerPool(consumers).then(new ClearingEventHandler<>());
// start disruptor
disruptor.start();
_________________________
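For reference, the difference between the two dispatch modes we tried can be sketched in plain Java (no Disruptor dependency; the class and method names below are illustrative, not Disruptor API): a worker pool hands each event to exactly one worker, while handleEventsWith gives every handler every event.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the two dispatch modes. Real Disruptor workers race
// to claim sequences; strict round-robin here is a simplification.
public class DispatchSketch {

    // Worker-pool semantics: each event is handed to exactly one worker.
    static List<List<Integer>> workerPoolDispatch(int events, int workers) {
        List<List<Integer>> seen = new ArrayList<>();
        for (int w = 0; w < workers; w++) seen.add(new ArrayList<>());
        int next = 0; // simplified round-robin claim order
        for (int e = 0; e < events; e++) {
            seen.get(next).add(e);
            next = (next + 1) % workers;
        }
        return seen;
    }

    // handleEventsWith semantics: every handler sees every event.
    static List<List<Integer>> broadcastDispatch(int events, int handlers) {
        List<List<Integer>> seen = new ArrayList<>();
        for (int h = 0; h < handlers; h++) {
            List<Integer> all = new ArrayList<>();
            for (int e = 0; e < events; e++) all.add(e);
            seen.add(all);
        }
        return seen;
    }
}
```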
Our EventHandlers (consumers) look like this:
___
public class ProcessingEventHandlerBase<S, M extends Serializable>
        implements WorkHandler<Event<M>>, EventHandler<Event<M>> {

    protected ProcessingEventHandlerBase(final S s) {
    }

    @Override
    public void onEvent(Event<M> event) throws Exception {      // WorkHandler
        processEvent(event);
    }

    @Override
    public void onEvent(Event<M> event, long sequence, boolean endOfBatch) throws Exception { // EventHandler
        processEvent(event);
    }
}
___
We realized that this configuration was using only one of the consumers. We want to 'round-robin' the consumers so that each consumer works on 1/128 of the jobs.
So we switched from disruptor.handleEventsWithWorkerPool to disruptor.handleEventsWith.
Our consumer changed to:
___
public class ProcessingEventHandlerBase<S, M extends Serializable>
        implements EventHandler<Event<M>> {

    private final int consumerNumber;  // this handler's index, 0..127
    private final int totalConsumers;  // 128

    protected ProcessingEventHandlerBase(final S s, final int consumerNumber, final int totalConsumers) {
        this.consumerNumber = consumerNumber;
        this.totalConsumers = totalConsumers;
    }

    @Override
    public void onEvent(Event<M> event, long sequence, boolean endOfBatch) throws Exception {
        // Shard by sequence so each handler takes 1/128 of the events.
        if (sequence % totalConsumers != consumerNumber) {
            return;
        }
        processEvent(event);
    }
___
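As a sanity check on the sharding arithmetic (the names consumerNumber and totalConsumers are our own, not Disruptor API): with the predicate sequence % totalConsumers == consumerNumber, every sequence should be claimed by exactly one of the 128 handlers.

```java
// Sketch of the per-handler sharding predicate. `consumerNumber` is this
// handler's index (0..totalConsumers-1); `totalConsumers` is the pool size.
public class ShardCheck {
    static boolean shouldProcess(long sequence, int consumerNumber, int totalConsumers) {
        return sequence % totalConsumers == consumerNumber;
    }
}
```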
Does the second setup look better?
What happens if the consumer processEvent method throws an exception? Does that thread die?
Thanks