
Blocking queue


mvor...@gmail.com

Feb 3, 2019, 9:19:55 AM
My multi-threaded blocking queue implementation: https://blog.vorbrodt.me/?p=409

Chris M. Thomasson

Feb 3, 2019, 4:52:41 PM
On 2/3/2019 6:19 AM, mvor...@gmail.com wrote:
> My multi-threaded blocking queue implementation: https://blog.vorbrodt.me/?p=409
>

Looks okay at first viewing. I should code it up in Relacy Race
Detector. Not sure why you are using semaphores when a single mutex and
condition variable will do. Your queue has a lot of overhead. Each
semaphore has a mutex and condvar. You have two of them plus another
mutex. That is:

3 mutexes and 2 condvars for a single queue?

Chris M. Thomasson

Feb 4, 2019, 12:01:41 AM
Still need to examine it more carefully, but the following function is a
race condition if it is called while other threads are pushing or popping:

bool empty()
{
    return m_count == 0;
}

You are allowing reads and writes to hit m_count at the same time in
this call.

Öö Tiib

Feb 4, 2019, 4:32:34 AM
On Sunday, 3 February 2019 16:19:55 UTC+2, mvor...@gmail.com wrote:
> My multi-threaded blocking queue implementation: https://blog.vorbrodt.me/?p=409

If you want to make std::queue pop blocking then one mutex and one
condition variable should be fine. Something like this:

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

template <typename T>
class Queue
{
public:

    T pop()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        while (queue_.empty())
        {
            cond_.wait(mlock);
        }
        auto item = queue_.front();
        queue_.pop();
        return item;
    }

    void push(T item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        queue_.push(item);
        mlock.unlock();
        cond_.notify_one();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

So blocking pop implemented, without need for boost or anything.
Now if you have some sort of improvement to it for some
special case then describe the case and show benefits of
your implementation with benchmarks.

mvor...@gmail.com

Feb 4, 2019, 8:08:14 AM
Your implementation grows indefinitely on push(). Mine will eventually block when the queue fills up.

mvor...@gmail.com

Feb 4, 2019, 8:09:50 AM
You are right. I modified the code to lock around the check for empty.

mvor...@gmail.com

Feb 4, 2019, 8:12:16 AM
I used semaphores for clarity of implementation and for easy counting of empty and full slots.

Öö Tiib

Feb 4, 2019, 9:20:11 AM
Then one more condition variable is needed for blocking push,
still no need for boost or anything.
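
A minimal sketch of that suggestion, assuming a fixed capacity chosen at construction: one mutex guards the std::queue, pop() waits on one condition variable while the queue is empty, and push() waits on a second one while it is full. The class name `BoundedQueue` and its member names are made up for illustration; this is not the code from the blog post.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical bounded blocking queue: one mutex, two condition variables.
template <typename T>
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity)
    {
        // With a capacity of zero, push() could never proceed.
        assert(capacity > 0);
    }

    // Blocks while the queue is full.
    void push(T item)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push(std::move(item));
        lock.unlock();
        not_empty_.notify_one();
    }

    // Blocks while the queue is empty.
    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop();
        lock.unlock();
        not_full_.notify_one();
        return item;
    }

private:
    const std::size_t capacity_;
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable not_empty_;  // signalled after a push
    std::condition_variable not_full_;   // signalled after a pop
};
```

Compared with the semaphore-based design, this uses one mutex and two condition variables in total, at the cost of holding the same lock for both the counting and the container update.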

Bonita Montero

Feb 4, 2019, 10:26:44 AM
> your implementation grows indefinitely on push(). mine will eventually block when the queue fills up.

When do you think a queue with limited capacity is necessary?
I think that's rather exotic.

Robert Wessel

Feb 4, 2019, 10:59:46 AM
Limited queue growth is not an uncommon requirement. In cases where
you have one set of processes generating work, and another performing
it, you often want to keep those decoupled and running independently
to make maximum use of resources, but you don't want unlimited numbers
of work items filling memory. Usually, though, I've done that by
placing a separate limit on the queue depth. You can often avoid the
blocking entirely if you can have the generating process execute the
work unit directly if the queue is full.
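
A rough sketch of that fallback, under the assumption that work items are callables: the producer tries a non-blocking push and, if the queue is at its depth limit, runs the task itself. `WorkQueue`, `try_push`, `try_pop` and `submit_or_run` are hypothetical names chosen for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical depth-limited work queue with non-blocking operations.
class WorkQueue
{
public:
    using Task = std::function<void()>;

    explicit WorkQueue(std::size_t max_depth) : max_depth_(max_depth)
    {
        assert(max_depth > 0);
    }

    // Returns false and leaves `task` untouched if the depth limit is reached.
    bool try_push(Task& task)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.size() >= max_depth_)
        {
            return false;
        }
        queue_.push(std::move(task));
        return true;
    }

    // Returns false if there is nothing to take.
    bool try_pop(Task& task)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty())
        {
            return false;
        }
        task = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    const std::size_t max_depth_;
    std::queue<Task> queue_;
    std::mutex mutex_;
};

// Producer-side helper: queue the task if there is room, otherwise execute
// it on the calling thread. Returns true if queued, false if run inline.
inline bool submit_or_run(WorkQueue& q, WorkQueue::Task task)
{
    if (q.try_push(task))
    {
        return true;
    }
    task();
    return false;
}
```

The producer never blocks on the queue here, but it is busy for as long as the inline task runs, which is the trade-off raised in the replies that follow.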

Bonita Montero

Feb 4, 2019, 12:12:58 PM
> Limited queue growth is not an uncommon requirement. In cases where
> you have one set of processes generating work, and another performing
> it, you often want to keep those decoupled and running independently
> to make maximum use of resources, but you don't want unlimited numbers
> of work items filling memory.

Why not? Memory is cheap.

> Usually, though, I've done that by placing a separate limit on the
> queue depth. You can often avoid the blocking entirely if you can
> have the generating process execute the work unit directly if the
> queue is full.

... and thereby block further processing in the producer.
Very smart!

Bonita Montero

Feb 4, 2019, 12:18:02 PM
> Why not? Memory is cheap.

To be more precise: The data-structures which are handed through the
queue to the consumer are in memory anyway and are usually much larger
than a simple link-node in the queue.

David Brown

Feb 4, 2019, 2:40:45 PM
That means that the producer needs to use a lot more memory to make
these data structures - it can make sense to limit that by blocking the
producer.

And memory is not always cheap. Unlimited memory is almost always very
expensive.

(That does not mean having a specific limit to the size of the queue is
the best solution - it might be, but there may be other solutions.)

mvor...@gmail.com

Feb 4, 2019, 2:58:56 PM
It's used for throttling. Sometimes you don't want it to grow indefinitely.

Chris M. Thomasson

Feb 4, 2019, 4:46:38 PM
Fair enough! :^)

Chris M. Thomasson

Feb 4, 2019, 5:00:11 PM
Btw, there is a faster semaphore out there that can be implemented in C++11:

https://groups.google.com/d/topic/comp.lang.c++/60IpFwqbtu8/discussion
(read all if interested...)

Here is a direct link to a crude sample implementation:

http://pastebin.com/raw/Q7p6e7Xc
(no ads, raw text...)

Imvvho, this semaphore is nice, notice how it can skip waiting on a
mutex/condvar based semaphore?
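
The general shape of such a semaphore, as a hedged sketch and not a copy of the linked code: an atomic counter handles the uncontended case, and a mutex/condvar semaphore is only touched when a thread actually has to block or be woken. The names `slow_semaphore` and `fast_semaphore` are assumptions made for this example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>

// Plain mutex/condvar semaphore, used only on the slow path.
class slow_semaphore
{
public:
    explicit slow_semaphore(unsigned int count) : m_count(count) {}

    void post()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_cv.notify_one();
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    unsigned int m_count;
};

// Atomic counter in front of the slow semaphore. A negative counter value
// means that many threads are blocked, or about to block, in wait().
class fast_semaphore
{
public:
    explicit fast_semaphore(int count) : m_count(count), m_slow(0)
    {
        assert(count >= 0);
    }

    void post()
    {
        // Previous value below zero: a waiter needs to be released.
        if (m_count.fetch_add(1, std::memory_order_release) < 0)
        {
            m_slow.post();
        }
    }

    void wait()
    {
        // Previous value of at least one: a unit was available, so return
        // without touching the mutex or the condition variable.
        if (m_count.fetch_sub(1, std::memory_order_acquire) < 1)
        {
            m_slow.wait();
        }
    }

private:
    std::atomic<int> m_count;
    slow_semaphore m_slow;
};
```

When the counter stays positive, post() and wait() each cost one atomic read-modify-write; the mutex and condition variable are only involved under contention.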


Chris M. Thomasson

Feb 4, 2019, 5:14:30 PM
That's fine. Now, if this empty check could be relaxed to a standard
form of "racy", then one could define m_count as a std::atomic type, and
read and write it with std::memory_order_relaxed. As is, it would be
fine using your current code. However, the data returned is dubious
because the mutex m_cs was not locked where you mutate m_count. If empty
is called a lot, then this type of racy check might be perfectly fine.
Keep in mind that there is a difference between racing using atomic
types vs raw variables. Fwiw, here is a massive race condition, yet it
is 100% perfectly standard compliant:

https://groups.google.com/d/topic/comp.lang.c++/7u_rLgQe86k/discussion

;^)
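
A small sketch of the relaxed-atomic empty check described in this post, assuming the count is still updated while the queue's mutex is held and only the read in empty() goes lock-free. `CountedQueue` and its members are hypothetical names, and a std::queue stands in for the ring buffer of the original.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical queue that mirrors its size in an atomic counter.
template <typename T>
class CountedQueue
{
public:
    void push(T item)
    {
        std::lock_guard<std::mutex> lock(m_cs);
        m_items.push(std::move(item));
        m_count.fetch_add(1, std::memory_order_relaxed);
    }

    // Returns false if there is nothing to take.
    bool try_pop(T& out)
    {
        std::lock_guard<std::mutex> lock(m_cs);
        if (m_items.empty())
        {
            return false;
        }
        // While the lock is held, the mirror agrees with the container.
        assert(m_count.load(std::memory_order_relaxed) > 0);
        out = std::move(m_items.front());
        m_items.pop();
        m_count.fetch_sub(1, std::memory_order_relaxed);
        return true;
    }

    // Racy by design: the read is well-defined because m_count is atomic,
    // but the answer can be stale by the time the caller acts on it.
    bool empty() const
    {
        return m_count.load(std::memory_order_relaxed) == 0;
    }

private:
    std::mutex m_cs;
    std::queue<T> m_items;
    std::atomic<std::size_t> m_count{0};
};
```

This kind of check suits statistics or heuristics; code that must act on the answer still needs to do the check and the action under the same lock, as try_pop() does.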

Chris M. Thomasson

Feb 4, 2019, 5:21:09 PM
Big time! Fwiw, if interested, check out my simple mutation to a very
nice _bounded_ FIFO queue:

https://groups.google.com/d/topic/lock-free/acjQ3-89abE/discussion
(read all if interested, please read all...)

Chris M. Thomasson

Feb 4, 2019, 5:39:27 PM
On 2/4/2019 11:40 AM, David Brown wrote:
> On 04/02/2019 18:17, Bonita Montero wrote:
>>> Why not? Memory is cheap.
>>
>> To be more precise: The data-structures which are handed through the
>> queue to the consumer are in memory anyway and are usually much larger
>> than a simple link-node in the queue.
>>
>
> That means that the producer needs to use a lot more memory to make
> these data structures - it can make sense to limit that by blocking the
> producer.

Right. Or even give a flag to the producer such that it can possibly do
"other" work instead of wait right then and there. It can do some other
work, then check back at the queue to see if it is still in the full
state. Like a continuation sparked by a queue full condition. A producer
can do other work if possible, and check the queue again. A producer can
even queue work locally to a per-thread structure on a bounded full
condition, then, when the queue gets consumed a bit, well, it can send
all the local work in a single atomic shot.
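
One way the per-thread backlog idea might look, sketched under the assumption that "a single atomic shot" means moving the backlog into the shared queue under one lock acquisition. `TryQueue`, `try_push_all` and `produce` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded queue with non-blocking operations only.
template <typename T>
class TryQueue
{
public:
    explicit TryQueue(std::size_t capacity) : capacity_(capacity)
    {
        assert(capacity > 0);
    }

    // Moves items from the front of `backlog` into the queue while there is
    // room, all under one lock acquisition. Returns how many were moved.
    std::size_t try_push_all(std::deque<T>& backlog)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        std::size_t moved = 0;
        while (!backlog.empty() && items_.size() < capacity_)
        {
            items_.push_back(std::move(backlog.front()));
            backlog.pop_front();
            ++moved;
        }
        return moved;
    }

    // Returns false if there is nothing to take.
    bool try_pop(T& out)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty())
        {
            return false;
        }
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

private:
    const std::size_t capacity_;
    std::deque<T> items_;
    std::mutex mutex_;
};

// Producer-side step: append to a backlog owned by the producing thread,
// then flush whatever fits. The producer never blocks on the queue.
template <typename T>
void produce(TryQueue<T>& q, std::deque<T>& backlog, T item)
{
    backlog.push_back(std::move(item));
    q.try_push_all(backlog);
}
```

Note that the backlog itself is unbounded in this sketch, so a real producer would still need some limit or other work to fall back on.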

>
> And memory is not always cheap.  Unlimited memory is almost always very
> expensive.
>
> (That does not mean having a specific limit to the size of the queue is
> the best solution - it might be, but there may be other solutions.)
>

Fwiw, bounded queues are fairly normal, in my experience at least.

Chris M. Thomasson

Feb 4, 2019, 6:40:32 PM
I see where you are catching exceptions now, was assuming POD. Will try
to get this running in Relacy Race Detector. If there is a subtle
problem, it will find it. Also, emulation of exceptions can be
accomplished with a prng.

Chris M. Thomasson

Feb 4, 2019, 7:03:07 PM
On 2/3/2019 6:19 AM, mvor...@gmail.com wrote:
> My multi-threaded blocking queue implementation: https://blog.vorbrodt.me/?p=409
>

I noticed something strange in your semaphore::wait function. Your code is:

void wait()
{
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [&]{ return m_count > 0; });
    --m_count;
}

I had to alter the predicate to the following function because your
original deadlocks as-is. I decided to focus attention to your semaphore
in Relacy. This version works, notice the difference wrt the predicate
in the wait function:
_______________________________
class semaphore
{
public:
    semaphore(unsigned int count) : m_count(count) {}
    //semaphore(const semaphore&) = delete;
    //semaphore(semaphore&&) = delete;
    //semaphore& operator = (const semaphore&) = delete;
    //semaphore& operator = (semaphore&&) = delete;
    //~semaphore() = default;

    void post()
    {
        //std::unique_lock<std::mutex> lock(m_mutex);
        m_mutex.lock($);
        ++VAR(m_count);
        m_cv.notify_one($);
        m_mutex.unlock($);
    }

    void wait()
    {
        //std::unique_lock<std::mutex> lock(m_mutex);
        //m_cv.wait(lock, [&] { return m_count > 0; });

        m_mutex.lock($);

        while (VAR(m_count) == 0)
        {
            m_cv.wait(m_mutex, $);
        }

        --VAR(m_count);

        m_mutex.unlock($);
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    //unsigned int m_count;
    VAR_T(unsigned int) m_count;
};
_______________________________



Fwiw, here is my entire Relacy unit test:
_______________________________
// Queue Test...
//_______________________________________________


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <iostream>


// Simple macro based redirection of the verbose std membars.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
#define CT_MB_SEQ_CST std::memory_order_seq_cst
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)


// Some global vars directing the show...
// PRODUCERS must equal CONSUMERS for this test
#define PRODUCERS 3
#define CONSUMERS 3
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 5




class semaphore
{
public:
    semaphore(unsigned int count) : m_count(count) {}
    //semaphore(const semaphore&) = delete;
    //semaphore(semaphore&&) = delete;
    //semaphore& operator = (const semaphore&) = delete;
    //semaphore& operator = (semaphore&&) = delete;
    //~semaphore() = default;

    void post()
    {
        //std::unique_lock<std::mutex> lock(m_mutex);
        m_mutex.lock($);
        ++VAR(m_count);
        m_cv.notify_one($);
        m_mutex.unlock($);
    }

    void wait()
    {
        //std::unique_lock<std::mutex> lock(m_mutex);
        //m_cv.wait(lock, [&] { return m_count > 0; });

        m_mutex.lock($);

        while (VAR(m_count) == 0)
        {
            m_cv.wait(m_mutex, $);
        }

        --VAR(m_count);

        m_mutex.unlock($);
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    //unsigned int m_count;
    VAR_T(unsigned int) m_count;
};






// Relacy Semaphore Test...
// The semaphore starts at 1, so it acts as a mutex around g_shared.
struct ct_qtest_test
    : rl::test_suite<ct_qtest_test, THREADS>
{
    semaphore g_sem;
    VAR_T(unsigned int) g_shared;

    ct_qtest_test() : g_sem(1) {}

    void before()
    {
        VAR(g_shared) = 0;
    }

    void after()
    {
        RL_ASSERT(VAR(g_shared) == 0);
    }


    void consumer(unsigned int tidx)
    {
        g_sem.wait();
        --VAR(g_shared);
        g_sem.post();
    }


    void producer(unsigned int tidx)
    {
        g_sem.wait();
        ++VAR(g_shared);
        g_sem.post();
    }


    void thread(unsigned int tidx)
    {
        if (tidx < PRODUCERS)
        {
            producer(tidx);
        }

        else
        {
            consumer(tidx);
        }
    }
};



// Test away... Or fly? Humm...
int main()
{
    {
        rl::test_params p;

        p.iteration_count = 10000000;
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<ct_qtest_test>(p);
    }

    return 0;
}
_______________________________


This works without any problems.

Chris M. Thomasson

Feb 4, 2019, 7:25:16 PM
On 2/4/2019 4:02 PM, Chris M. Thomasson wrote:
> On 2/3/2019 6:19 AM, mvor...@gmail.com wrote:
>> My multi-threaded blocking queue implementation:
>> https://blog.vorbrodt.me/?p=409
>>
>
> I noticed something strange in your semaphore::wait function. Your code is:
>
>     void wait()
>     {
>         std::unique_lock<std::mutex> lock(m_mutex);
>         m_cv.wait(lock, [&]{ return m_count > 0; });
>         --m_count;
>     }
[...]

Funny, the two conditions are in reverse, and basically work the same.
Relacy does not handle condvar waits with Lambdas. Sorry about that.
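
For reference, the C++ standard specifies the predicate overload of std::condition_variable::wait as equivalent to looping on the plain wait while the predicate is false, which is why the lambda form and the explicit while loop behave the same. A small side-by-side sketch, with `counter_gate` as a hypothetical stand-in for the semaphore:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the semaphore, with both wait styles.
struct counter_gate
{
    std::mutex m;
    std::condition_variable cv;
    unsigned int count = 0;

    void post()
    {
        {
            std::lock_guard<std::mutex> lock(m);
            ++count;
        }
        cv.notify_one();
    }

    // Predicate form, as in the original semaphore::wait.
    void wait_with_predicate()
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return count > 0; });
        assert(count > 0);  // the predicate holds while the lock is held
        --count;
    }

    // Explicit loop form, as in the Relacy version. Note the negated
    // condition: loop while the predicate is false.
    void wait_with_loop()
    {
        std::unique_lock<std::mutex> lock(m);
        while (count == 0)
        {
            cv.wait(lock);
        }
        assert(count > 0);
        --count;
    }
};
```

Both forms also cope with spurious wakeups, because the condition is rechecked after every return from the underlying wait.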

Chris M. Thomasson

Feb 4, 2019, 8:11:35 PM
On 2/3/2019 6:19 AM, mvor...@gmail.com wrote:
> My multi-threaded blocking queue implementation: https://blog.vorbrodt.me/?p=409
>

I finally implemented the whole thing in Relacy. It works fine. Fwiw,
here is my test unit:
__________________________
// Queue Test...
//_______________________________________________


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <iostream>


// Simple macro based redirection of the verbose std membars.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
#define CT_MB_SEQ_CST std::memory_order_seq_cst
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)


// Some global vars directing the show...
// PRODUCERS must equal CONSUMERS for this test
#define PRODUCERS 3
#define CONSUMERS 3
#define BUFFER 2
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 7




class semaphore
{
public:
    semaphore(unsigned int count) : m_count(count) {}
    //semaphore(const semaphore&) = delete;
    //semaphore(semaphore&&) = delete;
    //semaphore& operator = (const semaphore&) = delete;
    //semaphore& operator = (semaphore&&) = delete;
    //~semaphore() = default;

    void post()
    {
        //std::unique_lock<std::mutex> lock(m_mutex);
        m_mutex.lock($);
        ++VAR(m_count);
        m_cv.notify_one($);
        m_mutex.unlock($);
    }

    void wait()
    {
        //std::unique_lock<std::mutex> lock(m_mutex);
        //m_cv.wait(lock, [&] { return m_count > 0; });

        m_mutex.lock($);

        while (VAR(m_count) == 0)
        {
            m_cv.wait(m_mutex, $);
        }

        --VAR(m_count);

        m_mutex.unlock($);
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    //unsigned int m_count;
    VAR_T(unsigned int) m_count;
};


template<typename T, std::size_t T_size>
class blocking_queue
{

public:
    blocking_queue()
        : m_pushIndex(0), m_popIndex(0), m_count(0),
          m_openSlots(T_size), m_fullSlots(0) {}

    //blocking_queue(const blocking_queue&) = delete;
    //blocking_queue(blocking_queue&&) = delete;
    //blocking_queue& operator = (const blocking_queue&) = delete;
    //blocking_queue& operator = (blocking_queue&&) = delete;

    /*
    ~blocking_queue()
    {
        while (m_count--)
        {
            m_data[m_popIndex].~T();
            m_popIndex = ++m_popIndex % m_size;
        }
        operator delete(m_data);
    }
    */

    void push(const T& item)
    {
        m_openSlots.wait();
        {
            //std::lock_guard<std::mutex> lock(m_cs);
            //new (m_data + m_pushIndex) T(item);
            m_cs.lock($);
            VAR(m_data[VAR(m_pushIndex)]) = item;
            VAR(m_pushIndex) = ++VAR(m_pushIndex) % T_size;
            ++VAR(m_count);
            m_cs.unlock($);
        }
        m_fullSlots.post();
    }

    void pop(T& item)
    {
        m_fullSlots.wait();
        {
            //std::lock_guard<std::mutex> lock(m_cs);
            m_cs.lock($);
            item = VAR(m_data[VAR(m_popIndex)]);
            VAR(m_popIndex) = ++VAR(m_popIndex) % T_size;
            --VAR(m_count);
            m_cs.unlock($);
        }
        m_openSlots.post();
    }

    /*
    bool empty()
    {
        std::lock_guard<std::mutex> lock(m_cs);
        return m_count == 0;
    }
    */

private:
    //unsigned int m_size;
    VAR_T(unsigned int) m_pushIndex;
    VAR_T(unsigned int) m_popIndex;
    VAR_T(unsigned int) m_count;
    VAR_T(T) m_data[T_size];

    semaphore m_openSlots;
    semaphore m_fullSlots;
    std::mutex m_cs;
};


// Relacy Queue Test...
struct ct_qtest_test
    : rl::test_suite<ct_qtest_test, THREADS>
{
    blocking_queue<unsigned int, BUFFER> g_queue;

    void before()
    {

    }

    void after()
    {

    }


    void consumer(unsigned int tidx)
    {
        unsigned int data = 0;

        for (unsigned int i = 0; i < ITERS; ++i)
        {
            g_queue.pop(data);
            // Producers push their own index, which is always below
            // PRODUCERS, so it can never equal a consumer's index.
            RL_ASSERT(data != tidx);
        }
    }


    void producer(unsigned int tidx)
    {
        for (unsigned int i = 0; i < ITERS; ++i)
        {
            g_queue.push(tidx);
        }
    }


    void thread(unsigned int tidx)
    {
        if (tidx < PRODUCERS)
        {
            producer(tidx);
        }

        else
        {
            consumer(tidx);
        }
    }
};



// Test away... Or fly? Humm...
int main()
{
    {
        rl::test_params p;

        p.iteration_count = 10000000;
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<ct_qtest_test>(p);
    }

    return 0;
}
__________________________


A-okay. :^)

Bonita Montero

Feb 5, 2019, 2:41:51 AM
>> To be more precise: The data-structures which are handed through the
>> queue to the consumer are in memory anyway and are usually much larger
>> than a simple link-node in the queue.

> That means that the producer needs to use a lot more memory to make
> these data structures - ...

No, these data-structures are in memory anyway.
Just the link-node in the queue comes afterwards.

Juha Nieminen

Feb 5, 2019, 4:36:09 AM
Chris M. Thomasson <invalid_chris_t...@invalid.com> wrote:
> That's fine. Now, if this empty check could be relaxed to a standard
> form of "racy", then one could define m_count as a std::atomic type, and
> read and write it with std::memory_order_relaxed. As is, it would be
> fine using your current code. However, the data returned is dubious
> because the mutex m_cs was not locked where you mutate m_count. If empty
> is called a lot, then this type of racy check might be perfectly fine.

Making the variable atomic only makes sure that reading and writing the
variable itself concurrently won't result in garbage values. However, it
doesn't protect from other types of race conditions that might happen.
For example, a routine might read the variable and see that the
container is not empty, when in reality it is; it's just that the
routine that emptied the container was just about to zero the size
variable when that other thread read it.

If the code needs to make sure it doesn't get incorrect information
from that function, it needs to use the same mutex, to make sure
it's not reading the variable while another thread is modifying
the data structure.

All this can add quite a lot of overhead, but that's the eternal dilemma
with multithreading. (A lot of research has been put into developing
lock-free data containers, but that's a very hard problem.)


David Brown

Feb 5, 2019, 5:16:08 AM
They will not be in memory if the producer has stopped producing!


Bonita Montero

Feb 5, 2019, 5:46:21 AM
>> No, these data-structures are in memory anyway.
>> Just the link-node in the queue comes afterwards.

> They will not be in memory if the producer has stopped producing!

It's plainly idiotic not to hand off items to other threads because
of memory issues; unless there is a memory collapse that stops further
processing, it always makes sense to hand off the items to the consumer.
And if there is a memory collapse, this will just stop the processing
of an item. No one develops the producer in a way that it will build
different data structures for processing in the producer if a queue
is full. And this short-circuited processing would also be stupid
because it would stop the producer from processing input from other
sources for a while.

Chris M. Thomasson

Feb 5, 2019, 7:23:48 PM
On 2/5/2019 1:35 AM, Juha Nieminen wrote:
> Chris M. Thomasson <invalid_chris_t...@invalid.com> wrote:
>> That's fine. Now, if this empty check could be relaxed to a standard
>> form of "racy", then one could define m_count as a std::atomic type, and
>> read and write it with std::memory_order_relaxed. As is, it would be
>> fine using your current code. However, the data returned is dubious
>> because the mutex m_cs was not locked where you mutate m_count. If empty
>> is called a lot, then this type of racy check might be perfectly fine.
>
> Making the variable atomic only makes sure that reading and writing the
> variable itself concurrently won't result in garbage values.

It also prevents undefined behavior wrt a raw data-race.


> However, it
> doesn't protect from other types of race conditions that might happen.
> For example, a routine might read the variable and see that the
> container is not empty, when in reality it is; it's just that the
> routine that emptied the container was just about to zero the size
> variable when that other thread read it.

Fair enough. Even if the empty condition returns a coherent answer, it
still unlocks the mutex. So, the next time we check it, the empty
condition might be different. Fwiw, I usually only use these types of
empty function calls for statistics. Just to see how many times the
queue is empty within a certain time frame. This type of data can be
used for heuristics to alter server state, build graphs or whatever...