Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
test_stream_aggregation.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌄 🌊
3// See the LICENSE file in the project root for full license information.
4
5#include <gtest/gtest.h>
9#include <chrono>
10#include <thread>
11#include <vector>
12#include <random>
13#include <cmath>
14
15using namespace kcenon::monitoring;
16
20class StreamAggregationTest : public ::testing::Test {
21protected:
22 void SetUp() override {
23 // Common setup for tests
24 }
25
26 void TearDown() override {
27 // Common cleanup for tests
28 }
29
33 std::vector<double> generate_normal_samples(size_t count, double mean = 0.0, double stddev = 1.0) {
34 std::random_device rd;
35 std::mt19937 gen(rd());
36 std::normal_distribution<> dis(mean, stddev);
37
38 std::vector<double> samples;
39 samples.reserve(count);
40
41 for (size_t i = 0; i < count; ++i) {
42 samples.push_back(dis(gen));
43 }
44
45 return samples;
46 }
47
51 std::vector<double> generate_uniform_samples(size_t count, double min = 0.0, double max = 1.0) {
52 std::random_device rd;
53 std::mt19937 gen(rd());
54 std::uniform_real_distribution<> dis(min, max);
55
56 std::vector<double> samples;
57 samples.reserve(count);
58
59 for (size_t i = 0; i < count; ++i) {
60 samples.push_back(dis(gen));
61 }
62
63 return samples;
64 }
65};
66
67// Online Statistics Tests
68TEST_F(StreamAggregationTest, OnlineStatisticsBasic) {
70
71 EXPECT_EQ(stats.count(), 0);
72 EXPECT_EQ(stats.mean(), 0.0);
73 EXPECT_EQ(stats.variance(), 0.0);
74
75 // Add some values
76 std::vector<double> values = {1.0, 2.0, 3.0, 4.0, 5.0};
77 for (double value : values) {
78 stats.add_value(value);
79 }
80
81 EXPECT_EQ(stats.count(), 5);
82 EXPECT_DOUBLE_EQ(stats.mean(), 3.0); // (1+2+3+4+5)/5 = 3
83
84 auto full_stats = stats.get_statistics();
85 EXPECT_EQ(full_stats.count, 5);
86 EXPECT_DOUBLE_EQ(full_stats.mean, 3.0);
87 EXPECT_DOUBLE_EQ(full_stats.min_value, 1.0);
88 EXPECT_DOUBLE_EQ(full_stats.max_value, 5.0);
89 EXPECT_GT(full_stats.variance, 0.0);
90 EXPECT_GT(full_stats.std_deviation, 0.0);
91}
92
93TEST_F(StreamAggregationTest, OnlineStatisticsLargeDataset) {
95
96 // Generate a large dataset with known properties
97 auto samples = generate_normal_samples(10000, 100.0, 15.0);
98
99 for (double sample : samples) {
100 stats.add_value(sample);
101 }
102
103 auto full_stats = stats.get_statistics();
104
105 // Check that the statistics are close to the expected values
106 EXPECT_NEAR(full_stats.mean, 100.0, 1.0); // Should be close to 100
107 EXPECT_NEAR(full_stats.std_deviation, 15.0, 1.0); // Should be close to 15
108 EXPECT_EQ(full_stats.count, 10000);
109}
110
111// Quantile Estimator Tests
112TEST_F(StreamAggregationTest, QuantileEstimatorMedian) {
113 quantile_estimator median_estimator(0.5);
114
115 // Add values 1 through 100
116 for (int i = 1; i <= 100; ++i) {
117 median_estimator.add_observation(static_cast<double>(i));
118 }
119
120 double estimated_median = median_estimator.get_quantile();
121
122 // For 1-100, median should be around 50.5
123 EXPECT_NEAR(estimated_median, 50.5, 5.0); // Allow some tolerance for P² algorithm
124}
125
126TEST_F(StreamAggregationTest, QuantileEstimatorPercentiles) {
127 quantile_estimator p95_estimator(0.95);
128
129 // Add uniform samples 0-100
130 auto samples = generate_uniform_samples(1000, 0.0, 100.0);
131
132 for (double sample : samples) {
133 p95_estimator.add_observation(sample);
134 }
135
136 double p95 = p95_estimator.get_quantile();
137
138 // 95th percentile should be around 95
139 EXPECT_NEAR(p95, 95.0, 10.0); // P² algorithm has some approximation error
140}
141
142// Moving Window Aggregator Tests
143TEST_F(StreamAggregationTest, MovingWindowBasic) {
144 moving_window_aggregator<double> window(std::chrono::milliseconds(1000), 100);
145
146 auto now = std::chrono::system_clock::now();
147
148 // Add values
149 for (int i = 0; i < 10; ++i) {
150 window.add_value(static_cast<double>(i), now + std::chrono::milliseconds(i * 10));
151 }
152
153 EXPECT_EQ(window.size(), 10);
154
155 auto values = window.get_values();
156 EXPECT_EQ(values.size(), 10);
157
158 // Check values are correct
159 for (size_t i = 0; i < values.size(); ++i) {
160 EXPECT_EQ(values[i], static_cast<double>(i));
161 }
162}
163
164TEST_F(StreamAggregationTest, MovingWindowExpiration) {
165 moving_window_aggregator<double> window(std::chrono::milliseconds(100), 1000);
166
167 auto now = std::chrono::system_clock::now();
168
169 // Add old values (should expire)
170 for (int i = 0; i < 5; ++i) {
171 window.add_value(static_cast<double>(i), now - std::chrono::milliseconds(200));
172 }
173
174 // Add new values (should remain)
175 for (int i = 10; i < 15; ++i) {
176 window.add_value(static_cast<double>(i), now);
177 }
178
179 auto values = window.get_values();
180
181 // Should only have the new values
182 EXPECT_EQ(values.size(), 5);
183 for (size_t i = 0; i < values.size(); ++i) {
184 EXPECT_EQ(values[i], static_cast<double>(i + 10));
185 }
186}
187
188// Stream Aggregator Tests
189TEST_F(StreamAggregationTest, StreamAggregatorBasic) {
191 config.window_size = 1000;
192 config.enable_outlier_detection = false; // Disable for predictable testing
193
194 stream_aggregator aggregator(config);
195
196 // Add observations
197 std::vector<double> values = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};
198 for (double value : values) {
199 auto result = aggregator.add_observation(value);
200 EXPECT_TRUE(result.is_ok());
201 }
202
203 auto stats = aggregator.get_statistics();
204
205 EXPECT_EQ(stats.count, 10);
206 EXPECT_DOUBLE_EQ(stats.mean, 5.5);
207 EXPECT_DOUBLE_EQ(stats.min_value, 1.0);
208 EXPECT_DOUBLE_EQ(stats.max_value, 10.0);
209 EXPECT_GT(stats.std_deviation, 0.0);
210
211 // Check percentiles
212 EXPECT_GT(stats.percentiles.size(), 0);
213}
214
215TEST_F(StreamAggregationTest, StreamAggregatorPercentiles) {
216 stream_aggregator aggregator;
217
218 // Add 100 values from 1 to 100
219 for (int i = 1; i <= 100; ++i) {
220 aggregator.add_observation(static_cast<double>(i));
221 }
222
223 // Get specific percentiles
224 auto p50_result = aggregator.get_percentile(0.5);
225 auto p95_result = aggregator.get_percentile(0.95);
226 auto p99_result = aggregator.get_percentile(0.99);
227
228 EXPECT_TRUE(p50_result);
229 EXPECT_TRUE(p95_result);
230 EXPECT_TRUE(p99_result);
231
232 // Check approximate values
233 EXPECT_NEAR(p50_result.value(), 50.0, 10.0);
234 EXPECT_NEAR(p95_result.value(), 95.0, 10.0);
235 EXPECT_NEAR(p99_result.value(), 99.0, 10.0);
236}
237
238TEST_F(StreamAggregationTest, StreamAggregatorOutlierDetection) {
240 config.enable_outlier_detection = true;
241 config.outlier_threshold = 2.0; // 2 standard deviations
242
243 stream_aggregator aggregator(config);
244
245 // Add normal values around 50
246 for (int i = 45; i <= 55; ++i) {
247 aggregator.add_observation(static_cast<double>(i));
248 }
249
250 // Add outliers
251 aggregator.add_observation(100.0); // Should be detected as outlier
252 aggregator.add_observation(0.0); // Should be detected as outlier
253
254 auto stats = aggregator.get_statistics();
255
256 EXPECT_GT(stats.outlier_count, 0);
257 EXPECT_GT(stats.outliers.size(), 0);
258}
259
260TEST_F(StreamAggregationTest, StreamAggregatorReset) {
261 stream_aggregator aggregator;
262
263 // Add some observations
264 for (int i = 1; i <= 10; ++i) {
265 aggregator.add_observation(static_cast<double>(i));
266 }
267
268 EXPECT_EQ(aggregator.count(), 10);
269
270 // Reset
271 aggregator.reset();
272
273 EXPECT_EQ(aggregator.count(), 0);
274 EXPECT_EQ(aggregator.mean(), 0.0);
275 EXPECT_EQ(aggregator.variance(), 0.0);
276}
277
278// Aggregation Processor Tests
279TEST_F(StreamAggregationTest, AggregationProcessorBasic) {
280 auto storage = std::make_shared<metric_storage>();
282
283 // Create aggregation rule
284 aggregation_rule rule;
285 rule.source_metric = "test_metric";
286 rule.target_metric_prefix = "test_metric_stats";
287 rule.aggregation_interval = std::chrono::milliseconds(1000);
288 rule.percentiles = {0.5, 0.95};
289 rule.compute_rate = true;
290 rule.detect_outliers = false;
291
292 auto add_result = processor.add_aggregation_rule(rule);
293 EXPECT_TRUE(add_result.is_ok());
294
295 // Add observations
296 for (int i = 1; i <= 100; ++i) {
297 auto result = processor.process_observation("test_metric", static_cast<double>(i));
298 EXPECT_TRUE(result.is_ok());
299 }
300
301 // Get current statistics
302 auto stats_result = processor.get_current_statistics("test_metric");
303 EXPECT_TRUE(stats_result.is_ok());
304
305 auto stats = stats_result.value();
306 EXPECT_EQ(stats.count, 100);
307 EXPECT_GT(stats.mean, 0.0);
308}
309
310TEST_F(StreamAggregationTest, AggregationProcessorMultipleMetrics) {
311 aggregation_processor processor;
312
313 // Add rules for multiple metrics
314 std::vector<std::string> metric_names = {"cpu_usage", "memory_usage", "network_io"};
315
316 for (const auto& metric_name : metric_names) {
317 aggregation_rule rule;
318 rule.source_metric = metric_name;
319 rule.target_metric_prefix = metric_name + "_stats";
320 rule.aggregation_interval = std::chrono::milliseconds(500);
321
322 auto result = processor.add_aggregation_rule(rule);
323 EXPECT_TRUE(result.is_ok());
324 }
325
326 // Add observations to each metric
327 for (const auto& metric_name : metric_names) {
328 for (int i = 1; i <= 50; ++i) {
329 auto result = processor.process_observation(metric_name, static_cast<double>(i));
330 EXPECT_TRUE(result.is_ok());
331 }
332 }
333
334 // Check configured metrics
335 auto configured = processor.get_configured_metrics();
336 EXPECT_EQ(configured.size(), 3);
337
338 for (const auto& metric_name : metric_names) {
339 EXPECT_NE(std::find(configured.begin(), configured.end(), metric_name), configured.end());
340 }
341}
342
343TEST_F(StreamAggregationTest, AggregationProcessorForceAggregation) {
344 auto storage = std::make_shared<metric_storage>();
346
347 aggregation_rule rule;
348 rule.source_metric = "response_time";
349 rule.target_metric_prefix = "response_time_agg";
350 rule.aggregation_interval = std::chrono::hours(1); // Long interval
351 rule.percentiles = {0.5, 0.9, 0.95, 0.99};
352
353 processor.add_aggregation_rule(rule);
354
355 // Add observations
356 auto samples = generate_normal_samples(1000, 100.0, 20.0);
357 for (double sample : samples) {
358 processor.process_observation("response_time", sample);
359 }
360
361 // Force aggregation before interval
362 auto result = processor.force_aggregation("response_time");
363 EXPECT_TRUE(result.is_ok());
364
365 auto agg_result = result.value();
366 EXPECT_EQ(agg_result.source_metric, "response_time");
367 EXPECT_EQ(agg_result.samples_processed, 1000);
368 EXPECT_GE(agg_result.processing_duration.count(), 0); // May be 0 on fast systems
369
370 // Check that aggregated metrics were stored
371 storage->flush();
372 auto latest = storage->get_latest_value("response_time_agg.mean");
373 EXPECT_TRUE(latest.is_ok());
374 EXPECT_NEAR(latest.value(), 100.0, 10.0);
375}
376
377TEST_F(StreamAggregationTest, AggregationProcessorInvalidRule) {
378 aggregation_processor processor;
379
380 // Test invalid rule (empty source metric)
381 aggregation_rule invalid_rule;
382 invalid_rule.source_metric = ""; // Invalid
383 invalid_rule.target_metric_prefix = "test";
384
385 auto result = processor.add_aggregation_rule(invalid_rule);
386 EXPECT_FALSE(result.is_ok());
387
388 // Test duplicate rule
389 aggregation_rule valid_rule;
390 valid_rule.source_metric = "test_metric";
391 valid_rule.target_metric_prefix = "test_stats";
392
393 auto result1 = processor.add_aggregation_rule(valid_rule);
394 EXPECT_TRUE(result1.is_ok());
395
396 auto result2 = processor.add_aggregation_rule(valid_rule); // Duplicate
397 EXPECT_FALSE(result2.is_ok());
398}
399
400// Utility Function Tests
401TEST_F(StreamAggregationTest, PearsonCorrelation) {
402 // Perfect positive correlation
403 std::vector<double> x1 = {1, 2, 3, 4, 5};
404 std::vector<double> y1 = {2, 4, 6, 8, 10};
405
406 double corr1 = pearson_correlation(x1, y1);
407 EXPECT_NEAR(corr1, 1.0, 0.001);
408
409 // Perfect negative correlation
410 std::vector<double> x2 = {1, 2, 3, 4, 5};
411 std::vector<double> y2 = {5, 4, 3, 2, 1};
412
413 double corr2 = pearson_correlation(x2, y2);
414 EXPECT_NEAR(corr2, -1.0, 0.001);
415
416 // No correlation
417 std::vector<double> x3 = {1, 2, 3, 4, 5};
418 std::vector<double> y3 = {3, 3, 3, 3, 3}; // Constant
419
420 double corr3 = pearson_correlation(x3, y3);
421 EXPECT_NEAR(corr3, 0.0, 0.001);
422
423 // Different sizes (should return 0)
424 std::vector<double> x4 = {1, 2, 3};
425 std::vector<double> y4 = {1, 2, 3, 4, 5};
426
427 double corr4 = pearson_correlation(x4, y4);
428 EXPECT_EQ(corr4, 0.0);
429}
430
431TEST_F(StreamAggregationTest, StandardAggregationRules) {
433
434 EXPECT_GT(rules.size(), 0);
435
436 // Validate all rules
437 for (const auto& rule : rules) {
438 auto validation = rule.validate();
439 EXPECT_TRUE(validation.is_ok()) << "Rule validation failed for: " << rule.source_metric;
440 }
441
442 // Check that standard metrics are included
443 std::vector<std::string> expected_metrics = {"response_time", "request_count", "error_count"};
444
445 for (const auto& expected : expected_metrics) {
446 bool found = false;
447 for (const auto& rule : rules) {
448 if (rule.source_metric == expected) {
449 found = true;
450 break;
451 }
452 }
453 EXPECT_TRUE(found) << "Expected metric not found: " << expected;
454 }
455}
456
457// Configuration Validation Tests
458TEST_F(StreamAggregationTest, ConfigurationValidation) {
459 // Test invalid stream aggregator config
460 stream_aggregator_config invalid_config;
461 invalid_config.window_size = 0; // Invalid
462
463 auto validation = invalid_config.validate();
464 EXPECT_FALSE(validation.is_ok());
465
466 // Test valid config
467 stream_aggregator_config valid_config;
468 valid_config.window_size = 1000;
469 valid_config.window_duration = std::chrono::milliseconds(60000);
470
471 validation = valid_config.validate();
472 EXPECT_TRUE(validation.is_ok());
473
474 // Test invalid aggregation rule
475 aggregation_rule invalid_rule;
476 invalid_rule.source_metric = "test";
477 invalid_rule.target_metric_prefix = ""; // Invalid
478
479 validation = invalid_rule.validate();
480 EXPECT_FALSE(validation.is_ok());
481}
482
483// Thread Safety Tests
484TEST_F(StreamAggregationTest, StreamAggregatorThreadSafety) {
485 stream_aggregator aggregator;
486
487 const int num_threads = 4;
488 const int observations_per_thread = 1000;
489 std::vector<std::thread> threads;
490
491 // Launch multiple threads adding observations
492 for (int t = 0; t < num_threads; ++t) {
493 threads.emplace_back([&aggregator, t, observations_per_thread]() {
494 std::random_device rd;
495 std::mt19937 gen(rd());
496 std::uniform_real_distribution<> dis(0.0, 100.0);
497
498 for (int i = 0; i < observations_per_thread; ++i) {
499 double value = dis(gen);
500 aggregator.add_observation(value);
501
502 // Small delay to increase chance of contention
503 std::this_thread::sleep_for(std::chrono::microseconds(1));
504 }
505 });
506 }
507
508 // Wait for all threads to complete
509 for (auto& thread : threads) {
510 thread.join();
511 }
512
513 // Verify we processed all observations
514 EXPECT_EQ(aggregator.count(), num_threads * observations_per_thread);
515
516 auto stats = aggregator.get_statistics();
517 EXPECT_GT(stats.mean, 0.0);
518 EXPECT_GT(stats.std_deviation, 0.0);
519}
Metric aggregation processing pipeline.
Test suite for Phase 3 P2: Statistical aggregation functions.
std::vector< double > generate_normal_samples(size_t count, double mean=0.0, double stddev=1.0)
Generate normal distribution samples.
std::vector< double > generate_uniform_samples(size_t count, double min=0.0, double max=1.0)
Generate uniform distribution samples.
Processes metric streams and generates aggregated statistics.
common::VoidResult add_aggregation_rule(const aggregation_rule &rule)
Add an aggregation rule.
common::Result< stream_aggregation_result > force_aggregation(const std::string &metric_name)
Force aggregation for a metric.
common::VoidResult process_observation(const std::string &metric_name, double value)
Process an observation for a metric.
common::Result< streaming_statistics > get_current_statistics(const std::string &metric_name) const
Get current statistics for a metric.
std::vector< std::string > get_configured_metrics() const
Get list of configured metrics.
void add_value(const T &value, time_point timestamp)
Add a value with timestamp.
std::vector< T > get_values() const
Get all values in the window.
Welford's algorithm for computing streaming statistics.
double variance() const
Get running variance (sample variance)
void add_value(double value)
Add a value to the statistics.
double mean() const
Get running mean.
streaming_statistics get_statistics() const
Get full statistics.
size_t count() const
Get sample count.
P² algorithm for streaming quantile estimation.
void add_observation(double x)
Add an observation.
double get_quantile() const
Get the estimated quantile.
Full-featured streaming aggregation.
streaming_statistics get_statistics() const
Get full statistics.
common::VoidResult add_observation(double value)
Add an observation.
double variance() const
Get variance.
std::optional< double > get_percentile(double p) const
Get specific percentile.
size_t count() const
Get observation count.
Memory-efficient metric storage with ring buffer backend.
double pearson_correlation(const std::vector< double > &x, const std::vector< double > &y)
Calculate Pearson correlation coefficient.
std::vector< aggregation_rule > create_standard_aggregation_rules()
Create standard aggregation rules for common metrics.
@ storage
Storage device sensor.
Streaming statistical aggregation for real-time metrics.
Configuration for metric aggregation.
std::chrono::milliseconds aggregation_interval
Aggregation interval.
bool compute_rate
Compute rate of change.
std::string target_metric_prefix
Prefix for aggregated metrics.
std::string source_metric
Source metric name.
bool detect_outliers
Enable outlier detection.
common::VoidResult validate() const
Validate the aggregation rule.
std::vector< double > percentiles
Percentiles to compute.
Configuration for stream aggregator.
common::VoidResult validate() const
Validate configuration.
TEST_F(StreamAggregationTest, OnlineStatisticsBasic)