Multi-producer/multi-consumer SEH-based queue


Dmitriy V'jukov

Jul 30, 2009, 7:51:50 PM
to Scalable Synchronization Algorithms
Here is my multi-producer/single-consumer queue:
http://groups.google.ru/group/lock-free/browse_frm/thread/55df71b87acb8201
The interesting part of the algorithm is an XCHG-based producer part.

As Chris Thomasson correctly noted, the XCHG-based producer part can
be combined with the well-known CAS-based consumer part in order to
get multi-producer/multi-consumer (MPMC) queue:
http://groups.google.ru/group/comp.programming.threads/browse_frm/thread/053e322ea90e4ad5

In general, one may combine different producer and consumer parts into
a single queue, provided that the queue structure stays the same. For
example, it's possible to combine the XCHG-based producer part with the
consumer part of the 2-LOCK queue algorithm. The resulting 1-LOCK/1-XCHG
queue has quite appealing characteristics (wait-free producers, a single
spin-lock acquisition for consumers, and no need for ABA prevention or
safe memory reclamation).

But what I'm going to show is a bit more interesting: a novel
consumer part for an MPMC queue.
The problem with the classical CAS-based MPMC queue is that both producers
and consumers may touch potentially freed/reused memory; moreover,
producers may write to that memory. That's why it requires safe
memory reclamation (SMR), and in general SMR is quite problematic in a
non-managed, non-kernel environment (C/C++).
The XCHG-based producer part gracefully avoids touching freed/reused
memory. So now the problem is with the consumer part only, but note that
consumers may only read from freed/reused memory (there are no writes to it).
The key point of the proposed algorithm is handling reads from
reused memory with a failing CAS, and handling reads from freed
memory with an SEH/signal handler.
Main characteristics of the algorithm:
- intrusive
- producers: 1 XCHG, wait-free
- consumers: 1 CAS on the common path, mostly lock-free (***)
- producers and consumers do not contend with each other (until the queue
is empty)
- no need for safe memory reclamation

(***) This requires additional comment. There is a small window of
inconsistency (1 machine instruction in length) for producers. If a
producer is preempted there, it may (or may not) block consumers
(other producers are still wait-free). If a producer is terminated
there, it will cause a system-wide stall. Given the length of the
window, the probability of these events may be considered negligible
in most situations.

The algorithm requires a double-word CAS (for pointer + ABA counter). On
64-bit systems it may be reduced to a single-word (64-bit) CAS with a
pointer-packing technique. For example, on Intel64/Windows any aligned
pointer can be packed into 39 bits, which leaves room for a 25-bit ABA counter.

OK, here we go:

/* Multi-producer/multi-consumer queue
 * 2009, Dmitriy V'yukov
 * Distributed under the terms of the GNU General Public License
 * as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 * See: http://www.gnu.org/licenses
 */

// 32-bit, Windows, MSVC
#include <windows.h>
#include <intrin.h>
#include <cassert>

#define ASSERT assert // map to the CRT assert

class mpmc_queue
{
public:
    struct node_t
    {
        node_t* volatile next_;
    };

    mpmc_queue()
    {
        head_.ptr_ = 0;
        head_.cnt_ = 0;
        tail_ = &head_.ptr_;
    }

    ~mpmc_queue()
    {
        ASSERT(head_.ptr_ == 0);
        ASSERT(tail_ == &head_.ptr_);
    }

    void enqueue(node_t* node)
    {
        ASSERT(node);
        node->next_ = 0;
        node_t** prev = (node_t**)
            _InterlockedExchange((long*)&tail_, (long)node);
        ASSERT(prev);
        // <--- the window of inconsistency is HERE (***)
        prev[0] = node;
    }

    node_t* dequeue()
    {
        unsigned retry_count = 0;
    retry:
        __try
        {
            head_t h;
            h.ptr_ = head_.ptr_;
            h.cnt_ = head_.cnt_;
            for (;;)
            {
                node_t* n = h.ptr_;
                if (n == 0)
                    return 0;
                if (n->next_)
                {
                    head_t xchg = {n->next_, h.cnt_ + 1};
                    head_t prev;
                    prev.whole_ = _InterlockedCompareExchange64
                        (&head_.whole_, xchg.whole_, h.whole_);
                    if (prev.whole_ == h.whole_)
                        return n;
                    h.ptr_ = prev.ptr_;
                    h.cnt_ = prev.cnt_;
                }
                else
                {
                    node_t* t = (node_t*)tail_;
                    if (n != t)
                    {
                        // spinning here may only be caused
                        // by a producer preempted in (***)
                        SwitchToThread();
                        h.ptr_ = head_.ptr_;
                        h.cnt_ = head_.cnt_;
                        continue;
                    }
                    head_t xchg = {0, h.cnt_ + 1};
                    head_t prev;
                    prev.whole_ = _InterlockedCompareExchange64
                        (&head_.whole_, xchg.whole_, h.whole_);
                    if (prev.whole_ == h.whole_)
                    {
                        node_t* prev_tail = (node_t*)
                            _InterlockedCompareExchange
                            ((long*)&tail_, (long)&head_.ptr_, (long)n);
                        if (prev_tail == n)
                            return n;
                        // spinning here may only be caused
                        // by a producer preempted in (***)
                        while (n->next_ == 0)
                            SwitchToThread();
                        head_.ptr_ = n->next_;
                        return n;
                    }
                    h.ptr_ = prev.ptr_;
                    h.cnt_ = prev.cnt_;
                }
            }
        }
        __except ((GetExceptionCode() == EXCEPTION_ACCESS_VIOLATION
            && ++retry_count < 64*1024) ?
            EXCEPTION_EXECUTE_HANDLER : EXCEPTION_CONTINUE_SEARCH)
        {
            goto retry;
        }
    }

private:
    union head_t
    {
        struct
        {
            node_t*  ptr_;
            unsigned cnt_;
        };
        __int64 whole_;
    };

    head_t volatile head_;
    char pad_ [64];
    node_t* volatile* volatile tail_;

    mpmc_queue(mpmc_queue const&);
    mpmc_queue& operator = (mpmc_queue const&);
};

And here is a small test:

/* Multi-producer/multi-consumer queue
 * 2009, Dmitriy V'yukov
 * Distributed under the terms of the GNU General Public License
 * as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 * See: http://www.gnu.org/licenses
 */

#include <cstdlib>
#include <ctime>
#include <iostream>
#include <process.h>
#include <emmintrin.h>

size_t const thread_count = 8;
size_t const batch_size = 32;
size_t const iter_count = 400000;

bool volatile g_start = 0;

struct my_node : mpmc_queue::node_t
{
    int data;
    char pad [64];
};

unsigned __stdcall thread_func(void* ctx)
{
    mpmc_queue& queue = *(mpmc_queue*)ctx;

    srand((unsigned)time(0) + GetCurrentThreadId());
    size_t pause = rand() % 1000;

    my_node* node_cache [batch_size];
    for (size_t i = 0; i != batch_size; i += 1)
    {
        node_cache[i] = new my_node;
        node_cache[i]->data = (int)i;
    }

    while (g_start == 0)
        SwitchToThread();

    for (size_t i = 0; i != pause; i += 1)
        _mm_pause();

    for (size_t iter = 0; iter != iter_count; ++iter)
    {
        for (size_t i = 0; i != batch_size; i += 1)
        {
            queue.enqueue(node_cache[i]);
        }
        for (size_t i = 0; i != batch_size; i += 1)
        {
            for (;;)
            {
                my_node* node = (my_node*)queue.dequeue();
                if (node)
                {
                    node_cache[i] = node;
                    break;
                }
                SwitchToThread();
            }
        }
    }

    return 0;
}

int main()
{
    mpmc_queue queue;

    HANDLE threads [thread_count];
    for (size_t i = 0; i != thread_count; ++i)
    {
        threads[i] = (HANDLE)_beginthreadex(0, 0, thread_func, &queue, 0, 0);
    }

    Sleep(1);

    unsigned __int64 start = __rdtsc();
    g_start = 1;

    WaitForMultipleObjects((DWORD)thread_count, threads, TRUE, INFINITE);

    unsigned __int64 end = __rdtsc();
    unsigned __int64 time = end - start;
    std::cout << "cycles/op="
        << time / (batch_size * iter_count * 2 * thread_count) << std::endl;
}

--
Dmitriy V'yukov

kimo

Jul 30, 2009, 9:21:17 PM
to lock...@googlegroups.com
How does this compare with the 2009 Scalable Reader/Writer locks from Sun? http://research.sun.com/scalable/pubs/index.html

Dmitriy V'jukov

Jul 31, 2009, 5:15:37 AM
to Scalable Synchronization Algorithms
On Jul 31, 05:21, kimo <k...@webnetic.net> wrote:
> How does this compare with the 2009 Scalable Reader/Writer locks from Sun? http://research.sun.com/scalable/pubs/index.html


I'm not sure what you mean. Queues don't really compare to reader-writer
locks at all.


--
Dmitriy V'yukov

Dmitriy V'jukov

Aug 18, 2009, 8:51:23 AM
to Scalable Synchronization Algorithms
I've verified the algorithm with Relacy Race Detector, and it does not
find any errors. Here is the test code:
http://relacy.pastebin.com/f5df9dcfa


--
Dmitriy V'jukov

Dmitriy V'jukov

Aug 18, 2009, 8:57:38 AM
to Scalable Synchronization Algorithms
This algorithm can be extended with batch producing and consuming
operations.
Batch producing is quite straightforward:

void enqueue_batch(node_t* first, node_t* last)
{
    ASSERT(first && last);
    last->next_ = 0;
    node_t** prev = (node_t**)
        _InterlockedExchange((long*)&tail_, (long)last);
    ASSERT(prev);
    prev[0] = first;
}


Batch consuming is a bit trickier because the dequeue() operation is only
lock-free and thus may suffer from livelock. So I decided to reduce the
batch size if a thread encounters contention:

node_t* dequeue_batch(size_t max)
{
    unsigned retry_count = 0;
retry:
    __try
    {
        head_t h;
        h.ptr_ = head_.ptr_;
        h.cnt_ = head_.cnt_;
        for (;;)
        {
            node_t* n = h.ptr_;
            if (n == 0)
                return 0;
            node_t* next = n->next_;
            if (next)
            {
                size_t count = 1;
                node_t* last = n;
                while (next->next_ && count < max)
                {
                    last = next;
                    next = next->next_;
                    count += 1;
                }
                head_t xchg = {next, h.cnt_ + 1};
                head_t prev;
                prev.whole_ = _InterlockedCompareExchange64
                    (&head_.whole_, xchg.whole_, h.whole_);
                if (prev.whole_ == h.whole_)
                {
                    last->next_ = 0;
                    return n;
                }
                // contention detected -> halve the batch size
                if (max > 1)
                    max /= 2;
                h.ptr_ = prev.ptr_;
                h.cnt_ = prev.cnt_;
            }
            else
            {
                node_t* t = (node_t*)tail_;
                if (n != t)
                {
                    SwitchToThread();
                    h.ptr_ = head_.ptr_;
                    h.cnt_ = head_.cnt_;
                    continue;
                }
                head_t xchg = {0, h.cnt_ + 1};
                head_t prev;
                prev.whole_ = _InterlockedCompareExchange64
                    (&head_.whole_, xchg.whole_, h.whole_);
                if (prev.whole_ == h.whole_)
                {
                    node_t* prev_tail = (node_t*)
                        _InterlockedCompareExchange
                        ((long*)&tail_, (long)&head_.ptr_, (long)n);
                    if (prev_tail == n)
                    {
                        n->next_ = 0;
                        return n;
                    }
                    else
                    {
                        while (n->next_ == 0)
                            SwitchToThread();
                        head_.ptr_ = n->next_;
                        n->next_ = 0;
                        return n;
                    }
                }
                h.ptr_ = prev.ptr_;
                h.cnt_ = prev.cnt_;
            }
        }
    }
    __except ((GetExceptionCode() == EXCEPTION_ACCESS_VIOLATION
        && ++retry_count < 16*1024) ?
        EXCEPTION_EXECUTE_HANDLER : EXCEPTION_CONTINUE_SEARCH)
    {
        goto retry;
    }
}

However, this part is definitely subject to tweaking for a particular
environment.

In synthetic tests, batch producing/consuming increases performance
dramatically. A batch size of 4 improves throughput roughly 3x.

Enjoy!

--
Dmitriy V'jukov

Andrew Venikov

Feb 8, 2010, 3:23:59 PM
to Dmitriy V'jukov, lock...@googlegroups.com

On Jul 30 2009, 6:51 pm, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
> Here is my multi-producer/single-consumer queue:http://groups.google.ru/group/lock-free/browse_frm/thread/55df71b87ac...

A couple of questions.

First off, I'm sorry if this topic has already been covered somewhere
else, but I couldn't find any discussions of it.

I'm going to skip the discussion of memory fences, as the code seems to
assume it runs on a very strict memory model (such as x86).

Q. #1
It seems to me that the test "if (n != t)" and the whole block under it
could be skipped, since the code that follows takes care of cases where
the tail was changed from "under" us. Am I right?

Q. #2
Why are we loading head_ in stages and not as a whole number? Since
"head_" is volatile, the compiler is not allowed to optimize two reads
into a single read. That means two separate reads must be issued, and
therefore it's possible to read a pointer with an older value and a
counter with a newer value. I don't see whether that creates correctness
issues, but it sure is confusing.


Thanks,
Andy.

Dmitriy Vyukov

Feb 9, 2010, 2:31:08 AM
to Scalable Synchronization Algorithms
On Feb 8, 11:23 pm, Andrew Venikov <aveni...@gmail.com> wrote:
> On Jul 30 2009, 6:51 pm, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
>
> > Here is my multi-producer/single-consumer queue:http://groups.google.ru/group/lock-free/browse_frm/thread/55df71b87ac...
>
> A couple of questions.
>
> First off, I'm sorry if this topic has already been covered somewhere
> else, but I couldn't find any discussions of it.
>
> I'm going to skip the discussion of memory fences, as the code seems to
> assume it runs on a very strict memory model (such as x86).


Hi Andrew,

Yes, it assumes a strict memory model. By "// 32-bit, Windows, MSVC" I
imply x86.

> Q. #1
> It seems to me that the test "if (n != t)" and the whole block under it
> could be skipped, since the code that follows takes care of cases where
> the tail was changed from "under" us. Am I right?


No. It handles the following special case.
If (n->next_ == 0) and (n != t), then a producer is already in the
middle of enqueue. I.e., it has changed tail_ with XCHG but has not yet
updated the next pointer; it is currently at the point marked with
"(***)".
So the consumer just spins waiting for the producer; that's what the
"(n != t)" block handles.

> Q. #2
> Why are we loading head_ in stages and not as a whole number? Since
> "head_" is volatile, the compiler is not allowed to optimize two reads
> into a single read. That means two separate reads must be issued, and
> therefore it's possible to read a pointer with an older value and a
> counter with a newer value. I don't see whether that creates correctness
> issues, but it sure is confusing.


What I wanted to emphasize is that the algorithm does not require an
atomic double-word load (there are algorithms that require an atomic
double-word load, or at least require the loads to be issued in a
particular order).
Of course, you may code it as an atomic load if you wish; that won't
sacrifice correctness.
As for performance, a plain load from a cached location is pretty cheap
(a fraction of a cycle), while an atomic load has to use MMX/SSE load
instructions along with a conversion from an MMX/SSE register to a
general-purpose register, which I suspect is slower.
Btw, if you issue a plain load on an __int64 variable, the compiler will
indeed issue two 32-bit loads.


--
Dmitriy V'jukov

Andrew Venikov

Feb 9, 2010, 10:46:16 AM
to Scalable Synchronization Algorithms
Dmitriy,

Thank you for your explanation, but it looks like I'm still missing
something.

On Feb 9, 2:31 am, Dmitriy Vyukov <dvyu...@gmail.com> wrote:
> On Feb 8, 11:23 pm, Andrew Venikov <aveni...@gmail.com> wrote:

<snip>

> > Q. #1
> > It seems to me that the test "if (n != t)" and the whole block under it
> > could be skipped, since the code that follows takes care of cases where
> > the tail was changed from "under" us. Am I right?
>
> No. It handles the following special case.
> If (n->next_ == 0) and (n != t), then a producer is already in the
> middle of enqueue. I.e., it has changed tail_ with XCHG but has not yet
> updated the next pointer; it is currently at the point marked with
> "(***)".
> So the consumer just spins waiting for the producer; that's what the
> "(n != t)" block handles.

What I meant to say is that even if you remove the (n != t) check and
the block that gets executed when it's true, the algorithm would still
be correct.
Let's see what happens if you remove it.
First, we get into that area only if prior to that we've established
that there's only one element in the queue (n->next_ was NULL).
Now, let's say that producers have modified the tail, possibly several
times.
The first thing we'd do in that case is a CAS on the head. If it
doesn't succeed, fine, we're back looping. If it does succeed, however,
we'll try to CAS the tail, which will fail in our case. That means we
need to wait for the producer to update the "next" pointer of the first
node in the queue, which you do in your "***" loop.
I think what I'm trying to say is that you do this "waiting" loop in
two different places. I think detecting that the tail has been modified
earlier (at the "n != t" line) adds no benefit compared to detecting it
later (at the CAS(tail_) line).

What am I missing?

Thanks,
Andy.

Dmitriy Vyukov

Feb 11, 2010, 9:48:16 AM
to Scalable Synchronization Algorithms


Hi Andrew,

I've carefully re-examined the algorithm and I think you are generally
right. However, here is one caveat.
The second spin-wait "while (n->next_ == 0) SwitchToThread();" is
conducted when the consumer has reset 'head_' to zero, and this prevents
all other consumers from making any progress (they just instantly bail
out once they see 'head_ == 0'). That's not very good.

I think that part of the algorithm can be improved along the lines of:


if (prev.whole_ == h.whole_)
{
    node_t* prev_tail = (node_t*)
        _InterlockedCompareExchange
        ((long*)&tail_, (long)&head_.ptr_, (long)n);
    if (prev_tail == n)
        return n;

    // CHANGED START HERE \/ \/ \/
    if (n->next_)
    {
        // if the next pointer is already established by the producer,
        // then we consume the current message
        head_.ptr_ = n->next_;
        return n;
    }

    // otherwise restore the 'head_' value to unblock other consumers
    // and wait for the producer to establish the next link
    head_.ptr_ = n;

    SwitchToThread();
    h.ptr_ = head_.ptr_;
    h.cnt_ = head_.cnt_;
    continue;
}

And even with this improvement I would still prefer to leave the first
spin-wait "while (n->next_ == 0) SwitchToThread();" as is, because if
we know that the producer is preempted in (***), there is no need for
constant CASes and modifications of shared state. It's like the TATAS
mutex algorithm: why try to modify shared state if we know we will fail?

--
Dmitriy V'jukov
