24#include <shared_mutex>
26#include <unordered_map>
50 "Source metric name cannot be empty").to_common_error());
54 "Target metric prefix cannot be empty").to_common_error());
58 "Aggregation interval must be positive").to_common_error());
108 if (validation.is_err()) {
112 std::unique_lock<std::shared_mutex> lock(
mutex_);
117 "Aggregation rule already exists for metric: " +
130 entry.
aggregator = std::make_unique<stream_aggregator>(config);
145 std::shared_lock<std::shared_mutex> lock(
mutex_);
153 return it->second.aggregator->add_observation(value);
162 std::shared_lock<std::shared_mutex> lock(
mutex_);
166 return common::make_error<streaming_statistics>(
168 "No aggregator found for metric: " + metric_name);
171 return common::ok(it->second.aggregator->get_statistics());
179 std::shared_lock<std::shared_mutex> lock(
mutex_);
181 std::vector<std::string> metrics;
185 metrics.push_back(name);
197 auto start_time = std::chrono::steady_clock::now();
199 std::unique_lock<std::shared_mutex> lock(
mutex_);
203 return common::make_error<stream_aggregation_result>(
205 "No aggregator found for metric: " + metric_name);
208 auto& entry = it->second;
209 auto stats = entry.aggregator->get_statistics();
211 auto end_time = std::chrono::steady_clock::now();
212 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
213 end_time - start_time);
220 agg_result.
timestamp = std::chrono::system_clock::now();
224 const auto& prefix = entry.rule.target_metric_prefix;
230 storage_->store_metric(prefix +
".count",
233 for (
const auto& [p, value] : stats.percentiles) {
234 std::string suffix =
".p" + std::to_string(
static_cast<int>(p * 100));
242 entry.aggregator->reset();
243 entry.last_aggregation = std::chrono::system_clock::now();
245 return common::ok(agg_result);
254 std::unique_lock<std::shared_mutex> lock(
mutex_);
259 "No aggregator found for metric: " + metric_name).to_common_error());
271 bool has_rule(
const std::string& metric_name)
const {
272 std::shared_lock<std::shared_mutex> lock(
mutex_);
280 std::shared_lock<std::shared_mutex> lock(
mutex_);
288 std::unique_lock<std::shared_mutex> lock(
mutex_);
309 std::vector<aggregation_rule> rules;
320 rules.push_back(rule);
332 rules.push_back(rule);
345 rules.push_back(rule);
357 rules.push_back(rule);
369 rules.push_back(rule);
Processes metric streams and generates aggregated statistics.
std::shared_ptr< metric_storage > storage_
common::VoidResult add_aggregation_rule(const aggregation_rule &rule)
Add an aggregation rule.
void clear()
Clear all aggregation rules.
common::VoidResult remove_aggregation_rule(const std::string &metric_name)
Remove an aggregation rule.
aggregation_processor()
Default constructor.
bool has_rule(const std::string &metric_name) const
Check if a metric has an aggregation rule.
common::Result< stream_aggregation_result > force_aggregation(const std::string &metric_name)
Force aggregation for a metric.
common::VoidResult process_observation(const std::string &metric_name, double value)
Process an observation for a metric.
aggregation_processor(std::shared_ptr< metric_storage > storage)
Constructor with storage backend.
common::Result< streaming_statistics > get_current_statistics(const std::string &metric_name) const
Get current statistics for a metric.
std::vector< std::string > get_configured_metrics() const
Get list of configured metrics.
size_t rule_count() const
Get the number of configured rules.
std::unordered_map< std::string, aggregator_entry > aggregators_
Monitoring system specific error codes.
Memory-efficient metric storage with ring buffer backend.
gauge
Instantaneous value that can go up and down.
counter
Monotonically increasing counter.
std::vector< aggregation_rule > create_standard_aggregation_rules()
Create standard aggregation rules for common metrics.
storage
Storage device sensor.
Result pattern type definitions for monitoring system.
Streaming statistical aggregation for real-time metrics.
std::chrono::system_clock::time_point last_aggregation
std::unique_ptr< stream_aggregator > aggregator
Configuration for metric aggregation.
std::chrono::milliseconds aggregation_interval
Aggregation interval.
double outlier_threshold
Outlier detection threshold.
bool compute_rate
Compute rate of change.
std::string target_metric_prefix
Prefix for aggregated metrics.
std::string source_metric
Source metric name.
bool detect_outliers
Enable outlier detection.
common::VoidResult validate() const
Validate the aggregation rule.
std::vector< double > percentiles
Percentiles to compute.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Result of an aggregation operation.
streaming_statistics statistics
Computed statistics.
size_t samples_processed
Number of samples processed.
std::chrono::system_clock::time_point timestamp
Aggregation timestamp.
bool stored_successfully
Whether results were stored.
std::chrono::microseconds processing_duration
Processing duration.
std::string source_metric
Source metric name.
Configuration for stream aggregator.
std::vector< double > percentiles_to_track
bool enable_outlier_detection
Statistical summary from streaming computation.