multi-producer/single-consumer memory passing queue

247 views
Skip to first unread message

Dmitriy V'jukov

unread,
Apr 1, 2008, 5:33:18 PM4/1/08
to lock-free
Cross-post in comp.programming.threads:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/e082e1eb26397d5e

multi-producer/single-consumer memory passing queue.

Not suitable for situations where data must be passed promptly and
reactively. Suitable for situations where data may be passed
eventually, in a 'best-effort' manner.

The queue is designed to be used in this memory allocator algorithm:
http://groups.google.com/group/comp.programming.threads/tree/browse_frm/thread/8245f4b48591fc69

The queue internally works like N single-producer/single-consumer
queues with automatic multiplexing. But multiplexing works in such a
way that dequeue operation is O(1), not O(N).

There is always 1 item per producer delayed in the queue. Sometimes more.

The fast path for both the dequeue and enqueue operations is free of
atomic RMW operations and heavy memory barriers.

Here is the code:

// Intrusive singly-linked list node: queue items derive from this,
// and the queue links items through next_.  In the memory-passing
// use case the node's own storage is the payload being transferred.
struct node
{
node* next_;

};

struct anchor;
struct mpsc_mem_queue;

// target for DWCAS
// Pointer + ABA counter pair, swapped as a single 8-byte value with
// _InterlockedCompareExchange64 (double-word CAS).
// NOTE(review): relies on sizeof(anchor_ptr) == 8, i.e. 32-bit
// pointers - confirm this is only built for x86.
struct anchor_ptr
{
anchor* ptr_;
unsigned cntr_; // ABA counter

anchor_ptr(anchor* ptr = 0, unsigned cntr = 0)
: ptr_(ptr)
, cntr_(cntr)
{
}

// Reconstructs the pair from the raw 8-byte value returned by the
// DWCAS intrinsic.  The placement-new / reinterpret punning is
// MSVC-specific (formally breaks strict aliasing).
anchor_ptr(__int64 raw)
{
new (this) anchor_ptr (*(anchor_ptr*)&raw);
}

// Raw 8-byte view of the pair, passed directly to the intrinsic.
operator __int64 () const
{
return *(__int64*)this;
}

};

// anchor is related to pair (queue, producer)
// so in environment with N threads and N queues
// must be N*(N-1) anchors
// an anchor is no more than an spsc-queue
struct anchor
{
anchor_ptr next_; // link in the queue's active-anchor list (DWCAS target)
node* tail_; // consumer-side read position of this anchor's spsc node list
char cl_pad1_[64]; // padding - keep producer fields off the consumer's cache line
                   // (64 assumed to be the cache-line size - TODO confirm)
node* head_; // producer-side write position of the spsc node list
bool linked_; // true while this anchor sits in the queue's active-anchor list
anchor* anchor_list_; // link in the parent queue's list of all attached anchors
mpsc_mem_queue* parent_; // for debug
unsigned owner_thread_; // for debug
bool attached_; // set by attach_anchor(), cleared by finalize()
char cl_pad2_[64];

anchor()
{
next_.ptr_ = 0;
next_.cntr_ = 0;
tail_ = 0;
head_ = 0;
linked_ = false;
parent_ = 0;
owner_thread_ = 0;
anchor_list_ = 0;
attached_ = false;
}

};

// Multi-producer/single-consumer memory-passing queue.  Internally a
// list of per-producer anchors, each of which is a private spsc node
// list; the consumer multiplexes over the anchors so that dequeue()
// stays O(1).  The fast paths of enqueue() and dequeue() use no
// interlocked operations.
struct mpsc_mem_queue
{
anchor stub_; // permanent dummy anchor so the anchor list is never empty
node stub_node_; // permanent dummy node seeding stub_'s spsc list
anchor* tail_; // consumer side: anchor currently being drained
char cl_pad1_[64]; // padding - separate consumer and producer data
                   // (64 assumed to be the cache-line size - TODO confirm)
anchor_ptr head_; // producer side: most recently linked anchor (DWCAS target)
anchor* anchor_list_; // every anchor ever attached; walked by finalize()
unsigned owner_thread_; // consumer thread id, checked by assertions
char cl_pad2_[64];

// Seeds both the anchor list and stub_'s node list with dummies so
// neither dequeue() nor enqueue_anchor() ever sees a null tail.
mpsc_mem_queue()
{
stub_node_.next_ = 0;
stub_.head_ = &stub_node_;
stub_.tail_ = &stub_node_;
tail_ = &stub_;
head_.ptr_ = &stub_;
head_.cntr_ = 0;
anchor_list_ = 0;
owner_thread_ = 0;
}

// Binds the consumer side of the queue to the calling thread.
void init()
{
assert(0 == owner_thread_);
owner_thread_ = GetCurrentThreadId();
}

// Registers a producer's anchor with this queue.  Called once per
// (producer thread, queue) pair before that producer may enqueue.
void attach_anchor(anchor* a)
{
assert(0 == a->parent_);
assert(0 == a->owner_thread_);
assert(false == a->attached_);

a->parent_ = this;
a->owner_thread_ = GetCurrentThreadId();
a->attached_ = true;

// lock-free push onto the singly-linked anchor_list_.
// NOTE(review): 32-bit interlocked op on a pointer - assumes a
// 32-bit build; _InterlockedCompareExchangePointer would be the
// portable choice.
for (;;)
{
a->anchor_list_ = anchor_list_;
anchor* old = (anchor*)_InterlockedCompareExchange
((long*)&anchor_list_,
(long)a, (long)a->anchor_list_);
if (old == a->anchor_list_)
break;
}
}

// Producer fast path: append n to this producer's private spsc list.
// Interlocked ops are needed only when the anchor itself has to be
// (re)linked into the queue.  Note: the first node placed on an
// empty anchor is not published until a later enqueue links the
// anchor - part of the "1 item per producer stays delayed" property.
void enqueue(anchor* a, node* n)
{
assert(this == a->parent_);
assert(GetCurrentThreadId() == a->owner_thread_);
assert(true == a->attached_);

n->next_ = 0;
if (a->head_)
{
// append at the producer end; make the anchor visible to the
// consumer if it is not currently linked
a->head_->next_ = n;
a->head_= n;
if (false == a->linked_)
enqueue_anchor(a);
}
else
{
// first node on this anchor: seed the spsc list, do not link yet
a->head_ = n;
a->tail_ = n;
}
}

// Consumer fast path: O(1), interlocked-free unless the current
// anchor is exhausted.  A node is only taken while its successor
// exists, so the last node of each anchor stays behind (the
// "delayed item"); finalize() recovers those.
node* dequeue()
{
assert(GetCurrentThreadId() == owner_thread_);

anchor* tail;
anchor_ptr next;
anchor_ptr head;
for (;;)
{
tail = tail_;
if (tail->tail_->next_)
{
// fast path: pop one node from the current anchor's spsc list
node* n = tail->tail_;
tail->tail_ = tail->tail_->next_;
return n;
}
else
{
// current anchor drained: advance along the anchor list
// (Michael-Scott-style traversal with helping)
head = head_;
next = tail->next_;
if (next.ptr_)
{
if (head.ptr_ == tail)
{
// help: swing head_ past the drained anchor
_InterlockedCompareExchange64
((__int64*)&head_, next, head);
}
tail->linked_ = false; // its producer may relink it later
tail_ = next.ptr_;
}
else
{
if (tail != &stub_)
{
// re-park the stub so the anchor list never empties,
// then retire the drained anchor
enqueue_anchor(&stub_);
tail_ = tail->next_.ptr_;
tail->linked_ = false;

}
return 0; // nothing available right now
}
}
}
}

// Links anchor a at the head (producer end) of the anchor list:
// DWCAS push with ABA counters.  Called by producers, and by the
// consumer to re-park stub_.
void enqueue_anchor(anchor* a)
{
a->next_.ptr_ = 0;
a->linked_ = true;
for (;;)
{
anchor_ptr head = head_;
anchor_ptr next = head.ptr_->next_;
if (next.ptr_)
{
// head_ lags behind an already-linked anchor: help it forward
_InterlockedCompareExchange64
((__int64*)&head_, next, head);
continue;
}
else
{
a->next_.cntr_ = head.cntr_ + 1;
anchor_ptr xchg (a, next.cntr_ + 1);
// NOTE(review): re-reads head_.ptr_ here instead of using the
// 'head' snapshot taken above - verify this is intentional
anchor_ptr* dst = &head_.ptr_->next_;
anchor_ptr old = _InterlockedCompareExchange64
((__int64*)dst, xchg, next);
if (old == next)
{
// linked; now try to swing head_ to the new anchor
_InterlockedCompareExchange64
((__int64*)&head_, xchg, head);
break;
}
}
}
}

// Consumer-side shutdown: drains every remaining node, including the
// "delayed" last node of each anchor that dequeue() never takes.
// Returns one node per call, 0 once everything is drained.  Must run
// only after all producers have stopped enqueueing.
node* finalize()
{
assert(GetCurrentThreadId() == owner_thread_);

while (anchor_list_)
{
if (anchor_list_->tail_)
{
// hand back the next leftover node of the current anchor
node* n = anchor_list_->tail_;
anchor_list_->tail_ = anchor_list_->tail_->next_;
return n;
}
else
{
// anchor fully drained: detach it and move to the next one
anchor_list_->attached_ = false;
anchor_list_ = anchor_list_->anchor_list_;
}
}
return 0;
}
};

Dmitriy V'jukov

Dmitriy V'jukov

unread,
Apr 1, 2008, 5:35:33 PM4/1/08
to lock-free
On Apr 2, 1:33 am, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
> multi-producer/single-consumer memory passing queue.


Here is simple test/usage example (it compiles under MSVC 2005):

#define _WIN32_WINNT 0x0500
#include <windows.h>
#include <process.h>
#include <intrin.h>
#include <time.h>
#include <stdlib.h>

#include <deque>
#include <cassert>

// Test payload: a queue node carrying the id of the thread that
// currently owns it (sender_) and the iteration that created it
// (data_).
struct my_node : node
{
my_node(int sender, int data)
: sender_(sender)
, data_(data)
{
}

int sender_;
int data_;

};

class barrier
{
public:
barrier(int count)
: count_(count)
, event_(CreateEvent(0, 1, 0, 0))
{
}

void wait()
{
if (_InterlockedDecrement(&count_))
WaitForSingleObject(event_, INFINITE);
else
SetEvent(event_);
}

private:
long count_;
HANDLE event_;

};

class mpsc_queue
{
public:
mpsc_queue()
{
InitializeCriticalSection(&cs_);
}

void enqueue(my_node* n)
{
EnterCriticalSection(&cs_);
queue_.push_back(n);
LeaveCriticalSection(&cs_);
}

my_node* dequeue()
{
my_node* n = 0;
EnterCriticalSection(&cs_);
if (queue_.size())
{
n = queue_.front();
queue_.pop_front();
}
LeaveCriticalSection(&cs_);
return n;
}

private:
std::deque<my_node*> queue_;
CRITICAL_SECTION cs_;

};

// test parameters
int const thread_count = 8;
int const iter_count = 1000000;
int const batch_size = 16; // iterations alternate alloc/recycle phases in batches of this size

// per-thread channels: queues[i] passes memory back to thread i,
// req_queues[i] carries allocation requests to thread i
mpsc_mem_queue queues [thread_count];
mpsc_queue req_queues [thread_count];

// rendezvous points for the phases of thread_func
barrier start (thread_count);
barrier finalize1 (thread_count);
barrier finalize2 (thread_count);
barrier stop (thread_count);

// Worker body.  Each thread owns queues[id] (consumer side), a
// private Windows heap, and one stack anchor per peer queue.
// Iterations alternate, in batches of batch_size, between:
//   - alloc phase: allocate a node from the private heap and send it
//     to a random peer through the mutex-based req_queues, and
//   - recycle phase: echo received nodes back to the thread that
//     allocated them through the lock-free queues[], and free nodes
//     that came back to us.
// HEAP_NO_SERIALIZE is safe because every node is freed by the same
// thread that allocated it - exactly the guarantee the memory-passing
// queue exists to provide.
unsigned __stdcall thread_func(void* p)
{
srand((unsigned)time(0));
int const id = (int)p;
// anchors live on this thread's stack, so they must outlive every
// peer's finalize() - the stop barrier below ensures that
anchor anchors [thread_count];
queues[id].init(); // bind queues[id]'s consumer side to this thread
for (int i = 0; i != thread_count; ++i)
queues[i].attach_anchor(&anchors[i]);

HANDLE heap = HeapCreate(HEAP_NO_SERIALIZE, 0, 0);

start.wait();

for (int i = 0; i != iter_count; ++i)
{
if (0 == (i / batch_size) % 2)
{
// alloc phase: send a fresh node to a random thread
int const target = rand() % thread_count;
my_node* n = (my_node*)HeapAlloc(heap, 0,
sizeof(my_node));
new (n) my_node(id, i);
req_queues[target].enqueue(n);
}
else
{
// recycle phase: return one received node to its allocator...
if (my_node* n = req_queues[id].dequeue())
{
int const sender = n->sender_;
n->sender_ = id;
queues[sender].enqueue(&anchors[sender], n);
}

// ...and free one node that was passed back to us
if (my_node* n = (my_node*)queues[id].dequeue())
HeapFree(heap, 0, n);
}
}

finalize1.wait(); // all threads have stopped producing new requests

// drain remaining requests back to their owners
while (my_node* n = req_queues[id].dequeue())
{
int const sender = n->sender_;
n->sender_ = id;
queues[sender].enqueue(&anchors[sender], n);
}

finalize2.wait(); // every return is enqueued; safe to finalize

// finalize() also yields the "delayed" nodes dequeue() holds back
while (my_node* n = (my_node*)queues[id].finalize())
HeapFree(heap, 0, n);

stop.wait(); // keep the stack anchors alive until everyone is done

HeapDestroy(heap);

return 0;

}

int main()
{
srand((unsigned)time(0));
HANDLE threads [thread_count] = {};
for (int i = 0; i != thread_count; ++i)
threads[i] = (HANDLE)_beginthreadex(0, 0, &thread_func,
(void*)i, 0, 0);
WaitForMultipleObjects(thread_count, threads, 1, INFINITE);

}


Dmitriy V'jukov
Reply all
Reply to author
Forward
0 new messages