Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::metric_storage Class Reference

Thread-safe metric storage with ring buffer buffering. More...

#include <metric_storage.h>

Collaboration diagram for kcenon::monitoring::metric_storage:
Collaboration graph

Public Member Functions

 metric_storage (const metric_storage_config &config={})
 Constructor with configuration.
 
 ~metric_storage ()
 Destructor.
 
 metric_storage (const metric_storage &)=delete
 
metric_storageoperator= (const metric_storage &)=delete
 
common::VoidResult store_metric (const std::string &name, double value, metric_type type=metric_type::gauge)
 Store a single metric value.
 
size_t store_metrics_batch (const metric_batch &batch)
 Store a batch of metrics.
 
void flush ()
 Flush buffered metrics to time series.
 
common::Result< double > get_latest_value (const std::string &name) const
 Get the latest value for a metric.
 
std::vector< std::string > get_metric_names () const
 Get all metric names.
 
common::Result< aggregation_resultquery_metric (const std::string &name, const time_series_query &query) const
 Query metric data.
 
const metric_storage_statsget_stats () const noexcept
 Get storage statistics.
 
const metric_storage_configget_config () const noexcept
 Get configuration.
 
void clear ()
 Clear all stored metrics.
 
size_t series_count () const
 Get number of active metric series.
 
size_t memory_footprint () const
 Get memory footprint estimate.
 

Private Member Functions

void background_processor ()
 Background processing loop.
 
time_seriesget_or_create_series (const std::string &name)
 Get or create time series for a metric.
 

Private Attributes

std::shared_mutex mutex_
 
metric_storage_config config_
 
metric_storage_stats stats_
 
std::unique_ptr< ring_buffer< compact_metric_value > > incoming_buffer_
 
std::unordered_map< std::string, std::unique_ptr< time_series > > time_series_map_
 
std::unordered_map< uint32_t, std::string > hash_to_name_
 
std::atomic< bool > running_ {false}
 
std::thread background_thread_
 

Detailed Description

Thread-safe metric storage with ring buffer buffering.

Provides efficient metric storage using ring buffers for incoming data and time series for historical queries. Supports background processing for automatic flushing.

Definition at line 95 of file metric_storage.h.

Constructor & Destructor Documentation

◆ metric_storage() [1/2]

kcenon::monitoring::metric_storage::metric_storage ( const metric_storage_config & config = {})
inlineexplicit

Constructor with configuration.

Parameters
configMetric storage configuration options
Exceptions
std::invalid_argumentif configuration validation fails

Definition at line 166 of file metric_storage.h.

166 {})
167 : config_(config) {
168
169 auto validation = config_.validate();
170 if (validation.is_err()) {
171 throw std::invalid_argument("Invalid metric storage configuration: " +
172 validation.error().message);
173 }
174
175 // Initialize ring buffer
176 ring_buffer_config rb_config;
177 rb_config.capacity = config_.ring_buffer_capacity;
178 rb_config.overwrite_old = true;
179 rb_config.batch_size = (std::min)(rb_config.capacity / 2, size_t(64));
180 if (rb_config.batch_size == 0) rb_config.batch_size = 1;
181 incoming_buffer_ = std::make_unique<ring_buffer<compact_metric_value>>(rb_config);
182
183 // Start background processing if enabled
185 running_.store(true, std::memory_order_release);
186 background_thread_ = std::thread(&metric_storage::background_processor, this);
187 }
188 }
std::unique_ptr< ring_buffer< compact_metric_value > > incoming_buffer_
common::VoidResult validate() const
Validate configuration.

Referenced by memory_footprint().

Here is the caller graph for this function:

◆ ~metric_storage()

kcenon::monitoring::metric_storage::~metric_storage ( )
inline

Destructor.

Definition at line 193 of file metric_storage.h.

193 {
194 if (running_.load(std::memory_order_acquire)) {
195 running_.store(false, std::memory_order_release);
196 if (background_thread_.joinable()) {
197 background_thread_.join();
198 }
199 }
200 }

References background_thread_, and running_.

◆ metric_storage() [2/2]

kcenon::monitoring::metric_storage::metric_storage ( const metric_storage & )
delete

Member Function Documentation

◆ background_processor()

void kcenon::monitoring::metric_storage::background_processor ( )
inlineprivate

Background processing loop.

Definition at line 117 of file metric_storage.h.

117 {
118 while (running_.load(std::memory_order_acquire)) {
119 std::this_thread::sleep_for(config_.flush_interval);
120 if (running_.load(std::memory_order_acquire)) {
121 flush();
122 }
123 }
124 }
void flush()
Flush buffered metrics to time series.
std::chrono::milliseconds flush_interval

References config_, flush(), kcenon::monitoring::metric_storage_config::flush_interval, and running_.

Here is the call graph for this function:

◆ clear()

void kcenon::monitoring::metric_storage::clear ( )
inline

Clear all stored metrics.

Definition at line 366 of file metric_storage.h.

366 {
367 std::unique_lock<std::shared_mutex> lock(mutex_);
368 incoming_buffer_->clear();
369 time_series_map_.clear();
370 hash_to_name_.clear();
371 stats_.active_metric_series.store(0, std::memory_order_relaxed);
372 }
std::unordered_map< uint32_t, std::string > hash_to_name_
std::unordered_map< std::string, std::unique_ptr< time_series > > time_series_map_

References kcenon::monitoring::metric_storage_stats::active_metric_series, hash_to_name_, incoming_buffer_, mutex_, stats_, and time_series_map_.

◆ flush()

void kcenon::monitoring::metric_storage::flush ( )
inline

Flush buffered metrics to time series.

Definition at line 260 of file metric_storage.h.

260 {
261 std::vector<compact_metric_value> flushed_metrics;
262 flushed_metrics.reserve(config_.ring_buffer_capacity);
263
264 // Read all available metrics from buffer
265 incoming_buffer_->read_batch(flushed_metrics, config_.ring_buffer_capacity);
266
267 if (flushed_metrics.empty()) {
268 return;
269 }
270
271 std::unique_lock<std::shared_mutex> lock(mutex_);
272
273 for (auto& metric : flushed_metrics) {
274 // Find metric name from hash
275 auto name_it = hash_to_name_.find(metric.metadata.name_hash);
276 if (name_it == hash_to_name_.end()) {
277 continue;
278 }
279
280 const std::string& name = name_it->second;
281
282 // Get or create time series
283 auto* series = get_or_create_series(name);
284 if (series == nullptr) {
285 stats_.failed_flushes.fetch_add(1, std::memory_order_relaxed);
286 continue;
287 }
288
289 // Add data point to time series
290 series->add_point(metric.as_double(), metric.get_timestamp());
291 }
292
293 stats_.flush_count.fetch_add(1, std::memory_order_relaxed);
294 }
time_series * get_or_create_series(const std::string &name)
Get or create time series for a metric.

References config_, kcenon::monitoring::metric_storage_stats::failed_flushes, kcenon::monitoring::metric_storage_stats::flush_count, get_or_create_series(), hash_to_name_, incoming_buffer_, mutex_, kcenon::monitoring::metric_storage_config::ring_buffer_capacity, and stats_.

Referenced by background_processor(), and TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ get_config()

const metric_storage_config & kcenon::monitoring::metric_storage::get_config ( ) const
inlinenoexcept

Get configuration.

Definition at line 359 of file metric_storage.h.

359 {
360 return config_;
361 }

References config_.

◆ get_latest_value()

common::Result< double > kcenon::monitoring::metric_storage::get_latest_value ( const std::string & name) const
inline

Get the latest value for a metric.

Parameters
nameMetric name
Returns
Optional containing the latest value if available

Definition at line 301 of file metric_storage.h.

301 {
302 std::shared_lock<std::shared_mutex> lock(mutex_);
303
304 auto it = time_series_map_.find(name);
305 if (it == time_series_map_.end()) {
306 return common::Result<double>::err(error_info(monitoring_error_code::collection_failed,
307 "Metric not found: " + name, "monitoring_system").to_common_error());
308 }
309
310 return it->second->get_latest_value();
311 }

References kcenon::monitoring::collection_failed, mutex_, and time_series_map_.

◆ get_metric_names()

std::vector< std::string > kcenon::monitoring::metric_storage::get_metric_names ( ) const
inline

Get all metric names.

Returns
Vector of metric names

Definition at line 317 of file metric_storage.h.

317 {
318 std::shared_lock<std::shared_mutex> lock(mutex_);
319
320 std::vector<std::string> names;
321 names.reserve(time_series_map_.size());
322
323 for (const auto& pair : time_series_map_) {
324 names.push_back(pair.first);
325 }
326
327 return names;
328 }

References mutex_, and time_series_map_.

◆ get_or_create_series()

time_series * kcenon::monitoring::metric_storage::get_or_create_series ( const std::string & name)
inlineprivate

Get or create time series for a metric.

Definition at line 129 of file metric_storage.h.

129 {
130 auto it = time_series_map_.find(name);
131 if (it != time_series_map_.end()) {
132 return it->second.get();
133 }
134
135 // Check if we're at capacity
136 if (time_series_map_.size() >= config_.max_metrics) {
137 return nullptr;
138 }
139
140 // Create new time series
141 time_series_config ts_config;
142 ts_config.max_points = config_.time_series_max_points;
143 ts_config.retention_period = config_.retention_period;
144
145 auto result = time_series::create(name, ts_config);
146 if (result.is_err()) {
147 return nullptr;
148 }
149
150 auto* ptr = result.value().get();
151 time_series_map_[name] = std::move(result.value());
152 stats_.active_metric_series.fetch_add(1, std::memory_order_relaxed);
153
154 // Store hash mapping
155 hash_to_name_[hash_metric_name(name)] = name;
156
157 return ptr;
158 }
static common::Result< std::unique_ptr< time_series > > create(const std::string &name, const time_series_config &config={})
Factory method to create time_series with validation.
uint32_t hash_metric_name(const std::string &name) noexcept
Hash function for metric names.

References kcenon::monitoring::metric_storage_stats::active_metric_series, config_, kcenon::monitoring::time_series::create(), kcenon::monitoring::hash_metric_name(), hash_to_name_, kcenon::monitoring::metric_storage_config::max_metrics, kcenon::monitoring::time_series_config::max_points, kcenon::monitoring::metric_storage_config::retention_period, kcenon::monitoring::time_series_config::retention_period, stats_, time_series_map_, and kcenon::monitoring::metric_storage_config::time_series_max_points.

Referenced by flush().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ get_stats()

const metric_storage_stats & kcenon::monitoring::metric_storage::get_stats ( ) const
inlinenoexcept

Get storage statistics.

Definition at line 352 of file metric_storage.h.

352 {
353 return stats_;
354 }

References stats_.

◆ memory_footprint()

size_t kcenon::monitoring::metric_storage::memory_footprint ( ) const
inline

Get memory footprint estimate.

Definition at line 385 of file metric_storage.h.

385 {
386 std::shared_lock<std::shared_mutex> lock(mutex_);
387
388 size_t total = sizeof(metric_storage);
389 total += config_.ring_buffer_capacity * sizeof(compact_metric_value);
390
391 for (const auto& pair : time_series_map_) {
392 total += pair.first.capacity();
393 total += pair.second->memory_footprint();
394 }
395
396 return total;
397 }
metric_storage(const metric_storage_config &config={})
Constructor with configuration.

References config_, metric_storage(), mutex_, kcenon::monitoring::metric_storage_config::ring_buffer_capacity, and time_series_map_.

Here is the call graph for this function:

◆ operator=()

metric_storage & kcenon::monitoring::metric_storage::operator= ( const metric_storage & )
delete

◆ query_metric()

common::Result< aggregation_result > kcenon::monitoring::metric_storage::query_metric ( const std::string & name,
const time_series_query & query ) const
inline

Query metric data.

Parameters
nameMetric name
queryQuery parameters
Returns
Aggregation result

Definition at line 336 of file metric_storage.h.

337 {
338 std::shared_lock<std::shared_mutex> lock(mutex_);
339
340 auto it = time_series_map_.find(name);
341 if (it == time_series_map_.end()) {
342 return common::Result<aggregation_result>::err(error_info(monitoring_error_code::collection_failed,
343 "Metric not found: " + name, "monitoring_system").to_common_error());
344 }
345
346 return it->second->query(query);
347 }

References kcenon::monitoring::collection_failed, mutex_, and time_series_map_.

◆ series_count()

size_t kcenon::monitoring::metric_storage::series_count ( ) const
inline

Get number of active metric series.

Definition at line 377 of file metric_storage.h.

377 {
378 std::shared_lock<std::shared_mutex> lock(mutex_);
379 return time_series_map_.size();
380 }

References mutex_, and time_series_map_.

◆ store_metric()

common::VoidResult kcenon::monitoring::metric_storage::store_metric ( const std::string & name,
double value,
metric_type type = metric_type::gauge )
inline

Store a single metric value.

Parameters
nameMetric name
valueMetric value
typeMetric type (default: gauge)
Returns
Result indicating success or failure

Definition at line 213 of file metric_storage.h.

214 {
215 auto metadata = create_metric_metadata(name, type);
216 compact_metric_value metric(metadata, value);
217
218 auto result = incoming_buffer_->write(std::move(metric));
219 if (result.is_ok()) {
220 stats_.total_metrics_stored.fetch_add(1, std::memory_order_relaxed);
221
222 // Store name mapping
223 std::unique_lock<std::shared_mutex> lock(mutex_);
224 if (hash_to_name_.find(metadata.name_hash) == hash_to_name_.end()) {
225 hash_to_name_[metadata.name_hash] = name;
226 }
227 } else {
228 stats_.total_metrics_dropped.fetch_add(1, std::memory_order_relaxed);
229 }
230
231 return result;
232 }
metric_metadata create_metric_metadata(const std::string &name, metric_type type, size_t tag_count=0)
Create metric metadata from name and type.

References kcenon::monitoring::create_metric_metadata(), hash_to_name_, incoming_buffer_, mutex_, stats_, kcenon::monitoring::metric_storage_stats::total_metrics_dropped, and kcenon::monitoring::metric_storage_stats::total_metrics_stored.

Here is the call graph for this function:

◆ store_metrics_batch()

size_t kcenon::monitoring::metric_storage::store_metrics_batch ( const metric_batch & batch)
inline

Store a batch of metrics.

Parameters
batchMetric batch to store
Returns
Number of metrics successfully stored

Definition at line 240 of file metric_storage.h.

240 {
241 size_t stored = 0;
242
243 for (const auto& metric : batch.metrics) {
244 compact_metric_value copy = metric;
245 auto result = incoming_buffer_->write(std::move(copy));
246 if (result.is_ok()) {
247 stored++;
248 stats_.total_metrics_stored.fetch_add(1, std::memory_order_relaxed);
249 } else {
250 stats_.total_metrics_dropped.fetch_add(1, std::memory_order_relaxed);
251 }
252 }
253
254 return stored;
255 }

References incoming_buffer_, kcenon::monitoring::metric_batch::metrics, stats_, kcenon::monitoring::metric_storage_stats::total_metrics_dropped, and kcenon::monitoring::metric_storage_stats::total_metrics_stored.

Member Data Documentation

◆ background_thread_

std::thread kcenon::monitoring::metric_storage::background_thread_
private

Definition at line 112 of file metric_storage.h.

Referenced by ~metric_storage().

◆ config_

metric_storage_config kcenon::monitoring::metric_storage::config_
private

◆ hash_to_name_

std::unordered_map<uint32_t, std::string> kcenon::monitoring::metric_storage::hash_to_name_
private

Definition at line 108 of file metric_storage.h.

Referenced by clear(), flush(), get_or_create_series(), and store_metric().

◆ incoming_buffer_

std::unique_ptr<ring_buffer<compact_metric_value> > kcenon::monitoring::metric_storage::incoming_buffer_
private

Definition at line 102 of file metric_storage.h.

Referenced by clear(), flush(), store_metric(), and store_metrics_batch().

◆ mutex_

std::shared_mutex kcenon::monitoring::metric_storage::mutex_
mutableprivate

◆ running_

std::atomic<bool> kcenon::monitoring::metric_storage::running_ {false}
private

Definition at line 111 of file metric_storage.h.

111{false};

Referenced by background_processor(), and ~metric_storage().

◆ stats_

metric_storage_stats kcenon::monitoring::metric_storage::stats_
mutableprivate

◆ time_series_map_

std::unordered_map<std::string, std::unique_ptr<time_series> > kcenon::monitoring::metric_storage::time_series_map_
private

The documentation for this class was generated from the following file: