I've written a simple test case to benchmark the Disruptor against a
ConcurrentLinkedQueue.
What I have observed is that the Disruptor sometimes hangs, or that it
does not capture all events on the ring, but this might be due to bad
WorkerPool synchronization on my end. I am still trying to understand
how the Disruptor works.
It would be great if someone with more capable hardware could
reproduce these results.
Regards,
Pres
Test-System:
Debian Linux 2.6.34.5
AMD Phenom(tm) II X6 1055T Processor
Benchmark:
3 runs each (CLQ, Disruptor), with 100,000,000 iterations per run.
Results:
producers: 1, consumers: 1
Queue: 16457ms, RingBuffer: 11201ms, delta:5256ms
Queue: 15601ms, RingBuffer: 11060ms, delta:4541ms
Queue: 15813ms, RingBuffer: 10640ms, delta:5173ms
producers: 1, consumers: 5
Queue: 49113ms, RingBuffer: 32271ms, delta:16842ms
Queue: 64905ms, RingBuffer: 32240ms, delta:32665ms
Queue: 67754ms, RingBuffer: 30583ms, delta:37171ms
producers: 3, consumers: 3
Queue: 38401ms, RingBuffer: 33207ms, delta:5194ms
Queue: 39485ms, RingBuffer: 33297ms, delta:6188ms
Queue: 40691ms, RingBuffer: 33162ms, delta:7529ms
producers: 2, consumers: 4
Queue: 45344ms, RingBuffer: 27339ms, delta:18005ms
Queue: 45120ms, RingBuffer: 27433ms, delta:17687ms
Queue: 44780ms, RingBuffer: 27499ms, delta:17281ms
producers: 5, consumers: 1
Queue: 35756ms, RingBuffer: 32175ms, delta:3581ms
Queue: 35860ms, RingBuffer: 31913ms, delta:3947ms
Queue: 35513ms, RingBuffer: 32042ms, delta:3471ms
Code:
package perf;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.ClaimStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.MultiThreadedLowContentionClaimStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
public class PerfTest {
private final int runs = 3;
private final long iterations = 1000L * 1000L * 100L;
private final int BUFFER_SIZE = 1024 * 8;
private final int numProducers;
private final int numConsumers;
private final Executor exec;
private final Queue<Long> queue;
private final RingBuffer<Holder<Long>> buffer;
private final WorkerPool<Holder<Long>> workPool;
private final Consumer[] consumers;
private final Producer[] producers;
private final long expectedResult = accumulatedAddition(iterations);
public PerfTest(int prod, int cons) {
this.numProducers = prod;
this.numConsumers = cons;
this.exec = Executors.newFixedThreadPool(prod+cons);
consumers = new Consumer[cons];
for (int i=0; i<cons; i++)
consumers[i] = new Consumer();
producers = new Producer[prod];
for (int i=0; i<prod; i++)
producers[i] = new Producer();
queue = new PaddedConcurrentLinkedQueue<Long>();
// queue = new ConcurrentLinkedQueue<Long>();
ClaimStrategy claim = prod == 1
? new SingleThreadedClaimStrategy(BUFFER_SIZE)
: new MultiThreadedLowContentionClaimStrategy(BUFFER_SIZE);
buffer = new RingBuffer<Holder<Long>>(Holder.EVENT_FACTORY, claim,
new BusySpinWaitStrategy());
workPool = new WorkerPool<Holder<Long>>(buffer, buffer.newBarrier(),
exceptionHandler, consumers);
buffer.setGatingSequences(workPool.getWorkerSequences());
}
public void benchmark() throws Exception {
System.out.println("Starting test: producers: " + numProducers + ", consumers: " + numConsumers);
ArrayList<Result> res = new ArrayList<PerfTest.Result>();
for (int i=0; i<runs; i++) {
System.gc();
long q = runQueue();
System.out.println("Queue "+i+ " finished");
res.add(new Result());
res.get(i).setTsQ(q);
}
for (int i=0; i<runs; i++) {
System.gc();
long q = runDis();
System.out.println("Disruptor "+i+ " finished");
res.get(i).setTsB(q);
}
for (Result r:res)
System.out.println(r);
System.out.println("Finished test: producers: " + numProducers + ", consumers: " + numConsumers);
}
private long runQueue() throws Exception {
for (Consumer c:consumers)
c.reset(queue);
CountDownLatch latch = new CountDownLatch(numProducers);
long delta = iterations / numProducers;
for (int i=0; i<numProducers; i++) {
long start = i*delta;
long end = (i+1) * delta;
if (i+1==numProducers)
end = iterations;
producers[i].reset(latch, queue, start, end);
}
long tsStart = System.currentTimeMillis();
for (Consumer c:consumers)
exec.execute(c);
for (Producer p:producers)
exec.execute(p);
latch.await();
while (!queue.isEmpty()) Thread.yield();
long tsEnd = System.currentTimeMillis();
// verify that the values do match up
long count = 0;
long value = 0;
for (Consumer c:consumers) {
c.halt();
count += c.getCount();
value += c.getValue();
}
System.out.println(count);
if (count!=iterations || value!=expectedResult)
throw new IllegalStateException("This is bad !");
return tsEnd - tsStart;
}
private long runDis() throws Exception {
for (Consumer c:consumers)
c.reset(queue);
CountDownLatch latch = new CountDownLatch(numProducers);
long delta = iterations / numProducers;
for (int i=0; i<numProducers; i++) {
long start = i*delta;
long end = (i+1) * delta;
if (i+1==numProducers)
end = iterations;
producers[i].reset(latch, buffer, start, end);
}
long tsStart = System.currentTimeMillis();
for (Producer p:producers)
exec.execute(p);
Thread.sleep(10);
workPool.start(exec);
workPool.drainAndHalt();
latch.await();
long tsEnd = System.currentTimeMillis();
// verify that the values do match up
long count = 0;
long value = 0;
for (Consumer c:consumers) {
c.halt();
count += c.getCount();
value += c.getValue();
}
System.out.println(count);
if (count!=iterations || value!=expectedResult)
throw new IllegalStateException("This is bad !");
return tsEnd - tsStart;
}
public static void main(String[] args) throws Exception {
// System.in.read();
new PerfTest(Integer.parseInt(args[0]), Integer.parseInt(args[1])).benchmark();
System.exit(0);
}
private static class Holder<T> {
private T value;
public void setValue(T value) {
this.value = value;
}
public T getValue() {
return value;
}
public final static EventFactory<Holder<Long>> EVENT_FACTORY =
new EventFactory<Holder<Long>>() {
public Holder<Long> newInstance() {
return new Holder<Long>();
}
};
}
private static class Consumer implements WorkHandler<Holder<Long>>,
Runnable {
private Queue<Long> queue;
private boolean run;
private long count;
private long value;
@Override
public void run() {
long missed = 0;
run = true;
for (;;) {
Long l = queue.poll();
if (null!=l) {
count++;
value += l;
} else {
if (!run)
break;
missed++;
Thread.yield();
}
}
System.out.println("missed: "+missed);
}
@Override
public void onEvent(Holder<Long> event) throws Exception {
count++;
value += event.getValue();
}
public void reset(Queue<Long> q) {
this.queue = q;
this.count = 0;
this.value = 0;
}
public long getValue() {
return value;
}
public long getCount() {
return count;
}
public void halt() {
this.run = false;
}
}
public static long accumulatedAddition(final long iterations)
{
long temp = 0L;
for (long i = 0L; i < iterations; i++)
{
temp += i;
}
return temp;
}
private static class Producer implements Runnable {
private CountDownLatch latch;
private Queue<Long> queue;
private RingBuffer<Holder<Long>> buffer;
private long start;
private long end;
public void reset(CountDownLatch latch, Queue<Long> q, long start,
long end) {
this.queue = q;
this.buffer = null;
this.start = start;
this.end = end;
this.latch = latch;
}
public void reset(CountDownLatch latch, RingBuffer<Holder<Long>> wp,
long start, long end) {
this.queue = null;
this.buffer = wp;
this.start = start;
this.end = end;
this.latch = latch;
}
@Override
public void run() {
if (null!=queue) {
for (long i=start; i<end; i++) {
queue.offer(new Long(i));
}
} else if (null!=buffer) {
for (long i=start; i<end; i++) {
long sequence = buffer.next();
buffer.get(sequence).setValue(new Long(i));
buffer.publish(sequence);
}
}
latch.countDown();
}
}
private static class Result {
private long tsQ;
private long tsB;
public void setTsB(long tsB) {
this.tsB = tsB;
}
public void setTsQ(long tsQ) {
this.tsQ = tsQ;
}
@Override
public String toString() {
return "Queue: "+tsQ+"ms, RingBuffer: "+tsB+"ms, delta:"+(tsQ-tsB)+"ms";
}
}
private ExceptionHandler exceptionHandler = new ExceptionHandler() {
@Override
public void handleOnStartException(Throwable e) {
e.printStackTrace();
}
@Override
public void handleOnShutdownException(Throwable e) {
e.printStackTrace();
}
@Override
public void handleEventException(Throwable e, long sequence, Object event) {
e.printStackTrace();
}
};
public static class PaddedConcurrentLinkedQueue<E> extends
AbstractQueue<E> implements Queue<E> {
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;
Node(E item) {
// Piggyback on imminent casNext()
lazySetItem(item);
}
E getItem() {
return item;
}
boolean casItem(E cmp, E val) {
return itemUpdater.compareAndSet(this, cmp, val);
}
void setItem(E val) {
item = val;
}
void lazySetItem(E val) {
itemUpdater.lazySet(this, val);
}
void lazySetNext(Node<E> val) {
nextUpdater.lazySet(this, val);
}
Node<E> getNext() {
return next;
}
boolean casNext(Node<E> cmp, Node<E> val) {
return nextUpdater.compareAndSet(this, cmp, val);
}
// Unsafe mechanics
static AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
static AtomicReferenceFieldUpdater<Node, Object> itemUpdater =
AtomicReferenceFieldUpdater.newUpdater(Node.class, Object.class, "item");
}
private transient volatile Node<E> head;
private long p01,p02,p03,p04,p05,p06,p07,p08;
private long p11,p12,p13,p14,p15,p16,p17,p18;
private long p21,p22,p23,p24,p25,p26,p27,p28;
private long p31,p32,p33,p34,p35,p36,p37,p38;
private transient volatile Node<E> tail;
/**
* Creates a {@code PaddedConcurrentLinkedQueue} that is initially empty.
*/
public PaddedConcurrentLinkedQueue() {
head = new Node<E>(null);
tail = head;
}
// Have to override just to update the javadoc
/**
* Inserts the specified element at the tail of this queue.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return offer(e);
}
/**
* We don't bother to update head or tail pointers if fewer than
* HOPS links from "true" location. We assume that volatile
* writes are significantly more expensive than volatile reads.
*/
private static final int HOPS = 1;
/**
* Try to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below.
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.getNext();
return (p == next) ? head : next;
}
/**
* Inserts the specified element at the tail of this queue.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e);
retry:
for (;;) {
Node<E> t = tail;
Node<E> p = t;
for (int hops = 0; ; hops++) {
Node<E> next = succ(p);
if (next != null) {
if (hops > HOPS && t != tail)
continue retry;
p = next;
} else if (p.casNext(null, n)) {
if (hops >= HOPS)
casTail(t, n); // Failure is OK.
return true;
} else {
p = succ(p);
}
}
}
}
public E poll() {
Node<E> h = head;
Node<E> p = h;
for (int hops = 0; ; hops++) {
E item = p.getItem();
if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) {
Node<E> q = p.getNext();
updateHead(h, (q != null) ? q : p);
}
return item;
}
Node<E> next = succ(p);
if (next == null) {
updateHead(h, p);
break;
}
p = next;
}
return null;
}
/**
* Returns the first live (non-deleted) node on list, or null if none.
* This is yet another variant of poll/peek; here returning the
* first node, not element. We could make peek() a wrapper around
* first(), but that would cost an extra volatile read of item,
* and the need to add a retry loop to deal with the possibility
* of losing a race to a concurrent poll().
*/
Node<E> first() {
Node<E> h = head;
Node<E> p = h;
Node<E> result;
for (;;) {
E item = p.getItem();
if (item != null) {
result = p;
break;
}
Node<E> next = succ(p);
if (next == null) {
result = null;
break;
}
p = next;
}
updateHead(h, p);
return result;
}
/**
* Returns {@code true} if this queue contains no elements.
*
* @return {@code true} if this queue contains no elements
*/
public boolean isEmpty() {
return first() == null;
}
/**
* Returns the number of elements in this queue. If this queue
* contains more than {@code Integer.MAX_VALUE} elements, returns
* {@code Integer.MAX_VALUE}.
*
* <p>Beware that, unlike in most collections, this method is
* <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current
* number of elements requires an O(n) traversal.
*
* @return the number of elements in this queue
*/
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p)) {
if (p.getItem() != null) {
// Collections.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
}
}
return count;
}
// Unsafe mechanics
static AtomicReferenceFieldUpdater<PaddedConcurrentLinkedQueue, Node> tailUpdater =
AtomicReferenceFieldUpdater.newUpdater(PaddedConcurrentLinkedQueue.class, Node.class, "tail");
static AtomicReferenceFieldUpdater<PaddedConcurrentLinkedQueue, Node> headUpdater =
AtomicReferenceFieldUpdater.newUpdater(PaddedConcurrentLinkedQueue.class, Node.class, "head");
private boolean casTail(Node<E> cmp, Node<E> val) {
return tailUpdater.compareAndSet(this, cmp, val);
}
private boolean casHead(Node<E> cmp, Node<E> val) {
return headUpdater.compareAndSet(this, cmp, val);
}
private void lazySetHead(Node<E> val) {
headUpdater.lazySet(this, val);
}
public boolean contains(Object o) {
if (o == null) return false;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.getItem();
if (item != null &&
o.equals(item))
return true;
}
return false;
}
public E peek() {
Node<E> h = head;
Node<E> p = h;
E item;
for (;;) {
item = p.getItem();
if (item != null)
break;
Node<E> next = succ(p);
if (next == null) {
break;
}
p = next;
}
updateHead(h, p);
return item;
}
public Object[] toArray() {
// Use ArrayList to deal with resizing.
ArrayList<E> al = new ArrayList<E>();
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.getItem();
if (item != null)
al.add(item);
}
return al.toArray();
}
public <T> T[] toArray(T[] a) {
// try to use sent-in array
int k = 0;
Node<E> p;
for (p = first(); p != null && k < a.length; p = succ(p)) {
E item = p.getItem();
if (item != null)
a[k++] = (T)item;
}
if (p == null) {
if (k < a.length)
a[k] = null;
return a;
}
// If won't fit, use ArrayList version
ArrayList<E> al = new ArrayList<E>();
for (Node<E> q = first(); q != null; q = succ(q)) {
E item = q.getItem();
if (item != null)
al.add(item);
}
return al.toArray(a);
}
public Iterator<E> iterator() {
return new Itr();
}
private class Itr implements Iterator<E> {
/**
* Next node to return item for.
*/
private Node<E> nextNode;
/**
* nextItem holds on to item fields because once we claim
* that an element exists in hasNext(), we must return it in
* the following next() call even if it was in the process of
* being removed when hasNext() was called.
*/
private E nextItem;
/**
* Node of the last returned item, to support remove.
*/
private Node<E> lastRet;
Itr() {
advance();
}
/**
* Moves to next valid node and returns item to return for
* next(), or null if no such.
*/
private E advance() {
lastRet = nextNode;
E x = nextItem;
Node<E> pred, p;
if (nextNode == null) {
p = first();
pred = null;
} else {
pred = nextNode;
p = succ(nextNode);
}
for (;;) {
if (p == null) {
nextNode = null;
nextItem = null;
return x;
}
E item = p.getItem();
if (item != null) {
nextNode = p;
nextItem = item;
return x;
} else {
// skip over nulls
Node<E> next = succ(p);
if (pred != null && next != null)
pred.casNext(p, next);
p = next;
}
}
}
public boolean hasNext() {
return nextNode != null;
}
public E next() {
if (nextNode == null) throw new NoSuchElementException();
return advance();
}
public void remove() {
Node<E> l = lastRet;
if (l == null) throw new IllegalStateException();
// rely on a future traversal to relink.
l.setItem(null);
lastRet = null;
}
}
}
}
Thanks for the post and benchmark, and sorry about the delayed reply.
I've been looking into your code and you may have uncovered an issue.
I am still investigating; I think it is specific to the WorkerPool
implementation.
Mike.
I've spotted a number of changes that can be made to your benchmark to
make it work correctly. Some of the WorkerPool construction idioms can
be confusing, though.
First, remove the RingBuffer field from the top of the class and
construct the WorkerPool using the event-factory-based constructor;
this will create the RingBuffer internally.
workPool = new WorkerPool<Holder<Long>>(Holder.EVENT_FACTORY, claim, wait, exceptionHandler, consumers);
In the runDis() method you can get at the ring buffer when you start
the worker pool:
RingBuffer<Holder<Long>> buffer = workPool.start(exec);
You need to do this before starting the producers. You also need to
change the order of a couple of the calls in the runDis() method:
workPool.start(exec);
latch.await(); // Wait for the latch before draining and halting the pool
workPool.drainAndHalt();
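Putting those changes together, the corrected runDis() sequence would look
roughly like this. This is only a sketch against the Disruptor 2.x WorkerPool
API described above; the `ringBuffer` local and the elided reset/partitioning
step stand in for code already present in the benchmark.

```java
private long runDis() throws Exception {
    CountDownLatch latch = new CountDownLatch(numProducers);
    // ... reset consumers and partition [0, iterations) among the
    // producers as before, handing them the ring buffer returned below ...

    long tsStart = System.currentTimeMillis();
    // 1. Start the workers first; with the event-factory-based constructor,
    //    start(exec) also hands back the internally created RingBuffer.
    RingBuffer<Holder<Long>> ringBuffer = workPool.start(exec);
    // 2. Only now let the producers begin publishing into it.
    for (Producer p : producers)
        exec.execute(p);
    // 3. Wait until every producer has published its range...
    latch.await();
    // 4. ...and only then drain the outstanding events and halt the workers.
    workPool.drainAndHalt();
    return System.currentTimeMillis() - tsStart;
}
```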
Mike.