Yet Another MPMC Queue


Dmitriy Vyukov

Aug 18, 2009, 12:25:56 PM
to Scalable Synchronization Algorithms
The main design goal was to achieve maximum scalability under high
load. The queue is intended to transfer pointers. The underlying data
structure is a linked list of fixed-size arrays. The algorithm is
lock-free overall, and wait-free on most paths. It requires
double-word CAS.

The following benchmark was used for testing:

thread()
{
    for (int i = 0; i != iter_count; i += 1)
    {
        for (int j = 0; j != batch_size; j += 1)
            queue.enqueue((void*)1);
        for (int j = 0; j != batch_size; j += 1)
            queue.dequeue();
    }
}

number_of_threads = 16
batch_size = 64

Hardware is Intel Core2Quad Q6600.
The test was conducted with SetProcessAffinityMask(GetCurrentProcess
(), affinity); where affinity = 1, 3, 5, 15.
Mean number of cycles per operation (enqueue/dequeue) was measured.

Here are the results:
affinity=1, cycles/op = 32
affinity=3, cycles/op = 65
affinity=5, cycles/op = 120
affinity=15, cycles/op = 265

For comparison, I benchmarked tbb::concurrent_bounded_queue<int*>
from Intel Threading Building Blocks 2.2, and here are the results:
affinity=1, cycles/op = 235 (7.3x slowdown)
affinity=3, cycles/op = 1071 (16.7x slowdown)
affinity=5, cycles/op = 2364 (19.7x slowdown)
affinity=15, cycles/op = 6085 (22.9x slowdown)

--
Dmitriy V'jukov


Dmitriy V'jukov

Aug 18, 2009, 12:36:52 PM
to Scalable Synchronization Algorithms
The queue uses strongly thread-safe reference counting to manage block
lifetime, hence the need for double-word atomic operations. Other
lifetime management techniques (GC, SMR, RCU, etc.) can be employed
instead, and they do not require double-word atomic operations.

The queue can also be used with user types other than pointers. The
only requirements are that the type must have some special value that
means NULL/NOT-PRESENT, and that the type must allow an atomic
transition from NULL to a fully constructed value.
For example, the following type can be used as a value:

struct task
{
    void (*f) (task*);
    void* param1;
    void* param2;
    void* param3;

    static void atomic_store(task& dst, task const& src)
    {
        dst.param1 = src.param1;
        dst.param2 = src.param2;
        dst.param3 = src.param3;
        STORE_RELEASE(dst.f, src.f);
    }

    static bool check_present(task& t)
    {
        return LOAD_ACQUIRE(t.f) != 0;
    }
};
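
For readers who want to try the same publication pattern with C++11
atomics, here is a minimal sketch. It assumes that STORE_RELEASE and
LOAD_ACQUIRE correspond to a release store and an acquire load; the
names task_slot, publish, is_present and noop_task are hypothetical
and are not part of the queue's sources.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical slot type: 'f' doubles as the NULL/NOT-PRESENT marker.
struct task_slot
{
    typedef void (*func_t)(task_slot*);

    std::atomic<func_t> f;
    void* param1;
    void* param2;
    void* param3;

    task_slot() : f(nullptr), param1(nullptr), param2(nullptr), param3(nullptr) {}
};

// Producer side: fill the payload first, then publish it with a
// release store to 'f'.
inline void publish(task_slot& dst, task_slot::func_t f,
                    void* p1, void* p2, void* p3)
{
    assert(f != nullptr); // a null function pointer means "empty"
    dst.param1 = p1;
    dst.param2 = p2;
    dst.param3 = p3;
    dst.f.store(f, std::memory_order_release);
}

// Consumer side: an acquire load of 'f' that observes a non-null value
// also makes the earlier payload writes visible to this thread.
inline bool is_present(task_slot const& t)
{
    return t.f.load(std::memory_order_acquire) != nullptr;
}

// Trivial task body used only to have a non-null function pointer.
inline void noop_task(task_slot*) {}
```

The ordering matters: if 'f' were stored before the parameters, a
consumer could see a non-null 'f' together with stale parameters.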



--
Dmitriy V'jukov

Raine Fan

Aug 18, 2009, 1:42:56 PM
to lock...@googlegroups.com
Hi Dmitriy!

The results look amazing! Is this the same queue as the one you posted previously at http://groups.google.com/group/lock-free/browse_thread/thread/c8e3201da4a6a300# ?

Does it support batch enqueueing/dequeueing?

Thanks and regards!
Raine

From: Dmitriy V'jukov <dvy...@gmail.com>
To: Scalable Synchronization Algorithms <lock...@googlegroups.com>
Sent: Tuesday, August 18, 2009 1:36:52 PM
Subject: [lock-free] Re: Yet Another MPMC Queue

Dmitriy V'jukov

Aug 18, 2009, 2:02:38 PM
to Scalable Synchronization Algorithms
On Aug 18, 21:42, Raine Fan <raine...@ymail.com> wrote:
> Hi Dmitriy!
>
> The results looks amazing! Is this queue you are talking about the same as you posted in previoushttp://groups.google.com/group/lock-free/browse_thread/thread/c8e3201...?
>
> Does it support batch enqueueing/dequeueing ?

Thank you for your interest.

No, it's a completely different design.
That queue is node-based and intrusive. This queue is array-based, and
items are stored 'by value'. That queue scales worse; however, you may
benefit from its intrusiveness, its simpler interface (no
thread_proxy), and the fact that there is no need to preallocate
memory (this queue preallocates 4 KB in its constructor, while that
queue does not allocate any memory at all).

I think it's possible to bolt batch producing/consuming onto this
queue, but I have not thought about it yet. Are you interested in this?

I am going to attach the full sources, with an MSVC solution and the
benchmark code, later.
Here is a usage example; note the thread_proxy usage:


unsigned __stdcall thread_func(void* ctx)
{
    mpmc_queue<int>& q = *(mpmc_queue<int>*)ctx;

    mpmc_queue<int>::thread_proxy enq_proxy;
    mpmc_queue<int>::thread_proxy deq_proxy;

    for (int iter = 0; iter != iter_count; ++iter)
    {
        for (size_t i = 0; i != batch_size; i += 1)
        {
            q.enqueue(enq_proxy, (int*)1);
        }
        for (size_t i = 0; i != batch_size; i += 1)
        {
            int* n = q.dequeue(deq_proxy);
        }
    }
    return 0;
}


--
Dmitriy V'jukov

Raine

Aug 19, 2009, 11:25:33 AM
to Scalable Synchronization Algorithms
Thanks for the reply!

Oh... yes, it would be nice to have batching on this type of queue. But
please take your time... I was just curious whether it was the same
queue.

Thanks so much for your help!

Regards,
Raine

On Aug 18, 3:02 pm, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
> On Aug 18, 21:42, Raine Fan <raine...@ymail.com> wrote:
>
> > Hi Dmitriy!
>
> > The results looks amazing! Is this queue you are talking about the same as you posted in previoushttp://groups.google.com/group/lock-free/browse_thread/thread/c8e3201...
>

Dmitriy V'jukov

Aug 20, 2009, 2:20:35 PM
to Scalable Synchronization Algorithms
On Aug 19, 8:25 am, Raine <raine...@ymail.com> wrote:
> Thanks for reply!
>
> Oh...yes it would be nice to have batching on this type of queues. But
> please take your time... It was just curiosity of mine if was the same
> queue.


Here are the batch produce/consume methods.
With batching, per-element costs drop significantly. On the same
benchmark I see a nearly 4x speedup with batch size = 4, and a 6x
speedup with batch size = 8 (however, I used a different, dual-core
machine for these tests).


void enqueue_batch(thread_proxy& proxy, T** begin, T** end)
{
    ASSERT(begin && end > begin);
    size_t count = end - begin;
    std::atomic_thread_fence(std::memory_order_release);
    cell_t* t = proxy.reacquire(tail_);
    for (;;)
    {
        uintptr_t pos = t->produce_pos_.fetch_add(count,
            std::memory_order_seq_cst);
        if (pos < item_count)
        {
            size_t cell_count = std::min<size_t>(item_count - pos, count);
            for (size_t i = 0; i != cell_count; i += 1)
            {
                std::atomic<uintptr_t>& item =
                    t->items_[get_item_idx(pos + i)];
                ASSERT(item.load(std::memory_order_relaxed) == 0);
                T* v = begin[i];
                ASSERT(v);
                item.store((uintptr_t)v, std::memory_order_relaxed);
            }
            if (ec_.is_armed())
                ec_.notify_relaxed(notify_batch_pred(t, pos, cell_count));
            count -= cell_count;
            if (count == 0)
                return;
            begin += cell_count;
        }

        t = enqueue_alloc(proxy, t);
    }
}

size_t dequeue_batch(thread_proxy& proxy, T** begin, T** end)
{
    ASSERT(begin && end > begin);
    size_t count = end - begin;
    cell_t* h = proxy.reacquire(head_);
    for (;;)
    {
        uintptr_t pos = h->consume_pos_.load(std::memory_order_relaxed);
        if (pos < item_count)
        {
            size_t cell_count = std::min<size_t>(count, item_count - pos);
            size_t can_consume = 0;
            // scan forward for items that have already been produced
            for (; can_consume != cell_count; can_consume += 1)
            {
                // index by pos + can_consume so that each slot is read once
                begin[can_consume] = (T*)h->items_[get_item_idx(
                    pos + can_consume)].load(std::memory_order_acquire);
                if (begin[can_consume] == 0)
                    break;
            }
            if (can_consume < 2)
            {
                // fall back to the single-item path, which may wait
                pos = h->consume_pos_.fetch_add(1,
                    std::memory_order_relaxed);
                if (pos < item_count)
                {
                    begin[0] = (T*)h->items_[get_item_idx(pos)].load(
                        std::memory_order_acquire);
                    if (begin[0])
                        return 1;
                    begin[0] = dequeue_wait(proxy, h, pos);
                    return 1;
                }
            }
            else
            {
                // claim the whole scanned run with a single CAS
                if (h->consume_pos_.compare_exchange_strong(pos,
                    pos + can_consume, std::memory_order_relaxed))
                    return can_consume;
                else
                    continue;
            }
        }
        h = dequeue_alloc(proxy, h);
    }
}

struct notify_batch_pred
{
    cell_t* cell_;
    size_t pos_;
    size_t count_;

    notify_batch_pred(cell_t* cell, size_t pos, size_t count)
        : cell_(cell)
        , pos_(pos)
        , count_(count)
    {}
    bool operator () (void* ctx, size_t /*total*/, size_t /*idx*/)
    {
        for (size_t i = pos_; i != pos_ + count_; i += 1)
        {
            std::atomic<uintptr_t>& item =
                cell_->items_[mpmc_queue<T>::get_item_idx(i)];
            if (ctx == &item)
                return true;
        }
        return false;
    }
};
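
The core of the batching idea can be tried in isolation. What follows
is only a sketch under simplifying assumptions, not the code from the
queue: it uses a single fixed-size block, it never waits (an empty or
full block just yields a short count), and it omits the event count
(ec_), thread_proxy and item interleaving. The name batch_block is
hypothetical.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical single fixed-size block; the real queue links many blocks.
template<typename T, std::size_t N>
struct batch_block
{
    std::atomic<std::uintptr_t> produce_pos;
    std::atomic<std::uintptr_t> consume_pos;
    std::atomic<T*> items[N];

    batch_block() : produce_pos(0), consume_pos(0)
    {
        for (std::size_t i = 0; i != N; ++i)
            items[i].store(nullptr, std::memory_order_relaxed);
    }

    // Reserve up to (end - begin) slots with ONE fetch_add, then fill them.
    // Returns the number of items stored; a short count means the block
    // ran out of slots.
    std::size_t enqueue_batch(T** begin, T** end)
    {
        assert(begin && end > begin);
        std::size_t count = static_cast<std::size_t>(end - begin);
        std::uintptr_t pos = produce_pos.fetch_add(count, std::memory_order_relaxed);
        if (pos >= N)
            return 0;
        std::size_t n = std::min<std::size_t>(count, N - pos);
        for (std::size_t i = 0; i != n; ++i)
            items[pos + i].store(begin[i], std::memory_order_release);
        return n;
    }

    // Scan for items that are already present, then claim the whole run
    // with ONE compare-exchange. May return fewer items than requested,
    // including zero.
    std::size_t dequeue_batch(T** begin, T** end)
    {
        assert(begin && end > begin);
        std::size_t count = static_cast<std::size_t>(end - begin);
        std::uintptr_t pos = consume_pos.load(std::memory_order_relaxed);
        for (;;)
        {
            if (pos >= N)
                return 0;
            std::size_t limit = std::min<std::size_t>(count, N - pos);
            std::size_t ready = 0;
            for (; ready != limit; ++ready)
            {
                begin[ready] = items[pos + ready].load(std::memory_order_acquire);
                if (begin[ready] == nullptr)
                    break;
            }
            if (ready == 0)
                return 0;
            // On failure 'pos' is reloaded by the CAS and the scan repeats.
            if (consume_pos.compare_exchange_strong(pos, pos + ready,
                    std::memory_order_relaxed))
                return ready;
        }
    }
};
```

The point of the sketch is the cost model: one atomic RMW on the
shared counter per batch instead of one per item.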


--
Dmitriy V'jukov

Dmitriy V'jukov

Aug 20, 2009, 2:23:23 PM
to Scalable Synchronization Algorithms
Note that dequeue_batch() may return fewer items than the batch size,
i.e. the batch size is just a hint for the dequeue_batch() function.

Here is the new test function:

unsigned __stdcall thread_func(void* ctx)
{
    mpmc_queue<int>& q = *(mpmc_queue<int>*)ctx;
    mpmc_queue<int>::thread_proxy enq_proxy;
    mpmc_queue<int>::thread_proxy deq_proxy;

    int* items [pack_size];
    for (size_t i = 0; i != pack_size; i += 1)
        items[i] = (int*)1;

    for (int iter = 0; iter != iter_count; ++iter)
    {
        for (size_t i = 0; i != batch_size / pack_size; i += 1)
        {
            q.enqueue_batch(enq_proxy, items, items + pack_size);
        }
        for (size_t i = 0; i != batch_size; i += 1)
        {
            size_t to_consume = std::min<size_t>(pack_size,
                batch_size - i);
            size_t count = q.dequeue_batch(deq_proxy, items,
                items + to_consume);
            i += count - 1;

Dmitriy V'jukov

Aug 20, 2009, 2:25:48 PM
to Scalable Synchronization Algorithms
And here is the size() function. It's a bit ugly: it has to iterate
over the blocks and sum up the total size. It may also trigger an
access violation (AV/SIGSEGV), which must be suppressed:


size_t size(thread_proxy& proxy)
{
    unsigned retry_count = 0;
retry:
    __try
    {
        for (;;)
        {
            cell_t* head = proxy.reacquire(head_);
            cell_t* cell = head;
            size_t size = 0;
            for (;;)
            {
                uintptr_t cons = cell->consume_pos_.load(
                    std::memory_order_acquire);
                if (cons > item_count)
                    cons = item_count;
                uintptr_t prod = cell->produce_pos_.load(
                    std::memory_order_acquire);
                if (prod > item_count)
                    prod = item_count;
                if (prod <= cons)
                    break;
                size += prod - cons;
                cell_t* next = cell->next_.load(
                    std::memory_order_acquire);
                if (next == 0)
                    break;
                cell = next;
            }
            if (head == head_.load(std::memory_order_acquire).ptr_)
                return size;
        }
    }
    __except (GetExceptionCode() == EXCEPTION_ACCESS_VIOLATION
        && ++retry_count < 16*1024 ?
        EXCEPTION_EXECUTE_HANDLER : EXCEPTION_CONTINUE_SEARCH)
    {
        goto retry;
    }
}
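
The counting logic itself, without the concurrency and the SEH retry,
can be sketched as below. This is a single-threaded illustration that
works on plain snapshots of the counters; block_counters and
chain_size are hypothetical names, and item_count is passed as a
parameter instead of being a member of the queue.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical, non-concurrent snapshot of one block's counters.
struct block_counters
{
    std::uintptr_t consume_pos;
    std::uintptr_t produce_pos;
    block_counters const* next;
};

// Sum the number of unconsumed items over a chain of blocks.
// Both counters may overshoot item_count (fetch_add keeps incrementing
// after a block is exhausted), so each is clamped before subtracting.
inline std::size_t chain_size(block_counters const* head,
                              std::uintptr_t item_count)
{
    assert(item_count != 0);
    std::size_t total = 0;
    for (block_counters const* b = head; b != nullptr; b = b->next)
    {
        std::uintptr_t cons = std::min(b->consume_pos, item_count);
        std::uintptr_t prod = std::min(b->produce_pos, item_count);
        if (prod <= cons)
            break; // same early exit as the posted loop
        total += prod - cons;
    }
    return total;
}
```

For example, with item_count = 64, a first block at consume_pos = 10
and produce_pos = 70 contributes 64 - 10 = 54 items, and a second
block at 0 and 5 contributes 5 more.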



--
Dmitriy V'jukov

Chris Cochran

Aug 20, 2009, 3:25:43 PM
to Scalable Synchronization Algorithms
On Aug 18, 9:25 am, Dmitriy Vyukov <dvyu...@gmail.com> wrote:
> The main design goal was to achieve maximum scalability under high
> load. The queue is intended to transfer pointers. The underlying data
> structure is an linked-list of fixed-size arrays. The algorithm is
> lock-free, but mostly wait-free. Requires double-word CAS.
>
> Mean number of cycles per operation (enqueue/dequeue) was measured.
>
> Here is the results:
> affinity=1, cycles/op = 32
> affinity=3, cycles/op = 65
> affinity=5, cycles/op = 120
> affinity=15, cycles/op = 265
>
> For a comparison I benchmarked tbb::concurrent_bounded_queue<int*>
> from Intel Threading Building Blocks 2.2, and here is the results:
> affinity=1, cycles/op = 235 (7.3 slowdown)
> affinity=3, cycles/op = 1071 (16.7 slowdown)
> affinity=5, cycles/op = 2364 (19.7 slowdown)
> affinity=15, cycles/op = 6085 (22.9 slowdown)
>
> --
> Dmitriy V'jukov

This looks really impressive and interesting. I might like to use it
for a work-stealing queue. I have recently been designing an array-
based method for this same type of thing, also with CPU scalability in
mind. Mine is based on an array of cachelines where,

(1) Producers reserve their own cacheline to post into, holding it
until it fills or it's released partially full.
(2) Consumers start at different points in the array, and
asynchronously consume their way around the array until it empties.
(3) Consumers simply find and reserve the next nonempty cacheline, and
single-threadedly process its entire contents before moving on to the
next one.
(4) Multiple such arrays are linked together to handle overflow. Any
array can be marked to require completion of all previous arrays, to
implement logical barriers, as needed by the application.

Speed is achieved from the natural cacheline batching, and from the
avoidance of hot spots by both consumers and producers.

However, my method would seem to become inefficient when the array
becomes nearly empty, and solving that appears to lead back to the same
hot spots that this design is attempting to avoid. So I'm not
quite there yet...

But I am curious to see how your method avoids the hot-spots that
occur at each end of a queue, i.e. where the concurrent queue
management occurs.

Dmitriy V'jukov

Aug 21, 2009, 3:43:51 AM
to Scalable Synchronization Algorithms
On Aug 20, 11:25 pm, Chris Cochran <ch...@megabasic.com> wrote:
> This looks really impressive and interesting.  I might like to use it
> for a work-stealing queue.  I have recently been designing an array-
> based method for this same type of thing, also with CPU scalability in
> mind.  Mine is based on an array of cachelines where,
>
> (1) Producers reserve their own cacheline to post into, holding it
> until it fills or it's released partially full.
> (2) Consumers start at different points in the array, and
> asynchronously consume their way around the array until it empties.
> (3) Consumers simply find and reserve the next nonempty cacheline, and
> single-threadedly process its entire contents before moving on to the
> next one.
> (4) Multiple such arrays are linked together to handle overflow.  Any
> array can be marked to require completion of all previous arrays, to
> implement logical barriers, as needed by the application.
>
> Speed is achieved from the natural cacheline batching, and from the
> avoidance of hot spots by both consumers and producers.
>
> However, my method would seem to become inefficient when the array
> becomes nearly empty, and solving that appears to get back to the same
> hot-spots from which this design is attempting to avoid.  So I'm not
> quite there yet...
>
> But I am curious to see how your method avoids the hot-spots that
> occur at each end of a queue, i.e. where the concurrent queue
> management occurs.


Hi Chris,

My algorithm does not avoid hot spots, but it tries to make them as
benign as possible. In order to describe how the current version
achieves this, I have to describe my initial idea and why it failed.
My initial idea was to build the queue from the following blocks:

struct cell_t
{
    short producer_pos;
    short consumer_pos;
    item_t data [(cache_line_size - 2*sizeof(short)) / sizeof(item_t)];
};

note that sizeof(cell_t) == cache_line_size, then

struct block_t
{
    cell_t cells [64];
    block_t* next;
    // actually the 'next' member must be embedded into the last cell_t,
    // so that sizeof(block_t) == page_size (4096 or something like that)
};

then the queue is represented as:

struct queue_t
{
    cell_t* head;
    cache_line_pad pad;
    cell_t* tail;
};

then the enqueue operation is:

enqueue(v)
{
    for (;;)
    {
        cell_t* t = tail;
        short pos = t->produce_pos.fetch_add(1);
        if (pos < cell_size)
        {
            t->data[pos] = v;
            return;
        }
        else if (is_not_last_cell_in_block(t)) // determined by alignment
            move_tail_to_next_cell();
        else
            enqueue_new_block_and_move_tail();
    }
}

and the dequeue operation is:

dequeue()
{
    for (;;)
    {
        cell_t* h = head;
        short pos = h->consume_pos.fetch_add(1);
        if (pos < cell_size)
        {
            while (h->data[pos] == 0) yield();
            return h->data[pos];
        }
        else if (is_not_last_cell_in_block(h)) // determined by alignment
            move_head_to_next_cell();
        else
            enqueue_new_block_and_move_head();
    }
}


OK, what do we get? Each operation (either enqueue or dequeue) causes
the transfer of only 1 cache line (I mean the common case, when we stay
inside a single cell). That is, a thread reads tail (or head), which
should already be in its cache in SHARED state, because it is modified
infrequently (only once per cell). Then the thread executes an atomic
fetch_add on produce_pos (or consume_pos); this operation causes the
transfer of the cache line, so the cell is now in our cache in MODIFIED
state. Then the thread writes (reads) the value in the same cell; since
the cell was already brought into the cache by the preceding fetch_add,
this operation is basically free.
Of course, when a thread has to switch cells or enqueue a new block, it
touches more cache lines and executes more atomic RMWs. But this is
amortized over the size of the cell/block structures.

So what we get is the absolute minimum: each operation causes the
transfer of a single cache line (amortized). After all, you can't
execute a modifying operation on a centralized data structure without
at least 1 cache line transfer.
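
The per-cell part of this idea can be written as compilable C++11. The
sketch below is only an illustration of a single cell under my own
assumptions: it does not link cells into blocks, it uses separate
atomics rather than packing everything into exactly one cache line,
and the name cell is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical single cell; nullptr marks an empty slot.
template<std::size_t N>
struct cell
{
    std::atomic<unsigned> produce_pos;
    std::atomic<unsigned> consume_pos;
    std::atomic<void*> data[N];

    cell() : produce_pos(0), consume_pos(0)
    {
        for (std::size_t i = 0; i != N; ++i)
            data[i].store(nullptr, std::memory_order_relaxed);
    }

    // Returns false when the cell is exhausted; a full queue would then
    // move the tail to the next cell or block.
    bool enqueue(void* v)
    {
        assert(v != nullptr);
        unsigned pos = produce_pos.fetch_add(1, std::memory_order_relaxed);
        if (pos >= N)
            return false;
        data[pos].store(v, std::memory_order_release);
        return true;
    }

    // Returns nullptr when the cell is exhausted. As in the pseudocode,
    // it waits while the reserved slot has not been filled yet, so it
    // must not be called on a cell that will never receive that item.
    void* dequeue()
    {
        unsigned pos = consume_pos.fetch_add(1, std::memory_order_relaxed);
        if (pos >= N)
            return nullptr;
        void* v;
        while ((v = data[pos].load(std::memory_order_acquire)) == nullptr)
            std::this_thread::yield();
        return v;
    }
};
```

Each successful operation is one fetch_add plus one access to a slot
in the same structure, which is the single-transfer pattern described
above.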


--
Dmitriy V'jukov

Dmitriy V'jukov

Aug 21, 2009, 3:57:03 AM
to Scalable Synchronization Algorithms
On Aug 21, 11:43 am, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:

> Ok, what do we get? We get is that each operation (either enqueue or
> dequeue) causes transfer of only 1 cache-line (I mean common case when
> we are inside of a single cell). I.e. thread reads tail (or head),
> tail must be already in cache in SHARED state, because it's modified
> infrequently (only once per cell). Then thread executes atomic
> fetch_add on produce_pos (or consume_pos), this operation causes
> transfer of cache line, so now cell is in our cache in MODIFIED state.
> Then thread writes (reads) the value to the same cell, since cell is
> already precached by previous fetch_add, this operation basically
> costless.
> Of course, when thread has to switch cell or enqueue new block, it
> touches more cache-lines and executes more atomic RMWs. But this is
> amortized by the size of the cell/block structures.
>
> So what we get is a absolute minimum, each operation causes transfer
> of single cache line (amortized). Well, you can't execute modification
> operation on a centralized data-structure w/o at least 1 cache line
> transfer.


After I had implemented the above algorithm, I did some tests and
experimentation.
I found that if I "increase the cache line size", i.e. make the cells
bigger (let's say 256 bytes instead of 64), I get better performance.

The intermediate variant was as follows:

struct cell_t
{
    unsigned produce_pos;
    item_t data [(64 - sizeof(unsigned)) / sizeof(item_t)];
};

struct block_t
{
    unsigned consume_pos;
    cell_t cells [64];
    block_t* next;
};

I.e. producers work on a per-cell (cache line) basis, and consumers
work on a per-block basis. The idea was that consumers do not modify
the data, so all consumers may have the whole block cached in SHARED
state; it is effectively shared read-only data for them (provided, of
course, that producers have already finished with this block).


But then, by pure experimentation, I found that the following works
better in my setup:

struct block_t
{
    uintptr_t consume_pos;
    // note 256 here
    item_t data [(256 - 3*sizeof(uintptr_t)) / sizeof(item_t)];
    uintptr_t produce_pos;
    block_t* next;
};


Frankly, I do not have a clear explanation of why this works better
than my initial idea.
I think that some parameters of the algorithm must be tuned for the
particular hardware and workload.
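
To see what the 256-byte layout works out to, the arithmetic can be
checked at compile time. This is a sketch that assumes item_t is a
plain pointer and that pointers and uintptr_t have the same size;
block_bytes and items_per_block are names introduced here for
illustration only.

```cpp
#include <cstddef>
#include <cstdint>

typedef void* item_t; // assumption: the queue transfers pointers

const std::size_t block_bytes = 256;
const std::size_t items_per_block =
    (block_bytes - 3 * sizeof(std::uintptr_t)) / sizeof(item_t);

struct block_t
{
    std::uintptr_t consume_pos;
    item_t data[items_per_block];
    std::uintptr_t produce_pos;
    block_t* next;
};

// The three bookkeeping words plus the item array fill the block exactly.
static_assert(sizeof(item_t) == sizeof(std::uintptr_t),
              "sketch assumes pointer-sized words");
static_assert(sizeof(block_t) == block_bytes,
              "block is expected to occupy exactly 256 bytes");

// consume_pos sits at the start and produce_pos near the end, so with
// 64-byte cache lines the two counters are more than one line apart.
static_assert(offsetof(block_t, produce_pos) - offsetof(block_t, consume_pos) >= 64,
              "counters are expected to be at least 64 bytes apart");
```

Under these assumptions a block holds 29 items with 8-byte pointers
and 61 items with 4-byte pointers.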


--
Dmitriy V'jukov

Chris Cochran

Aug 21, 2009, 4:27:21 PM
to Scalable Synchronization Algorithms
Dmitriy,

Several observations...

(1) Larger consumer blocks increase completion latency. Am I safe in
assuming that each block is fully processed by a single consumer? 63
work ptrs (in 256 bytes) would seem a bit hefty, while 15 work
pointers (in 64 bytes) would allow finer-grained concurrency.

(2) Maybe it doesn't work any better than your original idea.
Best-case clock times in the lab can be very different from clock
times that consider an actual processing load on the consuming CPUs,
i.e. each of those pointers presumably represents a finite request to
satisfy. I have found that obtaining definitive clock times on these
things can be elusive.

(3) I have trouble with the notion of four consumer CPUs fighting over
the consuming end of the queue, especially when it's not really a
queue at all. Queues usually imply some preservation of order, but
with multiple consumers, no particular consuming order can be
guaranteed. So why not ignore order, and open up the possibility of
distributed-consumer approaches having lower statistical contention.

Chris Cochran

Aug 22, 2009, 3:26:40 PM
to Scalable Synchronization Algorithms


On Aug 21, 12:57 am, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
> On Aug 21, 11:43 am, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
>
> ...
>
> Frankly, I do not have clear explanation why this works better than my
> initial idea.
> I think that some parameters of the algorithms must be tuned for
> particular hardware and workload.
>
> --
> Dmitriy V'jukov

Isn't the clear explanation simply that the more a processor can
remain executing pure, unobstructed single-threaded logic, the less
encumbered by synchronization requirements it becomes? This lowers its
(often substantial) cacheline load overhead. But too much
single-threadedness reduces concurrency opportunities, so finding the
right balance plays some role.

Sankar

Aug 25, 2009, 5:46:33 AM
to Scalable Synchronization Algorithms
Hi Dmitriy,

Thank you for guiding me to the MPMC queue algorithm posted here.
I read about your initial idea and the experiments that you did with
it.
I'm very interested in these kinds of algorithms and think I have a
lot to learn from this.
Please bear with me if I bore you with well-known details, and correct
me if my observations are wrong.

Here are my observations:

Let's consider a single producer and a single consumer working on the
queue.

The producer does an atomic fetch_and_add on producer_pos and then
stores the enqueued value in the array of items.
The consumer does an atomic fetch_and_add on consumer_pos and then
reads an item from the array of items.

Thereby, a producer makes two writes to finish an enqueue,
while a consumer makes one write to update consumer_pos and one
read of an item in the array to finish a dequeue.

Note that the two operations done by each of the producer and the
consumer are separated in time.

Considering the initial idea where the cell is defined as,

struct cell_t
{
    short producer_pos;
    short consumer_pos;
    item_t data [(cache_line_size - 2*sizeof(short)) / sizeof(item_t)];
};

The first write to producer_pos results in an Intent to Write
(ITOW), which makes sure an Exclusive copy of cell_t is available in
the producer's L1 cache. The producer then modifies producer_pos, and
the cacheline that contains cell_t goes to Modified state.

At this point, if the consumer thread does not interfere with this
cell_t, the next write to the array of items by the producer does not
result in an ITOW, because the producer thread already holds the
cacheline in Modified state. So it can do data[pos] = value at
essentially no cost.

But if the consumer thread tries to write to consumer_pos before the
producer writes to the array of items, the consumer thread has to get
an exclusive copy of cell_t in its L1 cache.
So the consumer thread's write issues an ITOW, which causes the
cacheline modified by the producer to be written back to their common
ancestor (putting the producer's cacheline in Invalid state), and then
the consumer thread gets an exclusive copy to modify. If the producer
thread now tries to write to the array of items, it issues another
ITOW, because its cacheline is in Invalid state. The producer's ITOW
invalidates the consumer thread's cacheline (after a write-back to the
common ancestor, of course), obtains exclusive access from the common
ancestor, and modifies the line. When the consumer thread then wants
to read the value from the array, it cannot do so because its
cacheline is in Invalid state, so it issues an Intent to Read (ITOR)
to get a shared copy of the cacheline. This again forces the producer
thread's cacheline to be written back to the common ancestor. At the
end of this ITOR, the L1 cachelines of both threads are in Shared
state and the consumer thread reads the value.

So in this worst case, where the producer and consumer alternate
their steps in time, what happens is a continuous series of
invalidations all the way to the common ancestor in order to get
copies into L1 cache to modify or read. The case gets even worse when
the producer and consumer threads run on cores of different
processors, which causes cacheline updates across two ancestor
levels. It is worst when there are multiple producers and consumers.

Now coming to the variant,

struct block_t
{
    uintptr_t consume_pos;
    // note 256 here
    item_t data [(256 - 3*sizeof(uintptr_t)) / sizeof(item_t)];
    uintptr_t produce_pos;
    block_t* next;
};

The reason I can find for why the above variant performs better is
that consume_pos and produce_pos are located on different cachelines.
So producers and consumers don't contend to get an exclusive copy in
order to write to them. Different producers still contend for
produce_pos, and different consumers still contend for consume_pos,
but I don't think this can be avoided.

So I would expect the following variant to perform at least as well as
your best implementation, and better than your initial idea.

struct block_t
{
    uintptr_t consume_pos;
    cachelinepad pad1;
    item_t data[(page_size - 2 * cachelinesize - sizeof(block_t*))/sizeof(item_t)];
    cachelinepad pad2;
    uintptr_t produce_pos;
    block_t* next;
};

Here consume_pos and produce_pos are on two separate cachelines.
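
A related way to get this separation in C++11 is to let the compiler
do the padding with alignas instead of explicit pad members. The
following is only a sketch of that alternative, not the layout
proposed above: padded_block_t is a hypothetical name, and the
64-byte line size, 4096-byte page size and pointer-sized item_t are
assumptions.

```cpp
#include <cstddef>
#include <cstdint>

typedef void* item_t;                     // assumption: pointer-sized items
const std::size_t cache_line_size = 64;   // assumption: typical x86 line size
const std::size_t page_size = 4096;       // assumption

struct padded_block_t
{
    // Consumers' hot word gets a cache line of its own.
    alignas(cache_line_size) std::uintptr_t consume_pos;
    // The item array starts on the next line boundary.
    alignas(cache_line_size)
        item_t data[(page_size - 2 * cache_line_size) / sizeof(item_t)];
    // Producers' hot word shares the last line only with 'next'.
    alignas(cache_line_size) std::uintptr_t produce_pos;
    padded_block_t* next;
};

static_assert(sizeof(padded_block_t) == page_size,
              "block is expected to fill one page");
static_assert(offsetof(padded_block_t, data) == cache_line_size,
              "data is expected to start on the second cache line");
static_assert(offsetof(padded_block_t, produce_pos) == page_size - cache_line_size,
              "produce_pos is expected to start the last cache line");
```

The static_asserts document the intended layout, so a change in item
size or line size that breaks the separation fails at compile time.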

To build on this, I think that if consecutive (push/pop) operations on
the queue were directed to positions in the data array that lie on
different cachelines, it would help reduce contention.
In short, this would be similar to how consecutive operations choose
micro queues in an LRU way in TBB concurrent_queue.

Regards,
Sankar

Dmitriy V'jukov

Aug 26, 2009, 3:25:23 AM
to Scalable Synchronization Algorithms
On Aug 22, 12:27 am, Chris Cochran <ch...@megabasic.com> wrote:
> Dmitriy,
>
> Several observations...
>
> (1) Larger consumer blocks increase completion latency.  Am I safe in
> assuming that each block is fully processed by a single consumer?  63
> work ptrs (in 256 bytes) would seem a bit hefty, while 15 work
> pointers (in 64 bytes) would allow finer-grained concurrency.


No, each item is consumed separately. There is no "reservation" of
items for "future use".
Thus the queue provides the strongest causal FIFO guarantees, and has
no problems with stalled consumers.


> (2) Maybe it doesn't work any better than your original idea.  Best-
> case clock times in the lab can be very different from clock times
> that consider an actual processing load on the consuming CPUs, i.e.
> each of those pointers presumably represent a finite request to
> satisfy.  I have found that obtaining definitive clock times on these
> things can be elusive.


Maybe. It's just the only thing I can measure easily :)


> (3) I have trouble with the notion of four consumer CPUs fighting over
> the consuming end of the queue, especially when it's not really a
> queue at all.  Queues usually imply some preservation of order, but
> with multiple consumers, no particular consuming order can be
> guaranteed.  So why not ignore order, and open up the possibility of
> distributed-consumer approaches having lower statistical contention.


Well, first of all, yes, MPMC is a performance anti-pattern in the
multicore world, and one is better off with a distributed design.
However, note that such an MPMC queue can be used in a distributed
work-stealing environment too.

Regarding ordering: usually the user does not want any strict ordering
in a multi-consumer environment; usually they use an MPMC queue just
as a load balancer, so "best-effort" FIFO is OK.
But strictly speaking, the user is able to determine whether the queue
provides causal ordering or not, even in a multi-consumer
environment... well, not actually multi-consumer, but a special form
of single-consumer where an abstract consumer "jumps" between physical
threads. OK, here is an example. Assume thread 1 produces items 1, 2, 3
(in this particular order). Then thread 2 consumes item 1. Then thread
2 wakes thread 3. Then thread 3 executes dequeue() on the queue. The
question is: which item will the queue return? A naive user expects
item 2 (causal FIFO). However, a "smart" distributed queue may return
item 3, and that's not what the user expects. My queue will return
item 2.
You may say that in this example the user must use an MPSC queue
instead, because there is actually only 1 consumer.
And I may say that the user is going to use the queue with multiple
consumers. Later. But at this particular moment they use it with only
1 "logical" consumer, and thus reasonably expect FIFO ordering.
So, the bottom line: if one is developing a library such as Intel TBB,
one had better stick to strict causal FIFO ordering. In most other
situations it's perfectly reasonable to use MPSC, SPMC, SPSC and/or
per-producer FIFO, best-effort FIFO, best-effort LIFO, etc.
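
The item 1, 2, 3 scenario can be reproduced with a toy model. The
sketch below is one hypothetical way a "distributed" queue might be
built (items spread round-robin over shards); it is not any real
library's design, and sharded_queue is an invented name. The two
consumers are modelled as sequential calls, because the wake-up in
the example already orders them.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical "distributed" queue: items are spread round-robin over
// shards, and a consumer drains its preferred shard first.
class sharded_queue
{
public:
    explicit sharded_queue(std::size_t shard_count)
        : shards_(shard_count), next_(0)
    {
        assert(shard_count != 0);
    }

    void enqueue(int v)
    {
        shards_[next_].push_back(v);
        next_ = (next_ + 1) % shards_.size();
    }

    // Returns 0 when every shard is empty (0 is reserved as "no item").
    int dequeue(std::size_t preferred_shard)
    {
        for (std::size_t i = 0; i != shards_.size(); ++i)
        {
            std::deque<int>& s =
                shards_[(preferred_shard + i) % shards_.size()];
            if (!s.empty())
            {
                int v = s.front();
                s.pop_front();
                return v;
            }
        }
        return 0;
    }

private:
    std::vector<std::deque<int> > shards_;
    std::size_t next_;
};
```

With two shards, items 1, 2, 3 land as {1, 3} and {2}. If both
consumers prefer shard 0, the first dequeue returns 1 and the second
returns 3, which is exactly the causal FIFO violation described above.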


--
Dmitriy V'jukov

Dmitriy V'jukov

Aug 26, 2009, 3:27:34 AM
to Scalable Synchronization Algorithms
On Aug 22, 11:26 pm, Chris Cochran <ch...@megabasic.com> wrote:
> On Aug 21, 12:57 am, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
>
> > On Aug 21, 11:43 am, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
>
> > ...
>
> > Frankly, I do not have clear explanation why this works better than my
> > initial idea.
> > I think that some parameters of the algorithms must be tuned for
> > particular hardware and workload.
>
>
> Isn't the clear explanation simply that, the more a processor can
> remain executing pure, unobstucted single-threaded logic, the less
> encumbered by synchro-requirements it becomes.  This lowers (its often
> substantial) cacheline load overhead.  But too much single-
> threadedness reduces concurrency opportunities, so finding the right
> balance plays some role.


My queue does not use "reservation for future use". Each item is
consumed separately and fairly with atomic RMW. No bonus from single-
threaded execution and no loss of concurrency.

--
Dmitriy V'jukov

Dmitriy V'jukov

Aug 26, 2009, 3:53:25 AM
to Scalable Synchronization Algorithms
On Aug 25, 1:46 pm, Sankar <shankar.n...@gmail.com> wrote:
> Hi Dmitriy,
>
>     Thank you for guiding me to the mpmc queue algorithm posted here.
> I read about your initial idea and the experimentations that you had
> done on it.
> Im very much interested about these kind of algorithms and think I
> have lots to learn from this.
> Please take pain if I bore you with much of known details and correct
> me if my observations are wrong
>
> Following are my observations :
>
> Lets consider a single producer and a single consumer working on the
> queue.
>
> The producer makes a atomic fetch_and_add on producer_pos and then
> updates the enqueued value in the array of items.
> The consumer makes a atomic fetch_and_add on consumer_pos and then
> reads a item in the array of items.
>
> Thereby, a producer makes two writes to finish enqueue,
> while a consumer makes one write for updating consumer_pos and one
> read for reading an item in the array to finish a dequeue.


Yes.


> It has to be noted that the two operations done by the producer and
> consumer are seperated in the time domain.


Yes.


> Considering the initial idea where the cell is defined as,
>
> struct cell_t
> {
>   short producer_pos;
>   short consumer_pos;
>   item_t data [(cache_line_size - 2*sizeof(short)) / sizeof(item_t)];
>
> };
>
> The first write on producer_pos will result in an Intent to Write
> (ITOW) that will make sure an Exclusive copy of
> cell_t is available in its L1 cache. Now the producer will modify the
> producer_pos and the state of the cacheline
> that contains cell_t will be in Modified state.
>
> Now at this point, if the consumer thread doesnot interfere with this
> cell_t, then the next write to the array of items
> by the producer will not result in an ITOW because the producer thread
> already has the cacheline is Modified state.
> So it can just do the data[pos] = value costlessly.


Yes. This is the intent.
There is a very high probability that producers and consumers will be
able to complete the whole transaction on the cache line without
interruption. Thus only 1 ITOW or RFO (request for ownership) is
needed. And this must be especially true in a real environment where
threads, you know, do some useful work between queue operations :)
Well, yes, this is possible. This is the "worst case" for the algorithm.
As I said, I do not have my own explanation. But I see "a hole" in
yours.
Let's assume producer_pos and the data array are separated onto
different cache lines. Now producer 1 has to issue a first ITOW/RFO for
producer_pos and then a second ITOW/RFO for the data array. Then
producer 2 comes 1 millisecond later (no contention), and it again has
to issue 2 ITOW/RFO.
So basically, my initial variant: 1 ITOW/RFO in the common case, and 2
ITOW/RFO in the worst case.
Your variant: 2 ITOW/RFO always.
That's why I said I can't explain it to myself. I was trying to
achieve 1 ITOW/RFO at least in some cases.
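
A minimal sketch of the two layouts being compared, assuming a 64-byte
cache line and pointer-sized items (neither value is stated in the
thread). The names cell_packed_t, cell_split_t and
lines_touched_by_enqueue are made up for illustration; only the packed
layout mirrors the cell_t quoted above. It only counts how many cache
lines an enqueue writes to, which is an upper bound on the ITOW/RFO
requests an uncontended producer would issue.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumptions for illustration: 64-byte cache lines, pointer-sized items.
constexpr std::size_t cache_line_size = 64;
typedef void* item_t;

// Initial variant: positions and items share one cache line.
struct alignas(cache_line_size) cell_packed_t
{
    short producer_pos;
    short consumer_pos;
    item_t data[(cache_line_size - 2 * sizeof(short)) / sizeof(item_t)];
};

// Separated variant: positions on one cache line, items on the next.
struct cell_split_t
{
    alignas(cache_line_size) short producer_pos;
    short consumer_pos;
    alignas(cache_line_size) item_t data[cache_line_size / sizeof(item_t)];
};

static_assert(sizeof(cell_packed_t) == cache_line_size,
              "packed cell must fit exactly one cache line");
static_assert(sizeof(cell_split_t) == 2 * cache_line_size,
              "split cell must span exactly two cache lines");

// Number of distinct cache lines written by one enqueue
// (one write to producer_pos, one write to data[]).
template <class Cell>
std::size_t lines_touched_by_enqueue()
{
    Cell c = Cell();
    std::uintptr_t base = reinterpret_cast<std::uintptr_t>(&c);
    assert(base % cache_line_size == 0); // cell starts on a line boundary
    std::size_t pos_line =
        (reinterpret_cast<std::uintptr_t>(&c.producer_pos) - base) / cache_line_size;
    std::size_t data_line =
        (reinterpret_cast<std::uintptr_t>(&c.data[0]) - base) / cache_line_size;
    return pos_line == data_line ? 1 : 2;
}

void usage_example()
{
    assert(lines_touched_by_enqueue<cell_packed_t>() == 1);
    assert(lines_touched_by_enqueue<cell_split_t>() == 2);
}
```

The packed cell can still cost two ownership requests when another
thread takes the line between the two writes, which is the "worst
case" mentioned above; the sketch does not model that.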


Btw, I use similar to TBB micro_queue trick, i.e. I do some
intermixing of items:

static uintptr_t get_item_idx(uintptr_t cnt)
{
    //return cnt;
    //return (cnt & 1) * item_count / 2 + (cnt >> 1);
    return (cnt & 3) * item_count / 4 + (cnt >> 2);
    //return (cnt & 7) * item_count / 8 + (cnt >> 3);
}
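
To see what the intermixing does, here is a small sketch that tabulates
the active variant of get_item_idx() for one block. The value
item_count = 16 is an assumption chosen to keep the table short (the
thread does not state the real block size), and slot_order() and
every_slot_used_once() are hypothetical helpers, not part of the queue.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using std::uintptr_t;

// Assumed block size for illustration; must be a multiple of 4.
static const uintptr_t item_count = 16;

// Same formula as the active line in the post above.
static uintptr_t get_item_idx(uintptr_t cnt)
{
    return (cnt & 3) * item_count / 4 + (cnt >> 2);
}

// Slot visited by each consecutive counter value within one block.
std::vector<uintptr_t> slot_order()
{
    assert(item_count % 4 == 0);
    std::vector<uintptr_t> order;
    for (uintptr_t cnt = 0; cnt != item_count; ++cnt)
        order.push_back(get_item_idx(cnt));
    return order;
}

// The mapping is only usable if it is a permutation of the slots.
bool every_slot_used_once()
{
    std::vector<int> hits(item_count, 0);
    for (uintptr_t cnt = 0; cnt != item_count; ++cnt)
        hits[get_item_idx(cnt)] += 1;
    for (uintptr_t i = 0; i != item_count; ++i)
        if (hits[i] != 1)
            return false;
    return true;
}

void usage_example()
{
    std::vector<uintptr_t> order = slot_order();
    // Consecutive counters land a quarter of a block apart:
    // 0, 4, 8, 12, 1, 5, 9, 13, 2, ...
    assert(order[0] == 0 && order[1] == 4 && order[2] == 8 && order[3] == 12);
    assert(order[4] == 1 && order[5] == 5);
    assert(every_slot_used_once());
}
```

With a larger block, neighbouring counter values end up on different
cache lines, so threads that take adjacent tickets are less likely to
write to the same line.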


--
Dmitriy V'jukov

Dmitriy V'jukov

Aug 26, 2009, 4:03:51 AM8/26/09
to Scalable Synchronization Algorithms
F#$% Didn't I post the sources of the queue? Can't find them in this
thread now...

Ok, uploaded it into Files section:
http://groups.google.com/group/lock-free/files

There is a complete MSVC 2005 solution, which includes the benchmark
code I used.

The sources are quite dirty (there are some pieces from Relacy Race
Detector verification), but I think it is better to upload them now
than to upload clean files sometime later.


--
Dmitriy V'jukov

Sankar

Aug 26, 2009, 1:07:01 PM8/26/09
to Scalable Synchronization Algorithms
> Yes. This is the intent.
> There is very high probability that producers and consumers will be
> able to accomplish whole transaction with the cache-line w/o
> interruptions. Thus only 1 ITOW or RFO (request for ownership). And
> this must be especially true in real environment where threads, you
> know, do some useful work between queue operations :)

Yes this is true.

> As I said, I do not have my own explanation. But I see "a hole" in
> your.
> Let's assume producer_pos and data array separated on different cache
> lines. Now producer 1 has to issue first ITOW/RFO for producer_pos and
> then issue second ITOW/RFO for data array. Then producer 2 comes 1
> millisecond after (no contention), and he again has to issue 2 ITOW/
> RFO.
> So basically, my initial variant: 1 ITOW/RFO in common case, and 2
> ITOW/RFO in the worst case.
> Your variant: 2 ITOW/RFO always.
> That's why I said I can't explain it for myself. I was trying to
> achieve 1 ITOW/RFO at least in some cases.

Yes, 1 ITOW/RFO when threads don't really contend on the queue.
I think I missed this point when I tried evaluating your algorithm.

> Btw, I use similar to TBB micro_queue trick, i.e. I do some
> intermixing of items:
>
> static uintptr_t get_item_idx(uintptr_t cnt)
> {
> //return cnt;
> //return (cnt & 1) * item_count / 2 + (cnt >> 1);
> return (cnt & 3) * item_count / 4 + (cnt >> 2);
> //return (cnt & 7) * item_count / 8 + (cnt >> 3);
> }

This is interesting. I think I will have to look into your code to
understand this.

I had a few questions on your initial posts. But now that you have
shared the code, I'll try to go through the code first.

As far as I know how lock-free queues and the TBB queue work, I think
this is the best multi-threaded queue. Do you see any scope for
improvement in your idea?

Also, I only recently started learning about concurrent data
structures, and what I have observed with queues is that the early
queues used compare-and-swap (CAS), while the TBB queue and your queue
use atomic_fetch_and_increment. Can you explain why this is so (pros
and cons)? And do you see something new coming up going forward?
Because I would be interested to spend some time working in such areas,
but as of now I have no clue where to start.

And thank you for evaluating my observations.

Regards,
Sankar

Dmitriy V'jukov

Aug 26, 2009, 1:26:41 PM8/26/09
to Scalable Synchronization Algorithms
Feel free to ask any questions on the code.


>  As far as my knowledge as to how lock-free queues and TBB queue work,
> I think this
> is the best multi threaded queue.


Thank you. It's actually FAST.


> Do you see any scope of improvement
> that you see in your idea?


Hmmm... Various parameters of the queue (producer block size, consumer
block size, algorithm used by get_item_idx()) may be tuned. I've just
tried to pick parameters that maximize throughput in my particular
environment. Currently I have no idea what the parameters should be on
a different machine or under a different workload.
A still open question is how and why they affect performance (why does
the current queue work better than my initial variant?).
Other than that I can't think of any other improvement now. I've tried
to incorporate everything I consider important.


>  And also its recently that I started learning about concurrent
> datastructures and what I have observed with queue datastructures is
> that initial queues talked abt using Compare and swap( CAS) and now
> the TBB queue and your queue uses atomic_fetch_and_increment. Can you
> explain
> as to why this is so(Pros and cons)?


CAS is usually used in dynamically linked data structures (singly or
doubly linked lists).
FAA is usually used with arrays.
It's difficult to implement a list with FAA. It's possible to implement
an array with CAS (CAS is a superset of FAA), but algorithms based on
FAA are generally faster than those based on CAS (all else being equal,
of course). Algorithms based on FAA are frequently wait-free (at least
in the common case). Algorithms based on CAS are usually only
lock-free.
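
A minimal sketch of the difference, using C++11 std::atomic rather than
the atomics from the queue sources. Both functions claim the next array
index from a shared counter; claim_faa and claim_cas are hypothetical
names. The FAA version always finishes in one atomic step, while the
CAS version retries whenever another thread moved the counter first,
which is why it is lock-free but not wait-free.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Claim the next index with fetch-and-add: one atomic step,
// no matter how many threads compete.
std::size_t claim_faa(std::atomic<std::size_t>& pos)
{
    return pos.fetch_add(1, std::memory_order_relaxed);
}

// Claim the next index with compare-and-swap: a retry loop.
// Some thread always succeeds, but a given thread may lose
// the race an unbounded number of times.
std::size_t claim_cas(std::atomic<std::size_t>& pos)
{
    std::size_t cur = pos.load(std::memory_order_relaxed);
    while (!pos.compare_exchange_weak(cur, cur + 1,
                                      std::memory_order_relaxed))
    {
        // On failure, cur now holds the value seen in pos; try again.
    }
    return cur;
}

void usage_example()
{
    std::atomic<std::size_t> pos(0);
    std::size_t first = claim_faa(pos);
    std::size_t second = claim_cas(pos);
    assert(first == 0);
    assert(second == 1);
    assert(pos.load() == 2);
}
```

Relaxed ordering is enough here only because the sketch hands out
indices and nothing else; publishing the item stored at that index
needs stronger ordering.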


> And do you see something new
> coming up going forward? Because I would be interested to spend some
> time working on such areas, but as of now have no clue where to start.


What exactly do you mean? New algorithms?
In general I think that an MPMC queue is a busted idea, because it
provokes contention. In general the way to go is MPSC/SPSC queues.

--
Dmitriy V'jukov

Sankar

Aug 27, 2009, 5:39:53 AM8/27/09
to Scalable Synchronization Algorithms
> Hmmm... Various parameters of the queue (producer block size, consumer
> block size, algorithm used by get_item_idx()) may be tuned. I've just
> tried to pick parameters that maximize throughput in my particular
> environment. Currently I have no idea as to what parameters must be on
> different machine or under different workload.
> Still opened question is how and why they affect performance (why
> current queue works better than my initial variant?).
> Other than that I can't think of any other improvement now. I've tried
> to incorporate everything I consider important.

But how would you be able to tune these parameters (producer block
size, consumer block size, algorithm used by get_item_idx()) in a
generic way to address all environments?
Do you mean to say that you will be able to do that once you find the
reason why your current queue works better than your initial variant?

> What exactly do you mean? New algorithms?
Yes, scalable synchronization algorithms (where you see a lot of scope
for improvement) where I can put my head down and learn more (and
probably use what I have learned till now) to come up with something
new.

> In general I think that MPMC queue is a busted idea, because it
> provoke contention. In general the way to go is MPSC/SPSC queues.
Oh. But I still see your queue being faster than other queues even
for MPSC/SPSC cases.

--
Sankar

Sankar

Aug 31, 2009, 12:25:45 PM8/31/09
to Scalable Synchronization Algorithms
I went through part of the code of MPMC queue. I have the following
questions :

1. Why do the enqueue and dequeue methods require a thread-specific
thread_proxy variable to be supplied? This can be handled inside the
queue itself, right? The enqueue and dequeue interfaces would get
simpler as well.

2. I couldn't understand the terms "memory_order_relaxed,
memory_order_consume .." used in the code. If I'm right, I think these
address the different consistency models on different architectures. I
have only worked on x86 so far, so I have never come across these
(though I have seen these terms in the TBB atomic data structure). Can
you suggest references where I can learn about them?

3. Could you explain the event count implementation which you have
implemented? I don't understand the different terms which you have
used. How does it improve things over a boost condition variable?

Regards,
Sankar

Dmitriy V'jukov

Aug 31, 2009, 12:56:10 PM8/31/09
to Scalable Synchronization Algorithms
On Aug 31, 9:25 am, Sankar <shankar.n...@gmail.com> wrote:
> I went through part of the code of MPMC queue. I have the following
> questions :
>
> 1. Why do the enqueue and dequeue methods require a thread_specific
> thread_proxy variable
> to be supplied. This can be handled inside the queue itself rite? The
> enqueue and dequeue interfaces will get simplified as well.

How can we handle this inside the queue itself?


> 2. I couldnt understand the terms "memory_order_relaxed,
> memory_order_consume .. " used in the
> code. If Im right I think these are to address the different
> consistency models on different architectures. I have only worked on
> x86 so far, so have never come across these( though I have seen these
> terms in TBB atomic datatstructure). Can you suggest references where
> I can learn these?

Yeah, TBB has similar atomics, however they never document their
semantics... Probably just because they are not a part of the public
interface.
In short, the memory_order_XXX constants control the mutual ordering of
memory accesses.
My atomics mimic C++0x's atomics, so all these memory_order_relaxed,
memory_order_consume, etc. are precisely described in the C++0x draft:
http://www.open-std.org/Jtc1/sc22/wg21/docs/papers/2009/n2857.pdf
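
As a small illustration of "mutual ordering", here is a sketch written
against C++11 std::atomic (the standardized form of the C++0x atomics
referenced above), not against the atomics in the queue sources. The
mailbox type and the publish/try_consume functions are hypothetical. A
release store paired with an acquire load guarantees that a reader who
sees ready == true also sees the payload written before it; with
memory_order_relaxed on both sides there would be no such guarantee.

```cpp
#include <atomic>
#include <cassert>

struct mailbox
{
    int payload = 0;                  // plain, non-atomic data
    std::atomic<bool> ready{false};   // flag that publishes the data
};

// Writer side: fill in the data, then publish it.
void publish(mailbox& m, int value)
{
    m.payload = value;
    // release: the payload write above cannot be reordered after this store
    m.ready.store(true, std::memory_order_release);
}

// Reader side: returns true and fills 'out' once the data is published.
bool try_consume(mailbox& m, int& out)
{
    // acquire: the payload read below cannot be reordered before this load
    if (!m.ready.load(std::memory_order_acquire))
        return false;
    out = m.payload;
    return true;
}

void usage_example()
{
    mailbox m;
    int got = -1;
    assert(!try_consume(m, got));   // nothing published yet
    publish(m, 42);
    assert(try_consume(m, got));
    assert(got == 42);
}
```

The usage example runs in one thread only to stay self-contained; the
ordering matters when publish() and try_consume() run on different
threads. memory_order_consume is a weaker relative of acquire that
orders only accesses depending on the loaded value.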


> 3. Could you explain about the event count implementation which you
> have implemented. I dont understand the different terms which you have
> used. How does it improve things over boost condition variable?

An eventcount is, figuratively, a condition variable for lock-free
algorithms.
A condition variable works only in a pair with a mutex. And mutations
and reads of the protected state must be done under the mutex.
So with a condition variable the producer code will look like:

void queue::enqueue(void* data)
{
    enqueue_impl(data);         // the lock-free part
    m_mtx.lock();               // mutex taken on every enqueue
    if (m_waiting_consumers)
        m_condvar.broadcast();  // wake up blocked consumers
    m_mtx.unlock();
}

Since you have mutex operations on the common path, the whole idea of
lock-free gets destroyed. Now you had better just move the queue
manipulation into the critical section, and forget about lock-free.
So much for condition variables.

An eventcount solves the same problem (notification about a state
change), but DOES NOT require a mutex, and imposes minimal overhead on
the producer and consumer.
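
To make the idea concrete, here is a deliberately simplified eventcount
sketch. It is not the implementation from the uploaded sources; the
class, its member names and its interface (prepare_wait, cancel_wait,
wait, notify_all, epoch) are assumptions made for illustration. It
still uses a mutex and condition variable internally, but only on the
slow path when a consumer is actually waiting; a producer that finds no
registered waiters pays one fence and one load.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>

class eventcount
{
public:
    // Consumer, step 1: register as a waiter and remember the epoch.
    // The caller must re-check the queue after this call.
    unsigned prepare_wait()
    {
        waiters_.fetch_add(1, std::memory_order_seq_cst);
        std::atomic_thread_fence(std::memory_order_seq_cst);
        return epoch_.load(std::memory_order_seq_cst);
    }

    // Consumer, step 2a: the re-check found an item, so do not block.
    void cancel_wait()
    {
        waiters_.fetch_sub(1, std::memory_order_seq_cst);
    }

    // Consumer, step 2b: block until the epoch moves past 'key'.
    void wait(unsigned key)
    {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [&] { return epoch_.load() != key; });
        waiters_.fetch_sub(1, std::memory_order_seq_cst);
    }

    // Producer: call after the item has been enqueued.
    void notify_all()
    {
        std::atomic_thread_fence(std::memory_order_seq_cst);
        if (waiters_.load(std::memory_order_seq_cst) == 0)
            return;                       // fast path: no mutex touched
        {
            std::lock_guard<std::mutex> lock(mtx_);
            epoch_.fetch_add(1);          // bumped under the mutex
        }
        cv_.notify_all();
    }

    unsigned epoch() const { return epoch_.load(); }

private:
    std::atomic<unsigned> epoch_{0};
    std::atomic<unsigned> waiters_{0};
    std::mutex mtx_;
    std::condition_variable cv_;
};

void usage_example()
{
    eventcount ec;
    ec.notify_all();                  // no waiters: epoch is untouched
    assert(ec.epoch() == 0);

    unsigned key = ec.prepare_wait(); // consumer found the queue empty
    ec.notify_all();                  // producer sees a waiter
    assert(ec.epoch() == key + 1);
    ec.wait(key);                     // returns at once: epoch moved on
}
```

A consumer would call dequeue, and on failure prepare_wait(), dequeue
once more, then either cancel_wait() or wait(key). The second dequeue
attempt is what closes the window between "queue looked empty" and
"went to sleep"; the sketch assumes the queue's own accesses are
atomic.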


--
Dmitriy V'jukov

Dmitriy V'jukov

Aug 31, 2009, 1:04:10 PM8/31/09
to Scalable Synchronization Algorithms
On Aug 27, 2:39 am, Sankar <shankar.n...@gmail.com> wrote:
> > Hmmm... Various parameters of the queue (producer block size, consumer
> > block size, algorithm used by get_item_idx()) may be tuned. I've just
> > tried to pick parameters that maximize throughput in my particular
> > environment. Currently I have no idea as to what parameters must be on
> > different machine or under different workload.
> > Still opened question is how and why they affect performance (why
> > current queue works better than my initial variant?).
> > Other than that I can't think of any other improvement now. I've tried
> > to incorporate everything I consider important.
>
> But how would you be able to tune these parameters( producer block
> size, consumer
> block size, algorithm used by get_item_idx() ) in a generic way to
> address all environments.

Well, I don't know. Maybe they are equal for all environments :)


> Do you mean to say that you will be able to do that once you find the
> reason as to why
> your current queue works better than your initial variant?

Maybe that will help, I don't know.


> > What exactly do you mean? New algorithms?
>
>       Ya scalable synchronization algorithms( where u see lot of scope
> for improvements) where I can put my head
> and learn more(and probably use what I have learned til now) to come
> out with something new.


IMHO, the most difficult and still unsolved problem is safe memory
reclamation (or deferred reclamation, or garbage collection) for the
native C/C++ environment.
I mean algorithms like SMR (safe memory reclamation), RCU (read copy
update), ROP (repeat offender problem), atomic reference counting,
proxy collector, SMR+RCU, VZOOM, etc.
However, all of them have serious limitations (they work only in kernel
space, or handle only a limited number of objects per thread, or have
big overheads, or require special periodic activity from all threads,
or are not prompt enough, etc).



--
Dmitriy V'jukov

Sharath Gururaj

May 26, 2017, 6:44:35 AM5/26/17
to Scalable Synchronization Algorithms
Where can I see the code? I don't see it in this thread.

Dmitry Vyukov

May 29, 2017, 2:35:22 AM5/29/17
to lock...@googlegroups.com
This looks like almost complete code:
https://groups.google.com/d/msg/lock-free/MXPBpopJ-JA/16EwXUpaMkcJ
There is also some code in previous messages.



--
Dmitry Vyukov

All about lockfree/waitfree algorithms, multicore, scalability,
parallel computing and related topics:
http://www.1024cores.net

饶萌

Oct 18, 2018, 4:42:13 AM10/18/18
to Scalable Synchronization Algorithms
I've just created a Wait-Free MPMC queue in 100+ lines of C++11 code: WFMPMC, can you help check?

Thanks,
Meng

On Monday, May 29, 2017 at 2:35:22 PM UTC+8, Dmitry Vyukov wrote: