I think I've put the finishing touches to the code now. For the mutex
part I introduced spinning, which I adopted from glibc. Spinning usually
makes little sense for producer-consumer relationships because the time
it takes to put an item in the queue or take it out of it is usually
very short, while the time it takes to produce the item beforehand and
consume it afterwards is usually orders of magnitude higher; therefore,
a collision during locking occurs quite rarely.
Nevertheless, I can also imagine cases where items are produced and
consumed with high frequency, and spinning could make sense there.
So, here are the two changed files:
// monitor.h
#pragma once
#if defined(_WIN32)
#define NOMINMAX
#include <Windows.h>
#elif defined(__unix__)
#include <sys/types.h>
#include <sys/sem.h>
#include <sys/stat.h>
#else
#error unsupported platform
#endif
#include <atomic>
#include <type_traits>
#if defined(_WIN32)
#include "xhandle.h"
#endif
// A monitor: a recursive mutex and condition variable combined in one
// object (lock/unlock/try_lock plus wait/notify/notify_all).
// The core state is one atomic word split into two halves:
//   low  half ("enterers"): threads that have entered the lock path,
//   high half ("waiters"):  threads parked inside wait().
// Actual blocking is delegated to kernel objects: an auto-reset event
// plus a semaphore on Windows, a two-element SysV semaphore set on Unix.
struct monitor
{
	// maxSpin: upper bound for user-space spinning before blocking
	// in the kernel (0 disables spinning)
	monitor( uint16_t maxSpin = 0 );
	~monitor();
	void lock();
	void unlock();
	bool try_lock();
	// wait/notify/notify_all require the calling thread to hold the lock
	void wait();
	void notify();
	void notify_all();
	// install a new spin maximum (negative argument = query only);
	// returns the previously effective maximum
	uint16_t maxSpin( int16_t maxSpin );
private:
	// per-thread marker whose address serves as a cheap thread identity;
	// compared against m_threadId for recursion detection
	inline static thread_local char t_dummy;
	// prefer a 64-bit packed word when atomics on it are lock-free,
	// otherwise fall back to 32 bits (16-bit halves)
	static constexpr bool USE64 = std::atomic_int64_t::is_always_lock_free;
	using aword_t = std::conditional_t<USE64, uint64_t, uint32_t>;
	// number of bits per half of the packed word
	static constexpr unsigned BITS = sizeof(aword_t) * 8 / 2;
	static constexpr aword_t
		ENTER_VALUE = 1,                    // increment for the enterer half
		SIGNAL_VALUE = 1ull << BITS,        // increment for the waiter half
		ENTER_MASK = SIGNAL_VALUE - 1,      // mask of the low (enterer) half
		SIGNAL_MASK = ENTER_MASK << BITS;   // mask of the high (waiter) half
	std::atomic<aword_t> m_atomic;   // packed enterer/waiter counters
	std::atomic<void *> m_threadId;  // &t_dummy of the owning thread, or nullptr
	uint32_t m_recCount;             // owner's recursion depth (0 = locked once)
	bool spinLock( aword_t &ref, bool once );
	// NOTE(review): this header uses std::numeric_limits but never includes
	// <limits>; it currently compiles only because monitor.cpp includes
	// <limits> before monitor.h — consider adding the include here.
#if defined(_WIN32)
	static constexpr uint32_t SEM_MAX = std::numeric_limits<LONG>::max();
	XHANDLE
		m_xhEnterEvt,    // auto-reset event: lock hand-over to one enterer
		m_xhSignalSem;   // semaphore: signal tickets for wait/notify
#elif defined(__unix__)
	static constexpr uint32_t SEM_MAX = std::numeric_limits<short>::max();
	int m_sems;          // SysV semaphore set id (sem 0: enter, sem 1: signal)
	// EINTR-restarting wrapper around ::semop on m_sems
	int semop( std::initializer_list<sembuf> sems );
#endif
	// configured spin maximum / adaptively smoothed spin limit
	std::atomic_uint16_t m_maxSpin, m_spinLimit;
};
// monitor.cpp
#include <iostream>
#include <limits>
#include <system_error>
#include <cassert>
#if defined(__x86_64__) || defined(__i386__)
#include <immintrin.h>
#endif
#include "monitor.h"
using namespace std;
// Construct the monitor; maxSpin is the initial user-space spin limit
// (0 = no spinning). Creates the kernel blocking objects:
//  - Windows: an auto-reset event (lock hand-over) and a semaphore
//    (signal delivery for wait/notify),
//  - Unix: a private SysV semaphore set with two semaphores in the
//    same roles.
// Throws std::system_error when a kernel object can't be created.
monitor::monitor( uint16_t maxSpin ) :
	m_atomic( 0 ),
	m_threadId( nullptr ),
#if defined(_WIN32)
	m_xhEnterEvt( CreateEventA( nullptr, FALSE, FALSE, nullptr ) ),
	m_xhSignalSem( CreateSemaphoreA( nullptr, 0, SEM_MAX, nullptr ) ),
#elif defined(__unix__)
	m_sems( semget( IPC_PRIVATE, 2, S_IRUSR | S_IWUSR ) ),
#endif
	m_maxSpin( maxSpin ),
	m_spinLimit( 0 )
{
#if defined(_WIN32)
	if( !m_xhEnterEvt.get() || !m_xhSignalSem.get() )
		throw system_error( GetLastError(), system_category(), "can't initialize monitor object" );
#elif defined(__unix__)
	// On Linux semget() zero-initializes the semaphore values; elsewhere
	// the initial values are unspecified, so set them explicitly.
	auto zeroSem = [&]() -> bool
	{
#if defined(__linux__)
		return true;
#else
		short vals[2] = { 0, 0 };
		return !semctl( m_sems, 0, SETALL, vals );
#endif
	};
	// BUGFIX: the original tested "|| zeroSem()", i.e. threw when zeroing
	// SUCCEEDED — on Linux (zeroSem always true) every construction threw.
	// The success result must be negated.
	if( m_sems == -1 || !zeroSem() )
	{
		int errNo = errno;
		if( m_sems != -1 )
			this->~monitor();	// release the semaphore set we did create
		throw system_error( errNo, system_category(), "can't initialize monitor object" );
	}
#endif
}
// Destroy the monitor. Only the SysV semaphore set needs explicit
// removal; on Windows the XHANDLE members close their handles via RAII.
// Must not run while any thread holds or waits on the monitor.
monitor::~monitor()
{
#if defined(__unix__)
	// [[maybe_unused]] silences the unused-variable warning in NDEBUG
	// builds, where the assert compiles away.
	[[maybe_unused]] int ret = semctl( m_sems, 0, IPC_RMID );
	assert(ret != -1);
#endif
}
// Acquire the monitor, blocking in the kernel if necessary. Supports
// recursive acquisition by the thread that already owns the monitor.
// Throws std::system_error when the recursion counter or the enterer
// counter would overflow.
void monitor::lock()
{
	// recursive fast path: we already own the lock, just bump the depth
	if( m_threadId.load( memory_order_relaxed ) == &t_dummy )
	{
		// BUGFIX (message text): "montor's" -> "monitor's"
		if( m_recCount == (uint32_t)-1 )
			throw system_error( (int)errc::result_out_of_range, generic_category(), "monitor's recursion count saturated" );
		++m_recCount;
		return;
	}
	aword_t ref = m_atomic.load( memory_order_relaxed );
	// optimistic user-space spinning before touching the kernel
	if( spinLock( ref, false ) )
		return;
	// register ourselves in the enterer (low) half of the packed word
	do
	{
		if( (ref & ENTER_MASK) == ENTER_MASK )
			throw system_error( (int)errc::result_out_of_range, generic_category(), "monitor's locker count saturated" );
		assert((ref & ENTER_MASK) >= ref >> BITS);
	} while( !m_atomic.compare_exchange_strong( ref, ref + 1,
		memory_order_acquire, memory_order_relaxed ) );
	// if any registered enterer is not parked in wait(), someone is active:
	// block until the current owner hands the lock over
	if( (ref & ENTER_MASK) != ref >> BITS ) [[likely]]
	{
#if defined(_WIN32)
		if( WaitForSingleObject( m_xhEnterEvt.get(), INFINITE ) != WAIT_OBJECT_0 )
			terminate();
#elif defined(__unix__)
		if( semop( { { 0, -1, 0 } } ) == -1 )
			terminate();
#endif
	}
	// we own the lock now: record our identity and reset recursion depth
	m_threadId.store( &t_dummy, memory_order_relaxed );
	m_recCount = 0;
}
// Release the monitor. A recursive acquisition only decrements the
// depth counter; the final release decrements the enterer count and,
// if other threads are queued to enter (and not parked in wait()),
// hands the lock over via the kernel object.
void monitor::unlock()
{
	// recursive release: owning thread with nested depth just unwinds
	if( m_threadId.load( memory_order_relaxed ) == &t_dummy && m_recCount )
		return (void)--m_recCount;
	aword_t ref = m_atomic.load( memory_order_relaxed );
	assert((ref & ENTER_MASK) && m_threadId == &t_dummy);
	// clear ownership before the releasing CAS so a successor may claim it
	m_threadId.store( nullptr, memory_order_relaxed );
	do
		assert((ref & ENTER_MASK) > ref >> BITS);
	while( !m_atomic.compare_exchange_strong( ref, ref - 1,
		memory_order_release, memory_order_relaxed ) );
	// enterers - waiters == 1 (pre-decrement) means we were the only
	// active locker: nobody is blocked on the enter object, nothing to wake
	if( (ref & ENTER_MASK) - (ref >> BITS) == 1 ) [[likely]]
		return;
	// wake exactly one blocked enterer
#if defined(_WIN32)
	if( !SetEvent( m_xhEnterEvt.get() ) )
		terminate();
#elif defined(__unix__)
	if( semop( { { 0, 1, IPC_NOWAIT } } ) == -1 )
		terminate();
#endif
}
bool monitor::try_lock()
{
aword_t ref = m_atomic.load( memory_order_relaxed );
return spinLock( ref, true );
}
// Atomically release the lock and block until notified; re-acquires the
// lock before returning. Must be called with the lock held exactly once
// (no recursion, per the assert).
void monitor::wait()
{
	assert(m_threadId == &t_dummy && !m_recCount);
	m_threadId.store( nullptr, memory_order_relaxed );
	aword_t ref = m_atomic.load( memory_order_relaxed );
	// move ourselves from "active locker" to "waiter" by incrementing the
	// high half; release ordering publishes the protected data
	do
		assert((ref & ENTER_MASK) > ref >> BITS);
	while( !m_atomic.compare_exchange_strong( ref, ref + SIGNAL_VALUE,
		memory_order_release, memory_order_relaxed ) );
	// if other threads are queued to enter, hand the lock over to one
	if( (ref & ENTER_MASK) - (ref >> BITS) > 1 )
	{
#if defined(_WIN32)
		if( !SetEvent( m_xhEnterEvt.get() ) )
			terminate();
#elif defined(__unix__)
		if( semop( { { 0, 1, IPC_NOWAIT } } ) == -1 )
			terminate();
#endif
	}
	// block until we both receive a signal ticket AND may re-enter the
	// lock. Windows: bWaitAll = TRUE acquires event and semaphore together;
	// Unix: a single semop covering both semaphores is likewise one atomic
	// operation.
#if defined(_WIN32)
	HANDLE waitFor[2] { m_xhEnterEvt.get(), m_xhSignalSem.get() };
	if( WaitForMultipleObjects( 2, waitFor, TRUE, INFINITE ) != WAIT_OBJECT_0 )
		terminate();
#elif defined(__unix__)
	if( semop( { { 0, -1, 0 }, { 1, -1, 0 } } ) == -1 )
		terminate();
#endif
	// we hold the lock again: restore ownership bookkeeping
	m_threadId.store( &t_dummy, memory_order_relaxed );
	m_recCount = 0;
}
// Wake one thread blocked in wait(). Must be called with the lock held
// (per the assert).
void monitor::notify()
{
	aword_t ref = m_atomic.load( memory_order_relaxed );
	assert((ref & ENTER_MASK) > ref >> BITS && m_threadId == &t_dummy);
	// take one waiter out of the high half; bail out when there is none
	do
		if( !(ref >> BITS) )
			return;
	while( !m_atomic.compare_exchange_strong( ref, ref - SIGNAL_VALUE,
		memory_order_relaxed, memory_order_relaxed ) );
	// deliver exactly one signal ticket; the waiter still re-acquires the
	// lock via the enter object before returning from wait()
#if defined(_WIN32)
	if( !ReleaseSemaphore( m_xhSignalSem.get(), 1, nullptr ) )
		terminate();
#elif defined(__unix__)
	if( semop( { { 1, 1, IPC_NOWAIT } }) == -1 )
		terminate();
#endif
}
// Wake all threads currently blocked in wait(). Must be called with the
// lock held (per the assert).
void monitor::notify_all()
{
	aword_t ref = m_atomic.load( memory_order_relaxed );
	assert((ref & ENTER_MASK) > ref >> BITS && m_threadId == &t_dummy);
	uint32_t n;
	// clear the entire waiter half in one CAS, remembering the count
	do
		if( !(n = (uint32_t)(ref >> BITS)) )
			return;
	while( !m_atomic.compare_exchange_strong( ref, ref & ENTER_MASK,
		memory_order_relaxed, memory_order_relaxed ) );
#if defined(_WIN32)
	// NOTE(review): n > SEM_MAX terminates instead of releasing in chunks
	// like the Unix branch below — confirm the waiter count can never
	// realistically exceed LONG_MAX, or unify with the chunked release.
	if( n > SEM_MAX || !ReleaseSemaphore( m_xhSignalSem.get(), n, nullptr ) )
		terminate();
#elif defined(__unix__)
	// a single SysV semaphore increment is limited to SHRT_MAX (SEM_MAX),
	// so deliver the tickets in chunks
	for( uint32_t nRelease; n; n -= nRelease )
		if( semop( { { 1, (short)(nRelease = n <= SEM_MAX ? n : SEM_MAX),
			IPC_NOWAIT } } ) == -1 )
			terminate();
#endif
}
// Exchange the spin maximum: a non-negative argument installs a new
// limit, a negative argument queries only. Returns the maximum that was
// in effect before the call.
uint16_t monitor::maxSpin( int16_t maxSpin )
{
	uint16_t const previous = m_maxSpin.load( memory_order_relaxed );
	bool const install = maxSpin >= 0;
	if( install )
		m_maxSpin.store( (uint16_t)maxSpin, memory_order_relaxed );
	return previous;
}
// Try to acquire the lock by spinning in user space before resorting to
// the kernel; `ref` carries the last observed atomic value in and out.
// With once == true (try_lock) and no configured spin maximum, exactly
// one acquisition attempt is made. Returns true when the lock was taken.
bool monitor::spinLock( aword_t &ref, bool once )
{
	// spinning algorithm taken from glibc
	uint32_t maxSpin = m_maxSpin.load( memory_order_relaxed );
	once = once && !maxSpin;
	maxSpin = !once ? maxSpin : 1;	// single-shot mode spins exactly once
	if( !maxSpin )
		return false;
	// start from the smoothed spin count of earlier acquisitions with some
	// head room, but never exceed the configured maximum
	uint32_t
		prevSpinLimit = m_spinLimit.load( memory_order_relaxed ),
		spinLimit = prevSpinLimit * 2u + 10u,
		spinCount = 0;
	spinLimit = spinLimit <= maxSpin ? spinLimit : maxSpin;
	bool locked = false;
	for( ; ; ref = m_atomic.load( memory_order_relaxed ) )
	{
		assert((ref & ENTER_MASK) >= ref >> BITS);
		// lock is takeable when every registered enterer is parked in
		// wait(); also guard against saturating the enterer counter
		if( uint32_t enterers = ref & ENTER_MASK;
			enterers == ref >> BITS && enterers != ENTER_MASK
			&& m_atomic.compare_exchange_strong( ref, ref + 1,
			memory_order_acquire, memory_order_relaxed ) )
		{
			m_threadId.store( &t_dummy, memory_order_relaxed );
			m_recCount = 0;
			locked = true;
			break;
		}
		if( ++spinCount == spinLimit )
			break;
		// CPU-relax hint between probes (pause/yield instruction)
#if defined(_WIN32)
		YieldProcessor();
#elif (defined(__GNUC__) || defined(__clang__)) && (defined(__x86_64__) || defined(__i386__))
		_mm_pause();
#elif (defined(__GNUC__) || defined(__clang__)) && (defined(__arm__) || defined(__aarch64__))
		__yield();
#else
#error "need platform-specific pause-instruction"
#endif
	}
	// exponential smoothing: move the stored limit 1/8 of the way toward
	// the spin count actually needed this time (skipped in try_lock's
	// single-shot mode)
	if( !once ) [[likely]]
		m_spinLimit.store( (uint16_t)(prevSpinLimit + (int32_t)(spinCount -
			prevSpinLimit) / 8), memory_order_relaxed );
	return locked;
}
#if defined(__unix__)
// EINTR-restarting wrapper around the ::semop system call, operating on
// this monitor's semaphore set.
inline int monitor::semop( initializer_list<sembuf> sems )
{
	// ::semop takes a mutable pointer although it doesn't modify the ops
	sembuf *ops = const_cast<sembuf *>(sems.begin());
	int result;
	do
		result = ::semop( m_sems, ops, sems.size() );
	while( result == -1 && errno == EINTR );
	return result;
}
#endif