15 , current_mode_(
mode::mutex)
17 , lockfree_queue_(
std::make_unique<detail::lockfree_job_queue>())
18 , mode_start_time_(
std::chrono::steady_clock::now()) {
49 return enqueue(std::move(work));
61 if (stopped_.load(std::memory_order_acquire)) {
69 mode current = current_mode_.load(std::memory_order_acquire);
70 common::VoidResult
result = (current == mode::mutex)
71 ? mutex_queue_->enqueue(std::move(j))
72 : lockfree_queue_->enqueue(std::move(j));
75 std::lock_guard<std::mutex> lock(stats_mutex_);
76 ++stats_.enqueue_count;
83 if (stopped_.load(std::memory_order_acquire)) {
87 common::Result<std::unique_ptr<job>>
result = common::error_info{
static_cast<int>(
error_code::queue_empty),
"Queue is empty",
"thread_system"};
88 mode current = current_mode_.load(std::memory_order_acquire);
91 if (current == mode::mutex) {
92 result = mutex_queue_->try_dequeue();
95 result = lockfree_queue_->dequeue();
98 result = lockfree_queue_->dequeue();
101 result = mutex_queue_->try_dequeue();
106 std::lock_guard<std::mutex> lock(stats_mutex_);
107 ++stats_.dequeue_count;
114 if (stopped_.load(std::memory_order_acquire)) {
118 common::Result<std::unique_ptr<job>>
result = common::error_info{
static_cast<int>(
error_code::queue_empty),
"Queue is empty",
"thread_system"};
119 mode current = current_mode_.load(std::memory_order_acquire);
122 if (current == mode::mutex) {
123 result = mutex_queue_->try_dequeue();
126 result = lockfree_queue_->try_dequeue();
129 result = lockfree_queue_->try_dequeue();
132 result = mutex_queue_->try_dequeue();
137 std::lock_guard<std::mutex> lock(stats_mutex_);
138 ++stats_.dequeue_count;
161 std::lock_guard<std::mutex> lock(migration_mutex_);
163 mutex_queue_->clear();
166 auto result = lockfree_queue_->dequeue();
175 stopped_.store(
true, std::memory_order_release);
176 mutex_queue_->stop();
180 return stopped_.load(std::memory_order_acquire);
193 .atomic_empty_check =
true,
196 .supports_batch =
true,
197 .supports_blocking_wait =
true,
198 .supports_stop =
true
203 .atomic_empty_check =
false,
206 .supports_batch =
false,
207 .supports_blocking_wait =
false,
208 .supports_stop =
true
226 if (policy_ != policy::manual) {
228 "Mode switching is only allowed with manual policy",
"thread_system"};
231 if (current_mode_.load(std::memory_order_acquire) == m) {
246 auto now = std::chrono::steady_clock::now();
247 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
251 result.time_in_mutex_ms += duration;
253 result.time_in_lockfree_ms += duration;
269 , previous_mode_(queue.current_mode())
282 if (!active_ || !queue_) {
287 int prev_count = queue_->accuracy_guard_count_.fetch_sub(1, std::memory_order_acq_rel);
290 if (prev_count == 1) {
292 switch (queue_->policy_) {
302 mode target = queue_->determine_mode_for_balanced();
304 queue_->migrate_to_mode(target);
311 queue_->migrate_to_mode(previous_mode_);
319 : queue_(other.queue_)
320 , previous_mode_(other.previous_mode_)
321 , active_(other.active_) {
322 other.active_ =
false;
323 other.queue_ =
nullptr;
334 if (current == target) {
361 for (
auto&
job : jobs) {
379 auto now = std::chrono::steady_clock::now();
380 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
400 if (total_ops > 10000) {
Adaptive queue that auto-switches between mutex and lock-free modes.
RAII guard that temporarily switches to accuracy mode.
adaptive_job_queue * queue_
~accuracy_guard()
Destructor - restores previous mode.
accuracy_guard(adaptive_job_queue &queue)
Construct guard and switch queue to mutex mode.
Adaptive queue that switches between mutex and lock-free modes.
auto stop() -> void
Signals the queue to stop.
std::atomic< int > accuracy_guard_count_
auto require_accuracy() -> accuracy_guard
Request temporary accuracy mode.
auto current_policy() const -> policy
Get current policy.
auto empty() const -> bool
Checks if the queue is empty.
auto get_next_job() -> common::Result< std::unique_ptr< job > > override
Get next job (delegates to current queue)
auto switch_mode(mode m) -> common::VoidResult
Manually switch mode (only if policy is manual)
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeues a job from the current active queue.
std::shared_ptr< job_queue > mutex_queue_
auto enqueue(std::unique_ptr< job > &&j) -> common::VoidResult
Enqueues a job into the current active queue.
adaptive_job_queue(policy p=policy::balanced)
Create adaptive queue with specified policy.
void migrate_to_mode(mode target)
std::chrono::steady_clock::time_point mode_start_time_
auto determine_mode_for_balanced() const -> mode
~adaptive_job_queue()
Destructor - cleans up both queue implementations.
std::atomic< mode > current_mode_
auto schedule(std::unique_ptr< job > &&work) -> common::VoidResult override
Schedule a job (delegates to current queue)
std::mutex migration_mutex_
std::unique_ptr< detail::lockfree_job_queue > lockfree_queue_
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Tries to dequeue a job without blocking.
auto clear() -> void
Clears all jobs from the queue.
auto get_stats() const -> stats
Get statistics about queue usage.
@ mutex
Using job_queue (accuracy mode)
@ lock_free
Using lockfree_job_queue (performance mode)
auto get_capabilities() const -> queue_capabilities override
Returns capabilities based on current mode.
std::atomic< bool > stopped_
auto size() const -> std::size_t
Returns the current number of jobs in the queue.
auto is_stopped() const -> bool
Checks if the queue is stopped.
auto current_mode() const -> mode
Get current operating mode.
@ manual
User controls mode.
@ performance_first
Always use lock-free mode.
@ balanced
Auto-switch based on usage.
@ accuracy_first
Always use mutex mode.
A thread-safe job queue for managing and dispatching work items.
Represents a unit of work (task) to be executed, typically by a job queue.
A template class representing either a value or an error.
T & value() &
Gets the value.
bool is_ok() const noexcept
Checks if the result is successful.
Core threading foundation of the thread system library.
Statistics about mode switching.
uint64_t time_in_lockfree_ms
Cumulative time in lock-free mode (ms)
uint64_t time_in_mutex_ms
Cumulative time in mutex mode (ms)
uint64_t mode_switches
Total number of mode switches.
uint64_t dequeue_count
Total dequeue operations.
uint64_t enqueue_count
Total enqueue operations.
Runtime-queryable queue capabilities descriptor.