Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
metric_storage.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
17#include "../core/error_codes.h"
18#include "metric_types.h"
19#include "time_series.h"
20#include "ring_buffer.h"
21#include <string>
22#include <memory>
23#include <mutex>
24#include <shared_mutex>
25#include <unordered_map>
26#include <atomic>
27#include <vector>
28#include <thread>
29
30namespace kcenon { namespace monitoring {
31
37 size_t ring_buffer_capacity = 8192; // Ring buffer capacity (power of 2)
38 size_t max_metrics = 10000; // Maximum number of unique metric series
39 bool enable_background_processing = true; // Enable background flushing
40 std::chrono::milliseconds flush_interval{1000}; // Background flush interval
41 size_t time_series_max_points = 3600; // Max points per time series
42 std::chrono::seconds retention_period{3600}; // Data retention period
43
47 common::VoidResult validate() const {
49 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
50 "Ring buffer capacity must be a power of 2").to_common_error());
51 }
52
53 if (max_metrics == 0) {
54 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
55 "Max metrics must be positive").to_common_error());
56 }
57
58 if (time_series_max_points == 0) {
59 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
60 "Time series max points must be positive").to_common_error());
61 }
62
63 if (retention_period.count() <= 0) {
64 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
65 "Retention period must be positive").to_common_error());
66 }
67
68 return common::ok();
69 }
70};
71
77 std::atomic<size_t> total_metrics_stored{0};
78 std::atomic<size_t> total_metrics_dropped{0};
79 std::atomic<size_t> active_metric_series{0};
80 std::atomic<size_t> flush_count{0};
81 std::atomic<size_t> failed_flushes{0};
82 std::chrono::system_clock::time_point creation_time;
83
84 metric_storage_stats() : creation_time(std::chrono::system_clock::now()) {}
85};
86
96private:
97 mutable std::shared_mutex mutex_;
100
101 // Ring buffer for incoming metrics
102 std::unique_ptr<ring_buffer<compact_metric_value>> incoming_buffer_;
103
104 // Time series storage for each metric
105 std::unordered_map<std::string, std::unique_ptr<time_series>> time_series_map_;
106
107 // Metric name to hash mapping for fast lookup
108 std::unordered_map<uint32_t, std::string> hash_to_name_;
109
110 // Background processing
111 std::atomic<bool> running_{false};
113
118 while (running_.load(std::memory_order_acquire)) {
119 std::this_thread::sleep_for(config_.flush_interval);
120 if (running_.load(std::memory_order_acquire)) {
121 flush();
122 }
123 }
124 }
125
// Return the time series stored under `name`, creating and registering it on
// first use.
//
// Returns nullptr when time_series_map_ already holds config_.max_metrics
// entries, or when time_series::create() reports an error. On creation it also
// increments stats_.active_metric_series and records
// hash_metric_name(name) -> name in hash_to_name_.
//
// No lock is taken here; the caller visible in this listing (flush) already
// holds mutex_ exclusively when it calls this function.
129 time_series* get_or_create_series(const std::string& name) {
130 auto it = time_series_map_.find(name);
131 if (it != time_series_map_.end()) {
132 return it->second.get();
133 }
134
135 // Check if we're at capacity
136 if (time_series_map_.size() >= config_.max_metrics) {
137 return nullptr;
138 }
139
140 // Create new time series
// NOTE(review): listing lines 142-143 are missing from this extract;
// presumably they populate ts_config from config_ - confirm against the
// original header.
141 time_series_config ts_config;
144
145 auto result = time_series::create(name, ts_config);
146 if (result.is_err()) {
147 return nullptr;
148 }
149
// Capture the raw pointer before ownership is moved into the map.
150 auto* ptr = result.value().get();
151 time_series_map_[name] = std::move(result.value());
152 stats_.active_metric_series.fetch_add(1, std::memory_order_relaxed);
153
154 // Store hash mapping
155 hash_to_name_[hash_metric_name(name)] = name;
156
157 return ptr;
158 }
159
160public:
// Construct the storage from `config` (copied into config_).
//
// Throws std::invalid_argument, carrying the validation error message, when
// config_.validate() reports an error. Otherwise builds the incoming ring
// buffer and launches background_processor on background_thread_.
166 explicit metric_storage(const metric_storage_config& config = {})
167 : config_(config) {
168
169 auto validation = config_.validate();
170 if (validation.is_err()) {
171 throw std::invalid_argument("Invalid metric storage configuration: " +
172 validation.error().message);
173 }
174
175 // Initialize ring buffer
// NOTE(review): listing line 177 is missing from this extract; presumably it
// assigns rb_config.capacity (read on line 179) - confirm against the header.
176 ring_buffer_config rb_config;
178 rb_config.overwrite_old = true;
// Batch size is half the capacity, capped at 64 and never below 1.
179 rb_config.batch_size = (std::min)(rb_config.capacity / 2, size_t(64));
180 if (rb_config.batch_size == 0) rb_config.batch_size = 1;
181 incoming_buffer_ = std::make_unique<ring_buffer<compact_metric_value>>(rb_config);
182
183 // Start background processing if enabled
// NOTE(review): listing line 184 is missing from this extract; the closing
// brace on line 187 implies an enclosing `if`, presumably testing
// config_.enable_background_processing - confirm against the header.
185 running_.store(true, std::memory_order_release);
186 background_thread_ = std::thread(&metric_storage::background_processor, this);
187 }
188 }
189
194 if (running_.load(std::memory_order_acquire)) {
195 running_.store(false, std::memory_order_release);
196 if (background_thread_.joinable()) {
197 background_thread_.join();
198 }
199 }
200 }
201
202 // Non-copyable
205
// Store a single metric value.
//
// Builds metadata with create_metric_metadata(name, type), wraps it with
// `value` in a compact_metric_value and writes that to incoming_buffer_.
// On a successful write, total_metrics_stored is incremented and
// metadata.name_hash -> name is registered in hash_to_name_ if that hash is not
// already present (so the first name seen for a hash is the one kept).
// On a failed write, total_metrics_dropped is incremented.
// Returns the result of the ring buffer write unchanged.
//
// NOTE(review): listing line 214 (the rest of the signature) is missing from
// this extract; the generated member summary shows the third parameter as
// `metric_type type = metric_type::gauge`.
213 common::VoidResult store_metric(const std::string& name, double value,
215 auto metadata = create_metric_metadata(name, type);
216 compact_metric_value metric(metadata, value);
217
218 auto result = incoming_buffer_->write(std::move(metric));
219 if (result.is_ok()) {
220 stats_.total_metrics_stored.fetch_add(1, std::memory_order_relaxed);
221
222 // Store name mapping
// NOTE(review): the exclusive lock is taken on every successful write, even
// when the hash is already registered.
223 std::unique_lock<std::shared_mutex> lock(mutex_);
224 if (hash_to_name_.find(metadata.name_hash) == hash_to_name_.end()) {
225 hash_to_name_[metadata.name_hash] = name;
226 }
227 } else {
228 stats_.total_metrics_dropped.fetch_add(1, std::memory_order_relaxed);
229 }
230
231 return result;
232 }
233
234
// Store a batch of metrics.
//
// Writes one value per element of batch.metrics to incoming_buffer_ and
// returns how many writes succeeded. Each success increments
// total_metrics_stored; each failure increments total_metrics_dropped.
//
// NOTE(review): nothing visible here registers a name in hash_to_name_, and
// flush() skips any metric whose name_hash is not in that map - so batch
// entries for names never passed to store_metric appear not to reach a time
// series. Confirm whether that is intended.
240 size_t store_metrics_batch(const metric_batch& batch) {
241 size_t stored = 0;
242
243 for (const auto& metric : batch.metrics) {
// NOTE(review): listing line 244 is missing from this extract; presumably it
// declares `copy` as a copy of `metric` - confirm against the header.
245 auto result = incoming_buffer_->write(std::move(copy));
246 if (result.is_ok()) {
247 stored++;
248 stats_.total_metrics_stored.fetch_add(1, std::memory_order_relaxed);
249 } else {
250 stats_.total_metrics_dropped.fetch_add(1, std::memory_order_relaxed);
251 }
252 }
253
254 return stored;
255 }
256
260 void flush() {
261 std::vector<compact_metric_value> flushed_metrics;
262 flushed_metrics.reserve(config_.ring_buffer_capacity);
263
264 // Read all available metrics from buffer
265 incoming_buffer_->read_batch(flushed_metrics, config_.ring_buffer_capacity);
266
267 if (flushed_metrics.empty()) {
268 return;
269 }
270
271 std::unique_lock<std::shared_mutex> lock(mutex_);
272
273 for (auto& metric : flushed_metrics) {
274 // Find metric name from hash
275 auto name_it = hash_to_name_.find(metric.metadata.name_hash);
276 if (name_it == hash_to_name_.end()) {
277 continue;
278 }
279
280 const std::string& name = name_it->second;
281
282 // Get or create time series
283 auto* series = get_or_create_series(name);
284 if (series == nullptr) {
285 stats_.failed_flushes.fetch_add(1, std::memory_order_relaxed);
286 continue;
287 }
288
289 // Add data point to time series
290 series->add_point(metric.as_double(), metric.get_timestamp());
291 }
292
293 stats_.flush_count.fetch_add(1, std::memory_order_relaxed);
294 }
295
301 common::Result<double> get_latest_value(const std::string& name) const {
302 std::shared_lock<std::shared_mutex> lock(mutex_);
303
304 auto it = time_series_map_.find(name);
305 if (it == time_series_map_.end()) {
306 return common::Result<double>::err(error_info(monitoring_error_code::collection_failed,
307 "Metric not found: " + name, "monitoring_system").to_common_error());
308 }
309
310 return it->second->get_latest_value();
311 }
312
317 std::vector<std::string> get_metric_names() const {
318 std::shared_lock<std::shared_mutex> lock(mutex_);
319
320 std::vector<std::string> names;
321 names.reserve(time_series_map_.size());
322
323 for (const auto& pair : time_series_map_) {
324 names.push_back(pair.first);
325 }
326
327 return names;
328 }
329
336 common::Result<aggregation_result> query_metric(const std::string& name,
337 const time_series_query& query) const {
338 std::shared_lock<std::shared_mutex> lock(mutex_);
339
340 auto it = time_series_map_.find(name);
341 if (it == time_series_map_.end()) {
342 return common::Result<aggregation_result>::err(error_info(monitoring_error_code::collection_failed,
343 "Metric not found: " + name, "monitoring_system").to_common_error());
344 }
345
346 return it->second->query(query);
347 }
348
352 const metric_storage_stats& get_stats() const noexcept {
353 return stats_;
354 }
355
359 const metric_storage_config& get_config() const noexcept {
360 return config_;
361 }
362
366 void clear() {
367 std::unique_lock<std::shared_mutex> lock(mutex_);
368 incoming_buffer_->clear();
369 time_series_map_.clear();
370 hash_to_name_.clear();
371 stats_.active_metric_series.store(0, std::memory_order_relaxed);
372 }
373
377 size_t series_count() const {
378 std::shared_lock<std::shared_mutex> lock(mutex_);
379 return time_series_map_.size();
380 }
381
// Get memory footprint estimate.
//
// Under a shared lock on mutex_, starts from sizeof(metric_storage) and adds,
// for every stored series, the capacity of its key string plus the value the
// series reports from its own memory_footprint().
// In the lines visible here, hash_to_name_ is not included in the total.
385 size_t memory_footprint() const {
386 std::shared_lock<std::shared_mutex> lock(mutex_);
387
388 size_t total = sizeof(metric_storage);
// NOTE(review): listing line 389 is missing from this extract; presumably it
// adds a further contribution to `total` - confirm against the header.
390
391 for (const auto& pair : time_series_map_) {
392 total += pair.first.capacity();
393 total += pair.second->memory_footprint();
394 }
395
396 return total;
397 }
398};
399
400} } // namespace kcenon::monitoring
Thread-safe metric storage with ring buffer buffering.
void background_processor()
Background processing loop.
size_t series_count() const
Get number of active metric series.
common::VoidResult store_metric(const std::string &name, double value, metric_type type=metric_type::gauge)
Store a single metric value.
std::vector< std::string > get_metric_names() const
Get all metric names.
void flush()
Flush buffered metrics to time series.
size_t store_metrics_batch(const metric_batch &batch)
Store a batch of metrics.
metric_storage(const metric_storage &)=delete
time_series * get_or_create_series(const std::string &name)
Get or create time series for a metric.
size_t memory_footprint() const
Get memory footprint estimate.
metric_storage(const metric_storage_config &config={})
Constructor with configuration.
common::Result< aggregation_result > query_metric(const std::string &name, const time_series_query &query) const
Query metric data.
std::unordered_map< uint32_t, std::string > hash_to_name_
const metric_storage_stats & get_stats() const noexcept
Get storage statistics.
common::Result< double > get_latest_value(const std::string &name) const
Get the latest value for a metric.
void clear()
Clear all stored metrics.
std::unordered_map< std::string, std::unique_ptr< time_series > > time_series_map_
std::unique_ptr< ring_buffer< compact_metric_value > > incoming_buffer_
const metric_storage_config & get_config() const noexcept
Get configuration.
metric_storage & operator=(const metric_storage &)=delete
Lock-free ring buffer with atomic operations.
Thread-safe time series data storage.
static common::Result< std::unique_ptr< time_series > > create(const std::string &name, const time_series_config &config={})
Factory method to create time_series with validation.
Monitoring system specific error codes.
Common metric type definitions for efficient storage.
metric_type
Metric types supported by exporters.
@ gauge
Instantaneous value that can go up and down.
uint32_t hash_metric_name(const std::string &name) noexcept
Hash function for metric names.
metric_metadata create_metric_metadata(const std::string &name, metric_type type, size_t tag_count=0)
Create metric metadata from name and type.
Result pattern type definitions for monitoring system.
Lock-free ring buffer for efficient metric storage.
Memory-efficient metric value storage.
Extended error information with context.
Batch of metrics for efficient processing.
std::vector< compact_metric_value > metrics
Configuration for metric storage.
std::chrono::milliseconds flush_interval
common::VoidResult validate() const
Validate configuration.
Statistics for metric storage performance.
std::chrono::system_clock::time_point creation_time
Basic metric structure for interface compatibility.
Configuration for ring buffer behavior.
Definition ring_buffer.h:36
Configuration for time series storage.
Definition time_series.h:34
std::chrono::seconds retention_period
Definition time_series.h:35
Query parameters for time series data.
Time-series data storage for efficient metric history.