Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
time_series.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
17#include "../core/error_codes.h"
18#include "metric_types.h"
19#include "ring_buffer.h"
20#include <chrono>
21#include <vector>
22#include <algorithm>
23#include <numeric>
24#include <map>
25#include <memory>
26#include <mutex>
27
28namespace kcenon { namespace monitoring {
29
35 std::chrono::seconds retention_period{3600}; // How long to keep data
36 std::chrono::milliseconds resolution{1000}; // Time resolution for aggregation
37 size_t max_points = 3600; // Maximum data points to store
38 bool enable_compression = true; // Enable data compression
39 double compression_threshold = 0.01; // Threshold for compression
40
// Validates this configuration.
// Returns an error result when retention_period is not positive, when
// resolution is not positive, or when max_points is zero; otherwise returns
// common::ok().
// NOTE(review): the statements that construct `err` (Doxygen lines 46, 52, 58)
// are elided from this listing, so the error code paired with each message
// cannot be confirmed from here.
44 common::VoidResult validate() const {
45 if (retention_period.count() <= 0) {
47 "Retention period must be positive");
48 return common::VoidResult::err(err.to_common_error());
49 }
50
51 if (resolution.count() <= 0) {
53 "Resolution must be positive");
54 return common::VoidResult::err(err.to_common_error());
55 }
56
// max_points is unsigned (size_t), so zero is the only non-positive value.
57 if (max_points == 0) {
59 "Max points must be positive");
60 return common::VoidResult::err(err.to_common_error());
61 }
62
63 return common::ok();
64 }
65};
66
72 std::chrono::system_clock::time_point timestamp;
73 double value;
74 uint32_t sample_count; // Number of samples aggregated into this point
75
76 time_point_data() noexcept : value(0.0), sample_count(0) {}
77
78 time_point_data(std::chrono::system_clock::time_point ts, double val, uint32_t count = 1) noexcept
79 : timestamp(ts), value(val), sample_count(count) {}
80
// Body of merge(other): folds `other` into this point as a mean weighted by
// sample_count.
// NOTE(review): the signature line (Doxygen line 84) is elided from this listing.
// An accumulator with no samples becomes a plain copy of `other`, which also
// replaces this point's timestamp.
85 if (sample_count == 0) {
86 *this = other;
// `other` is ignored entirely when it carries no samples.
87 } else if (other.sample_count > 0) {
88 double total_weight = sample_count + other.sample_count;
89 value = (value * sample_count + other.value * other.sample_count) / total_weight;
// NOTE(review): sample_count is uint32_t, so this sum can wrap for very large counts.
90 sample_count += other.sample_count;
91
92 // Use the later timestamp
93 if (other.timestamp > timestamp) {
94 timestamp = other.timestamp;
95 }
96 }
97 }
98
102 bool is_valid(std::chrono::system_clock::time_point cutoff) const noexcept {
103 return timestamp >= cutoff;
104 }
105};
106
112 std::chrono::system_clock::time_point start_time;
113 std::chrono::system_clock::time_point end_time;
114 std::chrono::milliseconds step{1000}; // Aggregation step size
115
// Body of what is presumably the default constructor of time_series_query
// (the signature line, Doxygen line 116, is elided from this listing — confirm).
// Sets the query window to the hour ending at the moment of construction.
// A single clock reading is used so both bounds come from the same instant.
117 auto now = std::chrono::system_clock::now();
118 end_time = now;
119 start_time = now - std::chrono::hours(1); // Default: last hour
120 }
121
122 time_series_query(std::chrono::system_clock::time_point start,
123 std::chrono::system_clock::time_point end,
124 std::chrono::milliseconds step_size = std::chrono::milliseconds(1000))
125 : start_time(start), end_time(end), step(step_size) {}
126
// Validates the query parameters.
// Returns an error result when start_time is not strictly before end_time, or
// when step is not positive; otherwise returns common::ok().
// NOTE(review): the statements that construct `err` (Doxygen lines 132, 138)
// are elided from this listing, so the error codes used cannot be confirmed here.
130 common::VoidResult validate() const {
// An empty window (start_time == end_time) is rejected as well.
131 if (start_time >= end_time) {
133 "Start time must be before end time");
134 return common::VoidResult::err(err.to_common_error());
135 }
136
137 if (step.count() <= 0) {
139 "Step size must be positive");
140 return common::VoidResult::err(err.to_common_error());
141 }
142
143 return common::ok();
144 }
145};
146
152 std::vector<time_point_data> points;
153 std::chrono::system_clock::time_point query_start;
154 std::chrono::system_clock::time_point query_end;
156
158
// Body of get_summary(): feeds the value of every aggregated point into
// `summary` and returns it.
// NOTE(review): the signature and the declaration of `summary` (Doxygen lines
// 163 and nearby) are elided from this listing.
// Each point contributes exactly one sample here; point.sample_count is not
// used, so this is not weighted the way get_average() is.
164 for (const auto& point : points) {
165 summary.add_sample(point.value);
166 }
167 return summary;
168 }
169
173 double get_average() const {
174 if (points.empty()) return 0.0;
175
176 double sum = 0.0;
177 uint64_t total_weight = 0;
178
179 for (const auto& point : points) {
180 sum += point.value * point.sample_count;
181 total_weight += point.sample_count;
182 }
183
184 return total_weight > 0 ? sum / total_weight : 0.0;
185 }
186
190 double get_rate() const {
191 if (points.size() < 2) return 0.0;
192
193 const auto& first = points.front();
194 const auto& last = points.back();
195
196 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
197 last.timestamp - first.timestamp);
198
199 if (duration.count() <= 0) return 0.0;
200
201 return (last.value - first.value) / duration.count();
202 }
203};
204
210private:
211 mutable std::mutex mutex_;
212 std::vector<time_point_data> data_;
214 std::string series_name_;
215 size_t insertion_count_ = 0; // Track insertions for periodic maintenance
216
// Body of cleanup_old_data(): removes every point whose timestamp is earlier
// than (now - config_.retention_period).
// NOTE(review): the signature line (Doxygen line 220) is elided from this
// listing. This body does not lock mutex_ itself; presumably callers hold it —
// confirm against the call sites.
221 auto cutoff = std::chrono::system_clock::now() - config_.retention_period;
222
// remove_if moves the surviving points to the front, keeping their relative order.
223 auto it = std::remove_if(data_.begin(), data_.end(),
224 [cutoff](const time_point_data& point) {
225 return !point.is_valid(cutoff);
226 });
227
// Drop the leftover tail that remove_if no longer considers part of the range.
228 data_.erase(it, data_.end());
229 }
230
// Body of compress_data(): drops interior points whose value is within
// config_.compression_threshold of the straight line between their two
// neighbours. The first and last points are always kept.
// NOTE(review): the signature line (Doxygen line 234) is elided from this
// listing. This body does not lock mutex_ itself; presumably callers hold it —
// confirm against the call sites.
// Nothing to do when compression is disabled or there is no interior point.
235 if (!config_.enable_compression || data_.size() < 3) {
236 return;
237 }
238
239 // Simple compression: remove points that don't add significant information
240 std::vector<time_point_data> compressed;
241 compressed.reserve(data_.size());
242
// Always true at this point: the guard above already requires size() >= 3.
243 if (!data_.empty()) {
244 compressed.push_back(data_[0]);
245
// Neighbours are read from the original data_, not from the points kept so far.
246 for (size_t i = 1; i < data_.size() - 1; ++i) {
247 const auto& prev = data_[i - 1];
248 const auto& curr = data_[i];
249 const auto& next = data_[i + 1];
250
251 // Check if current point is significantly different from linear interpolation
// NOTE(review): when prev and next share a timestamp (add_point accepts equal
// timestamps) the ratio below is 0.0/0.0 = NaN, the comparison that follows is
// false, and curr is dropped regardless of its value — confirm this is intended.
252 double expected = prev.value + (next.value - prev.value) *
253 (std::chrono::duration<double>(curr.timestamp - prev.timestamp).count() /
254 std::chrono::duration<double>(next.timestamp - prev.timestamp).count());
255
// NOTE(review): std::abs on a double is declared in <cmath>, which is not among
// the includes visible in this listing; this may rely on a transitive include.
256 if (std::abs(curr.value - expected) > config_.compression_threshold) {
257 compressed.push_back(curr);
258 }
259 }
260
// Also always true here, for the same reason as the emptiness check above.
261 if (data_.size() > 1) {
262 compressed.push_back(data_.back());
263 }
264 }
265
266 data_ = std::move(compressed);
267 }
268
// Body of enforce_size_limit(): when data_ holds more than config_.max_points
// entries, erases the surplus from the front of the vector.
// NOTE(review): the signature line (Doxygen line 272) is elided from this
// listing. This body does not lock mutex_ itself; presumably callers hold it —
// confirm against the call sites.
273 if (data_.size() > config_.max_points) {
274 size_t remove_count = data_.size() - config_.max_points;
// add_point/add_points keep data_ in timestamp order, so the front holds the
// oldest points.
275 data_.erase(data_.begin(), data_.begin() + remove_count);
276 }
277 }
278
282 time_series(const std::string& name, const time_series_config& config)
283 : config_(config), series_name_(name) {
284 data_.reserve(config_.max_points);
285 }
286
287public:
// Factory for time_series: validates `config` first and only constructs the
// series when validation succeeds.
// Returns an error result carrying the validation message on failure,
// otherwise a result owning the new time_series.
// NOTE(review): the line that builds the error object (Doxygen line 298) is
// elided from this listing, so the error code used cannot be confirmed here.
291 static common::Result<std::unique_ptr<time_series>> create(
292 const std::string& name,
293 const time_series_config& config = {}) {
294
295 auto validation = config.validate();
296 if (validation.is_err()) {
297 return common::Result<std::unique_ptr<time_series>>::err(
299 validation.error().message, "monitoring_system").to_common_error());
300 }
301
// The constructor sits before the `public:` label, so it is invoked directly
// with `new` from inside the class.
302 return common::ok(std::unique_ptr<time_series>(new time_series(name, config)));
303 }
304
// Adds one sample to the series while holding mutex_, keeping data_ ordered
// by timestamp. `timestamp` defaults to the current system clock time.
// Returns common::ok() on every path visible in this listing.
// NOTE(review): Doxygen lines 329 and 331-333 are elided from this listing;
// they presumably increment insertion_count_ and run the maintenance steps —
// confirm against the real header.
308 common::VoidResult add_point(double value,
309 std::chrono::system_clock::time_point timestamp =
310 std::chrono::system_clock::now()) {
311 std::lock_guard<std::mutex> lock(mutex_);
312
// The three-argument constructor's default gives this point a sample_count of 1.
313 time_point_data point(timestamp, value);
314
315 // Optimize: if timestamp is newest, append directly (O(1) vs O(n))
316 if (data_.empty() || point.timestamp >= data_.back().timestamp) {
317 data_.push_back(point);
318 } else {
319 // Insert in chronological order only when necessary
// upper_bound places the new point after any existing points with an equal timestamp.
320 auto it = std::upper_bound(data_.begin(), data_.end(), point,
321 [](const time_point_data& a, const time_point_data& b) {
322 return a.timestamp < b.timestamp;
323 });
324
325 data_.insert(it, point);
326 }
327
328 // Perform maintenance periodically (every 100 insertions) instead of every time
330 if (insertion_count_ % 100 == 0) {
334 }
335
336 return common::ok();
337 }
338
// Adds a batch of points while holding mutex_ for the whole batch, keeping
// data_ ordered by timestamp. Each point is stored as given, including its
// sample_count.
// Returns common::ok() on every path visible in this listing.
// NOTE(review): the maintenance statements after the loop (Doxygen lines
// 360-362) are elided from this listing.
342 common::VoidResult add_points(const std::vector<time_point_data>& points) {
343 std::lock_guard<std::mutex> lock(mutex_);
344
345 for (const auto& point : points) {
346 // Optimize: if timestamp is newest, append directly
347 if (data_.empty() || point.timestamp >= data_.back().timestamp) {
348 data_.push_back(point);
349 } else {
// Out-of-order point: find its slot by binary search; it lands after any
// existing points with an equal timestamp.
350 auto it = std::upper_bound(data_.begin(), data_.end(), point,
351 [](const time_point_data& a, const time_point_data& b) {
352 return a.timestamp < b.timestamp;
353 });
354
355 data_.insert(it, point);
356 }
357 }
358
359 // Perform maintenance after batch insert
363
364 return common::ok();
365 }
366
// Returns the stored points inside the query window, aggregated into
// consecutive buckets of query.step starting at query.start_time.
// The query is validated before mutex_ is taken; an invalid query yields an
// error result carrying the validation message. Buckets that contain no
// points produce no output point.
// NOTE(review): the line that builds the error object (Doxygen line 374) is
// elided from this listing, so the error code used cannot be confirmed here.
370 common::Result<aggregation_result> query(const time_series_query& query) const {
371 auto validation = query.validate();
372 if (validation.is_err()) {
373 return common::Result<aggregation_result>::err(
375 validation.error().message, "monitoring_system").to_common_error());
376 }
377
378 std::lock_guard<std::mutex> lock(mutex_);
379
380 aggregation_result result;
381 result.query_start = query.start_time;
382 result.query_end = query.end_time;
383
384 // Find data points in the time range
// First stored point with timestamp >= start_time.
385 auto start_it = std::lower_bound(data_.begin(), data_.end(), query.start_time,
386 [](const time_point_data& point, std::chrono::system_clock::time_point time) {
387 return point.timestamp < time;
388 });
389
// First stored point with timestamp > end_time, so points exactly at end_time
// fall inside [start_it, end_it).
390 auto end_it = std::upper_bound(data_.begin(), data_.end(), query.end_time,
391 [](std::chrono::system_clock::time_point time, const time_point_data& point) {
392 return time < point.timestamp;
393 });
394
395 if (start_it == end_it) {
396 return common::ok(std::move(result)); // No data in range
397 }
398
399 // Aggregate data by step size
400 auto current_step_start = query.start_time;
401
402 while (current_step_start < query.end_time) {
// The final bucket is clipped so it never extends past end_time.
403 auto current_step_end = current_step_start + query.step;
404 if (current_step_end > query.end_time) {
405 current_step_end = query.end_time;
406 }
407
408 // Find points in this step
// NOTE(review): every bucket rescans the whole [start_it, end_it) range, so the
// cost grows with (number of buckets) x (points in range).
409 std::vector<time_point_data> step_points;
410 for (auto it = start_it; it != end_it; ++it) {
// NOTE(review): buckets are half-open, so a point stamped exactly at end_time
// is selected by upper_bound above yet never lands in any bucket — confirm
// which boundary behaviour is intended.
411 if (it->timestamp >= current_step_start && it->timestamp < current_step_end) {
412 step_points.push_back(*it);
413 result.total_samples += it->sample_count;
414 }
415 }
416
417 // Aggregate points in this step
418 if (!step_points.empty()) {
419 time_point_data aggregated_point;
// NOTE(review): aggregated_point starts with sample_count == 0, and merge()
// copies `other` wholesale in that case, so the midpoint timestamp assigned
// here is overwritten by the first merged point's timestamp — confirm intent.
420 aggregated_point.timestamp = current_step_start + query.step / 2; // Middle of step
421
422 for (const auto& point : step_points) {
423 aggregated_point.merge(point);
424 }
425
426 result.points.push_back(aggregated_point);
427 }
428
429 current_step_start = current_step_end;
430 }
431
432 return common::ok(std::move(result));
433 }
434
438 size_t size() const {
439 std::lock_guard<std::mutex> lock(mutex_);
440 return data_.size();
441 }
442
446 bool empty() const {
447 std::lock_guard<std::mutex> lock(mutex_);
448 return data_.empty();
449 }
450
454 const std::string& name() const noexcept {
455 return series_name_;
456 }
457
461 const time_series_config& get_config() const noexcept {
462 return config_;
463 }
464
468 void clear() {
469 std::lock_guard<std::mutex> lock(mutex_);
470 data_.clear();
471 }
472
// Returns the value of the last element of data_, read while holding mutex_.
// Returns an error result with the message "No data available" when the
// series is empty.
// NOTE(review): the line that builds the error object (Doxygen line 481) is
// elided from this listing, so the error code used cannot be confirmed here.
476 common::Result<double> get_latest_value() const {
477 std::lock_guard<std::mutex> lock(mutex_);
478
479 if (data_.empty()) {
480 return common::Result<double>::err(
482 "No data available", "monitoring_system").to_common_error());
483 }
484
// add_point/add_points keep data_ ordered by timestamp, so back() is the point
// with the latest timestamp.
485 return common::ok(data_.back().value);
486 }
487
491 size_t memory_footprint() const {
492 std::lock_guard<std::mutex> lock(mutex_);
493 return sizeof(time_series) +
494 data_.capacity() * sizeof(time_point_data) +
495 series_name_.capacity();
496 }
497};
498
499} } // namespace kcenon::monitoring
Thread-safe time series data storage.
common::Result< double > get_latest_value() const
Get latest value.
common::VoidResult add_point(double value, std::chrono::system_clock::time_point timestamp=std::chrono::system_clock::now())
Add a data point.
const time_series_config & get_config() const noexcept
Get configuration.
time_series(const std::string &name, const time_series_config &config)
Private constructor (use create() factory method)
static common::Result< std::unique_ptr< time_series > > create(const std::string &name, const time_series_config &config={})
Factory method to create time_series with validation.
void compress_data()
Compress data if enabled.
common::VoidResult add_points(const std::vector< time_point_data > &points)
Add multiple data points.
void clear()
Clear all data.
const std::string & name() const noexcept
Get series name.
bool empty() const
Check if series is empty.
size_t size() const
Get current number of data points.
std::vector< time_point_data > data_
common::Result< aggregation_result > query(const time_series_query &query) const
Query data for a time range.
size_t memory_footprint() const
Get memory footprint in bytes.
void enforce_size_limit()
Ensure data size doesn't exceed maximum.
void cleanup_old_data()
Cleanup old data points.
Monitoring system specific error codes.
Common metric type definitions for efficient storage.
@ summary
Pre-calculated quantiles and count/sum.
Result pattern type definitions for monitoring system.
Lock-free ring buffer for efficient metric storage.
Result of time series aggregation.
std::chrono::system_clock::time_point query_end
summary_data get_summary() const
Get statistics for the aggregated data.
double get_average() const
Get average value over the time period.
std::vector< time_point_data > points
std::chrono::system_clock::time_point query_start
double get_rate() const
Get rate of change (per second)
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Summary statistics for metrics.
Single data point in time series.
Definition time_series.h:71
bool is_valid(std::chrono::system_clock::time_point cutoff) const noexcept
Check if this point is within retention period.
std::chrono::system_clock::time_point timestamp
Definition time_series.h:72
void merge(const time_point_data &other)
Merge another data point (for aggregation)
Definition time_series.h:84
time_point_data(std::chrono::system_clock::time_point ts, double val, uint32_t count=1) noexcept
Definition time_series.h:78
Configuration for time series storage.
Definition time_series.h:34
std::chrono::milliseconds resolution
Definition time_series.h:36
common::VoidResult validate() const
Validate configuration.
Definition time_series.h:44
std::chrono::seconds retention_period
Definition time_series.h:35
Query parameters for time series data.
std::chrono::milliseconds step
std::chrono::system_clock::time_point start_time
time_series_query(std::chrono::system_clock::time_point start, std::chrono::system_clock::time_point end, std::chrono::milliseconds step_size=std::chrono::milliseconds(1000))
common::VoidResult validate() const
Validate query parameters.
std::chrono::system_clock::time_point end_time