Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
metric_storage.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
17#include "../core/error_codes.h"
18#include "metric_types.h"
19#include "time_series.h"
20#include "ring_buffer.h"
21#include <string>
22#include <memory>
23#include <mutex>
24#include <shared_mutex>
25#include <unordered_map>
26#include <atomic>
27#include <vector>
28#include <thread>
29
30namespace kcenon { namespace monitoring {
31
/// Configuration for metric storage.
37 size_t ring_buffer_capacity = 8192; // Ring buffer capacity (power of 2)
38 size_t max_metrics = 10000; // Maximum number of unique metric series
39 bool enable_background_processing = true; // Enable background flushing
40 std::chrono::milliseconds flush_interval{1000}; // Background flush interval
41 size_t time_series_max_points = 3600; // Max points per time series
42 std::chrono::seconds retention_period{3600}; // Data retention period
43
/// Validate configuration.
/// @return common::ok() when every check passes; otherwise an
///         invalid_configuration error for the first failing check, in this
///         order: ring buffer capacity (power of 2), max_metrics,
///         time_series_max_points, retention_period.
// NOTE(review): the guard condition of the power-of-2 check is not visible in
// this listing (only its error return is). No visible check covers
// flush_interval; a zero interval would make the background loop flush
// continuously.
47 common::VoidResult validate() const {
49 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
50 "Ring buffer capacity must be a power of 2").to_common_error());
51 }
52
53 if (max_metrics == 0) {
54 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
55 "Max metrics must be positive").to_common_error());
56 }
57
58 if (time_series_max_points == 0) {
59 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
60 "Time series max points must be positive").to_common_error());
61 }
62
// count() <= 0 rejects both zero and negative durations.
63 if (retention_period.count() <= 0) {
64 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
65 "Retention period must be positive").to_common_error());
66 }
67
68 return common::ok();
69 }
70};
71
/// Statistics for metric storage performance. The counters are std::atomic
/// because store_metric() bumps them without holding the storage mutex.
// Incremented once per successful ring-buffer write (store_metric / store_metrics_batch).
77 std::atomic<size_t> total_metrics_stored{0};
// Incremented once per rejected ring-buffer write.
78 std::atomic<size_t> total_metrics_dropped{0};
// Incremented when get_or_create_series() creates a series; reset to 0 by clear().
79 std::atomic<size_t> active_metric_series{0};
// Incremented once per flush() call that drained at least one value.
80 std::atomic<size_t> flush_count{0};
// Incremented per drained value whose series could not be obtained
// (max_metrics reached or creation failed): counts values, not flush calls.
81 std::atomic<size_t> failed_flushes{0};
// Wall-clock time at which this stats object was constructed.
82 std::chrono::system_clock::time_point creation_time;
83
84 metric_storage_stats() : creation_time(std::chrono::system_clock::now()) {}
85};
86
96private:
// Tag type selecting the private, non-validating constructor; in the visible
// code only create() constructs one.
97 struct validated_tag {};
98
// Guards time_series_map_ and hash_to_name_ (shared lock for readers, unique
// lock for writers). mutable so const query methods can take a shared lock.
99 mutable std::shared_mutex mutex_;
102
103 // Ring buffer for incoming metrics
104 std::unique_ptr<ring_buffer<compact_metric_value>> incoming_buffer_;
105
106 // Time series storage for each metric
107 std::unordered_map<std::string, std::unique_ptr<time_series>> time_series_map_;
108
109 // Metric name hash -> metric name, used by flush() to resolve buffered values
110 std::unordered_map<uint32_t, std::string> hash_to_name_;
111
112 // Background processing
// Stored true by the setup routine when the storage starts; the destructor
// clears it to stop background_processor().
113 std::atomic<bool> running_{false};
115
// Background loop: sleep for flush_interval, then flush buffered values into
// their series, until running_ is cleared.
// NOTE(review): sleep_for is not interruptible, so the destructor's join() may
// block for up to one flush_interval after shutdown is requested.
120 while (running_.load(std::memory_order_acquire)) {
121 std::this_thread::sleep_for(config_.flush_interval);
// Re-check after waking so no flush starts once shutdown has been requested.
122 if (running_.load(std::memory_order_acquire)) {
123 flush();
124 }
125 }
126 }
127
/// Get or create the time series for a metric.
/// Performs no locking itself: the caller must hold mutex_ exclusively
/// (flush() calls it under a unique_lock).
/// @param name Metric name.
/// @return Pointer owned by time_series_map_, or nullptr when max_metrics
///         series already exist or time_series::create() fails.
131 time_series* get_or_create_series(const std::string& name) {
132 auto it = time_series_map_.find(name);
133 if (it != time_series_map_.end()) {
134 return it->second.get();
135 }
136
137 // Check if we're at capacity
138 if (time_series_map_.size() >= config_.max_metrics) {
139 return nullptr;
140 }
141
142 // Create new time series
// NOTE(review): the lines filling ts_config (presumably from
// time_series_max_points / retention_period) are not shown in this listing.
143 time_series_config ts_config;
146
147 auto result = time_series::create(name, ts_config);
148 if (result.is_err()) {
149 return nullptr;
150 }
151
// Take the raw pointer before ownership moves into the map.
152 auto* ptr = result.value().get();
153 time_series_map_[name] = std::move(result.value());
154 stats_.active_metric_series.fetch_add(1, std::memory_order_relaxed);
155
156 // Store hash mapping
// When called from flush(), `name` refers to a value inside hash_to_name_;
// presumably the hash matches, making this a std::string self-assignment.
157 hash_to_name_[hash_metric_name(name)] = name;
158
159 return ptr;
160 }
161
// Setup routine (header not shown): builds incoming_buffer_ and marks the
// storage as running. Presumably rb_config.capacity is set from
// config_.ring_buffer_capacity on the elided line that follows.
163 ring_buffer_config rb_config;
// Presumably makes the ring buffer overwrite old entries when full instead of
// rejecting writes — confirm against ring_buffer.
165 rb_config.overwrite_old = true;
// Batch size: half the capacity, capped at 64, never below 1.
// (std::min) is parenthesized so a function-like `min` macro cannot expand.
166 rb_config.batch_size = (std::min)(rb_config.capacity / 2, size_t(64));
167 if (rb_config.batch_size == 0) rb_config.batch_size = 1;
168 incoming_buffer_ = std::make_unique<ring_buffer<compact_metric_value>>(rb_config);
169
// Presumably inside an enable_background_processing guard that also starts
// the background thread (those lines are not shown).
171 running_.store(true, std::memory_order_release);
173 }
174 }
175
176 // Private constructor for validated creation via create()
// Performs no validation of its own: create() validates `config` before
// invoking it with validated_tag.
178 : config_(config) {
180 }
181
182public:
/// Create a metric storage with validated configuration.
/// Reports invalid configuration as an error result instead of throwing.
/// @param config Storage configuration (defaults to metric_storage_config{}).
/// @return The new storage, or an error whose message starts with
///         "Invalid metric storage configuration: " when validate() fails.
188 static common::Result<std::unique_ptr<metric_storage>> create(
189 const metric_storage_config& config = {}) {
190 auto validation = config.validate();
191 if (validation.is_err()) {
192 return common::Result<std::unique_ptr<metric_storage>>::err(
194 "Invalid metric storage configuration: " +
195 validation.error().message)
196 .to_common_error());
197 }
// Raw new is required here: std::make_unique cannot reach the private constructor.
198 return common::ok(std::unique_ptr<metric_storage>(
199 new metric_storage(config, validated_tag{})));
200 }
201
/// Constructor with configuration.
/// @param config Storage configuration.
/// @throws std::invalid_argument if config.validate() fails; prefer create()
///         when exceptions are undesirable.
// NOTE(review): std::invalid_argument is declared in <stdexcept>, which is not
// among the includes visible at the top of this header — confirm it arrives
// transitively or include it explicitly.
208 explicit metric_storage(const metric_storage_config& config = {})
209 : config_(config) {
210
211 auto validation = config_.validate();
212 if (validation.is_err()) {
213 throw std::invalid_argument("Invalid metric storage configuration: " +
214 validation.error().message);
215 }
216
218 }
219
220 // metric_storage is non-copyable, non-moveable (has thread member)
223
// Destructor body: stop the background loop and wait for it to exit.
// NOTE(review): join() can block for up to config_.flush_interval because the
// background loop sleeps before re-checking running_. In the visible lines,
// values still in incoming_buffer_ are not flushed before destruction.
228 if (running_.load(std::memory_order_acquire)) {
229 running_.store(false, std::memory_order_release);
230 if (background_thread_.joinable()) {
231 background_thread_.join();
232 }
233 }
234 }
235
243 common::VoidResult store_metric(const std::string& name, double value,
245 auto metadata = create_metric_metadata(name, type);
246 compact_metric_value metric(metadata, value);
247
248 auto result = incoming_buffer_->write(std::move(metric));
249 if (result.is_ok()) {
250 stats_.total_metrics_stored.fetch_add(1, std::memory_order_relaxed);
251
252 // Store name mapping
253 std::unique_lock<std::shared_mutex> lock(mutex_);
254 if (hash_to_name_.find(metadata.name_hash) == hash_to_name_.end()) {
255 hash_to_name_[metadata.name_hash] = name;
256 }
257 } else {
258 stats_.total_metrics_dropped.fetch_add(1, std::memory_order_relaxed);
259 }
260
261 return result;
262 }
263
264
/// Store a batch of metrics.
/// @param batch Pre-built compact values; each value's metadata presumably
///              already carries its name hash.
/// @return Number of values accepted by the ring buffer.
// NOTE(review): unlike store_metric(), this registers no hash -> name mapping.
// flush() skips buffered values whose name_hash has no entry in hash_to_name_,
// so values for never-registered names are discarded at flush time without
// updating any counter (they were already counted as stored here).
270 size_t store_metrics_batch(const metric_batch& batch) {
271 size_t stored = 0;
272
273 for (const auto& metric : batch.metrics) {
// `copy` is declared on a line not shown in this listing (presumably a
// per-element copy, since `metric` is const and cannot be moved from).
275 auto result = incoming_buffer_->write(std::move(copy));
276 if (result.is_ok()) {
277 stored++;
278 stats_.total_metrics_stored.fetch_add(1, std::memory_order_relaxed);
279 } else {
280 stats_.total_metrics_dropped.fetch_add(1, std::memory_order_relaxed);
281 }
282 }
283
284 return stored;
285 }
286
290 void flush() {
291 std::vector<compact_metric_value> flushed_metrics;
292 flushed_metrics.reserve(config_.ring_buffer_capacity);
293
294 // Read all available metrics from buffer
295 incoming_buffer_->read_batch(flushed_metrics, config_.ring_buffer_capacity);
296
297 if (flushed_metrics.empty()) {
298 return;
299 }
300
301 std::unique_lock<std::shared_mutex> lock(mutex_);
302
303 for (auto& metric : flushed_metrics) {
304 // Find metric name from hash
305 auto name_it = hash_to_name_.find(metric.metadata.name_hash);
306 if (name_it == hash_to_name_.end()) {
307 continue;
308 }
309
310 const std::string& name = name_it->second;
311
312 // Get or create time series
313 auto* series = get_or_create_series(name);
314 if (series == nullptr) {
315 stats_.failed_flushes.fetch_add(1, std::memory_order_relaxed);
316 continue;
317 }
318
319 // Add data point to time series
320 series->add_point(metric.as_double(), metric.get_timestamp());
321 }
322
323 stats_.flush_count.fetch_add(1, std::memory_order_relaxed);
324 }
325
331 common::Result<double> get_latest_value(const std::string& name) const {
332 std::shared_lock<std::shared_mutex> lock(mutex_);
333
334 auto it = time_series_map_.find(name);
335 if (it == time_series_map_.end()) {
336 return common::Result<double>::err(error_info(monitoring_error_code::collection_failed,
337 "Metric not found: " + name, "monitoring_system").to_common_error());
338 }
339
340 return it->second->get_latest_value();
341 }
342
347 std::vector<std::string> get_metric_names() const {
348 std::shared_lock<std::shared_mutex> lock(mutex_);
349
350 std::vector<std::string> names;
351 names.reserve(time_series_map_.size());
352
353 for (const auto& pair : time_series_map_) {
354 names.push_back(pair.first);
355 }
356
357 return names;
358 }
359
366 common::Result<aggregation_result> query_metric(const std::string& name,
367 const time_series_query& query) const {
368 std::shared_lock<std::shared_mutex> lock(mutex_);
369
370 auto it = time_series_map_.find(name);
371 if (it == time_series_map_.end()) {
372 return common::Result<aggregation_result>::err(error_info(monitoring_error_code::collection_failed,
373 "Metric not found: " + name, "monitoring_system").to_common_error());
374 }
375
376 return it->second->query(query);
377 }
378
382 const metric_storage_stats& get_stats() const noexcept {
383 return stats_;
384 }
385
389 const metric_storage_config& get_config() const noexcept {
390 return config_;
391 }
392
396 void clear() {
397 std::unique_lock<std::shared_mutex> lock(mutex_);
398 incoming_buffer_->clear();
399 time_series_map_.clear();
400 hash_to_name_.clear();
401 stats_.active_metric_series.store(0, std::memory_order_relaxed);
402 }
403
407 size_t series_count() const {
408 std::shared_lock<std::shared_mutex> lock(mutex_);
409 return time_series_map_.size();
410 }
411
/// Get memory footprint estimate, in bytes.
/// Sums sizeof(metric_storage), each series key's string capacity and each
/// series' own memory_footprint(), under a shared lock.
// NOTE(review): one line between the sizeof() initialization and the loop is
// not shown in this listing; in the visible lines hash_to_name_ entries are
// not counted.
415 size_t memory_footprint() const {
416 std::shared_lock<std::shared_mutex> lock(mutex_);
417
418 size_t total = sizeof(metric_storage);
420
421 for (const auto& pair : time_series_map_) {
422 total += pair.first.capacity();
423 total += pair.second->memory_footprint();
424 }
425
426 return total;
427 }
428};
429
430} } // namespace kcenon::monitoring
Thread-safe metric storage with ring buffer buffering.
void background_processor()
Background processing loop.
size_t series_count() const
Get number of active metric series.
common::VoidResult store_metric(const std::string &name, double value, metric_type type=metric_type::gauge)
Store a single metric value.
std::vector< std::string > get_metric_names() const
Get all metric names.
void flush()
Flush buffered metrics to time series.
size_t store_metrics_batch(const metric_batch &batch)
Store a batch of metrics.
metric_storage(const metric_storage &)=delete
time_series * get_or_create_series(const std::string &name)
Get or create time series for a metric.
size_t memory_footprint() const
Get memory footprint estimate.
metric_storage(const metric_storage_config &config, validated_tag)
metric_storage(const metric_storage_config &config={})
Constructor with configuration.
common::Result< aggregation_result > query_metric(const std::string &name, const time_series_query &query) const
Query metric data.
std::unordered_map< uint32_t, std::string > hash_to_name_
const metric_storage_stats & get_stats() const noexcept
Get storage statistics.
common::Result< double > get_latest_value(const std::string &name) const
Get the latest value for a metric.
void clear()
Clear all stored metrics.
std::unordered_map< std::string, std::unique_ptr< time_series > > time_series_map_
std::unique_ptr< ring_buffer< compact_metric_value > > incoming_buffer_
const metric_storage_config & get_config() const noexcept
Get configuration.
static common::Result< std::unique_ptr< metric_storage > > create(const metric_storage_config &config={})
Create a metric storage with validated configuration.
metric_storage & operator=(const metric_storage &)=delete
Thread-safe time series data storage.
static common::Result< std::unique_ptr< time_series > > create(const std::string &name, const time_series_config &config={})
Factory method to create time_series with validation.
Monitoring system specific error codes.
Common metric type definitions for efficient storage.
metric_type
Metric types supported by exporters.
@ gauge
Instantaneous value that can go up and down.
uint32_t hash_metric_name(const std::string &name) noexcept
Hash function for metric names.
metric_metadata create_metric_metadata(const std::string &name, metric_type type, size_t tag_count=0)
Create metric metadata from name and type.
Result pattern type definitions for monitoring system.
Lock-free ring buffer for efficient metric storage.
Memory-efficient metric value storage.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Batch of metrics for efficient processing.
std::vector< compact_metric_value > metrics
Configuration for metric storage.
std::chrono::milliseconds flush_interval
common::VoidResult validate() const
Validate configuration.
Statistics for metric storage performance.
std::chrono::system_clock::time_point creation_time
Basic metric structure for interface compatibility.
Configuration for ring buffer behavior.
Definition ring_buffer.h:36
Configuration for time series storage.
Definition time_series.h:34
std::chrono::seconds retention_period
Definition time_series.h:35
Query parameters for time series data.
Time-series data storage for efficient metric history.