Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
aging_typed_job_queue.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
14#include "aging_typed_job.h"
16#include "typed_job.h"
17#include "job_types.h"
22
23#include <thread>
24#include <atomic>
25#include <vector>
26#include <mutex>
27#include <shared_mutex>
28#include <condition_variable>
29#include <unordered_map>
30#include <optional>
31#include <sstream>
32
33namespace kcenon::thread
34{
40 {
41 std::size_t total_boosts_applied{0};
42 std::size_t jobs_reached_max_boost{0};
43 std::size_t starvation_alerts{0};
44 std::chrono::milliseconds max_wait_time{0};
45 std::chrono::milliseconds avg_wait_time{0};
46 double boost_rate{0.0}; // Boosts per second
47 };
48
87 template <typename job_type = job_types>
89 {
90 public:
94
101
108
109 // Non-copyable
112
113 // Non-movable (due to thread)
116
117 // ============================================
118 // Aging Control
119 // ============================================
120
126 auto start_aging() -> void;
127
131 auto stop_aging() -> void;
132
138 [[nodiscard]] auto is_aging_running() const -> bool;
139
148 auto apply_aging() -> std::size_t;
149
150 // ============================================
151 // Starvation Detection
152 // ============================================
153
159 [[nodiscard]] auto get_starving_jobs() const -> std::vector<job_info>;
160
161 // ============================================
162 // Statistics
163 // ============================================
164
170 [[nodiscard]] auto get_aging_stats() const -> aging_stats;
171
175 auto reset_aging_stats() -> void;
176
177 // ============================================
178 // Configuration
179 // ============================================
180
186 auto set_aging_config(priority_aging_config config) -> void;
187
193 [[nodiscard]] auto get_aging_config() const -> const priority_aging_config&;
194
195 // ============================================
196 // Job Tracking
197 // ============================================
198
205 [[nodiscard]] auto enqueue(std::unique_ptr<aging_typed_job_t<job_type>>&& value)
206 -> common::VoidResult;
207
213 auto register_aging_job(aging_typed_job_t<job_type>* job) -> void;
214
220 auto unregister_aging_job(aging_typed_job_t<job_type>* job) -> void;
221
222 // ============================================
223 // typed_job_queue_t compatible API
224 // ============================================
225
232 [[nodiscard]] auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult;
233
240 [[nodiscard]] auto enqueue(std::unique_ptr<typed_job_t<job_type>>&& value)
241 -> common::VoidResult;
242
250 template<typename DerivedJob, typename = std::enable_if_t<std::is_base_of_v<typed_job_t<job_type>, DerivedJob>>>
251 [[nodiscard]] auto enqueue(std::unique_ptr<DerivedJob>&& value) -> common::VoidResult
252 {
253 return enqueue(std::unique_ptr<typed_job_t<job_type>>(std::move(value)));
254 }
255
262 [[nodiscard]] auto enqueue_batch(std::vector<std::unique_ptr<typed_job_t<job_type>>>&& jobs)
263 -> common::VoidResult;
264
270 [[nodiscard]] auto dequeue() -> common::Result<std::unique_ptr<job>>;
271
278 [[nodiscard]] auto dequeue(const std::vector<job_type>& types)
279 -> common::Result<std::unique_ptr<typed_job_t<job_type>>>;
280
287 [[nodiscard]] auto dequeue(utility_module::span<const job_type> types)
288 -> common::Result<std::unique_ptr<typed_job_t<job_type>>>;
289
293 auto clear() -> void;
294
301 [[nodiscard]] auto empty(const std::vector<job_type>& types) const -> bool;
302
309 [[nodiscard]] auto empty(utility_module::span<const job_type> types) const -> bool;
310
316 [[nodiscard]] auto to_string() const -> std::string;
317
321 auto stop() -> void;
322
328 [[nodiscard]] auto size() const -> std::size_t;
329
330 private:
332 std::unique_ptr<std::thread> aging_thread_;
333 std::atomic<bool> aging_running_{false};
334 std::atomic<bool> stopped_{false};
335 std::condition_variable aging_cv_;
336 mutable std::mutex aging_mutex_;
337
338 // Type-based job storage using policy_queue for each type
339 std::unordered_map<job_type, std::unique_ptr<queue_type>> job_queues_;
340 mutable std::shared_mutex queues_mutex_;
341
342 // Tracked aging jobs
343 std::vector<aging_typed_job_t<job_type>*> aging_jobs_;
344 mutable std::mutex jobs_mutex_;
345
346 // Statistics
348 mutable std::mutex stats_mutex_;
349 std::chrono::steady_clock::time_point stats_start_time_;
350
354 auto aging_loop() -> void;
355
362 [[nodiscard]] auto calculate_boost(std::chrono::milliseconds wait_time) const -> int;
363
367 auto check_starvation() -> void;
368
377 auto update_stats(std::size_t boosts_applied,
378 std::chrono::milliseconds max_wait,
379 std::chrono::milliseconds total_wait,
380 std::size_t job_count) -> void;
381
388 auto get_or_create_queue(const job_type& type) -> queue_type*;
389
396 [[nodiscard]] auto try_dequeue_from_priority(const job_type& priority)
397 -> std::optional<std::unique_ptr<typed_job_t<job_type>>>;
398 };
399
405
406} // namespace kcenon::thread
Typed job with priority aging support to prevent starvation.
Bound policies for queue capacity: unbounded or fixed-size.
A typed job queue with priority aging support, based on policy_queue.
auto get_or_create_queue(const job_type &type) -> queue_type *
Get or create a queue for the specified type.
auto get_aging_stats() const -> aging_stats
Gets aging statistics.
std::chrono::steady_clock::time_point stats_start_time_
aging_typed_job_queue_t(aging_typed_job_queue_t &&)=delete
auto try_dequeue_from_priority(const job_type &priority) -> std::optional< std::unique_ptr< typed_job_t< job_type > > >
Attempts to dequeue a single job from the queue for a given priority.
std::unordered_map< job_type, std::unique_ptr< queue_type > > job_queues_
~aging_typed_job_queue_t()
Destroys the aging typed job queue.
auto enqueue_batch(std::vector< std::unique_ptr< typed_job_t< job_type > > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs.
auto unregister_aging_job(aging_typed_job_t< job_type > *job) -> void
Unregisters an aging job from tracking.
auto aging_loop() -> void
The main aging loop running in the background thread.
auto stop() -> void
Stops accepting new jobs and marks the queue as stopped.
auto calculate_boost(std::chrono::milliseconds wait_time) const -> int
Calculates the priority boost for a given wait time.
auto reset_aging_stats() -> void
Resets the aging statistics.
aging_typed_job_queue_t(priority_aging_config config={})
Constructs an aging typed job queue.
auto size() const -> std::size_t
Gets the total number of jobs in all queues.
auto apply_aging() -> std::size_t
Manually applies aging to all queued jobs.
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeues the next available job.
auto to_string() const -> std::string
Returns a string representation of the queue.
auto set_aging_config(priority_aging_config config) -> void
Sets the aging configuration.
auto is_aging_running() const -> bool
Checks if aging is currently running.
auto get_starving_jobs() const -> std::vector< job_info >
Gets jobs that are approaching starvation.
auto check_starvation() -> void
Checks for starving jobs and invokes callbacks.
std::vector< aging_typed_job_t< job_type > * > aging_jobs_
auto operator=(aging_typed_job_queue_t &&) -> aging_typed_job_queue_t &=delete
std::unique_ptr< std::thread > aging_thread_
auto empty(const std::vector< job_type > &types) const -> bool
Checks if there are no jobs in any of the specified priority queues.
auto register_aging_job(aging_typed_job_t< job_type > *job) -> void
Registers an aging job for tracking.
auto update_stats(std::size_t boosts_applied, std::chrono::milliseconds max_wait, std::chrono::milliseconds total_wait, std::size_t job_count) -> void
Updates statistics after applying aging.
auto enqueue(std::unique_ptr< aging_typed_job_t< job_type > > &&value) -> common::VoidResult
Enqueues an aging typed job.
aging_typed_job_queue_t(const aging_typed_job_queue_t &)=delete
auto stop_aging() -> void
Stops the background aging thread.
auto start_aging() -> void
Starts the background aging thread.
auto get_aging_config() const -> const priority_aging_config &
Gets the current aging configuration.
auto operator=(const aging_typed_job_queue_t &) -> aging_typed_job_queue_t &=delete
auto clear() -> void
Removes all jobs from all priority queues.
A typed job with priority aging support.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Synchronization policy using mutex and condition variable.
Policy that rejects new items when queue is full.
Policy that allows unlimited queue size.
Policy-based queue template.
Typed job template.
Definition typed_job.h:31
A fallback span implementation for C++17 and earlier compilers.
Definition span.h:46
Job type definitions for the typed thread pool.
Polyfill for std::span on pre-C++20 compilers.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
@ priority
Priority-based scheduling.
STL namespace.
Policy-based job queue template with customizable sync, bound, and overflow.
Configuration for priority aging and starvation prevention.
Statistics about priority aging behavior.
std::chrono::milliseconds avg_wait_time
std::chrono::milliseconds max_wait_time
Information about a job for starvation callback.
Configuration for priority aging behavior.
Synchronization policies: mutex-based or spinlock-based.
Base typed job carrying a specific priority level.