Unbounded SPSC queue


Dmitriy Vyukov

Sep 8, 2010, 5:13:11 AM
to Scalable Synchronization Algorithms
Initially appeared at:
http://software.intel.com/en-us/articles/single-producer-single-consumer-queue

Unbounded single-producer/single-consumer node-based queue. An internal
cache of nodes is used, and it never shrinks. The dequeue operation is
always wait-free. The enqueue operation is wait-free in the common case
(when a node is available in the cache); otherwise it calls
::operator new(), so it is probably not wait-free. No atomic RMW
operations or heavy memory fences are used, i.e. enqueue and dequeue
issue just a few plain loads, a few plain stores and one conditional
branch. A cache-conscious data layout is used, so the producer and the
consumer can work simultaneously without causing cache-coherence
traffic.

A single-producer/single-consumer queue can be used for communication
with a thread that services a hardware device (where the wait-free
property is required), or when there is naturally only one producer and
one consumer. Also, N single-producer/single-consumer queues can be used
to construct a multi-producer/single-consumer queue, or N^2 queues can
be used to construct a fully connected system of N threads (other,
partially connected topologies are also possible).
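
The composition mentioned above (N SPSC queues forming one MPSC queue) could be sketched roughly as follows. This is only an illustration under stated assumptions: `simple_spsc_queue` is a hypothetical stand-in written with C++11 `std::atomic` (no node cache), `mpsc_from_spsc` and the round-robin polling policy are my own choices, and each producer is assumed to own exactly one lane. Only per-producer FIFO order is preserved, not a global order.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical minimal SPSC queue (no node cache), used only as a building block.
template<typename T>
class simple_spsc_queue
{
public:
    simple_spsc_queue() : tail_(new node), head_(tail_) {}
    ~simple_spsc_queue()
    {
        for (node* n = tail_; n != nullptr; )
        {
            node* next = n->next.load(std::memory_order_relaxed);
            delete n;
            n = next;
        }
    }
    simple_spsc_queue(simple_spsc_queue const&) = delete;
    simple_spsc_queue& operator=(simple_spsc_queue const&) = delete;

    void enqueue(T v) // producer thread only
    {
        node* n = new node;
        n->value = v;
        head_->next.store(n, std::memory_order_release); // publish the node
        head_ = n;
    }

    bool dequeue(T& v) // consumer thread only
    {
        node* next = tail_->next.load(std::memory_order_acquire);
        if (next == nullptr)
            return false;
        v = next->value;
        delete tail_;  // old stub node; the producer no longer touches it
        tail_ = next;  // 'next' becomes the new stub
        return true;
    }

private:
    struct node
    {
        std::atomic<node*> next{nullptr};
        T value{};
    };
    node* tail_; // consumer side
    node* head_; // producer side
};

// One lane per producer; the single consumer polls the lanes round-robin.
template<typename T>
class mpsc_from_spsc
{
public:
    explicit mpsc_from_spsc(std::size_t producers) : next_(0)
    {
        for (std::size_t i = 0; i != producers; ++i)
            lanes_.emplace_back(new simple_spsc_queue<T>);
    }

    // 'id' selects the lane; each id must be used by exactly one thread
    void enqueue(std::size_t id, T v) { lanes_[id]->enqueue(v); }

    // consumer thread only; returns 'false' if all lanes are empty
    bool dequeue(T& v)
    {
        for (std::size_t i = 0; i != lanes_.size(); ++i)
        {
            std::size_t lane = (next_ + i) % lanes_.size();
            if (lanes_[lane]->dequeue(v))
            {
                next_ = (lane + 1) % lanes_.size(); // start after this lane next time
                return true;
            }
        }
        return false;
    }

private:
    std::vector<std::unique_ptr<simple_spsc_queue<T>>> lanes_;
    std::size_t next_; // consumer-only polling cursor
};
```

Starting each poll after the last successful lane is one simple way to keep a busy producer from starving the others; other policies are equally possible.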

Hardware platform: x86-32/64

Compiler: Intel C++ Compiler


#include <cstddef> // size_t

// load with 'consume' (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr)
{
    // hardware fence is implicit on x86
    T v = *const_cast<T const volatile*>(addr);
    __memory_barrier(); // compiler fence
    return v;
}

// store with 'release' memory ordering
template<typename T>
void store_release(T* addr, T v)
{
    // hardware fence is implicit on x86
    __memory_barrier(); // compiler fence
    *const_cast<T volatile*>(addr) = v;
}

// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;

// single-producer/single-consumer queue
template<typename T>
class spsc_queue
{
public:
    spsc_queue()
    {
        // the queue always contains one stub node
        node* n = new node;
        n->next_ = 0;
        tail_ = head_ = first_ = tail_copy_ = n;
    }

    ~spsc_queue()
    {
        // frees cached nodes, the stub node and all queued nodes
        node* n = first_;
        do
        {
            node* next = n->next_;
            delete n;
            n = next;
        }
        while (n);
    }

    void enqueue(T v)
    {
        node* n = alloc_node();
        n->next_ = 0;
        n->value_ = v;
        // publish the node to the consumer
        store_release(&head_->next_, n);
        head_ = n;
    }

    // returns 'false' if queue is empty
    bool dequeue(T& v)
    {
        if (load_consume(&tail_->next_))
        {
            v = tail_->next_->value_;
            // advance tail_; the old tail node becomes reusable by the producer
            store_release(&tail_, tail_->next_);
            return true;
        }
        else
        {
            return false;
        }
    }

private:
    // internal node structure
    struct node
    {
        node* next_;
        T value_;
    };

    // consumer part
    // accessed mainly by consumer, infrequently by producer
    node* tail_; // tail of the queue

    // delimiter between consumer part and producer part,
    // so that they are situated on different cache lines
    char cache_line_pad_ [cache_line_size];

    // producer part
    // accessed only by producer
    node* head_; // head of the queue
    node* first_; // last unused node (tail of node cache)
    node* tail_copy_; // helper (points somewhere between first_ and tail_)

    node* alloc_node()
    {
        // first tries to allocate node from internal node cache,
        // if attempt fails, allocates node via ::operator new()

        if (first_ != tail_copy_)
        {
            node* n = first_;
            first_ = first_->next_;
            return n;
        }
        // cache looks empty: refresh the producer's copy of tail_ and retry
        tail_copy_ = load_consume(&tail_);
        if (first_ != tail_copy_)
        {
            node* n = first_;
            first_ = first_->next_;
            return n;
        }
        node* n = new node;
        return n;
    }

    // non-copyable
    spsc_queue(spsc_queue const&);
    spsc_queue& operator = (spsc_queue const&);
};

// usage example
int main()
{
    spsc_queue<int> q;
    q.enqueue(1);
    q.enqueue(2);
    int v;
    bool b = q.dequeue(v); // b == true, v == 1
    b = q.dequeue(v);      // b == true, v == 2
    q.enqueue(3);
    q.enqueue(4);
    b = q.dequeue(v);      // b == true, v == 3
    b = q.dequeue(v);      // b == true, v == 4
    b = q.dequeue(v);      // b == false, queue is empty
}
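
The listing above depends on the Intel compiler's `__memory_barrier()` and on x86 ordering guarantees. A minimal sketch of the same algorithm on top of C++11 `std::atomic` (which postdates this post) might look like the following. The class name `spsc_queue_cxx11`, the use of `memory_order_acquire` in place of consume ordering, and `alignas(64)` instead of the padding array are my assumptions, not part of the original code.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical C++11 port of the queue above; same node cache, portable ordering.
template<typename T>
class spsc_queue_cxx11
{
public:
    spsc_queue_cxx11()
    {
        node* n = new node; // stub node
        tail_.store(n, std::memory_order_relaxed);
        head_ = first_ = tail_copy_ = n;
    }

    ~spsc_queue_cxx11()
    {
        node* n = first_;
        do
        {
            node* next = n->next_.load(std::memory_order_relaxed);
            delete n;
            n = next;
        }
        while (n);
    }

    spsc_queue_cxx11(spsc_queue_cxx11 const&) = delete;
    spsc_queue_cxx11& operator=(spsc_queue_cxx11 const&) = delete;

    void enqueue(T v) // producer thread only
    {
        node* n = alloc_node();
        n->next_.store(nullptr, std::memory_order_relaxed);
        n->value_ = v;
        // release: value_ and next_ are visible before the node is reachable
        head_->next_.store(n, std::memory_order_release);
        head_ = n;
    }

    // returns 'false' if queue is empty; consumer thread only
    bool dequeue(T& v)
    {
        node* tail = tail_.load(std::memory_order_relaxed);
        // acquire stands in for the original load_consume
        node* next = tail->next_.load(std::memory_order_acquire);
        if (next == nullptr)
            return false;
        v = next->value_;
        // release: hands the old tail node back to the producer's cache
        tail_.store(next, std::memory_order_release);
        return true;
    }

private:
    struct node
    {
        std::atomic<node*> next_{nullptr};
        T value_{};
    };

    node* alloc_node()
    {
        if (first_ != tail_copy_)
            return take_cached();
        // cache looks empty: refresh the producer's copy of tail_ and retry
        tail_copy_ = tail_.load(std::memory_order_acquire);
        if (first_ != tail_copy_)
            return take_cached();
        return new node;
    }

    node* take_cached()
    {
        node* n = first_;
        first_ = first_->next_.load(std::memory_order_relaxed);
        return n;
    }

    // consumer part
    alignas(64) std::atomic<node*> tail_;

    // producer part, placed on a different cache line
    alignas(64) node* head_;
    node* first_;
    node* tail_copy_;
};
```

On x86 the acquire load and release store compile to plain loads and stores, so the cost profile should be close to the original; on weaker architectures the compiler inserts whatever barriers are needed.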



--
Dmitriy V'jukov

sbahra

Sep 12, 2010, 8:03:58 PM
to Scalable Synchronization Algorithms
Hi Dmitriy,

I was just working on my own implementation when I fell upon this.
Our implementations are very similar (except that our naming
scheme is different, my head is your tail). However, I am confused
about something else that is different in your implementation.

Let us assume the following state.

head = STUB -> 0
tail = STUB -> 0

Let's say Thread 1 executes an enqueue and is preempted
after successfully executing line 5. For simplicity, let us
assume that this is the first item added. We now have the
following state.

head = STUB -> N -> 0
tail = STUB -> N -> 0

Let us assume that the dequeue is executed by Thread 2
while Thread 1 is preempted at line 5.

STUB's next entry is a non-empty value, N. So at line
14 we set tail back to the initial state. We now have.

head = STUB -> 0
tail = STUB -> 0

Now thread 1 is executing again and it updates head to
the new entry, so now we have.

head = N -> 0 (from STUB -> 0)
tail = STUB -> 0 (if head is N -> 0 then tail must be STUB -> N -> 0).

Things will no longer be reachable.

1 {
2 node* n = alloc_node();
3 n->next_ = 0;
4 n->value_ = v;
5 store_release(&head_->next_, n);
6 head_ = n;
7 }
8 // returns 'false' if queue is empty
9 bool dequeue(T& v)
10 {
11 if (load_consume(&tail_->next_))
12 {
13 v = tail_->next_->value_;
14 store_release(&tail_, tail_->next_);
15 return true;
16 }
17 else
18 {
19 return false;
20 }
21 }
22

On Sep 8, 5:13 am, Dmitriy Vyukov <dvyu...@gmail.com> wrote:
> Initially appeared at:http://software.intel.com/en-us/articles/single-producer-single-consu...

sbahra

Sep 12, 2010, 8:09:03 PM
to Scalable Synchronization Algorithms
Sorry, I misread store_release(&tail_, tail_->next_);
Ignore the previous post.

On Sep 8, 5:13 am, Dmitriy Vyukov <dvyu...@gmail.com> wrote:
> Initially appeared at:http://software.intel.com/en-us/articles/single-producer-single-consu...

Eduardo Juan

Sep 29, 2010, 7:57:08 PM
to Scalable Synchronization Algorithms
Hi Dmitriy!

I was testing this queue on Linux and it shows a 2x performance
difference compared to the non-intrusive MPSC queue
(http://groups.google.com/group/lock-free/browse_frm/thread/55df71b87acb8201)
on my quad-core X9650.

I'm having problems with this queue regarding the use of a char
pointer inside my container object:

struct qndata_t{
int i;
char *str;
};

If I allocate heap memory in the producer, assign the resulting
pointer to str, and enqueue the qndata_t object for the consumer, the
program aborts due to memory corruption.

I'm using GCC's builtin full hardware memory barrier:

#if ( (__GNUC__ == 4) && (__GNUC_MINOR__ >= 1) || __GNUC__ > 4) && \
(defined(__x86_64__) || defined(__i386__))
#define __memory_barrier __sync_synchronize
#endif

Does the intrusiveness of this kind of queue prevent me from passing
(malloc/new)ed objects in the 'qndata_t' container?

Thanks a lot for the help!

Best regards,
Edu

Dmitriy Vyukov

Sep 29, 2010, 8:07:29 PM
to Scalable Synchronization Algorithms
On Sep 30, 3:57 am, Eduardo Juan <ejua...@gmail.com> wrote:
> Hi Dmitriy!

Hi Eduardo!

> I was testing this queue on Linux and it shows a 2x performance
> difference compared to the non-intrusive MPSC queue
> (http://groups.google.com/group/lock-free/browse_frm/thread/55df71b87acb8201)
> on my quad-core X9650.

So this queue is faster? Not surprising, this is SPSC, and that is
MPSC.

> I'm having problems with this queue regarding the use of a char
> pointer inside my container object:
>
> struct qndata_t{
>    int i;
>    char *str;
>
> };
>
> If I allocate heap memory in the producer, assign the resulting
> pointer to str, and enqueue the qndata_t object for the consumer, the
> program aborts due to memory corruption.
>
> I'm using GCC's builtin full hardware memory barrier:
>
> #if ( (__GNUC__ == 4) && (__GNUC_MINOR__ >= 1) || __GNUC__ > 4) && \
> (defined(__x86_64__) || defined(__i386__))
> #define __memory_barrier __sync_synchronize
> #endif
>
> Does the intrusiveness of this kind of queue prevent me from passing
> (malloc/new)ed objects in the 'qndata_t' container?

No, it should not. Could you provide a minimal code example that
reproduces the problem?


--
Dmitriy V'jukov

Eduardo Juan

Sep 29, 2010, 8:31:19 PM
to Scalable Synchronization Algorithms
Hey!

Here is a "working crashing example" :)

/*
* spscqueue.h
*
* See: http://software.intel.com/en-us/articles/single-producer-single-consumer-queue/
*
*/

/*
* GCC 4.1.2+ has builtin atomic ops - See:
* http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
*/
#if ( (__GNUC__ == 4) && (__GNUC_MINOR__ >= 1) || __GNUC__ > 4) && \
(defined(__x86_64__) || defined(__i386__))
#define __memory_barrier __sync_synchronize
#endif

// load with 'consume' (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr) {
// hardware fence is implicit on x86
T v = *const_cast<T const volatile*> (addr);
__memory_barrier(); // compiler fence
return v;
}

// store with 'release' memory ordering
template<typename T>
void store_release(T* addr, T v) {
// hardware fence is implicit on x86
__memory_barrier(); // compiler fence
*const_cast<T volatile*> (addr) = v;
}

// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;
// single-producer/single-consumer queue
template<typename T>
class spsc_queue {
public:
spsc_queue() {
node* n = new node;
n->next_ = 0;
tail_ = head_ = first_ = tail_copy_ = n;
[...]

/*
* spsc_queue.cpp
*/

#include <iostream>
#include <memory>
#include <new>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <sched.h>
#include <inttypes.h>
#include <errno.h>
#include <pthread.h>

#include "spscqueue.h"

using namespace std;

#define MAX_GETLOCK_SLEEPTIME_PRODUCER 1000 //usecs
#define MAX_GETLOCK_SLEEPTIME_CONSUMER 0 //usecs

struct tharg_t{

char threadtype;
int16_t threadid;
uint64_t sleepuntilbegin;
};

struct qndata_t {

char *raw_data; //heap - malloc/new
int i;

qndata_t() : raw_data(0) ,i(0) {}

~qndata_t(){
delete raw_data;
}
};

spsc_queue<qndata_t> g_spscqueue;
pthread_t g_pth_producer, g_pth_consumer;

static void *pthConsumer(void *arg) {

assert(arg);

tharg_t *tharg = (tharg_t *) arg;
spsc_queue<qndata_t> *spscqueue = &g_spscqueue;
qndata_t qndata;

if(tharg->sleepuntilbegin > 0) usleep(tharg->sleepuntilbegin); // thread start synchronization

//Runtime
while( !spscqueue->dequeue(qndata) ) {
sleep(1);
}

delete qndata.raw_data;
qndata.raw_data = NULL;

pthread_exit(0);
}

static void *pthProducer(void *arg) {

assert(arg);

int i;

tharg_t *tharg = (tharg_t *) arg;
spsc_queue<qndata_t> *spscqueue = &g_spscqueue;

qndata_t qndata;

if(tharg->sleepuntilbegin > 0) usleep(tharg->sleepuntilbegin); // thread start synchronization

//Runtime

//Setup container to send to consumer thread
i = 0;

//qndata.raw_data = new char (100);
qndata.raw_data = (char *) malloc(100);
sprintf(qndata.raw_data,"%d",i);
qndata.i = i;

spscqueue->enqueue(qndata);

pthread_exit(0);
}

int main(void){

int th=0;
tharg_t tharg_consumer , tharg_producer;

//Launch pthreads
tharg_producer.threadid = 1;
tharg_producer.sleepuntilbegin = MAX_GETLOCK_SLEEPTIME_PRODUCER;
if(pthread_create(&g_pth_producer,NULL,pthProducer,(void *) &tharg_producer) != 0){
perror("pthread_create (producer): ");
exit(0);
}

tharg_consumer.threadid = 2;
tharg_consumer.sleepuntilbegin = MAX_GETLOCK_SLEEPTIME_CONSUMER;
if(pthread_create(&g_pth_consumer,NULL,pthConsumer,(void *) &tharg_consumer) != 0){
perror("pthread_create (consumer): ");
exit(0);
}

while(th < 2){
pthread_join(g_pth_producer,NULL);
pthread_join(g_pth_consumer,NULL);
th++;
}

cout << "\n[MAIN] All threads finished...\n" << endl;

return 0;
}
**********************************************************************

To compile on Linux with g++ version >= 4.1.2:

$ g++ -std=c++0x -o spsc_queue spsc_queue.cpp -lpthread


Thanks again for the help!!

Regards,
Edu

Eduardo Juan

Sep 29, 2010, 8:39:32 PM
to Scalable Synchronization Algorithms
Oh, BTW, it would be great if the consumer could block when the
queue is empty.
Do you have an example of using your atomic ref count with this
queue?

Edu

Dmitriy Vyukov

Sep 29, 2010, 8:45:31 PM
to Scalable Synchronization Algorithms
On Sep 30, 4:31 am, Eduardo Juan <ejua...@gmail.com> wrote:
> Hey!
>
> Here is a "working crashing example" :)
>
> /*
>  * spscqueue.h
>  *
>  * See:http://software.intel.com/en-us/articles/single-producer-single-consu...
>  *
>  */
>
> /*
>  * From GNU compiler 4.1.2+ has builtin atomic ops - See:
>  * http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html

You should remove:

    ~qndata_t(){
        delete raw_data;
    }

Well, it's not even just a double free: I count at least 4 places where
raw_data is freed (the explicit free, plus the destructors of 2 local
variables, plus the destructor of the enqueue() parameter).

--
Dmitriy V'jukov
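
To make the count above concrete, here is a small sketch of why a struct with a raw pointer and a freeing destructor goes wrong when passed by value. Everything in it is hypothetical illustration: `shallow_t` mimics the shallow-copy behaviour of the posted `qndata_t` but counts destructor calls instead of freeing, `enqueue_by_value` mimics `enqueue(T v)` followed by `n->value_ = v`, and `qndata_fixed_t` shows one alternative to removing the destructor (letting `std::string` own the buffer), which is not what was suggested in the thread.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Counts how many destructors would have freed the same buffer.
static int g_destructor_calls = 0;

// Hypothetical stand-in for qndata_t: copies are shallow (pointer only).
struct shallow_t
{
    char* raw_data;
    shallow_t() : raw_data(0) {}
    // the posted code did 'delete raw_data;' here
    ~shallow_t() { if (raw_data) ++g_destructor_calls; }
};

// Mimics enqueue(T v) storing into node::value_:
// one by-value parameter copy plus one stored copy.
template<typename T>
void enqueue_by_value(T& slot, T v)
{
    slot = v;
}

int count_frees_of_one_buffer()
{
    g_destructor_calls = 0;
    char* buffer = static_cast<char*>(std::malloc(100));
    {
        shallow_t slot;       // plays the role of node::value_
        {
            shallow_t local;  // the producer's local variable
            local.raw_data = buffer;
            enqueue_by_value(slot, local);
        }                     // parameter copy and 'local' are destroyed here
    }                         // 'slot' is destroyed here
    std::free(buffer);        // the only real free in this sketch
    return g_destructor_calls;
}

// One possible fix: let a value type own the bytes, so every copy is deep.
struct qndata_fixed_t
{
    std::string raw_data;
    int i;
    qndata_fixed_t() : i(0) {}
};
```

In this sketch a single buffer is seen by three destructors; the posted program adds the consumer's copy and its explicit delete on top of that.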

Eduardo Juan

Sep 30, 2010, 8:10:42 PM
to Scalable Synchronization Algorithms
Hi Dmitriy! Thank you so much! It worked like a charm!

What a childish error I made :(

Best regards,
Edu

Dmitriy Vyukov

Oct 1, 2010, 2:37:35 AM
to Scalable Synchronization Algorithms
You can use an eventcount for blocking. Check out this thread (there
are examples of usage, as well as some implementation):
http://software.intel.com/en-us/forums/showthread.php?t=62364

--
Dmitriy V'jukov
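
As a rough idea of how an eventcount can add blocking to a non-blocking queue, here is a deliberately simplified sketch. It is not the implementation from the linked thread: `simple_eventcount` is a hypothetical always-locking version built on `std::mutex` and `std::condition_variable` (real eventcounts avoid the lock on the fast path), and `stand_in_queue` is a mutex-protected placeholder for any queue with `enqueue(T)` and `bool dequeue(T&)`.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical, simplified eventcount: an epoch counter guarded by a mutex.
class simple_eventcount
{
public:
    simple_eventcount() : epoch_(0) {}

    // consumer: take a key before the final emptiness check
    unsigned prepare_wait()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return epoch_;
    }

    // consumer: sleep until notify() has been called since prepare_wait()
    void wait(unsigned key)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (epoch_ == key)
            cond_.wait(lock);
    }

    // producer: call after making an item visible
    void notify()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++epoch_;
        }
        cond_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    unsigned epoch_;
};

// Placeholder queue so the sketch is self-contained.
template<typename T>
class stand_in_queue
{
public:
    void enqueue(T v)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(v);
    }
    bool dequeue(T& v)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty())
            return false;
        v = items_.front();
        items_.pop_front();
        return true;
    }
private:
    std::mutex mutex_;
    std::deque<T> items_;
};

// Blocks until an item is available.
template<typename Queue, typename T>
void dequeue_blocking(Queue& queue, simple_eventcount& ec, T& v)
{
    for (;;)
    {
        if (queue.dequeue(v))
            return;
        unsigned key = ec.prepare_wait();
        if (queue.dequeue(v)) // re-check: an item may have arrived meanwhile
            return;
        ec.wait(key);         // returns at once if notify() ran after prepare_wait()
    }
}

template<typename Queue, typename T>
void enqueue_and_notify(Queue& queue, simple_eventcount& ec, T v)
{
    queue.enqueue(v);
    ec.notify();
}
```

The re-check between `prepare_wait()` and `wait()` is what closes the window in which the producer could enqueue and notify just before the consumer goes to sleep.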