37#include <unordered_map>
44namespace kcenon {
namespace monitoring {
85 std::unordered_map<std::string, std::string>
headers;
86 std::unordered_map<std::string, std::string>
labels;
96 "Either endpoint or port must be specified",
"monitoring_system").to_common_error());
101 "Push interval must be positive",
"monitoring_system").to_common_error());
106 "Batch size must be greater than 0",
"monitoring_system").to_common_error());
111 "Queue size must be at least batch size",
"monitoring_system").to_common_error());
127 std::unordered_map<std::string, std::string>
labels;
134 std::ostringstream ss;
142 std::string type_str;
150 ss <<
"# TYPE " <<
name <<
" " << type_str <<
"\n";
158 for (
const auto& [key, label_value] :
labels) {
159 if (!first) ss <<
",";
169 if (
timestamp != std::chrono::system_clock::time_point{}) {
170 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
181 std::string escaped = label_value;
183 escaped = std::regex_replace(escaped, std::regex(
"\\\\"),
"\\\\");
184 escaped = std::regex_replace(escaped, std::regex(
"\""),
"\\\"");
185 escaped = std::regex_replace(escaped, std::regex(
"\n"),
"\\n");
199 std::unordered_map<std::string, std::string>
tags;
205 std::ostringstream ss;
224 if (datadog_format && !
tags.empty()) {
227 for (
const auto& [key, tag_value] :
tags) {
228 if (!first) ss <<
",";
229 ss << key <<
":" << tag_value;
249 virtual common::VoidResult
export_metrics(
const std::vector<monitoring_data>& metrics) = 0;
259 virtual common::VoidResult
flush() = 0;
269 virtual std::unordered_map<std::string, std::size_t>
get_stats()
const = 0;
274 virtual common::VoidResult
start() {
return common::ok(); }
279 virtual common::VoidResult
stop() {
return common::ok(); }
303 std::vector<prometheus_metric_data> prom_metrics;
305 for (
const auto& [name, value] : data.
get_metrics()) {
318 metric.labels[key] = label_value;
322 for (
const auto& [key, tag_value] : data.
get_tags()) {
330 prom_metrics.push_back(std::move(
metric));
340 std::vector<prometheus_metric_data> prom_metrics;
342 for (
const auto& metric_val : snapshot.
metrics) {
348 metric.help_text =
"System metric";
357 metric.labels[key] = label_value;
361 for (
const auto& [key, tag_value] : metric_val.tags) {
369 prom_metrics.push_back(std::move(
metric));
375 common::VoidResult
export_metrics(
const std::vector<monitoring_data>& metrics)
override {
380 for (
const auto& data : metrics) {
383 prom_metrics.begin(), prom_metrics.end());
389 }
catch (
const std::exception& e) {
392 "Prometheus export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
401 prom_metrics.begin(), prom_metrics.end());
406 }
catch (
const std::exception& e) {
409 "Prometheus snapshot export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
418 std::ostringstream ss;
421 ss <<
metric.to_prometheus_text();
430 common::VoidResult
flush()
override {
439 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
450 std::string sanitized = name;
452 std::regex invalid_chars(
"[^a-zA-Z0-9_:]");
453 sanitized = std::regex_replace(sanitized, invalid_chars,
"_");
456 if (!sanitized.empty() && !std::isalpha(sanitized[0]) && sanitized[0] !=
'_') {
457 sanitized =
"_" + sanitized;
464 std::string sanitized = name;
466 std::regex invalid_chars(
"[^a-zA-Z0-9_]");
467 sanitized = std::regex_replace(sanitized, invalid_chars,
"_");
470 if (!sanitized.empty() && !std::isalpha(sanitized[0]) && sanitized[0] !=
'_') {
471 sanitized =
"_" + sanitized;
479 std::string lower_name = name;
480 std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), ::tolower);
482 if (lower_name.find(
"count") != std::string::npos ||
483 lower_name.find(
"total") != std::string::npos ||
484 lower_name.find(
"requests") != std::string::npos) {
486 }
else if (lower_name.find(
"histogram") != std::string::npos ||
487 lower_name.find(
"bucket") != std::string::npos) {
489 }
else if (lower_name.find(
"summary") != std::string::npos ||
490 lower_name.find(
"quantile") != std::string::npos) {
529 std::unique_ptr<udp_transport> transport)
537 std::vector<statsd_metric_data> statsd_metrics;
539 for (
const auto& [name, value] : data.
get_metrics()) {
555 for (
const auto& [key, tag_value] : data.
get_tags()) {
563 statsd_metrics.push_back(std::move(
metric));
566 return statsd_metrics;
573 std::vector<statsd_metric_data> statsd_metrics;
575 for (
const auto& metric_val : snapshot.
metrics) {
593 for (
const auto& [key, tag_value] : metric_val.tags) {
601 statsd_metrics.push_back(std::move(
metric));
604 return statsd_metrics;
607 common::VoidResult
export_metrics(
const std::vector<monitoring_data>& metrics)
override {
609 std::vector<std::string> statsd_lines;
611 for (
const auto& data : metrics) {
613 for (
const auto&
metric : statsd_metrics) {
615 statsd_lines.push_back(
metric.to_statsd_format(datadog_format));
621 if (send_result.is_ok()) {
631 }
catch (
const std::exception& e) {
634 "StatsD export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
641 std::vector<std::string> statsd_lines;
643 for (
const auto&
metric : statsd_metrics) {
645 statsd_lines.push_back(
metric.to_statsd_format(datadog_format));
650 if (send_result.is_ok()) {
660 }
catch (
const std::exception& e) {
663 "StatsD snapshot export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
667 common::VoidResult
start()
override {
675 "UDP transport not available",
677 ).to_common_error());
682 if (connect_result.is_err()) {
683 return connect_result;
690 common::VoidResult
stop()
override {
703 common::VoidResult
flush()
override {
712 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
713 std::unordered_map<std::string, std::size_t> stats = {
721 auto transport_stats =
transport_->get_statistics();
722 stats[
"transport_packets_sent"] = transport_stats.packets_sent;
723 stats[
"transport_bytes_sent"] = transport_stats.bytes_sent;
724 stats[
"transport_send_failures"] = transport_stats.send_failures;
735 "UDP transport not available",
737 ).to_common_error());
743 if (connect_result.is_err()) {
744 return connect_result;
751 for (
const auto& line : lines) {
752 if (!batch.empty()) {
763 std::string sanitized = name;
765 std::regex invalid_chars(
"[.\\s]+");
766 sanitized = std::regex_replace(sanitized, invalid_chars,
"_");
772 std::string lower_name = name;
773 std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), ::tolower);
775 if (lower_name.find(
"count") != std::string::npos ||
776 lower_name.find(
"total") != std::string::npos) {
778 }
else if (lower_name.find(
"time") != std::string::npos ||
779 lower_name.find(
"duration") != std::string::npos ||
780 lower_name.find(
"latency") != std::string::npos) {
833 common::VoidResult
export_metrics(
const std::vector<monitoring_data>& metrics)
override {
835 for (
const auto& data : metrics) {
837 auto otel_result =
otel_adapter_->convert_monitoring_data(data);
838 if (otel_result.is_err()) {
841 "Failed to convert metrics to OTEL format: " + otel_result.error().message,
"monitoring_system").
to_common_error());
844 const auto& otel_metrics = otel_result.value();
848 if (send_result.is_err()) {
857 }
catch (
const std::exception& e) {
860 "OTLP metrics export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
868 if (otel_result.is_err()) {
871 "Failed to convert snapshot to OTEL format: " + otel_result.error().message,
"monitoring_system").
to_common_error());
874 const auto& otel_metrics = otel_result.value();
878 if (send_result.is_err()) {
886 }
catch (
const std::exception& e) {
889 "OTLP snapshot export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
893 common::VoidResult
start()
override {
904 common::VoidResult
stop()
override {
917 common::VoidResult
flush()
override {
926 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
927 std::unordered_map<std::string, std::size_t> stats = {
935 stats[
"transport_requests_sent"] = transport_stats.requests_sent;
936 stats[
"transport_bytes_sent"] = transport_stats.bytes_sent;
937 stats[
"transport_send_failures"] = transport_stats.send_failures;
956 return "application/json";
958 return "application/x-protobuf";
960 return "application/json";
972 common::VoidResult
send_via_http(
const std::vector<otel_metric_data>& metrics) {
976 "HTTP transport not available",
977 "otlp_metrics_exporter"
978 ).to_common_error());
986 endpoint +=
"/v1/metrics";
992 request.
url = endpoint;
995 request.
body = std::move(body);
1004 if (result.is_err()) {
1007 "HTTP send failed: " + result.error().message,
1008 "otlp_metrics_exporter"
1012 const auto& response = result.value();
1013 if (response.status_code < 200 || response.status_code >= 300) {
1016 "OTLP HTTP request failed with status " + std::to_string(response.status_code),
1017 "otlp_metrics_exporter"
1021 return common::ok();
1024 common::VoidResult
send_via_grpc(
const std::vector<otel_metric_data>& metrics) {
1028 "gRPC transport not available",
1029 "otlp_metrics_exporter"
1030 ).to_common_error());
1036 if (connect_result.is_err()) {
1037 return connect_result;
1045 request.
service =
"opentelemetry.proto.collector.metrics.v1.MetricsService";
1046 request.
method =
"Export";
1047 request.
body = std::move(body);
1051 if (result.is_err()) {
1054 "gRPC send failed: " + result.error().message,
1055 "otlp_metrics_exporter"
1059 const auto& response = result.value();
1060 if (response.status_code != 0) {
1063 "OTLP gRPC request failed: " + response.status_message,
1064 "otlp_metrics_exporter"
1068 return common::ok();
1076 std::string json =
"{\"resourceMetrics\":[";
1079 for (
const auto&
metric : metrics) {
1080 if (!first) json +=
",";
1083 json +=
"{\"resource\":{},\"scopeMetrics\":[{\"metrics\":[{";
1085 json +=
"\"gauge\":{\"dataPoints\":[{\"asDouble\":" +
1092 return std::vector<uint8_t>(json.begin(), json.end());
1112 return std::make_unique<prometheus_exporter>(config);
1116 return std::make_unique<statsd_exporter>(config);
1121 return std::make_unique<otlp_metrics_exporter>(config, resource);
1132 if (backend ==
"prometheus") {
1134 }
else if (backend ==
"statsd") {
1136 }
else if (backend ==
"otlp") {
1148 std::uint16_t port = 9090,
1149 const std::string& job_name =
"monitoring_system") {
1155 return std::make_unique<prometheus_exporter>(config);
1162 const std::string&
host =
"localhost",
1163 std::uint16_t port = 8125,
1164 bool datadog_format =
false) {
1170 return std::make_unique<statsd_exporter>(config);
1177 const std::string& endpoint,
1184 return std::make_unique<otlp_metrics_exporter>(config, resource);
Abstract gRPC transport interface.
Abstract HTTP transport interface.
Factory for creating metric exporters.
static std::vector< metric_export_format > get_supported_formats(const std::string &backend)
Get supported formats for a specific backend.
static std::unique_ptr< metric_exporter_interface > create_exporter(const metric_export_config &config, const otel_resource &resource=create_service_resource("monitoring_system", "2.0.0"))
Create a metric exporter based on format.
Abstract interface for metric exporters.
virtual common::VoidResult stop()
Stop the exporter.
virtual ~metric_exporter_interface()=default
virtual common::VoidResult export_snapshot(const metrics_snapshot &snapshot)=0
Export a single metrics snapshot.
virtual common::VoidResult shutdown()=0
Shutdown the exporter.
virtual common::VoidResult start()
Start the exporter (for pull-based systems)
virtual common::VoidResult export_metrics(const std::vector< monitoring_data > &metrics)=0
Export a batch of metrics.
virtual common::VoidResult flush()=0
Flush any pending metrics.
virtual std::unordered_map< std::string, std::size_t > get_stats() const =0
Get exporter statistics.
Adapter for converting monitoring system metrics to OpenTelemetry format.
OpenTelemetry Protocol (OTLP) metrics exporter implementation.
std::unique_ptr< grpc_transport > grpc_transport_
std::atomic< std::size_t > failed_exports_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
otlp_metrics_exporter(const metric_export_config &config, const otel_resource &resource, std::unique_ptr< http_transport > http_transport, std::unique_ptr< grpc_transport > grpc_transport)
Construct OTLP exporter with custom transports.
std::unique_ptr< opentelemetry_metrics_adapter > otel_adapter_
std::unique_ptr< http_transport > http_transport_
common::VoidResult export_metrics(const std::vector< monitoring_data > &metrics) override
Export a batch of metrics.
std::vector< uint8_t > serialize_metrics(const std::vector< otel_metric_data > &metrics) const
std::string get_content_type() const
std::atomic< std::size_t > exported_metrics_
common::VoidResult start() override
Start the exporter (for pull-based systems)
bool is_http_protocol() const
bool is_grpc_protocol() const
common::VoidResult send_via_http(const std::vector< otel_metric_data > &metrics)
common::VoidResult send_via_grpc(const std::vector< otel_metric_data > &metrics)
common::VoidResult send_otlp_batch(const std::vector< otel_metric_data > &metrics)
common::VoidResult export_snapshot(const metrics_snapshot &snapshot) override
Export a single metrics snapshot.
metric_export_config config_
otlp_metrics_exporter(const metric_export_config &config, const otel_resource &resource)
Construct OTLP exporter with default transports.
common::VoidResult shutdown() override
Shutdown the exporter.
common::VoidResult stop() override
Stop the exporter.
common::VoidResult flush() override
Flush any pending metrics.
Prometheus metric exporter implementation.
common::VoidResult flush() override
Flush any pending metrics.
std::atomic< std::size_t > exported_metrics_
std::atomic< std::size_t > failed_exports_
prometheus_exporter(const metric_export_config &config)
std::mutex metrics_mutex_
std::string get_metrics_text() const
Get current metrics in Prometheus format (for HTTP endpoint)
std::atomic< std::size_t > scrape_requests_
metric_export_config config_
metric_type infer_metric_type(const std::string &name, double) const
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
std::vector< prometheus_metric_data > convert_snapshot(const metrics_snapshot &snapshot) const
Convert metrics_snapshot to Prometheus format.
common::VoidResult shutdown() override
Shutdown the exporter.
common::VoidResult export_snapshot(const metrics_snapshot &snapshot) override
Export a single metrics snapshot.
std::vector< prometheus_metric_data > convert_monitoring_data(const monitoring_data &data) const
Convert monitoring_data to Prometheus format.
std::string sanitize_metric_name(const std::string &name) const
std::string sanitize_label_name(const std::string &name) const
common::VoidResult export_metrics(const std::vector< monitoring_data > &metrics) override
Export a batch of metrics.
std::vector< prometheus_metric_data > current_metrics_
StatsD metric exporter implementation.
std::atomic< std::size_t > failed_exports_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
statsd_exporter(const metric_export_config &config, std::unique_ptr< udp_transport > transport)
Construct StatsD exporter with custom UDP transport.
std::string sanitize_metric_name(const std::string &name) const
std::atomic< std::size_t > sent_packets_
metric_export_config config_
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > exported_metrics_
std::unique_ptr< udp_transport > transport_
common::VoidResult stop() override
Stop the exporter.
std::vector< statsd_metric_data > convert_snapshot(const metrics_snapshot &snapshot) const
Convert metrics_snapshot to StatsD format.
common::VoidResult start() override
Start the exporter (for pull-based systems)
metric_type infer_metric_type(const std::string &name, double) const
common::VoidResult export_snapshot(const metrics_snapshot &snapshot) override
Export a single metrics snapshot.
common::VoidResult flush() override
Flush any pending metrics.
common::VoidResult send_udp_batch(const std::vector< std::string > &lines)
common::VoidResult export_metrics(const std::vector< monitoring_data > &metrics) override
Export a batch of metrics.
std::vector< statsd_metric_data > convert_monitoring_data(const monitoring_data &data) const
Convert monitoring_data to StatsD format.
statsd_exporter(const metric_export_config &config)
Construct StatsD exporter with default UDP transport.
Monitoring system specific error codes.
gRPC transport layer for OTLP exporters
HTTP transport layer for trace exporters.
Interface for components that expose monitoring metrics.
Core monitoring system interface definitions.
metric_type
Metric types supported by exporters.
@ timer
StatsD-specific timer metric.
@ gauge
Instantaneous value that can go up and down.
@ counter
Monotonically increasing counter.
@ summary
Pre-calculated quantiles and count/sum.
@ histogram
Distribution of values with buckets.
metric_export_format
Supported metric export formats.
@ prometheus_protobuf
Prometheus protocol buffers format.
@ statsd_plain
StatsD plain UDP format.
@ otlp_grpc
OTLP gRPC metrics protocol.
@ statsd_datadog
StatsD DataDog extension format.
@ prometheus_text
Prometheus text exposition format.
@ otlp_http_json
OTLP HTTP JSON metrics protocol.
@ otlp_http_protobuf
OTLP HTTP Protocol Buffers metrics.
std::unique_ptr< grpc_transport > create_default_grpc_transport()
Create default gRPC transport.
std::unique_ptr< http_transport > create_default_transport()
Create default HTTP transport.
std::unique_ptr< prometheus_exporter > create_prometheus_exporter(std::uint16_t port=9090, const std::string &job_name="monitoring_system")
Helper function to create a Prometheus exporter.
std::unique_ptr< udp_transport > create_default_udp_transport()
Create default UDP transport.
otel_resource create_service_resource(const std::string &service_name, const std::string &service_version="1.0.0", const std::string &service_namespace="")
Create OpenTelemetry resource with service information.
@ gauge
Instantaneous value that can go up and down.
@ counter
Monotonically increasing counter.
@ histogram
Distribution of values with buckets.
std::unique_ptr< otlp_metrics_exporter > create_otlp_metrics_exporter(const std::string &endpoint, const otel_resource &resource, metric_export_format format=metric_export_format::otlp_grpc)
Helper function to create an OTLP metrics exporter.
std::unique_ptr< statsd_exporter > create_statsd_exporter(const std::string &host="localhost", std::uint16_t port=8125, bool datadog_format=false)
Helper function to create a StatsD exporter.
OpenTelemetry compatibility layer for monitoring system integration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
gRPC request configuration
std::chrono::milliseconds timeout
std::vector< uint8_t > body
HTTP request configuration.
std::chrono::milliseconds timeout
std::vector< uint8_t > body
std::unordered_map< std::string, std::string > headers
Configuration for metric exporters.
std::unordered_map< std::string, std::string > labels
Default labels/tags.
std::size_t max_batch_size
Maximum metrics per batch.
std::chrono::milliseconds push_interval
Push interval for push-based systems.
std::string instance_id
Instance identifier.
std::uint16_t port
Port number (for UDP/TCP)
std::size_t max_queue_size
Maximum queued metrics.
metric_export_format format
std::string endpoint
Endpoint URL or address.
std::string job_name
Prometheus job name.
std::chrono::milliseconds timeout
Request timeout.
std::unordered_map< std::string, std::string > headers
Custom HTTP headers.
bool enable_compression
Enable data compression.
common::VoidResult validate() const
Validate export configuration.
Basic metric structure for interface compatibility.
std::chrono::system_clock::time_point timestamp
std::variant< double, int64_t, std::string > value
std::unordered_map< std::string, std::string > tags
Complete snapshot of metrics at a point in time.
std::vector< metric_value > metrics
Container for monitoring metrics from a component.
std::chrono::system_clock::time_point get_timestamp() const
Get the timestamp.
const std::string & get_component_name() const
Get the component name.
const tag_map & get_tags() const
Get all tags.
const metric_map & get_metrics() const
Get all metrics.
OpenTelemetry resource representation.
Prometheus-specific metric representation.
std::chrono::system_clock::time_point timestamp
std::unordered_map< std::string, std::string > labels
std::string to_prometheus_text() const
Convert to Prometheus text format.
std::string escape_label_value(const std::string &label_value) const
StatsD-specific metric representation.
std::unordered_map< std::string, std::string > tags
std::string to_statsd_format(bool datadog_format=false) const
Convert to StatsD format.
UDP transport layer for metric exporters.