Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
time_series_buffer.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
24#ifdef _WIN32
25#ifndef NOMINMAX
26#define NOMINMAX
27#endif
28#ifdef min
29#undef min
30#endif
31#ifdef max
32#undef max
33#endif
34#endif
35
37#include "../core/error_codes.h"
38#include <algorithm>
39#include <chrono>
40#include <cmath>
41#include <limits>
42#include <mutex>
43#include <stdexcept>
44#include <vector>
45
46namespace kcenon { namespace monitoring {
47
53 size_t max_samples = 1000;
54
55 common::VoidResult validate() const {
56 if (max_samples == 0) {
58 "Max samples must be positive");
59 return common::VoidResult::err(err.to_common_error());
60 }
61 return common::ok();
62 }
63};
64
69template <typename T>
71 std::chrono::system_clock::time_point timestamp;
73
74 time_series_sample() noexcept : value{} {}
75
76 time_series_sample(std::chrono::system_clock::time_point ts, T val) noexcept
77 : timestamp(ts), value(val) {}
78};
79
85 double min_value = (std::numeric_limits<double>::max)();
86 double max_value = (std::numeric_limits<double>::lowest)();
87 double avg = 0.0;
88 double stddev = 0.0;
89 double p95 = 0.0;
90 double p99 = 0.0;
91 size_t sample_count = 0;
92 std::chrono::system_clock::time_point oldest_timestamp;
93 std::chrono::system_clock::time_point newest_timestamp;
94};
95
101namespace detail {
102
/**
 * @brief Linearly interpolated percentile of an ascending-sorted vector.
 *
 * The rank is (percentile / 100) * (size - 1); the result is interpolated
 * between the two elements that surround that rank.
 *
 * @param sorted_values Values in ascending order (the order is not verified here).
 * @param percentile Requested percentile. Values at or below 0 (including NaN)
 *        yield the first element; values at or above 100 yield the last element.
 * @return The interpolated percentile, or 0.0 when sorted_values is empty.
 */
inline double calculate_percentile(const std::vector<double>& sorted_values,
                                   double percentile) {
    if (sorted_values.empty()) {
        return 0.0;
    }

    // Clamp before the rank is converted to an index: a negative or NaN rank
    // makes the double -> size_t conversion undefined, and a rank above
    // size - 1 would index past the end of the vector.
    if (!(percentile > 0.0)) {
        return sorted_values.front();
    }
    if (percentile >= 100.0) {
        return sorted_values.back();
    }

    const size_t last_idx = sorted_values.size() - 1;
    const double rank = (percentile / 100.0) * static_cast<double>(last_idx);
    const size_t lower_idx = static_cast<size_t>(rank);

    // Also covers the single-element case and a rank that rounds up to last_idx.
    if (lower_idx >= last_idx) {
        return sorted_values[last_idx];
    }

    const double fraction = rank - static_cast<double>(lower_idx);
    return sorted_values[lower_idx] +
           fraction * (sorted_values[lower_idx + 1] - sorted_values[lower_idx]);
}
131
141 const std::vector<double>& values,
142 std::chrono::system_clock::time_point oldest_timestamp,
143 std::chrono::system_clock::time_point newest_timestamp) {
144
146 stats.sample_count = values.size();
147
148 if (values.empty()) {
149 stats.min_value = 0.0;
150 stats.max_value = 0.0;
151 return stats;
152 }
153
154 stats.oldest_timestamp = oldest_timestamp;
155 stats.newest_timestamp = newest_timestamp;
156
157 double sum = 0.0;
158 for (double val : values) {
159 sum += val;
160 stats.min_value = (std::min)(stats.min_value, val);
161 stats.max_value = (std::max)(stats.max_value, val);
162 }
163 stats.avg = sum / values.size();
164
165 double variance = 0.0;
166 for (double val : values) {
167 double diff = val - stats.avg;
168 variance += diff * diff;
169 }
170 stats.stddev = std::sqrt(variance / values.size());
171
172 std::vector<double> sorted_values = values;
173 std::sort(sorted_values.begin(), sorted_values.end());
174 stats.p95 = calculate_percentile(sorted_values, 95.0);
175 stats.p99 = calculate_percentile(sorted_values, 99.0);
176
177 return stats;
178}
179
193template <typename Sample>
195 private:
196 mutable std::mutex mutex_;
197 std::vector<Sample> buffer_;
198 size_t head_ = 0;
199 size_t count_ = 0;
201
202 size_t get_actual_index(size_t logical_index) const noexcept {
203 if (count_ < max_samples_) {
204 return logical_index;
205 }
206 return (head_ + logical_index) % max_samples_;
207 }
208
209 public:
215 explicit time_series_ring_buffer(size_t max_samples) : max_samples_(max_samples) {
216 if (max_samples_ == 0) {
217 throw std::invalid_argument("Max samples must be positive");
218 }
219 buffer_.resize(max_samples_);
220 }
221
226
227 void add_sample(const Sample& sample) {
228 std::lock_guard<std::mutex> lock(mutex_);
229
230 buffer_[head_] = sample;
231 head_ = (head_ + 1) % max_samples_;
232
233 if (count_ < max_samples_) {
234 ++count_;
235 }
236 }
237
238 template <typename Duration>
239 std::vector<Sample> get_samples(Duration duration) const {
240 auto cutoff = std::chrono::system_clock::now() - duration;
241 return get_samples_since(cutoff);
242 }
243
244 std::vector<Sample> get_samples_since(
245 std::chrono::system_clock::time_point since) const {
246 std::lock_guard<std::mutex> lock(mutex_);
247
248 std::vector<Sample> result;
249 result.reserve(count_);
250
251 for (size_t i = 0; i < count_; ++i) {
252 const auto& sample = buffer_[get_actual_index(i)];
253 if (sample.timestamp >= since) {
254 result.push_back(sample);
255 }
256 }
257
258 std::sort(result.begin(), result.end(),
259 [](const Sample& a, const Sample& b) {
260 return a.timestamp < b.timestamp;
261 });
262
263 return result;
264 }
265
266 std::vector<Sample> get_all_samples() const {
267 std::lock_guard<std::mutex> lock(mutex_);
268
269 std::vector<Sample> result;
270 result.reserve(count_);
271
272 for (size_t i = 0; i < count_; ++i) {
273 result.push_back(buffer_[get_actual_index(i)]);
274 }
275
276 std::sort(result.begin(), result.end(),
277 [](const Sample& a, const Sample& b) {
278 return a.timestamp < b.timestamp;
279 });
280
281 return result;
282 }
283
284 common::Result<Sample> get_latest() const {
285 std::lock_guard<std::mutex> lock(mutex_);
286
287 if (count_ == 0) {
288 return common::Result<Sample>::err(error_info(monitoring_error_code::collection_failed, "No samples available").to_common_error());
289 }
290
291 size_t latest_idx = (head_ == 0) ? max_samples_ - 1 : head_ - 1;
292 return common::ok(buffer_[latest_idx]);
293 }
294
295 size_t size() const noexcept {
296 std::lock_guard<std::mutex> lock(mutex_);
297 return count_;
298 }
299
300 bool empty() const noexcept {
301 std::lock_guard<std::mutex> lock(mutex_);
302 return count_ == 0;
303 }
304
305 size_t capacity() const noexcept { return max_samples_; }
306
307 void clear() noexcept {
308 std::lock_guard<std::mutex> lock(mutex_);
309 head_ = 0;
310 count_ = 0;
311 }
312
313 size_t memory_footprint() const noexcept {
314 return sizeof(time_series_ring_buffer<Sample>) +
315 max_samples_ * sizeof(Sample);
316 }
317};
318
319} // namespace detail
320
330template <typename T>
332 static_assert(std::is_arithmetic_v<T>, "T must be an arithmetic type");
333
334 private:
335 struct validated_tag {};
337
338 // Private constructor for validated creation via create()
340 : buffer_(config.max_samples) {}
341
342 public:
348 static common::Result<std::unique_ptr<time_series_buffer>> create(
349 const time_series_buffer_config& config = {}) {
350 auto validation = config.validate();
351 if (validation.is_err()) {
352 return common::Result<std::unique_ptr<time_series_buffer>>::err(
354 "Invalid time_series_buffer configuration: " +
355 validation.error().message)
356 .to_common_error());
357 }
358 return common::ok(std::unique_ptr<time_series_buffer>(
359 new time_series_buffer(config, validated_tag{})));
360 }
361
369 : buffer_(config.max_samples) {
370 auto validation = config.validate();
371 if (validation.is_err()) {
372 throw std::invalid_argument("Invalid time_series_buffer configuration: " +
373 validation.error().message);
374 }
375 }
376
381
387 void add_sample(T value,
388 std::chrono::system_clock::time_point timestamp =
389 std::chrono::system_clock::now()) {
390 buffer_.add_sample(time_series_sample<T>(timestamp, value));
391 }
392
398 template <typename Duration>
399 std::vector<time_series_sample<T>> get_samples(Duration duration) const {
400 return buffer_.get_samples(duration);
401 }
402
408 std::vector<time_series_sample<T>> get_samples_since(
409 std::chrono::system_clock::time_point since) const {
410 return buffer_.get_samples_since(since);
411 }
412
417 std::vector<time_series_sample<T>> get_all_samples() const {
418 return buffer_.get_all_samples();
419 }
420
426 template <typename Duration>
427 time_series_statistics get_statistics(Duration duration) const {
428 auto samples = get_samples(duration);
429 return calculate_statistics(samples);
430 }
431
437 auto samples = get_all_samples();
438 return calculate_statistics(samples);
439 }
440
445 common::Result<T> get_latest() const {
446 auto sample_result = buffer_.get_latest();
447 if (sample_result.is_err()) {
448 return common::Result<T>::err(sample_result.error());
449 }
450 return common::ok(sample_result.value().value);
451 }
452
457 common::Result<time_series_sample<T>> get_latest_sample() const {
458 return buffer_.get_latest();
459 }
460
464 size_t size() const noexcept {
465 return buffer_.size();
466 }
467
471 bool empty() const noexcept {
472 return buffer_.empty();
473 }
474
478 size_t capacity() const noexcept {
479 return buffer_.capacity();
480 }
481
485 void clear() noexcept {
486 buffer_.clear();
487 }
488
492 size_t memory_footprint() const noexcept {
493 return buffer_.memory_footprint();
494 }
495
496 private:
498 const std::vector<time_series_sample<T>>& samples) {
499 if (samples.empty()) {
501 stats.sample_count = 0;
502 stats.min_value = 0.0;
503 stats.max_value = 0.0;
504 return stats;
505 }
506
507 std::vector<double> values;
508 values.reserve(samples.size());
509 for (const auto& sample : samples) {
510 values.push_back(static_cast<double>(sample.value));
511 }
512
514 values,
515 samples.front().timestamp,
516 samples.back().timestamp);
517 }
518};
519
525 std::chrono::system_clock::time_point timestamp;
526 double load_1m;
527 double load_5m;
528 double load_15m;
529
530 load_average_sample() noexcept : load_1m(0.0), load_5m(0.0), load_15m(0.0) {}
531
532 load_average_sample(std::chrono::system_clock::time_point ts, double l1, double l5,
533 double l15) noexcept
534 : timestamp(ts), load_1m(l1), load_5m(l5), load_15m(l15) {}
535};
536
546
555 private:
557
558 public:
559 explicit load_average_history(size_t max_samples = 1000) : buffer_(max_samples) {}
560
565
573 void add_sample(double load_1m, double load_5m, double load_15m,
574 std::chrono::system_clock::time_point timestamp =
575 std::chrono::system_clock::now()) {
576 buffer_.add_sample(load_average_sample(timestamp, load_1m, load_5m, load_15m));
577 }
578
584 template <typename Duration>
585 std::vector<load_average_sample> get_samples(Duration duration) const {
586 return buffer_.get_samples(duration);
587 }
588
594 std::vector<load_average_sample> get_samples_since(
595 std::chrono::system_clock::time_point since) const {
596 return buffer_.get_samples_since(since);
597 }
598
603 std::vector<load_average_sample> get_all_samples() const {
604 return buffer_.get_all_samples();
605 }
606
612 template <typename Duration>
613 load_average_statistics get_statistics(Duration duration) const {
614 auto samples = get_samples(duration);
615 return calculate_statistics(samples);
616 }
617
623 auto samples = get_all_samples();
624 return calculate_statistics(samples);
625 }
626
631 common::Result<load_average_sample> get_latest() const {
632 return buffer_.get_latest();
633 }
634
638 size_t size() const noexcept {
639 return buffer_.size();
640 }
641
645 bool empty() const noexcept {
646 return buffer_.empty();
647 }
648
652 size_t capacity() const noexcept {
653 return buffer_.capacity();
654 }
655
659 void clear() noexcept {
660 buffer_.clear();
661 }
662
666 size_t memory_footprint() const noexcept {
667 return buffer_.memory_footprint();
668 }
669
670 private:
672 const std::vector<load_average_sample>& samples) {
674
675 if (samples.empty()) {
676 stats.load_1m_stats.sample_count = 0;
677 stats.load_1m_stats.min_value = 0.0;
678 stats.load_1m_stats.max_value = 0.0;
679 stats.load_5m_stats = stats.load_1m_stats;
680 stats.load_15m_stats = stats.load_1m_stats;
681 return stats;
682 }
683
684 std::vector<double> values_1m, values_5m, values_15m;
685 values_1m.reserve(samples.size());
686 values_5m.reserve(samples.size());
687 values_15m.reserve(samples.size());
688
689 for (const auto& sample : samples) {
690 values_1m.push_back(sample.load_1m);
691 values_5m.push_back(sample.load_5m);
692 values_15m.push_back(sample.load_15m);
693 }
694
695 auto oldest = samples.front().timestamp;
696 auto newest = samples.back().timestamp;
697
698 stats.load_1m_stats = detail::calculate_basic_statistics(values_1m, oldest, newest);
699 stats.load_5m_stats = detail::calculate_basic_statistics(values_5m, oldest, newest);
700 stats.load_15m_stats = detail::calculate_basic_statistics(values_15m, oldest, newest);
701
702 return stats;
703 }
704};
705
706}} // namespace kcenon::monitoring
Generic time-series ring buffer base template.
time_series_ring_buffer & operator=(time_series_ring_buffer &&)=delete
time_series_ring_buffer(size_t max_samples)
Constructor with maximum sample count.
time_series_ring_buffer & operator=(const time_series_ring_buffer &)=delete
std::vector< Sample > get_samples(Duration duration) const
time_series_ring_buffer(const time_series_ring_buffer &)=delete
size_t get_actual_index(size_t logical_index) const noexcept
std::vector< Sample > get_samples_since(std::chrono::system_clock::time_point since) const
time_series_ring_buffer(time_series_ring_buffer &&)=delete
Specialized buffer for tracking load average history.
detail::time_series_ring_buffer< load_average_sample > buffer_
std::vector< load_average_sample > get_samples_since(std::chrono::system_clock::time_point since) const
Get samples since a specific timestamp.
void clear() noexcept
Clear all samples.
std::vector< load_average_sample > get_all_samples() const
Get all samples in chronological order.
load_average_history & operator=(const load_average_history &)=delete
std::vector< load_average_sample > get_samples(Duration duration) const
Get samples within a duration from now.
size_t capacity() const noexcept
Get buffer capacity.
load_average_history & operator=(load_average_history &&)=delete
size_t memory_footprint() const noexcept
Get memory footprint in bytes.
load_average_statistics get_statistics(Duration duration) const
Get statistics for samples within a duration.
common::Result< load_average_sample > get_latest() const
Get the latest sample.
size_t size() const noexcept
Get current number of samples.
bool empty() const noexcept
Check if buffer is empty.
load_average_history(const load_average_history &)=delete
static load_average_statistics calculate_statistics(const std::vector< load_average_sample > &samples)
load_average_history(load_average_history &&)=delete
load_average_statistics get_statistics() const
Get statistics for all samples.
void add_sample(double load_1m, double load_5m, double load_15m, std::chrono::system_clock::time_point timestamp=std::chrono::system_clock::now())
Add a load average sample.
Thread-safe ring buffer for time-series data with statistics.
size_t capacity() const noexcept
Get buffer capacity.
std::vector< time_series_sample< T > > get_samples_since(std::chrono::system_clock::time_point since) const
Get samples since a specific timestamp.
time_series_statistics get_statistics() const
Get statistics for all samples.
common::Result< time_series_sample< T > > get_latest_sample() const
Get the latest sample with timestamp.
time_series_statistics get_statistics(Duration duration) const
Get statistics for samples within a duration.
size_t size() const noexcept
Get current number of samples.
bool empty() const noexcept
Check if buffer is empty.
static common::Result< std::unique_ptr< time_series_buffer > > create(const time_series_buffer_config &config={})
Create a time series buffer with validated configuration.
size_t memory_footprint() const noexcept
Get memory footprint in bytes.
time_series_buffer(const time_series_buffer_config &config={})
Constructor with configuration.
void clear() noexcept
Clear all samples.
static time_series_statistics calculate_statistics(const std::vector< time_series_sample< T > > &samples)
std::vector< time_series_sample< T > > get_samples(Duration duration) const
Get samples within a duration from now.
common::Result< T > get_latest() const
Get the latest sample value.
time_series_buffer & operator=(const time_series_buffer &)=delete
time_series_buffer(const time_series_buffer_config &config, validated_tag)
time_series_buffer & operator=(time_series_buffer &&)=delete
std::vector< time_series_sample< T > > get_all_samples() const
Get all samples in chronological order.
detail::time_series_ring_buffer< time_series_sample< T > > buffer_
time_series_buffer(const time_series_buffer &)=delete
void add_sample(T value, std::chrono::system_clock::time_point timestamp=std::chrono::system_clock::now())
Add a sample to the buffer.
time_series_buffer(time_series_buffer &&)=delete
Monitoring system specific error codes.
Internal implementation details - not part of public API.
Definition memory_pool.h:27
time_series_statistics calculate_basic_statistics(const std::vector< double > &values, std::chrono::system_clock::time_point oldest_timestamp, std::chrono::system_clock::time_point newest_timestamp)
Calculate basic statistics from a vector of double values.
double calculate_percentile(const std::vector< double > &sorted_values, double percentile)
Calculate percentile from sorted values.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Sample containing all three load averages.
load_average_sample(std::chrono::system_clock::time_point ts, double l1, double l5, double l15) noexcept
std::chrono::system_clock::time_point timestamp
Statistics for load average history.
Configuration for time series buffer.
Single sample in time series with timestamp.
time_series_sample(std::chrono::system_clock::time_point ts, T val) noexcept
std::chrono::system_clock::time_point timestamp
Statistics calculated from time series data.
std::chrono::system_clock::time_point newest_timestamp
std::chrono::system_clock::time_point oldest_timestamp