// Constructor initializer list: takes ownership of the caller's configuration
// by moving it into config_, and stamps stats_start_time_ with the current
// steady_clock time (update_stats() measures elapsed time from this point).
13 template <
typename job_type>
15 : config_(
std::move(config))
16 , stats_start_time_(
std::chrono::steady_clock::now())
20 template <
typename job_type>
// Starts the background aging thread.
26 template <
typename job_type>
// exchange(true) sets the running flag and yields its previous value, so a
// true result here means aging had already been started.
29 if (aging_running_.exchange(
true))
// The worker lambda captures `this`; stop_aging() joins the thread before
// releasing it. NOTE(review): the lambda body is not part of this excerpt -
// presumably it runs aging_loop(); confirm.
34 aging_thread_ = std::make_unique<std::thread>([
this]() {
// Stops the background aging thread and waits for it to finish.
39 template <
typename job_type>
// exchange(false) clears the running flag and yields its previous value; a
// false result means aging was not running.
42 if (!aging_running_.exchange(
false))
// Notify while holding aging_mutex_, the same mutex aging_loop() waits on,
// so the timed wait in the loop wakes up and re-checks the flag.
48 std::lock_guard lock(aging_mutex_);
49 aging_cv_.notify_all();
// Join only a thread that exists and is joinable, then drop the thread object.
52 if (aging_thread_ && aging_thread_->joinable())
54 aging_thread_->join();
56 aging_thread_.reset();
// Reports the current value of the aging_running_ flag.
59 template <
typename job_type>
62 return aging_running_.load();
// Body of the background aging thread: repeats for as long as the running
// flag stays set.
65 template <
typename job_type>
68 while (aging_running_.load())
// Sleep for up to config_.aging_interval; the predicate lets the wait end
// early as soon as the running flag has been cleared.
71 std::unique_lock lock(aging_mutex_);
72 aging_cv_.wait_for(lock, config_.aging_interval, [
this]() {
73 return !aging_running_.load();
// Re-check the flag after waking, since the wait may have ended because of a
// stop request rather than a timeout.
// NOTE(review): the work done per iteration is not part of this excerpt -
// presumably apply_aging() and check_starvation(); confirm.
77 if (!aging_running_.load())
// Applies one aging pass over every tracked job and returns the value of the
// boosts_applied counter.
90 template <
typename job_type>
// jobs_mutex_ guards aging_jobs_ for the duration of the pass.
93 std::lock_guard jobs_lock(jobs_mutex_);
// Per-pass accumulators that are handed to update_stats() below.
95 std::size_t boosts_applied = 0;
96 std::chrono::milliseconds max_wait{0};
97 std::chrono::milliseconds total_wait{0};
99 for (
auto*
job : aging_jobs_)
106 auto wait =
job->wait_time();
// NOTE(review): this division has a zero divisor when
// config_.aging_interval.count() == 0 - confirm a non-zero interval is
// enforced elsewhere. No use of `intervals` is visible in this excerpt.
115 auto intervals = wait.count() / config_.aging_interval.count();
// Boost only when the curve yields a positive amount and the job has not
// already reached its maximum boost.
118 auto boost = calculate_boost(wait);
119 if (boost > 0 && !
job->is_max_boosted())
121 job->apply_boost(boost);
// Count jobs that report max-boosted after the boost was applied.
// NOTE(review): stats_mutex_ appears to be taken while jobs_mutex_ is still
// held - keep this lock order consistent across the class.
124 if (
job->is_max_boosted())
126 std::lock_guard stats_lock(stats_mutex_);
127 ++stats_.jobs_reached_max_boost;
// Statistics are only updated when at least one job is tracked.
133 if (!aging_jobs_.empty())
135 update_stats(boosts_applied, max_wait, total_wait, aging_jobs_.size());
138 return boosts_applied;
// Computes the priority boost for a job that has waited `wait_time`, using
// the curve selected by config_.curve and capped at
// config_.max_priority_boost.
141 template <
typename job_type>
143 std::chrono::milliseconds wait_time)
const ->
int
// Number of aging intervals waited, as a (possibly fractional) double.
// NOTE(review): a zero aging_interval makes this inf/NaN - confirm a non-zero
// interval is enforced elsewhere.
145 auto intervals =
static_cast<double>(wait_time.count()) /
146 static_cast<double>(config_.aging_interval.count());
150 switch (config_.curve)
// Whole intervals (truncated) times the per-interval boost.
153 boost =
static_cast<int>(intervals) * config_.priority_boost_per_interval;
// (exponential_factor ^ intervals - 1), truncated to int before being
// multiplied by the per-interval boost.
// NOTE(review): for long waits the pow() result can exceed the range of int,
// and converting an out-of-range double to int is undefined - consider
// clamping before the cast.
158 boost =
static_cast<int>(
159 std::pow(config_.exponential_factor, intervals) - 1.0
160 ) * config_.priority_boost_per_interval;
// log2(intervals + 1), truncated to int before being multiplied by the
// per-interval boost.
167 boost =
static_cast<int>(
168 std::log(intervals + 1) / std::log(2.0)
169 ) * config_.priority_boost_per_interval;
// The result never exceeds the configured maximum boost.
175 return std::min(boost, config_.max_priority_boost);
// Invokes the configured starvation callback for every tracked job whose wait
// time exceeds config_.starvation_threshold.
178 template <
typename job_type>
// Branch taken when no starvation callback is configured.
181 if (!config_.starvation_callback)
// Threshold converted to milliseconds for comparison against wait_time().
186 auto threshold = std::chrono::duration_cast<std::chrono::milliseconds>(
187 config_.starvation_threshold
190 std::lock_guard jobs_lock(jobs_mutex_);
192 for (
auto*
job : aging_jobs_)
199 if (
job->wait_time() > threshold)
// NOTE(review): the callback appears to run while jobs_mutex_ is held - a
// callback that re-enters a method taking jobs_mutex_ would block; confirm
// this is documented for callback authors.
201 config_.starvation_callback(
job->to_job_info());
// Each callback invocation is counted as one starvation alert.
204 std::lock_guard stats_lock(stats_mutex_);
205 ++stats_.starvation_alerts;
// Scans the tracked jobs for those whose wait time exceeds
// config_.starvation_threshold; `result` is declared to hold job_info entries.
211 template <
typename job_type>
214 std::vector<job_info>
result;
// Threshold converted to milliseconds for comparison against wait_time().
216 auto threshold = std::chrono::duration_cast<std::chrono::milliseconds>(
217 config_.starvation_threshold
220 std::lock_guard lock(jobs_mutex_);
222 for (
auto*
job : aging_jobs_)
// Null entries are skipped; a job qualifies only when it has waited strictly
// longer than the threshold.
224 if (
job &&
job->wait_time() > threshold)
// Accesses the aging statistics under stats_mutex_.
// NOTE(review): the return statement is not part of this excerpt - presumably
// a copy of stats_ is returned; confirm.
233 template <
typename job_type>
236 std::lock_guard lock(stats_mutex_);
// Resets aging statistics under stats_mutex_.
240 template <
typename job_type>
243 std::lock_guard lock(stats_mutex_);
// Restart the reference point that update_stats() measures elapsed time from.
245 stats_start_time_ = std::chrono::steady_clock::now();
// Replaces the aging configuration with the supplied one.
248 template <
typename job_type>
// NOTE(review): config_ is written here under aging_mutex_, while
// apply_aging() and check_starvation() read config_ under jobs_mutex_ -
// confirm the configuration cannot be replaced while those run.
251 std::lock_guard lock(aging_mutex_);
252 config_ = std::move(config);
255 template <
typename job_type>
// Enqueues an aging job: registers it for aging, sets its boost ceiling, and
// hands it to the queue selected by `priority`.
262 template <
typename job_type>
// Two early error returns; their conditions are not part of this excerpt.
268 return common::error_info{
277 return common::error_info{
// Track the job by raw pointer before ownership moves into the queue.
// NOTE(review): if queue->enqueue() below fails, the pointer stored in
// aging_jobs_ may outlive the job - confirm the failure path unregisters it.
285 register_aging_job(value.get());
// The job's maximum boost comes from the current configuration.
288 value->set_max_boost(config_.max_priority_boost);
292 auto* queue = get_or_create_queue(
priority);
293 return queue->enqueue(std::move(value));
// Adds `job` to the list of jobs tracked for aging, under jobs_mutex_.
296 template <
typename job_type>
305 std::lock_guard lock(jobs_mutex_);
306 aging_jobs_.push_back(
job);
// Removes `job` from the list of jobs tracked for aging, under jobs_mutex_.
309 template <
typename job_type>
318 std::lock_guard lock(jobs_mutex_);
// Linear search by pointer; only the first match is erased, and a job that is
// not tracked is ignored.
319 auto it = std::find(aging_jobs_.begin(), aging_jobs_.end(),
job);
320 if (it != aging_jobs_.end())
322 aging_jobs_.erase(it);
// Folds the results of one aging pass into stats_, under stats_mutex_.
//   boosts_applied - added to the running total of boosts
//   max_wait       - replaces the recorded maximum when larger
//   total_wait     - divided by job_count to produce the average wait
//   job_count      - divisor for the average wait
326 template <
typename job_type>
328 std::size_t boosts_applied,
329 std::chrono::milliseconds max_wait,
330 std::chrono::milliseconds total_wait,
331 std::size_t job_count) ->
void
333 std::lock_guard lock(stats_mutex_);
335 stats_.total_boosts_applied += boosts_applied;
// max_wait_time only ever grows between resets.
337 if (max_wait > stats_.max_wait_time)
339 stats_.max_wait_time = max_wait;
// Integer average in milliseconds.
// NOTE(review): any guard against job_count == 0 is not part of this
// excerpt - confirm one exists before this division.
344 stats_.avg_wait_time = std::chrono::milliseconds{
345 total_wait.count() /
static_cast<long long>(job_count)
// boost_rate is total boosts per whole second elapsed since
// stats_start_time_; it is left untouched during the first second.
350 auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
351 std::chrono::steady_clock::now() - stats_start_time_
353 if (elapsed.count() > 0)
355 stats_.boost_rate =
static_cast<double>(stats_.total_boosts_applied) /
356 static_cast<double>(elapsed.count());
// Returns the queue registered for `type`, creating and registering an empty
// one first when none exists. The returned pointer is non-owning; the queue
// stays owned by job_queues_.
364 template <
typename job_type>
// Exclusive lock: the lookup may be followed by an insertion.
367 std::unique_lock lock(queues_mutex_);
368 auto it = job_queues_.find(type);
369 if (it == job_queues_.end())
371 auto [new_it, inserted] = job_queues_.emplace(type, std::make_unique<queue_type>());
372 return new_it->second.get();
374 return it->second.get();
// Enqueues a job into the queue keyed by a default-constructed job_type.
377 template <
typename job_type>
// Two early error returns; their conditions are not part of this excerpt.
382 return common::error_info{
391 return common::error_info{
399 auto* queue = get_or_create_queue(job_type{});
400 return queue->enqueue(std::move(value));
// Enqueues a job into the queue selected by `priority`, creating that queue
// on first use.
403 template <
typename job_type>
// Two early error returns; their conditions are not part of this excerpt.
409 return common::error_info{
418 return common::error_info{
426 auto* queue = get_or_create_queue(
priority);
427 return queue->enqueue(std::move(value));
// Processes each element of `jobs` in order; elements are taken by non-const
// reference.
// NOTE(review): the loop body is not part of this excerpt - presumably each
// job is moved into enqueue(); confirm.
430 template <
typename job_type>
434 for (
auto&
job : jobs)
// Dequeues from the first non-empty queue, visiting job_queues_ in the
// container's iteration order; an error_info return follows the loop.
445 template <
typename job_type>
// NOTE(review): only a shared lock on queues_mutex_ is held while the inner
// queue's dequeue() runs - presumably the inner queue synchronizes itself;
// confirm.
448 std::shared_lock lock(queues_mutex_);
450 for (
auto& [type, queue] : job_queues_)
// Null and empty queues are skipped.
452 if (queue && !queue->empty())
454 auto result = queue->dequeue();
462 return common::error_info{
// Attempts to take one job from the queue registered for `priority`.
469 template <
typename job_type>
471 -> std::optional<std::unique_ptr<typed_job_t<job_type>>>
473 std::shared_lock lock(queues_mutex_);
475 auto it = job_queues_.find(
priority);
// Branch taken when there is no queue for this priority, the entry is null,
// or the queue is empty.
476 if (it == job_queues_.end() || !it->second || it->second->empty())
481 auto result = it->second->dequeue();
// NOTE(review): `typed_ptr` is defined on lines outside this excerpt -
// presumably a raw pointer obtained by casting and releasing the dequeued
// job; confirm ownership is transferred exactly once.
492 return std::unique_ptr<typed_job_t<job_type>>(typed_ptr);
// Tries each entry of `types` in order via try_dequeue_from_priority(); an
// error_info carrying the message below is returned after the loop.
498 template <
typename job_type>
500 -> common::Result<std::unique_ptr<typed_job_t<job_type>>>
502 for (
const auto& type : types)
504 auto result = try_dequeue_from_priority(type);
511 return common::error_info{
513 "No job available for specified types",
// Second overload with the same visible logic: tries each entry of `types` in
// order via try_dequeue_from_priority(); an error_info carrying the message
// below is returned after the loop.
518 template <
typename job_type>
520 -> common::Result<std::unique_ptr<typed_job_t<job_type>>>
522 for (
const auto& type : types)
524 auto result = try_dequeue_from_priority(type);
531 return common::error_info{
533 "No job available for specified types",
// Visits every priority queue under an exclusive lock on queues_mutex_, then
// takes jobs_mutex_.
// NOTE(review): the statements that empty each queue and the aging list are
// not part of this excerpt; confirm. jobs_mutex_ appears to be acquired while
// queues_mutex_ is still held - keep that order consistent across the class.
538 template <
typename job_type>
541 std::unique_lock lock(queues_mutex_);
543 for (
auto& [type, queue] : job_queues_)
552 std::lock_guard jobs_lock(jobs_mutex_);
// Looks up the queue for each entry of `types` under a shared lock.
557 template <
typename job_type>
560 std::shared_lock lock(queues_mutex_);
562 for (
const auto& type : types)
564 auto it = job_queues_.find(type);
// True when a queue exists for this type, is non-null, and holds a job.
565 if (it != job_queues_.end() && it->second && !it->second->empty())
// Second overload with the same visible logic: looks up the queue for each
// entry of `types` under a shared lock.
574 template <
typename job_type>
577 std::shared_lock lock(queues_mutex_);
579 for (
const auto& type : types)
581 auto it = job_queues_.find(type);
// True when a queue exists for this type, is non-null, and holds a job.
582 if (it != job_queues_.end() && it->second && !it->second->empty())
// Builds a human-readable summary containing the aging_running and stopped
// flags and the total job count.
591 template <
typename job_type>
594 std::ostringstream oss;
595 oss <<
"aging_typed_job_queue{";
596 oss <<
"aging_running: " << (aging_running_.load() ?
"true" :
"false");
597 oss <<
", stopped: " << (stopped_.load() ?
"true" :
"false");
// size() takes a shared lock on queues_mutex_ internally.
598 oss <<
", total_jobs: " << size();
// Marks the queue as stopped, then visits every priority queue under an
// exclusive lock on queues_mutex_.
// NOTE(review): the per-queue action is not part of this excerpt - presumably
// each inner queue is stopped as well; confirm.
603 template <
typename job_type>
606 stopped_.store(
true);
608 std::unique_lock lock(queues_mutex_);
609 for (
auto& [type, queue] : job_queues_)
// Sums the sizes of the priority queues under a shared lock on queues_mutex_.
618 template <
typename job_type>
621 std::shared_lock lock(queues_mutex_);
622 std::size_t total = 0;
624 for (
const auto& [type, queue] : job_queues_)
628 total += queue->size();
Priority queue with aging to prevent low-priority job starvation.
A typed job queue with priority aging support, based on policy_queue.
auto get_or_create_queue(const job_type &type) -> queue_type *
Get or create a queue for the specified type.
auto get_aging_stats() const -> aging_stats
Gets aging statistics.
auto try_dequeue_from_priority(const job_type &priority) -> std::optional< std::unique_ptr< typed_job_t< job_type > > >
Attempts to dequeue a single job from the queue for a given priority.
~aging_typed_job_queue_t()
Destroys the aging typed job queue.
auto enqueue_batch(std::vector< std::unique_ptr< typed_job_t< job_type > > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs.
auto unregister_aging_job(aging_typed_job_t< job_type > *job) -> void
Unregisters an aging job from tracking.
auto aging_loop() -> void
The main aging loop running in the background thread.
auto stop() -> void
Stops accepting new jobs and marks the queue as stopped.
auto calculate_boost(std::chrono::milliseconds wait_time) const -> int
Calculates the priority boost for a given wait time.
auto reset_aging_stats() -> void
Resets the aging statistics.
aging_typed_job_queue_t(priority_aging_config config={})
Constructs an aging typed job queue.
auto size() const -> std::size_t
Gets the total number of jobs in all queues.
auto apply_aging() -> std::size_t
Manually applies aging to all queued jobs.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeues the next available job.
auto to_string() const -> std::string
Returns a string representation of the queue.
auto set_aging_config(priority_aging_config config) -> void
Sets the aging configuration.
auto is_aging_running() const -> bool
Checks if aging is currently running.
auto get_starving_jobs() const -> std::vector< job_info >
Gets jobs that are approaching starvation.
auto check_starvation() -> void
Checks for starving jobs and invokes callbacks.
auto empty(const std::vector< job_type > &types) const -> bool
Checks if there are no jobs in any of the specified priority queues.
auto register_aging_job(aging_typed_job_t< job_type > *job) -> void
Registers an aging job for tracking.
auto update_stats(std::size_t boosts_applied, std::chrono::milliseconds max_wait, std::chrono::milliseconds total_wait, std::size_t job_count) -> void
Updates statistics after applying aging.
auto enqueue(std::unique_ptr< aging_typed_job_t< job_type > > &&value) -> common::VoidResult
Enqueues an aging typed job.
auto stop_aging() -> void
Stops the background aging thread.
auto start_aging() -> void
Starts the background aging thread.
auto get_aging_config() const -> const priority_aging_config &
Gets the current aging configuration.
auto clear() -> void
Removes all jobs from all priority queues.
A typed job with priority aging support.
Represents a unit of work (task) to be executed, typically by a job queue.
Policy-based queue template.
A template class representing either a value or an error.
T & value() &
Gets the value.
bool has_value() const noexcept
Checks if the result contains a value.
bool is_ok() const noexcept
Checks if the result is successful.
A fallback span implementation for C++17 and earlier compilers.
Error codes and utilities for the thread system.
Core threading foundation of the thread system library.
@ logarithmic
Decreasing boost (fast initial, slow later)
@ linear
Constant boost per interval.
@ exponential
Increasing boost over time.
@ priority
Priority-based scheduling.
Statistics about priority aging behavior.
Information about a job for starvation callback.
Configuration for priority aging behavior.