Alf P. Steinbach
Mar 20, 2017, 10:45:47 PM
Based on Chris M. Thomasson's earlier example of a producer/consumer queue.
The idea here is to replace the named `pop()` operation with a wrapper
class for the result, here called `Dequeued_`.
I think that's new because I haven't seen it anywhere.
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstdio>
#include <deque>
#include <functional> // std::ref
#include <memory>
#include <mutex>
#include <thread>
#include <utility> // std::move
// Program-wide shared state.
namespace g
{
    // Guards standard output; `say` holds it while printing.
    std::mutex std_out_mutex;
} // namespace g

// Movable scoped lock on a std::mutex (the lock type that
// std::condition_variable::wait operates on).
using Lock = std::unique_lock<std::mutex>;
// printf-style output that is serialized across threads: the global output
// mutex is held for the whole call, so one message is never interleaved
// with another message printed through `say`.
//
// format  printf format string.
// args    values for the format's conversion specifications, passed by value.
template< class... Args >
void say( char const* format, Args... args )
{
    Lock const output_guard{ g::std_out_mutex };
    std::printf( format, args... );
}
/// A blocking "pop" expressed as an object.
///
/// Constructing a `Dequeued_<Item>` from a queue waits until the queue holds
/// at least one element, removes the front element while the queue's mutex is
/// locked, and keeps that element available through `item()`.
///
/// The queue argument must give this class access to three members:
/// `m_mutex` (a `std::mutex`), `m_cond` (a `std::condition_variable`) and
/// `m_queue` (a container with `empty()`, `front()` and `pop_front()`).
template< class Item >
class Dequeued_
{
private:
    Item item_;

    /// Locks `q.m_mutex`, waits on `q.m_cond` until `q.m_queue` is non-empty,
    /// and returns the still-held lock.
    template< class Queue >
    static auto get_lock( Queue& q )
        -> std::unique_lock<std::mutex>
    {
        std::unique_lock<std::mutex> lock{ q.m_mutex };
        // The predicate overload re-checks after every wake-up, so spurious
        // wake-ups and items taken by another consumer are both handled.
        q.m_cond.wait( lock, [&q]{ return !q.m_queue.empty(); } );
        return lock;    // Ownership of the locked mutex transfers to the caller.
    }

    /// Takes the front element out of `raw_q`. The unnamed lock parameter
    /// exists only to keep the mutex held until the element has been removed.
    template< class Raw_queue >
    Dequeued_( std::unique_lock<std::mutex>, Raw_queue& raw_q )
        // Move the element out instead of copying it (this also makes
        // move-only items work). `move_if_noexcept` falls back to a copy when
        // the move constructor may throw, so a throwing construction leaves
        // the queue's front element intact. Parentheses rather than braces,
        // so that an initializer_list constructor of Item is never selected.
        : item_( std::move_if_noexcept( raw_q.front() ) )
    { raw_q.pop_front(); }

public:
    /// The dequeued element (read-only access).
    auto item() const -> Item const& { return item_; }

    /// The dequeued element (mutable access, e.g. to move it onwards).
    auto item() -> Item& { return item_; }

    /// Blocks until `q` has an element, then dequeues it.
    /// The queue's mutex is released when construction completes.
    template< class Queue >
    Dequeued_( Queue& q )
        : Dequeued_{ get_lock( q ), q.m_queue }
    {}
};
template< class Item >
class Queue_
{
template< class > friend class Dequeued_;
private:
std::deque<Item> m_queue;
std::condition_variable m_cond;
std::mutex m_mutex;
public:
void push( Item obj )
{
{
Lock const lock{ m_mutex };
m_queue.push_back( std::move( obj ) );
}
m_cond.notify_one();
}
auto pop() -> Item = delete; // Use Dequeued_ instead.
};
// Queue type used by the demo. Despite the name, its items are `unsigned`
// values.
using String_queue = Queue_<unsigned>;

// Number of consumer threads started by `main`; `producer` pushes this many
// terminating 0 items.
constexpr int n_consumers = 5;

// Number of non-zero items (1 through N) that `producer` pushes.
constexpr int N = 1000;
// Producer thread body.
//
// Pushes the values 1 through N in increasing order, then pushes one 0 per
// consumer; a consumer stops when it dequeues a 0. Entry and exit are logged
// together with the queue's address.
void producer( String_queue& queue )
{
    void* const queue_address = &queue;
    say( "producer::queue::(%p) - enter\n", queue_address );

    unsigned const last_value = N;
    for( unsigned value = 1; value <= last_value; ++value )
    {
        queue.push( value );
        std::this_thread::yield();  // Encourage interleaving with the consumers.
    }

    unsigned const stop_count = n_consumers;
    for( unsigned sent = 0; sent != stop_count; ++sent )
    {
        queue.push( 0 );
        std::this_thread::yield();  // Encourage interleaving with the consumers.
    }

    say( "producer::queue::(%p) - exit\n", queue_address );
}
// Consumer thread body.
//
// Repeatedly dequeues a value (blocking while the queue is empty) and logs
// it, until a 0 is dequeued. Each non-zero value must be greater than the
// previous one this consumer received, which checks the queue's FIFO order.
//
// id     number identifying this consumer in the log output.
// queue  the queue to consume from.
void consumer( unsigned const id, String_queue& queue )
{
    say( "consumer(%u)::queue::(%p) - enter\n", id, static_cast<void*>( &queue ) );
    unsigned prev = 0;
    for (;;)
    {
        Dequeued_<unsigned> const msg{ queue };     // Blocks until an item is available.
        say( "consumer(%u)::msg::(%u)\n", id, msg.item() );
        if( msg.item() == 0 ) { break; }            // 0 is the stop signal.
        assert( msg.item() > prev );                // validate fifo nature
        prev = msg.item();
    }
    // Include the id here as well, matching the enter message, so the log
    // shows which consumer exited.
    say( "consumer(%u)::queue::(%p) - exit\n", id, static_cast<void*>( &queue ) );
}
// Starts `n_consumers` consumer threads (ids 0, 1, 2, ...) and then one
// producer thread, all sharing a single queue, and waits for every thread to
// finish.
auto main()
    -> int
{
    String_queue queue;

    std::thread consumers[n_consumers];
    unsigned next_id = 0;
    for( auto& worker : consumers )
    {
        worker = std::thread( consumer, next_id, std::ref( queue ) );
        ++next_id;
    }

    std::thread producer_thread( producer, std::ref( queue ) );

    producer_thread.join();
    for( auto& worker : consumers ) { worker.join(); }
}
Cheers!,
- Alf