24#include <shared_mutex>
25#include <unordered_map>
30namespace kcenon {
namespace monitoring {
50 "Ring buffer capacity must be a power of 2").to_common_error());
55 "Max metrics must be positive").to_common_error());
60 "Time series max points must be positive").to_common_error());
65 "Retention period must be positive").to_common_error());
118 while (
running_.load(std::memory_order_acquire)) {
120 if (
running_.load(std::memory_order_acquire)) {
132 return it->second.get();
146 if (result.is_err()) {
150 auto* ptr = result.value().get();
170 if (validation.is_err()) {
171 throw std::invalid_argument(
"Invalid metric storage configuration: " +
172 validation.error().message);
178 rb_config.overwrite_old =
true;
179 rb_config.batch_size = (std::min)(rb_config.capacity / 2,
size_t(64));
180 if (rb_config.batch_size == 0) rb_config.batch_size = 1;
185 running_.store(true, std::memory_order_release);
186 background_thread_ = std::thread(&metric_storage::background_processor, this);
194 if (
running_.load(std::memory_order_acquire)) {
195 running_.store(
false, std::memory_order_release);
219 if (result.is_ok()) {
223 std::unique_lock<std::shared_mutex> lock(
mutex_);
246 if (result.is_ok()) {
261 std::vector<compact_metric_value> flushed_metrics;
267 if (flushed_metrics.empty()) {
271 std::unique_lock<std::shared_mutex> lock(
mutex_);
273 for (
auto&
metric : flushed_metrics) {
280 const std::string& name = name_it->second;
284 if (series ==
nullptr) {
290 series->add_point(
metric.as_double(),
metric.get_timestamp());
302 std::shared_lock<std::shared_mutex> lock(
mutex_);
307 "Metric not found: " + name,
"monitoring_system").to_common_error());
310 return it->second->get_latest_value();
318 std::shared_lock<std::shared_mutex> lock(
mutex_);
320 std::vector<std::string> names;
324 names.push_back(pair.first);
336 common::Result<aggregation_result>
query_metric(
const std::string& name,
338 std::shared_lock<std::shared_mutex> lock(
mutex_);
343 "Metric not found: " + name,
"monitoring_system").to_common_error());
346 return it->second->query(query);
367 std::unique_lock<std::shared_mutex> lock(
mutex_);
378 std::shared_lock<std::shared_mutex> lock(
mutex_);
386 std::shared_lock<std::shared_mutex> lock(
mutex_);
392 total += pair.first.capacity();
393 total += pair.second->memory_footprint();
Thread-safe metric storage with ring buffer buffering.
void background_processor()
Background processing loop.
size_t series_count() const
Get number of active metric series.
common::VoidResult store_metric(const std::string &name, double value, metric_type type=metric_type::gauge)
Store a single metric value.
std::vector< std::string > get_metric_names() const
Get all metric names.
metric_storage_stats stats_
void flush()
Flush buffered metrics to time series.
size_t store_metrics_batch(const metric_batch &batch)
Store a batch of metrics.
metric_storage(const metric_storage &)=delete
metric_storage_config config_
time_series * get_or_create_series(const std::string &name)
Get or create time series for a metric.
size_t memory_footprint() const
Get memory footprint estimate.
metric_storage(const metric_storage_config &config={})
Constructor with configuration.
common::Result< aggregation_result > query_metric(const std::string &name, const time_series_query &query) const
Query metric data.
std::unordered_map< uint32_t, std::string > hash_to_name_
const metric_storage_stats & get_stats() const noexcept
Get storage statistics.
common::Result< double > get_latest_value(const std::string &name) const
Get the latest value for a metric.
void clear()
Clear all stored metrics.
std::unordered_map< std::string, std::unique_ptr< time_series > > time_series_map_
std::atomic< bool > running_
std::unique_ptr< ring_buffer< compact_metric_value > > incoming_buffer_
const metric_storage_config & get_config() const noexcept
Get configuration.
std::thread background_thread_
metric_storage & operator=(const metric_storage &)=delete
~metric_storage()
Destructor.
Lock-free ring buffer with atomic operations.
Thread-safe time series data storage.
static common::Result< std::unique_ptr< time_series > > create(const std::string &name, const time_series_config &config={})
Factory method to create time_series with validation.
Monitoring system specific error codes.
Common metric type definitions for efficient storage.
metric_type
Metric types supported by exporters.
@ gauge
Instantaneous value that can go up and down.
uint32_t hash_metric_name(const std::string &name) noexcept
Hash function for metric names.
metric_metadata create_metric_metadata(const std::string &name, metric_type type, size_t tag_count=0)
Create metric metadata from name and type.
Result pattern type definitions for monitoring system.
Lock-free ring buffer for efficient metric storage.
Memory-efficient metric value storage.
Extended error information with context.
Batch of metrics for efficient processing.
std::vector< compact_metric_value > metrics
Configuration for metric storage.
size_t time_series_max_points
std::chrono::seconds retention_period
std::chrono::milliseconds flush_interval
bool enable_background_processing
size_t ring_buffer_capacity
common::VoidResult validate() const
Validate configuration.
Statistics for metric storage performance.
std::atomic< size_t > total_metrics_stored
std::atomic< size_t > flush_count
std::atomic< size_t > active_metric_series
std::chrono::system_clock::time_point creation_time
std::atomic< size_t > total_metrics_dropped
std::atomic< size_t > failed_flushes
Basic metric structure for interface compatibility.
Configuration for ring buffer behavior.
Configuration for time series storage.
std::chrono::seconds retention_period
Query parameters for time series data.
Time-series data storage for efficient metric history.