Is Disruptor appropriate for asynchronously inserting records into a database?


Yang Derek

May 25, 2014, 10:14:41 PM
to lmax-di...@googlegroups.com
Let's say a great number of messages keep coming in and they need to be inserted into a database.

Because the consumer is far slower than the producer, in order not to block message receiving, I use the Disruptor with several InsertEventHandlers (let's say I have 5 InsertEventHandlers in this case).

Because each message should be inserted into the database exactly once, I'm following this pattern:


Here's my question:

#1. When an InsertEventHandler begins to consume the data in the ring buffer, does it need to wait for another InsertEventHandler's insertion to complete before moving to the next index in the ring buffer?

#2. Could 5 InsertEventHandlers consume 5 messages in the ring buffer simultaneously?

Michael Barker

Jun 3, 2014, 12:17:30 AM
to lmax-di...@googlegroups.com
Answers inline.
 
#1. When an InsertEventHandler begins to consume the data in the ring buffer, does it need to wait for another InsertEventHandler's insertion to complete before moving to the next index in the ring buffer?

I'm not sure what the question is here.  Are you talking about 2 independent instances of InsertEventHandler or the same instance?

#2. Could 5 InsertEventHandlers consume 5 messages in the ring buffer simultaneously?

Yes.

However, in this circumstance the best approach for handling high volumes of inserts is not necessarily going parallel, but instead using the batching capabilities of the Disruptor together with SQL batch inserts.  If you are inserting into the same table from multiple threads you are likely to just run into lock contention in the database and won't get the parallel speed-up you are looking for.  The EventHandler interface has a batching flag that you can use to determine whether any more events are about to come through.  E.g.

class InsertEventHandler implements EventHandler<RowEvent> {
    private final int batchSize = 100; // tune for your workload
    private int count = 0;

    public void onEvent(RowEvent event, long sequence, boolean endOfBatch) {
        addToBatch(event);
        if (endOfBatch || ++count >= batchSize) {
            flushBatch();
            count = 0;
        }
    }
}
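For the flushBatch() side, here is a minimal sketch of what batched JDBC inserts might look like. The table name `rows`, its columns, and the `RowEvent` fields are hypothetical; `addToBatch`/`flushBatch` are illustrative helpers, not Disruptor API:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical event carrying the values for one row.
class RowEvent {
    long id;
    String payload;
}

class JdbcBatchWriter {
    static final String INSERT_SQL = "INSERT INTO rows (id, payload) VALUES (?, ?)";
    private final Connection connection;
    private final List<RowEvent> pending = new ArrayList<>();

    JdbcBatchWriter(Connection connection) { this.connection = connection; }

    // Accumulate a row locally; nothing is sent to the database yet.
    void addToBatch(RowEvent event) { pending.add(event); }

    int pendingCount() { return pending.size(); }

    // Send all pending rows to the database in a single batch round trip.
    void flushBatch() throws SQLException {
        if (pending.isEmpty()) return;
        try (PreparedStatement ps = connection.prepareStatement(INSERT_SQL)) {
            for (RowEvent e : pending) {
                ps.setLong(1, e.id);
                ps.setString(2, e.payload);
                ps.addBatch();
            }
            ps.executeBatch();
        }
        pending.clear();
    }
}
```

The point is that however many events accumulate between flushes, the database sees one round trip per batch rather than one per row.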

I would break up the application such that you have one Disruptor per table, as you can't batch writes across multiple tables (assuming you don't need to insert across multiple tables in the same transaction).

Mike.

AndrewL

Jun 3, 2014, 4:05:52 AM
to lmax-di...@googlegroups.com
I agree with Mike; it is better to keep all the operations on one target in a single handler.  In my case I am writing to a persistence technology that handles write contention poorly, so it's better not to parallelize writes - much better to batch them.

To add to Mike's code, if you have a dependency graph like this:

disruptor.handleEventsWith(doInitialThings)
    .then(insertIntoTableHandler)
    .then(concludingThings);

and you want concludingThings to operate as soon as flushBatch() has executed, you need this optimization:

1. implement the SequenceReportingEventHandler interface on your handler

2. add

public void setSequenceCallback(Sequence sequence) {
    this.sequenceCallback = sequence;
}

3. change the end-of-batch processing:

        if (endOfBatch || ++count >= batchSize) {
            flushBatch();
            count = 0;
            sequenceCallback.set(sequence);
        }


Explanation: 

Normally, when the class managing your EventHandler (BatchEventProcessor) detects a batch, it will call your onEvent successively and only mark all the events from the batch as handled once the last onEvent has returned.  Now suppose your handler had been busy inserting a batch and a massive backlog of 1000 events had built up.  If you have set batchSize=100, then flushBatch() will execute 10 times, yet none of the 1000 events will be "released" to the following concludingThings handler until the whole outer batch is done.  By adding sequenceCallback.set(sequence), concludingThings gets access to the events as soon as your inner batch has finished.
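Putting the three steps above together, here is a self-contained sketch. To keep it runnable without the Disruptor jar, a minimal stand-in Sequence class is used; in real code you would implement com.lmax.disruptor.SequenceReportingEventHandler and receive the real Sequence via setSequenceCallback. addToBatch/flushBatch are illustrative stubs:

```java
// Minimal stand-in for com.lmax.disruptor.Sequence, just for this sketch.
class Sequence {
    private volatile long value = -1L;
    public void set(long value) { this.value = value; }
    public long get() { return value; }
}

class BatchReleasingHandler {
    private final int batchSize;
    private int count = 0;
    private Sequence sequenceCallback;

    BatchReleasingHandler(int batchSize) { this.batchSize = batchSize; }

    // In the real API this comes from SequenceReportingEventHandler.
    public void setSequenceCallback(Sequence sequence) {
        this.sequenceCallback = sequence;
    }

    public void onEvent(Object event, long sequence, boolean endOfBatch) {
        addToBatch(event);
        if (endOfBatch || ++count >= batchSize) {
            flushBatch();
            count = 0;
            // Release everything up to this sequence so downstream handlers
            // (e.g. concludingThings) can proceed without waiting for the
            // whole outer batch to finish.
            sequenceCallback.set(sequence);
        }
    }

    private void addToBatch(Object event) { /* accumulate rows */ }
    private void flushBatch() { /* write accumulated rows */ }
}
```

With batchSize=3, the callback stays at -1 after two events and jumps to the flushed sequence on the third, which is exactly the early-release behaviour described above.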

Yang Derek

Jun 4, 2014, 3:15:25 AM
to lmax-di...@googlegroups.com
Hi Mike,

Let me explain more. The application is dealing with one table only. So the multiple InsertEventHandlers are inserting data into the same table, which may cause write contention on the database side, according to your words.

#1 What I mean is: let's say we have InsertEventHandler1 and InsertEventHandler2. At first, both get the data from the ring buffer, but it's InsertEventHandler1's turn to handle the data. So InsertEventHandler1 starts to process and insert into the database, and InsertEventHandler2 just passes due to the modulo operation (the same thing as: https://github.com/LMAX-Exchange/disruptor/wiki/Frequently-Asked-Questions#how-do-you-arrange-a-disruptor-with-multiple-consumers-so-that-each-event-is-only-consumed-once)

Let's assume InsertEventHandler1 will take a very long time to process the data. At this particular point, what is the behavior of InsertEventHandler2? Does InsertEventHandler2 wait until InsertEventHandler1 is finished, and then both move to the next event in the ring buffer? Or will InsertEventHandler2 move forward to get the next event from the ring buffer and process it, regardless of how long InsertEventHandler1 takes?



#2 The application is only responsible for calling a stored procedure. So I'm not sure if batch inserts are possible.

So you're not suggesting that multiple InsertEventHandlers insert into the same table, right? But from my testing, inserting with multiple InsertEventHandlers is much faster than with only one.

On Tuesday, June 3, 2014 at 12:17:30 PM UTC+8, mikeb01 wrote:

Yang Derek

Jun 4, 2014, 3:35:01 AM
to lmax-di...@googlegroups.com
Hi AndrewL

Thanks for your reply.

The strange thing is that even though write contention exists, multiple writers to the same table are still much faster than a single writer.

I'm using Java + Spring + SQL Server; is there any batch insert for that?

On Tuesday, June 3, 2014 at 4:05:52 PM UTC+8, AndrewL wrote:

Michael Barker

Jun 4, 2014, 3:46:09 AM
to lmax-di...@googlegroups.com
Hi Yang,

How much faster is it with 2 threads?  You'll probably find that there is some contention, but not so much that a bit of parallelism won't improve performance; however, I suspect it won't scale linearly.  Batch inserts should be available for Java + SQL Server, and I think Spring has an API for it too.  However, if your application is calling a stored procedure then you are probably out of luck.  If that is the case, then you are probably not getting all that much value out of the Disruptor.

As for question #1, independent event handlers will not have to wait on each other.  Although if one of them blocks for long enough, eventually you will run out of space in the ring buffer and the producer will need to block or drop incoming events.

Mike.


--
You received this message because you are subscribed to the Google Groups "Disruptor" group.
To unsubscribe from this group and stop receiving emails from it, send an email to lmax-disrupto...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

AndrewL

Jun 4, 2014, 3:55:44 AM
to lmax-di...@googlegroups.com
My persistence store is MongoDB which has a single database-wide write lock.

This is how you can approach multi-threaded JDBC-style updates:

disruptor.handleEventsWith(handler1).then(handlerMore).then(persister0, persister1, persister2)

The persisters will operate in parallel, but each receives the same event, so you need this kind of trick:

class Persister<T> implements EventHandler<T> {
    private final int instance;

    Persister(int instance) { this.instance = instance; }

    public void onEvent(T event, long sequence, boolean endOfBatch) {
        if (sequence % PERSISTER_INSTANCES == instance) {
            // persist the event
        }
    }
}

...now the parallel running handlers will only process some of the events.
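The partitioning trick can be checked in isolation. A small self-contained sketch (the class shape is illustrative, not part of the Disruptor API) showing that every sequence is accepted by exactly one of the parallel persisters:

```java
// Each of N parallel persisters sees every event but only acts on
// sequences where sequence % N equals its own instance number.
class ShardFilter {
    private final int instances;
    private final int instance;

    ShardFilter(int instances, int instance) {
        this.instances = instances;
        this.instance = instance;
    }

    boolean accepts(long sequence) {
        return sequence % instances == instance;
    }
}
```

For any sequence, exactly one of the filters accepts it, so no event is persisted twice and none is skipped.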

Jason Koch

Jun 4, 2014, 5:31:33 AM
to lmax-di...@googlegroups.com
Hi Yang,

> On 4 Jun 2014, at 5:15 pm, Yang Derek <info...@gmail.com> wrote:
>
> #2 The application is only responsible for calling in stored procedure only. So I'm not sure if there's batch inserts.
>
> So you're not suggesting the multiple InsertEventHandler to insert the same table, right? But from my testing, multiple InsertEventHandler to insert is much faster than only one InsertEventHandler to insert.
>

If I read correctly, you are not inserting directly into the table but actually calling a stored proc to insert into the table.

If so, I think there are two reasonable options, and it really depends on which DB server you are using and how much control you have:

1) Modify the stored proc to accept arrays or logical tables of data, and have the stored proc iterate the array and insert it all as a batch. This will work with Oracle DB's JDBC extensions, but I'm not sure about other data stores. In this case you would need to build up the array until endOfBatch == true and then make a single bulk call to run the SP.

Better yet,

2) Abandon the stored proc, call direct INSERT statements on the DB, and use the JDBC API to insert the rows as a batch.

The aim is to combine multiple data round trips and insert/commit requests into a single chunk (batch). In my experience this gives a 5-10x improvement in average latency when the DB is under load, while also reducing utilisation at the DB.

However, if you are using Mongo - I have very little direct experience with Mongo. You might try the above anyway and see the results.

Yang Derek

Jun 4, 2014, 10:52:09 AM
to lmax-di...@googlegroups.com
Hi Mike,

You are right. The performance of insertion is not linear with the number of threads. But 4 threads are at least twice as fast as 1 thread in my case.

Now I am considering batching the insertions instead of calling the stored procedure, to see if it's faster. And you are suggesting using only 1 thread for the batch insertion, right?

As for #1, I misunderstood in the first place. So the event handlers can keep going independently. And the first handler that arrives at the head of the ring buffer will wait for slower handlers. Once all of the handlers arrive, they start to process again. Is that right?


On Wednesday, June 4, 2014 at 3:46:09 PM UTC+8, mikeb01 wrote:

Yang Derek

Jun 4, 2014, 10:57:16 AM
to lmax-di...@googlegroups.com
Thanks for your reply. 

I'm doing the same thing with multiple handleEventsWith() calls and a modulo operation to run in parallel.


On Wednesday, June 4, 2014 at 3:55:44 PM UTC+8, AndrewL wrote:

Yang Derek

Jun 4, 2014, 11:01:29 AM
to lmax-di...@googlegroups.com
Hi Jason,

I'm using Spring JDBC + SQL Server 2008 R2.

So in your experience, calling a stored procedure per message is worse than batching tens or hundreds of messages at a time. I'm considering trying it out.

BTW, if batching is used, could multi-threading the batches increase performance?

On Wednesday, June 4, 2014 at 5:31:33 PM UTC+8, Jason Koch wrote:

Michael Barker

Jun 5, 2014, 5:05:56 PM
to lmax-di...@googlegroups.com
BTW, if batching is used, could multi-threading the batches increase performance?

It might do, but it is possible that with batching, proportionally more time will be spent actually writing data to the database, which may be the part of the process that cannot run concurrently.  If so, the gains will be limited.

Mike. 

Yang Derek

Jun 5, 2014, 10:58:49 PM
to lmax-di...@googlegroups.com
Thanks Mike,

After abandoning stored procedures and changing to batching with 2 threads, performance increased dramatically, just like Jason Koch mentioned above.

But here are some follow-up questions on this design:

#1. You mentioned that the handler should be like this:

class InsertEventHandler implements EventHandler<RowEvent> {
    private final int batchSize = 100; // tune for your workload
    private int count = 0;

    public void onEvent(RowEvent event, long sequence, boolean endOfBatch) {
        addToBatch(event);
        if (endOfBatch || ++count >= batchSize) {
            flushBatch();
            count = 0;
        }
    }
}

But when I test it, I find that endOfBatch is always true. Am I missing something?

So I removed "endOfBatch" from the "if" and changed it to this:

if (++count >= batchSize) {
    flushBatch();
    count = 0;
}

It performs well. So I'd like to ask you about the details of endOfBatch and why it is always true in my program.


#2. Assume that the batchSize is 200 and there are only 150 events so far. Because the main market is closed and the message frequency drops to, say, 1 message per hour, the program will never call flushBatch until it reaches 200 messages. In this case, the remaining 150 messages will wait for hours to get inserted into the database.

I'd like to trigger flushBatch() at every interval to make sure events won't wait too long before getting inserted.

So how do I achieve this? Add another thread to monitor the addBatch() size?


On Friday, June 6, 2014 at 5:05:56 AM UTC+8, mikeb01 wrote:

Michael Barker

Jun 5, 2014, 11:06:58 PM
to lmax-di...@googlegroups.com
The endOfBatch flag will be false if, at the time of the call to onEvent, there are other events waiting in the ring buffer for processing.  If your producing rate is lower than your consuming rate, then it is possible that the endOfBatch flag will be true most of the time.
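That contract can be illustrated with a toy model (this is not the real BatchEventProcessor, just a sketch of the endOfBatch semantics): when the processor wakes up and finds a run of available sequences, it calls onEvent once per sequence and passes endOfBatch=true only for the last one in the run.

```java
import java.util.ArrayList;
import java.util.List;

class BatchModel {
    // Records "<sequence>:<endOfBatch>" for each delivered event in
    // the run [nextSequence .. availableSequence].
    static List<String> deliverRun(long nextSequence, long availableSequence) {
        List<String> calls = new ArrayList<>();
        for (long s = nextSequence; s <= availableSequence; s++) {
            boolean endOfBatch = (s == availableSequence);
            calls.add(s + ":" + endOfBatch);
        }
        return calls;
    }
}
```

A consumer that falls behind and wakes up to find sequences 5..7 available sees endOfBatch = false, false, true; a consumer that keeps pace with the producer always finds runs of length 1, so endOfBatch is true on every call, which matches what was observed.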

It is also possible that I have introduced a bug somewhere.  Are you using the single-producer or multi-producer implementation?

Mike.



Michael Barker

Jun 5, 2014, 11:23:50 PM
to lmax-di...@googlegroups.com
I'm confident that batching is working.  Below is a unit test that demonstrates it.  Batching in the Disruptor is adaptive [0].

[0] http://mechanical-sympathy.blogspot.co.nz/2011/10/smart-batching.html

package com.lmax.disruptor;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.support.LongEvent;

@RunWith(Parameterized.class)
public class BatchingTest
{
    private final ProducerType producerType;

    public BatchingTest(ProducerType producerType)
    {
        this.producerType = producerType;
    }

    @Parameters
    public static Collection<Object[]> generateData()
    {
        Object[][] producerTypes = { { ProducerType.MULTI }, { ProducerType.SINGLE } };
        return Arrays.asList(producerTypes);
    }

    @SuppressWarnings("unchecked")
    @Test
    public void shouldBatch() throws Exception
    {
        Disruptor<LongEvent> d = new Disruptor<LongEvent>(LongEvent.FACTORY, 2048, Executors.newCachedThreadPool(),
                producerType, new SleepingWaitStrategy());

        final int[] batchingCount = { 0 };
        final long[] publishedValue = { 0 };

        d.handleEventsWith(new EventHandler<LongEvent>()
        {
            @Override
            public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception
            {
                if (!endOfBatch)
                {
                    batchingCount[0]++;
                }
                else
                {
                    LockSupport.parkNanos(1);
                }

                publishedValue[0] = event.get();
            }
        });

        RingBuffer<LongEvent> buffer = d.start();

        EventTranslator<LongEvent> translator = new EventTranslator<LongEvent>()
        {
            @Override
            public void translateTo(LongEvent event, long sequence)
            {
                event.set(sequence);
            }
        };

        for (int i = 0; i < 10000; i++)
        {
            buffer.publishEvent(translator);
        }

        while (publishedValue[0] != buffer.getCursor())
        {
            Thread.sleep(1);
        }

        Assert.assertThat(batchingCount[0], is(not(0)));
    }
}

Jason Koch

Jun 6, 2014, 2:29:37 AM
to lmax-di...@googlegroups.com
If I'm not mistaken, endOfBatch will be true if you are using a WorkerPool as well, rather than a normal handler. There may be other situations; I can't check just now.

Sent from my iPad

Yang Derek

Jun 6, 2014, 5:53:48 AM
to lmax-di...@googlegroups.com
Hi Mike,

It's a single producer in my case, but the consumer could be 1 thread or 2 threads.

Assume that I have 901 messages to process and batchSize = 200. When there is only 1 consumer thread to consume the data, if I make the condition like this:

if (endOfBatch || count >= properties.dbBatchSize())

the result is fine: the consumer ends up processing all 901 messages. However, when I use 2 consumer threads to process the messages, the result is a little bit weird and I believe some messages are missing. Here's the log:

Total flush 392 messages to database --By pool-1-thread-2.
Total flush 451 messages to database --By pool-1-thread-1

and 392 + 451 = 843 < 901, which is the wrong result. Here is my code:

class InsertEventHandler implements EventHandler<RowEvent> {
    private int count = 0;
    private int maxCount = 0;

    public void onEvent(RowEvent event, long sequence, boolean endOfBatch) {
        addToBatch(event);
        maxCount++;

        if (endOfBatch || ++count >= batchSize) {
            flushBatch();
            count = 0;
            log.info("Total flush " + maxCount + " messages to database");
        }
    }
}

So what's going wrong with 2 consumer threads? Why doesn't the result end up at 901? The funny thing is that thread-1 always consumes 451, while the number thread-2 consumes varies: sometimes the total flushed by thread-2 is 405, sometimes 392, and it changes.


On Friday, June 6, 2014 at 11:06:58 AM UTC+8, mikeb01 wrote:

Jason Koch

Jun 6, 2014, 9:39:14 PM
to lmax-di...@googlegroups.com
Is endOfBatch being true short-circuiting the ++count increment branch?

Sent from my iPhone

Yang Derek

Jun 9, 2014, 2:34:36 AM
to lmax-di...@googlegroups.com
I think the problem is that thread-1 hits the end of the batch and endOfBatch turns true, while thread-2 hasn't finished its job yet.

How do I resolve this?

On Saturday, June 7, 2014 at 9:39:14 AM UTC+8, Jason Koch wrote:

Michael Barker

Jun 9, 2014, 3:23:40 AM
to lmax-di...@googlegroups.com
Hi,

I'm not sure what problem you're seeing; see the unit test attached, which works as expected.

Mike.
BatchingTest.java

Yang Derek

Jun 10, 2014, 3:17:51 AM
to lmax-di...@googlegroups.com
Hi Mike.

I know what was wrong eventually. Thanks to your code.

In your BatchingTest.java, it's like this:

class InsertEventHandler implements EventHandler<RowEvent> {
    private int count = 0;
    public void onEvent(RowEvent event, long sequence, boolean endOfBatch) {
        
        if((sequence & mask)==ordinal){
           addToBatch(event);
        }
 
        if (endOfBatch || ++count >= batchSize) {
            flushBatch();
            count = 0;
        }
    }
}

But what I wrote is a little bit different:

class InsertEventHandler implements EventHandler<RowEvent> {
    private int count = 0;
    public void onEvent(RowEvent event, long sequence, boolean endOfBatch) {
        
        if((sequence & mask)==ordinal){
           addToBatch(event);

           if (endOfBatch || ++count >= batchSize) {
            flushBatch();
            count = 0;
           }
        }
    }
}

In my version, endOfBatch does not work appropriately: it is set to true for one thread but false for the other.

Why is that happening?


On Monday, June 9, 2014 at 3:23:40 PM UTC+8, mikeb01 wrote:

Danil Suits

Jun 10, 2014, 11:13:05 AM
to lmax-di...@googlegroups.com
Short answer: endOfBatch is being set to true for both threads - one of your event processors ignores it.

Consider this case: the ring buffer has three events available to the consumers right now.
event[0]
event[1]
event[2]

To the batch event processors, that's going to look like one batch of three events.  This means that it is going to make the following three calls

onEvent(event[0], 0, false);
onEvent(event[1], 1, false);
onEvent(event[2], 2, true);

These three calls are made to each of the two ParallelProcessors.

In Mike's code, he has been careful to separate "addToBatch" from "processBatch".  When his ParallelProcessor(0) receives event[2], it adds that event to the batch and flushes the batch.  When his ParallelProcessor(1) receives event[2], it ignores the event, but it still checks the end of batch flag to perform the flush.

In your code, you've conflated the batch handling and the event processing.  Your ParallelProcessor(0) does exactly what Mike's does when it receives event[2], but your ParallelProcessor(1) skips the event entirely, which means it fails to check the endOfBatch flag.  In effect, this means that your ParallelProcessor(1) fails each time a batch happens to end on a sequence that isn't its own.  In any test, one of the two ParallelProcessors - which one is determined by the total number of messages - will always flush all of its events.  The other might flush all of the events it collects, but probably won't.


It also looks to me as though there's a little bit of confusion about count, which should probably be tracking the number of events added to the batch...

// Warning - UNTESTED CODE CHANGE

class InsertEventHandler implements EventHandler<RowEvent> 
{
    private int count = 0;
    public void onEvent(RowEvent event, long sequence, boolean endOfBatch) {
        // If this is our sequence number, then act on the event.
        if ((sequence & mask) == ordinal) {
            addToBatch(event);
            ++count;
        }

        // Check to see if we have reached the limit of the batch, whether
        // or not this is our sequence number.
        if (endOfBatch || count >= batchSize) {
            flushBatch();
            count = 0;
        }
    }
}

Yang Derek

Jun 10, 2014, 10:05:43 PM
to lmax-di...@googlegroups.com
Hi Danil,

Thanks very much for the detailed and clear explanation. I understand now.

Cheers

On Tuesday, June 10, 2014 at 11:13:05 PM UTC+8, Danil Suits wrote:

Jay

Sep 10, 2015, 4:53:42 PM
to Disruptor
Mike,

Can you please elaborate more on this logic?

if((sequence & mask)==ordinal)

Is it possible to add more than 2 handlers with the current logic that you have?

        ParallelEventHandler handler1 = new ParallelEventHandler(1, 0);
        ParallelEventHandler handler2 = new ParallelEventHandler(1, 1);



Michael Barker

Sep 10, 2015, 5:04:37 PM
to lmax-di...@googlegroups.com
Hi Jay,

Can you please elaborate more on this logic?

if((sequence & mask)==ordinal)

This is a fast modulo operation, but it requires that mask == 2^n - 1 (i.e. the number of handlers needs to be a power of 2).
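The equivalence is easy to check in isolation; a minimal sketch, valid for non-negative sequences and a power-of-two handler count:

```java
class FastModulo {
    // For numHandlers == 2^k and sequence >= 0,
    // (sequence & (numHandlers - 1)) == sequence % numHandlers.
    static long fastMod(long sequence, int numHandlers) {
        long mask = numHandlers - 1;
        return sequence & mask;
    }
}
```

The bitwise AND avoids the division that `%` implies, which is why the Disruptor examples favour it on the hot path.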
 
Is it possible to add more than 2 handlers with the current logic that you have?

        ParallelEventHandler handler1 = new ParallelEventHandler(1, 0);
        ParallelEventHandler handler2 = new ParallelEventHandler(1, 1);


Yes.  E.g. for 4 handlers:

int numHandlers = 4;
ParallelEventHandler h1 = new ParallelEventHandler(numHandlers - 1, 0);
ParallelEventHandler h2 = new ParallelEventHandler(numHandlers - 1, 1);
ParallelEventHandler h3 = new ParallelEventHandler(numHandlers - 1, 2);
ParallelEventHandler h4 = new ParallelEventHandler(numHandlers - 1, 3);

Mike.

tx...@sina.com

Oct 28, 2015, 4:12:46 AM
to Disruptor


On Monday, May 26, 2014 at 10:14:41 AM UTC+8, Yang Derek wrote:
If you do the insertion asynchronously without batching, it is meaningless. Why don't you just use an asynchronous thread?

 

tx...@sina.com

Oct 28, 2015, 4:12:46 AM
to Disruptor


On Tuesday, June 3, 2014 at 12:17:30 PM UTC+8, mikeb01 wrote:
Hi Mike,
    I think if we do the insertion in parallel with multiple threads on the same DB table, we won't get locked by the database; the records will be inserted into idle blocks and won't be blocked.


 

Jason

Oct 28, 2015, 9:47:23 AM
to lmax-di...@googlegroups.com
From my experience, for database inserts the gains from batching are much more significant than the gains from multi-threading. Try batching first with good batch sizes and ensure DB storage is tuned. Only if that is not enough, try parallel inserts.

Sent from my iPhone