Is this the correct usage pattern for a multiple producer multiple consumer disruptor?


Rory

Mar 15, 2013, 7:03:04 AM
to lmax-di...@googlegroups.com
Hi,

I'm having issues in my application whereby the event queue becomes overloaded very quickly.

I'm not that familiar with the LMAX Disruptor (it's code I inherited), so I was wondering if someone could take a look at my code and see whether this is the correct usage. I've been reading up on it but am still a little fuzzy.

I appreciate it's hard to tell without more context whether the code below is correct, but I'd just like to know if it at least looks OK, or whether there's something obvious in it that doesn't look right.

I'm unsure at this stage if the producers are producing events too quickly for the 5 consumers to handle, or if the consumers are taking too long to process events (or both).

Any help is much appreciated!

So, I'm creating the disruptor like so:

public void createMultipleProducerMultipleConsumerQueue() {
        this.size = 65536;
        this.numThreads = 5;

        executor = Executors.newFixedThreadPool(numThreads , new EventQueueThreadFactory());

        ringBuffer = new RingBuffer<Event>(Event.FACTORY,
                                           new MultiThreadedClaimStrategy(size),
                                           new BlockingWaitStrategy());

        batchEventProcessors = new BatchEventProcessor[numThreads];

        final Sequence[] sequences = new Sequence[numThreads];

        for (int i = 0; i < numThreads; ++i) {
            final EventHandler eventHandler = new EventHandler(i, numThreads);
            batchEventProcessors[i] = new BatchEventProcessor<Event>(ringBuffer, ringBuffer.newBarrier(), eventHandler);
            sequences[i] = batchEventProcessors[i].getSequence();
            executor.submit(batchEventProcessors[i]);
        }

        ringBuffer.setGatingSequences(sequences);
}

And events are submitted like so:

public void submit(final Executable executable) {
        // Note: with multiple producers this check-then-act is racy (capacity can be
        // exhausted between the check and next()), and events are silently dropped
        // whenever the buffer reports full.
        if (0 == ringBuffer.remainingCapacity()) {
            Log.EventQueueFull(size);
            return;
        }

        final long sequence = ringBuffer.next();
        final Event event = ringBuffer.get(sequence);
        event.build(executable);
        ringBuffer.publish(sequence);
}

And this is the EventHandler:

    public EventHandler(final long ordinal, final long numberOfConsumers) {
        this.ordinal = ordinal;
        this.numberOfConsumers = numberOfConsumers;
    }

    @Override
    public void onEvent(final Event event, final long seq, final boolean bln) {
        if ((seq % numberOfConsumers) == ordinal) {
            event.doWork();
        }
    }
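For what it's worth, the modulo check above partitions the sequence space so that each of the N handlers claims every Nth event, meaning every published event is processed by exactly one consumer. A minimal plain-Java sketch of that idea (no Disruptor dependency; the sequence range and handler count are illustrative):

```java
// Sketch of the modulo-sharding scheme used by the EventHandler above.
// Each handler with ordinal k processes only sequences where seq % N == k.
public class ModuloShardingDemo {
    public static void main(String[] args) {
        final int numberOfConsumers = 5;
        for (long seq = 0; seq < 10; seq++) {
            long ordinal = seq % numberOfConsumers;   // which handler takes this event
            System.out.println("sequence " + seq + " -> handler " + ordinal);
        }
    }
}
```

Note that with this scheme every handler still *sees* every event; it just skips the ones that aren't its own, so the cost of reading each slot is paid N times.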

Many thanks,

Rory

Marvin Hansen

Mar 17, 2013, 6:13:45 PM
to lmax-di...@googlegroups.com
Hi Rory,

I've tried out your code, and from what I've observed it looks like this line

      executor.submit(batchEventProcessors[i]);

causes a lot of overhead.


First of all, if you haven't already, I would suggest using the latest Disruptor version,
3.0.0B3, which you can obtain from Maven Central:

http://search.maven.org/#browse|1726177859


Second, I would suggest rewriting the builder like this:



for (int i = 0; i < numThreads; ++i) {
    final EventHandler eventHandler = new EventHandler(i, numThreads);
    batchEventProcessors[i] = new BatchEventProcessor<Event>(ringBuffer, ringBuffer.newBarrier(), eventHandler);

    // use the disruptor to handle event processing instead of executor.submit
    disruptor.handleEventsWith(batchEventProcessors[i]);
}
ringBuffer.setGatingSequences(sequences);
}

I'm relatively new to the Disruptor as well, but while comparing

      executor.submit(batchEventProcessors[i]);

versus

      disruptor.handleEventsWith(batchEventProcessors[i])

I've noticed that the latter runs significantly faster. My synthetic benchmark
runs ~55 seconds with the executor approach and under ten seconds using
disruptor.handleEventsWith. I've measured average time over ~100 runs, and the variance
is much higher for the executor service as well. I honestly don't know where the bottleneck
is in the executor service, but based on my results I would strongly suggest you
do your own comparison to figure out whether this applies to your code as well.



Third, make sure the RingBuffer size and the data used all fit together into the CPU cache.
On Linux, run "lscpu" to get details about the actual cache size. In your code, the size is
arbitrarily set to 65536, which may be okay on a server CPU with a large cache.
Try reducing the number and watch how throughput and execution time change. If nothing
changes, the size is okay. If execution time gets faster, the size was chosen too large
and has led to cache misses. As a starting guess, just try 1024 and increase and measure from there.
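One constraint to keep in mind while experimenting: the Disruptor requires the ring buffer size to be a power of two (it masks the sequence rather than taking a modulo), so any candidate size has to be rounded accordingly. A small plain-Java helper, as a sketch:

```java
// Rounds a requested ring buffer capacity up to the next power of two,
// since the Disruptor's RingBuffer only accepts power-of-two sizes.
public class RingSize {
    static int nextPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);   // largest power of two <= x
        return (highest == x) ? x : highest << 1;
    }

    public static void main(String[] args) {
        System.out.println(nextPowerOfTwo(1000));   // 1024
        System.out.println(nextPowerOfTwo(1024));   // 1024
        System.out.println(nextPowerOfTwo(65536));  // 65536
    }
}
```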

Also, I've had very good results with the total number of threads set to 2*cores to fully utilize Hyper-Threading
on Intel CPUs, but you need to run your own benchmark for verification.

Finally, using the EventHandler with modulo arithmetic is actually the fastest way to process data,
and it is clearly advised in the official FAQ:

http://code.google.com/p/disruptor/wiki/FrequentlyAskedQuestions#How_do_you_arrange_a_Disruptor_with_multiple_consumers_so_that_e

I've just finished an evaluation of WorkerPool versus EventHandler with modulo, and
the latter is just insanely fast. I mean, it is so incredibly fast that feeding my RingBuffer becomes
the limiting factor in my application. Just make sure you use the latest Disruptor version and the built-in
mechanism for event handling, as described above.


cheers
Marvin

Rory

Mar 19, 2013, 7:40:53 AM
to lmax-di...@googlegroups.com
Hi Marvin,

Thanks a million. You've given me a lot of good stuff to try here. I appreciate you taking the time to look at this. I'll let you know how I get on.

Many thanks,

Rory