Something for Senderx to play with.
Joe Seigh
Ahh yes, you're right. It's all monotonic counters. A nice solution was staring
me right in the face, and I did not see it!
> Something for Senderx to play with.
Thank you! ;)
SenderX wrote:
>
> > I noticed that if you implement a lock-free LIFO queue where the
> > version count is incremented when you push onto the queue that
> > the version count can be used as an eventcount.
>
> Ahh yes, you're right. It's all monotonic counters. A nice solution was staring
> me right in the face, and I did not see it!
>
You only do a futex_wake if you see waiters, so basically adding blocking to
a lock-free queue is free. But if you expect any significant amount of
blocking (anything more than infrequent) then you're better off using
a mutex+condvar. Eventcounts are susceptible to thundering herd since they're
a little too non-blocking: every waiter is woken at once. Locks seem to do
better under those conditions since they act sort of like a semaphore, which
limits the number of runnable threads somewhat. That's not to say locks are
better in general.
Joe Seigh
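To make the "only wake if you see waiters" point concrete, here is a minimal sketch of an eventcount in standard C++. It is an illustration under assumptions, not code from this thread: the class name eventcount, its members get/wait/signal, and the choice of bit 0 as the waiters flag are all hypothetical, and a std::mutex plus std::condition_variable stand in for the futex.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical eventcount: bit 0 flags "someone may be waiting",
// the remaining bits form a monotonic event counter.
class eventcount {
    static constexpr std::uint32_t WAITERS = 1u;
    std::atomic<std::uint32_t> m_count{0};
    std::mutex m_mtx;               // stands in for the futex
    std::condition_variable m_cv;

public:
    // Waiter: announce intent to wait and snapshot the counter.
    // The caller re-checks its own condition before calling wait().
    std::uint32_t get() {
        return m_count.fetch_or(WAITERS) | WAITERS;
    }

    // Waiter: block until the counter has moved past the snapshot.
    void wait(std::uint32_t key) {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [&] { return m_count.load() != key; });
    }

    // Producer: advance the counter and clear the flag in one CAS.
    // The slow path (lock + notify) runs only if the flag was set.
    // Returns true if a wakeup was issued.
    bool signal() {
        std::uint32_t old = m_count.load();
        while (!m_count.compare_exchange_weak(old, (old & ~WAITERS) + 2)) {
            // old was refreshed by the failed CAS; retry
        }
        if (!(old & WAITERS)) {
            return false;           // nobody waiting: no lock, no wakeup
        }
        std::lock_guard<std::mutex> lock(m_mtx);
        m_cv.notify_all();          // wakes every waiter (thundering herd)
        return true;
    }
};
```

When no thread has called get(), signal() costs one CAS and never touches the mutex. The notify_all() is where the thundering herd comes from: all waiters become runnable at once.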
Implementation might look something like:
____________________________________________________________
struct node {
    node* nx;
};
class stack {
    enum const_e {
        WAITBIT = 0x80000000
    };
    struct anchor64 {
        node* hd;
        int32_t ver;
    };
    typedef anchor64 anchor;
    anchor m_anchor; // = { 0 }
    mutex m_mtx;
    condvar m_cv;
private:
    // Pops the head if there is one; otherwise ORs wbit into the version.
    // Returns the anchor value observed before the successful DWCAS.
    anchor sys_trypop(int32_t wbit) {
        anchor cmp, xchg;
        do {
            cmp = m_anchor;
            if (! cmp.hd) {
                xchg.hd = 0;
                xchg.ver = cmp.ver | wbit;
            } else {
                xchg.hd = cmp.hd->nx; // hazard
                xchg.ver = cmp.ver;
            }
        } while (! DWCAS(&m_anchor, &cmp, &xchg));
        return cmp;
    }
public:
    void push(node* const n) {
        anchor cmp, xchg;
        xchg.hd = n;
        do {
            cmp = m_anchor;
            n->nx = cmp.hd;
            // Every push bumps the version and clears the waiters bit.
            if (cmp.ver & WAITBIT) {
                xchg.ver = (cmp.ver & ~WAITBIT) + 1;
            } else {
                xchg.ver = cmp.ver + 1;
            }
        } while (! DWCAS(&m_anchor, &cmp, &xchg));
        // Wake waiters only if the waiters bit was set.
        if (cmp.ver & WAITBIT) {
            m_cv.broadcast();
        }
    }
    node* trypop() {
        return sys_trypop(0).hd;
    }
    node* waitpop() {
        for (;;) {
            anchor64 const cmp1 = sys_trypop(WAITBIT);
            if (cmp1.hd) {
                // Return here: cmp1 is not in scope after the loop.
                return cmp1.hd;
            } else {
                mutex::scopeguard lock(m_mtx);
                anchor64 const cmp2 = sys_trypop(0);
                if (cmp2.hd) {
                    return cmp2.hd;
                } else if (cmp1.ver == cmp2.ver) {
                    m_cv.wait(lock);
                }
            }
        }
    }
};
____________________________________________________________
That should do it...
> public:
>     void push(node* const n) {
>         anchor cmp, xchg;
>         xchg.hd = n;
>         do {
>             cmp = m_anchor;
>             n->nx = cmp.hd;
>             if (cmp.ver & WAITBIT) {
>                 xchg.ver = (cmp.ver & ~WAITBIT) + 1;
>             } else {
>                 xchg.ver = cmp.ver + 1;
>             }
>         } while (! DWCAS(&m_anchor, &cmp, &xchg));
>         if (cmp.ver & WAITBIT) {
>             m_cv.broadcast();
>         }
>     }
[...]
I believe that there is a race-condition here. It has to do with resetting
the waiters bit. It needs to be done under the protection of the lock. Here is
the fixed function:
____________________________________________________________
void push(node* const n) {
    bool locked = false;
    anchor cmp, xchg;
    xchg.hd = n;
    do {
        cmp = m_anchor;
        n->nx = cmp.hd;
        if (cmp.ver & WAITBIT) {
            if (! locked) {
                m_mtx.lock();
                locked = true;
            }
            xchg.ver = (cmp.ver & ~WAITBIT) + 1;
        } else {
            xchg.ver = cmp.ver + 1;
        }
    } while (! DWCAS(&m_anchor, &cmp, &xchg));
    if (locked) {
        m_cv.broadcast();
        m_mtx.unlock();
    }
}
____________________________________________________________
There... Now that eliminates the race-condition altogether. This makes
resetting the waiters-bit and broadcasting to the condvar atomic with the
slow-path in the waitpop function.
Sorry for all that.
It's better to define WAITBIT as 1 and increment count by 2. Then you
don't need to cope with counter overflows in every operation.
Dmitriy V'jukov
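A small sketch of that encoding, for illustration only. The helper names next_version and has_waiters, the constant VERSION_STEP, and the switch to an unsigned version field are assumptions, not code posted in the thread. With the flag in the top bit, incrementing a full counter carries into the flag; with the flag in bit 0 and a step of 2, the counter simply wraps.

```cpp
#include <cstdint>

// Assumed encoding: bit 0 is the waiters flag, bits 1..31 hold the version.
constexpr std::uint32_t WAITBIT = 1u;
constexpr std::uint32_t VERSION_STEP = 2u;

// Version that push() would publish: clear the flag, advance the count.
// Unsigned arithmetic wraps on overflow and never reaches bit 0.
constexpr std::uint32_t next_version(std::uint32_t ver) {
    return (ver & ~WAITBIT) + VERSION_STEP;
}

// True if a waiter has flagged the anchor.
constexpr bool has_waiters(std::uint32_t ver) {
    return (ver & WAITBIT) != 0;
}
```

In push(), both branches of the WAITBIT test would then collapse into a single xchg.ver = next_version(cmp.ver).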
Actually, now that I think about it, all I need to do is lock the mutex
before a broadcast. I don't need to lock it while performing the DWCAS. So,
it can look like:
______________________________________________________________
void push(node* const n) {
    anchor cmp, xchg;
    xchg.hd = n;
    do {
        cmp = m_anchor;
        n->nx = cmp.hd;
        if (cmp.ver & WAITBIT) {
            xchg.ver = (cmp.ver & ~WAITBIT) + 1;
        } else {
            xchg.ver = cmp.ver + 1;
        }
    } while (! DWCAS(&m_anchor, &cmp, &xchg));
    if (cmp.ver & WAITBIT) {
        mutex::scopeguard lock(m_mtx);
        m_cv.broadcast();
    }
}
______________________________________________________________
That eliminates the race-condition as well.
I see your point.