http://groups.google.com/group/comp.programming.threads/browse_frm/thread/aa8c62ad06dbb380
This version has the ability to wake a specific number of waiters instead of
just broadcasting to every waiter. It also has correct wait-epoch logic
which makes creating a condition-variable trivial; a sample condvar
implementation is included. I only need to add timeout/cancel logic; that
should be done in a couple of days. Anyway here is the code-draft:
http://appcore.home.comcast.net/misc/winsync_v0001_evcount_hpp.html
__________________________________________________________________
/**************************************************************
***************************************************************
==============================================================
Experimental, Fast-Pathed Thread Synchronization for Windows
___________________________________________________________
// \\
\\ Created By //
_________//______________\\__________
// \\
*| |*
*| Chris M. Thomasson |*
*| |*
*| cri...@comcast.net |*
*| news://comp.programming.threads |*
*| http://appcore.home.comcast.net |*
*| |*
\\___________________________________//
==============================================================
/**************************************************************
**************************************************************/
#if ! defined(__cplusplus)
#error EVCOUNT_WIN_HPP - A valid C++ compiler is required!
#elif ! defined(EVCOUNT_WIN_V0001_HPP)
#define EVCOUNT_WIN_V0001_HPP() 0x1
/* Include Support
_______________________________________________*/
#undef _WIN32_WINNT
#undef WIN32_LEAN_AND_MEAN
#define _WIN32_WINNT 0x4000
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <exception>
#include <cassert>
#include <climits>
/* Unexpected Error macros
_______________________________________________*/
// Error-reporting hooks. Every macro below is only defined when the includer
// has not already supplied its own definition, so each can be overridden.
// All of them take the reporting object (unused by the defaults) and are
// meant to be invoked as a single statement: MACRO(this);
//
// The multi-statement defaults are wrapped in do { } while (0) so that an
// invocation stays one statement even inside an unbraced if/else.

// Unrecoverable runtime failure: assert in debug builds, then call
// unexpected() (spelled without std:: when compiling with MSVC).
#if ! defined(EVCOUNT_WIN_UNEXPECTED)
#if defined(_MSC_VER)
#define EVCOUNT_WIN_UNEXPECTED(mp_this) \
do { assert(false); unexpected(); } while (0)
#else
#define EVCOUNT_WIN_UNEXPECTED(mp_this) \
do { assert(false); std::unexpected(); } while (0)
#endif
#endif
// Constructor failure: assert in debug builds, then throw std::exception.
#if ! defined(EVCOUNT_WIN_CTOR_UNEXPECTED)
#define EVCOUNT_WIN_CTOR_UNEXPECTED(mp_this) \
do { assert(false); throw std::exception(); } while (0)
#endif
// Destructor found signal state still pending: assert only.
#if ! defined(EVCOUNT_WIN_DTOR_SANITY_UNEXPECTED)
#define EVCOUNT_WIN_DTOR_SANITY_UNEXPECTED(mp_this) \
assert(false)
#endif
// Destructor failed to release a resource: assert only.
#if ! defined(EVCOUNT_WIN_DTOR_UNEXPECTED)
#define EVCOUNT_WIN_DTOR_UNEXPECTED(mp_this) assert(false)
#endif
// Abandoned-mutex recovery hook; currently just an alias for the generic
// unexpected-error hook.
#if ! defined(EVCOUNT_WIN_ROBUST_RECOVER)
/*** TO-DO ***: Add robust recover logic!!! */
#define EVCOUNT_WIN_ROBUST_RECOVER EVCOUNT_WIN_UNEXPECTED
#endif
/* Library Version Selection Macros
_______________________________________________*/
// Default library version token: invoking EVCOUNT_WIN_VER_DEFAULT() expands
// to v0001, the name of the versioned namespace declared inside winsyncsys.
// May be predefined by the includer.
#if ! defined(EVCOUNT_WIN_VER_DEFAULT)
#define EVCOUNT_WIN_VER_DEFAULT()v0001
#endif
// Version actually selected. It is invoked as EVCOUNT_WIN_VER_SELECT() where
// the winsync namespace alias is declared near the end of this header, and
// falls back to the default version unless predefined by the includer.
#if ! defined(EVCOUNT_WIN_VER_SELECT)
#define EVCOUNT_WIN_VER_SELECT EVCOUNT_WIN_VER_DEFAULT
#endif
/* Interlocked API Support/Workarounds
_______________________________________________*/
// INTERLOCKED_NT40 selects the PVOID-based InterlockedCompareExchange
// workaround below. It is defined whenever _WIN32_WINNT is defined and is
// not greater than 0x4000, which is the value this header sets before
// including <windows.h>.
#if defined(_WIN32_WINNT) && (_WIN32_WINNT <= 0x4000)
#define INTERLOCKED_NT40
#endif
// Qualifier slot placed between the type and the name of the event-count
// key member. It expands to nothing unless the includer predefines it.
#if ! defined(INTERLOCKED_VOLATILE)
#define INTERLOCKED_VOLATILE()
#endif
// "Naked" load: a plain read of *mp_pthis through a volatile LONG pointer.
// No interlocked API call is made.
#if ! defined(InterlockedLoadNaked)
#define InterlockedLoadNaked(mp_pthis) ( \
(LONG)(*((LONG volatile*)(mp_pthis))) \
)
#endif
// "Fenced" load: reads *mp_pthis by atomically adding zero to it with
// InterlockedExchangeAdd. The signal API in this header describes loads made
// this way as having store/load membar semantics.
#if ! defined(InterlockedLoadFence)
#define InterlockedLoadFence(mp_pthis) ( \
InterlockedExchangeAdd((mp_pthis), 0) \
)
#endif
// Windows api-declaration fixup: InterlockedCompareExchangeLong(dest, xchg,
// cmp) gives the rest of the header one LONG-typed compare-exchange spelling
// regardless of how the SDK declares InterlockedCompareExchange.
#if ! defined(InterlockedCompareExchangeLong)
#if defined(INTERLOCKED_NT40)
// ICE uses PVOID as parameters; we need to cast! ;(...
// NOTE(review): the LONG <-> PVOID casts presumably assume a 32-bit target
// where both types have the same width - confirm before building for Win64.
#define \
InterlockedCompareExchangeLong( \
mp_pthis, \
mp_xchg, \
mp_cmp \
) ( \
(LONG)InterlockedCompareExchange( \
(PVOID*)(mp_pthis), \
(PVOID)(mp_xchg), \
(PVOID)(mp_cmp) \
) \
)
#else
// assume that ICE uses LONG as parameters
#define InterlockedCompareExchangeLong \
InterlockedCompareExchange
#endif
#endif
/*************************************************************/
// System Library Version 0001
namespace winsyncsys {
namespace v0001 {
/**************************************************************
=============================================================*/
/* Wait/Lock-Free Fast-Pathed Event-Count
_____________________________________________________*/
// Event-count built from an interlocked key plus three Win32 kernel objects
// (a mutex, a manual-reset event and a semaphore).
//
// Usage pattern, as used by condvar in this header:
//   1. a waiter calls getkey(), which also sets the waiters-bit in the key;
//   2. the waiter re-checks its own condition;
//   3. the waiter calls waitkey(key), which blocks only while the counter
//      part of the key still equals the value captured in step 1;
//   4. a signaller changes the shared state and calls signal()/broadcast(),
//      which only touches the kernel objects when the waiters-bit is set.
//
// Errors from the Win32 calls are routed through the EVCOUNT_WIN_* macros.
// Timed waits are not implemented yet (see the TO-DO markers).
class evcount {
    // No Copy/Assign (declared, never defined)
    //---------------------------------------
    evcount(evcount const&);
    evcount const& operator =(evcount const&);

    /* Public Types
    _______________________________________________*/
public:
    // Value handed out by getkey() and consumed by waitkey()/trywaitkey().
    typedef LONG evwaitkey;

    /* Private Types/Flags/Etc
    _______________________________________________*/
private:
    // Signal count used by the broadcast functions. It is never smaller than
    // the number of requested signals, so every pending request is granted.
#define EVCOUNTSYS_WIN_BROADCAST() LONG_MAX
    enum flags_e {
        WAITERS_BIT = 0x80000000 // indicates active waiters
    };

    /* Private State
    _______________________________________________*/
private:
    // event-count key - atomic monotonic counter /w waiters-bit
    evwaitkey INTERLOCKED_VOLATILE() m_waitkey;
    LONG m_sigreq;   // signals requested (guarded by m_wmtx)
    LONG m_sigepoch; // signals issued for current wait-epoch (guarded by m_wmtx)
    HANDLE m_wmtx;   // wait-set mutex - mutex
    HANDLE m_wepoch; // wait-set epoch gate - manual-reset event, initially set
    HANDLE m_wset;   // wait-set - semaphore, initially zero

    /* Public Ctor/Dtor
    _______________________________________________*/
public:
    // constructor; creates the three kernel objects. If any creation fails
    // the ones that succeeded are closed and EVCOUNT_WIN_CTOR_UNEXPECTED is
    // invoked (by default: assert, then throw std::exception).
    //---------------------------------------
    evcount()
        : m_waitkey(0),
          m_sigreq(0),
          m_sigepoch(0),
          // acquire our sync-resources
          m_wmtx(CreateMutex(0, FALSE, 0)),
          m_wepoch(CreateEvent(0, TRUE, TRUE, 0)),
          m_wset(CreateSemaphore(0, 0, LONG_MAX, 0)) {
        // ensure we acquired our sync-resources!
        if (! m_wset || ! m_wepoch || ! m_wmtx) {
            if (m_wset) { CloseHandle(m_wset); }
            if (m_wepoch) { CloseHandle(m_wepoch); }
            if (m_wmtx) { CloseHandle(m_wmtx); }
            EVCOUNT_WIN_CTOR_UNEXPECTED(this);
        }
    }

    // destructor; reports (via the DTOR macros) pending signal state and any
    // handle that could not be closed.
    //---------------------------------------
    ~evcount() throw() {
        // sanity check: nobody may still be waiting or mid-epoch
        if (m_sigreq || m_sigepoch) {
            EVCOUNT_WIN_DTOR_SANITY_UNEXPECTED(this);
        }
        // release our sync-resources. Every handle is closed unconditionally
        // and the results are checked afterwards; chaining the calls with ||
        // would skip the remaining closes after the first failure and leak
        // those handles.
        BOOL const wset_closed = CloseHandle(m_wset);
        BOOL const wepoch_closed = CloseHandle(m_wepoch);
        BOOL const wmtx_closed = CloseHandle(m_wmtx);
        if (! wset_closed || ! wepoch_closed || ! wmtx_closed) {
            EVCOUNT_WIN_DTOR_UNEXPECTED(this);
        }
    }

    /* Private Atomic Event-Count Key API
    _______________________________________________*/
private:
    // atomically ORs wbit into the key with a compare-exchange loop and
    // returns the key value observed just before the successful exchange
    //---------------------------------------
    inline evwaitkey sys_setkey(evwaitkey const wbit) throw() {
        evwaitkey cmptmp, cmp = InterlockedLoadNaked(&m_waitkey);
        // set the waiters-bit
        do {
            cmptmp = cmp;
            cmp = InterlockedCompareExchangeLong(
                &m_waitkey, cmp | wbit, cmp);
        } while(cmp != cmptmp);
        return cmp;
    }

    // atomically adds addend to the key; wbit is left set when sigxchg
    // (the number of requests that stay pending) is non-zero and is cleared
    // otherwise; called with m_wmtx locked
    //---------------------------------------
    void sys_resetkey(
        evwaitkey cmp, LONG CONST sigxchg,
        evwaitkey const addend, evwaitkey const wbit) throw() {
        evwaitkey cmptmp;
        /* IMPORTANT! The waiters-bit can only be cleared
           if the subsequent signal swap is zero, otherwise
           it needs to be set again...
        */
        do {
            cmptmp = cmp;
            cmp = InterlockedCompareExchangeLong(
                &m_waitkey,
                (sigxchg)
                    ? (cmp + addend) | wbit // set bit
                    : (cmp + addend) & ~wbit, // clear bit
                cmp);
        } while(cmp != cmptmp);
    }

    /* Private Wait-Set Lock API
    _______________________________________________*/
private:
    // locks the wait-set mutex. Any wait status other than WAIT_OBJECT_0 is
    // reported; WAIT_ABANDONED goes to the robust-recover hook only, every
    // other status goes to the unexpected hook first.
    //---------------------------------------
    inline void sys_wmtxlock() throw() {
        DWORD CONST waitstatus =
            WaitForSingleObject(m_wmtx, INFINITE);
        // validate the mutex lock wait-status
        if (waitstatus != WAIT_OBJECT_0) {
            if (waitstatus != WAIT_ABANDONED) {
                EVCOUNT_WIN_UNEXPECTED(this);
            }
            /*** TO-DO ***: Add robust recovery logic! */
            EVCOUNT_WIN_ROBUST_RECOVER(this);
        }
    }

    // unlocks the wait-set mutex; called with m_wmtx locked
    //---------------------------------------
    inline void sys_wmtxunlock() throw() {
        if (! ReleaseMutex(m_wmtx)) {
            EVCOUNT_WIN_UNEXPECTED(this);
        }
    }

    // registers sigreq signal requests, atomically unlocks the wait-set
    // mutex while waiting on the wait-epoch gate, then waits on the wait-set
    // semaphore for up to timeout milliseconds; called with m_wmtx locked.
    // Returns true when the semaphore was acquired, false on timeout. The
    // mutex is NOT re-acquired on return.
    //---------------------------------------
    bool sys_wmtxunlockwait(
        LONG CONST sigreq, DWORD CONST timeout
    ) throw() {
        /* atomically request a signal; unlock wait-set mutex and
           enter current wait-epoch
        */
        m_sigreq += sigreq;
        if (SignalObjectAndWait(
                m_wmtx,
                m_wepoch,
                INFINITE,
                FALSE) != WAIT_OBJECT_0) {
            EVCOUNT_WIN_UNEXPECTED(this);
        }
        // wait on current wait-epoch...
        DWORD CONST waitstatus =
            WaitForSingleObject(m_wset, timeout);
        // validate the prior wait-status
        if (waitstatus == WAIT_OBJECT_0) {
            return true; // okay!
        } else if (waitstatus != WAIT_TIMEOUT) {
            EVCOUNT_WIN_UNEXPECTED(this);
        } else if (timeout == INFINITE) {
            // an infinite wait must never report a timeout
            EVCOUNT_WIN_UNEXPECTED(this);
        }
        return false; // timeout!
    }

    /* Private Wait-Epoch API
    _______________________________________________*/
private:
    // begins a wait-epoch granting at most count of the requested signals;
    // called with m_wmtx locked
    //-----------------------------------------
    void sys_wepochbegin(
        evwaitkey cmp, LONG CONST count) throw() {
        // split the pending requests: sigreq are granted now, sigxchg
        // remain pending for a later signal
        LONG sigxchg, sigreq = m_sigreq, sigepoch = m_sigepoch;
        if (count < sigreq) {
            sigxchg = sigreq - count;
            sigreq = count;
        } else {
            sigxchg = 0;
        }
        // reset the event-count key (increment; keep the waiters-bit only
        // while requests remain pending)
        sys_resetkey(cmp, sigxchg, 1, WAITERS_BIT);
        // NOTE(review): the key is reset before the check below, so a signal
        // arriving while an epoch is still active bumps the key (and may
        // clear the waiters-bit) without posting anything - confirm that
        // requests queued behind the gate cannot be stranded by this.
        // validate the wait-state; can't signal during wait-epoch!
        if (sigreq && ! sigepoch) {
            // initialised so the check below never reads an indeterminate
            // value when ResetEvent fails and the error hook returns
            LONG sigprev = 0;
            // swap requested signals into the wait-epoch
            m_sigreq = sigxchg;
            m_sigepoch = sigreq;
            if (// close wait-epoch gate
                ! ResetEvent(m_wepoch) ||
                // post to the wait-epoch semaphore
                ! ReleaseSemaphore(m_wset, sigreq, &sigprev)) {
                EVCOUNT_WIN_UNEXPECTED(this);
            }
            /*** TO-DO ***: Add cancellation processing logic! */
            if (sigprev) {
                /* as of now, there is no timeout/cancel code,
                   therefore, we have a serious error if this path is
                   taken! ;^(...
                */
                EVCOUNT_WIN_UNEXPECTED(this);
            }
        }
    }

    // retires count signals from the current wait-epoch and re-opens the
    // wait-epoch gate once none are left; called with m_wmtx locked
    //-----------------------------------------
    void sys_wepochend(LONG CONST count) throw() {
        m_sigepoch -= count;
        assert(m_sigepoch > -1);
        // check for the end of the wait-epoch
        if (! m_sigepoch) {
            if (! SetEvent(m_wepoch)) {
                EVCOUNT_WIN_UNEXPECTED(this);
            }
        }
    }

    /* Private Waiter API
    _______________________________________________*/
private:
    // waits on a previous key. Returns true when the counter part of the key
    // no longer matches cmp or the wait was satisfied; returns false when
    // the key still matches and timeout is zero, or when the wait timed out.
    //-----------------------------------------
    bool sys_waitkey(
        evwaitkey const cmp, DWORD CONST timeout) throw() {
        // zero timeout: answer without taking the mutex when the key is
        // unchanged
        if (! timeout) {
            if ((InterlockedLoadNaked(
                    &m_waitkey) & ~WAITERS_BIT) ==
                (cmp & ~WAITERS_BIT)) {
                return false;
            }
        }
        bool status = true;
        sys_wmtxlock();
        // re-check under the mutex; only block while the key is unchanged
        if ((InterlockedLoadNaked(&m_waitkey) & ~WAITERS_BIT) ==
            (cmp & ~WAITERS_BIT)) {
            if (timeout) {
                status = sys_wmtxunlockwait(1, timeout);
                sys_wmtxlock();
                // NOTE(review): this runs on the timeout path as well;
                // harmless today because only INFINITE and 0 are reachable
                // through the public API - revisit with the timeout TO-DO.
                sys_wepochend(1);
            } else {
                status = false;
            }
        }
        sys_wmtxunlock();
        return status;
    }

    /* Private Signal API
    _______________________________________________*/
private:
    // issues up to count signals; does nothing at all unless the supplied
    // key snapshot has the waiters-bit set
    //-----------------------------------------
    void sys_sigissue(
        evwaitkey const cmp, LONG CONST count) throw() {
        if (cmp & WAITERS_BIT) {
            sys_wmtxlock();
            sys_wepochbegin(cmp, count);
            sys_wmtxunlock();
        }
    }

    /* Public Waiter API
    _______________________________________________*/
public:
    /*** TO-DO ***: Add timeout/cancel logic! */
    /*
    // timed-wait on a previous key
    //-----------------------------------------
    inline void timedwaitkey(
    evwaitkey const cmp, DWORD CONST timeout) throw() {
    sys_waitkey(cmp, timeout);
    }
    */

    // waits, without timeout, until the key differs from cmp
    //-----------------------------------------
    inline void waitkey(evwaitkey const cmp) throw() {
        sys_waitkey(cmp, INFINITE);
    }

    // tries to wait on a previous key without blocking on the wait-set;
    // returns false when the key still equals cmp, true when it has changed
    //-----------------------------------------
    inline bool trywaitkey(evwaitkey const cmp) throw() {
        return sys_waitkey(cmp, 0);
    }

    // gets the current key. Although declared const, this sets the
    // waiters-bit in the key as a side effect (hence the const_cast).
    //-----------------------------------------
    inline evwaitkey getkey() const throw() {
        return const_cast<evcount*>(this)->sys_setkey(WAITERS_BIT);
    }

    /* Public Signal API
    _______________________________________________*/
public:
    // broadcasts with store/load membar semantics
    //-----------------------------------------
    inline void broadcast() throw() {
        sys_sigissue(InterlockedLoadFence(&m_waitkey),
            EVCOUNTSYS_WIN_BROADCAST());
    }

    // broadcasts with naked load
    //-----------------------------------------
    inline void broadcast_mbnaked() throw() {
        sys_sigissue(InterlockedLoadNaked(&m_waitkey),
            EVCOUNTSYS_WIN_BROADCAST());
    }

    // signals up to count waiters with store/load membar semantics
    //-----------------------------------------
    inline void signal(LONG count = 1) throw() {
        sys_sigissue(InterlockedLoadFence(&m_waitkey), count);
    }

    // signals up to count waiters with naked load
    //-----------------------------------------
    inline void signal_mbnaked(LONG count = 1) throw() {
        sys_sigissue(InterlockedLoadNaked(&m_waitkey), count);
    }
}; // class evcount
/* Wait/Lock-Free Fast-Pathed Condition-Variable
_____________________________________________________*/
// Condition variable layered on an evcount wait-set.
//
// Every wait overload follows the same sequence: capture the current
// event-count key while the caller's lock is still held, release the lock,
// wait until the key changes, then re-acquire the lock before returning.
// As the waits return whenever the key has changed, callers are expected to
// re-check their own predicate afterwards.
class condvar {
    // No Copy/Assign (declared, never defined)
    //-----------------------------------------
    condvar(condvar const&);
    condvar const& operator =(condvar const&);

    /* Private State
    _______________________________________________*/
private:
    // wait-set; mutable so that the const rdwait() can wait on it
    mutable evcount m_wset;

    /* Public Ctor/Dtor
    _______________________________________________*/
public:
    inline condvar() {}
    inline ~condvar() throw() {}

    /* Public Signal API
    _______________________________________________*/
public:
    // signals the condvar (evcount::signal with its default count of 1)
    //-----------------------------------------
    inline void signal() throw() {
        m_wset.signal();
    }

    // broadcasts the condvar
    //-----------------------------------------
    inline void broadcast() throw() {
        m_wset.broadcast();
    }

    /* Public Waiter API
    _______________________________________________*/
public:
    // waits on the condvar; type T must have
    // public lock/unlock member functions!
    //-----------------------------------------
    template<typename T>
    void wait(T& umtx) throw() {
        evcount::evwaitkey const key = m_wset.getkey();
        umtx.unlock();
        m_wset.waitkey(key);
        umtx.lock();
    }

    // waits on the condvar; type T must have
    // public wrlock/wrunlock member functions!
    //-----------------------------------------
    template<typename T>
    void wrwait(T& umtx) throw() {
        evcount::evwaitkey const key = m_wset.getkey();
        umtx.wrunlock();
        m_wset.waitkey(key);
        umtx.wrlock();
    }

    // waits on the condvar; type T must have
    // public rdlock/rdunlock member functions
    // that are callable on a const object!
    //-----------------------------------------
    template<typename T>
    void rdwait(T const& umtx) const throw() {
        evcount::evwaitkey const key = m_wset.getkey();
        umtx.rdunlock();
        m_wset.waitkey(key);
        umtx.rdlock();
    }

    // waits on the condvar; Windows
    // Critical-Section
    //-----------------------------------------
    void wait(LPCRITICAL_SECTION CONST umtx) throw() {
        evcount::evwaitkey const key = m_wset.getkey();
        LeaveCriticalSection(umtx);
        m_wset.waitkey(key);
        EnterCriticalSection(umtx);
    }
}; // class condvar
/*=============================================================
**************************************************************/
}} // namespace winsyncsys::v0001
// Simplistic Library Version Selection/Abstraction
namespace winsync = winsyncsys::EVCOUNT_WIN_VER_SELECT();
/*************************************************************/
#endif // #elif ! defined(EVCOUNT_WIN_V0001_HPP)
__________________________________________________________________
That should compile fine. If you are using a recent Windows SDK you will
need to define _WIN32_WINNT to a value greater than 0x4000 (the header
#undefs and redefines _WIN32_WINNT itself, so change the value there). This
is because Microsoft has changed the InterlockedCompareExchange API
declaration. On older SDKs it used to be declared as taking PVOID
parameters; later they changed it to LONG. Anyway, I attempted a little
workaround.
Here is a basic usage pattern for a simple producer/consumer scheme:
__________________________________________________________________
class nodecache {
winsync::evcount m_wset;
lockfree_stack m_stack;
public:
nodecache(size_t depth = 1024) {
for(size_t i = 0; i < depth; ++i) {
m_stack.push(new lockfree_node());
}
}
~nodecache() throw() {
lockfree_node* node;
while(node = m_stack.trypop()) {
delete node;
}
}
public:
lockfree_node* waitpop(void* state) throw() {
lockfree_node* node;
while(! (node = m_stack.trypop())) {
winsync::evcount::evwaitkey const wkey = m_wset.getkey();
if (node = m_stack.trypop()) { break; }
m_wset.waitkey(wkey);
}
assert(node);
node->setstate(state);
return node;
}
void push(lockfree_node* const node) throw() {
m_stack.push(node);
m_wset.signal();
}
};
// Lock-free queue of opaque state pointers, augmented with an event-count
// so that consumers can block while the queue is empty. Nodes are borrowed
// from, and returned to, a nodecache.
class queue {
  winsync::evcount m_wset;
  lockfree_queue m_queue;
public:
  // Pops the oldest entry, waiting on the event-count while the queue is
  // empty, hands the node back to ncache and returns the state it carried.
  // The state parameter is never read; it is kept (now with a default) so
  // that calls passing two arguments keep compiling. The popped value used
  // to be stored in a local that was also named state, which redeclared the
  // parameter and made the function ill-formed.
  void* waitpop(nodecache& ncache, void* const state = 0) throw() {
    (void)state;
    lockfree_node* node;
    while(! (node = m_queue.trypop())) {
      // capture the key first, then look again before committing to wait
      winsync::evcount::evwaitkey const wkey = m_wset.getkey();
      if (node = m_queue.trypop()) { break; }
      m_wset.waitkey(wkey);
    }
    assert(node);
    void* const popped = node->getstate();
    ncache.push(node);
    return popped;
  }
  // Takes a node from ncache (which stores state in it), enqueues the node
  // and signals the event-count.
  void push(nodecache& ncache, void* const state) throw() {
    lockfree_node* node = ncache.waitpop(state);
    m_queue.push(node);
    m_wset.signal();
  }
};
// global state
static nodecache g_ncache;
static stack g_stack;
void consumers() {
void* const state = m_stack.pop(g_ncache);
}
void producers() {
void* const state = State_Create(...);
if (state) {
m_stack.push(g_ncache, state);
}
}
___________________________________________________________
The above code shows how to augment existing lock-free collections with
conditional waiting without altering the original collection code. Here is
some more information wrt technical aspects:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/00cf4a952ff5af7d
(read all...)
Well, before I go any further, I was wondering if anybody has any
comments/suggestions/bug-reports?
Thank you.
--
Chris M. Thomasson
http://appcore.home.comcast.net
[...]
The following code is obsolete and I am not going to support it any more:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/355e7054cfa6d7ca
because I am using my new windows event-count object to construct a
condition-variable.
> Anyway here is the code-draft:
[...]
Sorry about the spelling mistakes in the code comments. I will fix that in
upcoming versions I post here.
--
Volodymyr
"Chris Thomasson" <cri...@comcast.net> wrote in message
news:wPKdnW9NfZQyH__a...@comcast.com...
This is a poor idea -- many ISPs only allow attachments to messages in
.binaries newsgroups and such.
--
Later,
Jerry.
The universe is a figment of its own imagination.