There are no limitations on queue size or producer/consumer count.
The implementation requires only the CAS instruction. Values must be of
word size (if DWCAS is supported then values can be of dword size).
There must be some reserved value that is never enqueued (currently 0).
The main point is: as the underlying data structure I use a nested
list, i.e. values are organized in contiguous buffers of some constant
size (nests), and the nests are linked into a singly-linked list. A
nest is filled with values only once and consumed only once, then the
nest is recycled. So there are no cyclic buffers as in a bounded queue
on top of a vector.
Schema of dequeue operation:
1. Get current tail nest
2. Read current dequeue position in this nest
3. If the current tail nest is fully consumed and there is a next nest
then recycle the nest and try to move to the next nest
4. If the current tail nest is fully consumed and there is no next nest
then return 0
5. Read value in current dequeue position
6. If value == 0 then return 0
7. Try to increment current dequeue position with CAS
8. If success then return value
9. If fail then goto 2
Important points about the enqueue operation: I don't maintain the
current enqueue position in the nest at all. Instead I use an enqueue
position hint and a somewhat sophisticated oracle to determine the
actual enqueue position. The nest is zeroized after allocation, so I
use CAS(&enqueue_position, value, 0) to enqueue a value. After a
successful enqueue I update the enqueue position hint with a plain
store operation.
Algorithm of the oracle for determining the actual enqueue position
(subject to modification and improvement):
1. Check the current enqueue position hint
2. If that fails, check several next positions
3. If that fails, use binary search in [enqueue_position_hint,
end_of_nest] to find the right position
Note: the hint can never point past the actual enqueue position, only
before it.
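The oracle's steps can be sketched as follows. This is just a hypothetical single-threaded model (the `look_ahead` constant and the exact signature are my assumptions), relying on the invariant that filled cells form a contiguous non-zero prefix of the nest, which is what makes the binary search valid:

```cpp
#include <cassert>

unsigned const nest_size = 1000;
unsigned const look_ahead = 4; // assumed small constant

// Hypothetical single-threaded model of the oracle: find the first
// empty (0) slot at or after `hint`, given that slots [0, actual)
// are filled and [actual, nest_size) are still zero.
unsigned enqueue_pos_oracle(void* const* buffer, unsigned hint)
{
    // steps 1-2: check the hint itself and a few positions after it
    unsigned end = hint + look_ahead;
    if (end > nest_size)
        end = nest_size;
    for (unsigned i = hint; i != end; ++i)
        if (buffer[i] == 0)
            return i;
    // step 3: binary search for the first zero slot in [end, nest_size];
    // valid because filled cells form a contiguous prefix
    unsigned lo = end;
    unsigned hi = nest_size;
    while (lo != hi)
    {
        unsigned mid = lo + (hi - lo) / 2;
        if (buffer[mid] == 0)
            hi = mid;
        else
            lo = mid + 1;
    }
    return lo; // lo == nest_size means the nest is full
}
```

A real concurrent version would read the cells with atomic loads, and a CAS at the returned position can still fail, in which case the enqueue loop simply asks the oracle again.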
Schema of enqueue operation:
1. Get current head nest
2. Read current enqueue position hint in this nest
3. If the nest is fully filled then allocate a new nest and link this
new nest as head
4. Try to write value with CAS(&enqueue_position, value, 0)
5. If success then return
6. If fail then use the oracle to predict the right enqueue position
and goto 3
Implementation notes.
1. To manage nest lifetime and prevent ABA I use a PDR scheme for the
nests. I cache the current nest in TLS, so an atomic RMW is issued only
once per nest. The head and tail pointers are strong PDR pointers; all
other pointers to nests are normal PDR pointers.
Maybe some other scheme could be used here to prevent ABA and premature
node reuse, but I use PDR for simplicity.
2. For allocation/deallocation of nests I use a lock-free stack-based
freelist. Before a nest is placed on the freelist it is cleared, i.e.
the buffer is zeroized and the enqueue/dequeue positions are set to 0.
If there are no nests in the freelist, allocation requests are
forwarded to the system allocator.
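A minimal sketch of such a stack-based freelist (a Treiber stack), written with C++11-style atomics purely for illustration; node_t and the buffer size are placeholders standing in for nest_t, and note that pop() as written is ABA-prone unless the nodes are guarded by PDR or a similar reclamation scheme:

```cpp
#include <atomic>
#include <cassert>
#include <cstring>

struct node_t
{
    node_t* next_;
    char buffer_[64]; // placeholder payload
};

struct freelist_t
{
    std::atomic<node_t*> top_;

    freelist_t() : top_(0) {}

    // pop a recycled node, or fall back to the system allocator;
    // note: ABA-prone without PDR (or similar) guarding the nodes
    node_t* pop()
    {
        node_t* n = top_.load(std::memory_order_acquire);
        while (n && !top_.compare_exchange_weak(
                        n, n->next_, std::memory_order_acquire))
        {}
        return n ? n : new node_t;
    }

    // clear the node before recycling, as the queue requires,
    // then push it with a CAS loop (Treiber stack)
    void push(node_t* n)
    {
        std::memset(n->buffer_, 0, sizeof(n->buffer_));
        n->next_ = top_.load(std::memory_order_relaxed);
        while (!top_.compare_exchange_weak(
                   n->next_, n, std::memory_order_release))
        {}
    }
};
```

Clearing happens on push, so a nest popped from the freelist is always already zeroized.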
Here is a code sketch:

unsigned const nest_size = 1000;

struct nest_t
{
    // can be incorrect
    unsigned enqueue_pos_hint_;
    // always maintained in correct state
    unsigned dequeue_pos_;
    void* buffer_[nest_size];
    nest_t* next_;

    nest_t()
    {
        next_ = 0;
        enqueue_pos_hint_ = 0;
        dequeue_pos_ = 0;
        memset(buffer_, 0, sizeof(buffer_));
    }
};

struct mpmc_queue_t
{
    global_ptr<nest_t> head_;
    global_ptr<nest_t> tail_;

    mpmc_queue_t()
    {
        head_ = tail_ = alloc_node();
    }

    void enqueue(void* value)
    {
        // here strong acquire of head;
        // cached in tls, so the atomic rmw is issued
        // only once per nest
        local_ptr nest (head_);
        unsigned pos = nest->enqueue_pos_hint_;
        do
        {
            if (nest_is_over(nest.get(), pos))
            {
                // current nest is full:
                // have to allocate a new nest and link it
                nest_t* new_nest = alloc_node();
                nest_t* old_nest = CAS(nest->next_, new_nest, (nest_t*)0);
                if (!old_nest)
                {
                    // we linked our nest, now advance head
                    head_.CAS(new_nest, nest.get());
                }
                else
                {
                    // somebody else linked a nest first
                    head_.CAS(old_nest, nest.get());
                    free_node(new_nest);
                }
                // update current nest
                nest = head_;
                pos = nest->enqueue_pos_hint_;
            }
            // check whether the current pos is empty
            if (check_enqueue_pos(nest.get(), pos))
            {
                // try to write the value
                if (!CAS_fail(nest->buffer_[pos], value, 0))
                {
                    // plain store to enqueue_pos_hint_
                    nest->enqueue_pos_hint_ = pos + 1;
                    return;
                }
            }
            // here some sophisticated logic
            // to determine the enqueue pos;
            // involves look-ahead, binary search etc...
            pos = enqueue_pos_oracle(nest.get(), pos);
        } while (true);
    }

    void* dequeue()
    {
        // here strong acquire of tail;
        // cached in tls, so the atomic rmw is issued
        // only once per nest
        local_ptr nest (tail_);
        for (;;)
        {
            unsigned pos = nest->dequeue_pos_;
            if (nest_is_over(nest.get(), pos))
            {
                // current nest is fully consumed:
                // try to switch to the next nest
                nest_t* next = nest->next_;
                if (!next)
                    return 0;
                tail_.CAS(next, nest.get());
                nest = tail_;
                continue; // re-read the dequeue pos from the new nest
            }
            // read the current value
            void* value = nest->buffer_[pos];
            if (!value)
                return 0;
            // try to increment the dequeue pos
            if (!CAS_fail(nest->dequeue_pos_, pos + 1, pos))
                return value;
        }
    }
};
I think that nested-list based lock-free data structures are very
interesting and promising. I haven't seen this approach before. Other
lock-free data structures can definitely be built on top of a nested
list...
Comments/suggestions/thoughts are welcome ;)
Dmitriy V'jukov
> Comments/suggestions/thoughts are welcome ;)
I am looking forward to reading through this code! Thank you for trying
to come up with solutions to this which rely on only a single CAS per
pop or push...
;^)
Humm...
:^/
Something like this: 2020 CASes to enqueue and dequeue 1000 values...
In integer arithmetic: 2020/1000 = 2 CASes to enqueue and dequeue 1
value
:)
Dmitriy V'jukov
1 CAS per operation _in_the_common_case_
Particularly: 1 CAS per operation if the nest is not fully filled/
consumed.
When a new nest comes into play, more CASes are needed to manage the
lifetime of the nest, allocate a new nest, enqueue the nest into the
list...
At least I don't rely on GC and don't require DWCAS/MCAS like all those
"scientists" :)
Dmitriy V'jukov
Some more thoughts.
Advantages over a list-based queue:
- I don't have to allocate/deallocate/manage the lifetime of an
individual node for every value. So a few CASes go away.
Advantages over a vector-based queue:
- Unbounded size. Additional nests can be allocated as needed. And
excess nests can be deallocated (thanks to PDR).
- A nest is filled with values only once and consumed only once. So I
don't need to cope with the situation when the head pursues the tail,
and a consumer doesn't have to zeroize a cell after itself (the whole
nest is zeroized at once when recycled).
Disadvantages:
- Complicated logic. I have to implement both list logic (at the nest
level) and vector logic (at the individual cell level).
I think this queue will kick the ass of any other queue out there :)
I am not sure about the scheme with N^2 spsc queues (everyone-to-
everyone) where each consumer multiplexes them. But that scheme is not
producer-consumer, strictly speaking.
Dmitriy V'jukov
[...]
Okay. So your "fast-path" only has one CAS. Fine.
[...]
> At least I don't relay on GC, don't require DWCAS/MCAS like all those
> "scientists" :)
You got that right!
Note the queuing scheme I invented for the distributed lock-free event
multiplexer used in vZOOM.
--- I can do an unbounded queue with no interlocked operations and no
#LoadLoad, #StoreLoad or #LoadStore style memory barriers at all... I have
not posted the code for this. ---
No Jokes.
The multiplexer/demultiplexer distributes the messages from multiple
producers to multiple consumers.
Therefore, the queuing system can also act as a delegate-based event
multi-caster.
I used this quite a bit in some GUI applications I was tinkering around with
a while ago.
> No Jokes.
Let me guess :)
You dedicate one thread as the multiplexer/demultiplexer and use 2*N
spsc queues (not N*(N-1)). So every other thread has a queue to the
multiplexer/demultiplexer and one from it, and thus no thread needs to
do event multiplexing manually.
?
Dmitriy V'jukov
Thanks, that's a useful technique.
--
David Hopwood <david....@industrial-designers.co.uk>
Most useful lock-free techniques make use of PDR in some way or another.
Well, any thread can act as a multiplexer or demultiplexer. Any thread can
receive messages from any other thread. A thread needs to register with
another thread for communication. So, when I create a thread, I use PDR to
walk a linked-list of active thread structures, and pushes message
descriptors, onto itself and each thread. Now, the newly created thread can
multicast a single message to any other thread. Basically, you end up with
any thread being able to multicast to any other thread, using nothing but
atomic operation and membar free (e.g., except for #StoreStore) single
producer-consumer queues.
BTW, yes Dmitriy, I think you created a workable queue. Good job.
:^)
I have to work on my habit of posting experimental codes to public places!
o tempora! o mores! :)
Dmitriy V'jukov
> You know Dmitriy, sometimes is good to keep important things secret for a
> while...
>
> I have to work on my habit of posting experimental codes to public places!
Why don't you work on cutting down on the smilies?
Bitter people like myself don't like to be reminded of "smiling".
will do.
> Well, any thread can act as a multiplexer or demultiplexer. Any thread can
> receive messages from any other thread. A thread needs to register with
> another thread for communication. So, when I create a thread, I use PDR to
> walk a linked-list of active thread structures, and pushes message
> descriptors, onto itself and each thread. Now, the newly created thread can
> multicast a single message to any other thread. Basically, you end up with
> any thread being able to multicast to any other thread, using nothing but
> atomic operation and membar free (e.g., except for #StoreStore) single
> producer-consumer queues.
So you create N^2 queues...
Don't you think that this is too costly? For example, if I have 4 cores
and 4 threads, then I would have 16 queues. So... this is acceptable.
But it is a common pattern to create more threads than cores. For
example, if I have 32 threads, I would have ~1000 queues and every
thread would have to multiplex 31 queues... This is arguable...
Maybe on ccNUMA this is reasonable, but on a few-core desktop or server
imho it is not worth it...
Btw, your solution is not producer-consumer, strictly speaking, because
there is no load balancing. And it is not so easy to attach nontrivial
(non-round-robin) load balancing; anyway, this increases the overheads
further...
So I think your solution is for a different problem. It is more a
solution for messaging. My solution is more for producer-consumer.
And maybe even for messaging I would prefer a solution based on mpsc
queues on top of a lifo stack with reverse consuming. It has only 1 CAS
per enqueue and 0 (amortized) CASes per dequeue. After all, a CAS is
only a CAS, it is not a call to a remote web-service over SOAP :) And
you can easily hide the CAS overhead if you have O(N) complexity and do
a lot of other work besides that 1 CAS...
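For illustration, here is a minimal sketch of that mpsc scheme with C++11-style atomics (names and layout are mine): producers push onto a lifo stack with one CAS each, the single consumer grabs the whole stack with one XCHG and reverses the batch once, so dequeues cost no CASes amortized:

```cpp
#include <atomic>
#include <cassert>

struct msg_t
{
    msg_t* next_;
    int payload_;
};

struct mpsc_queue_t
{
    std::atomic<msg_t*> stack_; // shared lifo stack of pending messages
    msg_t* consume_;            // consumer-private, already reversed

    mpsc_queue_t() : stack_(0), consume_(0) {}

    // 1 CAS per enqueue
    void enqueue(msg_t* m)
    {
        m->next_ = stack_.load(std::memory_order_relaxed);
        while (!stack_.compare_exchange_weak(
                   m->next_, m, std::memory_order_release))
        {}
    }

    // 0 CASes amortized: one XCHG per grabbed batch
    msg_t* dequeue()
    {
        if (!consume_)
        {
            // grab the whole stack and reverse it to fifo order
            msg_t* top = stack_.exchange(0, std::memory_order_acquire);
            while (top)
            {
                msg_t* next = top->next_;
                top->next_ = consume_;
                consume_ = top;
                top = next;
            }
        }
        msg_t* m = consume_;
        if (m)
            consume_ = m->next_;
        return m;
    }
};
```

The reversal cost is what the backlink idea further down in the thread tries to avoid.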
This is just my imho. But I target few-core systems rather than
many-core ccNUMA systems; I just haven't worked with the latter...
Dmitriy V'jukov
It totally depends on how the user sets up their environment. A common setup
is to use a single-thread as the multiplexer/demultiplexer. Another possible
setup is allowing several threads, or every thread, to act as the
multiplexer/demultiplexer. It's usually useful to have several threads
multicasting so that your messages don't get bottlenecked, which can
happen if you use a single thread.
I need to clarify this statement:
">>So, when I create a thread, I use PDR to
>> walk a linked-list of active thread structures, and pushes message
>> descriptors, onto itself and each thread. "
which should read:
"So, when I create a thread which requests to act as a
multiplexer/demultiplexer, I use PDR to
walk a linked-list of active thread structures, and pushes message
descriptors, onto itself and each thread. "
A thread that does not request to act as a multiplexer/demultiplexer does
not need to walk the thread list and push any message descriptors. They can
just produce messages which can get multicasted to every other thread,
however, they will NOT be doing the multicasting themselves...
> Btw your solution is not producer-consumer strictly saying. Because
> there is no load balancing. And it is not so easy to attach nontrivial
> (not round-robin) load balancing, anyway this increase overheads
> further...
There are load-balancing mechanisms if the user chooses to use them.
> So I think your solution is for a different problem. It is more a
> solution for messaging. My solution is more for producer-consumer.
Well, I can produce a single message which can get multicasted to every
other thread. This can work. The difference is whether the thread can
multicast the message itself, or not.
> And maybe even for messaging I would prefer a solution based on mpsc
> queues on top of a lifo stack with reverse consuming.
I remember when Joe Seigh mentioned this idea a while back:
[...]
http://groups.google.ca/group/comp.programming.threads/msg/a359bfb41c68b98b
Yup. The idea has been around, and it works... I just wonder about lengthy
reversals...
In this type of design, you would normally only have N^2 queues where
N is the number of cores, not the number of threads. The scheduler for
each core can distribute messages to its own threads without any additional
synchronization (this assumes that the scheduler and the message passing
implementation are cooperating, which probably requires that they are
both implemented by the same library, language implementation, or OS).
The most complicated part of making such a design work well would be load
balancing, not the messaging itself.
--
David Hopwood <david....@industrial-designers.co.uk>
> Yup. The idea has been around, and it works... I just wonder about lengthy
> reversals...
You can fix the problem this way:
1. The producer, after pushing an item onto the stack, must store a
backlink from the previous node to the current one with a plain store
2. The producer that pushes the first element onto the stack stores a
link to that node into the tail pointer with a plain store
So we get an "almost doubly-linked list" here
3. The consumer first loads the tail pointer
4. Second, XCHGs the head pointer to 0 (as usual)
5. Goes through the backlinks and consumes. No reversals!
6. If a backlink is not yet stored by the producer at the moment the
consumer wants to consume the item, there are 2 ways:
6.1. Just assume there is no next item yet
6.2. Repair the backlinks manually
(and this also relates to the situation when the tail pointer is not
yet stored)
(assume this is a rare condition, i.e. the backlinks and tail pointer
usually are stored already)
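The happy path could look something like the sketch below (C++11-style atomics, single consumer, names mine; this deliberately models only the case where all backlinks and the tail pointer are already stored, i.e. the repair logic of 6.1/6.2 and the races around an empty batch are omitted):

```cpp
#include <atomic>
#include <cassert>

struct node_t
{
    node_t* next_;              // stack link, toward older nodes
    std::atomic<node_t*> back_; // backlink, toward newer nodes
    int payload_;
};

struct stack_t
{
    std::atomic<node_t*> head_;
    std::atomic<node_t*> tail_;

    stack_t() : head_(0), tail_(0) {}

    void push(node_t* n)
    {
        n->back_.store(0, std::memory_order_relaxed);
        n->next_ = head_.load(std::memory_order_relaxed);
        while (!head_.compare_exchange_weak(
                   n->next_, n, std::memory_order_release))
        {}
        if (n->next_ == 0)
            // step 2: the first element also publishes the tail pointer
            tail_.store(n, std::memory_order_release);
        else
            // step 1: store the backlink previous -> current
            n->next_->back_.store(n, std::memory_order_release);
    }

    // steps 3-5: take the tail, XCHG head to 0, then walk the
    // backlinks oldest-first and consume -- no reversal needed
    template<typename F>
    void consume_all(F f)
    {
        node_t* oldest = tail_.exchange(0, std::memory_order_acquire);
        head_.exchange(0, std::memory_order_acquire);
        for (node_t* n = oldest; n != 0;
             n = n->back_.load(std::memory_order_acquire))
            f(n);
    }
};
```

A real version has to decide, at each missing backlink, between 6.1 (treat the batch as ended) and 6.2 (walk the next_ links to repair the chain).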
What do you think?
Dmitriy V'jukov
> void enqueue(void* value)
> {
> // here strong acquire of head
> // cached in tls, so atomic rmw issued
> // only once for nest
> local_ptr nest (head_);
^^^^^^^^^^
So, the local_ptr's in the example are all in per-thread structures
whose pointers are stored in TLS, for the lifetime of a thread?
Or do you have to acquire a strong PDR reference once per call into
enqueue/dequeue?
Please clarify... Thanks.
> In this type of design, you would normally only have N^2 queues where
> N is the number of cores, not the number of threads. The scheduler for
> each core can distribute messages to its own threads without any additional
> synchronization (this assumes that the scheduler and the message passing
> implementation are cooperating, which probably requires that they are
> both implemented by the same library, language implementation, or OS).
It seems that you are talking about some specific situation...
I don't want to require the user of this solution to bind threads to
cores. And I certainly don't want to make any invasion into the
language or OS core. So all I have is threads. Imho, the fewer the
assumptions, the more usable the solution.
And wrt a particular application, normally I don't want to bind threads
to cores either, because it prevents the OS from doing optimal dynamic
scheduling (which is load balancing in one sense).
Dmitriy V'jukov
Seems like there still could be a race-condition wrt setting the backlink
from the previous node. Humm...
Here I assume something like this:
http://groups.google.com/group/comp.programming.threads/msg/91d01886ae690185
I.e. I mean something like:
void enqueue(local_mediator* m, void* value)
{
    local_ptr nest (m, head_);
    // ...
}

void producer_thread()
{
    // ...
    local_mediator m; // <-- here cached acquired pointer
    while (true)
    {
        queue.enqueue(&m, value);
    }
    // ...
}
So, here we acquire only a weak reference, not a strong one.
But certainly we can store this cached pointer in tls. Then I imply
something like:
void arbitrary_user_thread_func()
{
    // here we can initialize all per-thread structures,
    // and in the destructor we will deinitialize and release
    // all cached references
    my_library_thread_initializer initializer;
    // ...
    // here arbitrary user work
    // including calls to my shared queue
    // ...
}
If it is a "well-known" queue in the program, then we can place the
local_mediator object on the stack of every thread that works with the
queue. And this is the better variant.
If it is not a "well-known" queue in the program, then we must use tls.
And I am also thinking about the following scheme: we could avoid PDR
here entirely. Then nests would not be freed to the system allocator at
all, only recycled. But then we must fix ABA in some other way: for
enqueueing, for dequeueing, for moving the head pointer and for moving
the tail pointer. I have fixed some of these, but not all. Anyway, the
algorithm becomes _very_ messy w/o PDR, so I don't think elimination of
PDR is worth it...
Dmitriy V'jukov