Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
stream_aggregator.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
19#include "../core/error_codes.h"
20
21#include <algorithm>
22#include <atomic>
23#include <chrono>
24#include <cmath>
25#include <deque>
26#include <map>
27#include <mutex>
28#include <optional>
29#include <shared_mutex>
30#include <vector>
31
32namespace kcenon::monitoring {
33
39 size_t count = 0;
40 double mean = 0.0;
41 double variance = 0.0;
42 double std_deviation = 0.0;
43 double min_value = 0.0;
44 double max_value = 0.0;
45 double sum = 0.0;
46 size_t outlier_count = 0;
47 std::vector<double> outliers;
48 std::map<double, double> percentiles;
49};
50
59public:
60 online_statistics() = default;
61
66 void add_value(double value) {
67 std::unique_lock<std::shared_mutex> lock(mutex_);
68 count_++;
69 sum_ += value;
70
71 double delta = value - mean_;
72 mean_ += delta / static_cast<double>(count_);
73 double delta2 = value - mean_;
74 m2_ += delta * delta2;
75
76 if (count_ == 1) {
77 min_value_ = value;
78 max_value_ = value;
79 } else {
80 min_value_ = std::min(min_value_, value);
81 max_value_ = std::max(max_value_, value);
82 }
83 }
84
88 size_t count() const {
89 std::shared_lock<std::shared_mutex> lock(mutex_);
90 return count_;
91 }
92
96 double mean() const {
97 std::shared_lock<std::shared_mutex> lock(mutex_);
98 return mean_;
99 }
100
104 double variance() const {
105 std::shared_lock<std::shared_mutex> lock(mutex_);
106 if (count_ < 2) {
107 return 0.0;
108 }
109 return m2_ / static_cast<double>(count_ - 1);
110 }
111
115 double stddev() const {
116 return std::sqrt(variance());
117 }
118
122 double min() const {
123 std::shared_lock<std::shared_mutex> lock(mutex_);
124 return min_value_;
125 }
126
130 double max() const {
131 std::shared_lock<std::shared_mutex> lock(mutex_);
132 return max_value_;
133 }
134
138 double sum() const {
139 std::shared_lock<std::shared_mutex> lock(mutex_);
140 return sum_;
141 }
142
147 std::shared_lock<std::shared_mutex> lock(mutex_);
149 stats.count = count_;
150 stats.mean = mean_;
151 stats.sum = sum_;
152 stats.min_value = min_value_;
153 stats.max_value = max_value_;
154
155 if (count_ >= 2) {
156 stats.variance = m2_ / static_cast<double>(count_ - 1);
157 stats.std_deviation = std::sqrt(stats.variance);
158 }
159
160 return stats;
161 }
162
166 void reset() {
167 std::unique_lock<std::shared_mutex> lock(mutex_);
168 count_ = 0;
169 mean_ = 0.0;
170 m2_ = 0.0;
171 sum_ = 0.0;
172 min_value_ = 0.0;
173 max_value_ = 0.0;
174 }
175
176private:
177 mutable std::shared_mutex mutex_;
178 size_t count_ = 0;
179 double mean_ = 0.0;
180 double m2_ = 0.0;
181 double sum_ = 0.0;
182 double min_value_ = 0.0;
183 double max_value_ = 0.0;
184};
185
194public:
199 explicit quantile_estimator(double p) : p_(p) {
200 init_markers();
201 }
202
203 // Enable move construction and assignment
205 : p_(other.p_)
206 , count_(other.count_) {
207 std::copy(std::begin(other.q_), std::end(other.q_), std::begin(q_));
208 std::copy(std::begin(other.n_), std::end(other.n_), std::begin(n_));
209 std::copy(std::begin(other.n_prime_), std::end(other.n_prime_), std::begin(n_prime_));
210 std::copy(std::begin(other.dn_), std::end(other.dn_), std::begin(dn_));
211 }
212
214 if (this != &other) {
215 p_ = other.p_;
216 count_ = other.count_;
217 std::copy(std::begin(other.q_), std::end(other.q_), std::begin(q_));
218 std::copy(std::begin(other.n_), std::end(other.n_), std::begin(n_));
219 std::copy(std::begin(other.n_prime_), std::end(other.n_prime_), std::begin(n_prime_));
220 std::copy(std::begin(other.dn_), std::end(other.dn_), std::begin(dn_));
221 }
222 return *this;
223 }
224
225 // Disable copy
228
229private:
231 // Initialize marker positions
232 n_[0] = 1;
233 n_[1] = 2;
234 n_[2] = 3;
235 n_[3] = 4;
236 n_[4] = 5;
237
238 // Initialize desired marker positions
239 n_prime_[0] = 1;
240 n_prime_[1] = 1 + 2 * p_;
241 n_prime_[2] = 1 + 4 * p_;
242 n_prime_[3] = 3 + 2 * p_;
243 n_prime_[4] = 5;
244
245 // Increments
246 dn_[0] = 0;
247 dn_[1] = p_ / 2;
248 dn_[2] = p_;
249 dn_[3] = (1 + p_) / 2;
250 dn_[4] = 1;
251 }
252
253public:
254
/**
 * @brief Add one observation to the P² estimator.
 *
 * The first five observations are stored directly in q_ and sorted when the
 * fifth arrives. Each later observation updates the five marker heights
 * (q_), their actual positions (n_) and their desired positions (n_prime_)
 * in constant time. The sample itself is not retained.
 *
 * @param x Observation to add.
 */
259 void add_observation(double x) {
260 std::unique_lock<std::shared_mutex> lock(mutex_);
261 count_++;
262
// Warm-up phase: buffer raw samples until all five markers can be seeded.
263 if (count_ <= 5) {
264 q_[count_ - 1] = x;
265 if (count_ == 5) {
266 std::sort(q_, q_ + 5);
267 }
268 return;
269 }
270
271 // Find cell k
// Cell k is the interval between markers k and k + 1 that contains x. An x
// below q_[0] or at/above q_[4] first stretches that extreme marker to x.
272 int k;
273 if (x < q_[0]) {
274 q_[0] = x;
275 k = 0;
276 } else if (x >= q_[4]) {
277 q_[4] = x;
278 k = 3;
279 } else {
280 for (k = 0; k < 4; ++k) {
281 if (x < q_[k + 1]) {
282 break;
283 }
284 }
285 }
286
287 // Increment positions
// Every marker to the right of cell k advances by one position.
288 for (int i = k + 1; i < 5; ++i) {
289 n_[i]++;
290 }
291
292 // Update desired positions
// Desired positions advance by the fixed per-observation increments in dn_.
293 for (int i = 0; i < 5; ++i) {
294 n_prime_[i] += dn_[i];
295 }
296
297 // Adjust marker heights
// Only the interior markers (1..3) move. A marker shifts one position
// toward its desired position when it has drifted by at least 1 and the
// neighbour on that side is more than one position away. The new height
// is predicted with the piecewise-parabolic formula. If that prediction
// would not lie strictly between the neighbouring heights, the linear
// formula is used instead.
298 for (int i = 1; i < 4; ++i) {
299 double d = n_prime_[i] - n_[i];
300 if ((d >= 1 && n_[i + 1] - n_[i] > 1) ||
301 (d <= -1 && n_[i - 1] - n_[i] < -1)) {
302 int sign = (d >= 0) ? 1 : -1;
303 double q_new = parabolic(i, sign);
304
305 if (q_[i - 1] < q_new && q_new < q_[i + 1]) {
306 q_[i] = q_new;
307 } else {
308 q_[i] = linear(i, sign);
309 }
310 n_[i] += sign;
311 }
312 }
313 }
314
319 double get_quantile() const {
320 std::shared_lock<std::shared_mutex> lock(mutex_);
321 if (count_ < 5) {
322 if (count_ == 0) {
323 return 0.0;
324 }
325 // For small samples, use simple interpolation
326 std::vector<double> sorted(q_, q_ + count_);
327 std::sort(sorted.begin(), sorted.end());
328 size_t idx = static_cast<size_t>(p_ * (count_ - 1));
329 return sorted[idx];
330 }
331 return q_[2]; // Middle marker for p-quantile
332 }
333
337 size_t count() const {
338 std::shared_lock<std::shared_mutex> lock(mutex_);
339 return count_;
340 }
341
345 void reset() {
346 std::unique_lock<std::shared_mutex> lock(mutex_);
347 count_ = 0;
348 n_[0] = 1; n_[1] = 2; n_[2] = 3; n_[3] = 4; n_[4] = 5;
349 n_prime_[0] = 1;
350 n_prime_[1] = 1 + 2 * p_;
351 n_prime_[2] = 1 + 4 * p_;
352 n_prime_[3] = 3 + 2 * p_;
353 n_prime_[4] = 5;
354 }
355
356private:
/**
 * @brief Piecewise-parabolic (P²) prediction of marker i's height after it
 *        moves `sign` (+1 or -1) positions.
 *
 * Computes q_i + d/(n_{i+1} - n_{i-1}) *
 *   [ (n_i - n_{i-1} + d)(q_{i+1} - q_i)/(n_{i+1} - n_i)
 *   + (n_{i+1} - n_i - d)(q_i - q_{i-1})/(n_i - n_{i-1}) ]   with d = sign.
 * Each int position difference is combined with a double operand first, so
 * no integer division happens.
 * NOTE(review): assumes n_ is strictly increasing so the divisors are non-zero.
 */
357 double parabolic(int i, int sign) const {
358 double qi = q_[i];
359 double qim1 = q_[i - 1];
360 double qip1 = q_[i + 1];
361 int ni = n_[i];
362 int nim1 = n_[i - 1];
363 int nip1 = n_[i + 1];
364
365 double term1 = static_cast<double>(sign) / (nip1 - nim1);
366 double term2 = (ni - nim1 + sign) * (qip1 - qi) / (nip1 - ni);
367 double term3 = (nip1 - ni - sign) * (qi - qim1) / (ni - nim1);
368
369 return qi + term1 * (term2 + term3);
370 }
371
/**
 * @brief Linear fallback: move marker i's height toward the neighbour in
 *        direction `sign`, proportionally to one position step.
 *
 * Computes q_i + d * (q_{i+d} - q_i) / (n_{i+d} - n_i) with d = sign.
 */
372 double linear(int i, int sign) const {
373 int idx = (sign < 0) ? i - 1 : i + 1;
374 return q_[i] + static_cast<double>(sign) * (q_[idx] - q_[i]) /
375 (n_[idx] - n_[i]);
376 }
377
378 mutable std::shared_mutex mutex_;
379 double p_;
380 size_t count_ = 0;
381 double q_[5] = {0};
382 int n_[5] = {0};
383 double n_prime_[5] = {0};
384 double dn_[5] = {0};
385};
386
395template<typename T>
397public:
398 using time_point = std::chrono::system_clock::time_point;
399 using duration = std::chrono::system_clock::duration;
400
406 moving_window_aggregator(std::chrono::milliseconds window_duration,
407 size_t max_size)
408 : window_duration_(window_duration)
409 , max_size_(max_size) {}
410
416 void add_value(const T& value, time_point timestamp) {
417 std::unique_lock<std::shared_mutex> lock(mutex_);
418
419 // Remove expired entries
420 expire_old_entries(timestamp);
421
422 // Add new entry
423 if (entries_.size() >= max_size_) {
424 entries_.pop_front();
425 }
426 entries_.push_back({value, timestamp});
427 }
428
432 size_t size() const {
433 std::shared_lock<std::shared_mutex> lock(mutex_);
434 return entries_.size();
435 }
436
440 bool empty() const {
441 std::shared_lock<std::shared_mutex> lock(mutex_);
442 return entries_.empty();
443 }
444
448 std::vector<T> get_values() const {
449 std::shared_lock<std::shared_mutex> lock(mutex_);
450 std::vector<T> result;
451 result.reserve(entries_.size());
452 for (const auto& entry : entries_) {
453 result.push_back(entry.value);
454 }
455 return result;
456 }
457
461 void clear() {
462 std::unique_lock<std::shared_mutex> lock(mutex_);
463 entries_.clear();
464 }
465
466private:
471
473 auto cutoff = current - window_duration_;
474 while (!entries_.empty() && entries_.front().timestamp < cutoff) {
475 entries_.pop_front();
476 }
477 }
478
479 mutable std::shared_mutex mutex_;
480 std::chrono::milliseconds window_duration_;
481 size_t max_size_;
482 std::deque<entry> entries_;
483};
484
490 size_t window_size = 10000;
491 std::chrono::milliseconds window_duration{60000};
493 double outlier_threshold = 3.0; // Standard deviations
494 std::vector<double> percentiles_to_track = {0.5, 0.9, 0.95, 0.99};
495
499 common::VoidResult validate() const {
500 if (window_size == 0) {
501 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
502 "Window size must be positive").to_common_error());
503 }
504 if (window_duration.count() <= 0) {
505 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
506 "Window duration must be positive").to_common_error());
507 }
508 if (outlier_threshold <= 0) {
509 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
510 "Outlier threshold must be positive").to_common_error());
511 }
512 return common::ok();
513 }
514};
515
524public:
529
535 : config_(config) {
536 // Initialize percentile estimators
537 for (double p : config_.percentiles_to_track) {
538 percentile_estimators_.emplace(p, quantile_estimator(p));
539 }
540 }
541
547 common::VoidResult add_observation(double value) {
548 std::unique_lock<std::shared_mutex> lock(mutex_);
549
550 // Check for outlier
552 double z_score = std::abs(value - stats_.mean()) /
553 (stats_.stddev() + 1e-10);
554 if (z_score > config_.outlier_threshold) {
556 outliers_.push_back(value);
557 if (outliers_.size() > 100) {
558 outliers_.erase(outliers_.begin());
559 }
560 }
561 }
562
563 stats_.add_value(value);
564
565 // Update percentile estimators
566 for (auto& [p, estimator] : percentile_estimators_) {
567 estimator.add_observation(value);
568 }
569
570 return common::ok();
571 }
572
577 std::shared_lock<std::shared_mutex> lock(mutex_);
578 auto stats = stats_.get_statistics();
580 stats.outliers = outliers_;
581
582 // Add percentiles
583 for (const auto& [p, estimator] : percentile_estimators_) {
584 stats.percentiles[p] = estimator.get_quantile();
585 }
586
587 return stats;
588 }
589
595 std::optional<double> get_percentile(double p) const {
596 std::shared_lock<std::shared_mutex> lock(mutex_);
597 auto it = percentile_estimators_.find(p);
598 if (it != percentile_estimators_.end()) {
599 return it->second.get_quantile();
600 }
601 return std::nullopt;
602 }
603
607 size_t count() const {
608 return stats_.count();
609 }
610
614 double mean() const {
615 return stats_.mean();
616 }
617
621 double variance() const {
622 return stats_.variance();
623 }
624
628 double stddev() const {
629 return stats_.stddev();
630 }
631
635 void reset() {
636 std::unique_lock<std::shared_mutex> lock(mutex_);
637 stats_.reset();
638 outlier_count_ = 0;
639 outliers_.clear();
640 for (auto& [p, estimator] : percentile_estimators_) {
641 estimator.reset();
642 }
643 }
644
645private:
646 mutable std::shared_mutex mutex_;
649 std::map<double, quantile_estimator> percentile_estimators_;
650 size_t outlier_count_ = 0;
651 std::vector<double> outliers_;
652};
653
/**
 * @brief Pearson correlation coefficient of two equally sized samples.
 *
 * Uses a two-pass, mean-centred computation. The single-pass form
 * n*Σxy - Σx*Σy loses all precision when the values share a large common
 * offset (e.g. timestamps or counters). Rounding can then make the radicand
 * negative, which yields NaN, or zero, which wrongly yields 0.
 *
 * @param x First sample.
 * @param y Second sample.
 * @return Correlation in [-1, 1]. Returns 0.0 when the sizes differ, fewer
 *         than two points are given, or either sample has zero spread.
 *         NaN inputs propagate to a NaN result.
 */
inline double pearson_correlation(const std::vector<double>& x,
                                  const std::vector<double>& y) {
    if (x.size() != y.size() || x.size() < 2) {
        return 0.0;
    }

    const size_t n = x.size();
    double mean_x = 0.0;
    double mean_y = 0.0;
    for (size_t i = 0; i < n; ++i) {
        mean_x += x[i];
        mean_y += y[i];
    }
    mean_x /= static_cast<double>(n);
    mean_y /= static_cast<double>(n);

    double sxy = 0.0;
    double sxx = 0.0;
    double syy = 0.0;
    for (size_t i = 0; i < n; ++i) {
        const double dx = x[i] - mean_x;
        const double dy = y[i] - mean_y;
        sxy += dx * dy;
        sxx += dx * dx;
        syy += dy * dy;
    }

    // Sums of squares are non-negative by construction, so sqrt cannot produce NaN.
    if (sxx <= 0.0 || syy <= 0.0) {
        return 0.0;
    }

    // Taking the roots separately limits under/overflow of sxx * syy.
    const double r = sxy / (std::sqrt(sxx) * std::sqrt(syy));
    // Round-off can push |r| marginally past 1; clamp without swallowing NaN.
    if (r > 1.0) {
        return 1.0;
    }
    if (r < -1.0) {
        return -1.0;
    }
    return r;
}
688
689} // namespace kcenon::monitoring
void add_value(const T &value, time_point timestamp)
Add a value with timestamp.
std::chrono::system_clock::duration duration
std::vector< T > get_values() const
Get all values in the window.
moving_window_aggregator(std::chrono::milliseconds window_duration, size_t max_size)
Constructor.
std::chrono::system_clock::time_point time_point
Welford's algorithm for computing streaming statistics.
double variance() const
Get running variance (sample variance)
void add_value(double value)
Add a value to the statistics.
double stddev() const
Get running standard deviation.
double min() const
Get minimum value.
double sum() const
Get sum of all values.
double max() const
Get maximum value.
double mean() const
Get running mean.
streaming_statistics get_statistics() const
Get full statistics.
size_t count() const
Get sample count.
P² algorithm for streaming quantile estimation.
quantile_estimator & operator=(const quantile_estimator &)=delete
size_t count() const
Get observation count.
void add_observation(double x)
Add an observation.
double linear(int i, int sign) const
quantile_estimator & operator=(quantile_estimator &&other) noexcept
quantile_estimator(const quantile_estimator &)=delete
quantile_estimator(quantile_estimator &&other) noexcept
double get_quantile() const
Get the estimated quantile.
double parabolic(int i, int sign) const
Full-featured streaming aggregation.
stream_aggregator(const stream_aggregator_config &config)
Constructor with configuration.
std::map< double, quantile_estimator > percentile_estimators_
streaming_statistics get_statistics() const
Get full statistics.
common::VoidResult add_observation(double value)
Add an observation.
double variance() const
Get variance.
std::optional< double > get_percentile(double p) const
Get specific percentile.
double stddev() const
Get standard deviation.
size_t count() const
Get observation count.
Monitoring system specific error codes.
double pearson_correlation(const std::vector< double > &x, const std::vector< double > &y)
Calculate Pearson correlation coefficient.
Result pattern type definitions for monitoring system.
Extended error information with context.
Configuration for stream aggregator.
common::VoidResult validate() const
Validate configuration.
Statistical summary from streaming computation.