20 #include <unordered_map>
59 mutable std::unordered_map<std::string, std::atomic<double>>
counters_;
60 mutable std::unordered_map<std::string, std::atomic<double>>
gauges_;
61 mutable std::unordered_map<std::string, std::vector<double>>
histograms_;
84 "Monitoring is disabled"};
108 static_cast<double>(total_time) / messages,
116 if (min_time != UINT64_MAX) {
118 static_cast<double>(min_time),
137 auto now = std::chrono::system_clock::now();
138 auto uptime = std::chrono::duration_cast<std::chrono::seconds>(
141 static_cast<double>(uptime),
146 std::lock_guard<std::mutex> lock(
mutex_);
148 for (
const auto& [name, value] :
counters_) {
152 for (
const auto& [name, value] :
gauges_) {
158 if (!values.empty()) {
160 double min_val = values[0];
161 double max_val = values[0];
163 for (
double v : values) {
165 min_val = std::min(min_val, v);
166 max_val = std::max(max_val, v);
169 double avg = sum / values.size();
174 static_cast<double>(values.size()),
194 double error_rate =
static_cast<double>(errors) / messages;
195 if (error_rate > 0.1) {
196 result.add_issue(
"High error rate: " +
197 std::to_string(error_rate * 100) +
"%");
198 result.set_status(health_status::unhealthy);
199 }
else if (error_rate > 0.05) {
200 result.add_issue(
"Elevated error rate: " +
201 std::to_string(error_rate * 100) +
"%");
202 result.set_status(health_status::degraded);
210 double drop_rate =
static_cast<double>(dropped) / messages;
211 if (drop_rate > 0.01) {
212 result.add_issue(
"Messages being dropped: " +
213 std::to_string(dropped) +
" total");
214 if (
result.get_status() == health_status::healthy) {
215 result.set_status(health_status::degraded);
223 if (writer_failures > 0) {
224 result.add_issue(
"Writer failures detected: " +
225 std::to_string(writer_failures));
226 if (
result.get_status() == health_status::healthy) {
227 result.set_status(health_status::degraded);
234 if (max_queue > 0 && queue_size > max_queue * 0.9) {
235 result.add_issue(
"Queue near capacity: " +
236 std::to_string(queue_size) +
"/" +
237 std::to_string(max_queue));
238 if (
result.get_status() == health_status::healthy) {
239 result.set_status(health_status::degraded);
245 std::lock_guard<std::mutex> lock(
mutex_);
252 if (
result.is_healthy()) {
253 result.set_message(
"All systems operational");
255 result.set_message(
"Issues detected - check details");
281 std::lock_guard<std::mutex> lock(
mutex_);
327 if (name ==
"messages_logged") {
329 }
else if (name ==
"messages_dropped") {
331 }
else if (name ==
"errors_encountered") {
333 }
else if (name ==
"writers_failed") {
336 std::lock_guard<std::mutex> lock(
mutex_);
349 if (name ==
"buffer_usage_bytes") {
352 while (value > max_val &&
354 static_cast<size_t>(value))) {
357 }
else if (name ==
"queue_size") {
360 while (value > max_val &&
362 static_cast<size_t>(value))) {
366 std::lock_guard<std::mutex> lock(
mutex_);
379 if (name ==
"processing_time_us") {
380 uint64_t us_value =
static_cast<uint64_t
>(value);
385 while (us_value > max_val &&
392 while (us_value < min_val &&
397 std::lock_guard<std::mutex> lock(
mutex_);
414 std::lock_guard<std::mutex> lock(
mutex_);
422 std::lock_guard<std::mutex> lock(
mutex_);
Basic monitoring implementation.
basic_monitor()
Constructor.
std::atomic< uint64_t > messages_logged_
bool is_enabled() const override
Check if monitoring is enabled.
std::atomic< size_t > max_buffer_usage_bytes_
std::unordered_map< std::string, std::vector< double > > histograms_
void record_histogram(const std::string &name, double value) override
Record a histogram value.
void clear_health_issues()
Clear health issues.
std::atomic< size_t > queue_size_
std::unordered_map< std::string, std::atomic< double > > counters_
common::VoidResult reset_metrics() override
Reset all metrics.
result< monitoring_data > collect_metrics() const override
Collect current metrics.
std::atomic< bool > enabled_
~basic_monitor() override=default
Destructor.
std::atomic< size_t > buffer_usage_bytes_
std::unordered_map< std::string, std::atomic< double > > gauges_
std::atomic< uint64_t > min_processing_time_us_
result< health_check_result > check_health() const override
Perform health check.
std::chrono::system_clock::time_point start_time_
void add_health_issue(const std::string &issue)
Add a health issue.
std::atomic< uint64_t > total_processing_time_us_
std::atomic< uint64_t > writers_failed_
std::atomic< uint64_t > messages_dropped_
std::string get_backend_name() const override
Get backend name.
std::vector< std::string > health_issues_
void update_gauge(const std::string &name, double value) override
Update a gauge.
void increment_counter(const std::string &name, double value=1.0) override
Increment a counter.
common::VoidResult set_enabled(bool enable) override
Enable or disable monitoring.
std::atomic< uint64_t > max_processing_time_us_
std::atomic< size_t > max_queue_size_
std::atomic< uint64_t > errors_encountered_
void add_metric(const std::string &name, double value, metric_type type=metric_type::gauge)
Add a metric to the collection.
Abstract monitoring interface.
Abstract interface for monitoring and metrics collection.
@ gauge
A metric that can go up or down (e.g., queue depth).
@ counter
A monotonically increasing metric (e.g., total messages).
@ summary
Statistical summary.
@ size
Rotate based on file size only.