Asymmetric Countdown Latch

Dmitriy Vyukov

Jan 27, 2010, 12:27:32 PM
to Scalable Synchronization Algorithms
Consider the situation where we know that a countdown latch will most
likely be decremented by a single thread. For example, consider a
fork/join tasking library based on a Cilk-style scheduler: after
completion, child tasks have to notify the parent task, and when the
last child task has completed, the parent task must be re-scheduled as
a continuation. It's reasonable to assume, though, that all the child
tasks will usually be executed by a single worker thread. So what we
want is to NOT do an atomic RMW operation to count down pending child
tasks.

Just to make it clear, the asymmetric countdown latch is an attempt to
improve on the following simple algorithm:
volatile int count; // = N
bool decrement()
{
    // atomic_dec() is assumed to return the new (decremented) value
    return 0 == atomic_dec(&count);
}
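
For reference, here is a minimal C++11 sketch of this baseline, assuming
std::atomic in place of the unspecified atomic_dec(). The class name
simple_countdown_latch is made up for illustration. Note that
fetch_sub() returns the previous value, so the last decrement is the
one that observes 1:

```cpp
#include <atomic>

// Hypothetical name; plain atomic-RMW countdown latch used as the baseline.
class simple_countdown_latch
{
public:
    explicit simple_countdown_latch(int count) : count_(count) {}

    // Returns true only for the call that brings the counter to zero.
    // Every call pays for one atomic read-modify-write.
    bool decrement()
    {
        return count_.fetch_sub(1, std::memory_order_acq_rel) == 1;
    }

private:
    std::atomic<int> count_;
};
```

With a latch constructed with count 3, the first two decrement() calls
return false and the third returns true, regardless of which threads
make the calls.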

The idea is as simple as:

volatile int total; // = N
volatile int local; // = 0
volatile int remote; // = 0

// executed by a single "local" thread
bool decrement_local()
{
    local += 1;
    if (local + remote == total)
        return true;
    return false;
}

// executed by all other threads
bool decrement_remote()
{
    atomic_inc(&remote);
    system_wide_synchronize();
    if (local + remote == total)
        return true;
    return false;
}

This way decrement_local() becomes basically costless (of course,
decrement_remote() becomes much heavier now).
[You can read about the idea of asymmetric synchronization here:
http://blogs.sun.com/dave/resource/Asymmetric-Dekker-Synchronization.txt]

The devil is in the details, of course.
The problem is that with the above dead-simple asymmetric algorithm
both the local thread and a remote thread may return "true". Another
problem is that one thread may return true and then delete the latch
while another thread is still inside a decrement_xxx() method (thus
SIGSEGV).
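
To make the first problem concrete, here is a small single-threaded
sketch that replays one hand-picked interleaving of the naive algorithm
with total = 2. The names naive_latch and both_fire_interleaving are
made up for illustration; the steps are just the statements of
decrement_local() and decrement_remote() executed in an order that a
real scheduler could produce:

```cpp
#include <utility>

// Hypothetical stand-in for the three counters of the naive algorithm.
struct naive_latch
{
    int total;
    int local;
    int remote;
};

// Replays: local increments, remote increments, then both do their check.
// Returns {result of decrement_local(), result of decrement_remote()}.
std::pair<bool, bool> both_fire_interleaving()
{
    naive_latch l = {2, 0, 0};

    l.local += 1;    // local thread:  local += 1
    l.remote += 1;   // remote thread: atomic_inc(&remote) + synchronize

    // Both increments are already visible when each thread does its check.
    bool local_result = (l.local + l.remote == l.total);
    bool remote_result = (l.local + l.remote == l.total);

    return std::make_pair(local_result, remote_result);
}
```

In this interleaving both results are true, so both threads would
believe they were the last one and, for example, both would re-schedule
the parent task.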

In the process of fixing the above issues, the algorithm became much
more complicated. However, the good news is that if a latch is
decremented by a single thread (only decrement_local() is called), it's
still basically costless.
Here is a full-blown implementation:

class asymmetric_countdown_latch
{
public:
    asymmetric_countdown_latch(unsigned count)
        : count_(count)
        , local_count_(0)
        , local_commit_(0)
        , local_fired_(0)
        , remote_count_(0)
        , remote_commit_(0)
        , remote_fired_(0)
        , fired_(0)
    {
    }

    ~asymmetric_countdown_latch()
    {
        int sum = local_count_.load(std::memory_order_relaxed)
            + remote_count_.load(std::memory_order_relaxed);
        assert((sum == count_) || (sum == count_ - 1));
    }

    bool decrement_local()
    {
        int lc = local_count_.load(std::memory_order_acquire) + 1;
        if (lc == count_)
            return true;
        int rc0 = remote_count_.load(std::memory_order_acquire);
        if (lc + rc0 == count_)
        {
            while (rc0 != remote_commit_.load(std::memory_order_acquire))
                SwitchToThread();
            return true;
        }
        local_count_.store(lc, std::memory_order_release);
        int rc = remote_count_.load(std::memory_order_acquire);
        bool fired = (lc + rc == count_);
        local_fired_.store(fired, std::memory_order_release);
        int lcom = local_commit_.load(std::memory_order_acquire) + 1;
        local_commit_.store(lcom, std::memory_order_release);
        if (fired == false)
            return false;
        while (rc != remote_commit_.load(std::memory_order_acquire))
            SwitchToThread();
        if (remote_fired_.load(std::memory_order_acquire))
            return (fired_.fetch_add(1, std::memory_order_acq_rel) == 1);
        return true;
    }

    bool decrement_remote()
    {
        int lc0 = local_count_.load(std::memory_order_acquire);
        int rc = remote_count_.load(std::memory_order_acquire) + 1;
        if (rc == count_)
        {
            while (rc - 1 != remote_commit_.load(std::memory_order_acquire))
                SwitchToThread();
            return true;
        }
        if (lc0 + rc == count_)
        {
            while (lc0 != local_commit_.load(std::memory_order_acquire))
                SwitchToThread();
            while (rc - 1 != remote_commit_.load(std::memory_order_acquire))
                SwitchToThread();
            return true;
        }
        rc = remote_count_.fetch_add(1, std::memory_order_acq_rel) + 1;
        if (rc == count_)
        {
            while (rc - 1 != remote_commit_.load(std::memory_order_acquire))
                SwitchToThread();
            return true;
        }
        FlushProcessWriteBuffers();
        int lc = local_count_.load(std::memory_order_acquire);
        bool fired = (lc + rc == count_);
        if (fired)
            remote_fired_.store(fired, std::memory_order_release);
        remote_commit_.fetch_add(1, std::memory_order_acq_rel);
        if (fired == false)
            return false;
        while (lc != local_commit_.load(std::memory_order_acquire))
            SwitchToThread();
        while (rc != remote_commit_.load(std::memory_order_acquire))
            SwitchToThread();
        if (local_fired_.load(std::memory_order_acquire))
            return (fired_.fetch_add(1, std::memory_order_acq_rel) == 1);
        return true;
    }

private:
    int const count_;
    std::atomic<int> local_count_;
    std::atomic<int> local_commit_;
    std::atomic<int> local_fired_;
    std::atomic<int> remote_count_;
    std::atomic<int> remote_commit_;
    std::atomic<int> remote_fired_;
    std::atomic<int> fired_;

    asymmetric_countdown_latch(asymmetric_countdown_latch const&);
    void operator = (asymmetric_countdown_latch const&);
};

Well, it's “a bit” involved. So here is somewhat simplified pseudo-code
for both decrement_local() and decrement_remote():

my_count += 1;
[FlushProcessWriteBuffers() in case of remote operation]
bool fired = my_count + his_count == count;
if (fired)
    my_fired = true;
my_commit += 1;
if (fired == false)
    return false;
// his_count is the value observed above
while (his_count != his_commit)
    yield();
if (his_fired)
    return 1 == fired_.fetch_add(1);
else
    return true;
In the above pseudo-code I omit various fast paths. For example, if in
decrement_local() we observe that (local_count_ + 1 == count_) then we
just return 'true' immediately.
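
The handshake in the pseudo-code can be sketched in portable C++11 as
follows. This is not the implementation posted above: it assumes that
every atomic operation is sequentially consistent, which stands in for
FlushProcessWriteBuffers() but makes decrement_local() pay for a full
fence, so it only illustrates the count/commit/fired protocol and not
the performance trick. The class name portable_latch_sketch is made up,
std::this_thread::yield() replaces SwitchToThread(), and all fast paths
are left out:

```cpp
#include <atomic>
#include <thread>

// Hypothetical name. Assumption: default (seq_cst) atomics on BOTH sides
// replace the asymmetric barrier, so the local path is NOT costless here.
class portable_latch_sketch
{
public:
    explicit portable_latch_sketch(int count)
        : count_(count), local_count_(0), local_commit_(0),
          local_fired_(false), remote_count_(0), remote_commit_(0),
          remote_fired_(false), fired_(0) {}

    // Must only ever be called by one designated thread.
    bool decrement_local()
    {
        int lc = local_count_.load() + 1;
        local_count_.store(lc);                   // my_count += 1
        int rc = remote_count_.load();            // observed his_count
        bool fired = (lc + rc == count_);
        if (fired)
            local_fired_.store(true);
        local_commit_.store(local_commit_.load() + 1);  // my_commit += 1
        if (!fired)
            return false;
        while (remote_commit_.load() != rc)       // wait for observed remote ops
            std::this_thread::yield();
        if (remote_fired_.load())
            return fired_.fetch_add(1) == 1;      // both fired: second arrival wins
        return true;
    }

    // May be called by any other thread.
    bool decrement_remote()
    {
        int rc = remote_count_.fetch_add(1) + 1;  // my_count += 1
        int lc = local_count_.load();             // observed his_count
        bool fired = (lc + rc == count_);
        if (fired)
            remote_fired_.store(true);
        remote_commit_.fetch_add(1);              // my_commit += 1
        if (!fired)
            return false;
        while (local_commit_.load() != lc)        // wait for observed local ops
            std::this_thread::yield();
        while (remote_commit_.load() != rc)       // wait for earlier remote ops
            std::this_thread::yield();
        if (local_fired_.load())
            return fired_.fetch_add(1) == 1;      // both fired: second arrival wins
        return true;
    }

private:
    int const count_;
    std::atomic<int> local_count_;
    std::atomic<int> local_commit_;
    std::atomic<bool> local_fired_;
    std::atomic<int> remote_count_;
    std::atomic<int> remote_commit_;
    std::atomic<bool> remote_fired_;
    std::atomic<int> fired_;
};
```

The point of the commit counters is that a thread which is about to
return true first waits until every decrement it has observed has
finished touching the latch, so the latch can be safely deleted
afterwards. The fired_ counter is only touched when both sides saw the
full sum, and then the second arrival is the one that returns true.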

--
Dmitriy V'jukov

Dmitriy Vyukov

Jan 27, 2010, 12:32:21 PM
to Scalable Synchronization Algorithms
In this particular algorithm "remote" threads need to synchronize with
only a single "local" thread, so the full semantics of
FlushProcessWriteBuffers() (an IPI to every processor) are not required.
One can use lighter-weight mechanisms: for example, a remote thread may
send a signal to the local thread, or a remote thread may migrate to
the processor where the local thread is running.
However, do not try to use SuspendThread/ResumeThread: I've just tried
them and the test deadlocks :)


--
Dmitriy V'jukov

Dmitriy Vyukov

Jan 27, 2010, 12:41:28 PM
to Scalable Synchronization Algorithms
For your convenience, here is the source code and a unit test for
Relacy Race Detector:
http://pastebin.com/f171104b5

And here is a working implementation for Windows/x86, plus a benchmark
with which I tried to estimate the performance:
http://pastebin.com/f120371c7

It seems that performance is highly platform dependent. The cost of
FlushProcessWriteBuffers() depends on the number of processors and the
distance between them, while the cost of an atomic RMW depends on the
processor model.
However, when the fraction of remote operations is << 1%, the
asymmetric latch is indeed faster than a plain atomic-RMW-based latch.
On my dual-core laptop I observe some 5x speedup (atomic RMWs are quite
costly nowadays on Intel hardware).
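
One rough way to reason about where the break-even point lies is a
simple linear cost model. This is only an assumption for illustration,
not something measured by the benchmark: if a fraction p of decrements
is remote, the average cost per decrement is
(1 - p) * c_local + p * c_remote, and the asymmetric latch wins while
that stays below the cost c_rmw of one atomic RMW. The function name
and parameters below are made up:

```cpp
#include <cassert>

// Hypothetical cost model: returns the fraction of remote decrements below
// which the asymmetric latch is cheaper on average than a plain atomic RMW.
// All three costs are in the same arbitrary unit (e.g. cycles).
double break_even_remote_fraction(double c_rmw, double c_local, double c_remote)
{
    // The model only makes sense if a remote decrement is the expensive one.
    assert(c_remote > c_local);
    // Solve (1 - p) * c_local + p * c_remote == c_rmw for p.
    return (c_rmw - c_local) / (c_remote - c_local);
}
```

Under this model the threshold shrinks as the system-wide
synchronization gets more expensive relative to an atomic RMW, which
matches the observation that the result depends heavily on the platform.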


--
Dmitriy V'jukov

Dmitriy Vyukov

Jan 27, 2010, 12:46:16 PM
to Scalable Synchronization Algorithms
On Jan 27, 8:27 pm, Dmitriy Vyukov <dvyu...@gmail.com> wrote:

> Here is a full-blown implementation:

>     bool decrement_local()


I suspect it's possible to substantially simplify the algorithm. This
is just the first *working* thing I was able to come up with.

--
Dmitriy V'jukov

Chris M. Thomasson

Jan 27, 2010, 11:23:51 PM
to Scalable Synchronization Algorithms

I cannot seem to find `FlushProcessWriteBuffers()' in Relacy 2.2?

Dmitriy Vyukov

Jan 31, 2010, 1:07:00 PM
to Scalable Synchronization Algorithms, rel...@googlegroups.com
On Jan 28, 7:23 am, "Chris M. Thomasson" <cris...@charter.net> wrote:

> I cannot seem to find `FlushProcessWriteBuffers()' in Relacy 2.2?

Hi Chris,

I've uploaded release 2.3 with an emulation of FlushProcessWriteBuffers():
http://groups.google.com/group/relacy/files

--
Dmitriy V'jukov
