Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
lockfree_job_queue.cpp
// BSD 3-Clause License
// Copyright (c) 2024, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

#include "lockfree_job_queue.h" // this file's own header (exact include path assumed)
#include <algorithm>
#include <chrono>
#include <thread>

#if defined(__x86_64__) || defined(_M_X64)
#include <immintrin.h>
#endif

namespace kcenon::thread::detail {

namespace {

// Exponential backoff for CAS retry loops.
// Phase 1 (retry < 16): CPU pause (x86) or yield (other architectures)
// Phase 2 (retry < 64): thread yield to let other threads progress
// Phase 3 (retry >= 64): short sleep with exponential increase (capped at ~1ms)
inline void backoff(int retry) {
    if (retry < 16) {
#if defined(__x86_64__) || defined(_M_X64)
        _mm_pause();
#else
        std::this_thread::yield();
#endif
    } else if (retry < 64) {
        std::this_thread::yield();
    } else {
        std::this_thread::sleep_for(
            std::chrono::microseconds(1 << std::min(retry - 64, 10)));
    }
}

} // namespace
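
// A minimal sketch of backoff() driving a CAS retry loop (illustrative
// only; demo_counter is hypothetical and not part of this file). Early
// retries spin cheaply; persistent contention escalates to yields, then
// to sleeps that double up to the ~1ms cap (1 << 10 microseconds):
//
//   std::atomic<int> demo_counter{0};
//   int expected = demo_counter.load(std::memory_order_relaxed);
//   for (int retry = 0;
//        !demo_counter.compare_exchange_weak(expected, expected + 1);
//        ++retry) {
//       backoff(retry); // expected was reloaded by the failed CAS
//   }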

// Constructor: Initialize with a dummy node
lockfree_job_queue::lockfree_job_queue()
    : pool_(std::make_shared<node_pool>()) {
    // Create dummy node (Michael-Scott algorithm requires one dummy node).
    // This simplifies the algorithm by ensuring head and tail are never null
    node* dummy = new node();

    head_.store(dummy, std::memory_order_relaxed);
    tail_.store(dummy, std::memory_order_relaxed);
    approximate_size_.store(0, std::memory_order_relaxed);
}
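
// Resulting state (illustrative): the Michael-Scott invariant is that head_
// and tail_ always reference a live node, so the empty queue is represented
// by both pointing at the dummy:
//
//   head_ --> [dummy | next = nullptr] <-- tail_
//
// enqueue() links new nodes after tail_; dequeue() advances head_, and the
// dequeued node's successor becomes the new dummy.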

// Destructor: Drain queue and delete all nodes safely
lockfree_job_queue::~lockfree_job_queue() {
    // Signal shutdown to prevent new operations
    shutdown_.store(true, std::memory_order_release);

    // Drain remaining jobs (release ownership)
    while (true) {
        auto result = dequeue();
        if (result.is_err()) {
            break;
        }
        // Jobs are destroyed when their unique_ptr goes out of scope
    }

    // Safe cleanup: acquire semantics ensure we see all prior writes
    node* dummy = head_.load(std::memory_order_acquire);

    // Retire the dummy node through pool-aware reclamation.
    // This ensures the node is only deleted when no other thread
    // holds a hazard pointer to it (uses explicit memory ordering)
    retire_node(dummy);
}

// Enqueue operation (Michael-Scott algorithm)
auto lockfree_job_queue::enqueue(std::unique_ptr<job>&& job_ptr) -> common::VoidResult {
    if (!job_ptr) {
        return common::error_info{static_cast<int>(error_code::invalid_argument),
                                  "Cannot enqueue null job", "thread_system"};
    }

    // Acquire node from pool (reuses retired nodes, falls back to new)
    node* new_node = pool_->acquire(std::move(job_ptr));

    // Acquire hazard pointer guard for tail protection (uses safe memory ordering)
    safe_hazard_guard hp_tail;

    // Limit retries to prevent infinite loop during concurrent mode switching
    constexpr int MAX_RETRIES = 1000;

    for (int retry = 0; retry < MAX_RETRIES; ++retry) {
        // Read current tail
        node* tail = tail_.load(std::memory_order_acquire);

        // Protect tail to ensure it's not reclaimed while we read next
        hp_tail.protect(tail);

        // Verify tail hasn't changed (if it changed, our protection might be on the wrong node)
        if (tail != tail_.load(std::memory_order_acquire)) {
            backoff(retry);
            continue;
        }

        node* next = tail->next.load(std::memory_order_acquire);

        // Check if tail is still consistent
        if (tail == tail_.load(std::memory_order_acquire)) {
            if (next == nullptr) {
                // Tail is pointing to the last node, try to link the new node
                if (tail->next.compare_exchange_weak(
                        next, new_node,
                        std::memory_order_release,
                        std::memory_order_relaxed)) {

                    // Successfully linked, try to swing tail (best effort)
                    tail_.compare_exchange_weak(
                        tail, new_node,
                        std::memory_order_release,
                        std::memory_order_relaxed);

                    // Update size (relaxed - just for monitoring)
                    approximate_size_.fetch_add(1, std::memory_order_relaxed);

                    return common::ok(); // Success
                }
                backoff(retry);
            } else {
                // Tail is behind, try to advance it
                tail_.compare_exchange_weak(
                    tail, next,
                    std::memory_order_release,
                    std::memory_order_relaxed);
            }
        } else {
            backoff(retry);
        }
    }

    // If we exhausted retries, return the node to the pool and report an error
    pool_->release(new_node);
    return common::error_info{static_cast<int>(error_code::queue_busy),
                              "Queue is busy, retry later", "thread_system"};
}
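
// Callers that receive error_code::queue_busy are expected to back off and
// retry, as the message suggests. A minimal caller-side sketch (queue and
// make_job() are hypothetical; note that a failed enqueue releases the job's
// node back to the pool rather than handing the job back to the caller):
//
//   while (queue.enqueue(make_job()).is_err()) {
//       std::this_thread::yield();
//   }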

// Dequeue operation (Michael-Scott algorithm with Safe Hazard Pointers)
auto lockfree_job_queue::dequeue() -> common::Result<std::unique_ptr<job>> {
    // Acquire hazard pointer guards for protecting nodes (uses safe memory ordering)
    safe_hazard_guard hp_head;
    safe_hazard_guard hp_next;

    // Limit retries to prevent an infinite loop during concurrent mode switching.
    // This is important for adaptive_job_queue, which may switch modes while a
    // dequeue is in progress
    constexpr int MAX_OUTER_RETRIES = 100;
    constexpr int MAX_INNER_RETRIES = 10;

    for (int outer_retry = 0; outer_retry < MAX_OUTER_RETRIES; ++outer_retry) {
        // Protect head from reclamation
        node* head = head_.load(std::memory_order_acquire);
        hp_head.protect(head);

        // Verify head hasn't changed (ABA protection)
        if (head != head_.load(std::memory_order_acquire)) {
            backoff(outer_retry);
            continue; // Head changed, retry
        }

        node* tail = tail_.load(std::memory_order_acquire);

        // Protect the next node, looping until stable (with a retry limit)
        node* next = nullptr;
        bool next_stable = false;
        for (int inner_retry = 0; inner_retry < MAX_INNER_RETRIES; ++inner_retry) {
            next = head->next.load(std::memory_order_acquire);
            if (next == nullptr) {
                next_stable = true;
                break; // No next node
            }

            hp_next.protect(next);

            // Verify next is still the same after protection
            if (next == head->next.load(std::memory_order_acquire)) {
                next_stable = true;
                break; // Stable, protected
            }
            backoff(inner_retry);
            // Next changed, retry protection
        }

        // If we couldn't stabilize the next pointer, retry the outer loop
        if (!next_stable) {
            backoff(outer_retry);
            continue;
        }

        // Check if head is still consistent
        if (head == head_.load(std::memory_order_acquire)) {
            if (head == tail) {
                if (next == nullptr) {
                    // Queue is empty
                    return common::error_info{static_cast<int>(error_code::queue_empty),
                                              "Queue is empty", "thread_system"};
                }

                // Tail is behind, try to advance it
                tail_.compare_exchange_weak(
                    tail, next,
                    std::memory_order_release,
                    std::memory_order_relaxed);
            } else {
                if (next == nullptr) {
                    // Inconsistent state, retry
                    backoff(outer_retry);
                    continue;
                }

                // Try to swing head to next
                if (head_.compare_exchange_weak(
                        head, next,
                        std::memory_order_release,
                        std::memory_order_relaxed)) {

                    // Successfully dequeued - now safe to read the data;
                    // we now own the old head node exclusively
                    std::unique_ptr<job> job_data = std::move(next->data);

                    // Retire the old head node for later reclamation (safe memory ordering).
                    // Reclaimed nodes are returned to the pool instead of deleted
                    retire_node(head);

                    // Update size (relaxed - just for monitoring)
                    approximate_size_.fetch_sub(1, std::memory_order_relaxed);

                    // Return the job data
                    return std::move(job_data);
                }
                backoff(outer_retry);
            }
        } else {
            backoff(outer_retry);
        }
    }

    // If we exhausted retries, report the queue as empty.
    // This is safe because the caller will retry if needed
    return common::error_info{static_cast<int>(error_code::queue_empty),
                              "Queue is empty", "thread_system"};
}

// Check if queue is empty
auto lockfree_job_queue::empty() const -> bool {
    // Use hazard pointer protection to safely access the head node.
    // This prevents use-after-free if another thread retires the head during our check
    safe_hazard_guard hp_head;

    // Try to get a stable read of head.
    // If head keeps changing due to concurrent modifications, retry a few times
    constexpr int MAX_RETRIES = 10;
    for (int retry = 0; retry < MAX_RETRIES; ++retry) {
        node* head = head_.load(std::memory_order_acquire);
        hp_head.protect(head);

        // Verify head hasn't changed after protection
        if (head != head_.load(std::memory_order_acquire)) {
            continue; // Head changed, retry
        }

        node* next = head->next.load(std::memory_order_acquire);

        // Queue is empty if head->next is null
        return next == nullptr;
    }

    // If we exhausted retries, do one final check without verification.
    // This handles the edge case where head keeps changing but we need a definitive answer
    node* head = head_.load(std::memory_order_acquire);
    hp_head.protect(head);
    node* next = head->next.load(std::memory_order_acquire);
    return next == nullptr;
}

// Get approximate size
auto lockfree_job_queue::size() const -> std::size_t {
    // Return cached size (may not be exact due to concurrent modifications)
    return approximate_size_.load(std::memory_order_relaxed);
}

// Retire a node through hazard pointers, recycling via pool on reclamation
void lockfree_job_queue::retire_node(node* n) {
    if (!n) return;

    // Capture pool by shared_ptr so the closure remains valid even if the
    // queue is destroyed before the hazard pointer domain reclaims this node
    std::shared_ptr<node_pool> pool = pool_;

    safe_hazard_pointer_domain::instance().retire(
        n,
        [pool](void* ptr) {
            pool->release(static_cast<node*>(ptr));
        }
    );
}

} // namespace kcenon::thread::detail
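
// Consumer-side usage sketch (illustrative, not part of this file). Only
// dequeue() and is_err() are taken from the listing above; Result::value()
// and job::execute() are assumed accessors, not confirmed by this file:
//
//   kcenon::thread::detail::lockfree_job_queue queue;
//   // ... producers enqueue jobs as sketched after enqueue() above ...
//
//   for (;;) {
//       auto result = queue.dequeue();
//       if (result.is_err()) break;   // queue_empty (or retries exhausted)
//       result.value()->execute();
//   }
//
// As the comments above note, size() is approximate and empty() is a
// point-in-time snapshot; both are advisory under concurrency.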