Question about Usage

Robert DiFalco

Nov 26, 2017, 12:23:21 PM
to Conversant Disruptor
A couple of questions. First, would I see improved performance if I simply dropped one of these queues into a bounded ThreadPoolExecutor with a CallerRunsPolicy, in place of the usual ArrayBlockingQueue?

Also, I have quite a few single-producer and multiple-producer to single-consumer workers that look something like the code below. This pattern would be used, for example, to funnel multiple writers into a single thread that writes to a file system. Would you expect the DisruptorBlockingQueue to be more efficient here? Note that this may sometimes be a single producer and a single consumer. In those cases I want to process results on a different single thread than the one I am producing them on.

public EventPipeline(Consumer<T> writer, T terminator) {
    this.writer = writer;
    this.END_OF_STREAM = terminator;

    queue = new ArrayBlockingQueue<T>(10_000);
    // Start the single consumer; it drains the queue until it sees the terminator.
    future = CompletableFuture.runAsync(this::run, executorService);
}

public void submit(T event) throws InterruptedException {
    queue.put(event);
}

public void waitForDone() throws InterruptedException {
    future.join();
}

public void run() {
    try {
        while (true) {
            T item = queue.take();
            // Identity comparison: the terminator is a sentinel instance.
            if (item == END_OF_STREAM) {
                break;
            }

            writer.accept(item);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag before propagating.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}


Then I have a single-producer, multiple-consumer setup that looks like this. I'm wondering whether there would be a benefit to using one of your queues here, instead of an ArrayBlockingQueue, as the thread pool executor's queue. I create the executor with a caller-runs policy and a bounded queue of, say, 4-8 (depending on the number of processors on the machine).

public void processAsync() throws InterruptedException {
    for (Map<String, String> line : csvFile) {
        workCounter.incrementAndGet();
        executor.execute(() -> {
            lineConsumer.accept(line);
            if (workCounter.decrementAndGet() == 0) {
                latch.countDown();
            }
        });
    }

    latch.await();
}

I'm omitting some code so it's simpler to follow, but the CSV file may have millions of lines, and lineConsumer is whatever consumer the client code passes in to process the CSV lines asynchronously.

Thanks!

jo...@2ad.com

Nov 27, 2017, 10:19:55 AM
to Conversant Disruptor
In general, Conversant Disruptor will perform better than ArrayBlockingQueue. However, you will not see improved performance unless your application's run time is dominated by those queue calls. This is the sort of question you can answer by benchmarking your application.
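As a rough illustration of that kind of measurement, here is a minimal sketch that times a single-producer, single-consumer handoff through any BlockingQueue. The class and method names (QueueHandoffTimer, timeHandoff) are made up for this example, and it uses only JDK queues; a Conversant queue could be passed in the same way if the library is on the classpath. It is a crude wall-clock timer, not a rigorous benchmark, and it only tells you how fast the queue is, not whether the queue is the bottleneck in your application.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueHandoffTimer {

    /** Moves n integers from one producer thread to the calling thread; returns elapsed nanoseconds. */
    static long timeHandoff(BlockingQueue<Integer> queue, int n) throws InterruptedException {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        long start = System.nanoTime();
        producer.start();
        long sum = 0;
        for (int i = 0; i < n; i++) {
            sum += queue.take();
        }
        producer.join();
        long elapsed = System.nanoTime() - start;

        // Use the consumed values so the work cannot be optimized away,
        // and verify that nothing was lost or duplicated.
        if (sum != (long) n * (n - 1) / 2) {
            throw new IllegalStateException("lost or duplicated items");
        }
        return elapsed;
    }

    public static void main(String[] args) throws InterruptedException {
        int n = 1_000_000;

        // One untimed pass per implementation as a warm-up, then a timed pass.
        timeHandoff(new ArrayBlockingQueue<>(1024), n);
        long abq = timeHandoff(new ArrayBlockingQueue<>(1024), n);

        timeHandoff(new LinkedBlockingQueue<>(1024), n);
        long lbq = timeHandoff(new LinkedBlockingQueue<>(1024), n);

        System.out.printf("ArrayBlockingQueue:  %d ms%n", abq / 1_000_000);
        System.out.printf("LinkedBlockingQueue: %d ms%n", lbq / 1_000_000);
    }
}
```

For numbers you intend to rely on, a harness such as JMH and a profile of the real application are better guides than a loop like this.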

In the case where you have a single producer and a single consumer, you can use the PushPullBlockingQueue. That will often perform better than the multi-threaded version.

In any case where you have multiple producers or multiple consumers, you must use DisruptorBlockingQueue.
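One way to make that choice easy to change is to pass the queue into the pipeline rather than constructing it inside. The sketch below is an illustration only: QueuePipeline and its methods are hypothetical names, and it runs against a JDK ArrayBlockingQueue so that it has no external dependency. Since the Conversant queues implement BlockingQueue, they should be usable in the same position, assuming the library is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

final class QueuePipeline<T> {
    private final BlockingQueue<T> queue;
    private final Consumer<T> writer;
    private final T endOfStream;
    private final Thread consumer;

    private QueuePipeline(BlockingQueue<T> queue, Consumer<T> writer, T endOfStream) {
        this.queue = queue;
        this.writer = writer;
        this.endOfStream = endOfStream;
        this.consumer = new Thread(this::drain, "pipeline-consumer");
    }

    /** Creates the pipeline and starts its single consumer thread. */
    static <T> QueuePipeline<T> start(BlockingQueue<T> queue, Consumer<T> writer, T endOfStream) {
        QueuePipeline<T> pipeline = new QueuePipeline<>(queue, writer, endOfStream);
        pipeline.consumer.start();
        return pipeline;
    }

    void submit(T event) throws InterruptedException {
        queue.put(event);
    }

    /** Sends the terminator and waits for the consumer to finish. */
    void close() throws InterruptedException {
        queue.put(endOfStream);
        consumer.join();
    }

    private void drain() {
        try {
            // The terminator is compared by identity, as in the original post.
            for (T item = queue.take(); item != endOfStream; item = queue.take()) {
                writer.accept(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> written = new ArrayList<>();
        String eos = new String("EOS"); // distinct instance used as the sentinel

        // Assumption: with Conversant Disruptor available, this line could become
        // new PushPullBlockingQueue<>(1024) for one producer and one consumer, or
        // new DisruptorBlockingQueue<>(1024) when several threads call submit().
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);

        QueuePipeline<String> pipeline = QueuePipeline.start(queue, written::add, eos);
        pipeline.submit("a");
        pipeline.submit("b");
        pipeline.close();

        System.out.println(written); // prints [a, b]
    }
}
```

Because the pipeline only depends on the BlockingQueue interface, the same code can be timed with each implementation before committing to one.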

Also, if you're setting your queue limit to 4, that might be too low to buffer the incoming requests. You might see some improvement in performance by increasing that bound, even if your machine has fewer processors, because I/O operations frequently occur in chunks.
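A minimal sketch of that suggestion, using only JDK classes: the queue bound is a separate parameter from the thread count, so it can be raised independently of the number of processors. The names (BoundedExecutorSketch, newBoundedExecutor, runAll) and the multiplier of 64 are arbitrary choices for illustration, not recommended values; the right bound is something to find by measuring.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedExecutorSketch {

    /** Fixed-size pool with a bounded queue; when the queue is full the submitting thread runs the task. */
    static ThreadPoolExecutor newBoundedExecutor(int threads, int queueBound) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueBound),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Submits trivial tasks and returns how many completed. */
    static int runAll(int tasks, int threads, int queueBound) throws InterruptedException {
        ThreadPoolExecutor executor = newBoundedExecutor(threads, queueBound);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(completed::incrementAndGet);
        }
        // shutdown() lets already-submitted tasks finish; awaitTermination waits for them.
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int threads = Runtime.getRuntime().availableProcessors();
        int queueBound = threads * 64; // arbitrary illustrative bound, larger than the thread count
        System.out.println("completed " + runAll(100_000, threads, queueBound));
    }
}
```

With a very small bound the producer thread ends up running many tasks itself under the caller-runs policy, which stalls reading of the input while it does so; a larger bound gives the workers a backlog to draw from.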

John