Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
aging_typed_job_queue.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7
8#include <cmath>
9#include <algorithm>
10
11namespace kcenon::thread
12{
13 template <typename job_type>
15 : config_(std::move(config))
16 , stats_start_time_(std::chrono::steady_clock::now())
17 {
18 }
19
20 template <typename job_type>
25
26 template <typename job_type>
28 {
29 if (aging_running_.exchange(true))
30 {
31 return; // Already running
32 }
33
34 aging_thread_ = std::make_unique<std::thread>([this]() {
35 aging_loop();
36 });
37 }
38
39 template <typename job_type>
41 {
42 if (!aging_running_.exchange(false))
43 {
44 return; // Not running
45 }
46
47 {
48 std::lock_guard lock(aging_mutex_);
49 aging_cv_.notify_all();
50 }
51
52 if (aging_thread_ && aging_thread_->joinable())
53 {
54 aging_thread_->join();
55 }
56 aging_thread_.reset();
57 }
58
59 template <typename job_type>
61 {
62 return aging_running_.load();
63 }
64
65 template <typename job_type>
67 {
68 while (aging_running_.load())
69 {
70 {
71 std::unique_lock lock(aging_mutex_);
72 aging_cv_.wait_for(lock, config_.aging_interval, [this]() {
73 return !aging_running_.load();
74 });
75 }
76
77 if (!aging_running_.load())
78 {
79 break;
80 }
81
82 if (config_.enabled)
83 {
84 apply_aging();
85 check_starvation();
86 }
87 }
88 }
89
90 template <typename job_type>
92 {
93 std::lock_guard jobs_lock(jobs_mutex_);
94
95 std::size_t boosts_applied = 0;
96 std::chrono::milliseconds max_wait{0};
97 std::chrono::milliseconds total_wait{0};
98
99 for (auto* job : aging_jobs_)
101 if (!job)
102 {
103 continue;
104 }
105
106 auto wait = job->wait_time();
107 total_wait += wait;
108
109 if (wait > max_wait)
110 {
111 max_wait = wait;
112 }
113
114 // Calculate number of intervals waited
115 auto intervals = wait.count() / config_.aging_interval.count();
116 if (intervals > 0)
117 {
118 auto boost = calculate_boost(wait);
119 if (boost > 0 && !job->is_max_boosted())
120 {
121 job->apply_boost(boost);
122 ++boosts_applied;
123
124 if (job->is_max_boosted())
125 {
126 std::lock_guard stats_lock(stats_mutex_);
127 ++stats_.jobs_reached_max_boost;
128 }
129 }
130 }
132
133 if (!aging_jobs_.empty())
134 {
135 update_stats(boosts_applied, max_wait, total_wait, aging_jobs_.size());
136 }
137
138 return boosts_applied;
139 }
140
141 template <typename job_type>
143 std::chrono::milliseconds wait_time) const -> int
144 {
145 auto intervals = static_cast<double>(wait_time.count()) /
146 static_cast<double>(config_.aging_interval.count());
147
148 int boost = 0;
149
150 switch (config_.curve)
151 {
153 boost = static_cast<int>(intervals) * config_.priority_boost_per_interval;
154 break;
155
157 // Exponential: boost increases faster over time
158 boost = static_cast<int>(
159 std::pow(config_.exponential_factor, intervals) - 1.0
160 ) * config_.priority_boost_per_interval;
161 break;
162
164 // Logarithmic: fast initial boost, slower over time
165 if (intervals > 0)
166 {
167 boost = static_cast<int>(
168 std::log(intervals + 1) / std::log(2.0)
169 ) * config_.priority_boost_per_interval;
171 break;
172 }
173
174 // Cap at max boost
175 return std::min(boost, config_.max_priority_boost);
176 }
177
178 template <typename job_type>
180 {
181 if (!config_.starvation_callback)
182 {
183 return;
184 }
185
186 auto threshold = std::chrono::duration_cast<std::chrono::milliseconds>(
187 config_.starvation_threshold
188 );
189
190 std::lock_guard jobs_lock(jobs_mutex_);
191
192 for (auto* job : aging_jobs_)
194 if (!job)
195 {
196 continue;
197 }
198
199 if (job->wait_time() > threshold)
200 {
201 config_.starvation_callback(job->to_job_info());
202
203 {
204 std::lock_guard stats_lock(stats_mutex_);
205 ++stats_.starvation_alerts;
206 }
207 }
208 }
209 }
210
211 template <typename job_type>
214 std::vector<job_info> result;
215
216 auto threshold = std::chrono::duration_cast<std::chrono::milliseconds>(
217 config_.starvation_threshold
218 );
219
220 std::lock_guard lock(jobs_mutex_);
221
222 for (auto* job : aging_jobs_)
223 {
224 if (job && job->wait_time() > threshold)
225 {
226 result.push_back(job->to_job_info());
227 }
228 }
229
230 return result;
231 }
233 template <typename job_type>
235 {
236 std::lock_guard lock(stats_mutex_);
237 return stats_;
238 }
239
240 template <typename job_type>
242 {
243 std::lock_guard lock(stats_mutex_);
244 stats_ = aging_stats{};
245 stats_start_time_ = std::chrono::steady_clock::now();
246 }
247
248 template <typename job_type>
250 {
251 std::lock_guard lock(aging_mutex_);
252 config_ = std::move(config);
253 }
254
255 template <typename job_type>
257 -> const priority_aging_config&
258 {
259 return config_;
260 }
261
262 template <typename job_type>
264 std::unique_ptr<aging_typed_job_t<job_type>>&& value) -> common::VoidResult
265 {
266 if (!value)
267 {
268 return common::error_info{
269 static_cast<int>(error_code::invalid_argument),
270 "Null job",
271 "thread_system"
272 };
273 }
274
275 if (stopped_.load())
276 {
277 return common::error_info{
278 static_cast<int>(error_code::queue_stopped),
279 "Queue is stopped",
280 "thread_system"
281 };
282 }
283
284 // Register for aging tracking
285 register_aging_job(value.get());
286
287 // Set max boost from config
288 value->set_max_boost(config_.max_priority_boost);
289
290 // Get the priority and enqueue to appropriate queue
291 auto priority = value->priority();
292 auto* queue = get_or_create_queue(priority);
293 return queue->enqueue(std::move(value));
294 }
295
296 template <typename job_type>
299 {
300 if (!job)
302 return;
303 }
304
305 std::lock_guard lock(jobs_mutex_);
306 aging_jobs_.push_back(job);
307 }
308
309 template <typename job_type>
312 {
313 if (!job)
314 {
315 return;
317
318 std::lock_guard lock(jobs_mutex_);
319 auto it = std::find(aging_jobs_.begin(), aging_jobs_.end(), job);
320 if (it != aging_jobs_.end())
322 aging_jobs_.erase(it);
323 }
324 }
325
326 template <typename job_type>
328 std::size_t boosts_applied,
329 std::chrono::milliseconds max_wait,
330 std::chrono::milliseconds total_wait,
331 std::size_t job_count) -> void
332 {
333 std::lock_guard lock(stats_mutex_);
334
335 stats_.total_boosts_applied += boosts_applied;
336
337 if (max_wait > stats_.max_wait_time)
338 {
339 stats_.max_wait_time = max_wait;
340 }
341
342 if (job_count > 0)
343 {
344 stats_.avg_wait_time = std::chrono::milliseconds{
345 total_wait.count() / static_cast<long long>(job_count)
346 };
347 }
348
349 // Calculate boost rate (boosts per second)
350 auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
351 std::chrono::steady_clock::now() - stats_start_time_
352 );
353 if (elapsed.count() > 0)
355 stats_.boost_rate = static_cast<double>(stats_.total_boosts_applied) /
356 static_cast<double>(elapsed.count());
357 }
358 }
359
360 // ============================================
361 // typed_job_queue_t compatible API implementation
362 // ============================================
363
364 template <typename job_type>
366 {
367 std::unique_lock lock(queues_mutex_);
368 auto it = job_queues_.find(type);
369 if (it == job_queues_.end())
370 {
371 auto [new_it, inserted] = job_queues_.emplace(type, std::make_unique<queue_type>());
372 return new_it->second.get();
373 }
374 return it->second.get();
375 }
376
377 template <typename job_type>
378 auto aging_typed_job_queue_t<job_type>::enqueue(std::unique_ptr<job>&& value) -> common::VoidResult
379 {
380 if (!value)
381 {
382 return common::error_info{
383 static_cast<int>(error_code::invalid_argument),
384 "Null job",
385 "thread_system"
386 };
387 }
389 if (stopped_.load())
390 {
391 return common::error_info{
392 static_cast<int>(error_code::queue_stopped),
393 "Queue is stopped",
394 "thread_system"
395 };
397
398 // For non-typed jobs, use default priority
399 auto* queue = get_or_create_queue(job_type{});
400 return queue->enqueue(std::move(value));
401 }
402
403 template <typename job_type>
405 std::unique_ptr<typed_job_t<job_type>>&& value) -> common::VoidResult
406 {
407 if (!value)
408 {
409 return common::error_info{
410 static_cast<int>(error_code::invalid_argument),
411 "Null job",
412 "thread_system"
413 };
414 }
415
416 if (stopped_.load())
417 {
418 return common::error_info{
419 static_cast<int>(error_code::queue_stopped),
420 "Queue is stopped",
421 "thread_system"
422 };
423 }
424
425 auto priority = value->priority();
426 auto* queue = get_or_create_queue(priority);
427 return queue->enqueue(std::move(value));
428 }
429
430 template <typename job_type>
432 std::vector<std::unique_ptr<typed_job_t<job_type>>>&& jobs) -> common::VoidResult
433 {
434 for (auto& job : jobs)
435 {
436 auto result = enqueue(std::move(job));
437 if (result.is_err())
438 {
439 return result;
440 }
441 }
442 return common::ok();
443 }
444
445 template <typename job_type>
446 auto aging_typed_job_queue_t<job_type>::dequeue() -> common::Result<std::unique_ptr<job>>
447 {
448 std::shared_lock lock(queues_mutex_);
449
450 for (auto& [type, queue] : job_queues_)
451 {
452 if (queue && !queue->empty())
453 {
454 auto result = queue->dequeue();
455 if (result.is_ok())
456 {
457 return result;
458 }
459 }
460 }
461
462 return common::error_info{
463 static_cast<int>(error_code::job_invalid),
464 "No job available",
465 "thread_system"
466 };
467 }
468
469 template <typename job_type>
471 -> std::optional<std::unique_ptr<typed_job_t<job_type>>>
472 {
473 std::shared_lock lock(queues_mutex_);
474
475 auto it = job_queues_.find(priority);
476 if (it == job_queues_.end() || !it->second || it->second->empty())
477 {
478 return std::nullopt;
479 }
480
481 auto result = it->second->dequeue();
482 if (result.is_err())
483 {
484 return std::nullopt;
485 }
486
487 auto job = std::move(result.value());
488 // Cast back to typed_job_t if possible
489 auto* typed_ptr = dynamic_cast<typed_job_t<job_type>*>(job.release());
490 if (typed_ptr)
491 {
492 return std::unique_ptr<typed_job_t<job_type>>(typed_ptr);
493 }
494
495 return std::nullopt;
496 }
497
498 template <typename job_type>
499 auto aging_typed_job_queue_t<job_type>::dequeue(const std::vector<job_type>& types)
500 -> common::Result<std::unique_ptr<typed_job_t<job_type>>>
501 {
502 for (const auto& type : types)
503 {
504 auto result = try_dequeue_from_priority(type);
505 if (result.has_value())
506 {
507 return std::move(result.value());
508 }
509 }
510
511 return common::error_info{
512 static_cast<int>(error_code::job_invalid),
513 "No job available for specified types",
514 "thread_system"
515 };
516 }
517
518 template <typename job_type>
520 -> common::Result<std::unique_ptr<typed_job_t<job_type>>>
521 {
522 for (const auto& type : types)
523 {
524 auto result = try_dequeue_from_priority(type);
525 if (result.has_value())
526 {
527 return std::move(result.value());
528 }
529 }
530
531 return common::error_info{
532 static_cast<int>(error_code::job_invalid),
533 "No job available for specified types",
534 "thread_system"
535 };
536 }
537
538 template <typename job_type>
540 {
541 std::unique_lock lock(queues_mutex_);
542
543 for (auto& [type, queue] : job_queues_)
544 {
545 if (queue)
546 {
547 queue->clear();
548 }
549 }
550
551 {
552 std::lock_guard jobs_lock(jobs_mutex_);
553 aging_jobs_.clear();
554 }
555 }
556
557 template <typename job_type>
558 auto aging_typed_job_queue_t<job_type>::empty(const std::vector<job_type>& types) const -> bool
559 {
560 std::shared_lock lock(queues_mutex_);
561
562 for (const auto& type : types)
563 {
564 auto it = job_queues_.find(type);
565 if (it != job_queues_.end() && it->second && !it->second->empty())
566 {
567 return false;
568 }
569 }
570
571 return true;
572 }
573
574 template <typename job_type>
576 {
577 std::shared_lock lock(queues_mutex_);
578
579 for (const auto& type : types)
580 {
581 auto it = job_queues_.find(type);
582 if (it != job_queues_.end() && it->second && !it->second->empty())
583 {
584 return false;
585 }
586 }
587
588 return true;
589 }
590
591 template <typename job_type>
593 {
594 std::ostringstream oss;
595 oss << "aging_typed_job_queue{";
596 oss << "aging_running: " << (aging_running_.load() ? "true" : "false");
597 oss << ", stopped: " << (stopped_.load() ? "true" : "false");
598 oss << ", total_jobs: " << size();
599 oss << "}";
600 return oss.str();
601 }
602
603 template <typename job_type>
605 {
606 stopped_.store(true);
607
608 std::unique_lock lock(queues_mutex_);
609 for (auto& [type, queue] : job_queues_)
610 {
611 if (queue)
612 {
613 queue->stop();
614 }
615 }
616 }
617
618 template <typename job_type>
620 {
621 std::shared_lock lock(queues_mutex_);
622 std::size_t total = 0;
623
624 for (const auto& [type, queue] : job_queues_)
625 {
626 if (queue)
627 {
628 total += queue->size();
629 }
630 }
631
632 return total;
633 }
634
635 // Explicit template instantiation for job_types
637
638} // namespace kcenon::thread
Priority queue with aging to prevent low-priority job starvation.
A typed job queue with priority aging support, based on policy_queue.
auto get_or_create_queue(const job_type &type) -> queue_type *
Get or create a queue for the specified type.
auto get_aging_stats() const -> aging_stats
Gets aging statistics.
auto try_dequeue_from_priority(const job_type &priority) -> std::optional< std::unique_ptr< typed_job_t< job_type > > >
Attempts to dequeue a single job from the queue for a given priority.
~aging_typed_job_queue_t()
Destroys the aging typed job queue.
auto enqueue_batch(std::vector< std::unique_ptr< typed_job_t< job_type > > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs.
auto unregister_aging_job(aging_typed_job_t< job_type > *job) -> void
Unregisters an aging job from tracking.
auto aging_loop() -> void
The main aging loop running in the background thread.
auto stop() -> void
Stops accepting new jobs and marks the queue as stopped.
auto calculate_boost(std::chrono::milliseconds wait_time) const -> int
Calculates the priority boost for a given wait time.
auto reset_aging_stats() -> void
Resets the aging statistics.
aging_typed_job_queue_t(priority_aging_config config={})
Constructs an aging typed job queue.
auto size() const -> std::size_t
Gets the total number of jobs in all queues.
auto apply_aging() -> std::size_t
Manually applies aging to all queued jobs.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeues the next available job.
auto to_string() const -> std::string
Returns a string representation of the queue.
auto set_aging_config(priority_aging_config config) -> void
Sets the aging configuration.
auto is_aging_running() const -> bool
Checks if aging is currently running.
auto get_starving_jobs() const -> std::vector< job_info >
Gets jobs that are approaching starvation.
auto check_starvation() -> void
Checks for starving jobs and invokes callbacks.
auto empty(const std::vector< job_type > &types) const -> bool
Checks if there are no jobs in any of the specified priority queues.
auto register_aging_job(aging_typed_job_t< job_type > *job) -> void
Registers an aging job for tracking.
auto update_stats(std::size_t boosts_applied, std::chrono::milliseconds max_wait, std::chrono::milliseconds total_wait, std::size_t job_count) -> void
Updates statistics after applying aging.
auto enqueue(std::unique_ptr< aging_typed_job_t< job_type > > &&value) -> common::VoidResult
Enqueues an aging typed job.
auto stop_aging() -> void
Stops the background aging thread.
auto start_aging() -> void
Starts the background aging thread.
auto get_aging_config() const -> const priority_aging_config &
Gets the current aging configuration.
auto clear() -> void
Removes all jobs from all priority queues.
A typed job with priority aging support.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Policy-based queue template.
A template class representing either a value or an error.
T & value() &
Gets the value.
bool has_value() const noexcept
Checks if the result contains a value.
bool is_ok() const noexcept
Checks if the result is successful.
Typed job template.
Definition typed_job.h:31
A fallback span implementation for C++17 and earlier compilers.
Definition span.h:46
Error codes and utilities for the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
@ logarithmic
Decreasing boost (fast initial, slow later)
@ linear
Constant boost per interval.
@ exponential
Increasing boost over time.
@ priority
Priority-based scheduling.
STL namespace.
Statistics about priority aging behavior.
Information about a job for starvation callback.
Configuration for priority aging behavior.