I am posting this through my Google account because my Eternal September server is acting odd right now, so some duplicate posts might show up. Anyway:
Here is some experimental code that implements a multi-producer/consumer spinlock queue. Keep in mind that this can be converted into an entity that supports conditional waits through eventcounts.
Here is the C++11 code. Does it work for you? If not, please explain every problem you encountered. Give it some time to complete; it should show progress in the console:
raw code, no ads:
http://pastebin.com/raw/hXQfrCSN
__________________________________________
#include <cstdio>
#include <memory>
#include <atomic>
#include <thread>
#include <functional>
// Number of items the producer thread pushes and the consumer thread pops.
#define N 10000000
// Shorthand names for the std::memory_order constants.
#define mb_relaxed std::memory_order_relaxed
#define mb_acquire std::memory_order_acquire
#define mb_release std::memory_order_release
// Issues a standalone std::atomic_thread_fence with the given memory order.
#define mb_fence(mb) std::atomic_thread_fence(mb)
/*
struct cell { uint32_t ver; double state; };
uint32_t head = 0;
uint32_t tail = 0;
cell cells[N]; // N must be a power of 2
void init() {
for (uint32_t i = 0; i < N; ++i) cells[i].ver = i;
}
void producer(double state) {
uint32_t ver = XADD(&head, 1);
cell& c = cells[ver & (N - 1)];
while (LOAD(&c.ver) != ver) backoff();
c.state = state;
STORE(&c.ver, ver + 1);
}
double consumer() {
uint32_t ver = XADD(&tail, 1);
cell& c = cells[ver & (N - 1)];
while (LOAD(&c.ver) != ver + 1) backoff();
double state = c.state;
STORE(&c.ver, ver + N);
return state;
}
*/
// Bounded FIFO queue built on an array of versioned cells.
//
// Every producer and consumer first draws a ticket (a monotonically
// increasing counter value) and then waits until the cell that the ticket maps
// to carries the version it expects:
//   - a producer holding ticket `ver` may write once the cell version == ver,
//     and publishes the data by setting the version to ver + 1;
//   - a consumer holding ticket `ver` may read once the cell version == ver + 1,
//     and recycles the cell by setting the version to ver + T_depth.
//
// Tickets are handed out with an atomic fetch_add, so the type does not itself
// restrict how many threads call produce()/consume().
// NOTE(review): despite the struct's name, the design appears intended for
// multiple producers/consumers; confirm with additional multi-threaded tests.
//
// Template parameters:
//   T       - element type; must be copy-assignable.
//   T_depth - number of cells; must be a non-zero power of two because cells
//             are selected with `ticket & (T_depth - 1)`.
template<typename T, unsigned long T_depth>
struct single_producer_consumer_queue
{
    // The index mask below silently aliases distinct in-flight tickets onto the
    // same cell for any other depth (losing data), so reject it at compile time.
    static_assert(T_depth != 0 && (T_depth & (T_depth - 1)) == 0,
        "T_depth must be a non-zero power of two");

    // One slot of the ring: a version counter guarding the payload.
    struct cell
    {
        std::atomic<unsigned long> m_ver;
        T m_data;
    };

    std::atomic<unsigned long> m_head; // next producer ticket
    std::atomic<unsigned long> m_tail; // next consumer ticket
    cell m_cells[T_depth];

    // Starts with both ticket counters at zero and cell i at version i, so the
    // first T_depth producer tickets find their cells immediately writable.
    single_producer_consumer_queue()
    :   m_head(0),
        m_tail(0)
    {
        for (unsigned long i = 0; i < T_depth; ++i)
        {
            m_cells[i].m_ver.store(i, std::memory_order_relaxed);
        }
    }

    // Copies `in` into the queue. Draws a ticket, then retries the commit until
    // the target cell is writable, logging and yielding between attempts.
    void produce(T const& in)
    {
        unsigned long const ver = produce_begin();

        while (!produce_commit(ver, in))
        {
            std::printf("produce wait ver %lu\n", ver);
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
        }
    }

    // Copies the next element into `out`. Draws a ticket, then retries the
    // commit until the target cell has been published, logging and yielding
    // between attempts.
    void consume(T& out)
    {
        unsigned long const ver = consume_begin();

        while (!consume_commit(ver, out))
        {
            std::printf("consume wait ver %lu\n", ver);
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
        }
    }

    // Reserves and returns the next producer ticket.
    unsigned long produce_begin()
    {
        return m_head.fetch_add(1, std::memory_order_relaxed);
    }

    // Tries to store `in` under producer ticket `ver`.
    // Returns false (without touching the cell) if the cell is not yet
    // writable for this ticket; returns true once the data is published.
    bool produce_commit(unsigned long ver, T const& in)
    {
        cell& c = m_cells[ver & (T_depth - 1)];

        if (c.m_ver.load(std::memory_order_relaxed) != ver) return false;

        // Pairs with the release fence of whoever last set this cell's version,
        // so that thread's accesses to m_data happen-before our write.
        std::atomic_thread_fence(std::memory_order_acquire);

        c.m_data = in;

        // Make the payload visible before the version that announces it.
        std::atomic_thread_fence(std::memory_order_release);
        c.m_ver.store(ver + 1, std::memory_order_relaxed);

        return true;
    }

    // Reserves and returns the next consumer ticket.
    unsigned long consume_begin()
    {
        return m_tail.fetch_add(1, std::memory_order_relaxed);
    }

    // Tries to load the element for consumer ticket `ver` into `out`.
    // Returns false (leaving `out` untouched) if the matching producer has not
    // published yet; returns true once the element was copied and the cell
    // handed back to the producers.
    bool consume_commit(unsigned long ver, T& out)
    {
        cell& c = m_cells[ver & (T_depth - 1)];

        if (c.m_ver.load(std::memory_order_relaxed) != ver + 1) return false;

        // Pairs with the producer's release fence so the payload read below
        // observes the published value.
        std::atomic_thread_fence(std::memory_order_acquire);

        out = c.m_data;

        // Finish reading before the cell is advertised as reusable for the
        // producer ticket one full lap ahead.
        std::atomic_thread_fence(std::memory_order_release);
        c.m_ver.store(ver + T_depth, std::memory_order_relaxed);

        return true;
    }
};
// Queue type used by the test: unsigned long payloads, 16384 cells.
using spscq_t = single_producer_consumer_queue<unsigned long, 8192 * 2>;
// Producer thread body: pushes the values 0 .. N-1 into `q` in order,
// yielding twice after each push and printing progress every 100000 items.
//
// q - queue to push into; must be non-null and outlive this call.
void producer(spscq_t* const q)
{
    std::printf("producer(%p)\n", static_cast<void*>(q));

    for (unsigned long i = 0; i < N; ++i)
    {
        q->produce(i);
        //std::printf("producer(%p)::produced(%lu)\n", static_cast<void*>(q), i);

        std::this_thread::yield();
        std::this_thread::yield();

        if (!(i % 100000))
        {
            // Print the queue's address (q), not the address of the local
            // parameter (&q), so every line identifies the same object.
            std::printf("producer(%p)::producer(%lu)\n", static_cast<void*>(q), i);
        }
    }
}
// Consumer thread body: pops N values from `q`, yielding twice after each
// pop, printing progress every 100000 items and the last value at the end.
//
// q - queue to pop from; must be non-null and outlive this call.
void consumer(spscq_t* const q)
{
    std::printf("consumer(%p)\n", static_cast<void*>(q));

    unsigned long consumed = 0;

    for (unsigned long i = 0; i < N; ++i)
    {
        q->consume(consumed);

        std::this_thread::yield();
        std::this_thread::yield();

        if (!(i % 100000))
        {
            // Print the queue's address (q), not the address of the local
            // parameter (&q), matching the first and final messages.
            std::printf("consumer(%p)::consumed(%lu):%lu\n", static_cast<void*>(q), consumed, i);
        }
    }

    std::printf("\n\n final: consumer(%p)::consumed(%lu)\n", static_cast<void*>(q), consumed);
}
int main(void)
{
{
std::unique_ptr<spscq_t> q(new spscq_t);
std::thread consumer_thread(consumer, q.get());
std::thread producer_thread(producer, q.get());
producer_thread.join();
consumer_thread.join();
}
std::printf("\nComplete, hit <ENTER> to exit...\n");
std::fflush(stdout);
std::getchar();
return 0;
}
__________________________________________
One should see something kind of similar to this at the end of output:
__________________________________________
producer(019CF664)::producer(9300000)
consumer(0188F5F4)::consumed(9300000):9300000
producer(019CF664)::producer(9400000)
consumer(0188F5F4)::consumed(9400000):9400000
producer(019CF664)::producer(9500000)
consumer(0188F5F4)::consumed(9500000):9500000
producer(019CF664)::producer(9600000)
consumer(0188F5F4)::consumed(9600000):9600000
producer(019CF664)::producer(9700000)
consumer(0188F5F4)::consumed(9700000):9700000
producer(019CF664)::producer(9800000)
consumer(0188F5F4)::consumed(9800000):9800000
producer(019CF664)::producer(9900000)
consumer(0188F5F4)::consumed(9900000):9900000
final: consumer(0149FD78)::consumed(9999999)
Complete, hit <ENTER> to exit...
__________________________________________
The final value should always equal 9999999, i.e. N - 1 for the settings in the posted program.
This test is for two threads: a producer and a consumer. I will add more tests in this thread over time, and answer any questions.
Again, can anybody compile this pile?
;^o