On Oct 5, 11:12 am, damageboy <dan.shech...@gmail.com> wrote:
Hi damageboy,
I've implemented a queue with the improvements I was talking about.
The speedup is up to ~12x depending on hardware.
I had to hardcode the queue capacity (1024) and the element type
(int), because in my generic queue the integer division
(position % Capacity) and the calls to IComparable<T> take most of the
run time, so it was pointless to measure synchronization overhead when
most of the time is spent in other operations.
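Incidentally, since the capacity is a power of two, position % Capacity
can be strength-reduced to a bitwise AND with Capacity - 1, which avoids
the integer division even when the capacity is not a compile-time
constant. A quick sketch of the equivalence (in Python; the advance
helper is mine, just for illustration):

```python
CAPACITY = 1024          # must be a power of two
MASK = CAPACITY - 1      # 0x3FF

def advance(index):
    # Equivalent to (index + 1) % CAPACITY for non-negative indexes.
    return (index + 1) & MASK

# The two forms agree for every reachable index:
assert all(advance(i) == (i + 1) % CAPACITY for i in range(CAPACITY))
```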
I used the following code for testing:
class Program
{
    static void ThreadFunc(Queue q1, Queue q2)
    {
        for (int i = 0; i != 30000; i += 1)
        {
            for (int j = 0; j != 1000; j += 1)
            {
                q1.Enqueue(j + 1);
            }
            for (int j = 0; j != 1000; j += 1)
            {
                int data = q2.Dequeue();
            }
        }
    }

    static void Main(string[] args)
    {
        for (int i = 0; i != 10; i += 1)
        {
            Queue q1 = new Queue();
            Queue q2 = new Queue();
            Thread thread1 = new Thread((ThreadStart)delegate
                { Program.ThreadFunc(q1, q2); });
            Thread thread2 = new Thread((ThreadStart)delegate
                { Program.ThreadFunc(q2, q1); });
            DateTime start = DateTime.Now;
            thread1.Start();
            thread2.Start();
            thread1.Join();
            thread2.Join();
            TimeSpan time = DateTime.Now.Subtract(start);
            Console.WriteLine("TIME=" + time.TotalMilliseconds);
        }
    }
}
Results on Intel Core2Duo P9500:
Joe's queue:
TIME=2648
TIME=4647
TIME=2853
TIME=3392
TIME=2726
TIME=2624
TIME=2824
TIME=3277
TIME=2545
TIME=3378
Dmitry's queue:
TIME=359
TIME=359
TIME=341
TIME=356
TIME=347
TIME=341
TIME=385
TIME=351
TIME=352
TIME=350
Results on Intel Core2Quad Q6600 (cores 0 & 1 used):
Joe's queue:
TIME=2812
TIME=3203
TIME=2859
TIME=3421
TIME=2796
TIME=3187
TIME=3218
TIME=3187
TIME=3109
TIME=3046
Dmitry's queue:
TIME=453
TIME=453
TIME=468
TIME=640
TIME=453
TIME=453
TIME=453
TIME=453
TIME=453
TIME=453
Results on Intel Core2Quad Q6600 (cores 0 & 2 used):
Joe's queue:
TIME=8921
TIME=13078
TIME=8375
TIME=9187
TIME=8031
TIME=10515
TIME=8390
TIME=7328
TIME=9000
TIME=11546
Dmitry's queue:
TIME=828
TIME=937
TIME=812
TIME=812
TIME=796
TIME=937
TIME=875
TIME=859
TIME=875
TIME=812
So on average an enqueue/dequeue operation takes 110-520 cycles with
Joe's queue, and 14-38 cycles with my queue.
I suspect that for my queue the loop and function-call overhead of:

for (int j = 0; j != 1000; j += 1)
{
    q1.Enqueue(j + 1);
}

takes a significant part of the run time, so the cost of the actual
queue operations is probably around 10 cycles.
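To double-check the cycle estimates above: each thread performs
30000 * 1000 = 30M enqueues plus 30M dequeues, i.e. 60M operations, and
the two threads run concurrently. Assuming the stock clocks (2.53 GHz
for the P9500 and 2.4 GHz for the Q6600 - my assumption), the
arithmetic is:

```python
OPS_PER_THREAD = 30000 * 1000 * 2  # 30M enqueues + 30M dequeues

def cycles_per_op(time_ms, clock_ghz):
    # The two threads run concurrently, so elapsed cycles are divided
    # by the per-thread operation count, not the combined count.
    return time_ms / 1000.0 * clock_ghz * 1e9 / OPS_PER_THREAD

print(round(cycles_per_op(2648, 2.53)))   # Joe's queue, best P9500 run: 112
print(round(cycles_per_op(13078, 2.4)))   # Joe's queue, worst run (cores 0 & 2): 523
print(round(cycles_per_op(341, 2.53)))    # my queue, best P9500 run: 14
print(round(cycles_per_op(937, 2.4)))     # my queue, worst run (cores 0 & 2): 37
```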
I've also run two instances of the test in parallel on the quad-core
Q6600, the first on cores 1 and 3 and the second on cores 2 and 4.
This setup shows the largest difference between Joe's queue and mine,
which suggests that Joe's queue badly saturates the cache-coherency
interconnect, leaving less bandwidth for other parallel activity in a
real program. The results are:
Joe's queue:
TIME=11578
TIME=11546
TIME=17468
TIME=11984
TIME=13640
TIME=13343
[I interrupted the test at this point]
Dmitry's queue:
TIME=968
TIME=1203
TIME=1078
TIME=1125
TIME=1015
TIME=1125
TIME=1015
TIME=1109
TIME=1125
TIME=1078
OK, here is the implementation of Joe's queue that I used for the
measurements:
class Queue
{
    private int[] m_buffer;
    private volatile int m_consumerIndex;
    private volatile int m_consumerWaiting;
    private AutoResetEvent m_consumerEvent;
    private volatile int m_producerIndex;
    private volatile int m_producerWaiting;
    private AutoResetEvent m_producerEvent;

    public Queue()
    {
        m_buffer = new int[1024];
        m_consumerEvent = new AutoResetEvent(false);
        m_producerEvent = new AutoResetEvent(false);
    }

    public void Enqueue(int value)
    {
        if (((m_producerIndex + 1) % 1024) == m_consumerIndex)
            WaitUntilNonFull();
        m_buffer[m_producerIndex] = value;
        Interlocked.Exchange(ref m_producerIndex, (m_producerIndex + 1) % 1024);
        if (m_consumerWaiting == 1)
            m_consumerEvent.Set();
    }

    private void WaitUntilNonFull()
    {
        for (int i = 0; i != 1000; i += 1)
        {
            if (((m_producerIndex + 1) % 1024) != m_consumerIndex)
                return;
        }
        Interlocked.Exchange(ref m_producerWaiting, 1);
        try
        {
            while (((m_producerIndex + 1) % 1024) == m_consumerIndex)
                m_producerEvent.WaitOne();
        }
        finally
        {
            m_producerWaiting = 0;
        }
    }

    public int Dequeue()
    {
        if (m_consumerIndex == m_producerIndex)
            WaitUntilNonEmpty();
        int value = m_buffer[m_consumerIndex];
        m_buffer[m_consumerIndex] = 0;
        Interlocked.Exchange(ref m_consumerIndex, (m_consumerIndex + 1) % 1024);
        if (m_producerWaiting == 1)
            m_producerEvent.Set();
        return value;
    }

    private void WaitUntilNonEmpty()
    {
        for (int i = 0; i != 1000; i += 1)
        {
            if (m_consumerIndex != m_producerIndex)
                return;
        }
        Interlocked.Exchange(ref m_consumerWaiting, 1);
        try
        {
            while (m_consumerIndex == m_producerIndex)
                m_consumerEvent.WaitOne();
        }
        finally
        {
            m_consumerWaiting = 0;
        }
    }
}
And here is my optimized queue.
The main optimizations are the ones I've already described on Joe's
blog:
1. Since I use spin-waiting, I replaced the Interlocked.Exchange with
a plain volatile store.
2. The consumer and the producer do not read each other's index;
instead, each relies on the cell value to determine whether the cell
has already been consumed/produced.
You may notice the ugly w1, w2, p1, ... members - they are cache-line
padding (16 ints = 64 bytes, i.e. one cache line on these CPUs).
I don't know the best way to express this in C# (you may experiment
with the implementation by removing some or all of the padding).
class Queue
{
    private int w1, w2, w3, w4, w5, w6, w7, w8,
                w9, w10, w11, w12, w13, w14, w15, w16;
    private volatile int[] m_buffer;
    private int p1, p2, p3, p4, p5, p6, p7, p8,
                p9, p10, p11, p12, p13, p14, p15, p16;
    private int m_consumerIndex;
    private int m1, m2, m3, m4, m5, m6, m7, m8,
                m9, m10, m11, m12, m13, m14, m15, m16;
    private int m_producerIndex;
    private int z1, z2, z3, z4, z5, z6, z7, z8,
                z9, z10, z11, z12, z13, z14, z15, z16;

    public Queue()
    {
        m_buffer = new int[1024];
    }

    // Note: 0 is the sentinel "empty cell" value, so 0 itself must not
    // be enqueued (which is why the test enqueues j + 1).
    public void Enqueue(int value)
    {
        if (m_buffer[m_producerIndex] != 0)
            WaitFull();
        m_buffer[m_producerIndex] = value;
        m_producerIndex = (m_producerIndex + 1) % 1024;
    }

    public int Dequeue()
    {
        int value = m_buffer[m_consumerIndex];
        if (value == 0)
            value = WaitEmpty();
        m_buffer[m_consumerIndex] = 0;
        m_consumerIndex = (m_consumerIndex + 1) % 1024;
        return value;
    }

    [MethodImpl(MethodImplOptions.NoInlining)]
    private void WaitFull()
    {
        // Spin for a while, then start yielding the timeslice.
        int count = 0;
        for (;;)
        {
            if (m_buffer[m_producerIndex] == 0)
                break;
            if (count < 1000)
                count += 1;
            else
                Thread.Sleep(0);
        }
    }

    [MethodImpl(MethodImplOptions.NoInlining)]
    private int WaitEmpty()
    {
        int count = 0;
        for (;;)
        {
            if (m_buffer[m_consumerIndex] != 0)
                break;
            if (count < 1000)
                count += 1;
            else
                Thread.Sleep(0);
        }
        return m_buffer[m_consumerIndex];
    }
}
I'll be glad to hear your feedback on this.
--
Dmitriy V'jukov