Just to make it clear, asymmetric countdown latch is a try to improve
on the following simple algorithm:
volatile int count; // = N
bool decrement()
{
return 0 == atomic_dec(&count);
}
The idea is as simple as:
volatile int total; // = N
volatile int local; // = 0
volatile int remote; // = 0
// executed by a single "local" thread
bool decrement_local()
{
local += 1;
if (local + remote == total)
return true;
}
// executed by all other threads
bool decrement_remote()
{
atomic_inc(&remote);
system_wide_synchronize();
if (local + remote == total)
return true;
}
This way decrement_local() becomes basically costless (of course,
decrement_remote() becomes much more heavier now).
[You may read on the idea of asymmetric synchronization here:
http://blogs.sun.com/dave/resource/Asymmetric-Dekker-Synchronization.txt]
Devil in the details, of course.
The problem is that with above dead-simple asymmetric algorithm both
local and remote thread may return "true". Another problem is that one
thread may return true and then delete the latch, while another thread
is still inside of decrement_xxx() method (thus SIGSEGV).
In the process of fixing above issues, the algorithm become much more
complicated. However, the good news is that if a latch is decremented
by a single thread (only decrement_local() is called), it's still
basically costless.
Here is a full-blown implementation:
class asymmetric_countdown_latch
{
public:
asymmetric_countdown_latch(unsigned count)
: count_(count)
, local_count_(0)
, local_commit_(0)
, local_fired_(0)
, remote_count_(0)
, remote_commit_(0)
, remote_fired_(0)
, fired_(0)
{
}
~asymmetric_countdown_latch()
{
int sum = local_count_.load(std::memory_order_relaxed)
+ remote_count_.load(std::memory_order_relaxed);
assert((sum == count_) || (sum == count_ - 1));
}
bool decrement_local()
{
int lc = local_count_.load(std::memory_order_acquire) + 1;
if (lc == count_)
return true;
int rc0 = remote_count_.load(std::memory_order_acquire);
if (lc + rc0 == count_)
{
while (rc0 != remote_commit_.load
(std::memory_order_acquire))
SwitchToThread();
return true;
}
local_count_.store(lc, std::memory_order_release);
int rc = remote_count_.load(std::memory_order_acquire);
bool fired = (lc + rc == count_);
local_fired_.store(fired, std::memory_order_release);
int lcom = local_commit_.load(std::memory_order_acquire) + 1;
local_commit_.store(lcom, std::memory_order_release);
if (fired == false)
return false;
while (rc != remote_commit_.load(std::memory_order_acquire))
SwitchToThread();
if (remote_fired_.load(std::memory_order_acquire))
return (fired_.fetch_add(1, std::memory_order_acq_rel) ==
1);
return true;
}
bool decrement_remote()
{
int lc0 = local_count_.load(std::memory_order_acquire);
int rc = remote_count_.load(std::memory_order_acquire) + 1;
if (rc == count_)
{
while (rc - 1 != remote_commit_.load
(std::memory_order_acquire))
SwitchToThread();
return true;
}
if (lc0 + rc == count_)
{
while (lc0 != local_commit_.load
(std::memory_order_acquire))
SwitchToThread();
while (rc - 1 != remote_commit_.load
(std::memory_order_acquire))
SwitchToThread();
return true;
}
rc = remote_count_.fetch_add(1, std::memory_order_acq_rel) +
1;
if (rc == count_)
{
while (rc - 1 != remote_commit_.load
(std::memory_order_acquire))
SwitchToThread();
return true;
}
FlushProcessWriteBuffers();
int lc = local_count_.load(std::memory_order_acquire);
bool fired = (lc + rc == count_);
if (fired)
remote_fired_.store(fired, std::memory_order_release);
remote_commit_.fetch_add(1, std::memory_order_acq_rel);
if (fired == false)
return false;
while (lc != local_commit_.load(std::memory_order_acquire))
SwitchToThread();
while (rc != remote_commit_.load(std::memory_order_acquire))
SwitchToThread();
if (local_fired_.load(std::memory_order_acquire))
return (fired_.fetch_add(1, std::memory_order_acq_rel) ==
1);
return true;
}
private:
int const count_;
std::atomic<int> local_count_;
std::atomic<int> local_commit_;
std::atomic<int> local_fired_;
std::atomic<int> remote_count_;
std::atomic<int> remote_commit_;
std::atomic<int> remote_fired_;
std::atomic<int> fired_;
asymmetric_countdown_latch(asymmetric_countdown_latch const&);
void operator = (asymmetric_countdown_latch const&);
};
Well, it's “a bit” involved. So here is a bit simplified pseudo-code
for both decrement_local() and decrement_remote():
my_count += 1;
[FlushProcessWriteBuffers() in case of remote operation]
bool fired = my_count + his_count == count;
if (fired)
my_fired = true;
my_commit += 1;
if (fired == false)
return false;
while (his_count != his_commit)
yield();
if (his_fired)
return 1 == fired_.fetch_add(1);
else
return true;
In the above pseudo-code I omit various fast-paths. For example, if in
decrement_local() we observe that (local_count_ == count_) then we
just instantly return 'true'.
--
Dmitriy V'jukov
--
Dmitriy V'jukov
And here is working implementation for Windows/x86 + a benchmark on
which I tried to estimate the performance:
http://pastebin.com/f120371c7
It seems that performance is highly platform dependent. Cost of
FlushProcessWriteBuffers() depends on number of processors and
distance between them, while cost of atomic RMW depends on processor
model.
However, when number of remote operations is << 1% then asymmetric
latch is indeed faster than plain-atomic-RMW-based latch.
On my dual-core laptop I observe some 5x speedup (atomic RMW are quite
cost nowadays on Intel hardware).
--
Dmitriy V'jukov
> Here is a full-blown implementation:
> bool decrement_local()
I suspect it's possible to substantially simplify the algorithm. That
is just the first *working* thing I was able to come out with.
--
Dmitriy V'jukov
I cannot seem to find `FlushProcessWriteBuffers()' in Relacy 2.2?
> I cannot seem to find `FlushProcessWriteBuffers()' in Relacy 2.2?
Hi Chris,
I've uploaded release 2.3 with FlushProcessWriteBuffers() emulation.
http://groups.google.com/group/relacy/files
--
Dmitriy V'jukov