Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
lockfree_job_queue.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13#pragma once
14
20
21#include <atomic>
22#include <memory>
23#include <optional>
24
25namespace kcenon::thread {
26
27namespace detail {
28
65public:
73
81
82 // Non-copyable and non-movable (queue has shared global state)
87
101 [[nodiscard]] auto enqueue(std::unique_ptr<job>&& job) -> common::VoidResult;
102
116 [[nodiscard]] auto dequeue() -> common::Result<std::unique_ptr<job>>;
117
    /**
     * @brief Tries to dequeue a job without blocking.
     *
     * The underlying dequeue() is already non-blocking (lock-free with
     * bounded retries per attempt), so this simply delegates to it; the
     * alias exists to satisfy callers that expect a try_* API.
     *
     * @return common::Result holding the job on success, or an error
     *         result when the queue is empty (see dequeue()).
     */
    [[nodiscard]] auto try_dequeue() -> common::Result<std::unique_ptr<job>> {
        return dequeue();
    }
129
138 [[nodiscard]] auto empty() const -> bool;
139
148 [[nodiscard]] auto size() const -> std::size_t;
149
150 // ============================================
151 // scheduler_interface implementation
152 // ============================================
153
    /**
     * @brief Schedule a job for execution (scheduler_interface implementation).
     *
     * Thin adapter that forwards ownership of @p work to enqueue(); all
     * thread-safety and error semantics are those of enqueue().
     *
     * @param work Job to schedule; ownership is transferred.
     * @return common::VoidResult from enqueue().
     */
    auto schedule(std::unique_ptr<job>&& work) -> common::VoidResult override {
        return enqueue(std::move(work));
    }
165
    /**
     * @brief Retrieve the next job (scheduler_interface implementation).
     *
     * Thin adapter that forwards to dequeue(); all thread-safety and
     * error semantics are those of dequeue().
     *
     * @return common::Result holding the job on success, or an error
     *         result when the queue is empty.
     */
    auto get_next_job() -> common::Result<std::unique_ptr<job>> override {
        return dequeue();
    }
176
177 // ============================================
178 // queue_capabilities_interface implementation
179 // ============================================
180
197 [[nodiscard]] auto get_capabilities() const -> queue_capabilities override {
198 return queue_capabilities{
199 .exact_size = false, // Approximate only
200 .atomic_empty_check = false, // Non-atomic
201 .lock_free = true, // Lock-free implementation
202 .wait_free = false, // Not wait-free
203 .supports_batch = false, // No batch operations
204 .supports_blocking_wait = false, // Spin-wait only
205 .supports_stop = false // No stop() method
206 };
207 }
208
209private:
216 struct node {
217 std::unique_ptr<job> data; // Payload (nullptr for dummy node)
218 std::atomic<node*> next{nullptr}; // Next node in queue
219
220 node() : data(nullptr), next(nullptr) {}
221
222 explicit node(std::unique_ptr<job>&& job_data)
223 : data(std::move(job_data))
224 , next(nullptr) {}
225 };
226
238 class node_pool {
239 public:
240 node_pool() = default;
241
243 // Drain freelist and delete all pooled nodes
244 node* head = free_list_.load(std::memory_order_relaxed);
245 while (head) {
246 node* next = head->next.load(std::memory_order_relaxed);
247 delete head;
248 head = next;
249 }
250 }
251
252 // Non-copyable, non-movable
253 node_pool(const node_pool&) = delete;
254 node_pool& operator=(const node_pool&) = delete;
255
        /**
         * @brief Acquire a node from the freelist, or heap-allocate a new one.
         *
         * Pops the head of the Treiber stack with a CAS loop; if the pool is
         * empty, falls back to `new`. The returned node has its payload set
         * to @p job_data and its next pointer cleared.
         *
         * @param job_data Payload to install in the returned node.
         * @return A node ready for enqueueing (never nullptr).
         *
         * NOTE(review): this is a single-word-CAS Treiber pop. If another
         * thread pops `head`, reuses it, and pushes it back between our load
         * and the CAS (classic ABA), the CAS can succeed and install a stale
         * `next`, corrupting the freelist. Pooled nodes are only deleted in
         * the pool destructor, so there is no use-after-free, but the ABA
         * window remains — confirm retire_node()'s hazard-pointer protocol
         * rules this interleaving out, or add a tag/epoch to free_list_.
         */
        node* acquire(std::unique_ptr<job>&& job_data) {
            node* head = free_list_.load(std::memory_order_acquire);
            while (head) {
                if (free_list_.compare_exchange_weak(
                        head, head->next.load(std::memory_order_relaxed),
                        std::memory_order_acq_rel,
                        std::memory_order_acquire)) {
                    // Reuse pooled node — reset fields
                    head->data = std::move(job_data);
                    head->next.store(nullptr, std::memory_order_relaxed);
                    return head;
                }
                // head is updated by CAS on failure, retry
            }
            // Pool empty — allocate fresh node
            return new node(std::move(job_data));
        }
278
283 void release(node* n) {
284 if (!n) return;
285 // Clear payload before pooling
286 n->data.reset();
287 node* old_head = free_list_.load(std::memory_order_relaxed);
288 do {
289 n->next.store(old_head, std::memory_order_relaxed);
290 } while (!free_list_.compare_exchange_weak(
291 old_head, n,
292 std::memory_order_release,
293 std::memory_order_relaxed));
294 }
295
296 private:
297 std::atomic<node*> free_list_{nullptr};
298 };
299
301 void retire_node(node* n);
302
303 // Queue state: head and tail pointers
304 std::atomic<node*> head_; // Dequeue end (with acquire/release)
305 std::atomic<node*> tail_; // Enqueue end (with acquire/release)
306
307 // Node pool for recycling retired nodes (shared_ptr for capture safety)
308 std::shared_ptr<node_pool> pool_;
309
310 // Hazard pointer domain for safe memory reclamation
311 // Uses safe_hazard_pointer with explicit memory ordering (safe on ARM/weak memory models)
313
314 // Statistics (for monitoring, relaxed memory order)
315 mutable std::atomic<std::size_t> approximate_size_{0};
316
317 // Shutdown flag for safe destruction
318 std::atomic<bool> shutdown_{false};
319};
320
321} // namespace detail
322
323} // namespace kcenon::thread
Lock-free node freelist (Treiber stack) for node recycling.
node_pool & operator=(const node_pool &)=delete
void release(node *n)
Return a retired node to the pool for reuse.
node * acquire(std::unique_ptr< job > &&job_data)
Acquire a node from the pool, or allocate a new one.
Lock-free Multi-Producer Multi-Consumer (MPMC) job queue (Internal implementation)
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Tries to dequeue a job without blocking.
auto size() const -> std::size_t
Gets approximate queue size.
lockfree_job_queue & operator=(const lockfree_job_queue &)=delete
lockfree_job_queue()
Constructs an empty lock-free job queue.
lockfree_job_queue & operator=(lockfree_job_queue &&)=delete
auto schedule(std::unique_ptr< job > &&work) -> common::VoidResult override
Schedule a job (delegates to enqueue)
lockfree_job_queue(const lockfree_job_queue &)=delete
lockfree_job_queue(lockfree_job_queue &&)=delete
auto get_next_job() -> common::Result< std::unique_ptr< job > > override
Get next job (delegates to dequeue)
void retire_node(node *n)
Retire a node through hazard pointers, recycling via pool on reclamation.
auto enqueue(std::unique_ptr< job > &&job) -> common::VoidResult
Enqueues a job into the queue (thread-safe)
auto get_capabilities() const -> queue_capabilities override
Returns capabilities of lockfree_job_queue.
auto empty() const -> bool
Checks if the queue is empty.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeues a job from the queue (thread-safe)
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Mixin interface for queue capability introspection.
Scheduler interface for queuing and retrieving jobs.
Error codes and utilities for the thread system.
Base job class for schedulable work units in the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Mixin interface for queue capability introspection.
Thread-local hazard pointer with explicit memory ordering for safe reclamation.
Abstract scheduler interface for queuing and retrieving jobs.
Runtime-queryable queue capabilities descriptor.