28namespace kcenon {
namespace monitoring {
47 "Retention period must be positive");
53 "Resolution must be positive");
59 "Max points must be positive");
78 time_point_data(std::chrono::system_clock::time_point ts,
double val, uint32_t count = 1) noexcept
87 }
else if (
other.sample_count > 0) {
102 bool is_valid(std::chrono::system_clock::time_point cutoff)
const noexcept {
114 std::chrono::milliseconds
step{1000};
117 auto now = std::chrono::system_clock::now();
123 std::chrono::system_clock::time_point end,
124 std::chrono::milliseconds step_size = std::chrono::milliseconds(1000))
133 "Start time must be before end time");
137 if (
step.count() <= 0) {
139 "Step size must be positive");
164 for (
const auto& point :
points) {
165 summary.add_sample(point.value);
174 if (
points.empty())
return 0.0;
177 uint64_t total_weight = 0;
179 for (
const auto& point :
points) {
180 sum += point.value * point.sample_count;
181 total_weight += point.sample_count;
184 return total_weight > 0 ? sum / total_weight : 0.0;
191 if (
points.size() < 2)
return 0.0;
193 const auto& first =
points.front();
194 const auto& last =
points.back();
196 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
197 last.timestamp - first.timestamp);
199 if (duration.count() <= 0)
return 0.0;
201 return (last.value - first.value) / duration.count();
223 auto it = std::remove_if(
data_.begin(),
data_.end(),
225 return !point.is_valid(cutoff);
240 std::vector<time_point_data> compressed;
241 compressed.reserve(
data_.size());
243 if (!
data_.empty()) {
244 compressed.push_back(
data_[0]);
246 for (
size_t i = 1; i <
data_.size() - 1; ++i) {
247 const auto& prev =
data_[i - 1];
248 const auto& curr =
data_[i];
249 const auto& next =
data_[i + 1];
252 double expected = prev.value + (next.value - prev.value) *
253 (std::chrono::duration<double>(curr.timestamp - prev.timestamp).count() /
254 std::chrono::duration<double>(next.timestamp - prev.timestamp).count());
257 compressed.push_back(curr);
261 if (
data_.size() > 1) {
262 compressed.push_back(
data_.back());
266 data_ = std::move(compressed);
291 static common::Result<std::unique_ptr<time_series>>
create(
292 const std::string&
name,
295 auto validation = config.validate();
296 if (validation.is_err()) {
297 return common::Result<std::unique_ptr<time_series>>::err(
302 return common::ok(std::unique_ptr<time_series>(
new time_series(
name, config)));
309 std::chrono::system_clock::time_point timestamp =
310 std::chrono::system_clock::now()) {
311 std::lock_guard<std::mutex> lock(
mutex_);
317 data_.push_back(point);
320 auto it = std::upper_bound(
data_.begin(),
data_.end(), point,
322 return a.timestamp < b.timestamp;
325 data_.insert(it, point);
342 common::VoidResult
add_points(
const std::vector<time_point_data>& points) {
343 std::lock_guard<std::mutex> lock(
mutex_);
345 for (
const auto& point : points) {
347 if (
data_.empty() || point.timestamp >=
data_.back().timestamp) {
348 data_.push_back(point);
350 auto it = std::upper_bound(
data_.begin(),
data_.end(), point,
352 return a.timestamp < b.timestamp;
355 data_.insert(it, point);
371 auto validation =
query.validate();
372 if (validation.is_err()) {
373 return common::Result<aggregation_result>::err(
378 std::lock_guard<std::mutex> lock(
mutex_);
385 auto start_it = std::lower_bound(
data_.begin(),
data_.end(),
query.start_time,
386 [](
const time_point_data& point, std::chrono::system_clock::time_point time) {
387 return point.timestamp < time;
390 auto end_it = std::upper_bound(
data_.begin(),
data_.end(),
query.end_time,
391 [](std::chrono::system_clock::time_point time,
const time_point_data& point) {
392 return time < point.timestamp;
395 if (start_it == end_it) {
396 return common::ok(std::move(result));
400 auto current_step_start =
query.start_time;
402 while (current_step_start <
query.end_time) {
403 auto current_step_end = current_step_start +
query.step;
404 if (current_step_end >
query.end_time) {
405 current_step_end =
query.end_time;
409 std::vector<time_point_data> step_points;
410 for (
auto it = start_it; it != end_it; ++it) {
411 if (it->timestamp >= current_step_start && it->timestamp < current_step_end) {
412 step_points.push_back(*it);
418 if (!step_points.empty()) {
422 for (
const auto& point : step_points) {
423 aggregated_point.
merge(point);
426 result.
points.push_back(aggregated_point);
429 current_step_start = current_step_end;
432 return common::ok(std::move(result));
439 std::lock_guard<std::mutex> lock(
mutex_);
447 std::lock_guard<std::mutex> lock(
mutex_);
448 return data_.empty();
454 const std::string&
name() const noexcept {
469 std::lock_guard<std::mutex> lock(
mutex_);
477 std::lock_guard<std::mutex> lock(
mutex_);
480 return common::Result<double>::err(
482 "No data available",
"monitoring_system").to_common_error());
485 return common::ok(
data_.back().value);
492 std::lock_guard<std::mutex> lock(
mutex_);
Thread-safe time series data storage.
common::Result< double > get_latest_value() const
Get latest value.
common::VoidResult add_point(double value, std::chrono::system_clock::time_point timestamp=std::chrono::system_clock::now())
Add a data point.
const time_series_config & get_config() const noexcept
Get configuration.
time_series(const std::string &name, const time_series_config &config)
Private constructor (use create() factory method)
static common::Result< std::unique_ptr< time_series > > create(const std::string &name, const time_series_config &config={})
Factory method to create time_series with validation.
void compress_data()
Compress data if enabled.
common::VoidResult add_points(const std::vector< time_point_data > &points)
Add multiple data points.
void clear()
Clear all data.
const std::string & name() const noexcept
Get series name.
bool empty() const
Check if series is empty.
size_t size() const
Get current number of data points.
std::vector< time_point_data > data_
common::Result< aggregation_result > query(const time_series_query &query) const
Query data for a time range.
size_t memory_footprint() const
Get memory footprint in bytes.
void enforce_size_limit()
Ensure data size doesn't exceed maximum.
void cleanup_old_data()
Cleanup old data points.
time_series_config config_
Monitoring system specific error codes.
Common metric type definitions for efficient storage.
@ summary
Pre-calculated quantiles and count/sum.
Result pattern type definitions for monitoring system.
Lock-free ring buffer for efficient metric storage.
Result of time series aggregation.
std::chrono::system_clock::time_point query_end
summary_data get_summary() const
Get statistics for the aggregated data.
double get_average() const
Get average value over the time period.
std::vector< time_point_data > points
std::chrono::system_clock::time_point query_start
double get_rate() const
Get rate of change (per second)
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Summary statistics for metrics.
Single data point in time series.
bool is_valid(std::chrono::system_clock::time_point cutoff) const noexcept
Check if this point is within retention period.
time_point_data() noexcept
std::chrono::system_clock::time_point timestamp
void merge(const time_point_data &other)
Merge another data point (for aggregation)
time_point_data(std::chrono::system_clock::time_point ts, double val, uint32_t count=1) noexcept
Configuration for time series storage.
std::chrono::milliseconds resolution
common::VoidResult validate() const
Validate configuration.
std::chrono::seconds retention_period
double compression_threshold
Query parameters for time series data.
std::chrono::milliseconds step
std::chrono::system_clock::time_point start_time
time_series_query(std::chrono::system_clock::time_point start, std::chrono::system_clock::time_point end, std::chrono::milliseconds step_size=std::chrono::milliseconds(1000))
common::VoidResult validate() const
Validate query parameters.
std::chrono::system_clock::time_point end_time