Hi Pierre,
Yes, that's way too much overhead for RUNNABLE/NONRUNNABLE detection.
It can be done with ~1 cycle of overhead for producers and 0 cycles of
overhead on the consumer's fast-path. The idea is that we only need to
track state changes between RUNNABLE<->NONRUNNABLE; moreover, on the
producer's side the tracking can be combined with the XCHG in
enqueue(), while on the consumer's side we need an additional RMW
which is executed only when the queue becomes empty. The key is that
we can use the low bit of the queue's tail pointer as the
RUNNABLE/NONRUNNABLE flag (nodes are at least word-aligned, so the low
bit of a node address is always zero and thus free for tagging).
Here is the code (and yes, here I still call the queue's tail 'head'):
#include <cassert>
#include <cstdint>
#include <intrin.h> // _InterlockedExchangePointer, _InterlockedCompareExchangePointer, _mm_pause

template<typename T>
T XCHG(T volatile* dest, T value)
{
    // pointer-sized atomic exchange (works on both 32- and 64-bit targets)
    return (T)_InterlockedExchangePointer((void* volatile*)dest, (void*)value);
}

template<typename T>
bool CAS(T volatile* dest, T cmp, T xchg)
{
    // pointer-sized atomic compare-and-swap
    return cmp == (T)_InterlockedCompareExchangePointer(
        (void* volatile*)dest, (void*)xchg, (void*)cmp);
}
struct mpscq_node_t
{
    mpscq_node_t* volatile next;
    void* state;
};

struct mpscq_t
{
    // push end; the low bit is the RUNNABLE/NONRUNNABLE flag
    mpscq_node_t* volatile head;
    // pop end; accessed only by the single consumer
    mpscq_node_t* tail;
};

void mpscq_create(mpscq_t* self, mpscq_node_t* stub)
{
    stub->next = 0;
    self->tail = stub;
    // mark it as empty (NONRUNNABLE)
    self->head = (mpscq_node_t*)((uintptr_t)stub | 1);
}
bool mpscq_push(mpscq_t* self, mpscq_node_t* n)
{
    n->next = 0;
    // serialization-point wrt producers
    mpscq_node_t* prev = XCHG(&self->head, n);
    // only 2 AND instructions added on the fast-path
    bool was_empty = ((uintptr_t)prev & 1) != 0;
    prev = (mpscq_node_t*)((uintptr_t)prev & ~(uintptr_t)1);
    prev->next = n; // serialization-point wrt consumer
    return was_empty;
}
mpscq_node_t* mpscq_pop(mpscq_t* self)
{
    mpscq_node_t* tail = self->tail;
l_retry:
    // fast-path is not modified
    mpscq_node_t* next = tail->next; // serialization-point wrt producers
    if (next)
    {
        self->tail = next;
        tail->state = next->state;
        return tail;
    }
    mpscq_node_t* head = self->head;
    // if head is marked as empty,
    // then the queue should not have been scheduled in the first place
    assert(((uintptr_t)head & 1) == 0);
    if (tail != head)
    {
        // there is just a temporal gap ->
        // wait for the producer to update the 'next' link
        while (tail->next == 0)
            _mm_pause();
        goto l_retry;
    }
    else
    {
        // the queue seems to be really empty -> try to mark it as empty
        mpscq_node_t* xchg = (mpscq_node_t*)((uintptr_t)tail | 1);
        if (CAS(&self->head, tail, xchg))
            // we successfully marked it as empty -> return;
            // the next producer will re-schedule the queue for execution
            return 0;
        // a producer has enqueued a new item in the meantime
        goto l_retry;
    }
}
int main()
{
    // single-threaded smoke test; nodes are intentionally leaked
    mpscq_t q;
    mpscq_create(&q, new mpscq_node_t);
    mpscq_node_t* n = 0;
    //n = mpscq_pop(&q);
    mpscq_push(&q, new mpscq_node_t);
    n = mpscq_pop(&q);
    n = mpscq_pop(&q);
    mpscq_push(&q, new mpscq_node_t);
    mpscq_push(&q, new mpscq_node_t);
    n = mpscq_pop(&q);
    n = mpscq_pop(&q);
    n = mpscq_pop(&q);
    mpscq_push(&q, new mpscq_node_t);
    mpscq_push(&q, new mpscq_node_t);
    n = mpscq_pop(&q);
    mpscq_push(&q, new mpscq_node_t);
    n = mpscq_pop(&q);
    n = mpscq_pop(&q);
    n = mpscq_pop(&q);
}
Now, whenever mpscq_push() returns true, the producer has to put the
queue into the scheduler for execution. Once the queue is scheduled for
execution, a thread pops from it until mpscq_pop() returns 0.
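To make the protocol concrete, here is a minimal sketch of the glue
code; scheduler_enqueue() and process() are hypothetical stand-ins for
your scheduler and item handler, they are not part of the queue itself:
void scheduler_enqueue(mpscq_t* q); // hypothetical: hand a queue to the scheduler
void process(mpscq_node_t* n); // hypothetical: handle one item
void producer_send(mpscq_t* q, mpscq_node_t* n)
{
    if (mpscq_push(q, n))
        // the push flipped the queue NONRUNNABLE->RUNNABLE,
        // so exactly one producer schedules it for execution
        scheduler_enqueue(q);
}
void consumer_execute(mpscq_t* q)
{
    mpscq_node_t* n;
    // pop until the queue marks itself NONRUNNABLE (pop returns 0);
    // the next successful push will re-schedule it
    while ((n = mpscq_pop(q)) != 0)
        process(n);
}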
The nice thing is that the consumer is not obliged to process all items
in a queue at once. It can process, for example, 100 items, then just
return the queue to the scheduler and switch to other starving queues,
as sketched below.
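A quota-limited consumer could look roughly like this (again just a
sketch, reusing the hypothetical scheduler_enqueue()/process() hooks
from above):
void consumer_execute_quota(mpscq_t* q, int quota)
{
    for (int i = 0; i != quota; i += 1)
    {
        mpscq_node_t* n = mpscq_pop(q);
        if (n == 0)
            // the queue marked itself NONRUNNABLE -> nothing more to do
            return;
        process(n);
    }
    // quota exhausted, but the queue is still RUNNABLE ->
    // hand it back to the scheduler so other queues don't starve
    scheduler_enqueue(q);
}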
Also, with regard to the _mm_pause() loop in pop(): instead of waiting
for the producer to restore the 'next' pointer, the consumer can decide
to just return the queue to the scheduler and handle some other queues;
by the time it comes back to the first queue, the 'next' link will most
likely already be restored. So no senseless waiting.
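Such a non-waiting pop could look roughly like this; the MPSCQ_RETRY
sentinel is my hypothetical way to distinguish "really empty, marked
NONRUNNABLE" from "temporal gap, give the queue back to the scheduler":
// hypothetical sentinel; never a valid node address (low bit set)
mpscq_node_t* const MPSCQ_RETRY = (mpscq_node_t*)1;
mpscq_node_t* mpscq_pop_nowait(mpscq_t* self)
{
    mpscq_node_t* tail = self->tail;
    mpscq_node_t* next = tail->next; // serialization-point wrt producers
    if (next)
    {
        // fast-path: same as in mpscq_pop()
        self->tail = next;
        tail->state = next->state;
        return tail;
    }
    mpscq_node_t* head = self->head;
    assert(((uintptr_t)head & 1) == 0);
    if (tail != head)
        // temporal gap -> instead of spinning in _mm_pause(),
        // tell the caller to return the queue to the scheduler
        return MPSCQ_RETRY;
    mpscq_node_t* xchg = (mpscq_node_t*)((uintptr_t)tail | 1);
    if (CAS(&self->head, tail, xchg))
        // really empty and now marked NONRUNNABLE
        return 0;
    // a producer has just enqueued an item -> let the caller re-schedule
    return MPSCQ_RETRY;
}
On MPSCQ_RETRY the caller simply returns the queue to the scheduler
without marking it NONRUNNABLE, so it will be executed again later.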
--
Dmitriy V'jukov