Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

atomic xchg based stack, for learning...

74 views
Skip to first unread message

Chris M. Thomasson

unread,
Dec 29, 2018, 2:30:56 AM12/29/18
to
Here is the original link:

https://groups.google.com/d/topic/lock-free/wwsAmcuAefA/discussion

This experimental algorithm uses only XCHG at the cost of having a
consumer wait for the next pointer to be set in a node. However, it
allows for real work to be done before any waits are performed. So, the
"real" work should provide a "natural backoff" that might minimize the
waiting. The algorithm can be improved upon quite a bit. I have some
"interesting" ideas for it. Well, here is the code in the form of a
Relacy unit test:

Notice the "hand off" comments and 0xDEADBEEF, this can be made to work
much better...

Relacy Unit Test, C++
____________________________
// Simple Atomic Stack
// For Academic and Experimental things...
// Beginner, Moderate?
// In Good Ol' Relacy! Nice as always.
//_______________________________________________


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <iostream>


// Simple macro based redirection of the verbose std membars.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
#define CT_MB_SEQ_CST std::memory_order_seq_cst
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)


// Some global vars directing the show...
#define THREADS 3
#define ITERS 2


// Experimental Stack
struct ct_ecstack
{
#define WAITNEXT ((node*)0xDEADBEEF)

struct node
{
std::atomic<node*> m_next;
VAR_T(unsigned int) m_data;

node(unsigned int data) : m_next(nullptr), m_data(data) {}

void process()
{
// simulate some work?
rl::yield(1 + rl::rand(2), $); // ;^)
}

node* next_wait()
{
node* next = nullptr;

while ((next = m_next.load(CT_MB_RLX)) == WAITNEXT)
{
// Humm, we can actually do other work right here...
// Hand off...
rl::yield(1, $);
}

return next;
}
};


std::atomic<node*> m_head;

ct_ecstack() : m_head(nullptr) {}


void push(node* n)
{
n->m_next.store(WAITNEXT, CT_MB_RLX);
node* head = m_head.exchange(n, CT_MB_REL); // release
n->m_next.store(head, CT_MB_RLX);
}


node* flush_try()
{
return m_head.exchange(nullptr, CT_MB_ACQ); // acquire
}
};




// Relacy Stack Test...
struct ct_ecstack_test
: rl::test_suite<ct_ecstack_test, THREADS>
{
ct_ecstack g_ecstack;

void process()
{
ct_ecstack::node* n = g_ecstack.flush_try(); // flush all

while (n)
{
// Process n first, acts like a backoff for the next wait
// Hand off some other nodes? Future note...
n->process();

// Wait for the next pointer, or hand off?
ct_ecstack::node* next = n->next_wait();

// Destroy
delete n;

// Loop on...
n = next;
}
}

void thread(unsigned int tidx)
{
for (unsigned int i = 0; i < ITERS; ++i)
{
g_ecstack.push(new ct_ecstack::node(tidx));
g_ecstack.push(new ct_ecstack::node(tidx));
g_ecstack.push(new ct_ecstack::node(tidx));

process();

g_ecstack.push(new ct_ecstack::node(tidx));
}

process();
}
};



// Test away... Or fly? Humm...
int main()
{
{
rl::test_params p;

p.iteration_count = 10000000;
//p.execution_depth_limit = 33333;
//p.search_type = rl::sched_bound;
//p.search_type = rl::fair_full_search_scheduler_type;
//p.search_type = rl::fair_context_bound_scheduler_type;

rl::simulate<ct_ecstack_test>(p);
}

return 0;
}
____________________________


It is very simple, but might be "interesting". ct_ecstack is what I am
focusing on. Producer's use ct_ecstack::push to add a node to the stack,
and Consumers use ct_ecstack::flush_try to remove all of the nodes in
one shot. All of the atomic operations use XCHG. There is a special
caveat that this can create... The consumers need to use
ct_ecstack::node::next_wait to gain next pointers wrt their iteration
process. A consumer can consume more than one node, so it needs to
iterate. One point, real work is performed before any wait is performed.

For instance, take a look at ct_ecstack_test::process:
____________________________
void process()
{
ct_ecstack::node* n = g_ecstack.flush_try(); // flush all

while (n)
{
// Process n first, acts like a backoff for the next wait
// Hand off some other nodes? Future note...
n->process();
//^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

// We process before we:


// Wait for the next pointer, or hand off?
ct_ecstack::node* next = n->next_wait();

// Wait. Might be beneficial...

// Destroy
delete n;

// Loop on...
n = next;
}
}
____________________________



Might be pretty cool wrt its simplicity, and no use of CAS! ;^)

Chris M. Thomasson

unread,
Jan 4, 2019, 9:37:17 PM1/4/19
to
On 12/28/2018 11:30 PM, Chris M. Thomasson wrote:
> Here is the original link:
>
> https://groups.google.com/d/topic/lock-free/wwsAmcuAefA/discussion
>
> This experimental algorithm uses only XCHG at the cost of having a
> consumer wait for the next pointer to be set in a node. However, it
> allows for real work to be done before any waits are performed. So, the
> "real" work should provide a "natural backoff" that might minimize the
> waiting. The algorithm can be improved upon quite a bit. I have some
> "interesting" ideas for it. Well, here is the code in the form of a
> Relacy unit test:
[...]

Fwiw, I finally found the time to code this experimental implementation
up in c++11. Here is the code:

https://pastebin.com/raw/j41cPT9S
(raw text, C code, no ads...)
____________________________________
// Simple XCHG based Atomic Stack
// By: Chris M. Thomasson


#include <iostream>
#include <atomic>
#include <mutex>
#include <thread>
#include <functional>
#include <cassert>


// sorry about the macros...
#define THREADS 42
#define ITERS 100000


#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release


// HACK! Humm...
#define CT_WAIT ((ct_work*)(0xDEADBEEFU))



// Just to track all the dynamic allocations...
static std::atomic<unsigned long> g_allocs(0);
static std::mutex g_cout_mtx;


// A work node
struct ct_work
{
std::atomic<ct_work*> m_next;
std::thread::id m_data;
ct_work(std::thread::id data) : m_next(nullptr), m_data(data) {}


void process()
{
// Simulate just a tiny little work?
g_cout_mtx.lock();
std::this_thread::yield();
std::this_thread::yield();
std::this_thread::yield();

std::thread::id local = std::this_thread::get_id();

if (m_data == local)
{
// std::cout << "processing local = " << m_data <<
// " from " << std::this_thread::get_id() << "\n";
}

else
{
std::cout << "processing foreign = " << m_data <<
" from " << std::this_thread::get_id() << "\n";
}

std::this_thread::yield();
std::this_thread::yield();
std::this_thread::yield();
g_cout_mtx.unlock();
}


ct_work* get_next() const
{
ct_work* w = nullptr;

while ((w = m_next.load(CT_MB_RLX)) == CT_WAIT)
{
// we can spin, or even do other work right here...
std::this_thread::yield();
}

return w;
}
};



// Easy Stack, only uses XCHG
struct ct_estack
{
std::atomic<ct_work*> m_head;
ct_estack() : m_head(nullptr) {}


void push(ct_work* n)
{
n->m_next.store(CT_WAIT, CT_MB_RLX);
ct_work* head = m_head.exchange(n, CT_MB_REL); // release
n->m_next.store(head, CT_MB_RLX);
}


ct_work* flush_try()
{
return m_head.exchange(nullptr, CT_MB_ACQ); // acquire
}
};



// Consume an Easy Stack...
void ct_consume(
ct_estack& estack
) {
ct_work* w = estack.flush_try();

while (w)
{
// Process FIRST!
w->process();

// Now, we can gain the next pointer.
ct_work* next = w->get_next();

// Okay, we can delete the work
delete w;
g_allocs.fetch_sub(1, CT_MB_RLX); // dec

w = next;
}
}



// Our shared state
struct ct_shared
{
ct_estack m_estack;
};



// Produce some work...
void ct_produce(
ct_estack& estack
) {
ct_work* w = new ct_work(std::this_thread::get_id());
g_allocs.fetch_add(1, CT_MB_RLX); // inc
estack.push(w);
}


// Do some work...
void ct_worker(ct_shared& shared)
{
for (unsigned int i = 0; i < ITERS; ++i)
{
ct_produce(shared.m_estack);
ct_produce(shared.m_estack);
ct_produce(shared.m_estack);

std::this_thread::yield(); // little spice...

ct_consume(shared.m_estack);
}

ct_consume(shared.m_estack);
}



int main(void)
{
{
ct_shared shared;
std::thread threads[THREADS];

for (unsigned long i = 0; i < THREADS; ++i)
{
threads[i] = std::thread(ct_worker, std::ref(shared));
}

for (unsigned long i = 0; i < THREADS; ++i)
{
threads[i].join();
}
}

if (g_allocs.load(CT_MB_RLX) != 0)
{
std::cout << "\n\nLEAKED!!!!\n";
}

std::cout << "\n\nFIN!\n";

return 0;
}
____________________________________

Can anybody get this sucker to compile and run to completion? If so, can
you show me some of the output? I am getting some data like in GCC:
____________________________________
processing foreign = 33 from 32
processing foreign = 33 from 32
processing foreign = 33 from 32
processing foreign = 41 from 26
processing foreign = 41 from 26
[...]
____________________________________


And some like the following in MSVC, the thread ids are larger there:
____________________________________
processing foreign = 15644 from 9844
processing foreign = 15644 from 9844
processing foreign = 18092 from 19964
processing foreign = 18092 from 19964
____________________________________


Any thoughts?

Chris M. Thomasson

unread,
Jan 4, 2019, 9:40:56 PM1/4/19
to
On 1/4/2019 6:37 PM, Chris M. Thomasson wrote:
> Fwiw, I finally found the time to code this experimental implementation
> up in c++11. Here is the code:
>
> https://pastebin.com/raw/j41cPT9S
> (raw text, C code, no ads...)
[...]

Fwiw, it boils down to the following code:
________________________________
// A work node
struct ct_work
{
std::atomic<ct_work*> m_next;
std::thread::id m_data;
ct_work(std::thread::id data) : m_next(nullptr), m_data(data) {}


void process()
{
// [...] // real work...
}


ct_work* get_next() const
{
ct_work* w = nullptr;

while ((w = m_next.load(CT_MB_RLX)) == CT_WAIT)
{
// we can spin, or even do other work right here...
std::this_thread::yield();
}

return w;
}
};


// Easy Stack, only uses XCHG
struct ct_estack
{
std::atomic<ct_work*> m_head;
ct_estack() : m_head(nullptr) {}


void push(ct_work* n)
{
n->m_next.store(CT_WAIT, CT_MB_RLX);
ct_work* head = m_head.exchange(n, CT_MB_REL); // release
n->m_next.store(head, CT_MB_RLX);
}


ct_work* flush_try()
{
return m_head.exchange(nullptr, CT_MB_ACQ); // acquire
}
};
________________________________


Fairly simple?


> Any thoughts?

Chris M. Thomasson

unread,
Jan 6, 2019, 7:58:26 PM1/6/19
to
On 1/4/2019 6:37 PM, Chris M. Thomasson wrote:
> On 12/28/2018 11:30 PM, Chris M. Thomasson wrote:
>> Here is the original link:
>>
>> https://groups.google.com/d/topic/lock-free/wwsAmcuAefA/discussion
>>
>> This experimental algorithm uses only XCHG at the cost of having a
>> consumer wait for the next pointer to be set in a node. However, it
>> allows for real work to be done before any waits are performed.

[...]
> Can anybody get this sucker to compile and run to completion? If so, can
> you show me some of the output? I am getting some data like in GCC:
> ____________________________________
> processing foreign = 33 from 32
> processing foreign = 33 from 32
> processing foreign = 33 from 32
> processing foreign = 41 from 26
> processing foreign = 41 from 26
> [...]
> ____________________________________
>
>
> And some like the following in MSVC, the thread ids are larger there:
> ____________________________________
> processing foreign = 15644 from 9844
> processing foreign = 15644 from 9844
> processing foreign = 18092 from 19964
> processing foreign = 18092 from 19964
> ____________________________________
>
>
> Any thoughts?

Just wondering if somebody can get it to run without producing any
"processing foreign =" like output? If so, that means that every thread
is working on its own work. When we see "processing foreign" it means
that a thread is working on a work item that it did not create. The more
local work we can do is critical wrt cache locality.

Melzzzzz

unread,
Jan 6, 2019, 8:10:43 PM1/6/19
to
On my machine it does not scale even on two cores, and I have eight.
Lot of foreign ...


--
press any key to continue or any other to quit...

Chris M. Thomasson

unread,
Jan 6, 2019, 8:34:42 PM1/6/19
to
Thank you for giving it a go Melzzzzz. Okay, now the work all contends
for a global mutex g_cout_mtx for printing the output to the screen via
cout. Removing this mutex seems to reduce processing foreign events.

If you find some more free time, can you try to compare the next version
to this base version? Or just try to omit the mutex in ct_work::process:

Something like:
_________________
void process()
{
/*
// Simulate just a tiny little work?
g_cout_mtx.lock();
std::this_thread::yield();
std::this_thread::yield();
std::this_thread::yield();

std::thread::id local = std::this_thread::get_id();

if (m_data == local)
{
// std::cout << "processing local = " << m_data <<
// " from " << std::this_thread::get_id() << "\n";
}

else
{
std::cout << "processing foreign = " << m_data <<
" from " << std::this_thread::get_id() << "\n";
}

std::this_thread::yield();
std::this_thread::yield();
std::this_thread::yield();
g_cout_mtx.unlock();
*/

std::this_thread::yield();
std::this_thread::yield();
std::this_thread::yield();
}
_________________

On my end, it virtually eliminates foreign processing...

Melzzzzz

unread,
Jan 6, 2019, 8:53:24 PM1/6/19
to
Without mutex it scales across 14-15 logical cores, no foreign at all.

>
> Something like:
> _________________
> void process()
> {
> /*

Chris M. Thomasson

unread,
Jan 6, 2019, 9:21:20 PM1/6/19
to
Perfect! It does share when it has to, however, it can fly when needed...

Chris M. Thomasson

unread,
Jan 6, 2019, 9:24:06 PM1/6/19
to
There are several improvements that can be made. Humm, putting on my
thinking cap:

https://youtu.be/W6DtyhzABF0?t=19



Chris M. Thomasson

unread,
Jan 6, 2019, 11:51:50 PM1/6/19
to
On 12/28/2018 11:30 PM, Chris M. Thomasson wrote:
> Here is the original link:
>
> https://groups.google.com/d/topic/lock-free/wwsAmcuAefA/discussion
>
> This experimental algorithm uses only XCHG at the cost of having a
> consumer wait for the next pointer to be set in a node. However, it
> allows for real work to be done before any waits are performed. So, the
> "real" work should provide a "natural backoff" that might minimize the
> waiting. The algorithm can be improved upon quite a bit. I have some
> "interesting" ideas for it. Well, here is the code in the form of a
> Relacy unit test:
[...]
> Might be pretty cool wrt its simplicity, and no use of CAS! ;^)

Let me first start by thanking Melzzzzz again. :^)

Fwiw, the first thing I think about wrt making things "better" is trying
to get rid of the atomic exchange pounding away at that "single"
location in ct_estack::m_head. Notice that ct_shared only has a single
ct_estack? Humm... What if there were a table of ct_estack's TS such
that a thread T can map its id, via hash?, to an index into said table
TS? This would greatly reduce pressure wrt producing and consuming
ct_work structs. The consumers via the ct_consume procedure would try
its own index first, before looking for others. This would distribute
things right of the bat. Ahhh, the multex comes mind:

https://groups.google.com/d/topic/comp.lang.c++/sV4WC_cBb9Q/discussion

Instead of locks, this can be using ct_estack's. It has to make it more
NUMA friendly. affinity masks might be next... Hummm. So many ideas!

Clever hand offs are in mind...

;^)

Melzzzzz

unread,
Jan 7, 2019, 12:04:44 AM1/7/19
to
On 2019-01-07, Chris M. Thomasson <invalid_chr...@invalid.invalid> wrote:
>
> Clever hand offs are in mind...
>
> ;^)

;)

Chris M. Thomasson

unread,
Jan 7, 2019, 4:24:33 PM1/7/19
to
On 1/6/2019 9:04 PM, Melzzzzz wrote:
> On 2019-01-07, Chris M. Thomasson <invalid_chr...@invalid.invalid> wrote:
>>
>> Clever hand offs are in mind...
>>
>> ;^)
>
> ;)

I am wondering, distribution aside, if a thread can get "too" much work
wrt flushing all of the ct_work structs from a ct_estack. Distributing
the ct_estack's over a table would reduce, or amortize this. But, then
it becomes a question of balance. When can the processing of foreign
events become beneficial in nature? If a particular ct_work item takes a
little while to process, then a hand off to another thread than can
process subsequent work should be a win, in a sense. However, this
induces the processing of foreign events.

Say a thread consumed 5 work items w[0...4], and processed w[0], while 4
items w[1...4] are just sitting there. Well, if w[0] is expected to do
some heavy work, we can hand off w[0] to another thread, or even pass
w[1...4] as a single unit to an ct_estack for other threads to grab.

The CT_WAIT is interesting to me as well. 0xDEADBEEF. Well, this can be
a pointer to a thread local struct instead of 0xDEADBEEF. It would open
up some interesting possibilities wrt hand off opportunities.

Just brainstorming here. Will have some code that demonstrates some of
this up sometime today or tomorrow.

Chris M. Thomasson

unread,
Jan 8, 2019, 3:20:23 AM1/8/19
to
On 12/28/2018 11:30 PM, Chris M. Thomasson wrote:
[...]

Fwiw, I coded up a distributed table version in a Relacy test unit:
____________________
// Simple Atomic Stack
// Distributed with a Table
// For Academic and Experimental things...
// Beginner, Moderate?
// In Good Ol' Relacy! Nice as always.
//_______________________________________________


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <iostream>


// Simple macro based redirection of the verbose std membars.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
#define CT_MB_SEQ_CST std::memory_order_seq_cst
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)


// Some global vars directing the show...
#define PRODUCERS 4
#define CONSUMERS 2
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 5
#define TABLE_SZ 2


// Experimental Stack
struct ct_ecstack
{
#define WAITNEXT ((node*)0xDEADBEEF)

struct node
{
std::atomic<node*> m_next;
VAR_T(unsigned int) m_data;

node(unsigned int data) : m_next(nullptr), m_data(data) {}

void process()
{
// simulate some work?
rl::yield(1 + rl::rand(2), $); // ;^)

unsigned long tid = rl::thread_index();
unsigned long wid = VAR(m_data);
RL_ASSERT(tid != wid);
}

node* next_wait()
{
node* next = nullptr;

while ((next = m_next.load(CT_MB_RLX)) == WAITNEXT)
{
// Humm, we can actually do other work right here...
// Hand off...
rl::yield(1, $);
}

return next;
}
};


std::atomic<node*> m_head;

ct_ecstack() : m_head(nullptr) {}


void push(node* n)
{
n->m_next.store(WAITNEXT, CT_MB_RLX);
node* head = m_head.exchange(n, CT_MB_REL); // release
n->m_next.store(head, CT_MB_RLX);
}


node* flush_try()
{
return m_head.exchange(nullptr, CT_MB_ACQ); // acquire
}
};


// Easy Stack Table
struct ct_estable
{
ct_ecstack m_buffer[TABLE_SZ];

ct_ecstack& hash(unsigned long tid)
{
return m_buffer[tid % TABLE_SZ];
}
};




#define CT_STOP (PRODUCERS * ITERS)


// Relacy Stack Test...
struct ct_ecstack_test
: rl::test_suite<ct_ecstack_test, THREADS>
{
ct_estable g_estable;
unsigned long g_contrived_stop;

void before()
{
g_contrived_stop = 0;
}

void after()
{
RL_ASSERT(g_contrived_stop == CT_STOP);
}

unsigned long process_stack(ct_ecstack& stack)
{
unsigned long count = 0;

ct_ecstack::node* n = stack.flush_try(); // flush all

while (n)
{
// Process n first, acts like a backoff for the next wait
// Hand off some other nodes? Future note...
n->process();

// Wait for the next pointer, or hand off?
ct_ecstack::node* next = n->next_wait();

if (next)
{
int g = 0;
}

// Destroy
delete n;

// Loop on...
n = next;
++count;
}

return count;
}

unsigned long process()
{
unsigned long tid = rl::thread_index();
unsigned long count = 0;

for (unsigned long i = 0; i < TABLE_SZ; ++i)
{
count = process_stack(g_estable.m_buffer[(tid + i) %
TABLE_SZ]);

if (count > 0)
{
break;
}

int g = 0;
}

return count;
}


void consumer(unsigned int tidx)
{
while (g_contrived_stop != CT_STOP)
{
unsigned long count = process();

if (! count) rl::yield(1, $);

g_contrived_stop += count;
}
}


void producer(unsigned int tidx)
{
ct_ecstack& stack = g_estable.m_buffer[tidx % TABLE_SZ];

for (unsigned long i = 0; i < ITERS; ++i)
{
stack.push(new ct_ecstack::node(tidx));
}

// g_contrived_stop += process();
}


void thread(unsigned int tidx)
{
if (tidx < PRODUCERS)
{
producer(tidx);
}

else
{
consumer(tidx);
}
}
};



// Test away... Or fly? Humm...
int main()
{
{
rl::test_params p;

p.iteration_count = 10000000;
//p.execution_depth_limit = 33333;
//p.search_type = rl::sched_bound;
//p.search_type = rl::fair_full_search_scheduler_type;
//p.search_type = rl::fair_context_bound_scheduler_type;

rl::simulate<ct_ecstack_test>(p);
}

return 0;
}
____________________


This is a different style of setup wrt the original code where each
thread can take on the role of a producer or a consumer. In this unit
test I am modeling a _strict_ producer to consumer relationship, where a
producer never consumes, and vice versa. It forces 100% foreign
processing because a producer can never consume its own work. I know
this is bad, but I just wanted to code one up for fun. Works like a
charm. Should have some C++11 code soon.

Chris M. Thomasson

unread,
Jan 15, 2019, 1:20:22 AM1/15/19
to
On 1/4/2019 6:37 PM, Chris M. Thomasson wrote:
> // Consume an Easy Stack...
> void ct_consume(
>     ct_estack& estack
> ) {
>     ct_work* w = estack.flush_try();
>
>     while (w)
>     {
>         // Process FIRST!
>         w->process();
>
>         // Now, we can gain the next pointer.
>         ct_work* next = w->get_next();
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

just an idea... What about combining the user processing with the
loading of the next pointer? So, processing can convert the code above
into something like:

ct_work* next = w->process();

It seems rather horrible to intrude into user processing. Damn.
0 new messages