#include <atomic>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <utility>
#include <vector>

// A single owner thread pushes and pops LIFO at one end (the head); other threads pop FIFO from the other end (the tail) by stealing. A usage sketch appears at the end of this listing.
template<class T>
class WorkStealQ
{
public:
WorkStealQ(void) = delete;
inline WorkStealQ(uint32_t bufferSize); // Must be a power of two in [2, 2^30]
WorkStealQ(WorkStealQ const &) = delete;
WorkStealQ(WorkStealQ &&) = delete; // The std::atomic members (and the const mask) are not movable, so the move operations are deleted explicitly
WorkStealQ &operator=(WorkStealQ const &) = delete;
WorkStealQ &operator=(WorkStealQ &&) = delete;
// PushUnsafe() does not check for a full queue; use it only when the number of unconsumed tasks is externally constrained to at most bufferSize (see the bounded fan-out sketch after the PushUnsafe definitions)
// It is cheaper than Push() because it never has to load the atomic tail variable
inline void PushUnsafe(T &&task);
inline void PushUnsafe(T const &task);
inline bool Push(T &&task);
inline bool Push(T const &task);
inline T Pop(void); // Owner thread only; returns a default-constructed T when the queue is empty
inline T Steal(void); // For stealing threads; returns a default-constructed T when the queue looks empty or the steal loses a race
inline uint32_t Space(void) const noexcept; // Free slots remaining; slow as it checks atomic head and tail, and only an estimate while other threads are active
private:
const uint32_t _mask;
std::vector<T> _tasks;
std::atomic<uint32_t> _head, _tail; // TODO: Pad head and tail to separate cache lines to reduce false sharing between the owner and stealing threads
};
template<class T>
inline WorkStealQ<T>::WorkStealQ(uint32_t bufferSize) : _mask(bufferSize - 1), _tasks(bufferSize), _head{0}, _tail{0}
{
if (bufferSize < 2 || bufferSize > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()) // Upper limit is needed for the signed comparison casts in Push()
|| (bufferSize & _mask)) throw std::invalid_argument("bufferSize must be a power of two between 2 and 2^30");
}
template<class T>
inline void WorkStealQ<T>::PushUnsafe(T &&task)
{
auto head{_head.load(std::memory_order_relaxed)};
_tasks[head & _mask] = std::move(task);
_head.store(head + 1, std::memory_order_release); // Release publishes the stored task to any thread that acquires the new head
}
template<class T>
inline void WorkStealQ<T>::PushUnsafe(T const &task)
{
auto head{_head.load(std::memory_order_relaxed)};
_tasks[head & _mask] = task;
_head.store(head + 1, std::memory_order_release);
}
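
// Illustrative sketch (not part of the queue): one way the "externally constrained" precondition of
// PushUnsafe() can be met. The Task struct and SpawnChildren() below are hypothetical names; the point
// is that when the caller bounds the number of unconsumed tasks to at most bufferSize, the cheaper
// PushUnsafe() is safe because a slot can never be overwritten before it has been consumed.
struct Task
{
    void (*fn)(void *){nullptr}; // A default-constructed Task (fn == nullptr) doubles as the "empty" result of Pop()/Steal()
    void *arg{nullptr};
};

inline void SpawnChildren(WorkStealQ<Task> &queue, Task const *children, uint32_t count)
{
    // The caller guarantees that count plus any tasks already queued never exceeds the queue's bufferSize
    for (uint32_t i{0}; i < count; ++i) queue.PushUnsafe(children[i]);
}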
template<class T>
inline bool WorkStealQ<T>::Push(T &&task)
{
auto head{_head.load(std::memory_order_relaxed)};
if (static_cast<int32_t>(head - _tail.load(std::memory_order_relaxed)) >= static_cast<int32_t>(_tasks.size())) return false;
_tasks[head & _mask] = std::move(task);
_head.store(head + 1, std::memory_order_release);
return true;
}
template<class T>
inline bool WorkStealQ<T>::Push(T const &task)
{
auto head{_head.load(std::memory_order_relaxed)};
if (static_cast<int32_t>(head - _tail.load(std::memory_order_relaxed)) >= static_cast<int32_t>(_tasks.size())) return false;
_tasks[head & _mask] = task;
_head.store(head + 1, std::memory_order_release);
return true;
}
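
// Illustrative sketch (not part of the queue): a common way to handle Push() returning false in a task
// scheduler is to run the task immediately on the calling thread instead of queueing it, so the system
// keeps making progress when the owner's queue is full. Spawn() and the Task struct from the sketch
// above are hypothetical names, not part of the queue's interface.
inline void Spawn(WorkStealQ<Task> &queue, Task const &task)
{
    if (!queue.Push(task) && task.fn) task.fn(task.arg); // Queue full: degrade gracefully by executing inline
}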
template<class T>
inline T WorkStealQ<T>::Pop(void)
{
auto head{_head.load(std::memory_order_relaxed) - 1};
_head.exchange(head, std::memory_order_seq_cst); // Full barrier: the decrement must be visible before tail is loaded below, so this Pop() and a concurrent Steal() do not both take the last task
auto tail{_tail.load(std::memory_order_acquire)};
if (static_cast<int32_t>(head - tail) < 0) // Queue was empty; the signed difference stays correct when head wraps below tail
{
_head.store(tail, std::memory_order_release); // Restore the empty state
return T();
}
auto task{_tasks[head & _mask]};
if (head != tail) return task; // More than one task was left, so this one is taken without a CAS
auto tailNew{tail + 1};
if (_tail.compare_exchange_strong(tail, tailNew, std::memory_order_release, std::memory_order_relaxed)) // Strong CAS: a spurious failure would make the owner report empty while the last task is still queued
{
_head.store(tailNew); // Won the race for the last task; the queue is now empty
return task;
}
else
{
_head.store(tailNew); // Lost the race: a thief took the last task; the queue is now empty
return T();
}
}
template<class T>
inline T WorkStealQ<T>::Steal(void)
{
auto tail{_tail.load(std::memory_order_acquire)}; // Tail must be loaded before head
if (static_cast<int32_t>(_head.load(std::memory_order_acquire) - tail) <= 0) return T(); // Empty; the signed difference stays correct when the owner's Pop() has transiently moved head below tail
auto task{_tasks[tail & _mask]}; // Copy the task out before publishing the new tail
if (_tail.compare_exchange_weak(tail, tail + 1, std::memory_order_release, std::memory_order_relaxed)) return task; // A failed (or spurious) CAS simply reports empty to the thief
return T();
}
template<class T>
inline uint32_t WorkStealQ<T>::Space(void) const noexcept
{
auto const count{_head.load(std::memory_order_relaxed) - _tail.load(std::memory_order_relaxed)}; // Number of tasks currently queued
return static_cast<uint32_t>(_tasks.size()) - count;
}
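
// Usage sketch (not part of the queue): a single owner thread pushes and pops while one thief steals
// concurrently. This is a hypothetical, simplified example that reuses the hypothetical Task struct
// above; a real scheduler would add victim selection, back-off and a proper termination protocol.
#include <thread>

inline void UsageSketch(void)
{
    WorkStealQ<Task> queue{1024};
    std::atomic<uint32_t> executed{0};
    auto const work{[](void *counter) { static_cast<std::atomic<uint32_t> *>(counter)->fetch_add(1, std::memory_order_relaxed); }};
    std::atomic<bool> stop{false};
    std::thread thief{[&queue, &stop]
    {
        while (!stop.load(std::memory_order_relaxed)) // Other threads take work FIFO from the tail
        {
            auto task{queue.Steal()};
            if (task.fn) task.fn(task.arg); else std::this_thread::yield();
        }
    }};
    for (uint32_t batch{0}; batch < 64; ++batch) // Owner thread: push a batch, then drain LIFO from the head
    {
        for (uint32_t i{0}; i < 64; ++i) queue.Push(Task{work, &executed}); // At most 64 tasks outstanding, so the 1024-slot queue can never fill
        for (auto task{queue.Pop()}; task.fn; task = queue.Pop()) task.fn(task.arg);
    }
    stop.store(true, std::memory_order_relaxed);
    thief.join(); // All 64 * 64 tasks have now run, split between the owner and the thief
}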