A modification version to Dmitriy's unbounded MPSC queue

361 views
Skip to first unread message

Gawain

unread,
Dec 13, 2011, 5:15:45 AM12/13/11
to Scalable Synchronization Algorithms, dvyu...@gmail.com

First of all, Dmitriy's unbounded MPSC queue is the best one I have
even seen in the C/C++ environments. Here I made a little change to
the algorithm that: 1) Simplify the pop_front() operation; 2)
Eliminate the possible "false sharing" issue(Although the possibility
is very low).


/*
* umpsc_que.h, a modification version to Dmitriy's unbounded MPSC
queue
*
* Created on: 2011-12-12
* Author: xia...@gmail.com
*/

struct que_node
{
que_node* _next;
};

struct umpsc_que
{
public:
typedef que_node node_type;

umpsc_que() : _writer(&_header)
{
}

void push_back(node_type* node) // always succ
{
node->_next = 0;
node_type* prev = (node_type*)XCHG(&_writer, node);
prev->_next = node;
}

node_type* pop_front()
{
if(empty()) { return 0; }

node_type* r = _header._next;
if(r == _writer) {
_header._next = 0;
if(CAS(&_writer, r, &_header)) { return r; }
}

if(r->_next) {
_header._next = r->_next;
return r;
} else { // in-consistant state
_header._next = r;
return 0;
}
}

bool empty() const { return _header._next == 0; }

node_type* front() const { return _header._next; }

protected:
node_type* _writer;
char _pad[128 - sizeof(node_type*)];
node_type _header;
};


Gawain

unread,
Jan 7, 2012, 3:47:21 AM1/7/12
to Scalable Synchronization Algorithms
Update the soruce code, and now the pop() operation is quite simple:

/*
* umpsc_que.h, a modification version to Dmitriy's unbounded MPSC
queue
*
* Created on: 2012-01-5
* Author: xia...@gmail.com
*/

#include <cstdatomic>

struct node_type
{
node_type* _next;
};

struct umpsc_que
{
public:

umpsc_que()
{
_writer.store(&_header, std::memory_order_relaxed);
}

void push(node_type* node)
{
node->_next = 0;
node_type* prev = _writer.exchange(node, std::memory_order_release);
prev->_next = node;
}

node_type* pop()
{
node_type* r = _header._next;

if(r) {
if(r->_next) {
_header._next = r->_next;
} else {
_header._next = 0;
node_type* expected = r;
if(!_writer.compare_exchange_strong(expected, &_header,
std::memory_order_release, std::memory_order_relaxed)) {
_header._next = r;
r = 0;
}
}
}

return r;
}

protected:
std::atomic<node_type*> _writer;
char _pad[128 - sizeof(_writer)];
node_type _header;
};

uec...@gmail.com

unread,
Sep 1, 2013, 9:07:06 PM9/1/13
to lock...@googlegroups.com
On Saturday, January 7, 2012 9:47:21 PM UTC+13, Gawain wrote:
Update the soruce code, and now the pop() operation is quite simple:

The repeated updates to the next pointer make me nervous; also I'm not sure how you can have consistency with two "release" orderings and no acquire/consume.  But I'm still fairly new to all of this.  I tried reformulating your updated algorithm into an RRD testcase:

class mpsc_unbounded_ptr2
{
public:
    struct node
    {
        VAR_T(node*) next;
    };

public:
    mpsc_unbounded_ptr2()
    {
        VAR(m_tail.next) = NULL;
        m_head.store(&m_tail, std::memory_order_relaxed);
    }

    void enqueue(node* n)
    {
        VAR(n->next) = NULL;
        node *prev = m_head.exchange(n, std::memory_order_release);
        VAR(prev->next) = n;
    }

    node *dequeue()
    {
        node *r = VAR(m_tail.next);
        if (r)
        {
            if (VAR(r->next))
            {
                VAR(m_tail.next) = VAR(r->next);
            }
            else
            {
                VAR(m_tail.next) = NULL;
                node *expected = r;
                if (!m_head.compare_exchange_strong(expected, &m_tail, std::memory_order_release, std::memory_order_relaxed))
                {
                    VAR(m_tail.next) = r;
                    r = NULL;
                }
            }
        }
        return r;
    }

private:
    std::atomic<node*> m_head;
    char _pad1[64 - sizeof(std::atomic<node*>)];
    node m_tail;
    char _pad2[64 - sizeof(node)];
};

struct mpsc_unbounded_ptr2_test : rl::test_suite<mpsc_unbounded_ptr2_test, 4>
{
    struct val : public mpsc_unbounded_ptr2::node
    {
        explicit val(int v) : value(v) {}
        VAR_T(int) value;
    };
    mpsc_unbounded_ptr2 q;

    void thread(unsigned thread_index)
    {
        if (0 == thread_index)
        {
            val *v = new val(11);
            q.enqueue(v);
        }
        else if (1 == thread_index)
        {
            val *v = new val(12);
            q.enqueue(v);
        }
        else if (2 == thread_index)
        {
            val *v = new val(13);
            q.enqueue(v);
        }
        else
        {
            int sum = 0, count = 0;
            while (count < 3)
            {
                mpsc_unbounded_ptr2::node *n;
                while ((n = q.dequeue()) == NULL) { Sleep(0); }

                val *v = static_cast<val*>(n);
                sum += v->VAR(value);
                ++count;
                delete v;
            }
            RL_ASSERT(36 == sum);
        }
    }
};

But this produces a data race on the accesses to the "next" pointer (specifically, one of the enqueue threads can be storing to "next" at the same time as the dequeue thread is doing its initial load into "r".  I'm not sure if this is a problem with the algorithm or my translation of it, or if this is benign (and if it's benign, how to make RRD ignore it and check for other issues).
Reply all
Reply to author
Forward
0 new messages