16#include <condition_variable>
91class concurrent_queue {
92 static_assert(std::is_move_constructible_v<T>,
"T must be move constructible");
101 auto* dummy =
new node{};
132 if (
shutdown_.load(std::memory_order_acquire)) {
136 auto* new_node =
new node{std::move(value)};
144 size_.fetch_add(1, std::memory_order_release);
161 if (next ==
nullptr) {
166 std::optional<T> value;
167 if (next->
data.has_value()) {
168 value = std::move(*next->
data);
176 size_.fetch_sub(1, std::memory_order_release);
186 template <
typename Rep,
typename Period>
187 [[nodiscard]]
auto wait_dequeue(
const std::chrono::duration<Rep, Period>& timeout)
188 -> std::optional<T> {
195 std::unique_lock<std::mutex> lock(
cv_mutex_);
196 auto deadline = std::chrono::steady_clock::now() + timeout;
198 while (!
shutdown_.load(std::memory_order_acquire)) {
207 if (std::chrono::steady_clock::now() >= deadline) {
212 cv_.wait_until(lock, deadline);
231 [[nodiscard]]
auto empty() const noexcept ->
bool {
232 return size_.load(std::memory_order_acquire) == 0;
238 [[nodiscard]]
auto size() const noexcept ->
std::
size_t {
239 return size_.load(std::memory_order_acquire);
246 std::lock_guard<std::mutex> lock(
cv_mutex_);
254 std::lock_guard<std::mutex> lock(
cv_mutex_);
262 shutdown_.store(
true, std::memory_order_release);
270 return shutdown_.load(std::memory_order_acquire);
293 std::condition_variable
cv_;
Thread-safe MPMC queue with blocking wait support (Internal implementation)
void notify_all()
Wakes all waiting consumers.
auto is_shutdown() const noexcept -> bool
Checks if shutdown has been signaled.
auto size() const noexcept -> std::size_t
Gets approximate queue size.
void notify_one()
Wakes one waiting consumer.
auto empty() const noexcept -> bool
Checks if the queue appears empty.
std::condition_variable cv_
void shutdown()
Signals shutdown and wakes all waiters.
void enqueue(T value)
Enqueues a value into the queue.
~concurrent_queue()
Destructor - signals shutdown and drains the queue.
concurrent_queue(concurrent_queue &&)=delete
concurrent_queue(const concurrent_queue &)=delete
auto try_dequeue() -> std::optional< T >
Tries to dequeue a value (non-blocking)
std::atomic< std::size_t > size_
auto wait_dequeue() -> std::optional< T >
Dequeues a value with indefinite blocking wait.
std::atomic< bool > shutdown_
concurrent_queue()
Constructs an empty queue.
concurrent_queue & operator=(const concurrent_queue &)=delete
concurrent_queue & operator=(concurrent_queue &&)=delete
auto wait_dequeue(const std::chrono::duration< Rep, Period > &timeout) -> std::optional< T >
Dequeues a value with blocking wait.
Core threading foundation of the thread system library.