Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

write-prioritized shared mutex

10 views
Skip to first unread message

Bonita Montero

unread,
Oct 18, 2019, 11:20:47 PM10/18/19
to
For some other code I needed a special shared mutex that gives
writers priority ver readers. With my shared mutex, when a writer
wants to get access, all readers since then are allowed to continue
and all new readers are deferred.
So here's the code:

#include <cstdint>
#include <cassert>
#include "cas.h"
#include "semaphore.h"

class wprio_shared_mutex
{
public:
wprio_shared_mutex();
~wprio_shared_mutex();
void lock_shared();
void unlock_shared();
void shared_to_write();
void lock_writer();
void write_to_shared();
void unlock_writer();

private:
std::uint64_t m_atomic; // bit 0 - 20: readers
// bit 21 - 41: waiting readers
// bit 42 - 62: waiting writers
// bit 61: writer-flag
semaphore m_releaseReadersSem,
m_releaseWriterSem;

static unsigned const WAITING_READERS_BASE = 21,
WAITING_WRITERS_BASE = 42,
WRITER_FLAG_BASE = 63;
static std::uint64_t const MASK21 = 0x1FFFFFu;
static std::uint64_t const READERS_MASK = MASK21,
WAITING_READERS_MASK = MASK21 <<
WAITING_READERS_BASE,
WAITING_WRITERS_MASK = MASK21 <<
WAITING_WRITERS_BASE,
WRITER_FLAG_MASK =
(std::uint64_t)1 << WRITER_FLAG_BASE;
static std::uint64_t const READER_VALUE = (std::uint64_t)1,
WAITING_READERS_VALUE =
(std::uint64_t)1 << WAITING_READERS_BASE,
WAITING_WRITERS_VALUE =
(std::uint64_t)1 << WAITING_WRITERS_BASE;

static bool check( uint64_t flags );
};


inline
bool wprio_shared_mutex::check( std::uint64_t flags )
{
unsigned readers = (unsigned)(flags
& MASK21),
waitingReaders = (unsigned)((flags >>
WAITING_READERS_BASE) & MASK21),
waitingWriters = (unsigned)((flags >>
WAITING_WRITERS_BASE) & MASK21),
writerFlag = (unsigned)((flags >> WRITER_FLAG_BASE)
& 1);
if( readers && (waitingReaders || writerFlag) )
return false;
if( waitingReaders && (readers || !writerFlag) )
return false;
if( waitingWriters && !(writerFlag || readers) )
return false;
if( writerFlag && readers )
return false;
return true;
}

wprio_shared_mutex::wprio_shared_mutex()
{
m_atomic = 0;
}

wprio_shared_mutex::~wprio_shared_mutex()
{
assert(m_atomic == 0);
}

void wprio_shared_mutex::lock_shared()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
if( (cmp & WRITER_FLAG_MASK) == 0 )
{
assert((cmp & READERS_MASK) != READERS_MASK);
if( (niu = CAS( m_atomic, cmp, cmp + READER_VALUE )) == cmp )
return;
}
else
{
assert((cmp & WAITING_READERS_MASK) != WAITING_READERS_MASK);
if( (niu = CAS( m_atomic, cmp, cmp + WAITING_READERS_VALUE
)) == cmp )
{
m_releaseReadersSem.forced_wait();
return;
}
}
}
}

void wprio_shared_mutex::unlock_shared()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
assert((cmp & READERS_MASK) >= READER_VALUE);
if( (cmp & READERS_MASK) != READER_VALUE || (cmp &
WAITING_WRITERS_MASK) == 0 )
{
if( (niu = CAS( m_atomic, cmp, cmp - READER_VALUE )) == cmp )
return;
}
else
{
assert(!(cmp & WRITER_FLAG_MASK));
if( (niu = CAS( m_atomic, cmp, (cmp - READER_VALUE -
WAITING_WRITERS_VALUE) | WRITER_FLAG_MASK )) == cmp )
{
m_releaseWriterSem.forced_release( 1 );
return;
}
}
}
}

void wprio_shared_mutex::shared_to_write()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
assert((cmp & READERS_MASK) >= READER_VALUE);
if( (cmp & READERS_MASK) == READER_VALUE )
{
assert(!(cmp & WRITER_FLAG_MASK));
if( (niu = CAS( m_atomic, cmp, (cmp - READER_VALUE) |
WRITER_FLAG_MASK )) == cmp )
return;
}
else
{
assert((cmp & READERS_MASK) > READER_VALUE && (cmp &
WAITING_WRITERS_MASK) != WAITING_WRITERS_MASK);
if( (niu = CAS( m_atomic, cmp, cmp - READER_VALUE +
WAITING_WRITERS_VALUE )) == cmp )
{
m_releaseWriterSem.forced_wait();
return;
}
}
}
}

void wprio_shared_mutex::lock_writer()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
if( (cmp & (WRITER_FLAG_MASK | READERS_MASK)) == 0 )
{
if( (niu = CAS( m_atomic, cmp, cmp | WRITER_FLAG_MASK )) ==
cmp )
return;
}
else
{
assert((cmp & WAITING_WRITERS_MASK) != WAITING_WRITERS_MASK);
if( (niu = CAS( m_atomic, cmp, cmp + WAITING_WRITERS_VALUE
)) == cmp )
{
m_releaseWriterSem.forced_wait();
return;
}
}
}
}

void wprio_shared_mutex::write_to_shared()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(check( cmp ));
if( (cmp & WAITING_WRITERS_MASK) == 0 )
{
uint64_t wakeups;
if( (niu = CAS( m_atomic, cmp, (cmp & ~WRITER_FLAG_MASK) -
(cmp & WAITING_READERS_MASK) + (wakeups = (cmp & WAITING_READERS_MASK)
>> WAITING_READERS_BASE) + 1 )) == cmp )
{
if( wakeups )
m_releaseReadersSem.forced_release(
(unsigned)wakeups );
return;
}
else
continue;
}
if( (niu = CAS( m_atomic, cmp, cmp - WAITING_WRITERS_VALUE ))
== cmp )
{
m_releaseWriterSem.forced_release( 1 );
return;
}
else
continue;
}
}

void wprio_shared_mutex::unlock_writer()
{
using namespace std;
for( uint64_t cmp = m_atomic, niu; ; cmp = niu )
{
assert(cmp & WRITER_FLAG_MASK && !(cmp & READERS_MASK));
assert(check( cmp ));
if( (cmp & WAITING_WRITERS_MASK) != 0 )
{
if( (niu = CAS( m_atomic, cmp, cmp - WAITING_WRITERS_VALUE
)) == cmp )
{
m_releaseWriterSem.forced_release( 1 );
return;
}
else
continue;
}
if( (cmp & WAITING_READERS_MASK) != 0 )
{
uint64_t wakeups;
if( (niu = CAS( m_atomic, cmp, (cmp & ~WRITER_FLAG_MASK) -
(cmp & WAITING_READERS_MASK) + (wakeups = (cmp & WAITING_READERS_MASK)
>> WAITING_READERS_BASE) )) == cmp )
{
m_releaseReadersSem.forced_release( (unsigned)wakeups );
return;
}
else
continue;
}
if( (niu = CAS( m_atomic, cmp, 0 )) == cmp )
return;
}
}

Chris M. Thomasson

unread,
Oct 19, 2019, 2:14:18 AM10/19/19
to
On 10/18/2019 8:20 PM, Bonita Montero wrote:
> For some other code I needed a special shared mutex that gives
> writers  priority ver readers. With my shared mutex, when a writer
> wants to get access, all readers since then are allowed to continue
> and all new readers are deferred.
> So here's the code:
[...]

Before I take the time to read it, how close is this to your other
read/write primitive, you were partitioning the counter field, right?
Keep mine in mind as well for it does not use CAS at all.

Bonita Montero

unread,
Oct 19, 2019, 2:26:02 AM10/19/19
to
>> For some other code I needed a special shared mutex that gives
>> writers  priority ver readers. With my shared mutex, when a writer
>> wants to get access, all readers since then are allowed to continue
>> and all new readers are deferred.
>> So here's the code:
> [...]

> Before I take the time to read it, how close is this to your other
> read/write primitive, you were partitioning the counter field, right?

Very close.
There'S also an atomic value as well as a semaphore and nothin more.

> Keep mine in mind as well for it does not use CAS at all.

Show it.

Chris M. Thomasson

unread,
Oct 19, 2019, 2:28:30 AM10/19/19
to
You already saw it, and commented about it, remember?

Chris M. Thomasson

unread,
Oct 19, 2019, 2:31:06 AM10/19/19
to
Wait, are you referencing Joe Seighs semaphore, or my read write mutex
that does not use any CAS? I posted them here in this very group. Btw,
the semaphore does not use CAS as well... They are called loopless
algorithms. As in, no need to loop on a failed CAS... ;^)

Chris M. Thomasson

unread,
Oct 19, 2019, 2:35:25 AM10/19/19
to
My pure standard C++11 code can be found here:

https://pastebin.com/raw/1QtPCGhV

It should compile right up, and run on any system that has a C++11 compiler.
0 new messages