Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
thread_system_collector.h
Go to the documentation of this file.
// BSD 3-Clause License
// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
14#include <algorithm>
15#include <atomic>
16#include <chrono>
17#include <functional>
18#include <memory>
19#include <mutex>
20#include <optional>
21#include <string>
22#include <thread>
23#include <unordered_map>
24#include <utility>
25#include <vector>
26
28#include "../core/event_bus.h"
29#include "../core/event_types.h"
31
32namespace kcenon { namespace monitoring {
33
/**
 * @brief Snapshot of the counters and gauges reported for one thread pool.
 *
 * Plain aggregate with no invariants of its own. Every field carries a
 * zero default, so a default-constructed instance is fully initialised.
 * Units follow the field-name suffixes (`_ms`, `_percent`, `_per_sec`).
 */
struct thread_pool_stats {
    // Pool configuration
    size_t pool_size{0};
    size_t max_pool_size{0};
    size_t min_pool_size{0};

    // Thread states
    size_t active_threads{0};
    size_t idle_threads{0};
    size_t waiting_threads{0};

    // Task queue metrics
    size_t queued_tasks{0};
    size_t max_queue_size{0};
    size_t completed_tasks{0};
    size_t failed_tasks{0};
    size_t rejected_tasks{0};

    // Performance metrics
    double average_task_duration_ms{0.0};
    double max_task_duration_ms{0.0};
    double min_task_duration_ms{0.0};
    double task_throughput_per_sec{0.0};

    // Thread utilization
    // NOTE(review): named "_percent" -- presumably 0-100 rather than 0-1;
    // confirm against the code that fills these in.
    double thread_utilization_percent{0.0};
    double cpu_usage_percent{0.0};

    // Queue wait times
    double average_queue_wait_ms{0.0};
    double max_queue_wait_ms{0.0};
};
69
74class thread_system_collector : public collector_plugin {
75 public:
76 thread_system_collector();
77 ~thread_system_collector() override;
78
79 // collector_plugin implementation
80 auto name() const -> std::string_view override { return "thread_system"; }
81 auto collect() -> std::vector<metric> override;
82 auto interval() const -> std::chrono::milliseconds override { return collection_interval_; }
83 auto is_available() const -> bool override { return true; }
84 auto get_metric_types() const -> std::vector<std::string> override;
85
86 auto get_metadata() const -> plugin_metadata override {
87 return plugin_metadata{
88 .name = name(),
89 .description = "Thread pool and system thread metrics collector",
90 .category = plugin_category::system,
91 .version = "1.0.0",
92 .dependencies = {},
93 .requires_platform_support = false
94 };
95 }
96
97 auto initialize(const config_map& config) -> bool override;
98 void shutdown() override {}
99 auto get_statistics() const -> stats_map override;
100
105 void set_thread_system_adapter(std::shared_ptr<thread_system_adapter> adapter);
106
112 void register_thread_pool(const std::string& pool_name,
113 std::function<thread_pool_stats()> stats_provider);
114
119 void unregister_thread_pool(const std::string& pool_name);
120
126 std::optional<thread_pool_stats> get_pool_stats(const std::string& pool_name) const;
127
132 std::vector<std::string> get_monitored_pools() const;
133
138 void set_detailed_metrics(bool enable);
139
144 void set_collection_interval(std::chrono::milliseconds interval);
145
146 private:
147 // Thread system integration
148 std::shared_ptr<thread_system_adapter> thread_adapter_;
149 std::shared_ptr<event_bus> event_bus_;
150
151 // Thread pool monitoring
152 mutable std::mutex pools_mutex_;
153 std::unordered_map<std::string, std::function<thread_pool_stats()>> pool_providers_;
154 std::unordered_map<std::string, thread_pool_stats> last_pool_stats_;
155
156 // Configuration
157 bool collect_detailed_metrics_{false};
158 bool use_event_bus_{true};
159 std::chrono::milliseconds collection_interval_{1000};
160
161 // Statistics tracking
162 mutable std::mutex stats_mutex_;
163 std::atomic<size_t> collection_count_{0};
164 std::atomic<size_t> collection_errors_{0};
165 std::atomic<bool> is_healthy_{true};
166 std::chrono::steady_clock::time_point init_time_;
167
168 // Performance tracking
169 struct performance_tracker {
170 size_t total_tasks{0};
171 double total_duration_ms{0.0};
172 double total_wait_time_ms{0.0};
173 std::chrono::steady_clock::time_point last_reset;
174 };
175 std::unordered_map<std::string, performance_tracker> performance_trackers_;
176
177 // Helper methods
178 std::vector<metric> collect_from_adapter();
179 std::vector<metric> collect_from_pools();
180 void add_pool_metrics(std::vector<metric>& metrics, const std::string& pool_name,
181 const thread_pool_stats& stats);
182 void update_performance_tracking(const std::string& pool_name, const thread_pool_stats& stats);
183 metric create_metric(const std::string& name, double value, const std::string& pool_name,
184 const std::string& unit = "") const;
185 void subscribe_to_events();
186 void handle_thread_pool_event(const thread_pool_metric_event& event);
187};
188
193class thread_pool_health_monitor {
194 public:
195 enum class health_status {
196 healthy,
197 degraded,
198 unhealthy,
200 };
201
202 struct health_report {
203 std::string pool_name;
204 health_status status;
205 std::vector<std::string> issues;
206 std::unordered_map<std::string, double> metrics;
207 std::chrono::steady_clock::time_point timestamp;
208 };
209
210 struct health_thresholds {
211 // Task queue thresholds
212 double queue_saturation_warn{0.7}; // 70% queue full
213 double queue_saturation_critical{0.9}; // 90% queue full
214
215 // Thread utilization thresholds
216 double thread_utilization_low{0.2}; // 20% utilization (underutilized)
217 double thread_utilization_high{0.9}; // 90% utilization (overloaded)
218
219 // Task performance thresholds
220 double task_failure_rate_warn{0.05}; // 5% failure rate
221 double task_failure_rate_critical{0.1}; // 10% failure rate
222 double task_rejection_rate_warn{0.01}; // 1% rejection rate
223 double task_rejection_rate_critical{0.05}; // 5% rejection rate
224
225 // Queue wait time thresholds (ms)
226 double queue_wait_warn_ms{1000}; // 1 second wait
227 double queue_wait_critical_ms{5000}; // 5 seconds wait
228 };
229
230 explicit thread_pool_health_monitor(const health_thresholds& thresholds = {});
231
238 health_report analyze_health(const thread_pool_stats& stats, const std::string& pool_name);
239
245 health_status get_overall_health(const std::unordered_map<std::string, thread_pool_stats>& pool_stats);
246
251 void update_thresholds(const health_thresholds& thresholds);
252
257 health_thresholds get_thresholds() const;
258
265 std::vector<health_report> get_health_history(const std::optional<std::string>& pool_name = std::nullopt,
266 size_t max_count = 100) const;
267
271 void clear_history();
272
273 private:
274 mutable std::mutex thresholds_mutex_;
275 health_thresholds thresholds_;
276
277 mutable std::mutex history_mutex_;
278 std::vector<health_report> health_history_;
279 const size_t max_history_size_{1000};
280
281 health_status calculate_status(const std::vector<std::string>& issues) const;
282 void check_queue_saturation(health_report& report, const thread_pool_stats& stats);
283 void check_thread_utilization(health_report& report, const thread_pool_stats& stats);
284 void check_task_performance(health_report& report, const thread_pool_stats& stats);
285 void check_queue_wait_times(health_report& report, const thread_pool_stats& stats);
286};
287
292class thread_pool_auto_scaler {
293 public:
294 struct scaling_config {
295 // Scaling boundaries
296 size_t min_threads{1};
297 size_t max_threads{std::thread::hardware_concurrency() * 2};
298
299 // Scaling triggers
300 double scale_up_threshold{0.8}; // 80% utilization
301 double scale_down_threshold{0.3}; // 30% utilization
302
303 // Scaling parameters
304 size_t scale_up_increment{2};
305 size_t scale_down_decrement{1};
306
307 // Timing parameters
308 std::chrono::seconds scale_up_cooldown{30};
309 std::chrono::seconds scale_down_cooldown{60};
310 std::chrono::seconds evaluation_interval{10};
311
312 // Stability parameters
313 size_t min_stable_evaluations{3}; // Number of consistent evaluations before scaling
314 };
315
316 struct scaling_decision {
317 enum class action { none, scale_up, scale_down };
318
319 action recommended_action;
320 size_t current_size;
321 size_t recommended_size;
322 std::string reason;
323 std::chrono::steady_clock::time_point timestamp;
324 };
325
326 explicit thread_pool_auto_scaler(const scaling_config& config = {});
327
333 scaling_decision evaluate(const thread_pool_stats& stats);
334
341 bool apply_scaling(const scaling_decision& decision,
342 std::function<bool(size_t)> resize_function);
343
348 void update_config(const scaling_config& config);
349
354 scaling_config get_config() const;
355
361 std::vector<scaling_decision> get_scaling_history(size_t max_count = 100) const;
362
366 void reset();
367
368 private:
369 mutable std::mutex config_mutex_;
370 scaling_config config_;
371
372 mutable std::mutex state_mutex_;
373 std::chrono::steady_clock::time_point last_scale_up_;
374 std::chrono::steady_clock::time_point last_scale_down_;
375 std::vector<double> utilization_history_;
376 std::vector<scaling_decision> scaling_history_;
377 const size_t max_history_size_{1000};
378
379 bool should_scale_up(const thread_pool_stats& stats) const;
380 bool should_scale_down(const thread_pool_stats& stats) const;
381 bool is_in_cooldown(scaling_decision::action action) const;
382 bool has_stable_utilization(double threshold, bool above) const;
383 size_t calculate_new_size(size_t current_size, scaling_decision::action action) const;
384};
385
} }  // namespace kcenon::monitoring
Plugin interface for metric collectors.
Lightweight event bus implementation for monitoring system.
Common event type definitions for monitoring system.
@ none
Not in a cgroup or not Linux.
health_status
System health status levels.
Consolidated thread system adapters for monitoring_system.