Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
sync_policies.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
14#include <atomic>
15#include <condition_variable>
16#include <deque>
17#include <memory>
18#include <mutex>
19
24
26
31
48public:
50
54 [[nodiscard]] static constexpr auto get_capabilities() -> queue_capabilities {
55 return queue_capabilities{
56 .exact_size = true,
57 .atomic_empty_check = true,
58 .lock_free = false,
59 .wait_free = false,
60 .supports_batch = true,
61 .supports_blocking_wait = true,
62 .supports_stop = true
63 };
64 }
65
69 mutex_sync_policy() : notify_(true), stop_(false) {}
70
74 ~mutex_sync_policy() = default;
75
76 // Non-copyable, non-movable
81
87 [[nodiscard]] auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult {
88 if (!value) {
89 return common::error_info{-105, "cannot enqueue null job", "thread_system"};
90 }
91
92 std::lock_guard<std::mutex> lock(mutex_);
93 queue_.push_back(std::move(value));
94
95 if (notify_.load(std::memory_order_relaxed)) {
96 condition_.notify_one();
97 }
98
99 return common::ok();
100 }
101
106 [[nodiscard]] auto dequeue() -> common::Result<std::unique_ptr<job>> {
107 std::unique_lock<std::mutex> lock(mutex_);
108
109 condition_.wait(lock, [this] {
110 return !queue_.empty() || stop_.load(std::memory_order_relaxed);
111 });
112
113 if (queue_.empty()) {
114 return common::error_info{-121, "queue is stopped or empty", "thread_system"};
115 }
116
117 auto value = std::move(queue_.front());
118 queue_.pop_front();
119 return value;
120 }
121
126 [[nodiscard]] auto try_dequeue() -> common::Result<std::unique_ptr<job>> {
127 std::lock_guard<std::mutex> lock(mutex_);
128
129 if (queue_.empty()) {
130 return common::error_info{-121, "queue is empty", "thread_system"};
131 }
132
133 auto value = std::move(queue_.front());
134 queue_.pop_front();
135 return value;
136 }
137
142 [[nodiscard]] auto empty() const -> bool {
143 std::lock_guard<std::mutex> lock(mutex_);
144 return queue_.empty();
145 }
146
151 [[nodiscard]] auto size() const -> std::size_t {
152 std::lock_guard<std::mutex> lock(mutex_);
153 return queue_.size();
154 }
155
159 auto clear() -> void {
160 std::lock_guard<std::mutex> lock(mutex_);
161 queue_.clear();
162 }
163
167 auto stop() -> void {
168 stop_.store(true, std::memory_order_release);
169 condition_.notify_all();
170 }
171
/// @brief True once stop() has been requested.
176 [[nodiscard]] auto is_stopped() const -> bool {
177 return stop_.load(std::memory_order_acquire);
178 }
179
/// @brief Enable/disable condition-variable notification on enqueue.
/// When disabled, enqueue() still appends but skips notify_one().
184 auto set_notify(bool notify) -> void {
185 notify_.store(notify, std::memory_order_relaxed);
186 }
187
188private:
189 mutable std::mutex mutex_; // guards queue_; mutable so const empty()/size() can lock
190 std::condition_variable condition_; // signaled on enqueue/stop for blocking dequeue()
191 std::deque<std::unique_ptr<job>> queue_; // FIFO storage of owned jobs
192 std::atomic<bool> notify_; // when false, enqueue() skips notify_one()
193 std::atomic<bool> stop_; // set by stop(); checked by dequeue()'s wait predicate
194};
195
212public:
214
218 [[nodiscard]] static constexpr auto get_capabilities() -> queue_capabilities {
219 return queue_capabilities{
220 .exact_size = false,
221 .atomic_empty_check = false,
222 .lock_free = true,
223 .wait_free = false,
224 .supports_batch = false,
225 .supports_blocking_wait = false,
226 .supports_stop = false
227 };
228 }
229
234 auto dummy = new node(nullptr);
235 head_.store(dummy, std::memory_order_relaxed);
236 tail_.store(dummy, std::memory_order_relaxed);
237 }
238
243 shutdown_.store(true, std::memory_order_release);
244
245 // Drain remaining nodes
246 node* current = head_.load(std::memory_order_acquire);
247 while (current != nullptr) {
248 node* next = current->next.load(std::memory_order_acquire);
249 delete current;
250 current = next;
251 }
252 }
253
254 // Non-copyable, non-movable
259
265 [[nodiscard]] auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult {
266 if (!value) {
267 return common::error_info{-105, "cannot enqueue null job", "thread_system"};
268 }
269
270 if (shutdown_.load(std::memory_order_acquire)) {
271 return common::error_info{-122, "queue is shutting down", "thread_system"};
272 }
273
274 auto new_node = new node(std::move(value));
275
276 while (true) {
277 node* tail = tail_.load(std::memory_order_acquire);
278 node* next = tail->next.load(std::memory_order_acquire);
279
280 if (tail == tail_.load(std::memory_order_acquire)) {
281 if (next == nullptr) {
282 if (tail->next.compare_exchange_weak(next, new_node,
283 std::memory_order_release,
284 std::memory_order_relaxed)) {
285 tail_.compare_exchange_strong(tail, new_node,
286 std::memory_order_release,
287 std::memory_order_relaxed);
288 approximate_size_.fetch_add(1, std::memory_order_relaxed);
289 return common::ok();
290 }
291 } else {
292 tail_.compare_exchange_weak(tail, next,
293 std::memory_order_release,
294 std::memory_order_relaxed);
295 }
296 }
297 }
298 }
299
304 [[nodiscard]] auto dequeue() -> common::Result<std::unique_ptr<job>> {
305 while (true) {
306 node* head = head_.load(std::memory_order_acquire);
307 node* tail = tail_.load(std::memory_order_acquire);
308 node* next = head->next.load(std::memory_order_acquire);
309
310 if (head == head_.load(std::memory_order_acquire)) {
311 if (head == tail) {
312 if (next == nullptr) {
313 return common::error_info{-121, "queue is empty", "thread_system"};
314 }
315 tail_.compare_exchange_weak(tail, next,
316 std::memory_order_release,
317 std::memory_order_relaxed);
318 } else {
319 if (next != nullptr) {
320 auto value = std::move(next->data);
321 if (head_.compare_exchange_weak(head, next,
322 std::memory_order_release,
323 std::memory_order_relaxed)) {
324 approximate_size_.fetch_sub(1, std::memory_order_relaxed);
325 delete head;
326 return value;
327 }
328 }
329 }
330 }
331 }
332 }
333
338 [[nodiscard]] auto try_dequeue() -> common::Result<std::unique_ptr<job>> {
339 return dequeue();
340 }
341
346 [[nodiscard]] auto empty() const -> bool {
347 node* head = head_.load(std::memory_order_acquire);
348 node* next = head->next.load(std::memory_order_acquire);
349 return next == nullptr;
350 }
351
356 [[nodiscard]] auto size() const -> std::size_t {
357 return approximate_size_.load(std::memory_order_relaxed);
358 }
359
363 auto clear() -> void {
364 while (true) {
365 auto result = dequeue();
366 if (result.is_err()) {
367 break;
368 }
369 }
370 }
371
375 auto stop() -> void {
376 shutdown_.store(true, std::memory_order_release);
377 }
378
/// @brief True once stop() was called (or destruction has begun).
383 [[nodiscard]] auto is_stopped() const -> bool {
384 return shutdown_.load(std::memory_order_acquire);
385 }
386
/// @brief Accept the notify flag for interface compatibility; intentionally
///        ignored because this policy has no blocking waiters to wake.
391 auto set_notify(bool /*notify*/) -> void {
392 // No-op: lock-free queue doesn't use condition variables
393 }
394
395private:
396 struct node {
397 std::unique_ptr<job> data;
398 std::atomic<node*> next{nullptr};
399
400 explicit node(std::unique_ptr<job>&& job_data)
401 : data(std::move(job_data)) {}
402 };
403
404 std::atomic<node*> head_;
405 std::atomic<node*> tail_;
406 std::atomic<bool> shutdown_;
407 mutable std::atomic<std::size_t> approximate_size_;
408};
409
422public:
424
/// @brief Which backend the adaptive policy dispatches to.
428 enum class mode {
429 mutex, // delegate to mutex_sync_policy (blocking dequeue, exact size)
430 lock_free // delegate to lockfree_sync_policy (CAS queue, approximate size)
431 };
432
436 [[nodiscard]] auto get_capabilities() const -> queue_capabilities {
437 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
439 }
441 }
442
447 explicit adaptive_sync_policy(mode initial_mode = mode::mutex)
448 : current_mode_(initial_mode)
449 , mutex_policy_(std::make_unique<mutex_sync_policy>())
450 , lockfree_policy_(std::make_unique<lockfree_sync_policy>()) {}
451
456
457 // Non-copyable, non-movable
462
468 [[nodiscard]] auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult {
469 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
470 return mutex_policy_->enqueue(std::move(value));
471 }
472 return lockfree_policy_->enqueue(std::move(value));
473 }
474
479 [[nodiscard]] auto dequeue() -> common::Result<std::unique_ptr<job>> {
480 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
481 return mutex_policy_->dequeue();
482 }
483 return lockfree_policy_->dequeue();
484 }
485
490 [[nodiscard]] auto try_dequeue() -> common::Result<std::unique_ptr<job>> {
491 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
492 return mutex_policy_->try_dequeue();
493 }
494 return lockfree_policy_->try_dequeue();
495 }
496
501 [[nodiscard]] auto empty() const -> bool {
502 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
503 return mutex_policy_->empty();
504 }
505 return lockfree_policy_->empty();
506 }
507
512 [[nodiscard]] auto size() const -> std::size_t {
513 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
514 return mutex_policy_->size();
515 }
516 return lockfree_policy_->size();
517 }
518
522 auto clear() -> void {
523 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
524 mutex_policy_->clear();
525 } else {
526 lockfree_policy_->clear();
527 }
528 }
529
533 auto stop() -> void {
534 mutex_policy_->stop();
535 lockfree_policy_->stop();
536 }
537
542 [[nodiscard]] auto is_stopped() const -> bool {
543 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
544 return mutex_policy_->is_stopped();
545 }
546 return lockfree_policy_->is_stopped();
547 }
548
553 auto set_notify(bool notify) -> void {
554 mutex_policy_->set_notify(notify);
555 }
556
561 [[nodiscard]] auto current_mode() const -> mode {
562 return current_mode_.load(std::memory_order_acquire);
563 }
564
/// @brief Switch dispatch to a different backend.
/// NOTE(review): jobs still buffered in the previously active policy are
/// NOT migrated — every accessor dispatches on current_mode_, so those jobs
/// become unreachable until the mode is switched back. Confirm callers only
/// switch while the active queue is drained, or add migration here.
571 auto switch_mode(mode target_mode) -> void {
572 current_mode_.store(target_mode, std::memory_order_release);
573 }
574
575private:
576 std::atomic<mode> current_mode_; // which backend all accessors dispatch to
577 std::unique_ptr<mutex_sync_policy> mutex_policy_; // always constructed; active in mode::mutex
578 std::unique_ptr<lockfree_sync_policy> lockfree_policy_; // always constructed; active in mode::lock_free
579};
580
581} // namespace kcenon::thread::policies
Adaptive synchronization policy that can switch modes.
adaptive_sync_policy & operator=(const adaptive_sync_policy &)=delete
auto size() const -> std::size_t
Get queue size.
auto get_capabilities() const -> queue_capabilities
Queue capabilities (dynamic based on mode)
adaptive_sync_policy & operator=(adaptive_sync_policy &&)=delete
adaptive_sync_policy(mode initial_mode=mode::mutex)
Construct adaptive sync policy.
auto set_notify(bool notify) -> void
Set notify flag.
auto switch_mode(mode target_mode) -> void
Switch to a different mode.
adaptive_sync_policy(const adaptive_sync_policy &)=delete
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Try to dequeue a job.
auto is_stopped() const -> bool
Check if stopped.
std::unique_ptr< lockfree_sync_policy > lockfree_policy_
std::unique_ptr< mutex_sync_policy > mutex_policy_
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueue a job.
auto current_mode() const -> mode
Get current mode.
auto empty() const -> bool
Check if queue is empty.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeue a job.
adaptive_sync_policy(adaptive_sync_policy &&)=delete
Lock-free synchronization policy using Michael-Scott algorithm.
auto is_stopped() const -> bool
Check if queue is stopped.
lockfree_sync_policy & operator=(const lockfree_sync_policy &)=delete
auto size() const -> std::size_t
Get approximate queue size.
static constexpr auto get_capabilities() -> queue_capabilities
Queue capabilities for lock-free sync policy.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeue a job (lock-free)
lockfree_sync_policy(const lockfree_sync_policy &)=delete
auto empty() const -> bool
Check if queue appears empty (approximate)
auto clear() -> void
Clear queue (best effort for lock-free)
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Try to dequeue a job (non-blocking, same as dequeue for lock-free)
lockfree_sync_policy & operator=(lockfree_sync_policy &&)=delete
lockfree_sync_policy(lockfree_sync_policy &&)=delete
lockfree_sync_policy()
Construct lock-free sync policy.
auto stop() -> void
Stop the queue (sets shutdown flag)
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueue a job (lock-free; CAS retry loop, not wait-free)
auto set_notify(bool) -> void
Set notify flag (no-op for lock-free)
Synchronization policy using mutex and condition variable.
auto size() const -> std::size_t
Get queue size (exact)
mutex_sync_policy()
Construct mutex sync policy.
auto empty() const -> bool
Check if queue is empty.
mutex_sync_policy(mutex_sync_policy &&)=delete
mutex_sync_policy(const mutex_sync_policy &)=delete
auto set_notify(bool notify) -> void
Set notify flag.
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Try to dequeue a job (non-blocking)
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueue a job.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeue a job (blocking)
auto clear() -> void
Clear all jobs from queue.
mutex_sync_policy & operator=(mutex_sync_policy &&)=delete
mutex_sync_policy & operator=(const mutex_sync_policy &)=delete
static constexpr auto get_capabilities() -> queue_capabilities
Queue capabilities for mutex sync policy.
auto is_stopped() const -> bool
Check if queue is stopped.
std::deque< std::unique_ptr< job > > queue_
A template class representing either a value or an error.
Error codes and utilities for the thread system.
Base job class for schedulable work units in the thread system.
STL namespace.
Runtime-queryable queue capabilities descriptor.
Thread-local hazard pointer with explicit memory ordering for safe reclamation.
Tag type for sync policy identification.
Runtime-queryable queue capabilities descriptor.