Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
kcenon::thread::detail::concurrent_queue< T > Class Template Reference

Thread-safe MPMC queue with blocking wait support (Internal implementation) More...

#include <concurrent_queue.h>

Collaboration diagram for kcenon::thread::detail::concurrent_queue< T >:
Collaboration graph

Classes

struct  node
 

Public Member Functions

 concurrent_queue ()
 Constructs an empty queue.
 
 ~concurrent_queue ()
 Destructor - signals shutdown and drains the queue.
 
 concurrent_queue (const concurrent_queue &)=delete
 
concurrent_queueoperator= (const concurrent_queue &)=delete
 
 concurrent_queue (concurrent_queue &&)=delete
 
concurrent_queueoperator= (concurrent_queue &&)=delete
 
void enqueue (T value)
 Enqueues a value into the queue.
 
auto try_dequeue () -> std::optional< T >
 Tries to dequeue a value (non-blocking)
 
template<typename Rep , typename Period >
auto wait_dequeue (const std::chrono::duration< Rep, Period > &timeout) -> std::optional< T >
 Dequeues a value with blocking wait.
 
auto wait_dequeue () -> std::optional< T >
 Dequeues a value with indefinite blocking wait.
 
auto empty () const noexcept -> bool
 Checks if the queue appears empty.
 
auto size () const noexcept -> std::size_t
 Gets approximate queue size.
 
void notify_one ()
 Wakes one waiting consumer.
 
void notify_all ()
 Wakes all waiting consumers.
 
void shutdown ()
 Signals shutdown and wakes all waiters.
 
auto is_shutdown () const noexcept -> bool
 Checks if shutdown has been signaled.
 

Private Attributes

nodehead_
 
nodetail_
 
std::mutex head_mutex_
 
std::mutex tail_mutex_
 
std::atomic< std::size_t > size_ {0}
 
std::atomic< bool > shutdown_ {false}
 
std::mutex cv_mutex_
 
std::condition_variable cv_
 

Detailed Description

template<typename T>
class kcenon::thread::detail::concurrent_queue< T >

Thread-safe MPMC queue with blocking wait support (Internal implementation)

This class implements a thread-safe Multi-Producer Multi-Consumer (MPMC) queue using fine-grained locking for simplicity and correctness. It provides both non-blocking and blocking operations for flexible use cases.

Implementation Notes

This implementation uses fine-grained locking with separate head and tail mutexes rather than a true lock-free algorithm. This provides:

  • Correctness guarantee without complex memory reclamation
  • Good performance for most use cases
  • Blocking wait support with condition variables

While a true lock-free Michael-Scott queue requires complex memory reclamation (Hazard Pointers or epoch-based reclamation), fine-grained locking offers comparable performance for most practical scenarios.

Naming Clarification

This class is intentionally named "concurrent_queue" (not "lockfree_queue") because:

  • It uses mutexes for synchronization (fine-grained locking)
  • "Lock-free" has a specific technical meaning (no locks, uses CAS loops)
  • "Concurrent" accurately describes thread-safe access without implying lock-free

For a true lock-free queue implementation, see lockfree_job_queue which uses the Michael-Scott algorithm with hazard pointers for memory reclamation.

Key Features

  • Thread Safety: Safe for concurrent access from multiple threads
  • Blocking Wait: wait_dequeue provides efficient blocking with timeout
  • Generic: Works with any move-constructible type
  • Low Contention: Separate locks for head and tail operations

Performance Characteristics

  • Enqueue: O(1), acquires tail lock only
  • Dequeue: O(1), acquires head lock only
  • No lock contention between enqueue and dequeue operations

Example Usage

// Producer thread
queue.enqueue("message");
// Consumer thread (non-blocking)
if (auto value = queue.try_dequeue()) {
process(*value);
}
// Consumer thread (blocking with timeout)
if (auto value = queue.wait_dequeue(std::chrono::milliseconds{100})) {
process(*value);
}
Thread-safe MPMC queue with blocking wait support (Internal implementation)
Definition forward.h:156
void enqueue(T value)
Enqueues a value into the queue.
auto try_dequeue() -> std::optional< T >
Tries to dequeue a value (non-blocking)
auto wait_dequeue(const std::chrono::duration< Rep, Period > &timeout) -> std::optional< T >
Dequeues a value with blocking wait.
Template Parameters
TThe element type (must be move-constructible)

Definition at line 156 of file forward.h.

Constructor & Destructor Documentation

◆ concurrent_queue() [1/3]

template<typename T >
kcenon::thread::detail::concurrent_queue< T >::concurrent_queue ( )
inline

Constructs an empty queue.

Initializes the queue with a dummy node to simplify the algorithm.

Definition at line 100 of file concurrent_queue.h.

100 {
101 auto* dummy = new node{};
102 head_ = dummy;
103 tail_ = dummy;
104 }

References kcenon::thread::detail::concurrent_queue< T >::head_, and kcenon::thread::detail::concurrent_queue< T >::tail_.

◆ ~concurrent_queue()

template<typename T >
kcenon::thread::detail::concurrent_queue< T >::~concurrent_queue ( )
inline

Destructor - signals shutdown and drains the queue.

Definition at line 109 of file concurrent_queue.h.

109 {
110 shutdown();
111 // Drain remaining items
112 while (try_dequeue()) {
113 }
114 // Delete the dummy node
115 delete head_;
116 }
void shutdown()
Signals shutdown and wakes all waiters.

References kcenon::thread::detail::concurrent_queue< T >::head_, kcenon::thread::detail::concurrent_queue< T >::shutdown(), and kcenon::thread::detail::concurrent_queue< T >::try_dequeue().

Here is the call graph for this function:

◆ concurrent_queue() [2/3]

template<typename T >
kcenon::thread::detail::concurrent_queue< T >::concurrent_queue ( const concurrent_queue< T > & )
delete

◆ concurrent_queue() [3/3]

template<typename T >
kcenon::thread::detail::concurrent_queue< T >::concurrent_queue ( concurrent_queue< T > && )
delete

Member Function Documentation

◆ empty()

template<typename T >
auto kcenon::thread::detail::concurrent_queue< T >::empty ( ) const -> bool
inlinenodiscardnoexcept

Checks if the queue appears empty.

Definition at line 231 of file concurrent_queue.h.

231 {
232 return size_.load(std::memory_order_acquire) == 0;
233 }

References kcenon::thread::detail::concurrent_queue< T >::size_.

◆ enqueue()

template<typename T >
void kcenon::thread::detail::concurrent_queue< T >::enqueue ( T value)
inline

Enqueues a value into the queue.

Parameters
valueThe value to enqueue (moved)
Note
Thread-safe, uses tail lock only

Definition at line 131 of file concurrent_queue.h.

131 {
132 if (shutdown_.load(std::memory_order_acquire)) {
133 return;
134 }
135
136 auto* new_node = new node{std::move(value)};
137
138 {
139 std::lock_guard<std::mutex> lock(tail_mutex_);
140 tail_->next = new_node;
141 tail_ = new_node;
142 }
143
144 size_.fetch_add(1, std::memory_order_release);
145 notify_one();
146 }
void notify_one()
Wakes one waiting consumer.

References kcenon::thread::detail::concurrent_queue< T >::node::next, kcenon::thread::detail::concurrent_queue< T >::notify_one(), kcenon::thread::detail::concurrent_queue< T >::shutdown_, kcenon::thread::detail::concurrent_queue< T >::size_, kcenon::thread::detail::concurrent_queue< T >::tail_, and kcenon::thread::detail::concurrent_queue< T >::tail_mutex_.

Here is the call graph for this function:

◆ is_shutdown()

template<typename T >
auto kcenon::thread::detail::concurrent_queue< T >::is_shutdown ( ) const -> bool
inlinenodiscardnoexcept

Checks if shutdown has been signaled.

Definition at line 269 of file concurrent_queue.h.

269 {
270 return shutdown_.load(std::memory_order_acquire);
271 }

References kcenon::thread::detail::concurrent_queue< T >::shutdown_.

◆ notify_all()

template<typename T >
void kcenon::thread::detail::concurrent_queue< T >::notify_all ( )
inline

Wakes all waiting consumers.

Definition at line 253 of file concurrent_queue.h.

253 {
254 std::lock_guard<std::mutex> lock(cv_mutex_);
255 cv_.notify_all();
256 }

References kcenon::thread::detail::concurrent_queue< T >::cv_, and kcenon::thread::detail::concurrent_queue< T >::cv_mutex_.

Referenced by kcenon::thread::detail::concurrent_queue< T >::shutdown().

Here is the caller graph for this function:

◆ notify_one()

template<typename T >
void kcenon::thread::detail::concurrent_queue< T >::notify_one ( )
inline

Wakes one waiting consumer.

Definition at line 245 of file concurrent_queue.h.

245 {
246 std::lock_guard<std::mutex> lock(cv_mutex_);
247 cv_.notify_one();
248 }

References kcenon::thread::detail::concurrent_queue< T >::cv_, and kcenon::thread::detail::concurrent_queue< T >::cv_mutex_.

Referenced by kcenon::thread::detail::concurrent_queue< T >::enqueue().

Here is the caller graph for this function:

◆ operator=() [1/2]

template<typename T >
concurrent_queue & kcenon::thread::detail::concurrent_queue< T >::operator= ( concurrent_queue< T > && )
delete

◆ operator=() [2/2]

template<typename T >
concurrent_queue & kcenon::thread::detail::concurrent_queue< T >::operator= ( const concurrent_queue< T > & )
delete

◆ shutdown()

template<typename T >
void kcenon::thread::detail::concurrent_queue< T >::shutdown ( )
inline

Signals shutdown and wakes all waiters.

Definition at line 261 of file concurrent_queue.h.

261 {
262 shutdown_.store(true, std::memory_order_release);
263 notify_all();
264 }
void notify_all()
Wakes all waiting consumers.

References kcenon::thread::detail::concurrent_queue< T >::notify_all(), and kcenon::thread::detail::concurrent_queue< T >::shutdown_.

Referenced by kcenon::thread::detail::concurrent_queue< T >::~concurrent_queue().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ size()

template<typename T >
auto kcenon::thread::detail::concurrent_queue< T >::size ( ) const -> std::size_t
inlinenodiscardnoexcept

Gets approximate queue size.

Definition at line 238 of file concurrent_queue.h.

238 {
239 return size_.load(std::memory_order_acquire);
240 }

References kcenon::thread::detail::concurrent_queue< T >::size_.

◆ try_dequeue()

template<typename T >
auto kcenon::thread::detail::concurrent_queue< T >::try_dequeue ( ) -> std::optional<T>
inlinenodiscard

Tries to dequeue a value (non-blocking)

Returns
The dequeued value, or nullopt if queue is empty
Note
Thread-safe, uses head lock only

Definition at line 155 of file concurrent_queue.h.

155 {
156 std::lock_guard<std::mutex> lock(head_mutex_);
157
158 node* old_head = head_;
159 node* next = old_head->next;
160
161 if (next == nullptr) {
162 return std::nullopt; // Queue is empty
163 }
164
165 // Extract value from next node
166 std::optional<T> value;
167 if (next->data.has_value()) {
168 value = std::move(*next->data);
169 next->data.reset();
170 }
171
172 // Advance head (old_head becomes the new dummy)
173 head_ = next;
174 delete old_head;
175
176 size_.fetch_sub(1, std::memory_order_release);
177 return value;
178 }

References kcenon::thread::detail::concurrent_queue< T >::node::data, kcenon::thread::detail::concurrent_queue< T >::head_, kcenon::thread::detail::concurrent_queue< T >::head_mutex_, kcenon::thread::detail::concurrent_queue< T >::node::next, and kcenon::thread::detail::concurrent_queue< T >::size_.

Referenced by kcenon::thread::detail::concurrent_queue< T >::wait_dequeue(), and kcenon::thread::detail::concurrent_queue< T >::~concurrent_queue().

Here is the caller graph for this function:

◆ wait_dequeue() [1/2]

template<typename T >
auto kcenon::thread::detail::concurrent_queue< T >::wait_dequeue ( ) -> std::optional<T>
inlinenodiscard

Dequeues a value with indefinite blocking wait.

Returns
The dequeued value, or nullopt if shutdown

Definition at line 224 of file concurrent_queue.h.

224 {
225 return wait_dequeue(std::chrono::hours{24 * 365});
226 }
auto wait_dequeue() -> std::optional< T >
Dequeues a value with indefinite blocking wait.

References kcenon::thread::detail::concurrent_queue< T >::wait_dequeue().

Referenced by kcenon::thread::detail::concurrent_queue< T >::wait_dequeue().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ wait_dequeue() [2/2]

template<typename T >
template<typename Rep , typename Period >
auto kcenon::thread::detail::concurrent_queue< T >::wait_dequeue ( const std::chrono::duration< Rep, Period > & timeout) -> std::optional<T>
inlinenodiscard

Dequeues a value with blocking wait.

Parameters
timeoutMaximum time to wait
Returns
The dequeued value, or nullopt if timeout or shutdown

Definition at line 187 of file concurrent_queue.h.

188 {
189 // First try without waiting
190 if (auto value = try_dequeue()) {
191 return value;
192 }
193
194 // Wait for notification
195 std::unique_lock<std::mutex> lock(cv_mutex_);
196 auto deadline = std::chrono::steady_clock::now() + timeout;
197
198 while (!shutdown_.load(std::memory_order_acquire)) {
199 // Try to dequeue
200 lock.unlock();
201 if (auto value = try_dequeue()) {
202 return value;
203 }
204 lock.lock();
205
206 // Check if we should stop waiting
207 if (std::chrono::steady_clock::now() >= deadline) {
208 return std::nullopt;
209 }
210
211 // Wait for notification or timeout
212 cv_.wait_until(lock, deadline);
213 }
214
215 // Final attempt after shutdown
216 return try_dequeue();
217 }

References kcenon::thread::detail::concurrent_queue< T >::cv_, kcenon::thread::detail::concurrent_queue< T >::cv_mutex_, kcenon::thread::detail::concurrent_queue< T >::shutdown_, and kcenon::thread::detail::concurrent_queue< T >::try_dequeue().

Here is the call graph for this function:

Member Data Documentation

◆ cv_

◆ cv_mutex_

◆ head_

◆ head_mutex_

template<typename T >
std::mutex kcenon::thread::detail::concurrent_queue< T >::head_mutex_
mutableprivate

◆ shutdown_

◆ size_

◆ tail_

◆ tail_mutex_

template<typename T >
std::mutex kcenon::thread::detail::concurrent_queue< T >::tail_mutex_
mutableprivate

The documentation for this class was generated from the following files: