Bonita Montero
unread,Nov 8, 2023, 10:28:08 PM11/8/23You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to
Java and .NET have monitor objects instead of a combination of mutexes
and condition variables. The advantage of a monitor object is that when
you wait for a monitor to be signalled, you can wait for that signal and
for the mutex's semaphore to be unlocked in _one_ step. With a condition
variable you first have to wait for the notify() semaphore to be signalled
and then for the mutex's lock, i.e. in two steps.
The code below simply has a 32- or 64-bit atomic (depending on whether the
64-bit variant is lock-free or not); the lower half is the number of
threads waiting to enter the mutex part and the upper half is the number
of threads waiting to be notified. As all threads waiting to be notified
are also waiting for the mutex, the lower half is always >= the upper
half, which I check for in several asserts.
On Windows, waiting to be notified and waiting for the mutex part to be
unlocked is done with WaitForMultipleObjects(). On Linux there's no way
to wait for multiple kernel handles to be signalled, but there are
System V semaphore sets, which may consist of several semaphores, and
you can have multiple operations performed atomically on such a set.
The drawback of combining the mutex and condition-variable parts is
that you can't have multiple conditions associated with the same mutex.
// monitor.h
#pragma once
#if defined(_WIN32)
#define NOMINMAX
#include <Windows.h>
#elif defined(__unix__)
#include <sys/types.h>
#include <sys/sem.h>
#include <sys/stat.h>
#else
#error unsupported platform
#endif
#include <atomic>
#include <semaphore>
#include <type_traits>
#if defined(_WIN32)
#include "xhandle.h"
#endif
// Recursive monitor: one object that combines a mutex part (lock()/unlock())
// with a single notification part (wait()/notify()/notify_all()).
// The state is kept in one atomic word: the lower half counts the threads that
// entered or want to enter the mutex part, the upper half counts the threads
// that are blocked in wait().
struct monitor
{
	monitor();
	~monitor();
	// Not copyable: the object owns kernel synchronization objects and atomics.
	monitor( monitor const & ) = delete;
	monitor &operator =( monitor const & ) = delete;
	// Acquires the monitor; may be called recursively by the owning thread.
	void lock();
	// Releases one level of ownership acquired with lock().
	void unlock();
	// Releases the monitor and blocks until notified and re-entered.
	void wait();
	// Wakes one thread blocked in wait(), if there is any.
	void notify();
	// Wakes all threads blocked in wait().
	void notify_all();
private:
	// The address of this thread-local object identifies the calling thread.
	inline static thread_local char t_dummy;
	// Use a 64 bit state word only if it is guaranteed to be lock-free.
	static constexpr bool USE64 = std::atomic_int64_t::is_always_lock_free;
	using atomic_word_t = std::conditional_t<USE64, uint64_t, uint32_t>;
	// Width of each half of the state word.
	static constexpr unsigned BITS = USE64 ? 32 : 16;
	static constexpr atomic_word_t
		ENTER_VALUE = 1,
		// One unit of the upper half. Derived from BITS so that it is also
		// correct for the 32 bit state word (it was 1 there, i.e. it aliased
		// ENTER_VALUE and corrupted the enter counter).
		SIGNAL_VALUE = (atomic_word_t)1 << BITS,
		ENTER_MASK = USE64 ? (uint32_t)-1 : (uint16_t)-1,
		SIGNAL_MASK = USE64 ? (uint64_t)(uint32_t)-1 << 32 :
			(uint32_t)(uint16_t)-1 << 16;
	// Packed enter counter (lower half) and wait counter (upper half).
	std::atomic<atomic_word_t> m_atomic;
	// &t_dummy of the owning thread, nullptr if the monitor is not owned.
	std::atomic<char *> m_threadId;
	// Number of additional (recursive) lock() calls of the owning thread.
	std::atomic_uint32_t m_recCount;
#if defined(_WIN32)
	// Largest count a Win32 semaphore can hold.
	static constexpr uint32_t SEM_MAX = std::numeric_limits<LONG>::max();
	XHANDLE
		m_xhEnterEvt,	// auto-reset event: hand-over of the mutex part
		m_xhSignalSem;	// semaphore: pending notifications
#elif defined(__unix__)
	// Largest value used for a single System V semaphore operation.
	static constexpr uint32_t SEM_MAX = std::numeric_limits<short>::max();
	// System V semaphore set: semaphore 0 = enter, semaphore 1 = signal.
	int m_sems;
	// Runs the given operations atomically on m_sems.
	int semop( std::initializer_list<sembuf> sems );
#endif
};
// monitor.cpp
#include <iostream>
#include <limits>
#include <system_error>
#include <cassert>
#include <cerrno>
#include "monitor.h"
using namespace std;
// Creates an unowned monitor together with its kernel objects: an auto-reset
// event plus a semaphore on Windows, a private System V semaphore set with two
// semaphores on Unix.
// Throws system_error if the kernel objects can't be created or initialized.
monitor::monitor() :
	m_atomic( 0 ),
	m_threadId( nullptr ),
	m_recCount( 0 )
#if defined(_WIN32)
	, m_xhEnterEvt( CreateEventA( nullptr, FALSE, FALSE, nullptr ) ),
	m_xhSignalSem( CreateSemaphoreA( nullptr, 0, SEM_MAX, nullptr ) )
#elif defined(__unix__)
	, m_sems( semget( IPC_PRIVATE, 2, S_IRUSR | S_IWUSR ) )
#endif
{
#if defined(_WIN32)
	if( !m_xhEnterEvt.get() || !m_xhSignalSem.get() )
		throw system_error( GetLastError(), system_category(), "can't initialize monitor object" );
#elif defined(__unix__)
	if( m_sems == -1 )
		throw system_error( errno, system_category(), "can't initialize monitor object" );
#if !defined(__linux__)
	// NOTE(review): the explicit zeroing is skipped on Linux, presumably because
	// semget() already zero-initializes a new set there - confirm.
	union semun { int val; void *p; } su;
	su.val = 0;
	if( semctl( m_sems, 0, SETVAL, su ) == -1 || semctl( m_sems, 1, SETVAL, su ) == -1 )
	{
		// The destructor doesn't run when the constructor throws, so the set
		// has to be removed here; keep errno of the failed call for the report.
		int const err = errno;
		semctl( m_sems, 0, IPC_RMID );
		throw system_error( err, system_category(), "can't initialize monitor object" );
	}
#endif
#endif
}
// Destroys the monitor. On Unix the System V semaphore set is removed; the
// result is only verified in debug builds. Nothing is done here on Windows.
monitor::~monitor()
{
#if defined(__unix__)
	[[maybe_unused]] int const removed = semctl( m_sems, 0, IPC_RMID );
	assert(removed != -1);
#endif
}
// Acquires the monitor for the calling thread; the owner may call it recursively.
// Throws system_error with errc::result_out_of_range if the recursion counter
// or the enter counter (lower half of m_atomic) is saturated.
// Calls terminate() if waiting on the kernel object fails.
void monitor::lock()
{
// the address of the thread-local t_dummy serves as the id of the calling thread
if( m_threadId.load( memory_order_relaxed ) == &t_dummy )
{
// already owned by this thread: only count the additional recursion level
uint32_t oldRecCount = m_recCount.load( memory_order_relaxed );
if( oldRecCount == (uint32_t)-1 )
throw system_error( (int)errc::result_out_of_range,
generic_category(), "montor's recursion count saturated" );
m_recCount.store( oldRecCount + 1, memory_order_relaxed );
return;
}
// register this thread in the enter counter (lower half of the state word)
atomic_word_t ref = m_atomic.load( memory_order_relaxed );
do
{
if( (ref & ENTER_MASK) == ENTER_MASK )
throw system_error( (int)errc::result_out_of_range,
generic_category(), "montor's locker count saturated" );
// invariant: every thread counted in the upper half is also counted in the lower half
assert((ref & ENTER_MASK) >= ref >> BITS);
} while( !m_atomic.compare_exchange_strong( ref, ref + 1,
memory_order_acquire, memory_order_relaxed ) );
// records the calling thread as owner with recursion count zero
auto initThread = [&]()
{
m_threadId.store( &t_dummy, memory_order_relaxed );
m_recCount.store( 0, memory_order_relaxed );
};
// ref holds the state before the increment: if all threads counted in the
// enter half were also counted in the wait half, the monitor is taken
// without waiting on a kernel object
if( (ref & ENTER_MASK) == ref >> BITS ) [[likely]]
return initThread();
// contended: block until the enter event / enter semaphore (index 0) is handed over
#if defined(_WIN32)
if( WaitForSingleObject( m_xhEnterEvt.get(), INFINITE ) != WAIT_OBJECT_0 )
terminate();
#elif defined(__unix__)
if( semop( { { 0, -1, 0 } } ) )
terminate();
#endif
initThread();
}
// Releases one level of ownership of the monitor.
// A recursive unlock only decrements the recursion count. The final unlock
// clears the owner, removes the thread from the enter counter and, if another
// thread is currently waiting to enter, hands the mutex part over to it.
// Calls terminate() if signalling the kernel object fails.
void monitor::unlock()
{
	if( uint32_t rc; m_threadId.load( memory_order_relaxed ) == &t_dummy &&
		(rc = m_recCount.load( memory_order_relaxed )) )
	{
		// recursive unlock: ownership is kept
		m_recCount.store( rc - 1, memory_order_relaxed );
		return;
	}
	atomic_word_t ref = m_atomic.load( memory_order_relaxed );
	assert((ref & ENTER_MASK) && m_threadId == &t_dummy);
	m_threadId.store( nullptr, memory_order_relaxed );
	do
		assert((ref & ENTER_MASK) >= ref >> BITS);
	while( !m_atomic.compare_exchange_strong( ref, ref - 1,
		memory_order_release, memory_order_relaxed ) );
	// ref is the state before the decrement. Threads counted in the upper half
	// are blocked in wait() and can't take the mutex part before they have been
	// notified, so they must not be counted as enterers here (same rule as in
	// wait()). Counting them would post an enter token nobody consumes; a later
	// notify() would then let the waiter run while the notifier still owns the
	// monitor.
	if( (ref & ENTER_MASK) - (ref >> BITS) == 1 ) [[likely]]
		return;
	// at least one other thread wants to enter: hand over the mutex part
#if defined(_WIN32)
	if( !SetEvent( m_xhEnterEvt.get() ) )
		terminate();
#elif defined(__unix__)
	if( semop( { { 0, 1, IPC_NOWAIT } } ) )
		terminate();
#endif
}
// Releases the monitor and blocks until the calling thread has been notified
// and the mutex part has been handed over to it; both conditions are awaited
// in a single kernel wait operation.
// The caller must own the monitor with a recursion count of zero (asserted).
// Calls terminate() if a kernel operation fails.
void monitor::wait()
{
assert(m_threadId == &t_dummy && !m_recCount);
// give up ownership
m_threadId.store( nullptr, memory_order_relaxed );
// register as waiter in the upper half; the thread stays counted in the lower half
atomic_word_t ref = m_atomic.load( memory_order_relaxed );
do
assert((ref & ENTER_MASK) > ref >> BITS);
while( !m_atomic.compare_exchange_strong( ref, ref + SIGNAL_VALUE,
memory_order_release, memory_order_relaxed ) );
// ref is the state before the increment: if more threads than this one were
// counted in the enter half without being counted in the wait half, hand the
// mutex part over to one of them
if( (ref & ENTER_MASK) - (ref >> BITS) > 1 )
{
#if defined(_WIN32)
if( !SetEvent( m_xhEnterEvt.get() ) )
terminate();
#elif defined(__unix__)
if( semop( { { 0, 1, IPC_NOWAIT } } ) )
terminate();
#endif
}
// wait for the enter object and the signal object together
#if defined(_WIN32)
HANDLE waitFor[2] { m_xhEnterEvt.get(), m_xhSignalSem.get() };
if( WaitForMultipleObjects( 2, waitFor, TRUE, INFINITE ) != WAIT_OBJECT_0 )
terminate();
#elif defined(__unix__)
if( semop( { { 0, -1, 0 }, { 1, -1, 0 } } ) )
terminate();
#endif
// the calling thread is the owner again, with recursion count zero
m_threadId.store( &t_dummy, memory_order_relaxed );
m_recCount.store( 0, memory_order_relaxed );
}
// Wakes one thread blocked in wait(); does nothing if no thread is waiting.
// The caller must own the monitor (asserted). The woken thread still needs the
// mutex part, which it gets when the owner unlocks or waits.
// Calls terminate() if the kernel object can't be signalled.
void monitor::notify()
{
	atomic_word_t ref = m_atomic.load( memory_order_relaxed );
	assert((ref & ENTER_MASK) > ref >> BITS && m_threadId == &t_dummy);
	// remove one waiter from the upper half, if there is any
	uint32_t n;
	while( (n = (uint32_t)(ref >> BITS)) &&
		!m_atomic.compare_exchange_strong( ref, ref - SIGNAL_VALUE,
			memory_order_relaxed, memory_order_relaxed ) );
	if( !n )
		return;
#if defined(_WIN32)
	if( !ReleaseSemaphore( m_xhSignalSem.get(), 1, nullptr ) )
		terminate();
#elif defined(__unix__)
	// semop() reports failure as -1 and stores the reason in errno, so the
	// retry condition has to inspect errno rather than the return value
	int ret;
	do
		ret = semop( { { 1, 1, IPC_NOWAIT } } );
	while( ret == -1 && errno == EAGAIN );
	if( ret )
		terminate();
#endif
}
// Wakes all threads blocked in wait(); does nothing if no thread is waiting.
// The caller must own the monitor (asserted). The signal object is released in
// chunks of at most SEM_MAX; a chunk is halved and retried while the kernel
// reports that the semaphore's counter would overflow.
// Calls terminate() on any other kernel error.
void monitor::notify_all()
{
	atomic_word_t ref = m_atomic.load( memory_order_relaxed );
	assert((ref & ENTER_MASK) > ref >> BITS && m_threadId == &t_dummy);
	// fetch the number of waiters and clear the upper half in one step
	uint32_t n;
	while( (n = (uint32_t)(ref >> BITS)) &&
		!m_atomic.compare_exchange_strong( ref, ref & ENTER_MASK,
			memory_order_relaxed, memory_order_relaxed ) );
	while( n )
	{
		uint32_t nRelease = n <= SEM_MAX ? n : SEM_MAX;
#if defined(_WIN32)
		BOOL succ;
		for( ; !(succ = ReleaseSemaphore( m_xhSignalSem.get(), nRelease,
			nullptr )) && GetLastError() == ERROR_TOO_MANY_POSTS;
			nRelease = nRelease > 1 ? nRelease / 2 : nRelease );
		if( !succ )
			terminate();
#elif defined(__unix__)
		// semop() reports failure as -1 with the reason in errno. ERANGE is the
		// System V counterpart of ERROR_TOO_MANY_POSTS (semaphore value would
		// exceed its maximum); EAGAIN is kept as a retryable condition as well.
		int ret;
		for( ; (ret = semop( { { 1, (short)nRelease, IPC_NOWAIT } } )) == -1 &&
			(errno == EAGAIN || errno == ERANGE);
			nRelease = nRelease > 1 ? nRelease / 2 : nRelease );
		if( ret )
			terminate();
#endif
		n -= nRelease;
	}
}
#if defined(__unix__)
// Performs the given operations atomically on the monitor's semaphore set and
// transparently restarts the call when it was interrupted by a signal.
// Returns 0 on success, -1 on failure with the reason in errno.
int monitor::semop( initializer_list<sembuf> sems )
{
	int ret;
	// ::semop() returns -1 on failure and reports EINTR through errno; it never
	// returns the error code itself
	do
		ret = ::semop( m_sems, const_cast<sembuf *>(sems.begin()), sems.size() );
	while( ret == -1 && errno == EINTR );
	return ret;
}
#endif