On 20-Mar-17 2:13 PM, Chris Vine wrote:
> On Sun, 19 Mar 2017 17:42:13 -0700
> "Chris M. Thomasson" <inv...@invalid.invalid> wrote:
>> This queue can be spiced up with exotic C++ atomics and such, but for
>> now, let's focus on this simple construct:
>>
>> A simple fifo thread-safe queue, based on mutexes, condvars, and a C++
>> deque. This queue is actually multi-producer/consumer, but the test
>> example code uses a single producer. Also, it uses a special mutex to
>> make sure calls to printf are thread-safe! ;^)
>>
>>
>> http://pastebin.com/raw/GA9pJvKN
>
> For a generic thread safe queue using the standard containers, I would
> generally use a list rather than a deque and splice the new item in
> when pushing. This means that if T has a copy constructor which
> allocates, the allocation takes place outside the queue mutex so
> substantially reducing contention (all that is done under the mutex is
> to splice, which only swaps some pointers). I say "generic" because if
> you know T will be a fundamental type or an aggregate of fundamental
> types, a deque would be better.
That's an interesting advantage of `std::list`.
I was sure that no practically relevant advantage was left after C++11.
> Doing a similar splice operation in the pop() method to minimize
> contention does however mean sacrificing strong exception safety if the
> queue item type's move assignment operator (or if it has none, its
> copy assignment operator) might throw. So it is a matter of design
> whether you want to de-queue using splicing also, or offer that as an
> option.
Worth noting that `std::vector`, instead of prohibiting a non-copyable
item type with a possibly throwing move operation, allows it and then
doesn't offer any guarantee: if such a move throws during, e.g., a
reallocating `push_back`, the effects are unspecified.
Let's define the strong exception guarantee for a pop-like operation,
whether it returns the item via function result or as out-argument, as
* either client code has a logical copy of the former front item, and
the front of the queue has been popped, or
* an exception was thrown and the queue is unchanged.
The C++03 standard library sought to /avoid/ providing this guarantee by
separating the value copying, `front()`, from the actual popping,
`pop_front()`. With no combined operation the queue itself was exception
safe, providing the strong guarantee for each of its operations. And by
just being somewhat awkward, with a separate `pop_front()` call after
acquiring the value, the client code was safe.
So the question is, how can that after-copy call be automated?
A natural way would be via a destructor. But doing that directly would
rely on detecting whether an exception was thrown by the (logical) copying.
Given that a combined pop operation can be called during a stack
unwinding, the only way I can think of to detect a copy exception is to
return not the value/object itself, but a double indirection, which
really complicates things relative to Chris M. Thomasson's example:
<code>
#include <cstdio>
#include <deque>
#include <condition_variable>
#include <functional> // std::ref
#include <mutex>
#include <memory>
#include <thread>
#include <utility> // std::exchange, std::forward, std::move
#include <algorithm>
#include <cassert>

template< class Item, class Queue >
class Queue_popper_;

template< class Item, class Queue_popper >
class Queue_front_item_
{
    template< class I, class Q > friend class Queue_popper_;

private:
    Queue_popper* p_popper_;

    Queue_front_item_( Queue_front_item_ const& ) = delete;

public:
    Item item;

    // Caution: this dereferences p_popper_, so the popper must outlive this
    // object. A named result of `queue.pop().item()` outlives the temporary
    // popper, which is destroyed at the end of that full-expression.
    ~Queue_front_item_()
    { p_popper_->on_object_destroyed(); }

    Queue_front_item_( Queue_front_item_&& other )
        : p_popper_( other.p_popper_ )
        , item{ std::move( other.item ) }
    { p_popper_->on_object_created(); }

private:
    template< class... Args >
    Queue_front_item_( Queue_popper& popper, Args&&... args )
        : p_popper_{ &popper }
        , item{ std::forward<Args>( args )... }
    { p_popper_->on_object_created(); }
};

template< class Item, class Queue >
class Queue_popper_
{
private:
    using Lock = std::unique_lock<std::mutex>;

    Queue*  p_queue_            = nullptr;
    int     n_item_calls_       = 0;
    int     n_created_objects_  = 0;
    Lock    lock_;

    Queue_popper_( Queue_popper_ const& ) = delete;

public:
    void on_object_created() { ++n_created_objects_; }
    void on_object_destroyed() { --n_created_objects_; }

    auto item()
        -> Queue_front_item_<Item, Queue_popper_>
    {
        assert( n_item_calls_ == 0 );
        ++n_item_calls_;
        // Move only if the move cannot throw; otherwise copy, so that a
        // throwing operation leaves the front item intact.
        if( noexcept( Item{ std::move( p_queue_->front() ) } ) )
        {
            return { *this, std::move( p_queue_->front() ) };
        }
        else
        {
            return { *this, p_queue_->front() };
        }
    }

    // Pops the front if item() was never called, or if it produced an object
    // (the copy or move did not throw). A moved-from popper does nothing.
    ~Queue_popper_()
    {
        assert( 0 <= n_created_objects_ and n_created_objects_ <= 1 );
        if( p_queue_ != nullptr and ( n_item_calls_ == 0 or n_created_objects_ == 1 ) )
        {
            p_queue_->pop_front();
        }
    }

    Queue_popper_( Queue& q, Lock&& lock )
        : p_queue_{ &q }
        , lock_( std::move( lock ) )
    {}

    // std::exchange takes the object first and the new value second.
    Queue_popper_( Queue_popper_&& other ) noexcept
        : p_queue_{ std::exchange( other.p_queue_, nullptr ) }
        , n_item_calls_{ std::exchange( other.n_item_calls_, 0 ) }
        , n_created_objects_{ std::exchange( other.n_created_objects_, 0 ) }
        , lock_{ std::move( other.lock_ ) }
    {}
};

template<typename T>
struct queue
{
    typedef std::deque<T> raw_queue_t;

    raw_queue_t m_queue;
    std::condition_variable m_cond;
    std::mutex m_mutex;

    void push(T const& obj)
    {
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_queue.push_back(obj);
        }

        m_cond.notify_one();
    }

    // Returns a popper that holds the queue lock until it is destroyed.
    auto pop()
        -> Queue_popper_<T, std::deque<T>>
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (! m_queue.size()) m_cond.wait(lock);
        return { m_queue, std::move( lock ) };
    }
};

typedef queue<unsigned int> string_queue_t;
#define CONSUMERS 5
#define N 1000

void producer(std::mutex& std_out_mtx, string_queue_t& queue)
{
    {
        std::unique_lock<std::mutex> lock(std_out_mtx);
        std::printf("producer::queue::(%p) - enter\n", (void*)&queue);
    }

    for (unsigned int i = 0; i < N; ++i)
    {
        queue.push(i + 1);
        std::this_thread::yield(); // just for some spice
    }

    // One zero per consumer serves as the shutdown message.
    for (unsigned int i = 0; i < CONSUMERS; ++i)
    {
        queue.push(0);
        std::this_thread::yield(); // just for some spice
    }

    {
        std::unique_lock<std::mutex> lock(std_out_mtx);
        std::printf("producer::queue::(%p) - exit\n", (void*)&queue);
    }
}

void consumer(unsigned int id, std::mutex& std_out_mtx, string_queue_t& queue)
{
    {
        std::unique_lock<std::mutex> lock(std_out_mtx);
        std::printf("consumer(%u)::queue::(%p) - enter\n", id, (void*)&queue);
    }

    unsigned int prev = 0;

    for (;;)
    {
        auto const msg = queue.pop().item();

        {
            std::unique_lock<std::mutex> lock(std_out_mtx);
            std::printf("consumer(%u)::msg::(%u)\n", id, msg.item );
        }

        if (msg.item == 0) break;

        assert(msg.item > prev); // validate fifo nature

        prev = msg.item;
    }

    {
        std::unique_lock<std::mutex> lock(std_out_mtx);
        std::printf("consumer::queue::(%p) - exit\n", (void*)&queue);
    }
}

int main()
{
    string_queue_t queue;
    std::mutex std_out_mutex;

    std::thread consumers[CONSUMERS];

    for (unsigned int i = 0; i < CONSUMERS; ++i)
    {
        consumers[i] = std::thread(
            consumer,
            i,
            std::ref(std_out_mutex),
            std::ref(queue)
        );
    }

    std::thread producer_thread(
        producer,
        std::ref(std_out_mutex),
        std::ref(queue)
    );

    producer_thread.join();

    for (unsigned int i = 0; i < CONSUMERS; ++i)
    {
        consumers[i].join();
    }
}
</code>
I'm not even entirely sure that the strong exception guarantee logic is
correct, but I've verified that this
modified-to-be-safe-for-general-item-type version produces exactly as
many bytes of output as the original.
> Apropos of strong exception safety, your pop() method's strong
> exception safety relies on RVO. That is OK I guess with C++17 which has
> mandatory RVO (technically I don't think that the standard guarantees
> that std::deque::pop_front() does not throw, but it seems inconceivable
> that it would if the queue item's destructor does not throw). In
> earlier times strong exception safety would have been dealt with by
> pop() taking a T& out parameter so the popped object is not copied or
> moved twice.
I think a more reasonable approach is to /require/ that a move operation
on the item type will be no-throwing.
Not that it necessarily actually moves, but just that it doesn't throw.
> You might well want rate-limiting on a queue of this kind to prevent
> producers overrunning consumers. That introduces all sorts of other
> trade-offs.
Cheers!,
- Alf