SPSC Optimized Queue


damageboy

Oct 5, 2009, 2:12:09 PM
to Scalable Synchronization Algorithms
I'm sure that most of you picked up on the article by Joe Duffy:
http://www.bluebytesoftware.com/blog/CommentView,guid,3740daff-a459-4298-bc9b-65d3647f5c0d.aspx

I know that Dmitriy posted his rather technical comment at the bottom
of the article.
I was wondering if anyone could help with deciphering the comment /
improving the code so it would run even faster (while still being
implemented in .NET/C#).

I personally have a situation where I use an SPSC queue like the one Joe
presented (similar in implementation),
and I would love to be able to shave a few dozen cycles off my
queue.

Thanks in advance!

Dmitriy V'jukov

Oct 6, 2009, 12:33:01 PM
to Scalable Synchronization Algorithms
On Oct 5, 11:12 am, damageboy <dan.shech...@gmail.com> wrote:

Hi damageboy,

I've implemented a queue with the improvements I was talking about.
The speedup is up to ~12x depending on hardware.

I had to hardcode the queue capacity (1024) and the element type (int),
because in my tests integer division (position % Capacity) and calls
to IComparable<T> took most of the run time, so it made no sense to
measure synchronization overhead when most of the run time was taken
by other operations.
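
As an illustration of why a fixed capacity helps: when the capacity is a compile-time power of two, the wrap-around can be written as a bit mask instead of a division. This is only a sketch under that assumption; the `RingIndex` name is hypothetical and is not part of the code posted in this thread, and the post does not say how the JIT treats the constant divisor.

```csharp
static class RingIndex
{
    // Assumption: the capacity is a power of two, like the hardcoded 1024.
    public const int Capacity = 1024;
    private const int Mask = Capacity - 1;

    // Gives the same result as (position + 1) % Capacity for positions
    // in the range 0..Capacity-1, using an add and a bitwise AND.
    public static int Next(int position)
    {
        return (position + 1) & Mask;
    }
}
```

For every index the queue can hold, `RingIndex.Next(p)` equals `(p + 1) % 1024`.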

I used the following code for testing:

using System;
using System.Threading;

class Program
{
    // Each thread enqueues 1000 items into q1, then dequeues 1000 items
    // from q2, and repeats this 30000 times.
    static void ThreadFunc(Queue q1, Queue q2)
    {
        for (int i = 0; i != 30000; i += 1)
        {
            for (int j = 0; j != 1000; j += 1)
            {
                q1.Enqueue(j + 1);
            }
            for (int j = 0; j != 1000; j += 1)
            {
                int data = q2.Dequeue();
            }
        }
    }

    static void Main(string[] args)
    {
        // 10 timed runs; the two threads use the queues in opposite roles.
        for (int i = 0; i != 10; i += 1)
        {
            Queue q1 = new Queue();
            Queue q2 = new Queue();
            Thread thread1 = new Thread((ThreadStart)delegate
                { Program.ThreadFunc(q1, q2); });
            Thread thread2 = new Thread((ThreadStart)delegate
                { Program.ThreadFunc(q2, q1); });
            DateTime start = DateTime.Now;
            thread1.Start();
            thread2.Start();
            thread1.Join();
            thread2.Join();
            TimeSpan time = DateTime.Now.Subtract(start);
            Console.WriteLine("TIME=" + time.TotalMilliseconds);
        }
    }
}

Results on Intel Core2Duo P9500:
Joe's queue:
TIME=2648
TIME=4647
TIME=2853
TIME=3392
TIME=2726
TIME=2624
TIME=2824
TIME=3277
TIME=2545
TIME=3378

Dmitry's queue:
TIME=359
TIME=359
TIME=341
TIME=356
TIME=347
TIME=341
TIME=385
TIME=351
TIME=352
TIME=350

Results on Intel Core2Quad Q6600 (cores 0 & 1 used):
Joe's queue:
TIME=2812
TIME=3203
TIME=2859
TIME=3421
TIME=2796
TIME=3187
TIME=3218
TIME=3187
TIME=3109
TIME=3046

Dmitry's queue:
TIME=453
TIME=453
TIME=468
TIME=640
TIME=453
TIME=453
TIME=453
TIME=453
TIME=453
TIME=453

Results on Intel Core2Quad Q6600 (cores 0 & 2 used):
Joe's queue:
TIME=8921
TIME=13078
TIME=8375
TIME=9187
TIME=8031
TIME=10515
TIME=8390
TIME=7328
TIME=9000
TIME=11546

Dmitry's queue:
TIME=828
TIME=937
TIME=812
TIME=812
TIME=796
TIME=937
TIME=875
TIME=859
TIME=875
TIME=812

So an enqueue/dequeue operation takes 110-520 cycles on average for Joe's
queue, and 14-38 cycles for my queue.
I guess that for my queue the loop and function call overhead of:
    for (int j = 0; j != 1000; j += 1)
    {
        q1.Enqueue(j + 1);
    }
takes a significant part of the run time. So I think the overhead of the
actual operations is about 10 cycles.
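
The cycle figures above can be reproduced from the timings with a little arithmetic. In the sketch below, the operation count comes from the test program (30000 iterations of 1000 enqueues plus 1000 dequeues per thread); the clock frequencies passed in are my assumption (nominal 2.53 GHz for the P9500 and 2.4 GHz for the Q6600), since the post does not state them. The `CycleEstimate` name is hypothetical.

```csharp
static class CycleEstimate
{
    // Operations performed by each thread in one timed run:
    // 30000 iterations x (1000 enqueues + 1000 dequeues).
    public const long OpsPerThread = 30000L * (1000 + 1000);

    // Average cycles per operation for a run that took timeMs milliseconds
    // on a core running at clockGHz (an assumed nominal frequency).
    public static double CyclesPerOp(double timeMs, double clockGHz)
    {
        double cycles = timeMs / 1000.0 * clockGHz * 1e9;
        return cycles / OpsPerThread;
    }
}
```

With these assumptions, `CyclesPerOp(2648, 2.53)` is roughly 112 and `CyclesPerOp(13078, 2.4)` is roughly 523 for Joe's queue, while `CyclesPerOp(350, 2.53)` is about 15 and `CyclesPerOp(937, 2.4)` is about 37 for the optimized queue, which matches the quoted 110-520 and 14-38 ranges.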


I've also run 2 tests in parallel on the quad-core Q6600: the first test ran
on cores 1 and 3, and the second ran on cores 2 and 4. This setup yields
the biggest difference between Joe's queue and mine, which means that Joe's
queue badly saturates the cache coherency interconnect, leaving less
bandwidth for other parallel activities in a real program. The results
are:
Joe's queue:
TIME=11578
TIME=11546
TIME=17468
TIME=11984
TIME=13640
TIME=13343
[I interrupted the test at this point]

Dmitry's queue:
TIME=968
TIME=1203
TIME=1078
TIME=1125
TIME=1015
TIME=1125
TIME=1015
TIME=1109
TIME=1125
TIME=1078

Ok, here is Joe's queue implementation I used for measurements:

class Queue
{
    private int[] m_buffer;
    private volatile int m_consumerIndex;
    private volatile int m_consumerWaiting;
    private AutoResetEvent m_consumerEvent;
    private volatile int m_producerIndex;
    private volatile int m_producerWaiting;
    private AutoResetEvent m_producerEvent;

    public Queue()
    {
        m_buffer = new int[1024];
        m_consumerEvent = new AutoResetEvent(false);
        m_producerEvent = new AutoResetEvent(false);
    }

    public void Enqueue(int value)
    {
        if ((((m_producerIndex + 1) % 1024) == m_consumerIndex))
            WaitUntilNonFull();

        m_buffer[m_producerIndex] = value;
        Interlocked.Exchange(ref m_producerIndex, (m_producerIndex + 1) % 1024);

        if (m_consumerWaiting == 1)
            m_consumerEvent.Set();
    }

    private void WaitUntilNonFull()
    {
        for (int i = 0; i != 1000; i += 1)
        {
            if ((((m_producerIndex + 1) % 1024) != m_consumerIndex))
                return;
        }

        Interlocked.Exchange(ref m_producerWaiting, 1);
        try
        {
            while ((((m_producerIndex + 1) % 1024) == m_consumerIndex))
                m_producerEvent.WaitOne();
        }
        finally
        {
            m_producerWaiting = 0;
        }
    }

    public int Dequeue()
    {
        if (m_consumerIndex == m_producerIndex)
            WaitUntilNonEmpty();

        int value = m_buffer[m_consumerIndex];
        m_buffer[m_consumerIndex] = 0;
        Interlocked.Exchange(ref m_consumerIndex, (m_consumerIndex + 1) % 1024);

        if (m_producerWaiting == 1)
            m_producerEvent.Set();
        return value;
    }

    private void WaitUntilNonEmpty()
    {
        for (int i = 0; i != 1000; i += 1)
        {
            if (m_consumerIndex != m_producerIndex)
                return;
        }

        Interlocked.Exchange(ref m_consumerWaiting, 1);
        try
        {
            while (m_consumerIndex == m_producerIndex)
                m_consumerEvent.WaitOne();
        }
        finally
        {
            m_consumerWaiting = 0;
        }
    }
}


And here is my optimized queue.
The main optimizations I've already described on Joe's blog:
1. Since I use spin-waiting, I replace Interlocked.Exchange with a
volatile store.
2. The consumer and producer do not read each other's indexes; instead they
rely on the cell value to determine whether it has already been
consumed/produced.
You may notice the ugly w1, w2, p1, ... members: they are cache line
padding. I don't know the best way to express this in C# (you
may experiment with the implementation by removing some or all of the
padding).


using System.Runtime.CompilerServices;
using System.Threading;

class Queue
{
    // The w*, p*, m* and z* fields are cache line padding (16 ints = 64 bytes).
    private int w1, w2, w3, w4, w5, w6, w7, w8, w9, w10, w11, w12, w13,
        w14, w15, w16;
    // Note: in C#, volatile here qualifies the array reference, not its elements.
    private volatile int[] m_buffer;
    private int p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13,
        p14, p15, p16;
    private int m_consumerIndex; // touched only by the consumer
    private int m1, m2, m3, m4, m5, m6, m7, m8, m9, m10, m11, m12, m13,
        m14, m15, m16;
    private int m_producerIndex; // touched only by the producer
    private int z1, z2, z3, z4, z5, z6, z7, z8, z9, z10, z11, z12, z13,
        z14, z15, z16;

    public Queue()
    {
        m_buffer = new int[1024];
    }

    // A cell value of 0 means "empty", so 0 cannot be enqueued as data.
    public void Enqueue(int value)
    {
        if (m_buffer[m_producerIndex] != 0)
            WaitFull();
        m_buffer[m_producerIndex] = value;
        m_producerIndex = (m_producerIndex + 1) % 1024;
    }

    public int Dequeue()
    {
        int value = m_buffer[m_consumerIndex];
        if (value == 0)
            value = WaitEmpty();
        m_buffer[m_consumerIndex] = 0;
        m_consumerIndex = (m_consumerIndex + 1) % 1024;
        return value;
    }

    // Slow path: spin 1000 times, then yield the time slice on every retry.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private void WaitFull()
    {
        int count = 0;
        for (; ; )
        {
            if (m_buffer[m_producerIndex] == 0)
                break;
            if (count < 1000)
                count += 1;
            else
                Thread.Sleep(0);
        }
    }

    // Slow path: same waiting scheme, returns the value that arrived.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private int WaitEmpty()
    {
        int count = 0;
        for (; ; )
        {
            if (m_buffer[m_consumerIndex] != 0)
                break;
            if (count < 1000)
                count += 1;
            else
                Thread.Sleep(0);
        }
        return m_buffer[m_consumerIndex];
    }
}
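
On the open question of how to express cache line padding in C#: one option that might be worth trying is an explicit-layout struct, because, as far as I know, the runtime is free to reorder the plain int fields of a class. The sketch below assumes 64-byte cache lines; `PaddedInt` and `PaddedIndexes` are hypothetical names and this is not the layout used in the measurements above.

```csharp
using System.Runtime.InteropServices;

// Assumption: 64-byte cache lines. The value sits at offset 64 of a
// 128-byte struct, so it has 64 bytes of padding before it and 60 after.
[StructLayout(LayoutKind.Explicit, Size = 128)]
struct PaddedInt
{
    [FieldOffset(64)]
    public int Value;
}

sealed class PaddedIndexes
{
    // Each index is surrounded by its own padding, so the two values
    // cannot end up on the same 64-byte line.
    public PaddedInt Consumer;
    public PaddedInt Producer;
}
```

Whether this actually changes the timings would have to be measured; the padding only separates the two indexes from each other and from neighbouring fields.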

Will be glad to hear your feedback on this.

--
Dmitriy V'jukov

Chae Lim

Oct 6, 2009, 3:12:33 PM
to lock...@googlegroups.com
Good work Dmitriy.

I don't see any big issues, and yours is better from a performance perspective.
Two minor things I noticed: first, the value zero is special and apparently
cannot be used as a real value. Second, I wonder whether back-off
waiting would be a good idea inside the two Wait functions when the wait
condition can't be met for a while. I mean, your implementation is definitely
faster than Joe's, but it can use more CPU time, especially when the producer
and/or consumer are not just running Enqueue/Dequeue in a tight loop but are
also doing other tasks or waiting for something else.

P.S. I've been really enjoying all your posts and would like to express my
appreciation.

Dmitriy V'jukov

Oct 7, 2009, 2:40:14 AM
to Scalable Synchronization Algorithms
On Oct 6, 12:12 pm, "Chae Lim" <chae...@gmail.com> wrote:
> Good work Dmitriy.

Hi Chae Lim,
Thank you.

> I don't see any big issues, and yours is better from a performance perspective.
> Two minor things I noticed: first, the value zero is special and apparently
> cannot be used as a real value.


Yes, it is.
If that is a problem, then a separate flag can be used:

struct X
{
    volatile bool empty;
    int data1;
    int data2;
}
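
To make the flag idea concrete, here is a minimal sketch of a queue whose cells carry a separate flag, so that 0 becomes a legal value. It is not the code that was benchmarked above. The `FlaggedQueue` name, the use of `Volatile.Read`/`Volatile.Write` and `SpinWait` (both from later .NET versions), and the inverted flag (`Full` rather than `empty`, so that a zero-initialized array starts out empty) are my assumptions.

```csharp
using System.Threading;

sealed class FlaggedQueue
{
    private struct Cell
    {
        public bool Full; // false = empty; accessed through Volatile
        public int Data;  // payload, any int value including 0
    }

    private const int Capacity = 1024;
    private readonly Cell[] m_cells = new Cell[Capacity];
    private int m_producerIndex; // touched only by the producer
    private int m_consumerIndex; // touched only by the consumer

    public void Enqueue(int value)
    {
        var spin = new SpinWait();
        while (Volatile.Read(ref m_cells[m_producerIndex].Full))
            spin.SpinOnce();                               // cell not yet consumed
        m_cells[m_producerIndex].Data = value;
        Volatile.Write(ref m_cells[m_producerIndex].Full, true); // publish after the data
        m_producerIndex = (m_producerIndex + 1) % Capacity;
    }

    public int Dequeue()
    {
        var spin = new SpinWait();
        while (!Volatile.Read(ref m_cells[m_consumerIndex].Full))
            spin.SpinOnce();                               // cell not yet produced
        int value = m_cells[m_consumerIndex].Data;
        Volatile.Write(ref m_cells[m_consumerIndex].Full, false); // hand the cell back
        m_consumerIndex = (m_consumerIndex + 1) % Capacity;
        return value;
    }
}
```

The extra flag costs memory per cell, which is the price for not reserving a sentinel value.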


> Second, I wonder whether back-off
> waiting would be a good idea inside the two Wait functions when the wait
> condition can't be met for a while. I mean, your implementation is definitely
> faster than Joe's, but it can use more CPU time, especially when the producer
> and/or consumer are not just running Enqueue/Dequeue in a tight loop but are
> also doing other tasks or waiting for something else.


Well, it depends.
First, if the producer-consumer queues are empty then the application is
probably idling, so spin-waiting burns otherwise unused cycles. That's
not always the case, though.
I understand that blocking is preferable in the general case, but as
always it's a question of trade-offs.
The easiest way to patch the waiting is to apply a more aggressive back-off.
Something like:
    1000 cycles of active spin waiting
    10 times Sleep(0)
    Sleep(1)
    Sleep(3)
    Sleep(5)
    Sleep(10)
    Sleep(10)
    Sleep(10)
    ...
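
The schedule above could be sketched as a small helper like the one below. The `Backoff` class and its method names are hypothetical; only the sequence of delays is taken from the list.

```csharp
using System;
using System.Threading;

static class Backoff
{
    // Returns the Thread.Sleep argument for the given attempt number,
    // or -1 while the caller should keep spinning actively.
    public static int DelayFor(int attempt)
    {
        if (attempt < 1000) return -1; // 1000 rounds of active spinning
        if (attempt < 1010) return 0;  // 10 times Sleep(0)
        switch (attempt - 1010)
        {
            case 0: return 1;
            case 1: return 3;
            case 2: return 5;
            default: return 10;        // Sleep(10) from then on
        }
    }

    // Polls 'ready' until it returns true, backing off between polls.
    public static void WaitUntil(Func<bool> ready)
    {
        int attempt = 0;
        while (!ready())
        {
            int delay = DelayFor(attempt);
            if (delay >= 0)
                Thread.Sleep(delay);
            if (attempt < int.MaxValue)
                attempt += 1;
        }
    }
}
```

In the optimized queue, WaitFull() could then be written as `Backoff.WaitUntil(() => m_buffer[m_producerIndex] == 0)`, at the cost of a delegate call on the slow path.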


A more interesting way to fix that is to use an asymmetric eventcount:
http://groups.google.com/group/lock-free/browse_frm/thread/31f07e15df7f988e
The essence of Dekker/Peterson synchronization is the following
pattern:
thread1:
    flag1 = 1;
    full_memory_fence();
    if (flag2) {...}

thread2:
    flag2 = 1;
    full_memory_fence();
    if (flag1) {...}

This is basically what is used in Joe's queue for blocking/signaling.
The presence of full_memory_fence() (which is expressed as
Interlocked.Exchange in Joe's code) guarantees that either thread1
will see flag2=1 or thread2 will see flag1=1, or both. However,
full_memory_fence() is somewhat expensive.
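
The symmetric pattern can be tried out in C# with a sketch like the following. Here `Thread.MemoryBarrier()` plays the role of full_memory_fence(); the `DekkerDemo` class is hypothetical and exists only to show the guarantee.

```csharp
using System.Threading;

static class DekkerDemo
{
    private static int s_flag1, s_flag2;

    // Runs the two-thread pattern once and reports whether at least one
    // thread observed the other thread's flag.
    public static bool RunOnce()
    {
        s_flag1 = 0;
        s_flag2 = 0;
        bool thread1SawFlag2 = false, thread2SawFlag1 = false;

        var t1 = new Thread(() =>
        {
            Volatile.Write(ref s_flag1, 1);
            Thread.MemoryBarrier(); // full fence: store may not pass the load
            thread1SawFlag2 = Volatile.Read(ref s_flag2) == 1;
        });
        var t2 = new Thread(() =>
        {
            Volatile.Write(ref s_flag2, 1);
            Thread.MemoryBarrier(); // full fence on this side as well
            thread2SawFlag1 = Volatile.Read(ref s_flag1) == 1;
        });

        t1.Start();
        t2.Start();
        t1.Join();
        t2.Join();
        return thread1SawFlag2 || thread2SawFlag1;
    }
}
```

With both fences in place `RunOnce()` should always return true; without them, both threads may occasionally read 0.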
The asymmetric synchronization pattern basically allows us to remove
full_memory_fence() from one thread, at the cost of a significantly more
expensive fence in the other thread:

thread1:
    flag1 = 1;
    // nothing here
    if (flag2) {...}

thread2:
    flag2 = 1;
    system_wide_fence(); // very expensive
    if (flag1) {...}

But since blocking/signaling is involved ONLY when the queue is empty/full
(WaitUntilNonEmpty()/WaitUntilNonFull(), expected to be the slow path,
i.e. rare), it's OK to make one path much heavier.

Asymmetric blocking/signaling is a win-win solution: it makes the
fast path fast (by removing the full fence from the Enqueue/Dequeue
operations) and at the same time uses kernel blocking (instead of
spinning).

However, the main problem is how to implement system_wide_fence().
On Windows Vista and later, the FlushProcessWriteBuffers() function can be
used as system_wide_fence().
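
For readers on a current .NET runtime, a sketch of the asymmetric variant follows. It assumes `Interlocked.MemoryBarrierProcessWide()`, a later .NET API that did not exist when this was written and that, as far as I know, wraps FlushProcessWriteBuffers() on Windows; on older runtimes a P/Invoke of that Win32 function would be needed instead. The `AsymmetricDemo` class is hypothetical.

```csharp
using System.Threading;

static class AsymmetricDemo
{
    private static int s_flag1, s_flag2;

    // Runs the asymmetric pattern once and reports whether at least one
    // thread observed the other thread's flag.
    public static bool RunOnce()
    {
        s_flag1 = 0;
        s_flag2 = 0;
        bool fastSawFlag2 = false, slowSawFlag1 = false;

        var fast = new Thread(() =>
        {
            Volatile.Write(ref s_flag1, 1);
            // No hardware fence here. This relies on the JIT keeping the two
            // volatile accesses in program order (an assumption of this sketch).
            fastSawFlag2 = Volatile.Read(ref s_flag2) == 1;
        });
        var slow = new Thread(() =>
        {
            Volatile.Write(ref s_flag2, 1);
            Interlocked.MemoryBarrierProcessWide(); // system_wide_fence(), very expensive
            slowSawFlag1 = Volatile.Read(ref s_flag1) == 1;
        });

        fast.Start();
        slow.Start();
        fast.Join();
        slow.Join();
        return fastSawFlag2 || slowSawFlag1;
    }
}
```

In a queue, the fast side would be Enqueue/Dequeue and the slow side would be the thread that is about to block on an event.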

--
Dmitriy V'jukov

Dmitriy V'jukov

Oct 7, 2009, 2:45:17 AM
to Scalable Synchronization Algorithms
On Oct 6, 11:40 pm, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:

> > Second, I wonder whether back-off
> > waiting would be a good idea inside the two Wait functions when the wait
> > condition can't be met for a while. I mean, your implementation is definitely
> > faster than Joe's, but it can use more CPU time, especially when the producer
> > and/or consumer are not just running Enqueue/Dequeue in a tight loop but are
> > also doing other tasks or waiting for something else.
>
> Well, it depends.

The point is that it's worth tackling. When I insert
Interlocked.Exchange into the Enqueue/Dequeue functions of my queue:

    public void Enqueue(int value)
    {
        if (m_buffer[m_producerIndex] != 0)
            WaitFull();
        Interlocked.Exchange(ref m_buffer[m_producerIndex], value);
        m_producerIndex = (m_producerIndex + 1) % 1024;
    }

    public int Dequeue()
    {
        int value = m_buffer[m_consumerIndex];
        if (value == 0)
            value = WaitEmpty();
        Interlocked.Exchange(ref m_buffer[m_consumerIndex], 0);
        m_consumerIndex = (m_consumerIndex + 1) % 1024;
        return value;
    }

the execution time instantly jumps from 340 ms to 1600 ms.



--
Dmitriy V'jukov

James Michels

Oct 12, 2009, 9:22:02 AM
to Scalable Synchronization Algorithms
Why are you marking the methods as NoInlining?

I get everything else, but clearly I am missing something there.

Thanks
Jim

Dmitriy V'jukov

Oct 12, 2009, 9:34:03 AM
to Scalable Synchronization Algorithms
On Oct 12, 5:22 pm, James Michels <james.p.mich...@gmail.com> wrote:
> Why are you marking the methods as NoInlining?
>
> I get everything else, but clearly I am missing something there.

Hi Jim,

It has nothing to do with correctness; you can safely ignore them.
They are used solely for optimization purposes. It's a habit from the
native world; in the managed world it is probably better not to use them
at all. However, the idea is as follows. Those methods are considered
slow/cold-path, i.e. executed rarely. Consider:

//pseudo-code
void enqueue()
{
    ...
    enqueue_slow();
    ...
}

[NoInline]
void enqueue_slow()
{
    ...
}

for (int i = 0; i != 1000; i += 1)
    queue.enqueue(i);

Assume that the compiler is considering the possibilities for inlining. It
has 4 options:
1. inline nothing
2. inline enqueue
3. inline enqueue_slow
4. inline enqueue and enqueue_slow

We can do nothing about 1; probably some inlining limit has been reached.
We do not care about 4; everything is inlined anyway.
However, 2 is preferable to 3 (it puts the hot path directly into the
caller's loop and keeps the rarely executed code out of it), and that's
what I am trying to achieve with NoInline.
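
In C# terms the hot/cold split might look like the sketch below. `MethodImplOptions.AggressiveInlining` is a later addition to .NET and is only a hint to the JIT; the `HotColdQueue` class is a hypothetical, simplified variant of the queue above (0 still marks an empty cell) and not the benchmarked code.

```csharp
using System.Runtime.CompilerServices;
using System.Threading;

sealed class HotColdQueue
{
    private const int Capacity = 1024;
    private readonly int[] m_buffer = new int[Capacity];
    private int m_producerIndex;
    private int m_consumerIndex;

    // Hot path: small, and a candidate for inlining into the caller's loop.
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void Enqueue(int value) // value must be non-zero
    {
        if (Volatile.Read(ref m_buffer[m_producerIndex]) != 0)
            WaitFull();
        Volatile.Write(ref m_buffer[m_producerIndex], value);
        m_producerIndex = (m_producerIndex + 1) % Capacity;
    }

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public int Dequeue()
    {
        int value = Volatile.Read(ref m_buffer[m_consumerIndex]);
        if (value == 0)
            value = WaitEmpty();
        Volatile.Write(ref m_buffer[m_consumerIndex], 0);
        m_consumerIndex = (m_consumerIndex + 1) % Capacity;
        return value;
    }

    // Cold paths: kept out of line so they do not bloat the hot code.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private void WaitFull()
    {
        var spin = new SpinWait();
        while (Volatile.Read(ref m_buffer[m_producerIndex]) != 0)
            spin.SpinOnce();
    }

    [MethodImpl(MethodImplOptions.NoInlining)]
    private int WaitEmpty()
    {
        var spin = new SpinWait();
        int value;
        while ((value = Volatile.Read(ref m_buffer[m_consumerIndex])) == 0)
            spin.SpinOnce();
        return value;
    }
}
```

Whether the JIT actually inlines Enqueue and Dequeue depends on the runtime version, so any benefit has to be confirmed by measurement.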


--
Dmitry Vyukov