
Possible use cases for an exchange-based queue...

Chris M. Thomasson

Dec 7, 2018, 6:57:32 PM
One can use a simple atomic exchange to get a sort of work queue up and
running; however, the use case can be interesting and simply might not be
compatible with certain algorithms. There really is no distinction
between producer and consumer threads in this scheme. The act of
producing work can actually consume work at the same time!

So, it is kind of something like:
________________________
atomic<work*> g_work; // let's say it already has a work item...


// push some new work
work* new_work = new work();

work* old_work = g_work.exchange(new_work, memory_order_acq_rel);

// need to see if we have any old work to process...
if (old_work)
{
    old_work->process();
    delete old_work;
}
________________________


Sometimes, this type of pattern does not fit certain use cases. For
instance, a program might expect a producer thread to just push items
into a queue, and that's it. In this type of case, a producer is NOT
expected to actually _work_ on the damn things. Well, that does not jibe
with this scheme, because a producer can suddenly morph into a consumer.
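
To make the "producer morphs into a consumer" point concrete, here is a small single-threaded sketch in plain C++11. The names `tagged_work`, `push_and_consume` and `drain` are made up for illustration and are not from the code above; the sketch only assumes a single shared slot as in `g_work`. Each push hands back the id of the producer whose item the caller just had to consume.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical work item that remembers which producer created it.
struct tagged_work
{
    int producer_id;
    explicit tagged_work(int id) : producer_id(id) {}
};

// The single shared slot, playing the role of g_work in the post.
static std::atomic<tagged_work*> g_slot(nullptr);

// Swap `w` into the slot, free whatever was displaced, and report the
// producer id of the displaced item (-1 if the slot was empty).
static int swap_and_report(tagged_work* w)
{
    tagged_work* old_work = g_slot.exchange(w, std::memory_order_acq_rel);
    if (!old_work) return -1;

    int consumed_from = old_work->producer_id;
    delete old_work; // "processing" is just reading the id here
    return consumed_from;
}

// Produce one item on behalf of `self_id`; the caller may end up
// consuming somebody else's item in the same step.
int push_and_consume(int self_id)
{
    return swap_and_report(new tagged_work(self_id));
}

// Take whatever is in the slot without producing anything.
int drain()
{
    return swap_and_report(nullptr);
}
```

Calling `push_and_consume(1)` on an empty slot consumes nothing, while a following `push_and_consume(2)` makes producer 2 consume producer 1's item. A program that needs producers to never touch work items would have to hand the displaced item to somebody else instead of processing it in place.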

Fwiw, I should have some running example code up tonight or sometime
tomorrow.

Chris M. Thomasson

Dec 7, 2018, 10:41:29 PM
On 12/7/2018 3:57 PM, Chris M. Thomasson wrote:
> One can use a simple atomic exchange to get a sort of work queue up and
> running; however, the use case can be interesting and simply might not be
> compatible with certain algorithms. There really is no distinction
> between producer and consumer threads in this scheme. The act of
> producing work can actually consume work at the same time!
>
> So, it is kind of something like:
[...]

Fwiw, here is a hyper simple Relacy unit test:

// I love this, been using it for years...
https://github.com/dvyukov/relacy

___________________________
// A stupid simple swap queue... ;^)


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <vector>
#include <algorithm>
#include <iostream>
#include <cstdio>
#include <cstdlib>


#define ct_mb_relaxed std::memory_order_relaxed
#define ct_mb_acquire std::memory_order_acquire
#define ct_mb_release std::memory_order_release
#define ct_mb_acq_rel std::memory_order_acq_rel


#define THREADS 5
#define ITERS 3


struct ct_swapq
{
    struct work
    {
        VAR_T(unsigned int) m_data;

        work(unsigned int data) : m_data(data) {}

        void process()
        {
            unsigned int x = VAR(m_data);
            RL_ASSERT(x >= 0);
            //std::cout << VAR(m_data) << "\n";
        }
    };

    std::atomic<work*> m_head;

    ct_swapq() : m_head(nullptr) { }


    work* swap(work* n)
    {
        return m_head.exchange(n, ct_mb_acq_rel);
    }
};



// Test
struct ct_swapq_test
    : rl::test_suite<ct_swapq_test, THREADS>
{
    ct_swapq g_mpmc;


    void before()
    {

    }


    void after()
    {

    }


    void thread(unsigned int tidx)
    {
        for (unsigned int i = 0; i < ITERS; ++i)
        {
            ct_swapq::work* wnew = new ct_swapq::work(tidx * tidx + i);
            ct_swapq::work* wold = g_mpmc.swap(wnew);

            if (wold)
            {
                wold->process();
                delete wold;
            }
        }

        ct_swapq::work* wold = g_mpmc.swap(nullptr);

        if (wold)
        {
            wold->process();
            delete wold;
        }
    }
};


int main()
{
    {
        rl::test_params p;

        p.iteration_count = 5000000;
        //p.execution_depth_limit = 33333;
        // p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<ct_swapq_test>(p);
    }

    return 0;
}
___________________________


Next code will be a simple C++11 example.
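
For readers without Relacy, here is a rough sketch of what a plain C++11 version of the same test might look like. This is not the example promised above, only an illustration under assumptions: the names `swapq_work`, `swapq` and `run_swapq` are invented here, and "processing" is reduced to bumping a counter so the result can be checked.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Illustrative stand-in for ct_swapq::work.
struct swapq_work
{
    unsigned data;
    explicit swapq_work(unsigned d) : data(d) {}
};

// Same shape as ct_swapq: one atomic pointer and an exchange.
struct swapq
{
    std::atomic<swapq_work*> head;

    swapq() : head(nullptr) {}

    swapq_work* swap(swapq_work* n)
    {
        return head.exchange(n, std::memory_order_acq_rel);
    }
};

// Run `threads` threads that each push `iters` items and then drain once.
// Returns how many items were processed in total.
unsigned run_swapq(unsigned threads, unsigned iters)
{
    swapq q;
    std::atomic<unsigned> processed(0);

    // Count and free a displaced item, if there was one.
    auto consume = [&processed](swapq_work* w) {
        if (w)
        {
            processed.fetch_add(1, std::memory_order_relaxed);
            delete w;
        }
    };

    std::vector<std::thread> pool;
    for (unsigned t = 0; t < threads; ++t)
    {
        pool.emplace_back([&q, &consume, t, iters] {
            for (unsigned i = 0; i < iters; ++i)
                consume(q.swap(new swapq_work(t * t + i)));

            // Final drain, mirroring the swap(nullptr) in the Relacy test.
            consume(q.swap(nullptr));
        });
    }

    for (auto& th : pool) th.join();

    // The last drain in the slot's modification order sees every push,
    // so nothing should be left behind.
    assert(q.swap(nullptr) == nullptr);
    return processed.load();
}
```

With 5 threads and 3 iterations each, as in the Relacy test, `run_swapq(5, 3)` should report 15 processed items. Unlike Relacy, a plain run only exercises whatever interleavings the scheduler happens to produce.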

Chris M. Thomasson

Dec 7, 2018, 11:23:00 PM
On 12/7/2018 7:41 PM, Chris M. Thomasson wrote:
> On 12/7/2018 3:57 PM, Chris M. Thomasson wrote:
>> One can use a simple atomic exchange to get a sort of work queue up
>> and running; however, the use case can be interesting and simply might
>> not be compatible with certain algorithms. There really is no
>> distinction between producer and consumer threads in this scheme. The
>> act of producing work can actually consume work at the same time!
>>
>> So, it is kind of something like:
> [...]
>
> Fwiw, here is a hyper simple Relacy unit test:
>
> // I love this, been using it for years...
> https://github.com/dvyukov/relacy
>
> ___________________________
> // A stupid simple swap queue... ;^)
[...]
> #define ct_mb_acq_rel std::memory_order_acq_rel
[...]
> struct ct_swapq
> {
>     struct work
>     {
>         VAR_T(unsigned int) m_data;
>
>         work(unsigned int data) : m_data(data) {}
>
>         void process()
>         {
>             unsigned int x = VAR(m_data);
>             RL_ASSERT(x >= 0);
>             //std::cout << VAR(m_data) << "\n";
>         }
>     };
>
>     std::atomic<work*> m_head;
>
>     ct_swapq() : m_head(nullptr) { }
>
>
>     work* swap(work* n)
>     {
>         return m_head.exchange(n, ct_mb_acq_rel);
>     }
> };
[...]

Server-based workflow loop, radical pseudo-code:

worker_thread:
_____________________
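ct_swapq g_q; // shared by all worker threads

for (;;) // shutdown process omitted for brevity...
{
    // grab whatever work is currently parked in the queue
    work* w = g_q.swap(nullptr);
    if (w) { w->process(); delete w; }

    // fetch the next request (IO is a placeholder, not a real API)
    w = IO.get_request();

    // park the new request and process whatever it displaced
    work* nw = g_q.swap(w);
    if (nw) { nw->process(); delete nw; }
}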
_____________________

It should work fine.
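
A runnable sketch of that loop, under stated assumptions: `fake_io` is a made-up stand-in for `IO.get_request()` that hands out a fixed number of requests and then returns nullptr, which doubles as the shutdown signal the pseudo-code omits. `request`, `server_stats`, `worker` and `run_server` are likewise invented names.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

struct request
{
    unsigned id;
};

// Hypothetical request source: `total` requests, then nullptr forever.
struct fake_io
{
    std::atomic<unsigned> next;
    unsigned total;

    explicit fake_io(unsigned n) : next(0), total(n) {}

    request* get_request()
    {
        unsigned id = next.fetch_add(1, std::memory_order_relaxed);
        return id < total ? new request{id} : nullptr;
    }
};

struct server_stats
{
    std::atomic<unsigned> processed;
    std::atomic<unsigned long> id_sum;

    server_stats() : processed(0), id_sum(0) {}
};

// "Process" a request by recording it, then free it.
void process_and_free(request* r, server_stats& stats)
{
    if (!r) return;
    stats.processed.fetch_add(1, std::memory_order_relaxed);
    stats.id_sum.fetch_add(r->id, std::memory_order_relaxed);
    delete r;
}

void worker(std::atomic<request*>& slot, fake_io& io, server_stats& stats)
{
    for (;;)
    {
        process_and_free(slot.exchange(nullptr, std::memory_order_acq_rel), stats);

        request* r = io.get_request();
        if (!r) break; // assumed shutdown condition: no more requests

        process_and_free(slot.exchange(r, std::memory_order_acq_rel), stats);
    }

    // Drain once more on the way out so nothing stays parked in the slot.
    process_and_free(slot.exchange(nullptr, std::memory_order_acq_rel), stats);
}

// Returns the number of requests processed by `threads` workers.
unsigned run_server(unsigned threads, unsigned total)
{
    std::atomic<request*> slot(nullptr);
    fake_io io(total);
    server_stats stats;

    std::vector<std::thread> pool;
    for (unsigned t = 0; t < threads; ++t)
        pool.emplace_back([&slot, &io, &stats] { worker(slot, io, stats); });
    for (auto& th : pool) th.join();

    // Ids 0..total-1 processed once each sum to total*(total-1)/2.
    unsigned long n = total;
    assert(stats.id_sum.load() == n * (n - 1) / 2);
    assert(slot.load() == nullptr);
    return stats.processed.load();
}
```

One thing the sketch makes visible: a request parked in the slot is only picked up when some thread performs its next swap, so how long it waits depends on how busy the other workers are.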

Chris M. Thomasson

Dec 10, 2018, 5:09:50 PM
On 12/7/2018 7:41 PM, Chris M. Thomasson wrote:
> On 12/7/2018 3:57 PM, Chris M. Thomasson wrote:
>> One can use a simple atomic exchange to get a sort of work queue up
>> and running; however, the use case can be interesting and simply might
>> not be compatible with certain algorithms. There really is no
>> distinction between producer and consumer threads in this scheme. The
>> act of producing work can actually consume work at the same time!
>>
>> So, it is kind of something like:
> [...]
>
> Fwiw, here is a hyper simple Relacy unit test:
>
> // I love this, been using it for years...
> https://github.com/dvyukov/relacy
[...]

Btw, has anybody here even tried Relacy? It is header only... :^)

Chris M. Thomasson

Dec 12, 2018, 10:56:10 PM
On 12/7/2018 3:57 PM, Chris M. Thomasson wrote:
Before I post my code: note that this queue keeps an upper bound on
in-flight messages. There can be very interesting inter-thread exchanges.
Actually, this thing wants more threads!
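
One way to read the upper bound, as a sketch rather than the code announced above: the shared slot holds at most one item and each thread holds at most one item in hand, so with N threads no more than N + 1 items are alive at once. That reading is an assumption, and the names `tracked_work` and `run_bounded` are invented for this check, which counts live items in the constructor and destructor.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Work item that tracks how many instances are alive and the peak seen.
struct tracked_work
{
    static std::atomic<int> live;
    static std::atomic<int> peak;

    tracked_work()
    {
        int now = live.fetch_add(1) + 1;

        // Raise the recorded peak if this allocation exceeded it.
        int seen = peak.load();
        while (now > seen && !peak.compare_exchange_weak(seen, now)) {}
    }

    ~tracked_work() { live.fetch_sub(1); }
};

std::atomic<int> tracked_work::live(0);
std::atomic<int> tracked_work::peak(0);

// Run the swap loop on `threads` threads and return the peak number of
// work items that were alive at the same time.
int run_bounded(int threads, int iters)
{
    tracked_work::live.store(0);
    tracked_work::peak.store(0);

    std::atomic<tracked_work*> slot(nullptr);

    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
    {
        pool.emplace_back([&slot, iters] {
            for (int i = 0; i < iters; ++i)
            {
                // At this point the thread holds nothing, so it adds at
                // most one live item on top of the one in the slot.
                tracked_work* old_work =
                    slot.exchange(new tracked_work(), std::memory_order_acq_rel);
                delete old_work; // deleting nullptr is a no-op
            }
            delete slot.exchange(nullptr, std::memory_order_acq_rel);
        });
    }

    for (auto& th : pool) th.join();
    return tracked_work::peak.load();
}
```

For example, `run_bounded(4, 1000)` should never report a peak above 5, and `tracked_work::live` should be back at 0 once all threads have drained.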