Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

xchg based work "queue" thing, just for fun...

37 views
Skip to first unread message

Chris M. Thomasson

unread,
Dec 26, 2023, 12:56:44 AM12/26/23
to
Fwiw, here is my C++ code using Relacy Race Detector. I will have some
more time tomorrow to port it to 100% pure C++11 that anybody can try
out. Fwiw, Relacy can be found here:

A work "queue" thing that only uses atomic xchg. Pretty interesting to
me, and I actually have a real use for it. :^D

Works well on x86 wrt LOCK XCHG! :^)


Here is my current crude code:
_____________________________
#include <iostream>
#include <cstddef>
#include <relacy/relacy.hpp>


// Test sizing knobs.
//
// CT_THREAD_N          - passed to rl::test_suite as the thread count of
//                        ct_relacy_test_fun.
// CT_WORK_N            - produce/consume rounds each test thread runs.
// CT_PRODUCER_WORK_N   - work nodes a thread pushes per round.
// CT_CONSUMER_WORK_N   - pop/process/destroy passes a thread makes per round.
// CT_DISTRIBUTE_WORK_N - number of LIFO slots in the distribution table
//                        (one per thread as configured here).
#define CT_THREAD_N 5
#define CT_WORK_N 3
#define CT_PRODUCER_WORK_N 5
#define CT_CONSUMER_WORK_N 2
#define CT_DISTRIBUTE_WORK_N CT_THREAD_N


// Notes:
// Using spin backoff for waits as of now
// Need to slow-path it with kernel waits
// Need to refine the per-node backoff


// Sentinel stored in work::m_next while a push is still in flight; the code
// only ever compares against it, it is never dereferenced.
// NOTE(review): integer-to-pointer cast of an arbitrary constant; assumes
// 0xDEADBEEF can never be the address of a real work node - confirm.
#define CT_PENDING ((ct_xchg_lifo_ver_001::work*)(0xDEADBEEF))

// Statistics bumped by work::execute_work() and printed by main(): how many
// nodes were executed by the thread that created them (local) versus by a
// different thread (foreign). Nothing visible here resets them.
// NOTE(review): plain non-atomic globals touched from every test thread;
// presumably tolerable only because Relacy simulates the threads - these
// would need to become atomics in a port to real threads. Confirm.
static unsigned long g_stats_local_work = 0;
static unsigned long g_stats_foreign_work = 0;


struct ct_xchg_lifo_ver_001
{

// simulate some work...
struct work
{
rl::atomic<work*> m_next;
VAR_T(unsigned long) m_work;
VAR_T(unsigned long) m_tidx;


work(unsigned long tidx)
: m_next(CT_PENDING),
m_work(0),
m_tidx(tidx)
{

}


~work()
{
RL_ASSERT(VAR(m_work) == 1);
}


// no data races on m_work!
void execute_work(unsigned long tidx)
{
VAR(m_work) += 1;

unsigned long local_tidx = VAR(m_tidx);

if (local_tidx != tidx)
{
g_stats_foreign_work += 1;
/*
std::cout << "foriegn work detected "
<< local_tidx
<< " -> "
<< tidx
<< std::endl;
*/
}

else
{
g_stats_local_work += 1;

/*
std::cout << "local work detected "
<< local_tidx
<< " -> "
<< tidx
<< std::endl;
*/
}
}


// this is the pending node logic...
work* get_next()
{
work* next = m_next.load(rl::mo_relaxed, $);

while (next == CT_PENDING)
{
rl::backoff();
next = m_next.load(rl::mo_relaxed, $);
}

return next;
}



static void process(work* cur, unsigned long tidx)
{
// must not be pending!
RL_ASSERT(cur != CT_PENDING);

// process work nodes...

while (cur)
{
// do real work _before_ the pending logic
// this is highly beneficial... Big time!
cur->execute_work(tidx);

// get the next work node.
cur = cur->get_next();
}
}


// dump worked on nodes...
static void destroy(work* cur, unsigned long tidx)
{
// no pending work shall be destroyed!
RL_ASSERT(cur != CT_PENDING);

while (cur)
{
work* next = cur->m_next.load(rl::mo_relaxed, $);

// no pending work shall be destroyed!
RL_ASSERT(next != CT_PENDING);

delete cur;
cur = next;
}
}
};



rl::atomic<work*> m_head;

ct_xchg_lifo_ver_001()
: m_head(nullptr)
{

}

~ct_xchg_lifo_ver_001()
{
RL_ASSERT(!m_head.load(rl::mo_relaxed, $));
}



void push(work* w)
{
// this completes the pending logic...
w->m_next.store(CT_PENDING, rl::mo_relaxed, $);
work* head = m_head.exchange(w, rl::mo_release, $);
w->m_next.store(head, rl::mo_relaxed, $);
}


work* flush()
{
return m_head.exchange(nullptr, rl::mo_acquire, $);
}
};



// A test of a simple way to distribute work
// Using the ct_xchg_lifo_ver_001 API
// Spreads work over a fixed table of T_N containers of type T.
//
// T must expose a nested `work` type plus `push(work*)` and `flush()`
// (returning a `work*`, null when empty), as ct_xchg_lifo_ver_001 does.
template<typename T, std::size_t T_N>
struct ct_distribute_ver_001
{
    using work = typename T::work;

    T m_table[T_N];


    // Hands `w` to the slot selected by the caller's thread index.
    void push(work* w, unsigned int tidx)
    {
        m_table[tidx % T_N].push(w);
    }


    // Flushes slots one after another, beginning at the caller's own slot
    // and wrapping around; returns the first non-empty batch, or nullptr
    // when every slot came back empty.
    work* pop(unsigned int tidx)
    {
        work* batch = nullptr;

        for (std::size_t probe = 0; batch == nullptr && probe < T_N; ++probe)
        {
            batch = m_table[(tidx + probe) % T_N].flush();
        }

        return batch;
    }
};



// Relacy Test Unit...
// Relacy test unit: every simulated thread alternates between producing a
// burst of work nodes and consuming whatever batches it can flush.
struct ct_relacy_test_fun
:   rl::test_suite<ct_relacy_test_fun, CT_THREAD_N>
{
    using distribute = ct_distribute_ver_001<
        ct_xchg_lifo_ver_001,
        CT_DISTRIBUTE_WORK_N
    >;


    distribute m_work;


    // Body of one simulated thread; `tidx` is the index Relacy assigns it.
    void thread(unsigned int tidx)
    {
        for (unsigned long round = 0; round < CT_WORK_N; ++round)
        {
            // Produce: each node is tagged with the creating thread's index.
            for (unsigned long made = 0; made < CT_PRODUCER_WORK_N; ++made)
            {
                distribute::work* fresh = new distribute::work(tidx);
                m_work.push(fresh, tidx);
            }

            // Consume: a pop may come back empty (nullptr), which process()
            // and destroy() both treat as an empty list.
            for (unsigned long taken = 0; taken < CT_CONSUMER_WORK_N; ++taken)
            {
                distribute::work* batch = m_work.pop(tidx);
                distribute::work::process(batch, tidx);
                distribute::work::destroy(batch, tidx);
            }
        }
    }
};


// Runs the Relacy simulation of ct_relacy_test_fun and prints the
// local/foreign work counters afterwards.
//
// Returns 0 when the simulation passes and 1 when Relacy reports a failure,
// so scripts and CI can detect a broken run from the exit status.
int
main()
{
    std::cout << "Exchange LIFO Container Experiment ver:0.0.2\n";
    std::cout << "Relacy Unit Test ver:0.0.2\n";
    std::cout << "by: Chris M. Thomasson\n";
    std::cout << "__________________________________\n" << std::endl;

    bool passed = false;

    {
        rl::test_params p;

        p.iteration_count = 1000000;
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        std::cout << "Executing Relacy Unit Test...\n";
        std::cout << "__________________________________" << std::endl;

        // rl::simulate reports success as a bool; keep it instead of
        // dropping it so a failed run is not reported as success.
        passed = rl::simulate<ct_relacy_test_fun>(p);


        std::cout << "\nwork load "
                  << g_stats_local_work
                  << ", "
                  << g_stats_foreign_work
                  << std::endl;
    }

    return passed ? 0 : 1;
}
_____________________________

Chris M. Thomasson

unread,
Dec 26, 2023, 12:58:40 AM12/26/23
to
On 12/25/2023 9:56 PM, Chris M. Thomasson wrote:
> Fwiw, here is my C++ code using Relacy Race Detector, I will have some
> more time tomorrow to port it to 100% pure C++11 that anybody can try
> out. Fwiw, Relacy can be found here:
[...

Damn it! I forgot the link:

https://github.com/dvyukov/relacy

Sorry! ;^o

0 new messages