Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
concurrent_queue.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
14#include <atomic>
15#include <chrono>
16#include <condition_variable>
17#include <cstddef>
18#include <mutex>
19#include <optional>
20#include <type_traits>
21
22namespace kcenon::thread {
23
24namespace detail {
25
90template <typename T>
91class concurrent_queue {
92 static_assert(std::is_move_constructible_v<T>, "T must be move constructible");
93
94public:
101 auto* dummy = new node{};
102 head_ = dummy;
103 tail_ = dummy;
104 }
105
110 shutdown();
111 // Drain remaining items
112 while (try_dequeue()) {
113 }
114 // Delete the dummy node
115 delete head_;
116 }
117
118 // Non-copyable and non-movable
123
131 void enqueue(T value) {
132 if (shutdown_.load(std::memory_order_acquire)) {
133 return;
134 }
135
136 auto* new_node = new node{std::move(value)};
137
138 {
139 std::lock_guard<std::mutex> lock(tail_mutex_);
140 tail_->next = new_node;
141 tail_ = new_node;
142 }
143
144 size_.fetch_add(1, std::memory_order_release);
145 notify_one();
146 }
147
155 [[nodiscard]] auto try_dequeue() -> std::optional<T> {
156 std::lock_guard<std::mutex> lock(head_mutex_);
157
158 node* old_head = head_;
159 node* next = old_head->next;
160
161 if (next == nullptr) {
162 return std::nullopt; // Queue is empty
163 }
164
165 // Extract value from next node
166 std::optional<T> value;
167 if (next->data.has_value()) {
168 value = std::move(*next->data);
169 next->data.reset();
170 }
171
172 // Advance head (old_head becomes the new dummy)
173 head_ = next;
174 delete old_head;
175
176 size_.fetch_sub(1, std::memory_order_release);
177 return value;
178 }
179
186 template <typename Rep, typename Period>
187 [[nodiscard]] auto wait_dequeue(const std::chrono::duration<Rep, Period>& timeout)
188 -> std::optional<T> {
189 // First try without waiting
190 if (auto value = try_dequeue()) {
191 return value;
192 }
193
194 // Wait for notification
195 std::unique_lock<std::mutex> lock(cv_mutex_);
196 auto deadline = std::chrono::steady_clock::now() + timeout;
197
198 while (!shutdown_.load(std::memory_order_acquire)) {
199 // Try to dequeue
200 lock.unlock();
201 if (auto value = try_dequeue()) {
202 return value;
203 }
204 lock.lock();
205
206 // Check if we should stop waiting
207 if (std::chrono::steady_clock::now() >= deadline) {
208 return std::nullopt;
209 }
210
211 // Wait for notification or timeout
212 cv_.wait_until(lock, deadline);
213 }
214
215 // Final attempt after shutdown
216 return try_dequeue();
217 }
218
224 [[nodiscard]] auto wait_dequeue() -> std::optional<T> {
225 return wait_dequeue(std::chrono::hours{24 * 365});
226 }
227
231 [[nodiscard]] auto empty() const noexcept -> bool {
232 return size_.load(std::memory_order_acquire) == 0;
233 }
234
238 [[nodiscard]] auto size() const noexcept -> std::size_t {
239 return size_.load(std::memory_order_acquire);
240 }
241
245 void notify_one() {
246 std::lock_guard<std::mutex> lock(cv_mutex_);
247 cv_.notify_one();
248 }
249
253 void notify_all() {
254 std::lock_guard<std::mutex> lock(cv_mutex_);
255 cv_.notify_all();
256 }
257
261 void shutdown() {
262 shutdown_.store(true, std::memory_order_release);
263 notify_all();
264 }
265
269 [[nodiscard]] auto is_shutdown() const noexcept -> bool {
270 return shutdown_.load(std::memory_order_acquire);
271 }
272
273private:
274 struct node {
275 std::optional<T> data;
276 node* next{nullptr};
277
278 node() = default;
279 explicit node(T value) : data(std::move(value)) {}
280 };
281
284
285 mutable std::mutex head_mutex_;
286 mutable std::mutex tail_mutex_;
287
288 std::atomic<std::size_t> size_{0};
289 std::atomic<bool> shutdown_{false};
290
291 // For blocking wait support
292 mutable std::mutex cv_mutex_;
293 std::condition_variable cv_;
294};
295
296} // namespace detail
297
298} // namespace kcenon::thread
Thread-safe MPMC queue with blocking wait support (Internal implementation)
Definition forward.h:156
void notify_all()
Wakes all waiting consumers.
auto is_shutdown() const noexcept -> bool
Checks if shutdown has been signaled.
auto size() const noexcept -> std::size_t
Gets approximate queue size.
void notify_one()
Wakes one waiting consumer.
auto empty() const noexcept -> bool
Checks if the queue appears empty.
void shutdown()
Signals shutdown and wakes all waiters.
void enqueue(T value)
Enqueues a value into the queue.
~concurrent_queue()
Destructor - signals shutdown and drains the queue.
concurrent_queue(concurrent_queue &&)=delete
concurrent_queue(const concurrent_queue &)=delete
auto try_dequeue() -> std::optional< T >
Tries to dequeue a value (non-blocking)
auto wait_dequeue() -> std::optional< T >
Dequeues a value with indefinite blocking wait.
concurrent_queue()
Constructs an empty queue.
concurrent_queue & operator=(const concurrent_queue &)=delete
concurrent_queue & operator=(concurrent_queue &&)=delete
auto wait_dequeue(const std::chrono::duration< Rep, Period > &timeout) -> std::optional< T >
Dequeues a value with blocking wait.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.