Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
kcenon::monitoring::metric_storage Class Reference

Thread-safe metric storage with ring buffer buffering.

#include <metric_storage.h>

Classes

struct  validated_tag
 

Public Member Functions

 metric_storage (const metric_storage_config &config={})
 Constructor with configuration.
 
 metric_storage (const metric_storage &)=delete
 
metric_storage & operator= (const metric_storage &)=delete
 
 ~metric_storage ()
 Destructor.
 
common::VoidResult store_metric (const std::string &name, double value, metric_type type=metric_type::gauge)
 Store a single metric value.
 
size_t store_metrics_batch (const metric_batch &batch)
 Store a batch of metrics.
 
void flush ()
 Flush buffered metrics to time series.
 
common::Result< double > get_latest_value (const std::string &name) const
 Get the latest value for a metric.
 
std::vector< std::string > get_metric_names () const
 Get all metric names.
 
common::Result< aggregation_result > query_metric (const std::string &name, const time_series_query &query) const
 Query metric data.
 
const metric_storage_stats & get_stats () const noexcept
 Get storage statistics.
 
const metric_storage_config & get_config () const noexcept
 Get configuration.
 
void clear ()
 Clear all stored metrics.
 
size_t series_count () const
 Get number of active metric series.
 
size_t memory_footprint () const
 Get memory footprint estimate.
 

Static Public Member Functions

static common::Result< std::unique_ptr< metric_storage > > create (const metric_storage_config &config={})
 Create a metric storage with validated configuration.
 

Private Member Functions

void background_processor ()
 Background processing loop.
 
time_series * get_or_create_series (const std::string &name)
 Get or create time series for a metric.
 
void init_internals ()
 
 metric_storage (const metric_storage_config &config, validated_tag)
 

Private Attributes

std::shared_mutex mutex_
 
metric_storage_config config_
 
metric_storage_stats stats_
 
std::unique_ptr< ring_buffer< compact_metric_value > > incoming_buffer_
 
std::unordered_map< std::string, std::unique_ptr< time_series > > time_series_map_
 
std::unordered_map< uint32_t, std::string > hash_to_name_
 
std::atomic< bool > running_ {false}
 
std::thread background_thread_
 

Detailed Description

Thread-safe metric storage with ring buffer buffering.

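Incoming metrics are written to a ring buffer and moved into per-metric time series when flush() runs; historical queries are served from those time series. If background processing is enabled in the configuration, a background thread calls flush() once per flush_interval.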

Definition at line 95 of file metric_storage.h.

Constructor & Destructor Documentation

◆ metric_storage() [1/3]

kcenon::monitoring::metric_storage::metric_storage ( const metric_storage_config & config,
validated_tag  )
inline private

Definition at line 177 of file metric_storage.h.

178     : config_(config) {
179     init_internals();
180 }

References init_internals().

Referenced by memory_footprint().

◆ metric_storage() [2/3]

kcenon::monitoring::metric_storage::metric_storage ( const metric_storage_config & config = {})
inline explicit

Constructor with configuration.

Parameters
config: Metric storage configuration options
Exceptions
std::invalid_argument: if configuration validation fails
Deprecated
Use create() for Result-based error handling

Definition at line 208 of file metric_storage.h.

208 {})
209     : config_(config) {
210
211     auto validation = config_.validate();
212     if (validation.is_err()) {
213         throw std::invalid_argument("Invalid metric storage configuration: " +
214                                     validation.error().message);
215     }
216
218 }

◆ metric_storage() [3/3]

kcenon::monitoring::metric_storage::metric_storage ( const metric_storage & )
delete

◆ ~metric_storage()

kcenon::monitoring::metric_storage::~metric_storage ( )
inline

Destructor.

Definition at line 227 of file metric_storage.h.

227 {
228     if (running_.load(std::memory_order_acquire)) {
229         running_.store(false, std::memory_order_release);
230         if (background_thread_.joinable()) {
231             background_thread_.join();
232         }
233     }
234 }

References background_thread_, and running_.

Member Function Documentation

◆ background_processor()

void kcenon::monitoring::metric_storage::background_processor ( )
inline private

Background processing loop.

Definition at line 119 of file metric_storage.h.

119 {
120     while (running_.load(std::memory_order_acquire)) {
121         std::this_thread::sleep_for(config_.flush_interval);
122         if (running_.load(std::memory_order_acquire)) {
123             flush();
124         }
125     }
126 }

References config_, flush(), kcenon::monitoring::metric_storage_config::flush_interval, and running_.

Referenced by init_internals().
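
The loop above, together with the destructor, follows a common stop-flag pattern: the worker sleeps, re-checks an atomic flag, runs its task, and the owner clears the flag and joins. The sketch below reproduces that pattern in isolation. The class name periodic_worker and its members are hypothetical and are not part of the monitoring library.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical stand-in for a storage object that owns a background flusher.
class periodic_worker {
public:
    periodic_worker(std::chrono::milliseconds interval, std::function<void()> task)
        : interval_(interval), task_(std::move(task)) {
        running_.store(true, std::memory_order_release);
        thread_ = std::thread([this] { run(); });
    }

    periodic_worker(const periodic_worker&) = delete;
    periodic_worker& operator=(const periodic_worker&) = delete;

    ~periodic_worker() {
        // Clear the flag first so the loop exits after its current sleep,
        // then wait for the thread to finish.
        running_.store(false, std::memory_order_release);
        if (thread_.joinable()) {
            thread_.join();
        }
    }

private:
    void run() {
        while (running_.load(std::memory_order_acquire)) {
            std::this_thread::sleep_for(interval_);
            // Re-check after sleeping so that a shutdown requested during
            // the sleep skips the task.
            if (running_.load(std::memory_order_acquire)) {
                task_();
            }
        }
    }

    std::chrono::milliseconds interval_;
    std::function<void()> task_;
    std::atomic<bool> running_{false};
    std::thread thread_;
};
```

One consequence of this design is that destruction can block for up to one interval, because sleep_for cannot be interrupted. A condition variable with a timed wait would be one way to shorten that, at the cost of an extra mutex.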

◆ clear()

void kcenon::monitoring::metric_storage::clear ( )
inline

Clear all stored metrics.

Definition at line 396 of file metric_storage.h.

396 {
397     std::unique_lock<std::shared_mutex> lock(mutex_);
398     incoming_buffer_->clear();
399     time_series_map_.clear();
400     hash_to_name_.clear();
401     stats_.active_metric_series.store(0, std::memory_order_relaxed);
402 }

References kcenon::monitoring::metric_storage_stats::active_metric_series, hash_to_name_, incoming_buffer_, mutex_, stats_, and time_series_map_.

◆ create()

static common::Result< std::unique_ptr< metric_storage > > kcenon::monitoring::metric_storage::create ( const metric_storage_config & config = {})
inline static

Create a metric storage with validated configuration.

Parameters
config: Metric storage configuration options
Returns
Result containing the metric storage or error

Definition at line 188 of file metric_storage.h.

189 {}) {
190     auto validation = config.validate();
191     if (validation.is_err()) {
192         return common::Result<std::unique_ptr<metric_storage>>::err(
194                 "Invalid metric storage configuration: " +
195                 validation.error().message)
196             .to_common_error());
197     }
198     return common::ok(std::unique_ptr<metric_storage>(
199         new metric_storage(config, validated_tag{})));
200 }

Referenced by TEST_F(), and TEST_F().
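
The factory validates first and then calls a private constructor that takes a tag argument, so the validation is not repeated and no exception is thrown. A minimal sketch of that pattern follows. The names buffer_config, simple_result and buffer_store are hypothetical, and simple_result is only a stand-in for common::Result, whose real interface is not reproduced here.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>

// Hypothetical configuration type; only a capacity check is modeled.
struct buffer_config {
    std::size_t capacity = 1024;
    bool valid() const { return capacity > 0; }
};

// Minimal stand-in for a Result type: a value on success, a message on failure.
template <typename T>
struct simple_result {
    T value{};
    std::string error;
    bool is_ok() const { return error.empty(); }
};

class buffer_store {
public:
    static simple_result<std::unique_ptr<buffer_store>> create(
        const buffer_config& config = {}) {
        simple_result<std::unique_ptr<buffer_store>> result;
        if (!config.valid()) {
            result.error = "Invalid configuration: capacity must be > 0";
            return result;
        }
        // std::make_unique cannot reach a private constructor, hence plain new.
        result.value.reset(new buffer_store(config, validated_tag{}));
        return result;
    }

    std::size_t capacity() const { return config_.capacity; }

private:
    // The tag marks the constructor that skips validation.
    struct validated_tag {};

    buffer_store(const buffer_config& config, validated_tag) : config_(config) {}

    buffer_config config_;
};
```

A caller checks is_ok() before touching the value, which mirrors the Result-based handling that the deprecation note on the throwing constructor recommends.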

◆ flush()

void kcenon::monitoring::metric_storage::flush ( )
inline

Flush buffered metrics to time series.

Definition at line 290 of file metric_storage.h.

290 {
291     std::vector<compact_metric_value> flushed_metrics;
292     flushed_metrics.reserve(config_.ring_buffer_capacity);
293
294     // Read all available metrics from buffer
295     incoming_buffer_->read_batch(flushed_metrics, config_.ring_buffer_capacity);
296
297     if (flushed_metrics.empty()) {
298         return;
299     }
300
301     std::unique_lock<std::shared_mutex> lock(mutex_);
302
303     for (auto& metric : flushed_metrics) {
304         // Find metric name from hash
305         auto name_it = hash_to_name_.find(metric.metadata.name_hash);
306         if (name_it == hash_to_name_.end()) {
307             continue;
308         }
309
310         const std::string& name = name_it->second;
311
312         // Get or create time series
313         auto* series = get_or_create_series(name);
314         if (series == nullptr) {
315             stats_.failed_flushes.fetch_add(1, std::memory_order_relaxed);
316             continue;
317         }
318
319         // Add data point to time series
320         series->add_point(metric.as_double(), metric.get_timestamp());
321     }
322
323     stats_.flush_count.fetch_add(1, std::memory_order_relaxed);
324 }

References config_, kcenon::monitoring::metric_storage_stats::failed_flushes, kcenon::monitoring::metric_storage_stats::flush_count, get_or_create_series(), hash_to_name_, incoming_buffer_, mutex_, kcenon::monitoring::metric_storage_config::ring_buffer_capacity, and stats_.

Referenced by background_processor(), and TEST_F().
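
The flush step drains the buffer, resolves each entry's name from its 32-bit hash, and appends the value to the matching series; entries whose hash has no registered name are skipped. The sketch below models only that data flow. The types compact_entry and flush_target are hypothetical, a std::deque stands in for the ring buffer, and a std::vector stands in for a time series.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical compact entry: the buffer carries a name hash, not the name.
struct compact_entry {
    std::uint32_t name_hash;
    double value;
};

struct flush_target {
    std::deque<compact_entry> incoming;                           // stand-in for the ring buffer
    std::unordered_map<std::uint32_t, std::string> hash_to_name;  // filled when a metric is stored
    std::unordered_map<std::string, std::vector<double>> series;  // stand-in for time series

    // Drains the buffer and returns how many entries reached a series.
    std::size_t flush() {
        std::size_t moved = 0;
        while (!incoming.empty()) {
            const compact_entry entry = incoming.front();
            incoming.pop_front();

            auto name_it = hash_to_name.find(entry.name_hash);
            if (name_it == hash_to_name.end()) {
                continue;  // no registered name for this hash: the entry is skipped
            }
            series[name_it->second].push_back(entry.value);
            ++moved;
        }
        return moved;
    }
};
```

In this model a skipped entry is lost once the buffer has been drained, so the name has to be registered before the flush that consumes the entry.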

◆ get_config()

const metric_storage_config & kcenon::monitoring::metric_storage::get_config ( ) const
inline noexcept

Get configuration.

Definition at line 389 of file metric_storage.h.

389 {
390     return config_;
391 }

References config_.

◆ get_latest_value()

common::Result< double > kcenon::monitoring::metric_storage::get_latest_value ( const std::string & name) const
inline

Get the latest value for a metric.

Parameters
name: Metric name
Returns
Result containing the latest value, or an error if the metric is not found

Definition at line 331 of file metric_storage.h.

331 {
332     std::shared_lock<std::shared_mutex> lock(mutex_);
333
334     auto it = time_series_map_.find(name);
335     if (it == time_series_map_.end()) {
336         return common::Result<double>::err(error_info(monitoring_error_code::collection_failed,
337             "Metric not found: " + name, "monitoring_system").to_common_error());
338     }
339
340     return it->second->get_latest_value();
341 }

References kcenon::monitoring::collection_failed, mutex_, and time_series_map_.
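
Read accessors such as this one take a shared lock, while mutating calls take an exclusive lock on the same std::shared_mutex. The sketch below shows that reader/writer split on a much simpler table. The names lookup_result and latest_value_table are hypothetical, and lookup_result is only a stand-in for common::Result< double >.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for a Result carrying a double.
struct lookup_result {
    bool ok = false;
    double value = 0.0;
    std::string error;
};

class latest_value_table {
public:
    void set(const std::string& name, double value) {
        std::unique_lock<std::shared_mutex> lock(mutex_);  // exclusive: one writer
        values_[name] = value;
    }

    lookup_result get(const std::string& name) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);  // shared: many readers
        auto it = values_.find(name);
        if (it == values_.end()) {
            return lookup_result{false, 0.0, "Metric not found: " + name};
        }
        return lookup_result{true, it->second, std::string()};
    }

private:
    // mutable so that const readers can still lock the mutex.
    mutable std::shared_mutex mutex_;
    std::unordered_map<std::string, double> values_;
};
```

The mutex is declared mutable for the same reason mutex_ is marked mutable in this class: locking changes the mutex even inside a const member function.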

◆ get_metric_names()

std::vector< std::string > kcenon::monitoring::metric_storage::get_metric_names ( ) const
inline

Get all metric names.

Returns
Vector of metric names

Definition at line 347 of file metric_storage.h.

347 {
348     std::shared_lock<std::shared_mutex> lock(mutex_);
349
350     std::vector<std::string> names;
351     names.reserve(time_series_map_.size());
352
353     for (const auto& pair : time_series_map_) {
354         names.push_back(pair.first);
355     }
356
357     return names;
358 }

References mutex_, and time_series_map_.

◆ get_or_create_series()

time_series * kcenon::monitoring::metric_storage::get_or_create_series ( const std::string & name)
inline private

Get or create time series for a metric.

Definition at line 131 of file metric_storage.h.

131 {
132     auto it = time_series_map_.find(name);
133     if (it != time_series_map_.end()) {
134         return it->second.get();
135     }
136
137     // Check if we're at capacity
138     if (time_series_map_.size() >= config_.max_metrics) {
139         return nullptr;
140     }
141
142     // Create new time series
143     time_series_config ts_config;
144     ts_config.max_points = config_.time_series_max_points;
145     ts_config.retention_period = config_.retention_period;
146
147     auto result = time_series::create(name, ts_config);
148     if (result.is_err()) {
149         return nullptr;
150     }
151
152     auto* ptr = result.value().get();
153     time_series_map_[name] = std::move(result.value());
154     stats_.active_metric_series.fetch_add(1, std::memory_order_relaxed);
155
156     // Store hash mapping
157     hash_to_name_[hash_metric_name(name)] = name;
158
159     return ptr;
160 }

References kcenon::monitoring::metric_storage_stats::active_metric_series, config_, kcenon::monitoring::time_series::create(), kcenon::monitoring::hash_metric_name(), hash_to_name_, kcenon::monitoring::metric_storage_config::max_metrics, kcenon::monitoring::time_series_config::max_points, kcenon::monitoring::metric_storage_config::retention_period, kcenon::monitoring::time_series_config::retention_period, stats_, time_series_map_, and kcenon::monitoring::metric_storage_config::time_series_max_points.

Referenced by flush().
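
The lookup above returns an existing series, creates one while the map is below its limit, and otherwise returns nullptr. A reduced sketch of that capacity-capped get-or-create follows. The names point_series and series_registry are hypothetical, and a std::vector stands in for the time series type.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using point_series = std::vector<double>;  // hypothetical stand-in for a time series

class series_registry {
public:
    explicit series_registry(std::size_t max_series) : max_series_(max_series) {}

    // Returns the existing series, a newly created one, or nullptr when full.
    point_series* get_or_create(const std::string& name) {
        auto it = map_.find(name);
        if (it != map_.end()) {
            return it->second.get();
        }
        if (map_.size() >= max_series_) {
            return nullptr;
        }
        auto owned = std::make_unique<point_series>();
        point_series* raw = owned.get();  // take the raw pointer before moving ownership
        map_.emplace(name, std::move(owned));
        return raw;
    }

    std::size_t size() const { return map_.size(); }

private:
    std::size_t max_series_;
    std::unordered_map<std::string, std::unique_ptr<point_series>> map_;
};
```

Because each series is owned through a std::unique_ptr, the returned raw pointer stays valid when the map rehashes; it becomes dangling only when the entry is erased or the map is cleared.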

◆ get_stats()

const metric_storage_stats & kcenon::monitoring::metric_storage::get_stats ( ) const
inline noexcept

Get storage statistics.

Definition at line 382 of file metric_storage.h.

382 {
383     return stats_;
384 }

References stats_.

◆ init_internals()

void kcenon::monitoring::metric_storage::init_internals ( )
inline private

Definition at line 162 of file metric_storage.h.

162 {
163     ring_buffer_config rb_config;
164     rb_config.capacity = config_.ring_buffer_capacity;
165     rb_config.overwrite_old = true;
166     rb_config.batch_size = (std::min)(rb_config.capacity / 2, size_t(64));
167     if (rb_config.batch_size == 0) rb_config.batch_size = 1;
168     incoming_buffer_ = std::make_unique<ring_buffer<compact_metric_value>>(rb_config);
169
171         running_.store(true, std::memory_order_release);
173     }
174 }

References background_processor(), background_thread_, kcenon::monitoring::ring_buffer_config::batch_size, kcenon::monitoring::ring_buffer_config::capacity, config_, kcenon::monitoring::metric_storage_config::enable_background_processing, incoming_buffer_, kcenon::monitoring::ring_buffer_config::overwrite_old, kcenon::monitoring::metric_storage_config::ring_buffer_capacity, and running_.

Referenced by metric_storage().
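
The batch size set above is half the buffer capacity, capped at 64 and raised to 1 when the division yields 0. The helper below restates that rule as a free function so the arithmetic can be checked in isolation. The name derive_batch_size is hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Half the capacity, capped at 64, never zero.
inline std::size_t derive_batch_size(std::size_t capacity) {
    // The parentheses around std::min keep a function-like min macro,
    // if one is defined, from being expanded here.
    std::size_t batch = (std::min)(capacity / 2, std::size_t(64));
    return batch == 0 ? 1 : batch;
}
```

For example, a capacity of 10 gives a batch size of 5, a capacity of 1000 gives 64, and capacities of 0 or 1 give 1.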

◆ memory_footprint()

size_t kcenon::monitoring::metric_storage::memory_footprint ( ) const
inline

Get memory footprint estimate.

Definition at line 415 of file metric_storage.h.

415 {
416     std::shared_lock<std::shared_mutex> lock(mutex_);
417
418     size_t total = sizeof(metric_storage);
419     total += config_.ring_buffer_capacity * sizeof(compact_metric_value);
420
421     for (const auto& pair : time_series_map_) {
422         total += pair.first.capacity();
423         total += pair.second->memory_footprint();
424     }
425
426     return total;
427 }

References config_, metric_storage(), mutex_, kcenon::monitoring::metric_storage_config::ring_buffer_capacity, and time_series_map_.
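
The estimate adds the object size, the preallocated buffer slots, and, for each series, the key string's capacity plus the series' own footprint. The sketch below reproduces the buffer and per-series terms on plain containers. The names sample_point and estimate_footprint are hypothetical, and the entry layout is an assumption made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical entry layout; the real compact_metric_value may differ.
struct sample_point {
    double value;
    long long timestamp_ns;
};

inline std::size_t estimate_footprint(
    std::size_t buffer_capacity,
    const std::unordered_map<std::string, std::vector<sample_point>>& series_by_name) {
    // Preallocated buffer slots count in full, whether or not they are in use.
    std::size_t total = buffer_capacity * sizeof(sample_point);
    for (const auto& pair : series_by_name) {
        total += pair.first.capacity();                          // bytes reserved by the key
        total += pair.second.capacity() * sizeof(sample_point);  // bytes reserved by the series
    }
    return total;
}
```

An estimate of this shape leaves out hash-table node overhead and allocator bookkeeping, so it is better read as a lower-bound style approximation than as an exact figure.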

◆ operator=()

metric_storage & kcenon::monitoring::metric_storage::operator= ( const metric_storage & )
delete

◆ query_metric()

common::Result< aggregation_result > kcenon::monitoring::metric_storage::query_metric ( const std::string & name,
const time_series_query & query ) const
inline

Query metric data.

Parameters
name: Metric name
query: Query parameters
Returns
Result containing the aggregation result, or an error if the metric is not found

Definition at line 366 of file metric_storage.h.

367 {
368     std::shared_lock<std::shared_mutex> lock(mutex_);
369
370     auto it = time_series_map_.find(name);
371     if (it == time_series_map_.end()) {
372         return common::Result<aggregation_result>::err(error_info(monitoring_error_code::collection_failed,
373             "Metric not found: " + name, "monitoring_system").to_common_error());
374     }
375
376     return it->second->query(query);
377 }

References kcenon::monitoring::collection_failed, mutex_, and time_series_map_.

◆ series_count()

size_t kcenon::monitoring::metric_storage::series_count ( ) const
inline

Get number of active metric series.

Definition at line 407 of file metric_storage.h.

407 {
408     std::shared_lock<std::shared_mutex> lock(mutex_);
409     return time_series_map_.size();
410 }

References mutex_, and time_series_map_.

◆ store_metric()

common::VoidResult kcenon::monitoring::metric_storage::store_metric ( const std::string & name,
double value,
metric_type type = metric_type::gauge )
inline

Store a single metric value.

Parameters
name: Metric name
value: Metric value
type: Metric type (default: gauge)
Returns
Result indicating success or failure

Definition at line 243 of file metric_storage.h.

244 {
245     auto metadata = create_metric_metadata(name, type);
246     compact_metric_value metric(metadata, value);
247
248     auto result = incoming_buffer_->write(std::move(metric));
249     if (result.is_ok()) {
250         stats_.total_metrics_stored.fetch_add(1, std::memory_order_relaxed);
251
252         // Store name mapping
253         std::unique_lock<std::shared_mutex> lock(mutex_);
254         if (hash_to_name_.find(metadata.name_hash) == hash_to_name_.end()) {
255             hash_to_name_[metadata.name_hash] = name;
256         }
257     } else {
258         stats_.total_metrics_dropped.fetch_add(1, std::memory_order_relaxed);
259     }
260
261     return result;
262 }

References kcenon::monitoring::create_metric_metadata(), hash_to_name_, incoming_buffer_, mutex_, stats_, kcenon::monitoring::metric_storage_stats::total_metrics_dropped, and kcenon::monitoring::metric_storage_stats::total_metrics_stored.
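
The name mapping is keyed by a 32-bit hash and is written only when the hash is not yet present, so the first name registered under a hash keeps it. The sketch below illustrates that first-wins behaviour and makes a collision visible to the caller. The names fnv1a_32 and name_table are hypothetical, and FNV-1a is used purely as an example hash; the function behind hash_metric_name is not shown on this page and may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// FNV-1a (32-bit), chosen only as an example hash for this sketch.
inline std::uint32_t fnv1a_32(const std::string& text) {
    std::uint32_t hash = 2166136261u;  // FNV offset basis
    for (unsigned char c : text) {
        hash ^= c;
        hash *= 16777619u;             // FNV prime
    }
    return hash;
}

class name_table {
public:
    // Registers a name under a hash. Returns false when a different name
    // already owns that hash; the existing mapping is left unchanged.
    bool register_name(std::uint32_t hash, const std::string& name) {
        auto inserted = names_.emplace(hash, name);
        return inserted.second || inserted.first->second == name;
    }

    // Returns the registered name, or nullptr when the hash is unknown.
    const std::string* lookup(std::uint32_t hash) const {
        auto it = names_.find(hash);
        return it == names_.end() ? nullptr : &it->second;
    }

private:
    std::unordered_map<std::uint32_t, std::string> names_;
};
```

With a first-wins table, two names that collide on the same hash both resolve to the first registered name at lookup time, which is why the sketch reports the conflict instead of ignoring it.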

◆ store_metrics_batch()

size_t kcenon::monitoring::metric_storage::store_metrics_batch ( const metric_batch & batch)
inline

Store a batch of metrics.

Parameters
batch: Metric batch to store
Returns
Number of metrics successfully stored

Definition at line 270 of file metric_storage.h.

270 {
271     size_t stored = 0;
272
273     for (const auto& metric : batch.metrics) {
274         compact_metric_value copy = metric;
275         auto result = incoming_buffer_->write(std::move(copy));
276         if (result.is_ok()) {
277             stored++;
278             stats_.total_metrics_stored.fetch_add(1, std::memory_order_relaxed);
279         } else {
280             stats_.total_metrics_dropped.fetch_add(1, std::memory_order_relaxed);
281         }
282     }
283
284     return stored;
285 }

References incoming_buffer_, kcenon::monitoring::metric_batch::metrics, stats_, kcenon::monitoring::metric_storage_stats::total_metrics_dropped, and kcenon::monitoring::metric_storage_stats::total_metrics_stored.
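
The batch call writes each entry individually and counts successes and failures separately, so a partially accepted batch is reported by its stored count. The sketch below shows that accounting. The names bounded_buffer, batch_outcome and store_batch are hypothetical. The buffer here rejects writes when full, which is an assumption made to exercise the drop path; the storage configures its ring buffer with overwrite_old set to true, and how write() behaves in that mode is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bounded buffer that rejects writes once it is full.
class bounded_buffer {
public:
    explicit bounded_buffer(std::size_t capacity) : capacity_(capacity) {}

    bool write(double value) {
        if (items_.size() >= capacity_) {
            return false;
        }
        items_.push_back(value);
        return true;
    }

private:
    std::size_t capacity_;
    std::vector<double> items_;
};

struct batch_outcome {
    std::size_t stored = 0;
    std::size_t dropped = 0;
};

// Writes every value and tallies accepted and rejected entries.
inline batch_outcome store_batch(bounded_buffer& buffer, const std::vector<double>& batch) {
    batch_outcome outcome;
    for (double value : batch) {
        if (buffer.write(value)) {
            ++outcome.stored;
        } else {
            ++outcome.dropped;
        }
    }
    return outcome;
}
```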

Member Data Documentation

◆ background_thread_

std::thread kcenon::monitoring::metric_storage::background_thread_
private

Definition at line 114 of file metric_storage.h.

Referenced by init_internals(), and ~metric_storage().

◆ config_

metric_storage_config kcenon::monitoring::metric_storage::config_
private

◆ hash_to_name_

std::unordered_map<uint32_t, std::string> kcenon::monitoring::metric_storage::hash_to_name_
private

Definition at line 110 of file metric_storage.h.

Referenced by clear(), flush(), get_or_create_series(), and store_metric().

◆ incoming_buffer_

std::unique_ptr<ring_buffer<compact_metric_value> > kcenon::monitoring::metric_storage::incoming_buffer_
private

Definition at line 104 of file metric_storage.h.

Referenced by clear(), flush(), init_internals(), store_metric(), and store_metrics_batch().

◆ mutex_

std::shared_mutex kcenon::monitoring::metric_storage::mutex_
mutable private

◆ running_

std::atomic<bool> kcenon::monitoring::metric_storage::running_ {false}
private

Definition at line 113 of file metric_storage.h.

113 {false};

Referenced by background_processor(), init_internals(), and ~metric_storage().

◆ stats_

metric_storage_stats kcenon::monitoring::metric_storage::stats_
mutable private

◆ time_series_map_

std::unordered_map<std::string, std::unique_ptr<time_series> > kcenon::monitoring::metric_storage::time_series_map_
private

The documentation for this class was generated from the following file: