Hi,
I'm having issues in my application whereby the event queue is becoming overloaded very quickly.
I'm not that familiar with the LMAX disruptor, it is code that I inherited, so was wondering if someone could maybe take a look at my code and see if this is the correct usage? I have been reading up on it but am still a little fuzzy on it.
I appreciative its hard to tell without more context if the code below is correct or not, but I would just like to know if it at least looks ok, maybe there's something obvious in it that doesn't look right etc.
I'm unsure at this stage if the producers are producing events too quickly for the 5 consumers to handle, or if the consumers are taking too long to process events (or both).
Any help is much appreciated!
So, I'm creating the disruptor like so:
public void createMultipleProducerMultipleConsumerQueue() {
this.size = 65536;
this.numThreads = 5;
executor = Executors.newFixedThreadPool(numThreads , new EventQueueThreadFactory());
ringBuffer = new RingBuffer<Event>(Event.FACTORY,
new MultiThreadedClaimStrategy(size),
new BlockingWaitStrategy());
batchEventProcessors = new BatchEventProcessor[numThreads];
final Sequence[] sequences = new Sequence[numThreads];
for (int i = 0; i < numberOfConsumerThreads; ++i) {
final EventHandler eventHandler = new EventHandler(i, numThreads);
batchEventProcessors[i] = new BatchEventProcessor<Event>(ringBuffer, ringBuffer.newBarrier(), eventHandler);
sequences[i] = batchEventProcessors[i].getSequence();
executor.submit(batchEventProcessors[i]);
}
ringBuffer.setGatingSequences(sequences);
}
And events are submitted like so:
public void submit(final Executable executable) {
if (0 == ringBuffer.remainingCapacity()) {
Log.EventQueueFull(size);
return;
}
final long sequence = ringBuffer.next();
final Event event = ringBuffer.get(sequence);
event.build(executable);
ringBuffer.publish(sequence);
}
And this is the EventHandler:
public EventHandler(final long ordinal, final long numberOfConsumers) {
this.ordinal = ordinal;
this.numberOfConsumers = numberOfConsumers;
}
@Override
public void onEvent(final Event event, final long seq, final boolean bln) {
if ((seq % numberOfConsumers) == ordinal) {
event.doWork();
}
Many thanks,
Rory