Simple and efficient bounded MPMC queue

775 views
Skip to first unread message

Dmitriy Vyukov

unread,
Mar 9, 2010, 1:41:32 PM3/9/10
to Scalable Synchronization Algorithms
The algorithm looks pretty simple (to the degree the term "simple"
applicable to lock-free algorithms), but I do not remember I saw it
elsewhere.

It's not lock-free in the official meaning (progress guarantees,
termination safety), just implemented w/o locks.

The cost of enqueue/dequeue is 1 CAS. No amortization, just 1 CAS. No
dynamic memory allocation/management during operation. Producers and
consumers are separated from each other (as in the two-lock queue),
i.e. do not touch the same data while queue is not empty.
On my dual-core laptop enqueue/dequeue takes 75 cycles on average in a
synthetic multi-threaded benchmark.

Here it is:

/* Multi-producer/multi-consumer bounded queue
* 2010, Dmitry Vyukov
* Distributed under the terms of the GNU General Public License
* as published by the Free Software Foundation,
* either version 3 of the License,
* or (at your option) any later version.
* See: http://www.gnu.org/licenses
*/
template<typename T>
class mpmc_bounded_queue
{
public:
mpmc_bounded_queue(size_t buffer_size)
: buffer_(new cell_t [buffer_size])
, buffer_mask_(buffer_size - 1)
{
assert((buffer_size >= 2) && ((buffer_size & (buffer_size -
1)) == 0));
for (size_t i = 0; i != buffer_size; i += 1)
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
enqueue_pos_.store(0, std::memory_order_relaxed);
dequeue_pos_.store(0, std::memory_order_relaxed);
}

~mpmc_bounded_queue()
{
delete [] buffer_;
}

bool enqueue(T const& data)
{
cell_t* cell;
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq = cell-
>sequence_.load(std::memory_order_acquire);
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
if (dif == 0)
{
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1,
std::memory_order_relaxed))
break;
}
else if (dif < 0)
return false;
else
pos = enqueue_pos_.load(std::memory_order_relaxed);
}

cell->data_ = data;
cell->sequence_.store(pos + 1, std::memory_order_release);

return true;
}

bool dequeue(T& data)
{
cell_t* cell;
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq = cell-
>sequence_.load(std::memory_order_acquire);
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
if (dif == 0)
{
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1,
std::memory_order_relaxed))
break;
}
else if (dif < 0)
return false;
else
pos = dequeue_pos_.load(std::memory_order_relaxed);
}

data = cell->data_;
cell->sequence_.store(pos + buffer_mask_ + 1,
std::memory_order_release);

return true;
}

private:
struct cell_t
{
std::atomic<size_t> sequence_;
T data_;
};

static size_t const cacheline_size = 64;
typedef char cacheline_pad_t [cacheline_size];

cacheline_pad_t pad0_;
cell_t* const buffer_;
size_t const buffer_mask_;
cacheline_pad_t pad1_;
std::atomic<size_t> enqueue_pos_;
cacheline_pad_t pad2_;
std::atomic<size_t> dequeue_pos_;
cacheline_pad_t pad3_;

mpmc_bounded_queue(mpmc_bounded_queue const&);
void operator = (mpmc_bounded_queue const&);
};


I've also uploaded it to the files section:
http://groups.google.com/group/lock-free/files
(the file contains limited std::atomic implementation and test suite,
ready to run, Windows, MSVC, x86-32)

--
Dmitriy V'jukov

Reply all
Reply to author
Forward
0 new messages