15#include <condition_variable>
57 .atomic_empty_check =
true,
60 .supports_batch =
true,
61 .supports_blocking_wait =
true,
87 [[nodiscard]]
auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult {
89 return common::error_info{-105,
"cannot enqueue null job",
"thread_system"};
92 std::lock_guard<std::mutex> lock(
mutex_);
93 queue_.push_back(std::move(value));
95 if (
notify_.load(std::memory_order_relaxed)) {
106 [[nodiscard]]
auto dequeue() -> common::Result<std::unique_ptr<job>> {
107 std::unique_lock<std::mutex> lock(
mutex_);
110 return !
queue_.empty() ||
stop_.load(std::memory_order_relaxed);
114 return common::error_info{-121,
"queue is stopped or empty",
"thread_system"};
117 auto value = std::move(
queue_.front());
126 [[nodiscard]]
auto try_dequeue() -> common::Result<std::unique_ptr<job>> {
127 std::lock_guard<std::mutex> lock(
mutex_);
130 return common::error_info{-121,
"queue is empty",
"thread_system"};
133 auto value = std::move(
queue_.front());
142 [[nodiscard]]
auto empty() const ->
bool {
143 std::lock_guard<std::mutex> lock(
mutex_);
151 [[nodiscard]]
auto size() const ->
std::
size_t {
152 std::lock_guard<std::mutex> lock(
mutex_);
160 std::lock_guard<std::mutex> lock(
mutex_);
168 stop_.store(
true, std::memory_order_release);
177 return stop_.load(std::memory_order_acquire);
185 notify_.store(notify, std::memory_order_relaxed);
221 .atomic_empty_check =
false,
224 .supports_batch =
false,
225 .supports_blocking_wait =
false,
226 .supports_stop =
false
234 auto dummy =
new node(
nullptr);
235 head_.store(dummy, std::memory_order_relaxed);
236 tail_.store(dummy, std::memory_order_relaxed);
243 shutdown_.store(
true, std::memory_order_release);
246 node* current =
head_.load(std::memory_order_acquire);
247 while (current !=
nullptr) {
248 node* next = current->
next.load(std::memory_order_acquire);
265 [[nodiscard]]
auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult {
267 return common::error_info{-105,
"cannot enqueue null job",
"thread_system"};
270 if (
shutdown_.load(std::memory_order_acquire)) {
271 return common::error_info{-122,
"queue is shutting down",
"thread_system"};
274 auto new_node =
new node(std::move(value));
277 node* tail =
tail_.load(std::memory_order_acquire);
278 node* next = tail->
next.load(std::memory_order_acquire);
280 if (tail ==
tail_.load(std::memory_order_acquire)) {
281 if (next ==
nullptr) {
282 if (tail->
next.compare_exchange_weak(next, new_node,
283 std::memory_order_release,
284 std::memory_order_relaxed)) {
285 tail_.compare_exchange_strong(tail, new_node,
286 std::memory_order_release,
287 std::memory_order_relaxed);
292 tail_.compare_exchange_weak(tail, next,
293 std::memory_order_release,
294 std::memory_order_relaxed);
304 [[nodiscard]]
auto dequeue() -> common::Result<std::unique_ptr<job>> {
306 node* head =
head_.load(std::memory_order_acquire);
307 node* tail =
tail_.load(std::memory_order_acquire);
308 node* next = head->
next.load(std::memory_order_acquire);
310 if (head ==
head_.load(std::memory_order_acquire)) {
312 if (next ==
nullptr) {
313 return common::error_info{-121,
"queue is empty",
"thread_system"};
315 tail_.compare_exchange_weak(tail, next,
316 std::memory_order_release,
317 std::memory_order_relaxed);
319 if (next !=
nullptr) {
320 auto value = std::move(next->
data);
321 if (
head_.compare_exchange_weak(head, next,
322 std::memory_order_release,
323 std::memory_order_relaxed)) {
338 [[nodiscard]]
auto try_dequeue() -> common::Result<std::unique_ptr<job>> {
346 [[nodiscard]]
auto empty() const ->
bool {
347 node* head =
head_.load(std::memory_order_acquire);
348 node* next = head->
next.load(std::memory_order_acquire);
349 return next ==
nullptr;
356 [[nodiscard]]
auto size() const ->
std::
size_t {
376 shutdown_.store(
true, std::memory_order_release);
384 return shutdown_.load(std::memory_order_acquire);
398 std::atomic<node*>
next{
nullptr};
400 explicit node(std::unique_ptr<job>&& job_data)
468 [[nodiscard]]
auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult {
479 [[nodiscard]]
auto dequeue() -> common::Result<std::unique_ptr<job>> {
490 [[nodiscard]]
auto try_dequeue() -> common::Result<std::unique_ptr<job>> {
501 [[nodiscard]]
auto empty() const ->
bool {
512 [[nodiscard]]
auto size() const ->
std::
size_t {
Adaptive synchronization policy that can switch modes.
adaptive_sync_policy & operator=(const adaptive_sync_policy &)=delete
auto size() const -> std::size_t
Get queue size.
auto get_capabilities() const -> queue_capabilities
Queue capabilities (dynamic based on mode)
adaptive_sync_policy & operator=(adaptive_sync_policy &&)=delete
adaptive_sync_policy(mode initial_mode=mode::mutex)
Construct adaptive sync policy.
auto set_notify(bool notify) -> void
Set notify flag.
auto switch_mode(mode target_mode) -> void
Switch to a different mode.
adaptive_sync_policy(const adaptive_sync_policy &)=delete
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Try to dequeue a job.
auto is_stopped() const -> bool
Check if stopped.
auto clear() -> void
Clear queue.
~adaptive_sync_policy()=default
Destructor.
std::unique_ptr< lockfree_sync_policy > lockfree_policy_
std::unique_ptr< mutex_sync_policy > mutex_policy_
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueue a job.
auto current_mode() const -> mode
Get current mode.
@ lock_free
Using lock-free sync.
auto empty() const -> bool
Check if queue is empty.
auto stop() -> void
Stop queue.
std::atomic< mode > current_mode_
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeue a job.
adaptive_sync_policy(adaptive_sync_policy &&)=delete
Lock-free synchronization policy using Michael-Scott algorithm.
auto is_stopped() const -> bool
Check if queue is stopped.
lockfree_sync_policy & operator=(const lockfree_sync_policy &)=delete
auto size() const -> std::size_t
Get approximate queue size.
static constexpr auto get_capabilities() -> queue_capabilities
Queue capabilities for lock-free sync policy.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeue a job (lock-free)
~lockfree_sync_policy()
Destructor.
lockfree_sync_policy(const lockfree_sync_policy &)=delete
auto empty() const -> bool
Check if queue appears empty (approximate)
std::atomic< node * > tail_
auto clear() -> void
Clear queue (best effort for lock-free)
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Try to dequeue a job (non-blocking, same as dequeue for lock-free)
lockfree_sync_policy & operator=(lockfree_sync_policy &&)=delete
lockfree_sync_policy(lockfree_sync_policy &&)=delete
lockfree_sync_policy()
Construct lock-free sync policy.
auto stop() -> void
Stop the queue (sets shutdown flag)
std::atomic< bool > shutdown_
std::atomic< node * > head_
std::atomic< std::size_t > approximate_size_
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueue a job (wait-free)
auto set_notify(bool) -> void
Set notify flag (no-op for lock-free)
Synchronization policy using mutex and condition variable.
auto size() const -> std::size_t
Get queue size (exact)
std::atomic< bool > notify_
auto stop() -> void
Stop the queue.
mutex_sync_policy()
Construct mutex sync policy.
auto empty() const -> bool
Check if queue is empty.
mutex_sync_policy(mutex_sync_policy &&)=delete
mutex_sync_policy(const mutex_sync_policy &)=delete
auto set_notify(bool notify) -> void
Set notify flag.
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Try to dequeue a job (non-blocking)
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueue a job.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeue a job (blocking)
auto clear() -> void
Clear all jobs from queue.
std::atomic< bool > stop_
std::condition_variable condition_
mutex_sync_policy & operator=(mutex_sync_policy &&)=delete
mutex_sync_policy & operator=(const mutex_sync_policy &)=delete
static constexpr auto get_capabilities() -> queue_capabilities
Queue capabilities for mutex sync policy.
auto is_stopped() const -> bool
Check if queue is stopped.
std::deque< std::unique_ptr< job > > queue_
~mutex_sync_policy()=default
Destructor.
A template class representing either a value or an error.
Error codes and utilities for the thread system.
Base job class for schedulable work units in the thread system.
Runtime-queryable queue capabilities descriptor.
Thread-local hazard pointer with explicit memory ordering for safe reclamation.
node(std::unique_ptr< job > &&job_data)
std::unique_ptr< job > data
std::atomic< node * > next
Tag type for sync policy identification.
Runtime-queryable queue capabilities descriptor.