Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
aggregation_processor.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
16#include "../core/error_codes.h"
17#include "stream_aggregator.h"
18#include "metric_storage.h"
19
20#include <chrono>
21#include <functional>
22#include <memory>
23#include <mutex>
24#include <shared_mutex>
25#include <string>
26#include <unordered_map>
27#include <vector>
28
29namespace kcenon::monitoring {
30
36 std::string source_metric;
38 std::chrono::milliseconds aggregation_interval{60000};
39 std::vector<double> percentiles = {0.5, 0.9, 0.95, 0.99};
40 bool compute_rate = false;
41 bool detect_outliers = true;
42 double outlier_threshold = 3.0;
43
47 common::VoidResult validate() const {
48 if (source_metric.empty()) {
49 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
50 "Source metric name cannot be empty").to_common_error());
51 }
52 if (target_metric_prefix.empty()) {
53 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
54 "Target metric prefix cannot be empty").to_common_error());
55 }
56 if (aggregation_interval.count() <= 0) {
57 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
58 "Aggregation interval must be positive").to_common_error());
59 }
60 return common::ok();
61 }
62};
63
69 std::string source_metric;
70 size_t samples_processed = 0;
71 std::chrono::microseconds processing_duration{0};
73 std::chrono::system_clock::time_point timestamp;
74 bool stored_successfully = false;
75};
76
77// Alias for backward compatibility
79
88public:
93
98 explicit aggregation_processor(std::shared_ptr<metric_storage> storage)
99 : storage_(std::move(storage)) {}
100
// Add an aggregation rule.
//
// The rule is validated first (without holding the lock); a validation
// failure is returned unchanged. Under an exclusive lock the rule is then
// rejected with monitoring_error_code::already_exists if an aggregator is
// already registered for rule.source_metric. Otherwise a new
// stream_aggregator is created from a config whose percentiles_to_track is
// rule.percentiles, and the entry is stored keyed by rule.source_metric.
//
// NOTE(review): this listing omits some original lines (the tail of the
// duplicate-error expression and the declaration/initial setup of `config`),
// so any other config fields set there are not visible here.
106 common::VoidResult add_aggregation_rule(const aggregation_rule& rule) {
107 auto validation = rule.validate();
108 if (validation.is_err()) {
109 return validation;
110 }
111
// Exclusive lock: the duplicate check and the insertion must be atomic.
112 std::unique_lock<std::shared_mutex> lock(mutex_);
113
114 // Check for duplicate
115 if (aggregators_.find(rule.source_metric) != aggregators_.end()) {
116 return common::VoidResult::err(error_info(monitoring_error_code::already_exists,
117 "Aggregation rule already exists for metric: " +
119 }
120
121 // Create stream aggregator config
125 config.percentiles_to_track = rule.percentiles;
126
127 // Create aggregator
128 aggregator_entry entry;
129 entry.rule = rule;
130 entry.aggregator = std::make_unique<stream_aggregator>(config);
// The aggregation clock for this rule starts at registration time.
131 entry.last_aggregation = std::chrono::system_clock::now();
132
133 aggregators_.emplace(rule.source_metric, std::move(entry));
134
135 return common::ok();
136 }
137
144 common::VoidResult process_observation(const std::string& metric_name, double value) {
145 std::shared_lock<std::shared_mutex> lock(mutex_);
146
147 auto it = aggregators_.find(metric_name);
148 if (it == aggregators_.end()) {
149 // No rule for this metric, silently accept
150 return common::ok();
151 }
152
153 return it->second.aggregator->add_observation(value);
154 }
155
// Get current statistics for a metric.
//
// Takes a shared lock, looks up the aggregator registered for metric_name
// and returns the value of its get_statistics() call. The aggregator is not
// reset by this call.
//
// Returns an error with the message "No aggregator found for metric: <name>"
// when no rule is registered for metric_name.
// NOTE(review): the error-code argument of make_error sits on a line that
// this listing omits.
161 common::Result<streaming_statistics> get_current_statistics(const std::string& metric_name) const {
162 std::shared_lock<std::shared_mutex> lock(mutex_);
163
164 auto it = aggregators_.find(metric_name);
165 if (it == aggregators_.end()) {
166 return common::make_error<streaming_statistics>(
168 "No aggregator found for metric: " + metric_name);
169 }
170
171 return common::ok(it->second.aggregator->get_statistics());
172 }
173
178 std::vector<std::string> get_configured_metrics() const {
179 std::shared_lock<std::shared_mutex> lock(mutex_);
180
181 std::vector<std::string> metrics;
182 metrics.reserve(aggregators_.size());
183
184 for (const auto& [name, entry] : aggregators_) {
185 metrics.push_back(name);
186 }
187
188 return metrics;
189 }
190
// Force aggregation for a metric.
//
// Under an exclusive lock: snapshots the aggregator's statistics, writes the
// derived metrics to storage (when a storage backend is set), resets the
// aggregator for the next interval and updates last_aggregation.
//
// Returns the populated stream_aggregation_result, or an error with the
// message "No aggregator found for metric: <name>" when no rule exists.
// NOTE(review): the error-code argument of make_error sits on a line that
// this listing omits.
196 common::Result<stream_aggregation_result> force_aggregation(const std::string& metric_name) {
// Started before the lock is taken, so the measured duration includes any
// time spent waiting for the lock.
197 auto start_time = std::chrono::steady_clock::now();
198
199 std::unique_lock<std::shared_mutex> lock(mutex_);
200
201 auto it = aggregators_.find(metric_name);
202 if (it == aggregators_.end()) {
203 return common::make_error<stream_aggregation_result>(
205 "No aggregator found for metric: " + metric_name);
206 }
207
208 auto& entry = it->second;
209 auto stats = entry.aggregator->get_statistics();
210
// The clock is stopped here, before the storage writes below, so
// processing_duration does not include the time spent storing metrics.
211 auto end_time = std::chrono::steady_clock::now();
212 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
213 end_time - start_time);
214
215 stream_aggregation_result agg_result;
216 agg_result.source_metric = metric_name;
217 agg_result.samples_processed = stats.count;
218 agg_result.processing_duration = duration;
219 agg_result.statistics = stats;
220 agg_result.timestamp = std::chrono::system_clock::now();
221
222 // Store aggregated metrics if storage is available
223 if (storage_) {
224 const auto& prefix = entry.rule.target_metric_prefix;
225
// NOTE(review): nothing returned by store_metric is inspected, so
// stored_successfully below only records that a storage backend was present,
// not that each write succeeded.
226 storage_->store_metric(prefix + ".mean", stats.mean, metric_type::gauge);
227 storage_->store_metric(prefix + ".min", stats.min_value, metric_type::gauge);
228 storage_->store_metric(prefix + ".max", stats.max_value, metric_type::gauge);
229 storage_->store_metric(prefix + ".stddev", stats.std_deviation, metric_type::gauge);
230 storage_->store_metric(prefix + ".count",
231 static_cast<double>(stats.count), metric_type::counter);
232
// NOTE(review): static_cast<int>(p * 100) truncates toward zero. With IEEE
// doubles some fractions land just below the integer (0.29 * 100 evaluates to
// 28.999999999999996 and yields ".p28"), and sub-percent percentiles collapse
// (0.999 yields ".p99", the same name as 0.99). The default set
// {0.5, 0.9, 0.95, 0.99} is unaffected; rounding (e.g. std::lround) would
// avoid the first problem.
// Presumably p is the percentile as a fraction in [0, 1] — confirm against
// streaming_statistics.
233 for (const auto& [p, value] : stats.percentiles) {
234 std::string suffix = ".p" + std::to_string(static_cast<int>(p * 100));
235 storage_->store_metric(prefix + suffix, value, metric_type::gauge);
236 }
237
238 agg_result.stored_successfully = true;
239 }
240
241 // Reset the aggregator for next interval
242 entry.aggregator->reset();
243 entry.last_aggregation = std::chrono::system_clock::now();
244
245 return common::ok(agg_result);
246 }
247
253 common::VoidResult remove_aggregation_rule(const std::string& metric_name) {
254 std::unique_lock<std::shared_mutex> lock(mutex_);
255
256 auto it = aggregators_.find(metric_name);
257 if (it == aggregators_.end()) {
258 return common::VoidResult::err(error_info(monitoring_error_code::metric_not_found,
259 "No aggregator found for metric: " + metric_name).to_common_error());
260 }
261
262 aggregators_.erase(it);
263 return common::ok();
264 }
265
271 bool has_rule(const std::string& metric_name) const {
272 std::shared_lock<std::shared_mutex> lock(mutex_);
273 return aggregators_.find(metric_name) != aggregators_.end();
274 }
275
279 size_t rule_count() const {
280 std::shared_lock<std::shared_mutex> lock(mutex_);
281 return aggregators_.size();
282 }
283
287 void clear() {
288 std::unique_lock<std::shared_mutex> lock(mutex_);
289 aggregators_.clear();
290 }
291
292private:
295 std::unique_ptr<stream_aggregator> aggregator;
296 std::chrono::system_clock::time_point last_aggregation;
297 };
298
299 mutable std::shared_mutex mutex_;
300 std::shared_ptr<metric_storage> storage_;
301 std::unordered_map<std::string, aggregator_entry> aggregators_;
302};
303
308inline std::vector<aggregation_rule> create_standard_aggregation_rules() {
309 std::vector<aggregation_rule> rules;
310
311 // Response time metrics
312 {
313 aggregation_rule rule;
314 rule.source_metric = "response_time";
315 rule.target_metric_prefix = "response_time_agg";
316 rule.aggregation_interval = std::chrono::milliseconds(60000);
317 rule.percentiles = {0.5, 0.9, 0.95, 0.99};
318 rule.compute_rate = false;
319 rule.detect_outliers = true;
320 rules.push_back(rule);
321 }
322
323 // Request count metrics
324 {
325 aggregation_rule rule;
326 rule.source_metric = "request_count";
327 rule.target_metric_prefix = "request_count_agg";
328 rule.aggregation_interval = std::chrono::milliseconds(60000);
329 rule.percentiles = {0.5, 0.9, 0.95, 0.99};
330 rule.compute_rate = true;
331 rule.detect_outliers = false;
332 rules.push_back(rule);
333 }
334
335 // Error count metrics
336 {
337 aggregation_rule rule;
338 rule.source_metric = "error_count";
339 rule.target_metric_prefix = "error_count_agg";
340 rule.aggregation_interval = std::chrono::milliseconds(60000);
341 rule.percentiles = {0.5, 0.9, 0.95, 0.99};
342 rule.compute_rate = true;
343 rule.detect_outliers = true;
344 rule.outlier_threshold = 2.0; // More sensitive for errors
345 rules.push_back(rule);
346 }
347
348 // CPU usage metrics
349 {
350 aggregation_rule rule;
351 rule.source_metric = "cpu_usage";
352 rule.target_metric_prefix = "cpu_usage_agg";
353 rule.aggregation_interval = std::chrono::milliseconds(60000);
354 rule.percentiles = {0.5, 0.9, 0.95, 0.99};
355 rule.compute_rate = false;
356 rule.detect_outliers = true;
357 rules.push_back(rule);
358 }
359
360 // Memory usage metrics
361 {
362 aggregation_rule rule;
363 rule.source_metric = "memory_usage";
364 rule.target_metric_prefix = "memory_usage_agg";
365 rule.aggregation_interval = std::chrono::milliseconds(60000);
366 rule.percentiles = {0.5, 0.9, 0.95, 0.99};
367 rule.compute_rate = false;
368 rule.detect_outliers = true;
369 rules.push_back(rule);
370 }
371
372 return rules;
373}
374
375} // namespace kcenon::monitoring
Processes metric streams and generates aggregated statistics.
std::shared_ptr< metric_storage > storage_
common::VoidResult add_aggregation_rule(const aggregation_rule &rule)
Add an aggregation rule.
void clear()
Clear all aggregation rules.
common::VoidResult remove_aggregation_rule(const std::string &metric_name)
Remove an aggregation rule.
bool has_rule(const std::string &metric_name) const
Check if a metric has an aggregation rule.
common::Result< stream_aggregation_result > force_aggregation(const std::string &metric_name)
Force aggregation for a metric.
common::VoidResult process_observation(const std::string &metric_name, double value)
Process an observation for a metric.
aggregation_processor(std::shared_ptr< metric_storage > storage)
Constructor with storage backend.
common::Result< streaming_statistics > get_current_statistics(const std::string &metric_name) const
Get current statistics for a metric.
std::vector< std::string > get_configured_metrics() const
Get list of configured metrics.
size_t rule_count() const
Get the number of configured rules.
std::unordered_map< std::string, aggregator_entry > aggregators_
Monitoring system specific error codes.
Memory-efficient metric storage with ring buffer backend.
@ gauge
Instantaneous value that can go up and down.
@ counter
Monotonically increasing counter.
std::vector< aggregation_rule > create_standard_aggregation_rules()
Create standard aggregation rules for common metrics.
@ storage
Storage device sensor.
Result pattern type definitions for monitoring system.
Streaming statistical aggregation for real-time metrics.
std::chrono::system_clock::time_point last_aggregation
std::unique_ptr< stream_aggregator > aggregator
aggregation_rule rule
Configuration for metric aggregation.
std::chrono::milliseconds aggregation_interval
Aggregation interval.
double outlier_threshold
Outlier detection threshold.
bool compute_rate
Compute rate of change.
std::string target_metric_prefix
Prefix for aggregated metrics.
std::string source_metric
Source metric name.
bool detect_outliers
Enable outlier detection.
common::VoidResult validate() const
Validate the aggregation rule.
std::vector< double > percentiles
Percentiles to compute.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
streaming_statistics statistics
Computed statistics.
size_t samples_processed
Number of samples processed.
std::chrono::system_clock::time_point timestamp
Aggregation timestamp.
bool stored_successfully
Whether results were stored.
std::chrono::microseconds processing_duration
Processing duration.
Configuration for stream aggregator.
Statistical summary from streaming computation.