Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::detail::lockfree_job_queue Class Reference

Lock-free Multi-Producer Multi-Consumer (MPMC) job queue (Internal implementation) More...

#include <lockfree_job_queue.h>

Inheritance: kcenon::thread::detail::lockfree_job_queue implements kcenon::thread::scheduler_interface and kcenon::thread::queue_capabilities_interface.

Classes

struct  node
 Internal queue node structure. More...
 
class  node_pool
 Lock-free node freelist (Treiber stack) for node recycling. More...
 

Public Member Functions

 lockfree_job_queue ()
 Constructs an empty lock-free job queue.
 
 ~lockfree_job_queue ()
 Destructor.
 
 lockfree_job_queue (const lockfree_job_queue &)=delete
 
lockfree_job_queue & operator= (const lockfree_job_queue &)=delete
 
 lockfree_job_queue (lockfree_job_queue &&)=delete
 
lockfree_job_queue & operator= (lockfree_job_queue &&)=delete
 
auto enqueue (std::unique_ptr< job > &&job) -> common::VoidResult
 Enqueues a job into the queue (thread-safe)
 
auto dequeue () -> common::Result< std::unique_ptr< job > >
 Dequeues a job from the queue (thread-safe)
 
auto try_dequeue () -> common::Result< std::unique_ptr< job > >
 Tries to dequeue a job without blocking.
 
auto empty () const -> bool
 Checks if the queue is empty.
 
auto size () const -> std::size_t
 Gets approximate queue size.
 
auto schedule (std::unique_ptr< job > &&work) -> common::VoidResult override
 Schedule a job (delegates to enqueue)
 
auto get_next_job () -> common::Result< std::unique_ptr< job > > override
 Get next job (delegates to dequeue)
 
auto get_capabilities () const -> queue_capabilities override
 Returns capabilities of lockfree_job_queue.
 
- Public Member Functions inherited from kcenon::thread::scheduler_interface
virtual ~scheduler_interface ()=default
 
- Public Member Functions inherited from kcenon::thread::queue_capabilities_interface
virtual ~queue_capabilities_interface ()=default
 
auto has_exact_size () const -> bool
 Check if size() returns exact values.
 
auto has_atomic_empty () const -> bool
 Check if empty() check is atomic.
 
auto is_lock_free () const -> bool
 Check if this is a lock-free implementation.
 
auto is_wait_free () const -> bool
 Check if this is a wait-free implementation.
 
auto supports_batch () const -> bool
 Check if batch operations are supported.
 
auto supports_blocking_wait () const -> bool
 Check if blocking wait is supported.
 
auto supports_stop () const -> bool
 Check if stop signaling is supported.
 

Private Types

using node_hp_domain = typed_safe_hazard_domain<node>
 

Private Member Functions

void retire_node (node *n)
 Retire a node through hazard pointers, recycling via pool on reclamation.
 

Private Attributes

std::atomic< node * > head_
 
std::atomic< node * > tail_
 
std::shared_ptr< node_poolpool_
 
std::atomic< std::size_t > approximate_size_ {0}
 
std::atomic< bool > shutdown_ {false}
 

Detailed Description

Lock-free Multi-Producer Multi-Consumer (MPMC) job queue (Internal implementation)

This class implements a lock-free MPMC queue using the Michael-Scott algorithm with Safe Hazard Pointers for memory reclamation. Explicit memory ordering ensures correctness on weak-memory-model architectures such as ARM.

Algorithm: Michael-Scott Queue (1996)
Memory Reclamation: Safe Hazard Pointers with explicit memory ordering

Key Features:

  • True lock-free operation (no mutexes, no locks)
  • Safe concurrent access from multiple producers and consumers
  • Automatic memory reclamation using Safe Hazard Pointers
  • Correct memory ordering for weak memory model architectures (ARM)
  • No TLS node pool (eliminates destructor ordering issues)
  • ABA problem prevention through HP-based protection

Performance Characteristics:

  • Enqueue: O(1) amortized, wait-free
  • Dequeue: O(1) amortized, lock-free
  • Memory overhead: ~256 bytes per thread (hazard pointers)

Thread Safety:

  • All methods are thread-safe
  • Can be called concurrently from any number of threads
  • Uses atomic operations with acquire/release semantics
Note
This implementation is production-safe and resolves TICKET-001 (TLS bug) and TICKET-002 (weak memory model safety).
See also
lockfree_job_queue_test.cpp for usage examples
Examples
queue_capabilities_sample.cpp.

Definition at line 63 of file lockfree_job_queue.h.
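
A minimal usage sketch based on the members listed above. The concrete job subclass and the accessor that extracts the value from common::Result are documented elsewhere, so only is_err() is used here (and is_err() on common::VoidResult is assumed to mirror its Result counterpart); the caller is assumed to already hold a ready-made job.

#include <lockfree_job_queue.h>
#include <memory>

using namespace kcenon::thread;

// Round-trips one job through the queue: enqueue() takes ownership,
// dequeue() hands it back wrapped in a common::Result.
void roundtrip(detail::lockfree_job_queue& queue, std::unique_ptr<job> work)
{
    if (queue.enqueue(std::move(work)).is_err()) {
        return; // null job, or queue_busy after the bounded retry budget
    }

    auto result = queue.dequeue();
    if (!result.is_err()) {
        // result now owns the dequeued std::unique_ptr<job>
    }
}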

Member Typedef Documentation

◆ node_hp_domain

using kcenon::thread::detail::lockfree_job_queue::node_hp_domain = typed_safe_hazard_domain<node>
private

Constructor & Destructor Documentation

◆ lockfree_job_queue() [1/3]

kcenon::thread::detail::lockfree_job_queue::lockfree_job_queue ( )

Constructs an empty lock-free job queue.

Initializes the queue with a dummy node to simplify the algorithm: the head always points at a dummy node, so head and tail are never null and enqueue/dequeue can proceed concurrently.

Definition at line 41 of file lockfree_job_queue.cpp.

    : pool_(std::make_shared<node_pool>()) {
    // Create dummy node (Michael-Scott algorithm requires one dummy node)
    // This simplifies the algorithm by ensuring head and tail are never null
    node* dummy = new node();

    head_.store(dummy, std::memory_order_relaxed);
    tail_.store(dummy, std::memory_order_relaxed);
    approximate_size_.store(0, std::memory_order_relaxed);
}

References approximate_size_, head_, and tail_.

◆ ~lockfree_job_queue()

kcenon::thread::detail::lockfree_job_queue::~lockfree_job_queue ( )

Destructor.

Drains the queue and reclaims all nodes. Thread-safe even if other threads are still accessing the queue (they will get errors).

Definition at line 53 of file lockfree_job_queue.cpp.

{
    // Signal shutdown to prevent new operations
    shutdown_.store(true, std::memory_order_release);

    // Drain remaining jobs (release ownership)
    while (true) {
        auto result = dequeue();
        if (result.is_err()) {
            break;
        }
        // Jobs are destroyed when unique_ptr goes out of scope
    }

    // Safe cleanup: acquire semantics ensure we see all writes
    node* dummy = head_.load(std::memory_order_acquire);

    // Retire dummy node through pool-aware reclamation
    // This ensures the node is only deleted when no other thread
    // holds a hazard pointer to it (uses explicit memory ordering)
    retire_node(dummy);
}

References dequeue(), head_, retire_node(), and shutdown_.


◆ lockfree_job_queue() [2/3]

kcenon::thread::detail::lockfree_job_queue::lockfree_job_queue ( const lockfree_job_queue & )
delete

◆ lockfree_job_queue() [3/3]

kcenon::thread::detail::lockfree_job_queue::lockfree_job_queue ( lockfree_job_queue && )
delete

Member Function Documentation

◆ dequeue()

auto kcenon::thread::detail::lockfree_job_queue::dequeue ( ) -> common::Result<std::unique_ptr<job>>
nodiscard

Dequeues a job from the queue (thread-safe)

Returns
common::Result<std::unique_ptr<job>> The dequeued job or error
Note
Lock-free operation (system-wide progress guaranteed)
Returns error_code::queue_empty when the queue is empty (an expected condition, not a failure)
Uses Hazard Pointers to protect nodes from premature deletion
Retired nodes are eventually reclaimed by the HP domain

Time Complexity: O(1) amortized
Memory Ordering: acquire/release semantics

Definition at line 144 of file lockfree_job_queue.cpp.

{
    // Acquire hazard pointer guards for protecting nodes (uses safe memory ordering)
    safe_hazard_guard hp_head;
    safe_hazard_guard hp_next;

    // Limit retries to prevent infinite loop during concurrent mode switching
    // This is important for adaptive_job_queue which may switch modes while
    // dequeue is in progress
    constexpr int MAX_OUTER_RETRIES = 100;
    constexpr int MAX_INNER_RETRIES = 10;

    for (int outer_retry = 0; outer_retry < MAX_OUTER_RETRIES; ++outer_retry) {
        // Protect head from reclamation
        node* head = head_.load(std::memory_order_acquire);
        hp_head.protect(head);

        // Verify head hasn't changed (ABA protection)
        if (head != head_.load(std::memory_order_acquire)) {
            backoff(outer_retry);
            continue; // Head changed, retry
        }

        node* tail = tail_.load(std::memory_order_acquire);

        // Protect next node using loop until stable (with retry limit)
        node* next = nullptr;
        bool next_stable = false;
        for (int inner_retry = 0; inner_retry < MAX_INNER_RETRIES; ++inner_retry) {
            next = head->next.load(std::memory_order_acquire);
            if (next == nullptr) {
                next_stable = true;
                break; // No next node
            }

            hp_next.protect(next);

            // Verify next is still the same after protection
            if (next == head->next.load(std::memory_order_acquire)) {
                next_stable = true;
                break; // Stable, protected
            }
            backoff(inner_retry);
            // Next changed, retry protection
        }

        // If we couldn't stabilize next pointer, retry outer loop
        if (!next_stable) {
            backoff(outer_retry);
            continue;
        }

        // Check if head is still consistent
        if (head == head_.load(std::memory_order_acquire)) {
            if (head == tail) {
                if (next == nullptr) {
                    // Queue is empty
                    return common::error_info{static_cast<int>(error_code::queue_empty), "Queue is empty", "thread_system"};
                }

                // Tail is behind, try to advance it
                tail_.compare_exchange_weak(
                    tail, next,
                    std::memory_order_release,
                    std::memory_order_relaxed);
            } else {
                if (next == nullptr) {
                    // Inconsistent state, retry
                    backoff(outer_retry);
                    continue;
                }

                // Try to swing head to next
                if (head_.compare_exchange_weak(
                        head, next,
                        std::memory_order_release,
                        std::memory_order_relaxed)) {

                    // Successfully dequeued - now safe to read data
                    // We now own the old head node exclusively
                    std::unique_ptr<job> job_data = std::move(next->data);

                    // Retire the old head node for later reclamation (safe memory ordering)
                    // Reclaimed nodes are returned to the pool instead of deleted
                    retire_node(head);

                    // Update size (relaxed - just for monitoring)
                    approximate_size_.fetch_sub(1, std::memory_order_relaxed);

                    // Return the job data
                    return std::move(job_data);
                }
                backoff(outer_retry);
            }
        } else {
            backoff(outer_retry);
        }
    }

    // If we exhausted retries, report queue as empty
    // This is safe because the caller will retry if needed
    return common::error_info{static_cast<int>(error_code::queue_empty), "Queue is empty", "thread_system"};
}

References kcenon::thread::detail::lockfree_job_queue::node::data, kcenon::thread::detail::lockfree_job_queue::node::next, kcenon::thread::safe_hazard_guard::protect(), and kcenon::thread::queue_empty.

Referenced by get_next_job(), try_dequeue(), and ~lockfree_job_queue().
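
A consumer-side sketch mirroring the drain loop used by the destructor above; how a dequeued job is executed is outside this page, so only the ownership handling is shown.

#include <lockfree_job_queue.h>

// Drains every currently reachable job. Safe to run from several consumer
// threads at once; each dequeue() call makes lock-free progress.
void drain(kcenon::thread::detail::lockfree_job_queue& queue)
{
    while (true) {
        auto result = queue.dequeue();
        if (result.is_err()) {
            break; // error_code::queue_empty (also returned after retry exhaustion)
        }
        // result owns the dequeued std::unique_ptr<job>; execute or drop it here
    }
}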


◆ empty()

auto kcenon::thread::detail::lockfree_job_queue::empty ( ) const -> bool
nodiscard

Checks if the queue is empty.

Returns
true if queue appears empty, false otherwise
Note
This is a snapshot view; queue may change immediately after
Use for hints only, not for synchronization

Definition at line 248 of file lockfree_job_queue.cpp.

{
    // Use hazard pointer protection to safely access head node
    // This prevents UAF if another thread retires the head during our check
    safe_hazard_guard hp_head;

    // Try to get a stable read of head
    // If head keeps changing due to concurrent modifications, retry a few times
    constexpr int MAX_RETRIES = 10;
    for (int retry = 0; retry < MAX_RETRIES; ++retry) {
        node* head = head_.load(std::memory_order_acquire);
        hp_head.protect(head);

        // Verify head hasn't changed after protection
        if (head != head_.load(std::memory_order_acquire)) {
            continue; // Head changed, retry
        }

        node* next = head->next.load(std::memory_order_acquire);

        // Queue is empty if head->next is null
        return next == nullptr;
    }

    // If we exhausted retries, do one final check without verification
    // This handles the edge case where head keeps changing but we need a definitive answer
    node* head = head_.load(std::memory_order_acquire);
    hp_head.protect(head);
    node* next = head->next.load(std::memory_order_acquire);
    return next == nullptr;
}

References head_, kcenon::thread::detail::lockfree_job_queue::node::next, kcenon::thread::safe_hazard_guard::protect(), and kcenon::thread::retry.


◆ enqueue()

auto kcenon::thread::detail::lockfree_job_queue::enqueue ( std::unique_ptr< job > && job) -> common::VoidResult
nodiscard

Enqueues a job into the queue (thread-safe)

Parameters
job: Unique pointer to the job to enqueue
Returns
common::VoidResult Success or error
Note
Wait-free operation (bounded number of steps)
Takes ownership of the job pointer
Never blocks, always makes progress

Time Complexity: O(1) amortized
Memory Ordering: release semantics for visibility

Definition at line 76 of file lockfree_job_queue.cpp.

{
    if (!job_ptr) {
        return common::error_info{static_cast<int>(error_code::invalid_argument), "Cannot enqueue null job", "thread_system"};
    }

    // Acquire node from pool (reuses retired nodes, falls back to new)
    node* new_node = pool_->acquire(std::move(job_ptr));

    // Acquire hazard pointer guard for tail protection (uses safe memory ordering)
    safe_hazard_guard hp_tail;

    // Limit retries to prevent infinite loop during concurrent mode switching
    constexpr int MAX_RETRIES = 1000;

    for (int retry = 0; retry < MAX_RETRIES; ++retry) {
        // Read current tail
        node* tail = tail_.load(std::memory_order_acquire);

        // Protect tail to ensure it's not reclaimed while we read next
        hp_tail.protect(tail);

        // Verify tail hasn't changed (if it changed, our protection might be on the wrong node)
        if (tail != tail_.load(std::memory_order_acquire)) {
            backoff(retry);
            continue;
        }

        node* next = tail->next.load(std::memory_order_acquire);

        // Check if tail is still consistent
        if (tail == tail_.load(std::memory_order_acquire)) {
            if (next == nullptr) {
                // Tail is pointing to the last node, try to link new node
                if (tail->next.compare_exchange_weak(
                        next, new_node,
                        std::memory_order_release,
                        std::memory_order_relaxed)) {

                    // Successfully linked, try to swing tail (best effort)
                    tail_.compare_exchange_weak(
                        tail, new_node,
                        std::memory_order_release,
                        std::memory_order_relaxed);

                    // Update size (relaxed - just for monitoring)
                    approximate_size_.fetch_add(1, std::memory_order_relaxed);

                    return common::ok(); // Success
                }
                backoff(retry);
            } else {
                // Tail is behind, try to advance it
                tail_.compare_exchange_weak(
                    tail, next,
                    std::memory_order_release,
                    std::memory_order_relaxed);
            }
        } else {
            backoff(retry);
        }
    }

    // If we exhausted retries, return node to pool and report error
    pool_->release(new_node);
    return common::error_info{static_cast<int>(error_code::queue_busy), "Queue is busy, retry later", "thread_system"};
}

References kcenon::thread::invalid_argument, kcenon::thread::detail::lockfree_job_queue::node::next, kcenon::thread::safe_hazard_guard::protect(), kcenon::thread::queue_busy, and kcenon::thread::retry.

Referenced by schedule().
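
A producer-side sketch of the MPMC property: several threads may call enqueue() concurrently without any external locking. One thread per job is purely illustrative, and the jobs vector is assumed to be prepared by the caller.

#include <lockfree_job_queue.h>
#include <memory>
#include <thread>
#include <vector>

void parallel_submit(kcenon::thread::detail::lockfree_job_queue& queue,
                     std::vector<std::unique_ptr<kcenon::thread::job>>& jobs)
{
    std::vector<std::thread> producers;
    for (auto& work : jobs) {
        // Each producer hands ownership of one job to the queue.
        producers.emplace_back([&queue, &work] {
            (void)queue.enqueue(std::move(work)); // thread-safe, no mutex involved
        });
    }
    for (auto& t : producers) {
        t.join();
    }
}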


◆ get_capabilities()

auto kcenon::thread::detail::lockfree_job_queue::get_capabilities ( ) const -> queue_capabilities
inlinenodiscardoverridevirtual

Returns capabilities of lockfree_job_queue.

Returns
queue_capabilities with lock-free characteristics
Warning
size() is APPROXIMATE, empty() is NON-ATOMIC

Capabilities:

  • exact_size: false (approximate only due to concurrent modifications)
  • atomic_empty_check: false (snapshot view, may change immediately)
  • lock_free: true (uses lock-free Michael-Scott algorithm)
  • wait_free: false (enqueue is wait-free, dequeue is lock-free)
  • supports_batch: false (no batch operations available)
  • supports_blocking_wait: false (spin-wait only via try_dequeue)
  • supports_stop: false (no stop() method available)

Reimplemented from kcenon::thread::queue_capabilities_interface.

Definition at line 197 of file lockfree_job_queue.h.

{
    return queue_capabilities{
        .exact_size = false,             // Approximate only
        .atomic_empty_check = false,     // Non-atomic
        .lock_free = true,               // Lock-free implementation
        .wait_free = false,              // Not wait-free
        .supports_batch = false,         // No batch operations
        .supports_blocking_wait = false, // Spin-wait only
        .supports_stop = false           // No stop() method
    };
}
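
A sketch of how a caller can branch on the advertised capabilities instead of hard-coding knowledge of the queue type; the field names are taken from the initializer above.

#include <lockfree_job_queue.h>

void describe(const kcenon::thread::detail::lockfree_job_queue& queue)
{
    auto caps = queue.get_capabilities();
    if (!caps.exact_size) {
        // size() is only approximate; use it for monitoring, not correctness
    }
    if (caps.lock_free && !caps.supports_blocking_wait) {
        // consumers must poll (dequeue()/try_dequeue()) rather than block
    }
}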

◆ get_next_job()

auto kcenon::thread::detail::lockfree_job_queue::get_next_job ( ) -> common::Result<std::unique_ptr<job>>
inlineoverridevirtual

Get next job (delegates to dequeue)

Returns
common::Result<std::unique_ptr<job>> The dequeued job or error
Note
Part of scheduler_interface

Implements kcenon::thread::scheduler_interface.

Definition at line 173 of file lockfree_job_queue.h.

{
    return dequeue();
}

References dequeue().


◆ operator=() [1/2]

lockfree_job_queue & kcenon::thread::detail::lockfree_job_queue::operator= ( const lockfree_job_queue & )
delete

◆ operator=() [2/2]

lockfree_job_queue & kcenon::thread::detail::lockfree_job_queue::operator= ( lockfree_job_queue && )
delete

◆ retire_node()

void kcenon::thread::detail::lockfree_job_queue::retire_node ( node * n)
private

Retire a node through hazard pointers, recycling via pool on reclamation.

Definition at line 286 of file lockfree_job_queue.cpp.

{
    if (!n) return;

    // Capture pool by shared_ptr so the closure remains valid even if the
    // queue is destroyed before the hazard pointer domain reclaims this node
    std::shared_ptr<node_pool> pool = pool_;

    safe_hazard_pointer_domain::instance().retire(
        n,
        [pool](void* ptr) {
            pool->release(static_cast<node*>(ptr));
        }
    );
}

References kcenon::thread::safe_hazard_pointer_domain::instance(), pool_, and kcenon::thread::safe_hazard_pointer_domain::retire().

Referenced by ~lockfree_job_queue().


◆ schedule()

auto kcenon::thread::detail::lockfree_job_queue::schedule ( std::unique_ptr< job > && work) -> common::VoidResult
inlineoverridevirtual

Schedule a job (delegates to enqueue)

Parameters
work: Job to schedule
Returns
common::VoidResult Success or error
Note
Part of scheduler_interface

Implements kcenon::thread::scheduler_interface.

Definition at line 162 of file lockfree_job_queue.h.

{
    return enqueue(std::move(work));
}

References enqueue().
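
Because schedule() and get_next_job() implement scheduler_interface, the queue can also be driven through the base interface; a sketch, with job execution again elided.

#include <lockfree_job_queue.h>
#include <memory>

void run_one(kcenon::thread::scheduler_interface& scheduler,
             std::unique_ptr<kcenon::thread::job> work)
{
    (void)scheduler.schedule(std::move(work)); // delegates to enqueue()
    auto next = scheduler.get_next_job();      // delegates to dequeue()
    if (!next.is_err()) {
        // next owns the dequeued std::unique_ptr<job>
    }
}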


◆ size()

auto kcenon::thread::detail::lockfree_job_queue::size ( ) const -> std::size_t
nodiscard

Gets approximate queue size.

Returns
Approximate number of jobs in queue
Note
This is a best-effort estimate due to concurrent modifications
Use for monitoring/debugging, not for correctness

Definition at line 280 of file lockfree_job_queue.cpp.

{
    // Return cached size (may not be exact due to concurrent modifications)
    return approximate_size_.load(std::memory_order_relaxed);
}

References approximate_size_.
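
A monitoring sketch that treats size() and empty() strictly as hints, as the notes above require.

#include <lockfree_job_queue.h>

void report(const kcenon::thread::detail::lockfree_job_queue& queue)
{
    auto backlog = queue.size(); // approximate, relaxed atomic read
    bool idle = queue.empty();   // snapshot; may change immediately
    // feed backlog/idle into metrics or logs; never gate dequeue() on them
    (void)backlog;
    (void)idle;
}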

◆ try_dequeue()

auto kcenon::thread::detail::lockfree_job_queue::try_dequeue ( ) -> common::Result<std::unique_ptr<job>>
inlinenodiscard

Tries to dequeue a job without blocking.

Returns
common::Result<std::unique_ptr<job>> The dequeued job or empty
Note
Alias for dequeue() (lock-free queues never block)
Provided for API compatibility with mutex-based queue

Definition at line 126 of file lockfree_job_queue.h.

{
    return dequeue();
}

References dequeue().


Member Data Documentation

◆ approximate_size_

std::atomic<std::size_t> kcenon::thread::detail::lockfree_job_queue::approximate_size_ {0}
mutableprivate

Definition at line 315 of file lockfree_job_queue.h.


Referenced by lockfree_job_queue(), and size().

◆ head_

std::atomic<node*> kcenon::thread::detail::lockfree_job_queue::head_
private

Definition at line 304 of file lockfree_job_queue.h.

Referenced by empty(), lockfree_job_queue(), and ~lockfree_job_queue().

◆ pool_

std::shared_ptr<node_pool> kcenon::thread::detail::lockfree_job_queue::pool_
private

Definition at line 308 of file lockfree_job_queue.h.

Referenced by retire_node().

◆ shutdown_

std::atomic<bool> kcenon::thread::detail::lockfree_job_queue::shutdown_ {false}
private

Definition at line 318 of file lockfree_job_queue.h.


Referenced by ~lockfree_job_queue().

◆ tail_

std::atomic<node*> kcenon::thread::detail::lockfree_job_queue::tail_
private

Definition at line 305 of file lockfree_job_queue.h.

Referenced by lockfree_job_queue().


The documentation for this class was generated from the following files:
lockfree_job_queue.h
lockfree_job_queue.cpp