Double ended queue - review


holo...@gmail.com

Dec 12, 2008, 4:14:53 AM
Hello,

I'm trying to implement a C++ version of the double-ended queue presented
in Herlihy's "The Art of Multiprocessor Programming". It's a single
producer, multiple consumer queue, where the producer thread pushes/pops
from the bottom and other threads can only pop from the top.
The original version was in Java; here's my attempt in C++. I'm rather
new to the world of lock-free programming, so I'm not sure whether I'm
missing something. (I'm not sure about those barriers, for example;
that's a tricky one. In the worst case I can modify m_bottom using
interlocked ops, but it's only written by one thread and the others only
read it, so in theory it works.) I don't expect it to be very universal;
I'm going to use it on x86/Xbox360 only. My first tests finished
successfully, but I'd be very grateful for any tips/findings.

// Double-ended queue.
// Single producer, multiple consumers.
// Holds pointers to T, not Ts themselves.
template<typename T>
class LockFreeDEQueue
{
public:
    explicit LockFreeDEQueue(int maxCapacity)
    {
        m_data = new T*[maxCapacity];
        m_top = 0;
        m_topCounter = 0;
        m_bottom = 0;
    }
    ~LockFreeDEQueue()
    {
        delete[] m_data;
        m_data = 0;
    }
    void PushBottom(T* t)
    {
        m_data[m_bottom++] = t;
        Interlocked::ReadWriteBarrier();
    }
    // Can be called from multiple threads.
    T* PopTop()
    {
        int32_t topCounter = m_topCounter;
        int32_t top = m_top;

        // Empty queue.
        if (m_bottom <= top)
            return 0;

        T* t = m_data[top];
        if (CAS2(&m_top, top, topCounter, top + 1, topCounter + 1))
            return t;
        return 0;
    }
    // Only called from one thread.
    T* PopBottom()
    {
        if (m_bottom == 0) // Empty queue
            return 0;

        --m_bottom;
        Interlocked::ReadWriteBarrier();
        T* t = m_data[m_bottom];
        int32_t topCounter = m_topCounter;
        int32_t top = m_top;

        // No danger of conflict, just return.
        if (m_bottom > top)
            return t;
        // Possible conflict, slow-path.
        if (m_bottom == top)
        {
            m_bottom = 0;
            if (CAS2(&m_top, top, topCounter, 0, topCounter + 1))
                return t;
        }
        FetchAndStore(&m_top, 0, topCounter + 1);
        return 0;
    }

private:
    T** m_data;
    int32_t volatile m_top;
    int32_t volatile m_topCounter;
    int32_t volatile m_bottom;
};

Torsten Robitzki

Dec 13, 2008, 4:50:50 AM
Hello,

holo...@gmail.com wrote:

> Original version was in Java, here's my try in C++.

In C++ there is no need for everything to be a pointer. So I would store
Ts and let the user decide to store pointers to T if he needs that.

You should declare a private copy constructor and assignment operator
without defining them. Or otherwise provide them correctly.
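
A minimal sketch of that advice, assuming a C++11 compiler, where
"= delete" replaces the older idiom of declaring both members private
and leaving them undefined. The class name NonCopyableQueue and the
void* element type are placeholders for illustration, not part of the
posted code.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical stand-in for the queue class; only the copy control is shown.
class NonCopyableQueue
{
public:
    explicit NonCopyableQueue(int maxCapacity)
        : m_data(new void*[maxCapacity]) {}
    ~NonCopyableQueue() { delete[] m_data; }

    // C++11 spelling of "declare but do not define".
    // The C++03 form would declare both members in a private section instead.
    NonCopyableQueue(const NonCopyableQueue&) = delete;
    NonCopyableQueue& operator=(const NonCopyableQueue&) = delete;

private:
    void** m_data; // owned array; an implicit copy would lead to a double delete[]
};

// Compile-time confirmation that copies are rejected.
static_assert(!std::is_copy_constructible<NonCopyableQueue>::value,
              "copy construction must be rejected");
static_assert(!std::is_copy_assignable<NonCopyableQueue>::value,
              "copy assignment must be rejected");
```

Without this, the compiler-generated copy would duplicate the raw
m_data pointer and both objects would delete[] the same array.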

best regards
Torsten

--
Free business simulation: http://www.financial-rumors.de

JC

Dec 13, 2008, 9:08:39 PM


Also you need to make sure you have no elements left in the queue when
you delete it, or you have some other way of keeping track of the
pointers in the queue, otherwise you'll leak all the remaining queued
items.

You may additionally want to consider using std::deque or similar as
your core. Then you don't have to worry about the implementation
details (or limiting it to a max capacity) of the queue itself, you
only have to worry about providing thread-safety. Not that there's a
particular problem with your implementation, but the use of, say,
std::deque<T> with T = boost::shared_ptr<YourObject>, for example,
entirely eliminates the need for you to think about both the queue
implementation and memory management of queued objects and queue data.
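
A rough sketch of that idea, under two assumptions: a C++11 standard
library is available, so std::shared_ptr and std::mutex stand in for
boost::shared_ptr and whatever lock the platform offers, and a single
mutex is acceptable. The class name LockedDeque is made up for
illustration. Note that this is lock-based, not lock-free.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical wrapper: a std::deque guarded by one mutex.
template <typename T>
class LockedDeque
{
public:
    void PushBottom(std::shared_ptr<T> item)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_items.push_back(std::move(item));
    }

    // Owner side: newest element, or an empty pointer if the deque is empty.
    std::shared_ptr<T> PopBottom()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_items.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> item = std::move(m_items.back());
        m_items.pop_back();
        return item;
    }

    // Stealer side: oldest element, or an empty pointer if the deque is empty.
    std::shared_ptr<T> PopTop()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_items.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> item = std::move(m_items.front());
        m_items.pop_front();
        return item;
    }

private:
    std::mutex m_mutex;
    std::deque<std::shared_ptr<T> > m_items;
};
```

Items still queued when the deque is destroyed are released by the
shared_ptr destructors, which addresses the leak mentioned above.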

Jason

Chris M. Thomasson

Dec 13, 2008, 11:55:30 PM

<holo...@gmail.com> wrote in message
news:c43407b3-5797-4857...@35g2000pry.googlegroups.com...

> Hello,
>
> I'm trying to implement a C++ version of double-ended queue presented
> in Herlihy's "The Art of Multiprocessor Programming". It's a single
> producer, multiple consumer queue, where producer thread pushes/pops
> from bottom and other threads can only pop from top.
> Original version was in Java, here's my try in C++. I'm rather new to
> world of lock-free programming, so I'm not sure if I'm not missing
> something (I'm not sure about those barriers for example, that's a
> tricky one, in the worst case I can modify m_bottom using interlocked
> ops, but it's only written by one thread, others can only read it, so
> in theory it works). I don't expect it to be very universal, I'm going
> to use it for x86/Xbox360 only. My first tests finished successfully,
> but I'd be very grateful for any tips/findings.
>
[...]

> // Only called from one thread.
> T* PopBottom()
> {
> if (m_bottom == 0) // Empty queue
> return 0;
>
> --m_bottom;
> Interlocked::ReadWriteBarrier();
> T* t = m_data[m_bottom];
> int32_t topCounter = m_topCounter;
> int32_t top = m_top;
>
> // No danger of conflict, just return.
> if (m_bottom > top)
> return t;
> // Possible conflict, slow-path.
> if (m_bottom == top)
> {
> m_bottom = 0;
> if (CAS2(&m_top, top, topCounter, 0, topCounter + 1))
> return t;
> }
> FetchAndStore(&m_top, 0, topCounter + 1);

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


What is the second parameter for? Is this some sort of conditional
fetch-and-store?


> return 0;
> }


> // Can be called from multiple threads.
> T* PopTop()
> {
> int32_t topCounter = m_topCounter;
> int32_t top = m_top;

> // Empty queue.
> if (m_bottom <= top)
> return 0;

> T* t = m_data[top];
> if (CAS2(&m_top, top, topCounter, top + 1, topCounter + 1))
> return t;
> return 0;
> }


Are you sure that `m_top' will never grow greater than `maxCapacity - 1'?

Chris M. Thomasson

Dec 14, 2008, 12:04:37 AM

"Chris M. Thomasson" <n...@spam.invalid> wrote in message
news:Mk01l.2455$hs1...@newsfe04.iad...

[...]

The address passed to DWCAS needs to be on an 8-byte boundary on a 32-bit
machine; your data layout is as follows:


T** m_data;
int32_t volatile m_top;
int32_t volatile m_topCounter;
int32_t volatile m_bottom;


Well, AFAICT, `m_top' might not be properly aligned for a DWCAS...
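
One possible way to pin the alignment down, sketched under the
assumption that a C++11 compiler with alignas is available (compilers
of this era would need a vendor-specific alignment attribute
instead). The names TopWord, QueueFields and IsAligned8 are
hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical layout: top and its counter grouped into one 8-byte aligned unit.
struct alignas(8) TopWord
{
    std::int32_t top;
    std::int32_t counter;
};

struct QueueFields
{
    void** data;          // 4 bytes on a 32-bit target
    TopWord top;          // alignas(8) makes the compiler insert padding if needed
    std::int32_t bottom;
};

static_assert(sizeof(TopWord) == 8, "top and counter must fill one 64-bit word");
static_assert(alignof(TopWord) == 8, "the DWCAS target must be 8-byte aligned");
static_assert(offsetof(QueueFields, top) % 8 == 0,
              "the member offset must preserve the alignment");

// Runtime check for an arbitrary address.
inline bool IsAligned8(const void* p)
{
    return reinterpret_cast<std::uintptr_t>(p) % 8 == 0;
}
```

With the original member order, m_top follows a 4-byte pointer on a
32-bit build, so its offset is 4 and it is only 8-byte aligned if the
object itself happens to start at an address that is 4 mod 8.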

Dmitriy V'jukov

Dec 14, 2008, 4:59:50 AM
On 13 Dec, 12:50, Torsten Robitzki <MyFirstn...@robitzki.de> wrote:

> In C++ there is no need for everything to be a pointer. So I would store
> Ts and let the user decide to store pointers to T if he needs that.

This doesn't apply to low-level synchronization algorithms. This
algorithm doesn't even respect the C++ type system. For example, one
thread can be replacing an element while another is concurrently
copying it.

--
Dmitriy V'jukov

Dmitriy V'jukov

Dec 14, 2008, 5:24:10 AM
On 12 Dec, 12:14, "holor...@gmail.com" <holor...@gmail.com> wrote:

>         void PushBottom(T* t)
>         {
>                 m_data[m_bottom++] = t;
>                 Interlocked::ReadWriteBarrier();
>         }

You really must not try to save on source lines here. The more
explicit the code sequence, the better. IIRC it should be:

void PushBottom(T* t)
{
    int32_t bottom = m_bottom; // relaxed load
    m_data[bottom] = t;
    Interlocked::ReleaseBarrier(); // or ReadWriteBarrier()
    m_bottom = bottom + 1;
}

Also, strictly speaking you have to declare the array as:
T* volatile* m_data;

Also, there must be some additional memory fences:


// Can be called from multiple threads.
T* PopTop()
{
    int32_t topCounter = m_topCounter;
    int32_t top = m_top;

    // Empty queue.
    if (m_bottom <= top)
        return 0;

    // orders load of m_bottom and load from m_data
    Interlocked::AcquireBarrier(); // or ReadWriteBarrier()

    T* t = m_data[top];
    if (CAS2(&m_top, top, topCounter, top + 1, topCounter + 1))
        return t;
    return 0;
}
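
For comparison, the same release/acquire pairing can be sketched with
C++11 std::atomic, where the ordering is attached to the load and
store of the index rather than issued as a standalone barrier. This
assumes a C++11 compiler; PublishSketch and Peek are hypothetical
names, the fixed size of 8 is arbitrary, and there is no bounds check
or CAS on top here, only the publication step.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical excerpt: one slot array and the bottom index.
struct PublishSketch
{
    int* slots[8];
    std::atomic<std::int32_t> bottom;

    PublishSketch() : slots(), bottom(0) {}

    // Owner thread only.
    void PushBottom(int* item)
    {
        std::int32_t b = bottom.load(std::memory_order_relaxed);
        slots[b] = item;                                 // write the slot first
        bottom.store(b + 1, std::memory_order_release);  // then publish it
    }

    // Reader side: the item at index top, or nullptr if it is not published yet.
    int* Peek(std::int32_t top) const
    {
        // The acquire load pairs with the release store in PushBottom.
        if (bottom.load(std::memory_order_acquire) <= top)
            return nullptr;
        return slots[top]; // ordered after the load of bottom
    }
};
```

A reader that observes the incremented bottom is then guaranteed to
also observe the slot write that preceded it.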

I suggest that you stick with, for example, the Threading Building
Blocks atomics library. It provides atomic operations with
sufficiently detailed memory fences.

You can also try the Relacy Race Detector for verifying such
algorithms:
http://groups.google.com/group/relacy
Fleeting peer reviews really do NOT help with this kind of stuff,
i.e. with high probability some subtle bugs will still be there after
the review. And Relacy Race Detector can detect all kinds of
multithreading errors (races, deadlocks, livelocks, accesses to freed
memory and so on) in a second.


--
Dmitriy V'jukov

Torsten Robitzki

Dec 14, 2008, 12:51:08 PM
Dmitriy V'jukov wrote:

>>In C++ there is no need for everything to be a pointer. So I would store
>>Ts and let the user decide to store pointers to T if he needs that.
>
>
> This doesn't apply to low-level synchronization algorithms. This
> algorithm doesn't even respect C++ type system. For example, one
> thread can be replacing element, and another concurrently copying it.

I can't follow your argument for why m_data has to be an array of
pointers to T instead of, more simply and more generally, an array of
T. But I also fail to see which requirements the OP's implementation
fulfills.

Dmitriy V'jukov

Dec 14, 2008, 1:28:59 PM
On 14 Dec, 20:51, Torsten Robitzki <MyFirstn...@robitzki.de> wrote:
> Dmitriy V'jukov wrote:
> >>In C++ there is no need for everything to be a pointer. So I would store
> >>Ts and let the user decide to store pointers to T if he needs that.
>
> > This doesn't apply to low-level synchronization algorithms. This
> > algorithm doesn't even respect C++ type system. For example, one
> > thread can be replacing element, and another concurrently copying it.
>
> I can't see your arguments, why m_data has to be an array of pointers to
> T instead of simply and more general an array of T. But I fail too, to
> see the requirements that are fulfilled by OPs implementation.

(1) There will be concurrent conflicting accesses (one access is a
modification) to the objects. Any type T that is a bit more complicated
than a POD struct can cause access violations/SIGSEGV, an infinite loop
or something similar in such a context. The container does NOT respect
the type system.

(2) You can't destroy user objects once they are constructed in the
array. I.e. there will be a basically unlimited number of unneeded user
objects, and if a user object holds resources that can be expensive.
And if you try to destroy them in the array, (1) gets worse: threads
will read destroyed or semi-destroyed objects.

Well, actually I have implemented such a container with array value
type T (not T*). But then one must be extremely careful when designing
T; it's not like T in std::vector<>. Basically T must provide *atomic*
assignment and an *atomic* copy operation, and the copy operation must
be ready to read basically any non-type-safe trash. Yes, a later
operation will detect the inconsistency, but it's important not to hit
an access violation or fall into an infinite loop at this step.
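
One way such a requirement on T could be expressed, sketched here
under the assumptions that C++11 std::atomic is available and that T
is small enough to fit in one 64-bit word. AtomicSlot and Span are
hypothetical names, and this is not the container described above,
only an illustration of "atomic assignment and atomic copy" for a
by-value slot.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <type_traits>

// Hypothetical slot wrapper; rejects element types that cannot be copied bit by bit.
template <typename T>
class AtomicSlot
{
    static_assert(std::is_trivially_copyable<T>::value,
                  "T must be copyable bit by bit, with no user-defined copy logic");
    static_assert(sizeof(T) <= sizeof(std::uint64_t),
                  "assumption of this sketch: T fits in one 64-bit word");
public:
    explicit AtomicSlot(T initial) : m_value(initial) {}

    // Whole-object store and load; a reader never sees a half-written T.
    void Store(T v) { m_value.store(v, std::memory_order_release); }
    T Load() const { return m_value.load(std::memory_order_acquire); }

private:
    std::atomic<T> m_value;
};

// Example element: two 32-bit fields that must always be read as a pair.
struct Span
{
    std::int32_t first;
    std::int32_t count;
};
```

A type holding a std::vector would fail the first static_assert, which
is exactly the kind of T that cannot safely be stored by value here.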

--
Dmitriy V'jukov

holo...@gmail.com

Dec 15, 2008, 7:30:33 AM
Thanks for the opinions guys, much appreciated.

- I store pointers mainly for the reasons given by Dmitriy. This class
was created to fulfill rather specific needs, and in many places I'm
ready to sacrifice ease of use/universality for speed.

On 14 Dec, 05:55, "Chris M. Thomasson" <n...@spam.invalid> wrote:
> > FetchAndStore(&m_top, 0, topCounter + 1);

> What is the second parameter for? Is this some sort of conditional
> fetch-and-store?

No, it's a 64-bit version. I store two 32-bit words to the address given
in the first argument. As m_topCounter resides just after m_top, it'll be
updated as well.
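
A sketch of what such a 64-bit fetch-and-store might look like when
written with C++11 std::atomic instead of a platform intrinsic. The
helper names Pack, TopOf, CounterOf and FetchAndStore64 are
assumptions for illustration; the real FetchAndStore is not shown in
this thread. Packing the fields explicitly also sidesteps the
question of which half of the word holds m_top on a little-endian
versus big-endian target.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helpers: top in the low 32 bits, counter in the high 32 bits.
inline std::uint64_t Pack(std::uint32_t top, std::uint32_t counter)
{
    return (static_cast<std::uint64_t>(counter) << 32) | top;
}
inline std::uint32_t TopOf(std::uint64_t word)
{
    return static_cast<std::uint32_t>(word);
}
inline std::uint32_t CounterOf(std::uint64_t word)
{
    return static_cast<std::uint32_t>(word >> 32);
}

// Stand-in for the 64-bit FetchAndStore: writes both fields in one atomic
// operation and returns the pair that was stored before.
inline std::uint64_t FetchAndStore64(std::atomic<std::uint64_t>& target,
                                     std::uint32_t top, std::uint32_t counter)
{
    return target.exchange(Pack(top, counter));
}
```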

> Are you sure that `m_top' will never grow greater than `maxCapacity - 1'?

I'm not :) It's the caller's responsibility to make sure of that. Not
very elegant, I agree, but that's another one of those places where I
chose simplicity.
Thanks for the alignment tip!

Dmitriy - thanks for the notes, I'll try to run it under Race Detector
(kudos for making it public).

Torsten Robitzki

Dec 15, 2008, 12:42:49 PM
holo...@gmail.com wrote:
> void PushBottom(T* t)
> {
> m_data[m_bottom++] = t;
> Interlocked::ReadWriteBarrier();
> }
> // Can be called from multiple threads.
> T* PopTop()
> {
> int32_t topCounter = m_topCounter;
> int32_t top = m_top;
>
> // Empty queue.
> if (m_bottom <= top)
> return 0;
>
> T* t = m_data[top];
> if (CAS2(&m_top, top, topCounter, top + 1, topCounter + 1))
> return t;
> return 0;
> }

Assuming that these are the two main functions of the class: one thread
reads from the queue by calling PopTop() and one thread writes to it by
calling PushBottom(). What happens once both m_top and m_bottom grow
beyond maxCapacity?

How is this supposed to work in cases where the writer thread produces
elements faster than the consumer thread can consume them?

How is m_topCounter supposed to change its value?

best regards
Torsten

Torsten Robitzki

Dec 15, 2008, 12:49:01 PM
Dmitriy V'jukov wrote:
> On 14 Dec, 20:51, Torsten Robitzki <MyFirstn...@robitzki.de> wrote:

>>I can't see your arguments, why m_data has to be an array of pointers to
>>T instead of simply and more general an array of T.

> (1) There will be concurrent conflicting (one access is modification)
> accesses to the objects. Any type T that is a bit more complicated
> than POD struct can cause access violations/sigsegv or infinite loop
> or something similar in such context. The container does NOT respect
> type system.

There is not a single cast in the code the OP provided. Why do you
think that this code doesn't respect the C++ type system?

> (2) You can't destroy user objects once they are constructed in array.

Sure you can, by invoking the destructor directly.

> I.e. there will be basically unlimited number of unneeded user
> objects, and if user object holds some resources it can be expensive.

This doesn't get better if every such object is created on the free store.

> Well, actually I implemented such container with array value T (not
> T*). But then one must be extremely careful when designing T, it's not
> like T in std::vector<>. Basically T must provide *atomic* assignment
> and *atomic* copy operation, and copy operation must be ready to read
> basically any non-type-safe trash.

If there are such requirements to T, I would simply add them to the
documentation of the queue and give the decision to the user.

But I can't see why assignment and copy of T must be atomic in OPs case.

best regards
Torsten

holo...@gmail.com

Dec 15, 2008, 3:16:32 PM
On 15 Dec, 18:42, Torsten Robitzki <MyFirstn...@robitzki.de> wrote:
> Assumed that this are the two main functions of the class. One thread
> reading from the queue by calling PopTop() and one thread by writing to
> it by calling PushBottom(). What happens once both m_top and m_bottom
> grow over maxCapacity?
The application crashes :). It's the caller's responsibility to make sure
that doesn't happen. The queue is bounded, so it can hold up to
maxCapacity items at any given moment. I wanted to start easy before
moving to an unbounded version.
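
If a crash is not acceptable, the overflow could be turned into a
reported failure. The following is only a sketch of the capacity
check, written single-threaded and with hypothetical names
(BoundedSlots, TryPushBottom, Size); it leaves out the barriers and
the pop operations entirely and assumes C++11 for the deleted copy
members.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical excerpt showing only the capacity check on the owner side.
template <typename T>
class BoundedSlots
{
public:
    explicit BoundedSlots(std::int32_t maxCapacity)
        : m_data(new T*[maxCapacity]), m_capacity(maxCapacity), m_bottom(0) {}
    ~BoundedSlots() { delete[] m_data; }

    BoundedSlots(const BoundedSlots&) = delete;
    BoundedSlots& operator=(const BoundedSlots&) = delete;

    // Returns false instead of writing past the end of the array.
    bool TryPushBottom(T* item)
    {
        if (m_bottom >= m_capacity)
            return false;
        m_data[m_bottom++] = item;
        return true;
    }

    std::int32_t Size() const { return m_bottom; }

private:
    T** m_data;
    std::int32_t m_capacity;
    std::int32_t m_bottom;
};
```

In the posted code the indices are reset to 0 only when PopBottom
empties the queue, so m_bottom can reach the end of the array even
when stealers have already removed most of the elements.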

> How is m_topCounter supposed to change it's value?

CAS2 operates on 64 bits, so m_topCounter is updated together with m_top.
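
To make the role of the counter concrete, here is a sketch of a CAS2
with the same argument order as in the posted code, emulated with a
C++11 std::atomic<uint64_t>. The real CAS2 is not shown in this
thread, so the packing (top in the low half) and the names
TopAndCounter and MakeWord are assumptions.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

typedef std::atomic<std::uint64_t> TopAndCounter;

// Hypothetical packing: top in the low 32 bits, counter in the high 32 bits.
inline std::uint64_t MakeWord(std::uint32_t top, std::uint32_t counter)
{
    return (static_cast<std::uint64_t>(counter) << 32) | top;
}

// Stand-in for CAS2: succeeds only if BOTH top and counter still hold the
// expected values, and then replaces both in one atomic step.
inline bool CAS2(TopAndCounter* target,
                 std::uint32_t expectedTop, std::uint32_t expectedCounter,
                 std::uint32_t newTop, std::uint32_t newCounter)
{
    std::uint64_t expected = MakeWord(expectedTop, expectedCounter);
    return target->compare_exchange_strong(expected, MakeWord(newTop, newCounter));
}
```

Because every successful update increments the counter, a thread that
read top before the queue was emptied and refilled fails its CAS2 even
if top has returned to the same numeric value.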

Dmitriy V'jukov

Dec 16, 2008, 5:44:27 AM
On 15 Dec, 20:49, Torsten Robitzki <MyFirstn...@robitzki.de> wrote:
> Dmitriy V'jukov wrote:
> > On 14 Dec, 20:51, Torsten Robitzki <MyFirstn...@robitzki.de> wrote:
> >>I can't see your arguments, why m_data has to be an array of pointers to
> >>T instead of simply and more general an array of T.
> > (1) There will be concurrent conflicting (one access is modification)
> > accesses to the objects. Any type T that is a bit more complicated
> > than POD struct can cause access violations/sigsegv or infinite loop
> > or something similar in such context. The container does NOT respect
> > type system.
>
> There is not a single cast in the code, the OP provided. Why do you
> think, that this code doesn't respect the C++ type system?
>
> > (2) You can't destroy user objects once they are constructed in array.
>
> Sure you can, by invoking the destructor directly.
>
> > I.e. there will be basically unlimited number of unneeded user
> > objects, and if user object holds some resources it can be expensive.
>
> This doesn't get better if every such object is created on the free store.
>
> > Well, actually I implemented such container with array value T (not
> > T*). But then one must be extremely careful when designing T, it's not
> > like T in std::vector<>. Basically T must provide *atomic* assignment
> > and *atomic* copy operation, and copy operation must be ready to read
> > basically any non-type-safe trash.
>
> If there are such requirements to T, I would simply add them to the
> documentation of the queue and give the decision to the user.


That's one option. The other option is not to add them to the
documentation and to use T*.


>
> But I can't see why assignment and copy of T must be atomic in OPs case.


Do you see that one thread can mutate the object in line:
m_data[m_bottom++] = t;
and another thread can concurrently read the object in line:


T* t = m_data[top];

?


--
Dmitriy V'jukov

Torsten Robitzki

Dec 16, 2008, 1:07:17 PM

that would only be true, for the case where m_bottom == top and that
seems to be the case for an empty queue.

Dmitriy V'jukov

Dec 17, 2008, 9:00:24 AM
On Dec 16, 9:07 pm, Torsten Robitzki <MyFirstn...@robitzki.de> wrote:
> Dmitriy V'jukov wrote:
> > On 15 Dec, 20:49, Torsten Robitzki <MyFirstn...@robitzki.de> wrote:
> >>But I can't see why assignment and copy of T must be atomic in OPs case.
>
> > Do you see that one thread can mutate the object in line:
> >                 m_data[m_bottom++] = t;
> > and another thread can concurrently read the object in line:
> >                 T* t = m_data[top];
> > ?
>
> that would only be true, for the case where m_bottom == top and that
> seems to be the case for an empty queue.

Consider the following scenario.

m_bottom = 1;
m_top = 0;
i.e. one element in the queue

The stealer reads m_top:
int32_t top = m_top; // = 0

And starts the copy operation:
T t = m_data[top];

But it is suspended in the middle of the copy.

Now the owner successfully pops that single element, resets m_bottom =
m_top = 0, and then enqueues a new element into position 0.

Now the stealer wakes up and continues its copy operation, but the
element has already changed, so it has read half from the old object
and half from the new object. Assume that T contains std::vector<Y>,
which in turn contains Y* m_begin and Y* m_end. Assume that the stealer
has read m_begin from the old element and m_end from the new element,
and then tries to copy that (begin, end) range. Well, he is up to his
neck in $hit now.
This is why this algorithm doesn't respect the C++ type system.
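
The interleaving can be replayed by hand in a single thread to see the
resulting object. This is only an illustration with hypothetical names
(Range, TornCopy); Range stands in for the begin/end pair inside a
vector, and the three statements in TornCopy are executed in the order
the scenario describes.

```cpp
#include <cassert>

// Hypothetical element type with an invariant:
// begin and end always point into the same buffer.
struct Range
{
    const int* begin;
    const int* end;
};

// Replays the scenario step by step in one thread.
inline Range TornCopy(Range& slot, const Range& replacement)
{
    Range copy;
    copy.begin = slot.begin;  // stealer: first half of the copy
    slot = replacement;       // owner: pops the element, pushes a new one into the slot
    copy.end = slot.end;      // stealer: second half of the copy
    return copy;
}
```

The returned Range matches neither the old nor the new element: its
begin and end point into different buffers, so iterating over it would
walk through unrelated memory.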

Usually there is basically no sense in making such algorithms general
at all. Even if the deque is parameterized by element type, it is
usually used with a single element type, and is tightly coupled to the
details of that type anyway.

--
Dmitriy V'jukov

Chris M. Thomasson

Dec 20, 2008, 7:53:59 AM
<holo...@gmail.com> wrote in message
news:679df80a-d722-445e...@r36g2000prf.googlegroups.com...

> Thanks for opinions guys, much appreciated.
>
> - I store pointers for reasons given by Dmitriy mainly. This class
> was created to fulfill rather specific needs, and in many places I'm
> ready to sacrifice ease of use/universality for speed.
>
> On 14 Dec, 05:55, "Chris M. Thomasson" <n...@spam.invalid> wrote:
>> > FetchAndStore(&m_top, 0, topCounter + 1);
>> What is the second parameter for? Is this some sort of conditional
>> fetch-and-store?
> No, it's a 64-bit version. I load 2 32-bit words to address given in
> first argument. As m_topCounter resides just after m_top, it'll be
> updated as well.

Yeah, I missed that; sorry about that nonsense!

;^(...


>> Are you sure that `m_top' will never grow greater than `maxCapacity - 1'?
> I'm not :) It's responsibility of the caller to make sure of that. Not
> very elegant, I agree, but that's another one of those places where I
> chose simplicity.
> Thanks for alignment tip!
>
> Dmitriy - thanks for notes, I'll try to run it under Race Detector
> (kudos for making it public).

What do you think of Relacy? IMVHO, it presents a very fine-grained
development environment indeed...
