23#include <unordered_map>
32namespace kcenon {
namespace monitoring {
/**
 * @brief Snapshot of one thread pool's sizing, task counters and timing figures.
 *
 * Every member has a default initializer, so a default-constructed instance
 * is all zeros.
 *
 * Units are indicated only by the member-name suffixes (`_ms`, `_percent`,
 * `_per_sec`); the scale of the percentage fields is not fixed by this
 * definition.
 */
struct thread_pool_stats {
    // Pool size bounds.
    size_t max_pool_size{0};
    size_t min_pool_size{0};

    // Thread counts by state.
    size_t active_threads{0};
    size_t idle_threads{0};
    size_t waiting_threads{0};

    // Queue depth and task outcome counters.
    size_t queued_tasks{0};
    size_t max_queue_size{0};
    size_t completed_tasks{0};
    size_t failed_tasks{0};
    size_t rejected_tasks{0};

    // Task duration and throughput figures.
    double average_task_duration_ms{0.0};
    double max_task_duration_ms{0.0};
    double min_task_duration_ms{0.0};
    double task_throughput_per_sec{0.0};

    // Resource usage figures.
    double thread_utilization_percent{0.0};
    double cpu_usage_percent{0.0};

    // Time tasks spend waiting in the queue.
    double average_queue_wait_ms{0.0};
    double max_queue_wait_ms{0.0};
};
// Collector plugin for thread-pool / thread-system metrics; derives publicly
// from collector_plugin and overrides its interface below.
// NOTE(review): the leading number on many lines looks like a line number fused
// in from a source listing, and no access specifiers or closing braces survive
// in this excerpt — restore them from the original header before compiling.
74class thread_system_collector :
public collector_plugin {
// Constructor and overriding destructor; both are only declared here.
76 thread_system_collector();
77 ~thread_system_collector()
override;
// Returns the fixed identifier "thread_system".
80 auto name() const -> std::string_view
override {
return "thread_system"; }
// Returns a vector of metric objects; declared only, defined elsewhere.
81 auto collect() -> std::vector<metric>
override;
// Returns the currently configured collection_interval_.
82 auto interval() const -> std::chrono::milliseconds
override {
return collection_interval_; }
// Unconditionally reports the collector as available.
83 auto is_available() const ->
bool override {
return true; }
// Returns a list of strings; declared only, defined elsewhere.
84 auto get_metric_types() const -> std::vector<std::
string> override;
// Builds the plugin descriptor with designated initializers.
// NOTE(review): the listing numbers jump 87 -> 89 and 90 -> 93, and the closing
// braces are absent, so some initializers may be missing from this excerpt —
// confirm against the original header.
86 auto get_metadata() const -> plugin_metadata
override {
87 return plugin_metadata{
89 .description =
"Thread pool and system thread metrics collector",
90 .category = plugin_category::system,
93 .requires_platform_support =
false
// Takes a config_map and reports success as bool; declared only.
97 auto initialize(
const config_map& config) ->
bool override;
// Empty body: shutdown performs no work in this class.
98 void shutdown()
override {}
// Returns a stats_map; declared only, defined elsewhere.
99 auto get_statistics() const -> stats_map override;
// Accepts a shared thread_system_adapter (presumably stored in
// thread_adapter_ — confirm in the implementation).
105 void set_thread_system_adapter(std::shared_ptr<thread_system_adapter> adapter);
// Associates a pool name with a callable that produces thread_pool_stats.
112 void register_thread_pool(const std::
string& pool_name,
113 std::function<thread_pool_stats()> stats_provider);
// Counterpart of register_thread_pool, keyed by the same pool name.
119 void unregister_thread_pool(const std::
string& pool_name);
// Returns stats for the named pool, or an empty optional; the condition for
// "empty" is not visible here.
126 std::optional<thread_pool_stats> get_pool_stats(const std::
string& pool_name) const;
// Returns a list of pool names as strings.
132 std::vector<std::
string> get_monitored_pools() const;
// Toggle taking a bool (presumably writes collect_detailed_metrics_ — confirm).
138 void set_detailed_metrics(
bool enable);
// Takes a new interval (presumably writes collection_interval_ — confirm).
144 void set_collection_interval(std::chrono::milliseconds interval);
// Shared handles to the adapter and the event bus.
148 std::shared_ptr<thread_system_adapter> thread_adapter_;
149 std::shared_ptr<event_bus> event_bus_;
// Mutex declared next to the two pool maps (presumably guards them — confirm).
152 mutable std::mutex pools_mutex_;
// Pool name -> stats provider callable.
153 std::unordered_map<std::
string, std::function<thread_pool_stats()>> pool_providers_;
// Pool name -> a thread_pool_stats value (presumably the latest sample — confirm).
154 std::unordered_map<std::
string, thread_pool_stats> last_pool_stats_;
// Configuration flags and interval; defaults: false, true, 1000 ms.
157 bool collect_detailed_metrics_{
false};
158 bool use_event_bus_{
true};
159 std::chrono::milliseconds collection_interval_{1000};
// Collector self-statistics; the counters and health flag are atomics.
162 mutable std::mutex stats_mutex_;
163 std::atomic<size_t> collection_count_{0};
164 std::atomic<size_t> collection_errors_{0};
165 std::atomic<bool> is_healthy_{
true};
166 std::chrono::steady_clock::time_point init_time_;
// Per-pool running totals; the numeric members start at zero, last_reset has
// no explicit initializer.
169 struct performance_tracker {
170 size_t total_tasks{0};
171 double total_duration_ms{0.0};
172 double total_wait_time_ms{0.0};
173 std::chrono::steady_clock::time_point last_reset;
// Pool name -> performance_tracker.
175 std::unordered_map<std::string, performance_tracker> performance_trackers_;
// Internal helpers; all are declared only, behavior lives in the implementation.
178 std::vector<metric> collect_from_adapter();
179 std::vector<metric> collect_from_pools();
// Receives the output vector by non-const reference.
180 void add_pool_metrics(std::vector<metric>& metrics,
const std::string& pool_name,
181 const thread_pool_stats& stats);
182 void update_performance_tracking(
const std::string& pool_name,
const thread_pool_stats& stats);
// Builds one metric from a name, value and pool name; unit defaults to "".
183 metric create_metric(
const std::string& name,
double value,
const std::string& pool_name,
184 const std::string& unit =
"")
const;
185 void subscribe_to_events();
186 void handle_thread_pool_event(
const thread_pool_metric_event& event);
// Health evaluation for thread pools: turns thread_pool_stats into
// health_report / health_status values using configurable thresholds.
// NOTE(review): the leading numbers look like listing line numbers fused onto
// the code, and access specifiers / closing braces are absent from this
// excerpt — restore them from the original header before compiling.
193class thread_pool_health_monitor {
// Result of analysing one pool.
// NOTE(review): listing number 204 is skipped between pool_name and issues, so
// a member may be missing here — confirm against the original header.
202 struct health_report {
203 std::string pool_name;
205 std::vector<std::string> issues;
206 std::unordered_map<std::string, double> metrics;
207 std::chrono::steady_clock::time_point timestamp;
// Threshold set with in-class defaults.
// NOTE(review): the utilization and rate defaults are written as fractions
// (0.2, 0.9, 0.05 ...) while thread_pool_stats names its utilization field
// `thread_utilization_percent` — confirm that the scales agree.
210 struct health_thresholds {
212 double queue_saturation_warn{0.7};
213 double queue_saturation_critical{0.9};
216 double thread_utilization_low{0.2};
217 double thread_utilization_high{0.9};
220 double task_failure_rate_warn{0.05};
221 double task_failure_rate_critical{0.1};
222 double task_rejection_rate_warn{0.01};
223 double task_rejection_rate_critical{0.05};
226 double queue_wait_warn_ms{1000};
227 double queue_wait_critical_ms{5000};
// Explicit constructor; thresholds default to a value-initialized
// health_thresholds, i.e. the in-class defaults above.
230 explicit thread_pool_health_monitor(
const health_thresholds& thresholds = {});
// Produces a health_report for one pool from its stats; declared only.
238 health_report analyze_health(
const thread_pool_stats& stats,
const std::string& pool_name);
// Reduces a name -> stats map to a single health_status; declared only.
245 health_status get_overall_health(
const std::unordered_map<std::string, thread_pool_stats>& pool_stats);
// Setter / getter pair for the threshold set; the getter returns by value.
251 void update_thresholds(
const health_thresholds& thresholds);
257 health_thresholds get_thresholds()
const;
// Returns stored reports; pool_name defaults to nullopt and max_count to 100.
// How the two arguments filter or limit the result is not visible here.
265 std::vector<health_report> get_health_history(
const std::optional<std::string>& pool_name = std::nullopt,
266 size_t max_count = 100)
const;
271 void clear_history();
// Each mutex is declared next to the state it presumably guards — confirm.
274 mutable std::mutex thresholds_mutex_;
275 health_thresholds thresholds_;
277 mutable std::mutex history_mutex_;
278 std::vector<health_report> health_history_;
// Fixed at 1000 (presumably the cap on health_history_ — confirm).
279 const size_t max_history_size_{1000};
// Internal helpers, declared only. The check_* functions take the report by
// non-const reference.
281 health_status calculate_status(
const std::vector<std::string>& issues)
const;
282 void check_queue_saturation(health_report& report,
const thread_pool_stats& stats);
283 void check_thread_utilization(health_report& report,
const thread_pool_stats& stats);
284 void check_task_performance(health_report& report,
const thread_pool_stats& stats);
285 void check_queue_wait_times(health_report& report,
const thread_pool_stats& stats);
// Auto-scaling helper for thread pools: declares its configuration and decision
// types plus evaluate/apply entry points in the lines that follow.
// NOTE(review): the leading number looks like a listing line number fused onto
// the code, and the class's access specifiers and closing brace are absent from
// this excerpt — restore them from the original header before compiling.
292class thread_pool_auto_scaler {
/**
 * @brief Tunable parameters for thread pool auto-scaling.
 *
 * All members have in-class defaults, so `scaling_config{}` is a usable
 * configuration.
 */
struct scaling_config {
    // Pool size bounds.
    size_t min_threads{1};
    // hardware_concurrency() may return 0 when the core count cannot be
    // determined; fall back to 2 so the default never drops below min_threads.
    size_t max_threads{std::thread::hardware_concurrency() > 0
                           ? static_cast<size_t>(std::thread::hardware_concurrency()) * 2
                           : static_cast<size_t>(2)};

    // Utilization thresholds for scaling up and down.
    double scale_up_threshold{0.8};
    double scale_down_threshold{0.3};

    // Step sizes applied per scaling action.
    size_t scale_up_increment{2};
    size_t scale_down_decrement{1};

    // Timing: cooldown after each kind of action, and evaluation period.
    std::chrono::seconds scale_up_cooldown{30};
    std::chrono::seconds scale_down_cooldown{60};
    std::chrono::seconds evaluation_interval{10};

    // Number of evaluations used for the stability requirement.
    size_t min_stable_evaluations{3};
};
// Outcome of one scaling evaluation: the recommended action, the recommended
// pool size and a steady_clock timestamp.
// NOTE(review): listing numbers 320 and 322 are skipped between the members
// below, so members may be missing from this excerpt — confirm against the
// original header. The closing brace is also absent here.
316 struct scaling_decision {
// The three possible recommendations.
317 enum class action {
none, scale_up, scale_down };
// No default initializers: recommended_action and recommended_size hold
// indeterminate values unless the object is value-initialized or assigned.
319 action recommended_action;
321 size_t recommended_size;
323 std::chrono::steady_clock::time_point timestamp;
// Explicit constructor; config defaults to a value-initialized scaling_config,
// i.e. that struct's in-class defaults.
326 explicit thread_pool_auto_scaler(
const scaling_config& config = {});
// Produces a scaling_decision from a stats snapshot; declared only.
333 scaling_decision evaluate(
const thread_pool_stats& stats);
// Takes a decision and a caller-supplied resize callback bool(size_t);
// returns bool. When the callback is invoked and what the result means is
// not visible here.
341 bool apply_scaling(
const scaling_decision& decision,
342 std::function<
bool(
size_t)> resize_function);
// Setter / getter pair for the configuration; the getter returns by value.
348 void update_config(
const scaling_config& config);
354 scaling_config get_config()
const;
// Returns stored decisions; max_count defaults to 100.
361 std::vector<scaling_decision> get_scaling_history(
size_t max_count = 100)
const;
// Each mutex is declared next to the state it presumably guards — confirm.
369 mutable std::mutex config_mutex_;
370 scaling_config config_;
372 mutable std::mutex state_mutex_;
// Time points of the most recent scale-up / scale-down (per the names);
// neither has an explicit initializer.
373 std::chrono::steady_clock::time_point last_scale_up_;
374 std::chrono::steady_clock::time_point last_scale_down_;
375 std::vector<double> utilization_history_;
376 std::vector<scaling_decision> scaling_history_;
// Fixed at 1000 (presumably the cap on the history vectors — confirm).
377 const size_t max_history_size_{1000};
// Internal const predicates and helpers, declared only.
379 bool should_scale_up(
const thread_pool_stats& stats)
const;
380 bool should_scale_down(
const thread_pool_stats& stats)
const;
381 bool is_in_cooldown(scaling_decision::action action)
const;
// `above` presumably selects the comparison direction against `threshold` —
// confirm in the implementation.
382 bool has_stable_utilization(
double threshold,
bool above)
const;
383 size_t calculate_new_size(
size_t current_size, scaling_decision::action action)
const;
// Plugin interface for metric collectors.
// Lightweight event bus implementation for the monitoring system.
// Common event type definitions for the monitoring system.
// none: Not in a cgroup or not Linux.
// health_status: System health status levels.
// Consolidated thread system adapters for monitoring_system.