Synchronization with producer - consumer circular buffer.

Jane's Conference

Oct 15, 2009, 7:29:26 AM

Hi to everybody,

I'd like to pass data between a producer and a consumer thread using a
circular buffer.

1 There is only one consumer and only one producer, and the buffer and
the items you can put on the buffer are fixed in size, so we can
generalize with, for example, an array of integers of length n, with
n > 1.

2 The producer and the consumer must be synchronized, i.e. the writer
can't write on a item position (slot) that the consumer is reading and
vice versa.

3 The producer can be faster than the consumer or it can be slower.
In general, which one is slower at a given moment is completely
random.

4 If the producer is slower, the consumer must wait until there is
something (at least an item) on the buffer.

5 If the producer is faster, the producer *must not* block when the
buffer is full, but it must overwrite older items, (except the one the
consumer is reading at the moment, see point 2). I want to stress this
point: the producer must not wait for the consumer to free an item,
but it *must* overwrite the oldest item in the buffer. In other words,
the producer *must never block* for n > 1.
I *already have* the solution for the blocking consumer, so please,
I'm not interested in it.

Can someone give me some hint on this problem? Or point me to a
concrete implementation? I thought these producer-consumer problems
were common in multimedia real-time programming (where you must
overwrite, for example, old video or audio frames when the consumer is
slower), but I can only find the blocking consumer implementations.

Many thanks,
Kind regards,

Chris.

Dmitriy Vyukov

Oct 15, 2009, 7:57:10 AM

On Oct 15, 3:29 pm, "Jane's Conference" <janesconfere...@gmail.com>
wrote:

> 5 If the producer is faster, the producer *must not* block when the
> buffer is full, but it must overwrite older items, (except the one the
> consumer is reading at the moment, see point 2). I want to stress this
> point: the producer must not wait for the consumer to free an item,
> but it *must* overwrite the oldest item in the buffer. In other words,
> the producer *must never block* for n > 1.
> I *already have* the solution for the blocking consumer, so please,
> I'm not interested on it.


Which item must the producer overwrite in such a situation? Are there
any requirements for that?
If it overwrites the oldest produced and not yet consumed item in
place, then it will break FIFO order, i.e. the consumer will receive a
newer item and then an older one.
If it overwrites the latest produced item, then FIFO order is preserved;
however, you lose fresh data, which may be suboptimal (it's usually
better to lose older data).
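
The two policies can be compared on a tiny single-threaded model. This is only an illustrative sketch: the three-slot array and the names read_order, overwrite_oldest and overwrite_latest are hypothetical, and no synchronization is shown.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

using Slots = std::array<int, 3>;

// Order in which a consumer starting at read_pos would see the slots.
Slots read_order(const Slots& slots, std::size_t read_pos) {
    Slots out{};
    for (std::size_t k = 0; k < slots.size(); ++k)
        out[k] = slots[(read_pos + k) % slots.size()];
    return out;
}

// Policy A: the new item replaces the oldest unread item in place.
Slots overwrite_oldest(Slots slots, std::size_t read_pos, int item) {
    slots[read_pos] = item;
    return read_order(slots, read_pos);
}

// Policy B: the new item replaces the most recently written item.
Slots overwrite_latest(Slots slots, std::size_t read_pos, int item) {
    std::size_t latest = (read_pos + slots.size() - 1) % slots.size();
    slots[latest] = item;
    return read_order(slots, read_pos);
}
```

With a full buffer holding 1, 2, 3 and a new item 4, policy A makes the consumer see the newest item first, while policy B keeps the order but loses item 3.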


--
Dmitriy V'jukov

Marcel Müller

Oct 15, 2009, 8:18:50 AM

Hi!

Jane's Conference wrote:
> I'd like to pass data between a producer and a consumer thread using a
> circular buffer.
>
> 1 There is only one consumer and only one producer, and the buffer and
> the items you can put on the buffer are fixed in size, so we can
> generalize with, for example, an array of integer of length n, with
> n > 1.
>
> 2 The producer and the consumer must be synchronized, i.e. the writer
> can't write on a item position (slot) that the consumer is reading and
> vice versa.

At this point, you need to partition the buffer into separate blocks.

Once you do this, you no longer have a circular buffer, but a set of
independent buffers. Whether they are part of a larger buffer does not
matter.

To manage the buffers you need two queues: one queue with empty buffers
and another one with filled buffers.


> 3 The producer can be faster than the consumer or it can be slower
> slower. In general, who is slower at a given moment is completely
> random.
>
> 4 If the producer is slower, the consumer must wait until there is
> something (at least an item) on the buffer.

Straightforward so far.


> 5 If the producer is faster, the producer *must not* block when the
> buffer is full, but it must overwrite older items, (except the one the
> consumer is reading at the moment, see point 2). I want to stress this
> point: the producer must not wait for the consumer to free an item,
> but it *must* overwrite the oldest item in the buffer. In other words,
> the producer *must never block* for n > 1.

Since you say 'overwrite existing buffer' I guess that you want to
process the buffers always in order, even if some buffers are dropped.

All you have to do for this is to do a non-blocking read from the empty
buffer queue and if this fails simply read the next buffer from the full
buffer queue and reuse it. (At this point your buffer blocks are no
longer circular.)

More exactly:

The consumer waits for a full buffer to become available. It removes the
buffer from the full buffer queue, processes the content and puts the
buffer into the empty buffer queue.
Depending on your application, the consumer may have to own up to two
buffers at once to keep the processing active at buffer switches. This
changes your condition in (5) to n > 2.

The producer looks for a buffer in the empty queue. If that fails, it
takes the next buffer to be processed by the consumer from the full
queue, if any. You have to do a double check here, because the consumer
could just have passed another buffer to the empty queue and taken the
last one from the full queue. But the third try (again on the empty
queue) must not fail.
Then the producer fills the buffer with new content and passes it to the
full queue.
You may need to take up to two buffers at once for the producer for the
same reasons as above. In this case you need at least 4 buffers to keep
the ball rolling.

Depending on your needs, choose an appropriate queue implementation. But
remember that the full buffer queue is single producer and /multiple/
consumer, because the producer also takes buffers from it when the empty
queue has run dry.
And be aware of priority inversions. So either choose a non-blocking
implementation or do some compensation like priority inheritance.
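
The two-queue scheme can be sketched as follows. This is a minimal illustration, not a definitive implementation: it assumes a single mutex guarding both queues (so the double check described above is unnecessary, but the priority-inversion caveat applies in full), each side owning at most one buffer at a time, and hypothetical names (BufferPool, acquire_for_write, commit_write, acquire_for_read, release_read).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

template <typename T>
class BufferPool {
    std::vector<T> slots_;
    std::deque<std::size_t> empty_;  // indices of free buffers
    std::deque<std::size_t> full_;   // indices of filled buffers, oldest first
    std::mutex m_;
    std::condition_variable cv_;

public:
    explicit BufferPool(std::size_t n) : slots_(n) {
        assert(n >= 2);
        for (std::size_t i = 0; i < n; ++i) empty_.push_back(i);
    }

    T& operator[](std::size_t i) { return slots_[i]; }

    // Producer: take a free buffer, or else recycle the oldest filled one.
    // It never waits for the consumer, only briefly for the mutex.
    std::size_t acquire_for_write() {
        std::lock_guard<std::mutex> lock(m_);
        std::deque<std::size_t>& source = empty_.empty() ? full_ : empty_;
        assert(!source.empty());  // holds while each side owns at most one
        std::size_t i = source.front();
        source.pop_front();
        return i;
    }

    // Producer: hand a filled buffer to the consumer.
    void commit_write(std::size_t i) {
        {
            std::lock_guard<std::mutex> lock(m_);
            full_.push_back(i);
        }
        cv_.notify_one();
    }

    // Consumer: block until a filled buffer exists, then take the oldest.
    std::size_t acquire_for_read() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !full_.empty(); });
        std::size_t i = full_.front();
        full_.pop_front();
        return i;
    }

    // Consumer: return a processed buffer to the empty queue.
    void release_read(std::size_t i) {
        std::lock_guard<std::mutex> lock(m_);
        empty_.push_back(i);
    }
};
```

With three buffers and five items produced before the consumer runs, the two oldest items are recycled and the consumer sees the last three in order.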


> Can someone give me some hint on this problem? Or point me to a
> concrete implementation? I tought these producer-consumer problems
> were common in multimedia real time programming (where you must
> overwrite, for example, old video or audio frames when the consumer is
> slower), but I only can find the blocking consumer implementations.

? - You said in (4) that your consumer should block too.


Marcel

Jane's Conference

Oct 15, 2009, 8:57:30 AM

Ok, but wikipedia ( http://en.wikipedia.org/wiki/Circular_buffer )
says:

"An example that could possibly use an overwriting circular buffer is
with multimedia. If the buffer is used as the bounded buffer in the
producer-consumer problem then it is probably desired for the producer
(e.g., an audio generator) to overwrite old data if the consumer
(e.g., the sound card) is unable to momentarily keep up. Another
example is the digital waveguide synthesis method which uses circular
buffers to efficiently simulate the sound of vibrating strings or wind
instruments."
[...]
"Use a Fill Count
The second simplest solution is to use a fill count. The fill count is
implemented as an additional variable which keeps the number of
readable bytes in the buffer. This variable has to be increased if the
write (end) pointer is moved, and to be decreased if the read (start)
pointer is moved.
In the situation if both pointers pointing at the same location, you
consider the fill count to distinguish if the buffer is empty or full.
* Note: When using semaphores in a Producer-consumer model, the
semaphores act as a fill count."
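
The fill-count idea from the quoted passage can be sketched single-threaded. This is an assumption-laden illustration: the class name FillCountRing and its methods are hypothetical, the overwrite-on-full behaviour is one possible policy, and the code is not thread-safe as written.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

template <typename T, std::size_t N>
class FillCountRing {
    std::array<T, N> data_{};
    std::size_t read_ = 0;   // index of the oldest unread item
    std::size_t write_ = 0;  // index of the next slot to write
    std::size_t fill_ = 0;   // number of unread items

public:
    // read_ == write_ in both cases; the fill count tells them apart.
    bool empty() const { return fill_ == 0; }
    bool full() const { return fill_ == N; }

    // Stores item; if the buffer is full, the oldest unread item is
    // dropped first. Returns true if something was dropped.
    bool push_overwrite(const T& item) {
        bool dropped = full();
        if (dropped) {
            read_ = (read_ + 1) % N;
            --fill_;
        }
        data_[write_] = item;
        write_ = (write_ + 1) % N;
        ++fill_;
        return dropped;
    }

    // Copies the oldest unread item into out; returns false when empty.
    bool pop(T& out) {
        if (empty()) return false;
        out = data_[read_];
        read_ = (read_ + 1) % N;
        --fill_;
        return true;
    }
};
```

In a threaded version the fill count would have to be updated atomically or under a lock, which is where the semaphore mentioned in the note comes in.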

So I thought there actually *is* a way to do producer - consumer on a
circular buffer overwriting older data and synchronizing with a
semaphore.

Is it possible this is not a "classical" problem, just like dining
philosophers or readers-writers? I looked for a solution in The Little
Book of Semaphores, but found no trace. Yet I can see a lot of situations
where this kind of problem arises, as Wikipedia states.

> More exactly:
>
> The consumer waits for a full buffer to become available. It removes the
> buffer from the full buffer queue, processes the content and puts the
> buffer into the empty buffer queue.
> Depending on your application the consumer has to own up two two buffers
> at once to keep the processing active at buffer switches. This will
> modify your condition at (5) to n > 2.

Well, I can imagine it for n = 2; the (slower) consumer is reading
from, say, buffer 0, and the (faster) producer is continuously
producing on buffer 1; as soon as the consumer releases buffer 0, the
producer finishes, starts writing on buffer 0 and the consumer starts
reading the last frame in buffer 1. Is it feasible with the solution
you propose?


> The producer looks for a buffer in the empty queue. If it fails it takes
> the next buffer to be processed by the consumer from the full queue, if
> any. You have to do a double check here because the consumer could just
> have passed another buffer to the empty queue and taken the last one
> from the full queue. But the third try (again to the empty queue) must
> not fail.
> Then the buffer is filled with new content and passes it to the full queue.
> You may need to take up to two buffers at once for the producer for the
> same reasons as above. In this case you need at least 4 buffers to keep
> the ball rolling.
>
> Depending on your needs choose an appropriate queue implementation. But
> remember that the full buffer queue is single producer and /multiple/
> consumer.
> And be aware of priority inversions. So either choose a non-blocking
> implementation or do some compensation like priority inheritance.
>
> > Can someone give me some hint on this problem? Or point me to a
> > concrete implementation? I tought these producer-consumer problems
> > were common in multimedia real time programming (where you must
> > overwrite, for example, old video or audio frames when the consumer is
> > slower), but I only can find the blocking consumer implementations.
>
> ? - You said in (4) that your consumer should block too.

Yep, mistype, I meant "producer".

> Marcel

Marcel Müller

Oct 15, 2009, 2:28:29 PM

Jane's Conference wrote:
>> Since you say 'overwrite existing buffer' I guess that you want to
>> process the buffers always in order, even if some buffers are dropped.
>>
>> All you have to do for this is to do a non-blocking read from the empty
>> buffer queue and if this fails simply read the next buffer from the full
>> buffer queue and reuse it. (At this point your buffer blocks are no
>> longer circular.)
>
> Ok, but wikipedia ( http://en.wikipedia.org/wiki/Circular_buffer )
> says:
> [...]

> So I thought there actually *is* a way to do producer - consumer on a
> circular buffer overwriting older data and synchronizing with a
> semaphore.

Feel free to look for a complicated solution. But I don't recommend that.
And I am pretty sure that if you want to fill and consume single buffers
in an unsynchronized context, a circular buffer will not fit your needs.


> Is it possible this is not a "classical" problem, just like dining
> philosophers or reader-writers?

It is somewhat classical. And the solution is common too.

> I looked for a solution in the little
> book of semaphore, but no trace. Yet I can see a lot of situations
> where this kind of problem arises, as wikipedia states.

Nothing but multimedia applications, I guess. In most other cases
dropping buffers is not very useful.


>> More exactly:
>>
>> The consumer waits for a full buffer to become available. It removes the
>> buffer from the full buffer queue, processes the content and puts the
>> buffer into the empty buffer queue.
>> Depending on your application the consumer has to own up two two buffers
>> at once to keep the processing active at buffer switches. This will
>> modify your condition at (5) to n > 2.
>
> Well, I can imagine it for n = 2; the (slower) consumer is reading
> from, say, buffer 0, and the (faster) producer is continuously
> producing on buffer 1; as soon as the consumer releases buffer 0, the
> consumer finishes, starts writing on buffer 0 and the producer starts
> reading the last frame in buffer 1. Is it feasible with the solution
> you propose?

No.
The problem is the moment when the consumer releases buffer 0. At this
point it does not yet own buffer 1. So the throughput is zero at this time.
If you have asynchronous processing in the consumer, e.g. SG-DMA to the
I/O subsystem, the consumer needs to own at least one buffer at any
time to keep the maximum throughput. In this case the consumer has to
acquire the next buffer before it releases buffer 0. So the consumer can
own two buffers at once and you need at least a third one for the producer.
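
The buffer counts mentioned in this thread (2, 3 and 4) follow from simple addition. As a small worked example, assuming the hypothetical helper name min_buffers and that each side's maximum number of simultaneously owned buffers is known:

```cpp
#include <cassert>
#include <cstddef>

// Each side must be able to hold its maximum number of buffers at the
// same time as the other side, so the pool needs at least the sum.
constexpr std::size_t min_buffers(std::size_t producer_max_owned,
                                  std::size_t consumer_max_owned) {
    return producer_max_owned + consumer_max_owned;
}
```

So min_buffers(1, 1) gives the two-buffer case, min_buffers(1, 2) the case where the consumer acquires the next buffer before releasing the current one, and min_buffers(2, 2) the case where both sides do so.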


Marcel

Jane's Conference

Oct 15, 2009, 4:11:38 PM

Thank you Marcel, it was very useful.

One last question: what would be the best way to handle the n = 2
situation, in your opinion? Big raw buffers and stringent memory
constraints could force me to implement the double buffering case
(obviously, I don't expect the performance to be as optimal as in the
n > 2 cases).

Chris

Marcel Müller

Oct 16, 2009, 3:08:03 AM

Jane's Conference wrote:
> One last question: what would be the best way to handle the n = 2
> situation, in your opinion?

If neither your producer nor your consumer needs to own at least one
buffer at any time, there is nothing special to do. You can safely use
two buffers.
In any other case you need more buffers; whether you put them into the
pool or use them for double buffering does not matter.

In general I would avoid double buffering if possible. It can create a
significant load on the FSB, e.g. in the case of uncompressed video data.
Furthermore, you have to take care of cache poisoning.
And last but not least, you will not be able to implement double
buffering as a lock-free implementation. This could cause priority
inversion, which is quite difficult to avoid, because even a simple
write to a logfile in the critical section of the consumer will break
the solution with priority inheritance.

Double buffering should be reserved as a last resort for thunking layers
where the address space is not visible to some of the partners, or
things like that.


> Big raw buffers and stringent memory
> contraints could force me to implement the double buffering case

? - don't know what you mean here.

But why do you want to deal with only two buffers? Latency? To keep the
latency down you could also split your two buffers into 4 buffers of
half the size. The only restriction is that you need to take care of the
context switch times of your operating system. The smaller the buffers,
the faster your high-priority producer needs to get scheduled. Otherwise
it runs out of buffers.


Marcel

David Schwartz

Oct 16, 2009, 5:47:58 AM

On Oct 15, 1:11 pm, "Jane's Conference" <janesconfere...@gmail.com>
wrote:

> One last question: what would be the best way to handle the n = 2
> situation, in your opinion? Big raw buffers and stringent memory
> contraints could force me to implement the double buffering case
> (obviously, I don't expect the performance to be as optimal as in the
> n > 2 cases).

The producer takes whichever buffer the consumer is not using. The
consumer takes whichever buffer the producer is not using. If neither
is in use, always take the older buffer.
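
One way to read this rule is as a small arbiter guarded by a mutex. The sketch below is an interpretation under stated assumptions, not the poster's code: the class name TwoBufferArbiter, the write-order stamps, the convention that stamp 0 means "nothing unread" and the non-blocking consumer_take (returning -1 instead of waiting) are all hypothetical.

```cpp
#include <cassert>
#include <mutex>

class TwoBufferArbiter {
    std::mutex m_;
    int producer_ = -1;                // buffer held by the producer, -1 if none
    int consumer_ = -1;                // buffer held by the consumer, -1 if none
    unsigned long stamp_[2] = {0, 0};  // write order; 0 means nothing unread
    unsigned long next_stamp_ = 1;

public:
    // Producer: the buffer the consumer is not using, else the older one.
    int producer_take() {
        std::lock_guard<std::mutex> lock(m_);
        if (consumer_ >= 0)
            producer_ = 1 - consumer_;
        else
            producer_ = (stamp_[0] <= stamp_[1]) ? 0 : 1;
        return producer_;
    }

    // Producer: publish the buffer it was writing.
    void producer_done() {
        std::lock_guard<std::mutex> lock(m_);
        stamp_[producer_] = next_stamp_++;
        producer_ = -1;
    }

    // Consumer: the oldest unread buffer the producer is not using,
    // or -1 if there is none (a real consumer would wait here).
    int consumer_take() {
        std::lock_guard<std::mutex> lock(m_);
        int pick = -1;
        for (int i = 0; i < 2; ++i) {
            if (i == producer_ || stamp_[i] == 0) continue;
            if (pick < 0 || stamp_[i] < stamp_[pick]) pick = i;
        }
        consumer_ = pick;
        return pick;
    }

    // Consumer: mark its buffer as consumed and release it.
    void consumer_done() {
        std::lock_guard<std::mutex> lock(m_);
        if (consumer_ >= 0) stamp_[consumer_] = 0;
        consumer_ = -1;
    }
};
```

Note that while the consumer holds one buffer, a fast producer keeps rewriting the other one, so with n = 2 it is the newest unread frame that gets replaced.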

DS

Hallvard B Furuseth

Oct 16, 2009, 5:04:10 PM

Jane's Conference writes:
> 5 (...) the producer must not wait for the consumer to free an item,

May it block just while the consumer is maintaining the circular buffer
structure itself (marking last/first/currently read item)? Or are you
saying you need a lock-free solution?

> but it *must* overwrite the oldest item in the buffer.

Then I don't see how the two-buffer solution people are speaking of can
work, since there can be more old items in the buffer which the consumer
currently has locked.

How about this:
- 4 indexes: oldest item, next item to write, item being read,
  hole left by producer when it skipped over the item being read.
  The two latter can be -1 when there is no such item.
- Fill count of completed items.
  Signal a cond when incrementing it from 0, to restart the
  waiting consumer.

--
Hallvard

Hallvard B Furuseth

Oct 17, 2009, 6:06:36 PM

I wrote:
> How about this:
> - 4 indexes: oldest item, next item to write, item being read,
> hole left by producer when it skipped over the item being read.
> The two latter can be -1 when there is no such item.

Eh, sorry. The "holes" would be bitflags in each buffer item, of
course. Or a separate bitvector. The consumer will never see two
consecutive holes, as far as I can see.

> - Fill count of completed items.
> Signal a cond when incrementing it from 0, to restart the
> waiting consumer.

Something like this - keeping a readhead before oldest item instead of
the oldest item

Init(n) {
    assert n > 1
    buf = [n items with item.readme = False]
    # head starts at n-1 (the slot just before slot 0) rather than -1, so
    # that the "head == i" test in Producer() also fires if the producer
    # laps the buffer before the consumer's first read
    (count, head, reading, stop) = (0, n-1, -1, False)
    (mtx, cnd) = (make_mutex(), make_condvar())
}
Consumer() {
    loop {
        local i = -1
        with mtx.lock {
            reading = -1
            while not (count > 0 or stop): cnd.wait(mtx)
            if count > 0 {
                (i, count) = (head, count-1)
                repeat {                      # one or two iterations
                    i = (i+1) mod buf.length
                } until buf[i].readme
                (reading, head, buf[i].readme) = (i, i, False) # oldest item
            }
        }
        if i < 0: return
        consume(buf[i])
    }
}
Producer() {
    local i = 0
    while produce(buf[i]) {
        with mtx.lock {
            buf[i].readme = True
            repeat {                          # one or two iterations
                local overtake = (head == i)
                i = (i+1) mod buf.length
                if overtake: head = i
            } until i != reading
            if buf[i].readme {     # can only happen after overtake above
                buf[i].readme = False         # was oldest item
            } else {
                count += 1
                cnd.signal()
            }
        }
    }
    with mtx.lock {
        stop = True
        cnd.signal()
    }
}
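
For readers who want something compilable, the pseudocode above can be translated to C++ roughly as follows. This is a sketch under assumptions: the class and method names (OverwritingRing, write_slot, publish, acquire, stop) are hypothetical, the loops are split into methods so they can be driven from any thread, and the readhead starts at n-1 instead of -1 so that the overtake test also works if the producer laps the buffer before the first read.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

template <typename T>
class OverwritingRing {
    struct Item { T data{}; bool readme = false; };
    std::vector<Item> buf_;
    int count_ = 0;     // completed, unread items
    int head_;          // readhead: slot just before the oldest item
    int reading_ = -1;  // slot the consumer is reading, -1 if none
    int write_ = 0;     // slot the producer fills next
    bool stop_ = false;
    std::mutex mtx_;
    std::condition_variable cnd_;

    int next(int i) const { return (i + 1) % static_cast<int>(buf_.size()); }

public:
    explicit OverwritingRing(int n)
        : buf_(static_cast<std::size_t>(n)), head_(n - 1) { assert(n > 1); }

    // Producer only: the slot to fill; no lock is held while filling it.
    T& write_slot() { return buf_[write_].data; }

    // Producer only: mirrors the locked section of Producer().
    void publish() {
        std::lock_guard<std::mutex> lock(mtx_);
        buf_[write_].readme = true;
        do {  // one or two iterations
            bool overtake = (head_ == write_);
            write_ = next(write_);
            if (overtake) head_ = write_;
        } while (write_ == reading_);
        if (buf_[write_].readme) {
            buf_[write_].readme = false;  // drop the oldest unread item
        } else {
            ++count_;
            cnd_.notify_one();
        }
    }

    // Producer only: wake the consumer so that it can finish.
    void stop() {
        std::lock_guard<std::mutex> lock(mtx_);
        stop_ = true;
        cnd_.notify_one();
    }

    // Consumer only: waits for an item and returns the oldest unread one,
    // or nullptr once stop() was called and nothing is left.
    T* acquire() {
        std::unique_lock<std::mutex> lock(mtx_);
        reading_ = -1;
        cnd_.wait(lock, [this] { return count_ > 0 || stop_; });
        if (count_ == 0) return nullptr;
        --count_;
        int i = head_;
        do { i = next(i); } while (!buf_[i].readme);
        reading_ = head_ = i;
        buf_[i].readme = false;
        return &buf_[i].data;
    }
};
```

As in the pseudocode, the producer takes the mutex briefly in publish(), so this is not lock-free; it only avoids waiting for the consumer to finish an item.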

--
Hallvard

Dmitriy Vyukov

Oct 18, 2009, 7:28:11 AM

On Oct 15, 4:29 am, "Jane's Conference" <janesconfere...@gmail.com>
wrote:

> Hi to everybody,
>
> I'd like to pass data between a producer and a consumer thread using a
> circular buffer.


Here it is.
Algorithm outline:
There are 2 FIFO queues. The first is used for passing full nodes and is
single-producer/multi-consumer. The second is used for passing empty
nodes back and is multi-producer/single-consumer.
The producer first tries to dequeue a node from the free queue; if that
fails, it dequeues a node from the full queue. Then it fills the node
with data and enqueues it into the full queue (plus some trickery to
handle the ABA problem).
The consumer dequeues a node from the full queue, consumes the data, and
then enqueues the node into the free queue.
The queue is 100% lock-free. The producer never blocks. The consumer
blocks only when the queue is empty.
A block of memory is allocated only once in the constructor, then split
into nodes. There is no memory management during enqueue/dequeue calls.
The queue is "zero-copy"; for that purpose the enqueue/dequeue methods
are split into "prepare" and "commit" parts. So the usage pattern is:

void consumer_thread(queue<T>* q)
{
    for (;;)
    {
        T& data = q->dequeue_prepare();
        consume_impl(data);
        q->dequeue_commit();
    }
}

void producer_thread(queue<T>* q)
{
    for (;;)
    {
        T& data = q->enqueue_prepare();
        produce_impl(data);
        q->enqueue_commit();
    }
}

Here is the implementation:

#include <windows.h>
#include <atomic>
#include <cassert>
#include <cstddef>

template<typename T>
class queue
{
public:
    queue(size_t count)
    {
        assert(count >= 6);
        sema = CreateSemaphore(0, 0, 1, 0);
        waiting = false;
        deq_node = 0;
        block = new node [count];
        // block[0] is the initial stub node of the full queue
        block->next = 0;
        full_tail = block;
        full_head = block;
        // all remaining nodes form the free queue
        free_head = block + 1;
        free_tail = block + count - 1;
        // std::atomic<node*> has no operator->, so load the pointer first
        free_tail.load()->next = 0;
        for (size_t i = 1; i != count - 1; i += 1)
            block[i].next = block + i + 1;
    }

    ~queue()
    {
        CloseHandle(sema);
        delete [] block;
    }

    // producer: returns the slot to fill (the current tail of the full queue)
    T& enqueue_prepare()
    {
        return full_tail->data;
    }

    // producer: publishes the filled slot by linking a fresh tail node
    void enqueue_commit()
    {
        node* n = get_free_node();
        n->next = 0;
        full_tail->next = n;
        full_tail = n;
        if (waiting)
        {
            waiting = false;
            ReleaseSemaphore(sema, 1, 0);
        }
    }

    // consumer: blocks until a full node is available
    T& dequeue_prepare()
    {
        deq_node = get_full_node();
        return deq_node->data;
    }

    // consumer: returns the consumed node to the free queue
    void dequeue_commit()
    {
        deq_node->next = 0;
        node* prev = free_tail.exchange(deq_node);
        prev->next = deq_node;
    }

private:
    struct node
    {
        std::atomic<node*> next;
        T data;
    };

    node* block;
    node* full_tail;
    node* free_head;
    node* deq_node;
    char pad [64];
    std::atomic<node*> full_head;
    std::atomic<node*> free_tail;
    std::atomic<bool> waiting;
    HANDLE sema;

    node* get_free_node()
    {
        for (;;)
        {
            // first try the free queue
            node* n = free_head;
            node* next = n->next;
            if (next)
            {
                free_head = next;
                return n;
            }

            // otherwise steal the oldest node from the full queue
            n = full_head;
            next = n->next;
            if (next)
            {
                if (full_head.compare_exchange_strong(n, next))
                {
                    node* n2 = free_head;
                    node* next2 = n2->next;
                    if (next2)
                    {
                        n->next = 0;
                        node* prev = free_tail.exchange(n);
                        prev->next = n;
                        free_head = next2;
                        return n2;
                    }
                    else
                    {
                        return n;
                    }
                }
            }
        }
    }

    node* get_full_node()
    {
        node* n = full_head;
        for (;;)
        {
            node* next = n->next;
            if (next == 0)
            {
                // queue looks empty: announce the wait, then re-check
                waiting = true;
                n = full_head;
                next = n->next;
                if (next)
                {
                    waiting = false;
                }
                else
                {
                    WaitForSingleObject(sema, INFINITE);
                    n = full_head;
                    continue;
                }
            }
            if (full_head.compare_exchange_strong(n, next))
                return n;
        }
    }
};


--
Dmitriy V'jukov

Chris M. Thomasson

Oct 18, 2009, 4:26:31 PM

"Jane's Conference" <janesco...@gmail.com> wrote in message
news:159cef18-2778-4d83...@g19g2000yqo.googlegroups.com...


FWIW, here is a quick pseudo-code sketch of a funny algorithm which works
for multiple producers and consumers:
_____________________________________________________________
template<typename T, size_t T_size>
class funny_queue
{
    atomic<T*> m_buffer[T_size]; // = { NULL }
    atomic<size_t> m_head; // = 0
    atomic<size_t> m_tail; // = 0
    eventcount m_ecpop;


public:
    T* push(T* state)
    {
        assert(state);

        size_t i = m_head.fetch_add(1) % T_size;

        if (! (state = m_buffer[i].exchange(state)))
        {
            m_ecpop.broadcast();
        }

        return state;
    }


    T* pop()
    {
        size_t i = m_tail.fetch_add(1) % T_size;

        T* state;

        while (! (state = m_buffer[i].exchange(NULL)))
        {
            eventcount::key_type eckey = m_ecpop.get();

            if ((state = m_buffer[i].exchange(NULL))) break;

            m_ecpop.wait(eckey);
        }

        return state;
    }
};
_____________________________________________________________


Producers can overwrite buffers at will and consume the previous data.
Consumers will block when there is no data for them to consume.


The funny part is that a producer can act like a consumer...


;^)

Dmitriy Vyukov

Oct 19, 2009, 12:47:24 AM

On Oct 19, 00:26, "Chris M. Thomasson" <n...@spam.invalid> wrote:

> FWIW, here is a quick pseudo-code sketch of a funny algorithm which works
> for multiple producers and consumers:
> _____________________________________________________________

> Producers can overwrite buffers at will and consume the previous data.
> Consumers will block when there is no data for them to consume.
>
> The funny part is that a producer can act like a consumer...
>
> ;^)


Hi Chris,

It will indeed work.
However, it does not provide FIFO ordering, i.e. the consumer may consume
newer data and then older data. The fun begins when you try to provide
FIFO in this situation, because both producer and consumer have to
process buffer elements out of sequential order. So you have to
organize a linked list on top of the array, and then you have to cope
with safe memory reclamation and ABA.
Your variant is definitely simpler than mine.
Unfortunately, the topic starter did not say what order he needs.
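
The ordering effect can be reproduced single-threaded. The following is a minimal sketch, assuming a simplified variant of the quoted pseudo-code: the class name FunnyQueueSketch is hypothetical, and the blocking pop with its eventcount is replaced by a non-blocking try_pop, which is a deliberate simplification and changes the waiting behaviour.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

template <typename T, std::size_t N>
class FunnyQueueSketch {
    std::atomic<T*> buffer_[N];
    std::atomic<std::size_t> head_{0};
    std::atomic<std::size_t> tail_{0};

public:
    FunnyQueueSketch() {
        for (auto& slot : buffer_) slot.store(nullptr);
    }

    // Stores item in the next slot and returns whatever was there before
    // (nullptr if the slot was empty, otherwise the overwritten item).
    T* push(T* item) {
        assert(item);
        std::size_t i = head_.fetch_add(1) % N;
        return buffer_[i].exchange(item);
    }

    // Claims the next slot and returns its content, or nullptr if empty.
    // Unlike the original pop(), it does not wait on an empty slot.
    T* try_pop() {
        std::size_t i = tail_.fetch_add(1) % N;
        return buffer_[i].exchange(nullptr);
    }
};
```

With two slots and three pushes a, b, c, the third push returns a (the overwritten item), and the consumer then receives c before b.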

--
Dmitriy V'jukov

Chris M. Thomasson

Oct 19, 2009, 8:11:49 AM

"Dmitriy Vyukov" <dvy...@gmail.com> wrote in message
news:0e8ac038-a00a-4851...@37g2000yqm.googlegroups.com...

On Oct 19, 00:26, "Chris M. Thomasson" <n...@spam.invalid> wrote:

> > FWIW, here is a quick pseudo-code sketch of a funny algorithm which
> > works
> > for multiple producers and consumers:
> > _____________________________________________________________
> > Producers can overwrite buffers at will and consume the previous data.
> > Consumers will block when there is no data for them to consume.
> >
> > The funny part is that a producer can act like a consumer...
> >
> > ;^)

> It will indeed work.
> However it does not provide FIFO ordering, i.e. consumer will consume
> newer data and then older one. The fun begins when you try to provide
> FIFO in this situation, because both producer and consumer have to
> process buffer elements not in sequential order. So you have to
> organize linked list on top of array, and then you have to cope with
> safe memory reclamation and ABA.

Yes. The "funny_queue" is definitely `SCHED_OTHER'...

:^)


> Your variant is definitely simpler than mine.
> Unfortunately topicstarter did not say what order he needs.

Well, the "funny_queue" should come in handy if one does not need a strict
ordering of consumed items. Humm... It does have some nice properties in
that it does not need CAS even in a multi-producer/consumer environment.

jacktumi

Dec 14, 2009, 8:16:19 PM

On Oct 17, 2:06 pm, Hallvard B Furuseth <h.b.furus...@usit.uio.no>
wrote:

Just a curiosity: in the Producer() function, the statement
cnd.signal() (signaling the condition) would cause the locking of the
mutex associated with the condition variable, would it not? Doesn't
this break the OP's requirement that the producer must not lock?

Thanks,

GT

Hallvard B Furuseth

Dec 15, 2009, 6:41:40 AM

jacktumi writes:
> Just a curiosity: in the Producer() function the statement
> cnd.signal() (signaling the condition) would cause the locking of the
> mutex associated to the condition variable; would it not? Isn't this
> breaking the OP requirement that the Producer must not lock?

cnd.signal doesn't block, but 'with mtx.lock' certainly does:-)
So yes, like a number of other postings I'm breaking a requirement
of the OP's. (In my case, not using lock-free code.) It doesn't
block waiting for a consumer to consume though, just for it to
update the buffer structure. IIRC some other solutions were more
lock-friendly but didn't necessarily pick the oldest item, but
I may remember wrongly.

--
Hallvard
