A couple of things. Would I see improved performance if I simply dropped one of these in as the queue of a ThreadPoolExecutor bounded with a CallerRunsPolicy, instead of the usual ArrayBlockingQueue?
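For context, the executor construction I mean looks roughly like this (pool size and queue capacity are placeholders; the commented line is the swap I'm asking about, assuming DisruptorBlockingQueue can stand in wherever a `BlockingQueue<Runnable>` is expected):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorSetup {

    public static ThreadPoolExecutor newBoundedExecutor(int poolSize, int queueCapacity) {
        // Current setup: a small bounded queue, so a full queue pushes work
        // back onto the submitting thread via CallerRunsPolicy instead of
        // growing an unbounded backlog.
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueCapacity);
        // The swap in question would be the one line:
        // BlockingQueue<Runnable> queue = new DisruptorBlockingQueue<>(queueCapacity);
        return new ThreadPoolExecutor(
                poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS,
                queue,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```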
Also, I have quite a few single-producer and multiple-producer-to-single-consumer workers that look something like the code below. This one, for example, would be used to funnel multiple writers into a single thread that writes to a file system. Would you expect the DisruptorBlockingQueue to be more efficient here? Note that this may sometimes even be single producer, single consumer: in those cases I want to process results on a different single thread than the single thread I'm producing them on.
```java
public class EventPipeline<T> {

    private final Consumer<T> writer;
    private final T END_OF_STREAM;
    private final BlockingQueue<T> queue;
    private final CompletableFuture<Void> future;
    // executorService field omitted for brevity

    public EventPipeline(Consumer<T> writer, T terminator) {
        this.writer = writer;
        this.END_OF_STREAM = terminator;
        queue = new ArrayBlockingQueue<>(10_000);
        future = CompletableFuture.runAsync(this::run, executorService);
    }

    public void submit(T event) throws InterruptedException {
        queue.put(event);
    }

    public void waitForDone() {
        future.join();
    }

    private void run() {
        try {
            while (true) {
                T item = queue.take();
                if (item == END_OF_STREAM) {  // terminator compared by identity
                    break;
                }
                writer.accept(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```
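For clarity, here's a self-contained sketch of how I drive that pattern, with a compact inline copy of the pipeline so it runs on its own (class and method names here are just illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class PipelineDemo {

    // Compact inline version of the pipeline, just enough to show the calling pattern.
    static final class Pipeline<T> {
        private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(10_000);
        private final CompletableFuture<Void> future;

        Pipeline(Consumer<T> writer, T terminator) {
            this.future = CompletableFuture.runAsync(() -> {
                try {
                    while (true) {
                        T item = queue.take();
                        if (item == terminator) {  // poison pill, compared by identity
                            break;
                        }
                        writer.accept(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
        }

        void submit(T event) throws InterruptedException {
            queue.put(event);
        }

        void waitForDone() {
            future.join();
        }
    }

    static String runDemo() throws InterruptedException {
        StringBuilder out = new StringBuilder();  // touched only by the consumer thread
        String eos = new String("EOS");           // distinct identity so == matches only the pill
        Pipeline<String> p = new Pipeline<>(out::append, eos);
        p.submit("a");
        p.submit("b");
        p.submit(eos);       // ends the consumer loop
        p.waitForDone();     // join() happens-after the consumer's appends
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());  // ab
    }
}
```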
Then I have a single-producer, multiple-consumer setup that looks like this. I'm wondering if there would be a benefit to using one of your queues here versus an ArrayBlockingQueue as the thread pool executor's queue. I create a CallerRunsPolicy executor with a bounded queue of, say, 4-8 (depending on the number of processors on the machine).
```java
public void processAsync() throws InterruptedException {
    workCounter.incrementAndGet();  // guard so the latch can't trip before submission finishes
    for (Map<String, String> line : csvFile) {
        workCounter.incrementAndGet();
        executor.execute(() -> {
            lineConsumer.accept(line);
            if (workCounter.decrementAndGet() == 0) {
                latch.countDown();
            }
        });
    }
    if (workCounter.decrementAndGet() == 0) {  // release the guard
        latch.countDown();
    }
    latch.await();
}
```
I'm omitting some code so it's simpler to follow, but the CSV file may have millions of lines, and lineConsumer is whatever consumer the client code sends me to process the CSV lines asynchronously.
Thanks!