Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
adaptive_job_queue.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13#pragma once
14
18
19#include <atomic>
20#include <chrono>
21#include <memory>
22#include <mutex>
23#include <type_traits>
24
25namespace kcenon::thread {
26
62public:
// Operating mode of the queue; reported by current_mode() and passed to
// switch_mode().
66 enum class mode {
67 mutex, // Using job_queue (accuracy mode).
// NOTE(review): numbered line 68 is absent from this listing. The member index
// further down this page also documents a `lock_free` enumerator ("Using
// lockfree_job_queue (performance mode)") - confirm against the real header.
69 };
70
// Mode-selection policy; the constructor shown in this page's member index
// takes a policy and defaults it to policy::balanced.
74 enum class policy {
// NOTE(review): numbered lines 75-76 are absent from this listing. The member
// index on this page also documents `performance_first` ("Always use lock-free
// mode") - confirm the full enumerator list against the real header.
77 balanced, // Auto-switch based on usage.
78 manual // switch_mode() is documented as usable only under this policy.
79 };
80
81 // ============================================
82 // Construction
83 // ============================================
84
90
95
96 // Non-copyable
99
100 // Non-movable (contains shared state)
103
104 // ============================================
105 // scheduler_interface implementation
106 // ============================================
107
// Schedule a job (delegates to current queue). The job arrives as an rvalue
// reference to std::unique_ptr<job>; the outcome is a common::VoidResult.
113 auto schedule(std::unique_ptr<job>&& work) -> common::VoidResult override;
114
// Get next job (delegates to current queue). The job is handed back inside a
// common::Result wrapping std::unique_ptr<job>.
119 auto get_next_job() -> common::Result<std::unique_ptr<job>> override;
120
121 // ============================================
122 // Standard queue operations
123 // ============================================
124
// Enqueues a job into the current active queue. Marked [[nodiscard]], so the
// returned common::VoidResult is expected to be inspected by the caller.
130 [[nodiscard]] auto enqueue(std::unique_ptr<job>&& j) -> common::VoidResult;
131
// Type-safe enqueue for job subclasses. The defaulted second template
// parameter (std::enable_if_t over std::is_base_of_v<job, JobType>) removes
// this overload from overload resolution unless job is a base of JobType.
148 template<typename JobType, typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
149 [[nodiscard]] auto enqueue(std::unique_ptr<JobType>&& value) -> common::VoidResult
150 {
// Move ownership into a std::unique_ptr<job> and forward to the
// enqueue(std::unique_ptr<job>&&) overload; its result is returned unchanged.
// NOTE(review): std::move is declared in <utility>, which is not among the
// includes visible in this listing - presumably pulled in transitively; confirm.
151 return enqueue(std::unique_ptr<job>(std::move(value)));
152 }
153
// Dequeues a job from the current active queue.
158 [[nodiscard]] auto dequeue() -> common::Result<std::unique_ptr<job>>;
159
// Tries to dequeue a job without blocking.
164 [[nodiscard]] auto try_dequeue() -> common::Result<std::unique_ptr<job>>;
165
// Checks if the queue is empty.
171 [[nodiscard]] auto empty() const -> bool;
172
// Returns the current number of jobs in the queue.
178 [[nodiscard]] auto size() const -> std::size_t;
179
// Clears all jobs from the queue.
183 auto clear() -> void;
184
// Signals the queue to stop.
188 auto stop() -> void;
189
// Checks if the queue is stopped.
194 [[nodiscard]] auto is_stopped() const -> bool;
195
196 // ============================================
197 // queue_capabilities_interface implementation
198 // ============================================
199
// Returns capabilities based on current mode, as a queue_capabilities value.
208 [[nodiscard]] auto get_capabilities() const -> queue_capabilities override;
209
210 // ============================================
211 // Adaptive-specific API
212 // ============================================
213
// Get current operating mode.
218 [[nodiscard]] auto current_mode() const -> mode;
219
// Get current policy.
224 [[nodiscard]] auto current_policy() const -> policy;
225
// Manually switch mode (only if policy is manual). The outcome is reported
// through the returned common::VoidResult.
// NOTE(review): what is returned under a non-manual policy is not visible in
// this header - confirm in the implementation.
231 auto switch_mode(mode m) -> common::VoidResult;
232
// Statistics about queue usage, returned by value from get_stats().
// Every field is a uint64_t that is value-initialized to 0.
// NOTE(review): <cstdint> is not among the includes visible in this listing and
// the type is spelled without std:: - presumably it arrives transitively; confirm.
236 struct stats {
237 uint64_t mode_switches{0}; // presumably the number of mode changes - confirm
238 uint64_t time_in_mutex_ms{0}; // presumably milliseconds spent in mode::mutex (going by the _ms suffix) - confirm
239 uint64_t time_in_lockfree_ms{0}; // presumably milliseconds spent in lock-free mode - confirm
240 uint64_t enqueue_count{0}; // presumably the number of enqueue calls counted - confirm
241 uint64_t dequeue_count{0}; // presumably the number of dequeue calls counted - confirm
242 };
243
// Get statistics about queue usage; the stats struct is returned by value.
248 [[nodiscard]] auto get_stats() const -> stats;
249
250 // ============================================
251 // RAII guard that temporarily switches to accuracy mode
252 // ============================================
253
// Forward declaration: RAII guard that temporarily switches to accuracy mode.
254 class accuracy_guard;
255
// Request temporary accuracy mode. Returns an accuracy_guard by value;
// [[nodiscard]] flags calls that drop the guard immediately.
277 [[nodiscard]] auto require_accuracy() -> accuracy_guard;
278
284 public:
289 explicit accuracy_guard(adaptive_job_queue& queue);
290
295
296 // Move-only
297 accuracy_guard(accuracy_guard&& other) noexcept;
299
300 // Non-copyable
303
304 private:
308 };
309
310private:
311 // Policy and mode
// NOTE(review): numbered line 312 is absent from this listing (presumably the
// stored policy member) - confirm against the real header.
313 std::atomic<mode> current_mode_; // atomic mode value; presumably what current_mode() reads - confirm
314 std::atomic<bool> stopped_{false}; // starts false; presumably set by stop() and read by is_stopped() - confirm
315
316 // Wrapped queues (existing classes, unchanged)
317 std::shared_ptr<job_queue> mutex_queue_; // job_queue is the type the page documents for mode::mutex
318 std::unique_ptr<detail::lockfree_job_queue> lockfree_queue_; // lockfree_job_queue is the type documented for lock-free mode
319
320 // Synchronization for mode switching
321 mutable std::mutex migration_mutex_; // mutable, so const member functions can lock it
322
323 // Accuracy guard reference count
324 std::atomic<int> accuracy_guard_count_{0}; // starts at 0
325
326 // Statistics tracking
327 mutable std::mutex stats_mutex_; // mutable, so const member functions can lock it
// NOTE(review): numbered line 328 is absent from this listing (presumably the
// stored stats instance) - confirm against the real header.
329 std::chrono::steady_clock::time_point mode_start_time_; // presumably when the current mode was entered - confirm
330
331 // Private methods
332 void migrate_to_mode(mode target); // presumably performs the switch to `target` - confirm in the .cpp
333 void update_mode_time(); // presumably folds elapsed time into the per-mode stats - confirm
334 auto determine_mode_for_balanced() const -> mode; // yields a mode; presumably used under policy::balanced - confirm
335};
336
337} // namespace kcenon::thread
RAII guard that temporarily switches to accuracy mode.
accuracy_guard & operator=(accuracy_guard &&)=delete
accuracy_guard & operator=(const accuracy_guard &)=delete
accuracy_guard(const accuracy_guard &)=delete
Adaptive queue that switches between mutex and lock-free modes.
auto stop() -> void
Signals the queue to stop.
auto require_accuracy() -> accuracy_guard
Request temporary accuracy mode.
auto current_policy() const -> policy
Get current policy.
auto empty() const -> bool
Checks if the queue is empty.
auto get_next_job() -> common::Result< std::unique_ptr< job > > override
Get next job (delegates to current queue)
auto switch_mode(mode m) -> common::VoidResult
Manually switch mode (only if policy is manual)
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeues a job from the current active queue.
std::shared_ptr< job_queue > mutex_queue_
auto enqueue(std::unique_ptr< job > &&j) -> common::VoidResult
Enqueues a job into the current active queue.
adaptive_job_queue(policy p=policy::balanced)
Create adaptive queue with specified policy.
auto enqueue(std::unique_ptr< JobType > &&value) -> common::VoidResult
Type-safe enqueue for job subclasses.
std::chrono::steady_clock::time_point mode_start_time_
~adaptive_job_queue()
Destructor - cleans up both queue implementations.
auto schedule(std::unique_ptr< job > &&work) -> common::VoidResult override
Schedule a job (delegates to current queue)
adaptive_job_queue(adaptive_job_queue &&)=delete
adaptive_job_queue & operator=(adaptive_job_queue &&)=delete
std::unique_ptr< detail::lockfree_job_queue > lockfree_queue_
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Tries to dequeue a job without blocking.
auto clear() -> void
Clears all jobs from the queue.
auto get_stats() const -> stats
Get statistics about queue usage.
@ mutex
Using job_queue (accuracy mode)
@ lock_free
Using lockfree_job_queue (performance mode)
adaptive_job_queue & operator=(const adaptive_job_queue &)=delete
auto get_capabilities() const -> queue_capabilities override
Returns capabilities based on current mode.
auto size() const -> std::size_t
Returns the current number of jobs in the queue.
adaptive_job_queue(const adaptive_job_queue &)=delete
auto is_stopped() const -> bool
Checks if the queue is stopped.
auto current_mode() const -> mode
Get current operating mode.
@ performance_first
Always use lock-free mode.
@ balanced
Auto-switch based on usage.
Mixin interface for queue capability introspection.
Scheduler interface for queuing and retrieving jobs.
Thread-safe FIFO job queue with optional bounded size.
Lock-free MPMC job queue using Michael-Scott algorithm with hazard pointers.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Mixin interface for queue capability introspection.
Runtime-queryable queue capabilities descriptor.