Distributed Multi-Producer/Consumer Bakery SpinLock Queue...

chris.m.t...@gmail.com

Mar 5, 2017, 6:51:31 PM
This is posted through my Google account because my Eternal September server is acting odd right now. There might be some duplicate posts coming in. Anyway:

Here is some experimental code that implements a multi-producer/consumer spinlock queue. Keep in mind that this can be converted into an entity that supports conditional waits through eventcounts.

Here is the C++11 code. Does it work for you? If not, please explain every problem you encountered. Give it some time to complete; it should show progress in the console:

raw code, no ads:

http://pastebin.com/raw/hXQfrCSN

__________________________________________
#include <cstdio>
#include <memory>
#include <atomic>
#include <thread>
#include <functional>


#define N 10000000

#define mb_relaxed std::memory_order_relaxed
#define mb_acquire std::memory_order_acquire
#define mb_release std::memory_order_release

#define mb_fence(mb) std::atomic_thread_fence(mb)

/*
Pseudocode of the algorithm. N here is the queue depth (T_depth below),
not the iteration count defined above.

struct cell { uint32_t ver; double state; };

uint32_t head = 0;
uint32_t tail = 0;
cell cells[N]; // N must be a power of 2

void init() {
    for (uint32_t i = 0; i < N; ++i) cells[i].ver = i;
}

void producer(double state) {
    uint32_t ver = XADD(&head, 1);
    cell& c = cells[ver & (N - 1)];
    while (LOAD(&c.ver) != ver) backoff();
    c.state = state;
    STORE(&c.ver, ver + 1);
}

double consumer() {
    uint32_t ver = XADD(&tail, 1);
    cell& c = cells[ver & (N - 1)];
    while (LOAD(&c.ver) != ver + 1) backoff();
    double state = c.state;
    STORE(&c.ver, ver + N);
    return state;
}
*/


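// Bounded queue in which every cell carries a version number that says
// whose turn it is. The type name says "single", but the ticket scheme
// is the multi-producer/consumer one described in the post.
template<typename T, unsigned long T_depth>
struct single_producer_consumer_queue
{
    // Cells are indexed with ver & (T_depth - 1), so the depth must be
    // a power of 2.
    static_assert(T_depth != 0 && (T_depth & (T_depth - 1)) == 0,
        "T_depth must be a power of 2");

    struct cell
    {
        std::atomic<unsigned long> m_ver;
        T m_data;
    };

    std::atomic<unsigned long> m_head;
    std::atomic<unsigned long> m_tail;
    cell m_cells[T_depth];


    single_producer_consumer_queue()
    :   m_head(0),
        m_tail(0)
    {
        // Cell i starts out ready for producer ticket i.
        for (unsigned long i = 0; i < T_depth; ++i)
        {
            m_cells[i].m_ver.store(i, mb_relaxed);
        }
    }


    void produce(T const& in)
    {
        // Take a ticket once, then retry the commit until the cell is free.
        unsigned long ver = produce_begin();
        while (!produce_commit(ver, in))
        {
            std::printf("produce wait ver %lu\n", ver);
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
        }
    }


    void consume(T& out)
    {
        // Take a ticket once, then retry the commit until the cell is filled.
        unsigned long ver = consume_begin();
        while (!consume_commit(ver, out))
        {
            std::printf("consume wait ver %lu\n", ver);
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
        }
    }


    unsigned long produce_begin()
    {
        return m_head.fetch_add(1, mb_relaxed);
    }

    bool produce_commit(unsigned long ver, T const& in)
    {
        cell& c = m_cells[ver & (T_depth - 1)];
        // The cell is ours only when its version equals our ticket.
        if (c.m_ver.load(mb_relaxed) != ver) return false;
        mb_fence(mb_acquire);
        c.m_data = in;
        mb_fence(mb_release);
        // Publish: the consumer holding ticket ver waits for ver + 1.
        c.m_ver.store(ver + 1, mb_relaxed);
        return true;
    }

    unsigned long consume_begin()
    {
        return m_tail.fetch_add(1, mb_relaxed);
    }

    bool consume_commit(unsigned long ver, T& out)
    {
        cell& c = m_cells[ver & (T_depth - 1)];
        // The cell is filled for us only when its version is ticket + 1.
        if (c.m_ver.load(mb_relaxed) != ver + 1) return false;
        mb_fence(mb_acquire);
        out = c.m_data;
        mb_fence(mb_release);
        // Hand the cell to the producer that will hold ticket ver + T_depth.
        c.m_ver.store(ver + T_depth, mb_relaxed);
        return true;
    }
};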


typedef single_producer_consumer_queue<unsigned long, 8192 * 2> spscq_t;

void producer(spscq_t* const q)
{
    std::printf("producer(%p)\n", (void*)q);

    for (unsigned long i = 0; i < N; ++i)
    {
        q->produce(i);
        //std::printf("producer(%p)::produced(%lu)\n", (void*)&q, i);
        std::this_thread::yield();
        std::this_thread::yield();

        if (!(i % 100000))
        {
            // NOTE: &q is the address of this function's pointer parameter,
            // not of the queue, so each thread prints a different value
            // (see the follow-up below). (void*)q was intended.
            std::printf("producer(%p)::producer(%lu)\n", (void*)&q, i);
        }
    }
}

void consumer(spscq_t* const q)
{
    std::printf("consumer(%p)\n", (void*)q);

    unsigned long consumed = 0;

    for (unsigned long i = 0; i < N; ++i)
    {
        q->consume(consumed);
        std::this_thread::yield();
        std::this_thread::yield();

        if (!(i % 100000))
        {
            // NOTE: same &q issue as in producer(); (void*)q was intended.
            std::printf("consumer(%p)::consumed(%lu):%lu\n", (void*)&q, consumed, i);
        }
    }

    std::printf("\n\n final: consumer(%p)::consumed(%lu)\n", (void*)q, consumed);
}


int main(void)
{
    {
        std::unique_ptr<spscq_t> q(new spscq_t);

        std::thread consumer_thread(consumer, q.get());
        std::thread producer_thread(producer, q.get());

        producer_thread.join();
        consumer_thread.join();
    }

    std::printf("\nComplete, hit <ENTER> to exit...\n");
    std::fflush(stdout);
    std::getchar();

    return 0;
}
__________________________________________
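
To make the version arithmetic in the code above easier to follow, here is a minimal single-threaded walkthrough with a depth of 4. It is only a sketch: `tiny_queue`, `walkthrough` and `DEPTH` are hypothetical names that are not part of the posted program, and the atomics and fences are deliberately left out because nothing runs concurrently here.

```cpp
#include <cassert>

// Hypothetical single-threaded model of the cell/version protocol.
// No atomics or fences: it only illustrates the arithmetic.
constexpr unsigned long DEPTH = 4; // must be a power of 2

struct tiny_queue
{
    unsigned long head = 0;     // next producer ticket
    unsigned long tail = 0;     // next consumer ticket
    unsigned long ver[DEPTH];   // per-cell version
    unsigned long data[DEPTH];  // per-cell payload

    tiny_queue()
    {
        for (unsigned long i = 0; i < DEPTH; ++i) { ver[i] = i; data[i] = 0; }
    }

    // Producer ticket t owns cell t & (DEPTH - 1) once its version equals t.
    bool produce_commit(unsigned long t, unsigned long in)
    {
        unsigned long i = t & (DEPTH - 1);
        if (ver[i] != t) return false;
        data[i] = in;
        ver[i] = t + 1;       // publish to consumer ticket t
        return true;
    }

    // Consumer ticket t may read the cell once its version equals t + 1.
    bool consume_commit(unsigned long t, unsigned long& out)
    {
        unsigned long i = t & (DEPTH - 1);
        if (ver[i] != t + 1) return false;
        out = data[i];
        ver[i] = t + DEPTH;   // hand the cell to producer ticket t + DEPTH
        return true;
    }
};

// Returns the final version of cell 0 after one full lap.
unsigned long walkthrough()
{
    tiny_queue q;

    // Tickets 0..3 fill cells 0..3.
    for (unsigned long v = 0; v < DEPTH; ++v)
    {
        bool ok = q.produce_commit(q.head++, v * 10);
        assert(ok); (void)ok;
    }

    // Ticket 4 maps back to cell 0, whose version is still 1: must wait.
    unsigned long t4 = q.head++;
    bool early = q.produce_commit(t4, 40);
    assert(!early); (void)early;

    // Consumer ticket 0 drains cell 0 and sets its version to 0 + DEPTH = 4.
    unsigned long out = 99;
    bool drained = q.consume_commit(q.tail++, out);
    assert(drained && out == 0); (void)drained;

    // Now ticket 4 finds version 4 and can commit, leaving version 5.
    bool late = q.produce_commit(t4, 40);
    assert(late); (void)late;

    return q.ver[0];
}
```

Calling `walkthrough()` shows the one case where a ticket holder has to wait: the queue is full and the ticket has lapped around to a cell that has not been consumed yet.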


You should see something similar to this at the end of the output:
__________________________________________
producer(019CF664)::producer(9300000)
consumer(0188F5F4)::consumed(9300000):9300000
producer(019CF664)::producer(9400000)
consumer(0188F5F4)::consumed(9400000):9400000
producer(019CF664)::producer(9500000)
consumer(0188F5F4)::consumed(9500000):9500000
producer(019CF664)::producer(9600000)
consumer(0188F5F4)::consumed(9600000):9600000
producer(019CF664)::producer(9700000)
consumer(0188F5F4)::consumed(9700000):9700000
producer(019CF664)::producer(9800000)
consumer(0188F5F4)::consumed(9800000):9800000
producer(019CF664)::producer(9900000)
consumer(0188F5F4)::consumed(9900000):9900000


final: consumer(0149FD78)::consumed(9999999)

Complete, hit <ENTER> to exit...
__________________________________________


The final consumed value should always equal N - 1, which is 9999999 for the settings in the posted program.

This test is for two threads: one producer and one consumer. I will add more tests to this thread over time, and answer any questions.

Again, can anybody compile this pile?

;^o
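
On the remark above that the queue can be converted to support conditional waits through eventcounts: the sketch below shows the general shape of that idea, assuming a deliberately simple eventcount built from a mutex and a condition variable. The names `eventcount`, `wait_until` and `demo` are hypothetical and do not come from the posted code, and a production eventcount would normally keep the fast path lock-free.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical minimal eventcount (mutex + condition variable).
class eventcount
{
    std::mutex m_mutex;
    std::condition_variable m_cond;
    unsigned long m_count = 0;

public:
    // Take a key BEFORE re-checking the condition.
    unsigned long get()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_count;
    }

    // Sleep only if no signal has arrived since get() returned key.
    void wait(unsigned long key)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [&] { return m_count != key; });
    }

    // Call after every state change that could unblock a waiter.
    void signal()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_cond.notify_all();
    }
};

// Replaces a yield loop: try, take a key, try again, and only then sleep.
// try_op must be safe to call repeatedly until it returns true, as a
// commit attempt with an already taken ticket is.
template<typename TryFn>
void wait_until(eventcount& ec, TryFn try_op)
{
    while (!try_op())
    {
        unsigned long key = ec.get();
        if (try_op()) return;   // the state changed in the meantime
        ec.wait(key);           // returns once signal() bumps the count
    }
}

// Usage sketch: the first attempt fails and the re-check succeeds,
// so the caller never actually sleeps.
int demo()
{
    eventcount ec;
    int calls = 0;
    wait_until(ec, [&] { return ++calls >= 2; });
    assert(calls == 2);
    return calls;
}
```

In the queue, `try_op` would be a commit attempt for a ticket that was already taken, and every successful commit on the other side would be followed by a call to `signal()`.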

Chris M. Thomasson

Mar 5, 2017, 11:49:43 PM
On 3/5/2017 2:48 PM, Chris M. Thomasson wrote:
>
> Here is some experimental code that implements a multi-producer/consumer
> spinlock queue. Keep in mind that this can be converted into an entity
> that supports conditional waits through eventcounts.
>
> Here is the c++11 code, does it work for you? If not, please explain
> every problem you encountered. Give it some time to complete, it should
> show progress in the console:
[...]

The code has a little bastard in it that makes the output look as if
the two threads have two different queues! This is because the printf
calls pass the address of each thread's local copy of the pointer
instead of the pointer itself!

Take a look at line 170 here:

http://pastebin.com/hXQfrCSN

line 170: std::printf("consumer(%p)::consumed(%lu):%lu\n", (void*)&q, consumed, i);


Ummm, what is that &q doing there!!!! It should be just q.

damn!
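
A small sketch of the difference, assuming a hypothetical `queue_stub` type in place of `spscq_t`: `q` is the address of the queue, while `&q` is the address of the function's own pointer variable, which is a different object in every call and in every thread.

```cpp
#include <cassert>
#include <cstdio>

struct queue_stub { int dummy; };  // hypothetical stand-in for spscq_t

// Prints both addresses and reports whether they are equal.
bool same_address(queue_stub* const q)
{
    std::printf("q  = %p  (the queue itself)\n", (void*)q);
    std::printf("&q = %p  (this function's own pointer variable)\n", (void*)&q);
    return (void*)q == (void*)&q;
}

// Usage sketch: the same queue is passed twice, as in the two threads.
bool demo()
{
    queue_stub s = { 0 };
    bool first = same_address(&s);
    bool second = same_address(&s);
    assert(first == second);
    return first;
}
```

Both calls receive the same queue, so the `q` lines match, while the `&q` lines show wherever each call happens to keep its parameter.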

Chris M. Thomasson

Mar 7, 2017, 6:27:11 PM
On 3/5/2017 2:48 PM, Chris M. Thomasson wrote:
>
> Here is some experimental code that implements a multi-producer/consumer
> spinlock queue. Keep in mind that this can be converted into an entity
> that supports conditional waits through eventcounts.
>
> Here is the c++11 code, does it work for you? If not, please explain
> every problem you encountered. Give it some time to complete, it should
> show progress in the console:
> __________________________________________
[...]

Think of replacing each call to:

std::this_thread::yield();

in the following code:

http://pastebin.com/raw/hXQfrCSN

with a "possible opportunity" to perform another "useful action",
instead of "blocking/spinning" on a single action that is currently in use.

Fractal layered queuing architecture to the rescue... ;^)

lol.
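
One way to read the suggestion above, as a minimal sketch rather than the author's design: the retry loop calls a caller-supplied function instead of `yield()` whenever the commit fails. `ticket_queue`, `produce_or_work` and `demo` are hypothetical names, and the trimmed-down queue swaps the posted fences for acquire loads and release stores to keep the example short.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical, trimmed-down version of the posted queue.
template<typename T, unsigned long T_depth>
struct ticket_queue
{
    static_assert(T_depth != 0 && (T_depth & (T_depth - 1)) == 0,
        "T_depth must be a power of 2");

    struct cell { std::atomic<unsigned long> m_ver; T m_data; };

    std::atomic<unsigned long> m_head{0};
    std::atomic<unsigned long> m_tail{0};
    cell m_cells[T_depth];

    ticket_queue()
    {
        for (unsigned long i = 0; i < T_depth; ++i)
            m_cells[i].m_ver.store(i, std::memory_order_relaxed);
    }

    unsigned long produce_begin() { return m_head.fetch_add(1, std::memory_order_relaxed); }
    unsigned long consume_begin() { return m_tail.fetch_add(1, std::memory_order_relaxed); }

    bool produce_commit(unsigned long ver, T const& in)
    {
        cell& c = m_cells[ver & (T_depth - 1)];
        if (c.m_ver.load(std::memory_order_acquire) != ver) return false;
        c.m_data = in;
        c.m_ver.store(ver + 1, std::memory_order_release);
        return true;
    }

    bool consume_commit(unsigned long ver, T& out)
    {
        cell& c = m_cells[ver & (T_depth - 1)];
        if (c.m_ver.load(std::memory_order_acquire) != ver + 1) return false;
        out = c.m_data;
        c.m_ver.store(ver + T_depth, std::memory_order_release);
        return true;
    }
};

// Runs other_work() once per failed commit instead of yielding.
// Returns how many times the commit had to be retried.
template<typename Q, typename T, typename F>
unsigned long produce_or_work(Q& q, T const& in, F other_work)
{
    unsigned long retries = 0;
    unsigned long ver = q.produce_begin();
    while (!q.produce_commit(ver, in))
    {
        other_work();
        ++retries;
    }
    return retries;
}

// Usage sketch: with depth 2, the third produce finds the queue full and
// uses its waiting time to drain one item, which frees its own cell.
unsigned long demo()
{
    ticket_queue<int, 2> q;
    int drained = 0;

    produce_or_work(q, 1, [] {});
    produce_or_work(q, 2, [] {});

    unsigned long retries = produce_or_work(q, 3, [&] {
        bool ok = q.consume_commit(q.consume_begin(), drained);
        assert(ok); (void)ok;
    });

    assert(drained == 1);
    return retries;
}
```

The "useful action" has to be something that cannot itself wait forever on the ticket being retried, otherwise the loop just moves the spinning somewhere else.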