two process producer-consumer low latency example?


Richard Easterling

May 19, 2016, 12:42:29 PM
to Chronicle
I'm running a two-process latency test through an IndexedChronicle and haven't been able to see anything better than 1ms on the ExcerptTailer end. Is there a low-latency ExcerptAppender-ExcerptTailer Chronicle example available that I could use as a performance baseline?

I'm running on a 4-CPU, 2.6GHz E5-2697 Windows box. My setup is fairly straightforward: the first process writes quotes with a small delay (i.e. Thread.sleep(0, 1)) while the other process polls. The system time (System.nanoTime()) is passed in the quote.

Peter Lawrey

May 19, 2016, 2:58:37 PM
to java-ch...@googlegroups.com

Thread.sleep() always waits at least 1 ms.

I suggest writing the nanoTime in the message and comparing it with the current time on read.
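Not an official Chronicle example, but a minimal sketch of that approach against the Chronicle 3.x indexed-queue API used later in this thread. ProducerMain, ConsumerMain, the queue path and the message count are made-up names and values; run the two mains as separate JVM processes pointed at the same path.

import java.io.File;
import java.io.IOException;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptTailer;

class ProducerMain {
    public static void main(String[] args) throws IOException {
        File path = new File(System.getProperty("java.io.tmpdir"), "latency-baseline");
        try (Chronicle chronicle = ChronicleQueueBuilder.indexed(path).build()) {
            ExcerptAppender appender = chronicle.createAppender();
            for (int i = 0; i < 1_000_000; i++) {
                appender.startExcerpt(8);
                appender.writeLong(System.nanoTime()); // send time travels inside the message
                appender.finish();
            }
        }
    }
}

class ConsumerMain {
    public static void main(String[] args) throws IOException {
        File path = new File(System.getProperty("java.io.tmpdir"), "latency-baseline");
        try (Chronicle chronicle = ChronicleQueueBuilder.indexed(path).build()) {
            ExcerptTailer tailer = chronicle.createTailer();
            for (int i = 0; i < 1_000_000; i++) {
                while (!tailer.nextIndex()) { /* busy-spin rather than sleep */ }
                long latency = System.nanoTime() - tailer.readLong();
                tailer.finish();
                // record latency in a histogram rather than printing per message
            }
        }
    }
}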


Richard Easterling

May 20, 2016, 5:51:35 PM
to Chronicle
I went ahead and wrote my own test, which I've included below. If you wish to incorporate it into your code base, have at it. The numbers I'm seeing are closer to what we were hoping for (~10µs), at least on my MacBook Pro. If you see anything obvious I could do to lower the latency, I'm all ears. Thanks for all your contributions!

latency test for 10,000,000 messages on Windows: 
< 10ns: 15
< 100ns: 0
< 1,000ns: 35,268
< 10,000ns: 305,732
< 100,000ns: 1,593,504
< 1,000,000ns: 2,754,494
< 10,000,000ns: 2,516,860
< 100,000,000ns: 2,794,127
< 1,000,000,000ns: 0
> 1,000,000,000ns: 0
runtime: 5,987,724,487ns


latency test for 10,000,000 messages on MacBook Pro:
< 10ns: 0
< 100ns: 9
< 1,000ns: 78,247
< 10,000ns: 5,502,481
< 100,000ns: 3,480,166
< 1,000,000ns: 939,096
< 10,000,000ns: 0
< 100,000,000ns: 1
< 1,000,000,000ns: 0
> 1,000,000,000ns: 0
runtime: 1,521,867,541ns


package com.pva.chronicle;

import java.io.File;
import java.io.IOException;

import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.lang.io.Bytes;
import net.openhft.lang.io.serialization.BytesMarshallable;

/**
 * Test Chronicle Queue latency with a two-threaded producer and consumer.
 *
 * @author richard easterling
 */
public class ChronicleLatencyTest
{
    static File queuePath = new File(System.getProperty("java.io.tmpdir") + "/temp-latency-test-" + System.nanoTime());

    static int MESSAGE_COUNT = 100_000_000; //Integer.MAX_VALUE;

    public static void main(String[] args) throws InterruptedException, IOException
    {
        System.out.println(String.format("latency test for %,d messages: %s\n", MESSAGE_COUNT, queuePath));
        long start = System.nanoTime();
        try {
            Thread t1 = new Thread(new Consumer()); t1.start();
            Thread t2 = new Thread(new Producer()); t2.start();
            t1.join(); t2.join();
            System.out.println(String.format("runtime: %,dns", System.nanoTime() - start));
        } finally {
            // IndexedChronicle creates a pair of files; clean both up
            new File(queuePath.toString() + ".index").delete();
            new File(queuePath.toString() + ".data").delete();
        }
    }

    /** Writes MESSAGE_COUNT messages, each stamped with System.nanoTime() at insert. */
    static class Producer implements Runnable
    {
        @Override public void run()
        {
            Message msg = new Message();
            int countDown = MESSAGE_COUNT;
            try (Chronicle chronicle = ChronicleQueueBuilder.indexed(queuePath).build()) {
                ExcerptAppender appender = chronicle.createAppender();
                while (countDown-- > 0) {
                    msg.setCount(countDown);
                    final long t = System.nanoTime();
                    msg.setQueueInsertTimestamp(t);
                    msg.writeQueue(appender);
                }
            } catch (IOException e) { e.printStackTrace(); }
        }
    }

    /** Busy-spins on the tailer and buckets nanoTime() minus the insert timestamp for each message. */
    static class Consumer implements Runnable
    {
        static final long ns = 1, µs = 1_000, ms = 1_000_000, s = 1_000_000_000;

        @Override public void run()
        {
            final Bucket bucket = new Bucket(10*ns, 100*ns, 1*µs, 10*µs, 100*µs, 1*ms, 10*ms, 100*ms, 1*s);
            Message msg = new Message();
            int countDown = Integer.MAX_VALUE;
            try (Chronicle chronicle = ChronicleQueueBuilder.indexed(queuePath).build()) {
                ExcerptTailer tailer = chronicle.createTailer();
                while (countDown > 0) {
                    while (!tailer.nextIndex()) ;   // busy-wait for the next excerpt
                    msg.readQueue(tailer);
                    countDown = msg.getCount();     // producer counts down to 0
                    final long latency = System.nanoTime() - msg.getQueueInsertTimestamp();
                    bucket.inc(latency);
                }
            } catch (IOException e) { e.printStackTrace(); }
            System.out.println(bucket);
        }
    }

    /** Fixed-size payload: a countdown plus the producer-side nanoTime timestamp. */
    static class Message implements BytesMarshallable
    {
        private int count;
        private long queueInsertTimestamp;

        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
        public void setQueueInsertTimestamp(long queueInsertTimestamp) { this.queueInsertTimestamp = queueInsertTimestamp; }
        public long getQueueInsertTimestamp() { return queueInsertTimestamp; }

        public void writeQueue(final ExcerptAppender appender)
        {
            appender.startExcerpt(8 + 4);   // long timestamp + int count
            writeMarshallable(appender);
            appender.finish();
        }

        public void readQueue(final ExcerptTailer reader)
        {
            readMarshallable(reader);
            reader.finish();
        }

        @Override public void readMarshallable(final Bytes in) throws IllegalStateException
        {
            count = in.readInt();
            queueInsertTimestamp = in.readLong();
        }

        @Override public void writeMarshallable(final Bytes out)
        {
            out.writeInt(count);
            out.writeLong(queueInsertTimestamp);
        }
    }

    /** Simple latency histogram: counts[i] holds samples below latencies[i]; the last slot catches the rest. */
    static class Bucket
    {
        final long[] latencies;
        final int[] counts;

        public Bucket(long... latencies)
        {
            this.latencies = latencies;
            this.counts = new int[latencies.length + 1];
        }

        public final void inc(long latency)
        {
            for (int i = 0; i < latencies.length; i++) {
                if (latency < latencies[i]) {
                    counts[i]++;
                    return;
                }
            }
            counts[latencies.length]++;
        }

        public String toString()
        {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < latencies.length; i++) {
                sb.append(String.format("< %,dns: %,d\n", latencies[i], counts[i]));
            }
            sb.append(String.format("> %,dns: %,d\n", latencies[latencies.length - 1], counts[latencies.length]));
            return sb.toString();
        }
    }
}

Richard Easterling

May 20, 2016, 5:54:36 PM
to Chronicle
Attached the complete test source code.


 
ChronicleLatencyTest.java

Peter Lawrey

May 21, 2016, 3:36:16 AM
to java-ch...@googlegroups.com
When you test latencies you need to consider the specific throughput you are testing at. I suggest letting the test control the throughput and give you a latency distribution for that throughput. To correct for co-ordinated omission, you should take the start time not from when each operation actually begins, but from when it should have begun based on your target throughput. The simplest option is to busy-wait before each operation until the calculated start time has been reached.
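A rough sketch of that pacing on the producer side, not taken from the Chronicle code base; PacedProducer, writePaced and the parameters are illustrative names only.

import net.openhft.chronicle.ExcerptAppender;

class PacedProducer {
    // Writes messageCount excerpts at a fixed target rate. The latency clock for each
    // message starts at the scheduled send time, so a delayed write shows up as latency
    // on the consumer side instead of being silently omitted (co-ordinated omission).
    static void writePaced(ExcerptAppender appender, int messageCount, long messagesPerSecond) {
        long interval = 1_000_000_000L / messagesPerSecond;   // ns between sends
        long next = System.nanoTime();
        for (int i = 0; i < messageCount; i++) {
            while (System.nanoTime() < next) { /* busy-wait until the scheduled send time */ }
            appender.startExcerpt(8);
            appender.writeLong(next);   // stamp the intended send time, not the actual one
            appender.finish();
            next += interval;
        }
    }
}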

Gil Tene gives an excellent talk about benchmarking and why correcting for co-ordinated omission makes such a difference: https://www.youtube.com/watch?v=gQ6LEbjhVr4

If you can add code to test at a target throughput without co-ordinated omission, you can contribute it via a pull request.

Regards,
   Peter.


Peter Lawrey

May 21, 2016, 3:37:45 AM
to java-ch...@googlegroups.com
I am sorry, that is the wrong link. Gil's talk is here: https://www.infoq.com/presentations/latency-response-time