Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::aggregation_processor Class Reference

Processes metric streams and generates aggregated statistics. More...

#include <aggregation_processor.h>

Collaboration diagram for kcenon::monitoring::aggregation_processor:
Collaboration graph

Classes

struct  aggregator_entry
 

Public Member Functions

 aggregation_processor ()
 Default constructor.
 
 aggregation_processor (std::shared_ptr< metric_storage > storage)
 Constructor with storage backend.
 
common::VoidResult add_aggregation_rule (const aggregation_rule &rule)
 Add an aggregation rule.
 
common::VoidResult process_observation (const std::string &metric_name, double value)
 Process an observation for a metric.
 
common::Result< streaming_statisticsget_current_statistics (const std::string &metric_name) const
 Get current statistics for a metric.
 
std::vector< std::string > get_configured_metrics () const
 Get list of configured metrics.
 
common::Result< stream_aggregation_resultforce_aggregation (const std::string &metric_name)
 Force aggregation for a metric.
 
common::VoidResult remove_aggregation_rule (const std::string &metric_name)
 Remove an aggregation rule.
 
bool has_rule (const std::string &metric_name) const
 Check if a metric has an aggregation rule.
 
size_t rule_count () const
 Get the number of configured rules.
 
void clear ()
 Clear all aggregation rules.
 

Private Attributes

std::shared_mutex mutex_
 
std::shared_ptr< metric_storagestorage_
 
std::unordered_map< std::string, aggregator_entryaggregators_
 

Detailed Description

Processes metric streams and generates aggregated statistics.

The aggregation processor manages multiple stream aggregators, one for each configured metric, and periodically computes and stores aggregated statistics.

Definition at line 87 of file aggregation_processor.h.

Constructor & Destructor Documentation

◆ aggregation_processor() [1/2]

kcenon::monitoring::aggregation_processor::aggregation_processor ( )
inline

Default constructor.

Definition at line 92 of file aggregation_processor.h.

92: storage_(nullptr) {}
std::shared_ptr< metric_storage > storage_

◆ aggregation_processor() [2/2]

kcenon::monitoring::aggregation_processor::aggregation_processor ( std::shared_ptr< metric_storage > storage)
inlineexplicit

Constructor with storage backend.

Parameters
storageShared pointer to metric storage

Definition at line 98 of file aggregation_processor.h.

99 : storage_(std::move(storage)) {}
@ storage
Storage device sensor.

Member Function Documentation

◆ add_aggregation_rule()

common::VoidResult kcenon::monitoring::aggregation_processor::add_aggregation_rule ( const aggregation_rule & rule)
inline

Add an aggregation rule.

Parameters
ruleThe aggregation rule to add
Returns
Result indicating success or failure

Definition at line 106 of file aggregation_processor.h.

106 {
107 auto validation = rule.validate();
108 if (validation.is_err()) {
109 return validation;
110 }
111
112 std::unique_lock<std::shared_mutex> lock(mutex_);
113
114 // Check for duplicate
115 if (aggregators_.find(rule.source_metric) != aggregators_.end()) {
116 return common::VoidResult::err(error_info(monitoring_error_code::already_exists,
117 "Aggregation rule already exists for metric: " +
118 rule.source_metric).to_common_error());
119 }
120
121 // Create stream aggregator config
122 stream_aggregator_config config;
123 config.enable_outlier_detection = rule.detect_outliers;
124 config.outlier_threshold = rule.outlier_threshold;
125 config.percentiles_to_track = rule.percentiles;
126
127 // Create aggregator
128 aggregator_entry entry;
129 entry.rule = rule;
130 entry.aggregator = std::make_unique<stream_aggregator>(config);
131 entry.last_aggregation = std::chrono::system_clock::now();
132
133 aggregators_.emplace(rule.source_metric, std::move(entry));
134
135 return common::ok();
136 }
std::unordered_map< std::string, aggregator_entry > aggregators_

References kcenon::monitoring::aggregation_processor::aggregator_entry::aggregator, aggregators_, kcenon::monitoring::already_exists, kcenon::monitoring::aggregation_rule::detect_outliers, kcenon::monitoring::stream_aggregator_config::enable_outlier_detection, kcenon::monitoring::aggregation_processor::aggregator_entry::last_aggregation, mutex_, kcenon::monitoring::aggregation_rule::outlier_threshold, kcenon::monitoring::stream_aggregator_config::outlier_threshold, kcenon::monitoring::aggregation_rule::percentiles, kcenon::monitoring::stream_aggregator_config::percentiles_to_track, kcenon::monitoring::aggregation_processor::aggregator_entry::rule, kcenon::monitoring::aggregation_rule::source_metric, kcenon::monitoring::error_info::to_common_error(), and kcenon::monitoring::aggregation_rule::validate().

Referenced by TEST_F(), TEST_F(), TEST_F(), and TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ clear()

void kcenon::monitoring::aggregation_processor::clear ( )
inline

Clear all aggregation rules.

Definition at line 287 of file aggregation_processor.h.

287 {
288 std::unique_lock<std::shared_mutex> lock(mutex_);
289 aggregators_.clear();
290 }

References aggregators_, and mutex_.

◆ force_aggregation()

common::Result< stream_aggregation_result > kcenon::monitoring::aggregation_processor::force_aggregation ( const std::string & metric_name)
inline

Force aggregation for a metric.

Parameters
metric_nameThe metric name
Returns
Result containing the aggregation result

Definition at line 196 of file aggregation_processor.h.

196 {
197 auto start_time = std::chrono::steady_clock::now();
198
199 std::unique_lock<std::shared_mutex> lock(mutex_);
200
201 auto it = aggregators_.find(metric_name);
202 if (it == aggregators_.end()) {
203 return common::make_error<stream_aggregation_result>(
205 "No aggregator found for metric: " + metric_name);
206 }
207
208 auto& entry = it->second;
209 auto stats = entry.aggregator->get_statistics();
210
211 auto end_time = std::chrono::steady_clock::now();
212 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
213 end_time - start_time);
214
215 stream_aggregation_result agg_result;
216 agg_result.source_metric = metric_name;
217 agg_result.samples_processed = stats.count;
218 agg_result.processing_duration = duration;
219 agg_result.statistics = stats;
220 agg_result.timestamp = std::chrono::system_clock::now();
221
222 // Store aggregated metrics if storage is available
223 if (storage_) {
224 const auto& prefix = entry.rule.target_metric_prefix;
225
226 storage_->store_metric(prefix + ".mean", stats.mean, metric_type::gauge);
227 storage_->store_metric(prefix + ".min", stats.min_value, metric_type::gauge);
228 storage_->store_metric(prefix + ".max", stats.max_value, metric_type::gauge);
229 storage_->store_metric(prefix + ".stddev", stats.std_deviation, metric_type::gauge);
230 storage_->store_metric(prefix + ".count",
231 static_cast<double>(stats.count), metric_type::counter);
232
233 for (const auto& [p, value] : stats.percentiles) {
234 std::string suffix = ".p" + std::to_string(static_cast<int>(p * 100));
235 storage_->store_metric(prefix + suffix, value, metric_type::gauge);
236 }
237
238 agg_result.stored_successfully = true;
239 }
240
241 // Reset the aggregator for next interval
242 entry.aggregator->reset();
243 entry.last_aggregation = std::chrono::system_clock::now();
244
245 return common::ok(agg_result);
246 }
@ gauge
Instantaneous value that can go up and down.
@ counter
Monotonically increasing counter.

References aggregators_, kcenon::monitoring::counter, kcenon::monitoring::gauge, kcenon::monitoring::metric_not_found, mutex_, kcenon::monitoring::stream_aggregation_result::processing_duration, kcenon::monitoring::stream_aggregation_result::samples_processed, kcenon::monitoring::stream_aggregation_result::source_metric, kcenon::monitoring::stream_aggregation_result::statistics, storage_, kcenon::monitoring::stream_aggregation_result::stored_successfully, and kcenon::monitoring::stream_aggregation_result::timestamp.

Referenced by TEST_F().

Here is the caller graph for this function:

◆ get_configured_metrics()

std::vector< std::string > kcenon::monitoring::aggregation_processor::get_configured_metrics ( ) const
inline

Get list of configured metrics.

Returns
Vector of metric names

Definition at line 178 of file aggregation_processor.h.

178 {
179 std::shared_lock<std::shared_mutex> lock(mutex_);
180
181 std::vector<std::string> metrics;
182 metrics.reserve(aggregators_.size());
183
184 for (const auto& [name, entry] : aggregators_) {
185 metrics.push_back(name);
186 }
187
188 return metrics;
189 }

References aggregators_, and mutex_.

Referenced by TEST_F().

Here is the caller graph for this function:

◆ get_current_statistics()

common::Result< streaming_statistics > kcenon::monitoring::aggregation_processor::get_current_statistics ( const std::string & metric_name) const
inline

Get current statistics for a metric.

Parameters
metric_nameThe metric name
Returns
Result containing the statistics

Definition at line 161 of file aggregation_processor.h.

161 {
162 std::shared_lock<std::shared_mutex> lock(mutex_);
163
164 auto it = aggregators_.find(metric_name);
165 if (it == aggregators_.end()) {
166 return common::make_error<streaming_statistics>(
168 "No aggregator found for metric: " + metric_name);
169 }
170
171 return common::ok(it->second.aggregator->get_statistics());
172 }

References aggregators_, kcenon::monitoring::metric_not_found, and mutex_.

Referenced by TEST_F().

Here is the caller graph for this function:

◆ has_rule()

bool kcenon::monitoring::aggregation_processor::has_rule ( const std::string & metric_name) const
inline

Check if a metric has an aggregation rule.

Parameters
metric_nameThe metric name
Returns
true if rule exists

Definition at line 271 of file aggregation_processor.h.

271 {
272 std::shared_lock<std::shared_mutex> lock(mutex_);
273 return aggregators_.find(metric_name) != aggregators_.end();
274 }

References aggregators_, and mutex_.

◆ process_observation()

common::VoidResult kcenon::monitoring::aggregation_processor::process_observation ( const std::string & metric_name,
double value )
inline

Process an observation for a metric.

Parameters
metric_nameThe metric name
valueThe observed value
Returns
Result indicating success or failure

Definition at line 144 of file aggregation_processor.h.

144 {
145 std::shared_lock<std::shared_mutex> lock(mutex_);
146
147 auto it = aggregators_.find(metric_name);
148 if (it == aggregators_.end()) {
149 // No rule for this metric, silently accept
150 return common::ok();
151 }
152
153 return it->second.aggregator->add_observation(value);
154 }

References aggregators_, and mutex_.

Referenced by TEST_F(), TEST_F(), and TEST_F().

Here is the caller graph for this function:

◆ remove_aggregation_rule()

common::VoidResult kcenon::monitoring::aggregation_processor::remove_aggregation_rule ( const std::string & metric_name)
inline

Remove an aggregation rule.

Parameters
metric_nameThe metric name
Returns
Result indicating success or failure

Definition at line 253 of file aggregation_processor.h.

253 {
254 std::unique_lock<std::shared_mutex> lock(mutex_);
255
256 auto it = aggregators_.find(metric_name);
257 if (it == aggregators_.end()) {
258 return common::VoidResult::err(error_info(monitoring_error_code::metric_not_found,
259 "No aggregator found for metric: " + metric_name).to_common_error());
260 }
261
262 aggregators_.erase(it);
263 return common::ok();
264 }

References aggregators_, kcenon::monitoring::metric_not_found, and mutex_.

◆ rule_count()

size_t kcenon::monitoring::aggregation_processor::rule_count ( ) const
inline

Get the number of configured rules.

Definition at line 279 of file aggregation_processor.h.

279 {
280 std::shared_lock<std::shared_mutex> lock(mutex_);
281 return aggregators_.size();
282 }

References aggregators_, and mutex_.

Member Data Documentation

◆ aggregators_

std::unordered_map<std::string, aggregator_entry> kcenon::monitoring::aggregation_processor::aggregators_
private

◆ mutex_

std::shared_mutex kcenon::monitoring::aggregation_processor::mutex_
mutableprivate

◆ storage_

std::shared_ptr<metric_storage> kcenon::monitoring::aggregation_processor::storage_
private

Definition at line 300 of file aggregation_processor.h.

Referenced by force_aggregation().


The documentation for this class was generated from the following file: