Bonita Montero
unread,Oct 18, 2019, 11:20:47 PM10/18/19You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to
For some other code I needed a special shared mutex that gives
writers priority ver readers. With my shared mutex, when a writer
wants to get access, all readers since then are allowed to continue
and all new readers are deferred.
So here's the code:
#include <cstdint>
#include <cassert>
#include "cas.h"
#include "semaphore.h"
class wprio_shared_mutex
{
public:
wprio_shared_mutex();
~wprio_shared_mutex();
void lock_shared();
void unlock_shared();
void shared_to_write();
void lock_writer();
void write_to_shared();
void unlock_writer();
private:
std::uint64_t m_atomic; // bit 0 - 20: readers
// bit 21 - 41: waiting readers
// bit 42 - 62: waiting writers
// bit 61: writer-flag
semaphore m_releaseReadersSem,
m_releaseWriterSem;
static unsigned const WAITING_READERS_BASE = 21,
WAITING_WRITERS_BASE = 42,
WRITER_FLAG_BASE = 63;
static std::uint64_t const MASK21 = 0x1FFFFFu;
static std::uint64_t const READERS_MASK = MASK21,
WAITING_READERS_MASK = MASK21 <<
WAITING_READERS_BASE,
WAITING_WRITERS_MASK = MASK21 <<
WAITING_WRITERS_BASE,
WRITER_FLAG_MASK =
(std::uint64_t)1 << WRITER_FLAG_BASE;
static std::uint64_t const READER_VALUE = (std::uint64_t)1,
WAITING_READERS_VALUE =
(std::uint64_t)1 << WAITING_READERS_BASE,
WAITING_WRITERS_VALUE =
(std::uint64_t)1 << WAITING_WRITERS_BASE;
static bool check( uint64_t flags );
};
inline
bool wprio_shared_mutex::check( std::uint64_t flags )
{
unsigned readers = (unsigned)(flags
& MASK21),
waitingReaders = (unsigned)((flags >>
WAITING_READERS_BASE) & MASK21),
waitingWriters = (unsigned)((flags >>
WAITING_WRITERS_BASE) & MASK21),
writerFlag = (unsigned)((flags >> WRITER_FLAG_BASE)
& 1);
if( readers && (waitingReaders || writerFlag) )
return false;
if( waitingReaders && (readers || !writerFlag) )
return false;
if( waitingWriters && !(writerFlag || readers) )
return false;
if( writerFlag && readers )
return false;
return true;
}
wprio_shared_mutex::wprio_shared_mutex()
{
m_atomic = 0;
}
wprio_shared_mutex::~wprio_shared_mutex()
{
assert(m_atomic == 0);
}
void wprio_shared_mutex::lock_shared()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
if( (cmp & WRITER_FLAG_MASK) == 0 )
{
assert((cmp & READERS_MASK) != READERS_MASK);
if( (niu = CAS( m_atomic, cmp, cmp + READER_VALUE )) == cmp )
return;
}
else
{
assert((cmp & WAITING_READERS_MASK) != WAITING_READERS_MASK);
if( (niu = CAS( m_atomic, cmp, cmp + WAITING_READERS_VALUE
)) == cmp )
{
m_releaseReadersSem.forced_wait();
return;
}
}
}
}
void wprio_shared_mutex::unlock_shared()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
assert((cmp & READERS_MASK) >= READER_VALUE);
if( (cmp & READERS_MASK) != READER_VALUE || (cmp &
WAITING_WRITERS_MASK) == 0 )
{
if( (niu = CAS( m_atomic, cmp, cmp - READER_VALUE )) == cmp )
return;
}
else
{
assert(!(cmp & WRITER_FLAG_MASK));
if( (niu = CAS( m_atomic, cmp, (cmp - READER_VALUE -
WAITING_WRITERS_VALUE) | WRITER_FLAG_MASK )) == cmp )
{
m_releaseWriterSem.forced_release( 1 );
return;
}
}
}
}
void wprio_shared_mutex::shared_to_write()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
assert((cmp & READERS_MASK) >= READER_VALUE);
if( (cmp & READERS_MASK) == READER_VALUE )
{
assert(!(cmp & WRITER_FLAG_MASK));
if( (niu = CAS( m_atomic, cmp, (cmp - READER_VALUE) |
WRITER_FLAG_MASK )) == cmp )
return;
}
else
{
assert((cmp & READERS_MASK) > READER_VALUE && (cmp &
WAITING_WRITERS_MASK) != WAITING_WRITERS_MASK);
if( (niu = CAS( m_atomic, cmp, cmp - READER_VALUE +
WAITING_WRITERS_VALUE )) == cmp )
{
m_releaseWriterSem.forced_wait();
return;
}
}
}
}
void wprio_shared_mutex::lock_writer()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
if( (cmp & (WRITER_FLAG_MASK | READERS_MASK)) == 0 )
{
if( (niu = CAS( m_atomic, cmp, cmp | WRITER_FLAG_MASK )) ==
cmp )
return;
}
else
{
assert((cmp & WAITING_WRITERS_MASK) != WAITING_WRITERS_MASK);
if( (niu = CAS( m_atomic, cmp, cmp + WAITING_WRITERS_VALUE
)) == cmp )
{
m_releaseWriterSem.forced_wait();
return;
}
}
}
}
void wprio_shared_mutex::write_to_shared()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
if( (cmp & WAITING_WRITERS_MASK) == 0 )
{
uint64_t wakeups;
if( (niu = CAS( m_atomic, cmp, (cmp & ~WRITER_FLAG_MASK) -
(cmp & WAITING_READERS_MASK) + (wakeups = (cmp & WAITING_READERS_MASK)
>> WAITING_READERS_BASE) + 1 )) == cmp )
{
if( wakeups )
m_releaseReadersSem.forced_release(
(unsigned)wakeups );
return;
}
else
continue;
}
if( (niu = CAS( m_atomic, cmp, cmp - WAITING_WRITERS_VALUE ))
== cmp )
{
m_releaseWriterSem.forced_release( 1 );
return;
}
else
continue;
}
}
void wprio_shared_mutex::unlock_writer()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(cmp & WRITER_FLAG_MASK && !(cmp & READERS_MASK));
assert(check( cmp ));
if( (cmp & WAITING_WRITERS_MASK) != 0 )
{
if( (niu = CAS( m_atomic, cmp, cmp - WAITING_WRITERS_VALUE
)) == cmp )
{
m_releaseWriterSem.forced_release( 1 );
return;
}
else
continue;
}
if( (cmp & WAITING_READERS_MASK) != 0 )
{
uint64_t wakeups;
if( (niu = CAS( m_atomic, cmp, (cmp & ~WRITER_FLAG_MASK) -
(cmp & WAITING_READERS_MASK) + (wakeups = (cmp & WAITING_READERS_MASK)
>> WAITING_READERS_BASE) )) == cmp )
{
m_releaseReadersSem.forced_release( (unsigned)wakeups );
return;
}
else
continue;
}
if( (niu = CAS( m_atomic, cmp, 0 )) == cmp )
return;
}
}