Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

One way to get strong exception guarantee for a multithreaded pop: a Dequeued_ class

22 views
Skip to first unread message

Alf P. Steinbach

unread,
Mar 20, 2017, 10:47:34 PM3/20/17
to
Based on Chris M. Thomasson's earlier example of producer/consumer queue.

The idea here is to replace the named `pop()` operation with a wrapper
class for the result, here called `Dequeued_`.

I think that's new because I haven't seen it anywhere.


#include <cstdio>
#include <deque>
#include <condition_variable>
#include <mutex>
#include <memory>
#include <thread>
#include <utility> // std::move
#include <algorithm>
#include <cassert>

namespace g{
std::mutex std_out_mutex;
} // namespace g

using Lock = std::unique_lock<std::mutex>;

template< class... Args >
void say( char const* format, Args... args )
{
Lock const lock{ g::std_out_mutex };
printf( format, args... );
}

template< class Item >
class Dequeued_
{
private:
Item item_;

template< class Queue >
static auto get_lock( Queue& q )
-> Lock
{
Lock lock{ q.m_mutex };
while( q.m_queue.size() == 0 )
{
q.m_cond.wait( lock );
}
return lock; // It's moved.
}

template< class Raw_queue >
Dequeued_( Lock, Raw_queue& raw_q )
: item_{ raw_q.front() }
{ raw_q.pop_front(); }

public:
auto item() const -> Item const& { return item_; }
auto item() -> Item& { return item_; }

template< class Queue >
Dequeued_( Queue& q )
: Dequeued_{ get_lock( q ), q.m_queue }
{}
};

template< class Item >
class Queue_
{
template< class > friend class Dequeued_;
private:
std::deque<Item> m_queue;
std::condition_variable m_cond;
std::mutex m_mutex;

public:
void push( Item obj )
{
{
Lock const lock{ m_mutex };
m_queue.push_back( std::move( obj ) );
}

m_cond.notify_one();
}

auto pop() -> Item = delete; // Use Dequeued_ instead.
};

using String_queue = Queue_<unsigned>;
int const n_consumers = 5;
int const N = 1000;

void producer( String_queue& queue )
{
say( "producer::queue::(%p) - enter\n", (void*)&queue );

for( unsigned i = 0; i < N; ++i )
{
queue.push( i + 1 );
std::this_thread::yield(); // just for some spice
}

for( unsigned i = 0; i < n_consumers; ++i )
{
queue.push( 0 );
std::this_thread::yield(); // just for some spice
}

say( "producer::queue::(%p) - exit\n", (void*)&queue );
}


void consumer( unsigned const id, String_queue& queue )
{
say( "consumer(%u)::queue::(%p) - enter\n", id, (void*)&queue );

unsigned prev = 0;

for (;;)
{
Dequeued_<unsigned> const msg{ queue };

say( "consumer(%u)::msg::(%u)\n", id, msg.item() );

if( msg.item() == 0 ) break;
assert( msg.item() > prev ); // validate fifo nature

prev = msg.item();
}

say( "consumer::queue::(%p) - exit\n", (void*)&queue );
}

auto main()
-> int
{
String_queue queue;

std::thread consumers[n_consumers];

for( unsigned i = 0; i < n_consumers; ++i )
{
consumers[i] = std::thread(
consumer,
i,
std::ref( queue )
);
}

std::thread producer_thread(
producer,
std::ref(queue)
);

producer_thread.join();
for( auto& t : consumers ) { t.join(); }
}


Cheers!,

- Alf

kushal bhattacharya

unread,
Mar 21, 2017, 12:56:27 AM3/21/17
to
hi,
I have onw question regarding the implementation.
Are you putting the lock when you are enquiring for size of queue in the while loop ?

Alf P. Steinbach

unread,
Mar 21, 2017, 1:25:07 AM3/21/17
to
On 21-Mar-17 5:56 AM, kushal bhattacharya wrote:
> hi,
> I have onw question regarding the implementation.
> Are you putting the lock when you are enquiring for size of queue in the while loop ?

Yes. Here's that code:

Lock lock{ q.m_mutex };
while( q.m_queue.size() == 0 )
{
q.m_cond.wait( lock );
}
return lock; // It's moved.

The declaration of lock acquires the mutex; it's now locked.

Next, the while loop condition, checking the queue size, is evaluated,
with the mutex acquired. Only one thread can have it at a time, and
right now it's this thread.

The `q.m_cond.wait( lock )` statement then releases the mutex, opening
up for other threads to access the queue. In particular, to push items
onto it. When that `wait` returns it acquires the mutex again. And so in
the next iteration of the loop, if there is one, the mutex is again held
by this thread when the queue size is checked, and so on, and on.

Finally the `return` /moves/ the lock back up to the caller of this
function. The `std::unique_lock` guarantees its move semantics, that the
locking is actually moved.

You can find documentation of these details at <url:
http://cppreference.com>.


Cheers!,

- Alf

Chris M. Thomasson

unread,
Mar 21, 2017, 5:21:23 PM3/21/17
to
> {}[...]


Love this idea of the special class as an aid for a pop function, and
how it only requires copy constructable, not assignment. I also like
because I did not know that a move would not release the lock wrt the
get_lock function, it hands off, or moves ownership of the mutex back to
the caller. Thanks Alf. :^)
0 new messages