Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
test_stream_aggregation.cpp File Reference
#include <gtest/gtest.h>
#include <kcenon/monitoring/utils/stream_aggregator.h>
#include <kcenon/monitoring/utils/aggregation_processor.h>
#include <kcenon/monitoring/utils/metric_storage.h>
#include <chrono>
#include <thread>
#include <vector>
#include <random>
#include <cmath>
Include dependency graph for test_stream_aggregation.cpp:

Go to the source code of this file.

Classes

class  StreamAggregationTest
 Test suite for Phase 3 P2: Statistical aggregation functions. More...
 

Functions

 TEST_F (StreamAggregationTest, OnlineStatisticsBasic)
 
 TEST_F (StreamAggregationTest, OnlineStatisticsLargeDataset)
 
 TEST_F (StreamAggregationTest, QuantileEstimatorMedian)
 
 TEST_F (StreamAggregationTest, QuantileEstimatorPercentiles)
 
 TEST_F (StreamAggregationTest, MovingWindowBasic)
 
 TEST_F (StreamAggregationTest, MovingWindowExpiration)
 
 TEST_F (StreamAggregationTest, StreamAggregatorBasic)
 
 TEST_F (StreamAggregationTest, StreamAggregatorPercentiles)
 
 TEST_F (StreamAggregationTest, StreamAggregatorOutlierDetection)
 
 TEST_F (StreamAggregationTest, StreamAggregatorReset)
 
 TEST_F (StreamAggregationTest, AggregationProcessorBasic)
 
 TEST_F (StreamAggregationTest, AggregationProcessorMultipleMetrics)
 
 TEST_F (StreamAggregationTest, AggregationProcessorForceAggregation)
 
 TEST_F (StreamAggregationTest, AggregationProcessorInvalidRule)
 
 TEST_F (StreamAggregationTest, PearsonCorrelation)
 
 TEST_F (StreamAggregationTest, StandardAggregationRules)
 
 TEST_F (StreamAggregationTest, ConfigurationValidation)
 
 TEST_F (StreamAggregationTest, StreamAggregatorThreadSafety)
 

Function Documentation

◆ TEST_F() [1/18]

TEST_F ( StreamAggregationTest ,
AggregationProcessorBasic  )

Definition at line 279 of file test_stream_aggregation.cpp.

279 {
280 auto storage = std::make_shared<metric_storage>();
281 aggregation_processor processor(storage);
282
283 // Create aggregation rule
284 aggregation_rule rule;
285 rule.source_metric = "test_metric";
286 rule.target_metric_prefix = "test_metric_stats";
287 rule.aggregation_interval = std::chrono::milliseconds(1000);
288 rule.percentiles = {0.5, 0.95};
289 rule.compute_rate = true;
290 rule.detect_outliers = false;
291
292 auto add_result = processor.add_aggregation_rule(rule);
293 EXPECT_TRUE(add_result.is_ok());
294
295 // Add observations
296 for (int i = 1; i <= 100; ++i) {
297 auto result = processor.process_observation("test_metric", static_cast<double>(i));
298 EXPECT_TRUE(result.is_ok());
299 }
300
301 // Get current statistics
302 auto stats_result = processor.get_current_statistics("test_metric");
303 EXPECT_TRUE(stats_result.is_ok());
304
305 auto stats = stats_result.value();
306 EXPECT_EQ(stats.count, 100);
307 EXPECT_GT(stats.mean, 0.0);
308}
Processes metric streams and generates aggregated statistics.
@ storage
Storage device sensor.
Configuration for metric aggregation.
std::chrono::milliseconds aggregation_interval
Aggregation interval.
bool compute_rate
Compute rate of change.
std::string target_metric_prefix
Prefix for aggregated metrics.
std::string source_metric
Source metric name.
bool detect_outliers
Enable outlier detection.
std::vector< double > percentiles
Percentiles to compute.

References kcenon::monitoring::aggregation_processor::add_aggregation_rule(), kcenon::monitoring::aggregation_rule::aggregation_interval, kcenon::monitoring::aggregation_rule::compute_rate, kcenon::monitoring::aggregation_rule::detect_outliers, kcenon::monitoring::aggregation_processor::get_current_statistics(), kcenon::monitoring::aggregation_rule::percentiles, kcenon::monitoring::aggregation_processor::process_observation(), kcenon::monitoring::aggregation_rule::source_metric, kcenon::monitoring::storage, and kcenon::monitoring::aggregation_rule::target_metric_prefix.

Here is the call graph for this function:

◆ TEST_F() [2/18]

TEST_F ( StreamAggregationTest ,
AggregationProcessorForceAggregation  )

Definition at line 343 of file test_stream_aggregation.cpp.

343 {
344 auto storage = std::make_shared<metric_storage>();
345 aggregation_processor processor(storage);
346
347 aggregation_rule rule;
348 rule.source_metric = "response_time";
349 rule.target_metric_prefix = "response_time_agg";
350 rule.aggregation_interval = std::chrono::hours(1); // Long interval
351 rule.percentiles = {0.5, 0.9, 0.95, 0.99};
352
353 processor.add_aggregation_rule(rule);
354
355 // Add observations
356 auto samples = generate_normal_samples(1000, 100.0, 20.0);
357 for (double sample : samples) {
358 processor.process_observation("response_time", sample);
359 }
360
361 // Force aggregation before interval
362 auto result = processor.force_aggregation("response_time");
363 EXPECT_TRUE(result.is_ok());
364
365 auto agg_result = result.value();
366 EXPECT_EQ(agg_result.source_metric, "response_time");
367 EXPECT_EQ(agg_result.samples_processed, 1000);
368 EXPECT_GE(agg_result.processing_duration.count(), 0); // May be 0 on fast systems
369
370 // Check that aggregated metrics were stored
371 storage->flush();
372 auto latest = storage->get_latest_value("response_time_agg.mean");
373 EXPECT_TRUE(latest.is_ok());
374 EXPECT_NEAR(latest.value(), 100.0, 10.0);
375}

References kcenon::monitoring::aggregation_processor::add_aggregation_rule(), kcenon::monitoring::aggregation_rule::aggregation_interval, kcenon::monitoring::aggregation_processor::force_aggregation(), kcenon::monitoring::aggregation_rule::percentiles, kcenon::monitoring::aggregation_processor::process_observation(), kcenon::monitoring::aggregation_rule::source_metric, kcenon::monitoring::storage, and kcenon::monitoring::aggregation_rule::target_metric_prefix.

Here is the call graph for this function:

◆ TEST_F() [3/18]

TEST_F ( StreamAggregationTest ,
AggregationProcessorInvalidRule  )

Definition at line 377 of file test_stream_aggregation.cpp.

377 {
378 aggregation_processor processor;
379
380 // Test invalid rule (empty source metric)
381 aggregation_rule invalid_rule;
382 invalid_rule.source_metric = ""; // Invalid
383 invalid_rule.target_metric_prefix = "test";
384
385 auto result = processor.add_aggregation_rule(invalid_rule);
386 EXPECT_FALSE(result.is_ok());
387
388 // Test duplicate rule
389 aggregation_rule valid_rule;
390 valid_rule.source_metric = "test_metric";
391 valid_rule.target_metric_prefix = "test_stats";
392
393 auto result1 = processor.add_aggregation_rule(valid_rule);
394 EXPECT_TRUE(result1.is_ok());
395
396 auto result2 = processor.add_aggregation_rule(valid_rule); // Duplicate
397 EXPECT_FALSE(result2.is_ok());
398}
common::VoidResult add_aggregation_rule(const aggregation_rule &rule)
Add an aggregation rule.

References kcenon::monitoring::aggregation_processor::add_aggregation_rule(), kcenon::monitoring::aggregation_rule::source_metric, and kcenon::monitoring::aggregation_rule::target_metric_prefix.

Here is the call graph for this function:

◆ TEST_F() [4/18]

TEST_F ( StreamAggregationTest ,
AggregationProcessorMultipleMetrics  )

Definition at line 310 of file test_stream_aggregation.cpp.

310 {
311 aggregation_processor processor;
312
313 // Add rules for multiple metrics
314 std::vector<std::string> metric_names = {"cpu_usage", "memory_usage", "network_io"};
315
316 for (const auto& metric_name : metric_names) {
317 aggregation_rule rule;
318 rule.source_metric = metric_name;
319 rule.target_metric_prefix = metric_name + "_stats";
320 rule.aggregation_interval = std::chrono::milliseconds(500);
321
322 auto result = processor.add_aggregation_rule(rule);
323 EXPECT_TRUE(result.is_ok());
324 }
325
326 // Add observations to each metric
327 for (const auto& metric_name : metric_names) {
328 for (int i = 1; i <= 50; ++i) {
329 auto result = processor.process_observation(metric_name, static_cast<double>(i));
330 EXPECT_TRUE(result.is_ok());
331 }
332 }
333
334 // Check configured metrics
335 auto configured = processor.get_configured_metrics();
336 EXPECT_EQ(configured.size(), 3);
337
338 for (const auto& metric_name : metric_names) {
339 EXPECT_NE(std::find(configured.begin(), configured.end(), metric_name), configured.end());
340 }
341}
common::VoidResult process_observation(const std::string &metric_name, double value)
Process an observation for a metric.
std::vector< std::string > get_configured_metrics() const
Get list of configured metrics.

References kcenon::monitoring::aggregation_processor::add_aggregation_rule(), kcenon::monitoring::aggregation_rule::aggregation_interval, kcenon::monitoring::aggregation_processor::get_configured_metrics(), kcenon::monitoring::aggregation_processor::process_observation(), kcenon::monitoring::aggregation_rule::source_metric, and kcenon::monitoring::aggregation_rule::target_metric_prefix.

Here is the call graph for this function:

◆ TEST_F() [5/18]

TEST_F ( StreamAggregationTest ,
ConfigurationValidation  )

Definition at line 458 of file test_stream_aggregation.cpp.

458 {
459 // Test invalid stream aggregator config
460 stream_aggregator_config invalid_config;
461 invalid_config.window_size = 0; // Invalid
462
463 auto validation = invalid_config.validate();
464 EXPECT_FALSE(validation.is_ok());
465
466 // Test valid config
467 stream_aggregator_config valid_config;
468 valid_config.window_size = 1000;
469 valid_config.window_duration = std::chrono::milliseconds(60000);
470
471 validation = valid_config.validate();
472 EXPECT_TRUE(validation.is_ok());
473
474 // Test invalid aggregation rule
475 aggregation_rule invalid_rule;
476 invalid_rule.source_metric = "test";
477 invalid_rule.target_metric_prefix = ""; // Invalid
478
479 validation = invalid_rule.validate();
480 EXPECT_FALSE(validation.is_ok());
481}
common::VoidResult validate() const
Validate the aggregation rule.
Configuration for stream aggregator.
common::VoidResult validate() const
Validate configuration.

References kcenon::monitoring::aggregation_rule::source_metric, kcenon::monitoring::aggregation_rule::target_metric_prefix, kcenon::monitoring::aggregation_rule::validate(), kcenon::monitoring::stream_aggregator_config::validate(), kcenon::monitoring::stream_aggregator_config::window_duration, and kcenon::monitoring::stream_aggregator_config::window_size.

Here is the call graph for this function:

◆ TEST_F() [6/18]

TEST_F ( StreamAggregationTest ,
MovingWindowBasic  )

Definition at line 143 of file test_stream_aggregation.cpp.

143 {
144 moving_window_aggregator<double> window(std::chrono::milliseconds(1000), 100);
145
146 auto now = std::chrono::system_clock::now();
147
148 // Add values
149 for (int i = 0; i < 10; ++i) {
150 window.add_value(static_cast<double>(i), now + std::chrono::milliseconds(i * 10));
151 }
152
153 EXPECT_EQ(window.size(), 10);
154
155 auto values = window.get_values();
156 EXPECT_EQ(values.size(), 10);
157
158 // Check values are correct
159 for (size_t i = 0; i < values.size(); ++i) {
160 EXPECT_EQ(values[i], static_cast<double>(i));
161 }
162}

References kcenon::monitoring::moving_window_aggregator< T >::add_value(), kcenon::monitoring::moving_window_aggregator< T >::get_values(), and kcenon::monitoring::moving_window_aggregator< T >::size().

Here is the call graph for this function:

◆ TEST_F() [7/18]

TEST_F ( StreamAggregationTest ,
MovingWindowExpiration  )

Definition at line 164 of file test_stream_aggregation.cpp.

164 {
165 moving_window_aggregator<double> window(std::chrono::milliseconds(100), 1000);
166
167 auto now = std::chrono::system_clock::now();
168
169 // Add old values (should expire)
170 for (int i = 0; i < 5; ++i) {
171 window.add_value(static_cast<double>(i), now - std::chrono::milliseconds(200));
172 }
173
174 // Add new values (should remain)
175 for (int i = 10; i < 15; ++i) {
176 window.add_value(static_cast<double>(i), now);
177 }
178
179 auto values = window.get_values();
180
181 // Should only have the new values
182 EXPECT_EQ(values.size(), 5);
183 for (size_t i = 0; i < values.size(); ++i) {
184 EXPECT_EQ(values[i], static_cast<double>(i + 10));
185 }
186}

References kcenon::monitoring::moving_window_aggregator< T >::add_value(), and kcenon::monitoring::moving_window_aggregator< T >::get_values().

Here is the call graph for this function:

◆ TEST_F() [8/18]

TEST_F ( StreamAggregationTest ,
OnlineStatisticsBasic  )

Definition at line 68 of file test_stream_aggregation.cpp.

68 {
69 online_statistics stats;
70
71 EXPECT_EQ(stats.count(), 0);
72 EXPECT_EQ(stats.mean(), 0.0);
73 EXPECT_EQ(stats.variance(), 0.0);
74
75 // Add some values
76 std::vector<double> values = {1.0, 2.0, 3.0, 4.0, 5.0};
77 for (double value : values) {
78 stats.add_value(value);
79 }
80
81 EXPECT_EQ(stats.count(), 5);
82 EXPECT_DOUBLE_EQ(stats.mean(), 3.0); // (1+2+3+4+5)/5 = 3
83
84 auto full_stats = stats.get_statistics();
85 EXPECT_EQ(full_stats.count, 5);
86 EXPECT_DOUBLE_EQ(full_stats.mean, 3.0);
87 EXPECT_DOUBLE_EQ(full_stats.min_value, 1.0);
88 EXPECT_DOUBLE_EQ(full_stats.max_value, 5.0);
89 EXPECT_GT(full_stats.variance, 0.0);
90 EXPECT_GT(full_stats.std_deviation, 0.0);
91}
Welford's algorithm for computing streaming statistics.
double variance() const
Get running variance (sample variance)
void add_value(double value)
Add a value to the statistics.
double mean() const
Get running mean.
streaming_statistics get_statistics() const
Get full statistics.
size_t count() const
Get sample count.

References kcenon::monitoring::online_statistics::add_value(), kcenon::monitoring::online_statistics::count(), kcenon::monitoring::online_statistics::get_statistics(), kcenon::monitoring::online_statistics::mean(), and kcenon::monitoring::online_statistics::variance().

Here is the call graph for this function:

◆ TEST_F() [9/18]

TEST_F ( StreamAggregationTest ,
OnlineStatisticsLargeDataset  )

Definition at line 93 of file test_stream_aggregation.cpp.

93 {
94 online_statistics stats;
95
96 // Generate a large dataset with known properties
97 auto samples = generate_normal_samples(10000, 100.0, 15.0);
98
99 for (double sample : samples) {
100 stats.add_value(sample);
101 }
102
103 auto full_stats = stats.get_statistics();
104
105 // Check that the statistics are close to the expected values
106 EXPECT_NEAR(full_stats.mean, 100.0, 1.0); // Should be close to 100
107 EXPECT_NEAR(full_stats.std_deviation, 15.0, 1.0); // Should be close to 15
108 EXPECT_EQ(full_stats.count, 10000);
109}

References kcenon::monitoring::online_statistics::add_value(), and kcenon::monitoring::online_statistics::get_statistics().

Here is the call graph for this function:

◆ TEST_F() [10/18]

TEST_F ( StreamAggregationTest ,
PearsonCorrelation  )

Definition at line 401 of file test_stream_aggregation.cpp.

401 {
402 // Perfect positive correlation
403 std::vector<double> x1 = {1, 2, 3, 4, 5};
404 std::vector<double> y1 = {2, 4, 6, 8, 10};
405
406 double corr1 = pearson_correlation(x1, y1);
407 EXPECT_NEAR(corr1, 1.0, 0.001);
408
409 // Perfect negative correlation
410 std::vector<double> x2 = {1, 2, 3, 4, 5};
411 std::vector<double> y2 = {5, 4, 3, 2, 1};
412
413 double corr2 = pearson_correlation(x2, y2);
414 EXPECT_NEAR(corr2, -1.0, 0.001);
415
416 // No correlation
417 std::vector<double> x3 = {1, 2, 3, 4, 5};
418 std::vector<double> y3 = {3, 3, 3, 3, 3}; // Constant
419
420 double corr3 = pearson_correlation(x3, y3);
421 EXPECT_NEAR(corr3, 0.0, 0.001);
422
423 // Different sizes (should return 0)
424 std::vector<double> x4 = {1, 2, 3};
425 std::vector<double> y4 = {1, 2, 3, 4, 5};
426
427 double corr4 = pearson_correlation(x4, y4);
428 EXPECT_EQ(corr4, 0.0);
429}
double pearson_correlation(const std::vector< double > &x, const std::vector< double > &y)
Calculate Pearson correlation coefficient.

References kcenon::monitoring::pearson_correlation().

Here is the call graph for this function:

◆ TEST_F() [11/18]

TEST_F ( StreamAggregationTest ,
QuantileEstimatorMedian  )

Definition at line 112 of file test_stream_aggregation.cpp.

112 {
113 quantile_estimator median_estimator(0.5);
114
115 // Add values 1 through 100
116 for (int i = 1; i <= 100; ++i) {
117 median_estimator.add_observation(static_cast<double>(i));
118 }
119
120 double estimated_median = median_estimator.get_quantile();
121
122 // For 1-100, median should be around 50.5
123 EXPECT_NEAR(estimated_median, 50.5, 5.0); // Allow some tolerance for P² algorithm
124}
P² algorithm for streaming quantile estimation.

References kcenon::monitoring::quantile_estimator::add_observation(), and kcenon::monitoring::quantile_estimator::get_quantile().

Here is the call graph for this function:

◆ TEST_F() [12/18]

TEST_F ( StreamAggregationTest ,
QuantileEstimatorPercentiles  )

Definition at line 126 of file test_stream_aggregation.cpp.

126 {
127 quantile_estimator p95_estimator(0.95);
128
129 // Add uniform samples 0-100
130 auto samples = generate_uniform_samples(1000, 0.0, 100.0);
131
132 for (double sample : samples) {
133 p95_estimator.add_observation(sample);
134 }
135
136 double p95 = p95_estimator.get_quantile();
137
138 // 95th percentile should be around 95
139 EXPECT_NEAR(p95, 95.0, 10.0); // P² algorithm has some approximation error
140}

References kcenon::monitoring::quantile_estimator::add_observation(), and kcenon::monitoring::quantile_estimator::get_quantile().

Here is the call graph for this function:

◆ TEST_F() [13/18]

TEST_F ( StreamAggregationTest ,
StandardAggregationRules  )

Definition at line 431 of file test_stream_aggregation.cpp.

431 {
432 auto rules = create_standard_aggregation_rules();
433
434 EXPECT_GT(rules.size(), 0);
435
436 // Validate all rules
437 for (const auto& rule : rules) {
438 auto validation = rule.validate();
439 EXPECT_TRUE(validation.is_ok()) << "Rule validation failed for: " << rule.source_metric;
440 }
441
442 // Check that standard metrics are included
443 std::vector<std::string> expected_metrics = {"response_time", "request_count", "error_count"};
444
445 for (const auto& expected : expected_metrics) {
446 bool found = false;
447 for (const auto& rule : rules) {
448 if (rule.source_metric == expected) {
449 found = true;
450 break;
451 }
452 }
453 EXPECT_TRUE(found) << "Expected metric not found: " << expected;
454 }
455}
std::vector< aggregation_rule > create_standard_aggregation_rules()
Create standard aggregation rules for common metrics.

References kcenon::monitoring::create_standard_aggregation_rules().

Here is the call graph for this function:

◆ TEST_F() [14/18]

TEST_F ( StreamAggregationTest ,
StreamAggregatorBasic  )

Definition at line 189 of file test_stream_aggregation.cpp.

189 {
190 stream_aggregator_config config;
191 config.window_size = 1000;
192 config.enable_outlier_detection = false; // Disable for predictable testing
193
194 stream_aggregator aggregator(config);
195
196 // Add observations
197 std::vector<double> values = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};
198 for (double value : values) {
199 auto result = aggregator.add_observation(value);
200 EXPECT_TRUE(result.is_ok());
201 }
202
203 auto stats = aggregator.get_statistics();
204
205 EXPECT_EQ(stats.count, 10);
206 EXPECT_DOUBLE_EQ(stats.mean, 5.5);
207 EXPECT_DOUBLE_EQ(stats.min_value, 1.0);
208 EXPECT_DOUBLE_EQ(stats.max_value, 10.0);
209 EXPECT_GT(stats.std_deviation, 0.0);
210
211 // Check percentiles
212 EXPECT_GT(stats.percentiles.size(), 0);
213}
Full-featured streaming aggregation.

References kcenon::monitoring::stream_aggregator::add_observation(), kcenon::monitoring::stream_aggregator_config::enable_outlier_detection, kcenon::monitoring::stream_aggregator::get_statistics(), and kcenon::monitoring::stream_aggregator_config::window_size.

Here is the call graph for this function:

◆ TEST_F() [15/18]

TEST_F ( StreamAggregationTest ,
StreamAggregatorOutlierDetection  )

Definition at line 238 of file test_stream_aggregation.cpp.

238 {
239 stream_aggregator_config config;
240 config.enable_outlier_detection = true;
241 config.outlier_threshold = 2.0; // 2 standard deviations
242
243 stream_aggregator aggregator(config);
244
245 // Add normal values around 50
246 for (int i = 45; i <= 55; ++i) {
247 aggregator.add_observation(static_cast<double>(i));
248 }
249
250 // Add outliers
251 aggregator.add_observation(100.0); // Should be detected as outlier
252 aggregator.add_observation(0.0); // Should be detected as outlier
253
254 auto stats = aggregator.get_statistics();
255
256 EXPECT_GT(stats.outlier_count, 0);
257 EXPECT_GT(stats.outliers.size(), 0);
258}

References kcenon::monitoring::stream_aggregator::add_observation(), kcenon::monitoring::stream_aggregator_config::enable_outlier_detection, kcenon::monitoring::stream_aggregator::get_statistics(), and kcenon::monitoring::stream_aggregator_config::outlier_threshold.

Here is the call graph for this function:

◆ TEST_F() [16/18]

TEST_F ( StreamAggregationTest ,
StreamAggregatorPercentiles  )

Definition at line 215 of file test_stream_aggregation.cpp.

215 {
216 stream_aggregator aggregator;
217
218 // Add 100 values from 1 to 100
219 for (int i = 1; i <= 100; ++i) {
220 aggregator.add_observation(static_cast<double>(i));
221 }
222
223 // Get specific percentiles
224 auto p50_result = aggregator.get_percentile(0.5);
225 auto p95_result = aggregator.get_percentile(0.95);
226 auto p99_result = aggregator.get_percentile(0.99);
227
228 EXPECT_TRUE(p50_result);
229 EXPECT_TRUE(p95_result);
230 EXPECT_TRUE(p99_result);
231
232 // Check approximate values
233 EXPECT_NEAR(p50_result.value(), 50.0, 10.0);
234 EXPECT_NEAR(p95_result.value(), 95.0, 10.0);
235 EXPECT_NEAR(p99_result.value(), 99.0, 10.0);
236}
common::VoidResult add_observation(double value)
Add an observation.
std::optional< double > get_percentile(double p) const
Get specific percentile.

References kcenon::monitoring::stream_aggregator::add_observation(), and kcenon::monitoring::stream_aggregator::get_percentile().

Here is the call graph for this function:

◆ TEST_F() [17/18]

TEST_F ( StreamAggregationTest ,
StreamAggregatorReset  )

Definition at line 260 of file test_stream_aggregation.cpp.

260 {
261 stream_aggregator aggregator;
262
263 // Add some observations
264 for (int i = 1; i <= 10; ++i) {
265 aggregator.add_observation(static_cast<double>(i));
266 }
267
268 EXPECT_EQ(aggregator.count(), 10);
269
270 // Reset
271 aggregator.reset();
272
273 EXPECT_EQ(aggregator.count(), 0);
274 EXPECT_EQ(aggregator.mean(), 0.0);
275 EXPECT_EQ(aggregator.variance(), 0.0);
276}
double variance() const
Get variance.
size_t count() const
Get observation count.

References kcenon::monitoring::stream_aggregator::add_observation(), kcenon::monitoring::stream_aggregator::count(), kcenon::monitoring::stream_aggregator::mean(), kcenon::monitoring::stream_aggregator::reset(), and kcenon::monitoring::stream_aggregator::variance().

Here is the call graph for this function:

◆ TEST_F() [18/18]

TEST_F ( StreamAggregationTest ,
StreamAggregatorThreadSafety  )

Definition at line 484 of file test_stream_aggregation.cpp.

484 {
485 stream_aggregator aggregator;
486
487 const int num_threads = 4;
488 const int observations_per_thread = 1000;
489 std::vector<std::thread> threads;
490
491 // Launch multiple threads adding observations
492 for (int t = 0; t < num_threads; ++t) {
493 threads.emplace_back([&aggregator, t, observations_per_thread]() {
494 std::random_device rd;
495 std::mt19937 gen(rd());
496 std::uniform_real_distribution<> dis(0.0, 100.0);
497
498 for (int i = 0; i < observations_per_thread; ++i) {
499 double value = dis(gen);
500 aggregator.add_observation(value);
501
502 // Small delay to increase chance of contention
503 std::this_thread::sleep_for(std::chrono::microseconds(1));
504 }
505 });
506 }
507
508 // Wait for all threads to complete
509 for (auto& thread : threads) {
510 thread.join();
511 }
512
513 // Verify we processed all observations
514 EXPECT_EQ(aggregator.count(), num_threads * observations_per_thread);
515
516 auto stats = aggregator.get_statistics();
517 EXPECT_GT(stats.mean, 0.0);
518 EXPECT_GT(stats.std_deviation, 0.0);
519}
streaming_statistics get_statistics() const
Get full statistics.

References kcenon::monitoring::stream_aggregator::add_observation(), kcenon::monitoring::stream_aggregator::count(), and kcenon::monitoring::stream_aggregator::get_statistics().

Here is the call graph for this function: