Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss
Groups keyboard shortcuts have been updated
Dismiss
See shortcuts

MPSC FIFO Queue w/o atomic_rmw/membars

57 views
Skip to first unread message

Dmitriy Vyukov

unread,
Jun 29, 2007, 6:30:55 AM6/29/07
to
Features:
- multiple producer/single consumer
- strong fifo
- unbounded
- no atomic_rmw or memory barriers (at least on x86, on other
architectures #StoreStore may be needed while enqueuing) while
enqueuing/dequeuing *and* while node management/reclamation

I combine ideas from here:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/378a35b21ae2b42e
(lock-free node allocator for fifo-queues)

and from here:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/bf2261a78b4d3188
(MPMC Unbounded FIFO Queue w/ 1 CAS/Operation)

And add some kind of TCP acknowledgement :)

The idea.
Before putting a node into the shared buffer, the producer first puts the node into
its own local list, and only then puts it into the shared buffer. In the shared buffer
a node can get... lost. You didn't mishear. A node is stored to the shared buffer
with a plain store, no CAS, so the node can get lost.
The consumer remembers the last consumed node for every producer. Before
consuming a node, the consumer checks whether it is the next node after the last
consumed node of the same producer. If it is the next node, then
all is ok. If it is not the next node,
then some nodes from this producer got lost, and the consumer restores the lost
nodes using the producer's local list of nodes.

Here is the code:

----------------------------------------------------------------------------------------

// Declared ahead of its definition because every node carries a pointer
// to the bookkeeping record of the producer that owns it.
struct producer_local_t;

// One queue element. A node lives in its producer's private linked list
// and a pointer to it is additionally published in the shared buffer.
struct node_t
{
    // payload passed to enqueue() and handed back by dequeue()
    void *user_data;

    // next (newer) node in the owning producer's private list
    node_t *local_next;

    // per-producer sequence number, one higher for each node produced
    unsigned tag;

    // record in which the consumer tracks progress for the owning producer
    producer_local_t *producer_local;

    // set by the consumer once it has moved past this node;
    // the producer checks it before recycling the node
    bool is_consumed;
};

// Consumer-side progress record kept for a single producer.
struct producer_local_t
{
    // node of this producer that the consumer returned most recently
    node_t *last_node;

    // tag carried by that node
    unsigned last_tag;
};

struct producer_t
{
producer_local_t local;

unsigned tag_counter;

// newest allocated node
node_t* head;

// oldest allocated node
node_t* tail;

producer_t()
{
local.last_node = 0;
local.last_tag = 0;
tag_counter = 0;
head = 0;
tail = 0;
}
};

// Multi-producer / single-consumer queue of void* payloads.
//
// Producers publish node pointers into `buffer` with plain stores (no CAS),
// so two producers can pick the same cell and one store can overwrite the
// other. Every node is linked into its producer's private list *before* it is
// published; dequeue() compares per-producer tags to notice a missing node and
// fetches it through that list instead.
//
// NOTE(review): memset() is used below but the snippet shows no #include for it.
struct mpsc_queue_t
{
// for simplicity assuming that the buffer is infinite
// NOTE(review): the array really has `infinity` (100) cells, cells are never
// cleared after the constructor, and neither oracle() nor dequeue() checks the
// bounds - once every cell is filled they step past the end of the array.
static const unsigned infinity = 100;
node_t* buffer[infinity];

// next cell the consumer will look at
node_t** consume_pos;

// cell from which producers start searching for an empty cell;
// written with plain stores by every producer, hence only a hint
node_t** produce_pos_hint;

// Clears the buffer and points both cursors at its first cell.
mpsc_queue_t()
{
memset(buffer, 0, sizeof(buffer));
consume_pos = buffer;
produce_pos_hint = buffer;
}

// Enqueues `data` on behalf of `producer`.
// The node is first appended to the producer's private list and only then
// stored into the shared buffer. Returns nothing; the store into the buffer
// may later be overwritten by another producer.
void enqueue(producer_t* producer, void* data)
{
// multiple producers here

// a null last_node means this producer has never enqueued before
if (0 == producer->local.last_node)
first_time_setup(producer);

// allocate node for enqueue
node_t* node = 0;

// try to reuse oldest node
if (producer->tail->is_consumed)
{
// reuse oldest node
node = producer->tail;
producer->tail = producer->tail->local_next;
}
else
{
// allocate new node
node = new node_t;
node->producer_local = &producer->local;
}

// fill node
node->is_consumed = false;
node->local_next = 0;
node->tag = producer->tag_counter++;
node->user_data = data;

// append to the producer's private list before publishing,
// so the consumer can reach the node even if the publication is lost
producer->head->local_next = node;
producer->head = node;

//#StoreStore

// get next empty position
node_t** pos = oracle();

// hazard store
// this store potentially can be
// overwritten by other producer
*pos = node;

// one more hazard store
produce_pos_hint = pos + 1;
}

// Creates the producer's initial placeholder node (user_data 0) and makes it
// head, tail and the consumer's last_node for this producer. The placeholder
// is not stored into the shared buffer here; it consumes the first tag value,
// so the first real node's tag is one higher than the placeholder's.
void first_time_setup(producer_t* producer)
{
// allocate new node
node_t* node = new node_t;
node->producer_local = &producer->local;
node->is_consumed = false;
node->local_next = 0;
node->tag = producer->tag_counter++;
node->user_data = 0;

producer->head = node;
producer->tail = node;
producer->local.last_node = node;
}

// Returns the first empty (null) cell at or after produce_pos_hint.
// NOTE(review): the scan has no upper bound - see the note on `buffer`.
node_t** oracle()
{
// for now just find first empty cell in buffer
node_t** node = produce_pos_hint;
while (*node) ++node;
return node;
}

// Returns the next payload, or 0 when the cell at consume_pos is still empty.
// A payload that is itself 0 cannot be told apart from "empty" by the caller.
void* dequeue()
{
// only single consumer here

node_t* node = *consume_pos;

if (0 == node)
return 0;

// Only rely on data-dependency here

// check whether we have lost some nodes
// since previous consumed node from the producer or not
producer_local_t* local = node->producer_local;
if (local->last_tag + 1 == node->tag)
// producer tags are consistent
// i.e. we haven't lost any nodes,
// so just increment pos
++consume_pos;
else
// producer tags are inconsistent
// i.e. we have lost some nodes,
// so get next node from previous
// consumed node from the producer
// and _don't_ increment pos
// (the same cell is examined again by the next call)
node = local->last_node->local_next;

// we don't need last consumed node,
// so mark the node accordingly
// (this marks the *previous* node, not the one being returned)
local->last_node->is_consumed = true;

// set last consumed node
local->last_node = node;
local->last_tag = node->tag;

return node->user_data;
}
};

----------------------------------------------------------------------------------------

And here is usage example:

// queue instance used by producer_thread() and consumer_thread() below
mpsc_queue_t queue;

// Placeholder data source for the example: always yields a null payload.
void* produce()
{
    return 0;
}
// Placeholder data sink for the example: accepts a payload and does nothing.
void consume(void* data)
{
}

// Example producer loop: owns one producer_t and keeps enqueuing whatever
// produce() returns into the global queue. Never returns.
void producer_thread()
{
    producer_t producer;
    for (;;)
    {
        queue.enqueue(&producer, produce());
    }
}

// Example consumer loop: keeps dequeuing from the global queue and passes
// every non-null payload to consume(). Never returns.
void consumer_thread()
{
    for (;;)
    {
        void* item = queue.dequeue();
        if (0 == item)
            continue;
        consume(item);
    }
}

----------------------------------------------------------------------------------------

And here is single-threaded test case:

// Single-threaded walk-through with two producers. With only one thread no
// store can be overwritten, so payloads come back in the order they were
// enqueued. The values in the comments are the results of tracing the code
// by hand; the program itself does not check them.
int main()
{
    mpsc_queue_t queue;
    producer_t producer1;
    producer_t producer2;

    // nothing enqueued yet -> 0
    void* r1 = queue.dequeue();

    queue.enqueue(&producer1, (void*)1);

    // -> 1, then the queue is empty again -> 0
    void* r2 = queue.dequeue();
    void* r3 = queue.dequeue();

    // this enqueue recycles producer1's placeholder node
    queue.enqueue(&producer1, (void*)2);

    queue.enqueue(&producer2, (void*)3);
    queue.enqueue(&producer2, (void*)4);

    // -> 2
    void* r4 = queue.dequeue();

    queue.enqueue(&producer1, (void*)5);
    queue.enqueue(&producer2, (void*)6);

    // -> 3, 4, 5, 6 and finally 0 (empty)
    void* r5 = queue.dequeue();
    void* r6 = queue.dequeue();
    void* r7 = queue.dequeue();
    void* r8 = queue.dequeue();
    void* r9 = queue.dequeue();
}


Dmitriy V'jukov

Dmitriy Vyukov

unread,
Jun 29, 2007, 7:19:22 AM6/29/07
to
Dmitriy Vyukov wrote:

> struct mpsc_queue_t
> {
> // for simplicity assuming that the buffer is infinite
> static const unsigned infinity = 100;
> node_t* buffer[infinity];

The code relies on an infinite buffer. To correct this fault, the solution from here
can be used:

I.e. a nested list of big chunks can be used as the underlying storage,
and the nest's lifetime can be managed with some form of PDR (RCU/SMR/
atomic_ptr).

Dmitriy V'jukov

Dmitriy Vyukov

unread,
Jun 29, 2007, 8:37:56 AM6/29/07
to
On Jun 29, 2:30 pm, Dmitriy Vyukov <dvyu...@gmail.com> wrote:

> The idea.
> Before putting node to shared buffer producer puts node to his own
> local list, and then put node to shared buffer. In shared buffer node
> can get... lost. You don't mishear. Node is stored to shared buffer
> with plain store, no CAS, so node can get lost.

The main question is: what will be the frequency of node loss in
shared buffer?

The good news is: a loss frequency of up to 50%, or even up to 66%, is OK, because
this still means that 33% of the nodes will be successfully enqueued, so the
consumer can restore all the other nodes.

Dmitriy V'jukov

Gil Hamilton

unread,
Jun 29, 2007, 9:42:37 AM6/29/07
to
Dmitriy Vyukov <dvy...@gmail.com> wrote in
news:1183113055.1...@e9g2000prf.googlegroups.com:
> Before putting node to shared buffer producer puts node to his own
> local list, and then put node to shared buffer. In shared buffer node
> can get... lost. You don't mishear. Node is stored to shared buffer
> with plain store, no CAS, so node can get lost.
> Consumer remember last consumed node for every producer. And before
> consuming node consumer check whether this the next node after last
> consumed node. If this is the next node after last consumed node, then
> all is ok. And if this is not the next node after last consumed node,
> then some nodes from this consumer get lost, and consumer restore lost
> nodes using producer local list of nodes.

You claim this algorithm is "strong fifo". However, in fact, since
multiple producers can potentially "lose" a node at the same instant, you
then have no way of determining which was first-in. In fact, at best
it's only fifo on a per-producer basis.

Furthermore, if/when a producer's node is lost, that will not even be
discovered until the next time that the producer manages to produce a
node that *isn't* lost, if ever. So if there are many producers and/or
some producers produce a node only occasionally, lost nodes can sit
unconsumed for an indefinite time.

If there are only a few producers, wouldn't it be far simpler and more
reliable to implement an array of spsc queues that are handled round-
robin?

Finally, you say "unbounded" but the code has this curious formulation:


> // for simplicity assuming that the buffer is infinite
> static const unsigned infinity = 100;
> node_t* buffer[infinity];

It seems to me that "unbounded" then is only true if in fact 'buffer' is
truly of infinite length: not a very realistic assumption. In
particular, the code doesn't handle the case where produce_pos_hint
reaches the end of buffer and there isn't any obvious good way of
handling that case here. Indeed, the re-use of buffer positions among
the producers here would seem to be a complex problem in itself, not
something to be assumed away.

(One other minor note: you need to pull the 'node->user_data' value out
before you mark the node with 'is_consumed = true'.)

In short (as often seems to be the case with code snippets posted to this
group [and I have been guilty of this myself in the past]) the utility of
your algorithm relies on certain critical assumptions -- or the
implementation of certain critical details -- that are not provided.

GH

Dmitriy Vyukov

unread,
Jun 29, 2007, 10:47:01 AM6/29/07
to
On Jun 29, 5:42 pm, Gil Hamilton <gil_hamil...@hotmail.com> wrote:

First of all, thank you for such detailed review and comments.

> You claim this algorithm is "strong fifo". However, in fact, since
> multiple producers can potentially "lose" a node at the same instant, you
> then have no way of determining which was first-in. In fact, at best
> it's only fifo on a per-producer basis.

Yes. I mean exactly this: strong FIFO on a per-producer basis. I was a
little imprecise in my formulation. Imho there is little sense in talking
about order between events in different threads...
Anyway, this is where I get the possibility to eliminate all memory
barriers and atomic_rmw.


> Furthermore, if/when a producer's node is lost, that will not even be
> discovered until the next time that the producer manages to produce a
> node that *isn't* lost, if ever. So if there are many producers and/or
> some producers produce a node only occasionally, lost nodes can sit
> unconsumed for an indefinite time.

Yes. You are right.
A requirement like this must be added: "the producer must produce
periodically".
And the open question: what will the frequency of node loss be? I
haven't run any tests yet. But I think (hope) that this frequency will
be low with real-world workloads.
So, assuming that the producer produces periodically and the loss frequency is
low, I don't think that this will be a big problem in most
environments. But this point definitely must be taken into account.

> If there are only a few producers, wouldn't it be far simpler and more
> reliable to implement an array of spsc queues that are handled round-
> robin?

If there are few producers, then - yes. It will be a very good solution.
But I am targeting and thinking about manycore machines with, for
example, 100 cores (Intel promises 80 cores in 5 years). And you can
have, for example, 2 threads per core. So 200 threads. Every consumer
must poll 200 spsc queues... And to block when all queues are empty, the
consumer must check all 200 queues 2 times...


> Finally, you say "unbounded" but the code has this curious formulation:
>
> > // for simplicity assuming that the buffer is infinite
> > static const unsigned infinity = 100;
> > node_t* buffer[infinity];
>
> It seems to me that "unbounded" then is only true if in fact 'buffer' is
> truly of infinite length: not a very realistic assumption. In
> particular, the code doesn't handle the case where produce_pos_hint
> reaches the end of buffer and there isn't any obvious good way of
> handling that case here. Indeed, the re-use of buffer positions among
> the producers here would seem to be a complex problem in itself, not
> something to be assumed away.

I have solution for this problem.
I provide infinite buffer here, as I say, only for simplicity of code.
Please see this post:
http://groups.google.com/group/comp.programming.threads/msg/3563963963e4fc84?hl=en&
And this:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/bf2261a78b4d3188


> (One other minor note: you need to pull the 'node->user_data' value out
> before you mark the node with 'is_consumed = true'.)

I mark with 'is_consumed = true' previous node, not current. Please
see the code again:

// mark *previous* node


local->last_node->is_consumed = true;

// set last consumed node
local->last_node = node;
local->last_tag = node->tag;

// this node still have 'is_consumed = false'
return node->user_data;

> In short (as often seems to be the case with code snippets posted to this
> group [and I have been guilty of this myself in the past]) the utility of
> your algorithm relies on certain critical assumptions -- or the
> implementation of certain critical details -- that are not provided.

Unfortunately I don't have enough time/resources to provide fully
implemented/tested code. But I was trying to provide all the necessary
details. Feel free to ask about any details - I will try to provide
them.
And one more time, thank for comments - there are very few people
which usually comment my such posts ;)


Dmitriy V'jukov

Dmitriy Vyukov

unread,
Jun 30, 2007, 7:45:15 AM6/30/07
to
On Jun 29, 17:42, Gil Hamilton <gil_hamil...@hotmail.com> wrote:

> Furthermore, if/when a producer's node is lost, that will not even be
> discovered until the next time that the producer manages to produce a
> node that *isn't* lost, if ever. So if there are many producers and/or
> some producers produce a node only occasionally, lost nodes can sit
> unconsumed for an indefinite time.

Here can be different schemes.

Initially I was thinking about the following scheme. The consumer doesn't detect
lost nodes; the producer detects them. If the producer (while trying to
reuse a node) sees an unconsumed node followed by a consumed node, then the
producer has found a lost node and enqueues this node again.
But this totally breaks FIFO order.

To prevent situation when lost nodes can sit unconsumed for an
indefinite time, consumer can actively check for lost nodes. I.e.
consumer remember last consumed node, so he can check
last_consumed_node->local_next and determine such nodes.
But this makes this queue more like N spsc queues...
Maybe some trick can be added to prevent situation when lost nodes can
sit unconsumed for an indefinite time and at the same time consumer
don't have to pull N local queues...

Dmitriy V'jukov

Dmitriy Vyukov

unread,
Jun 30, 2007, 7:50:10 AM6/30/07
to
On Jun 29, 18:47, Dmitriy Vyukov <dvyu...@gmail.com> wrote:

> > If there are only a few producers, wouldn't it be far simpler and more
> > reliable to implement an array of spsc queues that are handled round-
> > robin?
>
> If there are few producers, then - yes. It will be very good solution.
> But I am targeted and thinking about manycore machines with, for
> example, 100 cores (Intel promise 80 cores in 5 years). And you can
> have, for example, 2 threads per core. So 200 threads. Every consumer
> must pull 200 spsc queues... And to block when all queues are empty,
> consumer must check all 200 queues 2 times...

And to determine total node count in N spsc queues, consumer have to
check all N queues too. Node count needed for load-balancing,
statistics, feedback etc...

I am thinking about solution when consumer have, for example, N/10
mpsc queues instead of N spsc queues (N - number of threads). So and
number of queues would be moderate and contention would be moderate
too.

Dmitriy V'jukov

Dmitriy Vyukov

unread,
Jul 3, 2007, 12:51:20 PM7/3/07
to
On Jun 30, 3:45 pm, Dmitriy Vyukov <dvyu...@gmail.com> wrote:
> On 29 , 17:42, Gil Hamilton <gil_hamil...@hotmail.com> wrote:
>
> > Furthermore, if/when a producer's node is lost, that will not even be
> > discovered until the next time that the producer manages to produce a
> > node that *isn't* lost, if ever. So if there are many producers and/or
> > some producers produce a node only occasionally, lost nodes can sit
> > unconsumed for an indefinite time.

Imo this is the major drawback of this queue. So here is one more
solution to this.
Consumers can periodically enqueue fake elements into the queue in their
'on_idle' state. These fake elements will push those 'frozen' nodes through the
queue. I think this will practically solve the problem.

And if environment already assume some form of periodical thread
communication (for example RCU), this communication can push 'frozen'
nodes in queue.


Dmitriy V'jukov

0 new messages