Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::time_series Class Reference

Thread-safe time series data storage. More...

#include <time_series.h>

Collaboration diagram for kcenon::monitoring::time_series:
Collaboration graph

Public Member Functions

common::VoidResult add_point (double value, std::chrono::system_clock::time_point timestamp=std::chrono::system_clock::now())
 Add a data point.
 
common::VoidResult add_points (const std::vector< time_point_data > &points)
 Add multiple data points.
 
common::Result< aggregation_resultquery (const time_series_query &query) const
 Query data for a time range.
 
size_t size () const
 Get current number of data points.
 
bool empty () const
 Check if series is empty.
 
const std::string & name () const noexcept
 Get series name.
 
const time_series_configget_config () const noexcept
 Get configuration.
 
void clear ()
 Clear all data.
 
common::Result< double > get_latest_value () const
 Get latest value.
 
size_t memory_footprint () const
 Get memory footprint in bytes.
 

Static Public Member Functions

static common::Result< std::unique_ptr< time_series > > create (const std::string &name, const time_series_config &config={})
 Factory method to create time_series with validation.
 

Private Member Functions

void cleanup_old_data ()
 Cleanup old data points.
 
void compress_data ()
 Compress data if enabled.
 
void enforce_size_limit ()
 Ensure data size doesn't exceed maximum.
 
 time_series (const std::string &name, const time_series_config &config)
 Private constructor (use create() factory method)
 

Private Attributes

std::mutex mutex_
 
std::vector< time_point_datadata_
 
time_series_config config_
 
std::string series_name_
 
size_t insertion_count_ = 0
 

Detailed Description

Thread-safe time series data storage.

Definition at line 209 of file time_series.h.

Constructor & Destructor Documentation

◆ time_series()

kcenon::monitoring::time_series::time_series ( const std::string & name,
const time_series_config & config )
inlineprivate

Private constructor (use create() factory method)

Definition at line 282 of file time_series.h.

283 : config_(config), series_name_(name) {
284 data_.reserve(config_.max_points);
285 }
const std::string & name() const noexcept
Get series name.
std::vector< time_point_data > data_

References config_, data_, and kcenon::monitoring::time_series_config::max_points.

Referenced by memory_footprint().

Here is the caller graph for this function:

Member Function Documentation

◆ add_point()

common::VoidResult kcenon::monitoring::time_series::add_point ( double value,
std::chrono::system_clock::time_point timestamp = std::chrono::system_clock::now() )
inline

Add a data point.

Definition at line 308 of file time_series.h.

310 {
311 std::lock_guard<std::mutex> lock(mutex_);
312
313 time_point_data point(timestamp, value);
314
315 // Optimize: if timestamp is newest, append directly (O(1) vs O(n))
316 if (data_.empty() || point.timestamp >= data_.back().timestamp) {
317 data_.push_back(point);
318 } else {
319 // Insert in chronological order only when necessary
320 auto it = std::upper_bound(data_.begin(), data_.end(), point,
321 [](const time_point_data& a, const time_point_data& b) {
322 return a.timestamp < b.timestamp;
323 });
324
325 data_.insert(it, point);
326 }
327
328 // Perform maintenance periodically (every 100 insertions) instead of every time
330 if (insertion_count_ % 100 == 0) {
334 }
335
336 return common::ok();
337 }
void compress_data()
Compress data if enabled.
void enforce_size_limit()
Ensure data size doesn't exceed maximum.
void cleanup_old_data()
Cleanup old data points.

References cleanup_old_data(), compress_data(), data_, enforce_size_limit(), insertion_count_, mutex_, and kcenon::monitoring::time_point_data::timestamp.

Here is the call graph for this function:

◆ add_points()

common::VoidResult kcenon::monitoring::time_series::add_points ( const std::vector< time_point_data > & points)
inline

Add multiple data points.

Definition at line 342 of file time_series.h.

342 {
343 std::lock_guard<std::mutex> lock(mutex_);
344
345 for (const auto& point : points) {
346 // Optimize: if timestamp is newest, append directly
347 if (data_.empty() || point.timestamp >= data_.back().timestamp) {
348 data_.push_back(point);
349 } else {
350 auto it = std::upper_bound(data_.begin(), data_.end(), point,
351 [](const time_point_data& a, const time_point_data& b) {
352 return a.timestamp < b.timestamp;
353 });
354
355 data_.insert(it, point);
356 }
357 }
358
359 // Perform maintenance after batch insert
363
364 return common::ok();
365 }

References cleanup_old_data(), compress_data(), data_, enforce_size_limit(), and mutex_.

Here is the call graph for this function:

◆ cleanup_old_data()

void kcenon::monitoring::time_series::cleanup_old_data ( )
inlineprivate

Cleanup old data points.

Definition at line 220 of file time_series.h.

220 {
221 auto cutoff = std::chrono::system_clock::now() - config_.retention_period;
222
223 auto it = std::remove_if(data_.begin(), data_.end(),
224 [cutoff](const time_point_data& point) {
225 return !point.is_valid(cutoff);
226 });
227
228 data_.erase(it, data_.end());
229 }
std::chrono::seconds retention_period
Definition time_series.h:35

References config_, data_, and kcenon::monitoring::time_series_config::retention_period.

Referenced by add_point(), and add_points().

Here is the caller graph for this function:

◆ clear()

void kcenon::monitoring::time_series::clear ( )
inline

Clear all data.

Definition at line 468 of file time_series.h.

468 {
469 std::lock_guard<std::mutex> lock(mutex_);
470 data_.clear();
471 }

References data_, and mutex_.

◆ compress_data()

void kcenon::monitoring::time_series::compress_data ( )
inlineprivate

Compress data if enabled.

Definition at line 234 of file time_series.h.

234 {
235 if (!config_.enable_compression || data_.size() < 3) {
236 return;
237 }
238
239 // Simple compression: remove points that don't add significant information
240 std::vector<time_point_data> compressed;
241 compressed.reserve(data_.size());
242
243 if (!data_.empty()) {
244 compressed.push_back(data_[0]);
245
246 for (size_t i = 1; i < data_.size() - 1; ++i) {
247 const auto& prev = data_[i - 1];
248 const auto& curr = data_[i];
249 const auto& next = data_[i + 1];
250
251 // Check if current point is significantly different from linear interpolation
252 double expected = prev.value + (next.value - prev.value) *
253 (std::chrono::duration<double>(curr.timestamp - prev.timestamp).count() /
254 std::chrono::duration<double>(next.timestamp - prev.timestamp).count());
255
256 if (std::abs(curr.value - expected) > config_.compression_threshold) {
257 compressed.push_back(curr);
258 }
259 }
260
261 if (data_.size() > 1) {
262 compressed.push_back(data_.back());
263 }
264 }
265
266 data_ = std::move(compressed);
267 }

References kcenon::monitoring::time_series_config::compression_threshold, config_, data_, and kcenon::monitoring::time_series_config::enable_compression.

Referenced by add_point(), and add_points().

Here is the caller graph for this function:

◆ create()

static common::Result< std::unique_ptr< time_series > > kcenon::monitoring::time_series::create ( const std::string & name,
const time_series_config & config = {} )
inlinestatic

Factory method to create time_series with validation.

Definition at line 291 of file time_series.h.

293 {}) {
294
295 auto validation = config.validate();
296 if (validation.is_err()) {
297 return common::Result<std::unique_ptr<time_series>>::err(
299 validation.error().message, "monitoring_system").to_common_error());
300 }
301
302 return common::ok(std::unique_ptr<time_series>(new time_series(name, config)));
303 }
time_series(const std::string &name, const time_series_config &config)
Private constructor (use create() factory method)

Referenced by demonstrate_aggregations(), demonstrate_basic_operations(), demonstrate_batch_operations(), demonstrate_retention_and_downsampling(), kcenon::monitoring::metric_storage::get_or_create_series(), TEST_F(), and TEST_F().

Here is the caller graph for this function:

◆ empty()

bool kcenon::monitoring::time_series::empty ( ) const
inline

Check if series is empty.

Definition at line 446 of file time_series.h.

446 {
447 std::lock_guard<std::mutex> lock(mutex_);
448 return data_.empty();
449 }

References data_, and mutex_.

◆ enforce_size_limit()

void kcenon::monitoring::time_series::enforce_size_limit ( )
inlineprivate

Ensure data size doesn't exceed maximum.

Definition at line 272 of file time_series.h.

272 {
273 if (data_.size() > config_.max_points) {
274 size_t remove_count = data_.size() - config_.max_points;
275 data_.erase(data_.begin(), data_.begin() + remove_count);
276 }
277 }

References config_, data_, and kcenon::monitoring::time_series_config::max_points.

Referenced by add_point(), and add_points().

Here is the caller graph for this function:

◆ get_config()

const time_series_config & kcenon::monitoring::time_series::get_config ( ) const
inlinenoexcept

Get configuration.

Definition at line 461 of file time_series.h.

461 {
462 return config_;
463 }

References config_.

◆ get_latest_value()

common::Result< double > kcenon::monitoring::time_series::get_latest_value ( ) const
inline

Get latest value.

Definition at line 476 of file time_series.h.

476 {
477 std::lock_guard<std::mutex> lock(mutex_);
478
479 if (data_.empty()) {
480 return common::Result<double>::err(
482 "No data available", "monitoring_system").to_common_error());
483 }
484
485 return common::ok(data_.back().value);
486 }

References kcenon::monitoring::collection_failed, data_, and mutex_.

◆ memory_footprint()

size_t kcenon::monitoring::time_series::memory_footprint ( ) const
inline

Get memory footprint in bytes.

Definition at line 491 of file time_series.h.

491 {
492 std::lock_guard<std::mutex> lock(mutex_);
493 return sizeof(time_series) +
494 data_.capacity() * sizeof(time_point_data) +
495 series_name_.capacity();
496 }

References data_, mutex_, series_name_, and time_series().

Here is the call graph for this function:

◆ name()

const std::string & kcenon::monitoring::time_series::name ( ) const
inlinenoexcept

Get series name.

Definition at line 454 of file time_series.h.

454 {
455 return series_name_;
456 }

References series_name_.

◆ query()

common::Result< aggregation_result > kcenon::monitoring::time_series::query ( const time_series_query & query) const
inline

Query data for a time range.

Definition at line 370 of file time_series.h.

370 {
371 auto validation = query.validate();
372 if (validation.is_err()) {
373 return common::Result<aggregation_result>::err(
375 validation.error().message, "monitoring_system").to_common_error());
376 }
377
378 std::lock_guard<std::mutex> lock(mutex_);
379
380 aggregation_result result;
381 result.query_start = query.start_time;
382 result.query_end = query.end_time;
383
384 // Find data points in the time range
385 auto start_it = std::lower_bound(data_.begin(), data_.end(), query.start_time,
386 [](const time_point_data& point, std::chrono::system_clock::time_point time) {
387 return point.timestamp < time;
388 });
389
390 auto end_it = std::upper_bound(data_.begin(), data_.end(), query.end_time,
391 [](std::chrono::system_clock::time_point time, const time_point_data& point) {
392 return time < point.timestamp;
393 });
394
395 if (start_it == end_it) {
396 return common::ok(std::move(result)); // No data in range
397 }
398
399 // Aggregate data by step size
400 auto current_step_start = query.start_time;
401
402 while (current_step_start < query.end_time) {
403 auto current_step_end = current_step_start + query.step;
404 if (current_step_end > query.end_time) {
405 current_step_end = query.end_time;
406 }
407
408 // Find points in this step
409 std::vector<time_point_data> step_points;
410 for (auto it = start_it; it != end_it; ++it) {
411 if (it->timestamp >= current_step_start && it->timestamp < current_step_end) {
412 step_points.push_back(*it);
413 result.total_samples += it->sample_count;
414 }
415 }
416
417 // Aggregate points in this step
418 if (!step_points.empty()) {
419 time_point_data aggregated_point;
420 aggregated_point.timestamp = current_step_start + query.step / 2; // Middle of step
421
422 for (const auto& point : step_points) {
423 aggregated_point.merge(point);
424 }
425
426 result.points.push_back(aggregated_point);
427 }
428
429 current_step_start = current_step_end;
430 }
431
432 return common::ok(std::move(result));
433 }
common::Result< aggregation_result > query(const time_series_query &query) const
Query data for a time range.

References data_, kcenon::monitoring::invalid_argument, kcenon::monitoring::time_point_data::merge(), mutex_, kcenon::monitoring::aggregation_result::points, query(), kcenon::monitoring::aggregation_result::query_end, kcenon::monitoring::aggregation_result::query_start, kcenon::monitoring::time_point_data::timestamp, kcenon::monitoring::error_info::to_common_error(), and kcenon::monitoring::aggregation_result::total_samples.

Referenced by query().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ size()

size_t kcenon::monitoring::time_series::size ( ) const
inline

Get current number of data points.

Definition at line 438 of file time_series.h.

438 {
439 std::lock_guard<std::mutex> lock(mutex_);
440 return data_.size();
441 }

References data_, and mutex_.

Member Data Documentation

◆ config_

time_series_config kcenon::monitoring::time_series::config_
private

◆ data_

std::vector<time_point_data> kcenon::monitoring::time_series::data_
private

◆ insertion_count_

size_t kcenon::monitoring::time_series::insertion_count_ = 0
private

Definition at line 215 of file time_series.h.

Referenced by add_point().

◆ mutex_

std::mutex kcenon::monitoring::time_series::mutex_
mutableprivate

◆ series_name_

std::string kcenon::monitoring::time_series::series_name_
private

Definition at line 214 of file time_series.h.

Referenced by memory_footprint(), and name().


The documentation for this class was generated from the following file: