Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
metric_exporters.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
22#include "../core/error_codes.h"
26#include "udp_transport.h"
27#include "http_transport.h"
28#include "grpc_transport.h"
29#include <vector>
30#include <string>
31#include <memory>
32#include <chrono>
33#include <optional>
34#include <functional>
35#include <atomic>
36#include <algorithm>
37#include <unordered_map>
38#include <sstream>
39#include <regex>
40#include <mutex>
41#include <cctype>
#include <cmath>
42#include <cstdint>
#include <limits>
#include <locale>
43
44namespace kcenon { namespace monitoring {
45
59
/// Kind of metric carried by the exporter data structures below.
///
/// The enumerators are numbered explicitly; the values are the same as the
/// implicit numbering of the declaration order (0..4).
enum class metric_type {
    counter = 0,    ///< Prometheus "counter", StatsD "c"
    gauge = 1,      ///< Prometheus "gauge", StatsD "g"
    histogram = 2,  ///< Prometheus "histogram", StatsD "h"
    summary = 3,    ///< Prometheus "summary"
    timer = 4       ///< exposed as a gauge in Prometheus, StatsD "ms"
};
71
// Backend host/URL. The StatsD exporter connects to it together with `port`;
// the OTLP/HTTP sender builds "<endpoint>[:port]/v1/metrics" from it.
77 std::string endpoint;
// Backend port; 0 means "not set" (validate() requires a non-empty endpoint or
// a non-zero port, and OTLP/HTTP only appends the port when it is non-zero).
78 std::uint16_t port = 0;
// Must be positive (checked by validate()); not read elsewhere in the code shown here.
80 std::chrono::milliseconds push_interval{15000};
// Per-request timeout handed to the OTLP HTTP and gRPC transports.
81 std::chrono::milliseconds timeout{5000};
// Must be greater than 0 (checked by validate()).
82 std::size_t max_batch_size = 1000;
// validate() reports "Queue size must be at least batch size" for this field.
83 std::size_t max_queue_size = 10000;
// NOTE(review): not read by any exporter visible in this file.
84 bool enable_compression = false;
// Extra HTTP headers added to every OTLP/HTTP request (applied after Content-Type).
85 std::unordered_map<std::string, std::string> headers;
// Static labels (Prometheus) / tags (StatsD) attached to every exported metric.
86 std::unordered_map<std::string, std::string> labels;
// Job name; set by create_prometheus_exporter(), not read elsewhere in the code shown here.
87 std::string job_name = "monitoring_system";
// When non-empty, exported as the "instance" label/tag on every metric.
88 std::string instance_id;
89
93 common::VoidResult validate() const {
94 if (endpoint.empty() && port == 0) {
95 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
96 "Either endpoint or port must be specified", "monitoring_system").to_common_error());
97 }
98
99 if (push_interval.count() <= 0) {
100 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
101 "Push interval must be positive", "monitoring_system").to_common_error());
102 }
103
104 if (max_batch_size == 0) {
105 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
106 "Batch size must be greater than 0", "monitoring_system").to_common_error());
107 }
108
110 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
111 "Queue size must be at least batch size", "monitoring_system").to_common_error());
112 }
113
114 return common::ok();
115 }
116};
117
// Metric name written on the HELP, TYPE and sample lines
// (prometheus_exporter::convert_snapshot() passes it through sanitize_metric_name()).
123 std::string name;
// Sample value. NOTE(review): no default member initializer, so it is
// indeterminate until assigned.
125 double value;
// Sample timestamp; a default-constructed (epoch) value means "write no timestamp".
126 std::chrono::system_clock::time_point timestamp;
// Label set; values are escaped on output, keys are written verbatim.
127 std::unordered_map<std::string, std::string> labels;
// Optional "# HELP" text; the HELP line is omitted when this is empty.
128 std::string help_text;
129
133 std::string to_prometheus_text() const {
134 std::ostringstream ss;
135
136 // Add HELP line
137 if (!help_text.empty()) {
138 ss << "# HELP " << name << " " << help_text << "\n";
139 }
140
141 // Add TYPE line
142 std::string type_str;
143 switch (type) {
144 case metric_type::counter: type_str = "counter"; break;
145 case metric_type::gauge: type_str = "gauge"; break;
146 case metric_type::histogram: type_str = "histogram"; break;
147 case metric_type::summary: type_str = "summary"; break;
148 case metric_type::timer: type_str = "gauge"; break; // Timer as gauge in Prometheus
149 }
150 ss << "# TYPE " << name << " " << type_str << "\n";
151
152 // Add metric line
153 ss << name;
154
155 if (!labels.empty()) {
156 ss << "{";
157 bool first = true;
158 for (const auto& [key, label_value] : labels) {
159 if (!first) ss << ",";
160 ss << key << "=\"" << escape_label_value(label_value) << "\"";
161 first = false;
162 }
163 ss << "}";
164 }
165
166 ss << " " << value;
167
168 // Add timestamp if available
169 if (timestamp != std::chrono::system_clock::time_point{}) {
170 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
171 timestamp.time_since_epoch()).count();
172 ss << " " << ms;
173 }
174
175 ss << "\n";
176 return ss.str();
177 }
178
179private:
180 std::string escape_label_value(const std::string& label_value) const {
181 std::string escaped = label_value;
182 // Escape backslashes, quotes, and newlines
183 escaped = std::regex_replace(escaped, std::regex("\\\\"), "\\\\");
184 escaped = std::regex_replace(escaped, std::regex("\""), "\\\"");
185 escaped = std::regex_replace(escaped, std::regex("\n"), "\\n");
186 return escaped;
187 }
188};
189
// StatsD metric name (the part before ':').
195 std::string name;
// Metric value. NOTE(review): no default member initializer, so it is
// indeterminate until assigned.
197 double value;
// Sample rate; written as "|@<rate>" only when it differs from 1.0.
198 double sample_rate = 1.0;
// Tags; emitted only when to_statsd_format() is called with datadog_format = true.
199 std::unordered_map<std::string, std::string> tags;
200
204 std::string to_statsd_format(bool datadog_format = false) const {
205 std::ostringstream ss;
206
207 ss << name << ":" << value << "|";
208
209 // Add type indicator
210 switch (type) {
211 case metric_type::counter: ss << "c"; break;
212 case metric_type::gauge: ss << "g"; break;
213 case metric_type::timer: ss << "ms"; break;
214 case metric_type::histogram: ss << "h"; break;
215 case metric_type::summary: ss << "s"; break;
216 }
217
218 // Add sample rate if not 1.0
219 if (sample_rate != 1.0) {
220 ss << "|@" << sample_rate;
221 }
222
223 // Add tags (DataDog format)
224 if (datadog_format && !tags.empty()) {
225 ss << "|#";
226 bool first = true;
227 for (const auto& [key, tag_value] : tags) {
228 if (!first) ss << ",";
229 ss << key << ":" << tag_value;
230 first = false;
231 }
232 }
233
234 return ss.str();
235 }
236};
237
243public:
244 virtual ~metric_exporter_interface() = default;
245
249 virtual common::VoidResult export_metrics(const std::vector<monitoring_data>& metrics) = 0;
250
254 virtual common::VoidResult export_snapshot(const metrics_snapshot& snapshot) = 0;
255
259 virtual common::VoidResult flush() = 0;
260
264 virtual common::VoidResult shutdown() = 0;
265
269 virtual std::unordered_map<std::string, std::size_t> get_stats() const = 0;
270
274 virtual common::VoidResult start() { return common::ok(); }
275
279 virtual common::VoidResult stop() { return common::ok(); }
280};
281
287private:
289 std::atomic<std::size_t> exported_metrics_{0};
290 std::atomic<std::size_t> failed_exports_{0};
291 std::atomic<std::size_t> scrape_requests_{0};
292 std::vector<prometheus_metric_data> current_metrics_;
293 mutable std::mutex metrics_mutex_;
294
295public:
297 : config_(config) {}
298
// Convert one monitoring_data record into one Prometheus sample per entry of
// data.get_metrics(). Labels are assigned in this order, so later writes win
// on key collisions: "component", config_.labels, sanitized data tags,
// "instance". The HELP text is "Metric from <component>".
// NOTE(review): source lines 306-307 and 310 are not visible in this listing
// (presumably the sample declaration, its name and its timestamp).
// NOTE(review): config_.labels keys are not passed through sanitize_label_name(),
// unlike tag keys — confirm callers only configure valid label names.
302 std::vector<prometheus_metric_data> convert_monitoring_data(const monitoring_data& data) const {
303 std::vector<prometheus_metric_data> prom_metrics;
304
305 for (const auto& [name, value] : data.get_metrics()) {
// Type is guessed from the metric name (see infer_metric_type()).
308 metric.type = infer_metric_type(name, value);
309 metric.value = value;
311 metric.help_text = "Metric from " + data.get_component_name();
312
313 // Add component name as label
314 metric.labels["component"] = data.get_component_name();
315
316 // Add custom labels from config
317 for (const auto& [key, label_value] : config_.labels) {
318 metric.labels[key] = label_value;
319 }
320
321 // Add tags as labels
322 for (const auto& [key, tag_value] : data.get_tags()) {
323 metric.labels[sanitize_label_name(key)] = tag_value;
324 }
325
326 if (!config_.instance_id.empty()) {
327 metric.labels["instance"] = config_.instance_id;
328 }
329
330 prom_metrics.push_back(std::move(metric));
331 }
332
333 return prom_metrics;
334 }
335
// Convert every metric of a metrics_snapshot into a Prometheus sample with a
// sanitized name, the metric's own timestamp and the help text "System metric".
// Labels are assigned in this order (later writes win): "source" (when
// snapshot.source_id is set), config_.labels, sanitized metric tags, "instance".
// The type is inferred from the *unsanitized* name.
// NOTE(review): source line 343 (presumably the sample declaration) is not
// visible in this listing.
339 std::vector<prometheus_metric_data> convert_snapshot(const metrics_snapshot& snapshot) const {
340 std::vector<prometheus_metric_data> prom_metrics;
341
342 for (const auto& metric_val : snapshot.metrics) {
344 metric.name = sanitize_metric_name(metric_val.name);
345 metric.type = infer_metric_type(metric_val.name, metric_val.value);
346 metric.value = metric_val.value;
347 metric.timestamp = metric_val.timestamp;
348 metric.help_text = "System metric";
349
350 // Add source as label
351 if (!snapshot.source_id.empty()) {
352 metric.labels["source"] = snapshot.source_id;
353 }
354
355 // Add custom labels from config
356 for (const auto& [key, label_value] : config_.labels) {
357 metric.labels[key] = label_value;
358 }
359
360 // Add tags as labels
361 for (const auto& [key, tag_value] : metric_val.tags) {
362 metric.labels[sanitize_label_name(key)] = tag_value;
363 }
364
365 if (!config_.instance_id.empty()) {
366 metric.labels["instance"] = config_.instance_id;
367 }
368
369 prom_metrics.push_back(std::move(metric));
370 }
371
372 return prom_metrics;
373 }
374
// Replace the exposed sample set with the samples converted from `metrics`.
// current_metrics_ is cleared first, so every previously stored sample
// (including ones added by export_snapshot()) is dropped.
// exported_metrics_ grows by the number of monitoring_data records, not by
// the number of samples.
// NOTE(review): source lines 382 and 390 are not visible here (presumably the
// head of the current_metrics_.insert(...) call and a failed_exports_
// increment). If conversion throws midway, current_metrics_ is left partially
// filled because it was already cleared.
375 common::VoidResult export_metrics(const std::vector<monitoring_data>& metrics) override {
376 try {
377 std::lock_guard<std::mutex> lock(metrics_mutex_);
378 current_metrics_.clear();
379
380 for (const auto& data : metrics) {
381 auto prom_metrics = convert_monitoring_data(data);
383 prom_metrics.begin(), prom_metrics.end());
384 }
385
386 exported_metrics_ += metrics.size();
387 return common::ok();
388
389 } catch (const std::exception& e) {
391 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
392 "Prometheus export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
393 }
394 }
395
// Add the samples converted from `snapshot` to current_metrics_ under the lock.
// Exceptions are reported as operation_failed errors.
// NOTE(review): no clear() is visible here, so snapshot samples appear to
// accumulate across calls until the next export_metrics() — confirm against
// source lines 400, 403 and 407, which are not visible in this listing.
396 common::VoidResult export_snapshot(const metrics_snapshot& snapshot) override {
397 try {
398 std::lock_guard<std::mutex> lock(metrics_mutex_);
399 auto prom_metrics = convert_snapshot(snapshot);
401 prom_metrics.begin(), prom_metrics.end());
402
404 return common::ok();
405
406 } catch (const std::exception& e) {
408 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
409 "Prometheus snapshot export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
410 }
411 }
412
416 std::string get_metrics_text() const {
417 std::lock_guard<std::mutex> lock(metrics_mutex_);
418 std::ostringstream ss;
419
420 for (const auto& metric : current_metrics_) {
421 ss << metric.to_prometheus_text();
422 }
423
424 // Increment scrape counter
425 const_cast<std::atomic<std::size_t>&>(scrape_requests_)++;
426
427 return ss.str();
428 }
429
430 common::VoidResult flush() override {
431 // Prometheus is pull-based, so flush is a no-op
432 return common::ok();
433 }
434
435 common::VoidResult shutdown() override {
436 return flush();
437 }
438
439 std::unordered_map<std::string, std::size_t> get_stats() const override {
440 return {
441 {"exported_metrics", exported_metrics_.load()},
442 {"failed_exports", failed_exports_.load()},
443 {"scrape_requests", scrape_requests_.load()},
444 {"current_metrics_count", current_metrics_.size()}
445 };
446 }
447
448private:
449 std::string sanitize_metric_name(const std::string& name) const {
450 std::string sanitized = name;
451 // Replace invalid characters with underscores
452 std::regex invalid_chars("[^a-zA-Z0-9_:]");
453 sanitized = std::regex_replace(sanitized, invalid_chars, "_");
454
455 // Ensure it starts with a letter or underscore
456 if (!sanitized.empty() && !std::isalpha(sanitized[0]) && sanitized[0] != '_') {
457 sanitized = "_" + sanitized;
458 }
459
460 return sanitized;
461 }
462
463 std::string sanitize_label_name(const std::string& name) const {
464 std::string sanitized = name;
465 // Replace invalid characters with underscores
466 std::regex invalid_chars("[^a-zA-Z0-9_]");
467 sanitized = std::regex_replace(sanitized, invalid_chars, "_");
468
469 // Ensure it starts with a letter or underscore
470 if (!sanitized.empty() && !std::isalpha(sanitized[0]) && sanitized[0] != '_') {
471 sanitized = "_" + sanitized;
472 }
473
474 return sanitized;
475 }
476
// Guess a Prometheus metric type from substrings of the lower-cased name;
// the value is ignored. Falls back to gauge when nothing matches.
// NOTE(review): the return statements of the first three branches (source
// lines 485, 488, 491) are not visible here — presumably counter, histogram
// and summary respectively.
// NOTE(review): ::tolower on a plain char is undefined for negative values
// (non-ASCII bytes); consider casting to unsigned char first.
477 metric_type infer_metric_type(const std::string& name, double /*value*/) const {
478 // Simple heuristics for metric type inference
479 std::string lower_name = name;
480 std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), ::tolower);
481
482 if (lower_name.find("count") != std::string::npos ||
483 lower_name.find("total") != std::string::npos ||
484 lower_name.find("requests") != std::string::npos) {
486 } else if (lower_name.find("histogram") != std::string::npos ||
487 lower_name.find("bucket") != std::string::npos) {
489 } else if (lower_name.find("summary") != std::string::npos ||
490 lower_name.find("quantile") != std::string::npos) {
492 } else {
493 return metric_type::gauge; // Default to gauge
494 }
495 }
496};
497
506private:
508 std::unique_ptr<udp_transport> transport_;
509 std::atomic<std::size_t> exported_metrics_{0};
510 std::atomic<std::size_t> failed_exports_{0};
511 std::atomic<std::size_t> sent_packets_{0};
512 bool started_{false};
513
514public:
// Construct from a configuration only.
// NOTE(review): the rest of this constructor (source line 521, presumably the
// transport_ initializer and the body) is not visible in this listing.
519 explicit statsd_exporter(const metric_export_config& config)
520 : config_(config)
522
// Constructor taking a caller-supplied UDP transport (its leading declaration
// line is not visible in this listing); ownership of `transport` moves into
// the exporter.
529 std::unique_ptr<udp_transport> transport)
530 : config_(config)
531 , transport_(std::move(transport)) {}
532
// Convert one monitoring_data record into one StatsD metric per entry of
// data.get_metrics(), with sample rate 1.0. Tags are assigned in this order
// (later writes win): "component", config_.labels, data tags, "instance".
// NOTE(review): source lines 540-541 (presumably the metric declaration and
// its name) are not visible in this listing.
// NOTE(review): tag keys/values are not sanitized; characters such as ',' '|'
// or ':' would corrupt DogStatsD tag output.
536 std::vector<statsd_metric_data> convert_monitoring_data(const monitoring_data& data) const {
537 std::vector<statsd_metric_data> statsd_metrics;
538
539 for (const auto& [name, value] : data.get_metrics()) {
542 metric.type = infer_metric_type(name, value);
543 metric.value = value;
544 metric.sample_rate = 1.0;
545
546 // Add component as tag
547 metric.tags["component"] = data.get_component_name();
548
549 // Add custom tags from config
550 for (const auto& [key, tag_value] : config_.labels) {
551 metric.tags[key] = tag_value;
552 }
553
554 // Add tags from data
555 for (const auto& [key, tag_value] : data.get_tags()) {
556 metric.tags[key] = tag_value;
557 }
558
559 if (!config_.instance_id.empty()) {
560 metric.tags["instance"] = config_.instance_id;
561 }
562
563 statsd_metrics.push_back(std::move(metric));
564 }
565
566 return statsd_metrics;
567 }
568
// Convert every metric of a metrics_snapshot into a StatsD metric with a
// sanitized name and sample rate 1.0. Tags are assigned in this order (later
// writes win): "source" (when snapshot.source_id is set), config_.labels,
// metric tags, "instance". Snapshot timestamps are not carried over.
// NOTE(review): source line 576 (presumably the metric declaration) is not
// visible in this listing.
572 std::vector<statsd_metric_data> convert_snapshot(const metrics_snapshot& snapshot) const {
573 std::vector<statsd_metric_data> statsd_metrics;
574
575 for (const auto& metric_val : snapshot.metrics) {
577 metric.name = sanitize_metric_name(metric_val.name);
578 metric.type = infer_metric_type(metric_val.name, metric_val.value);
579 metric.value = metric_val.value;
580 metric.sample_rate = 1.0;
581
582 // Add source as tag
583 if (!snapshot.source_id.empty()) {
584 metric.tags["source"] = snapshot.source_id;
585 }
586
587 // Add custom tags from config
588 for (const auto& [key, tag_value] : config_.labels) {
589 metric.tags[key] = tag_value;
590 }
591
592 // Add tags from metric
593 for (const auto& [key, tag_value] : metric_val.tags) {
594 metric.tags[key] = tag_value;
595 }
596
597 if (!config_.instance_id.empty()) {
598 metric.tags["instance"] = config_.instance_id;
599 }
600
601 statsd_metrics.push_back(std::move(metric));
602 }
603
604 return statsd_metrics;
605 }
606
// Format every metric of every record as a StatsD line (DogStatsD tags when
// config_.format is statsd_datadog) and send them all in one
// send_udp_batch() call. On success exported_metrics_ grows by the number of
// records; a failed send result is returned as-is.
// NOTE(review): source lines 623, 625 and 632 are not visible in this listing
// (presumably counter updates). An empty `metrics` still sends an empty payload.
607 common::VoidResult export_metrics(const std::vector<monitoring_data>& metrics) override {
608 try {
609 std::vector<std::string> statsd_lines;
610
611 for (const auto& data : metrics) {
612 auto statsd_metrics = convert_monitoring_data(data);
613 for (const auto& metric : statsd_metrics) {
614 bool datadog_format = (config_.format == metric_export_format::statsd_datadog);
615 statsd_lines.push_back(metric.to_statsd_format(datadog_format));
616 }
617 }
618
619 // Send all lines as one newline-separated payload through the UDP transport
620 auto send_result = send_udp_batch(statsd_lines);
621 if (send_result.is_ok()) {
622 exported_metrics_ += metrics.size();
624 } else {
626 return send_result;
627 }
628
629 return common::ok();
630
631 } catch (const std::exception& e) {
633 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
634 "StatsD export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
635 }
636 }
637
// Format every metric of the snapshot as a StatsD line (DogStatsD tags when
// config_.format is statsd_datadog) and send them in one send_udp_batch()
// call; a failed send result is returned as-is.
// NOTE(review): source lines 651-652, 654 and 661 are not visible in this
// listing (presumably counter updates).
638 common::VoidResult export_snapshot(const metrics_snapshot& snapshot) override {
639 try {
640 auto statsd_metrics = convert_snapshot(snapshot);
641 std::vector<std::string> statsd_lines;
642
643 for (const auto& metric : statsd_metrics) {
644 bool datadog_format = (config_.format == metric_export_format::statsd_datadog);
645 statsd_lines.push_back(metric.to_statsd_format(datadog_format));
646 }
647
648 // Send all lines as one newline-separated payload through the UDP transport
649 auto send_result = send_udp_batch(statsd_lines);
650 if (send_result.is_ok()) {
653 } else {
655 return send_result;
656 }
657
658 return common::ok();
659
660 } catch (const std::exception& e) {
662 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
663 "StatsD snapshot export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
664 }
665 }
666
// Connect the UDP transport to config_.endpoint:config_.port.
// Idempotent: returns ok() immediately when already started. Fails when no
// transport is set, or returns the transport's connect error; started_ is
// only set after a successful connect.
// NOTE(review): source line 674 (presumably the error code) is not visible here.
667 common::VoidResult start() override {
668 if (started_) {
669 return common::ok();
670 }
671
672 if (!transport_) {
673 return common::VoidResult::err(error_info(
675 "UDP transport not available",
676 "statsd_exporter"
677 ).to_common_error());
678 }
679
680 // Connect to StatsD endpoint
681 auto connect_result = transport_->connect(config_.endpoint, config_.port);
682 if (connect_result.is_err()) {
683 return connect_result;
684 }
685
686 started_ = true;
687 return common::ok();
688 }
689
690 common::VoidResult stop() override {
691 if (!started_) {
692 return common::ok();
693 }
694
695 if (transport_) {
696 transport_->disconnect();
697 }
698
699 started_ = false;
700 return common::ok();
701 }
702
703 common::VoidResult flush() override {
704 // StatsD is push-based and sends immediately, so flush is a no-op
705 return common::ok();
706 }
707
708 common::VoidResult shutdown() override {
709 return stop();
710 }
711
712 std::unordered_map<std::string, std::size_t> get_stats() const override {
713 std::unordered_map<std::string, std::size_t> stats = {
714 {"exported_metrics", exported_metrics_.load()},
715 {"failed_exports", failed_exports_.load()},
716 {"sent_packets", sent_packets_.load()}
717 };
718
719 // Add transport statistics if available
720 if (transport_) {
721 auto transport_stats = transport_->get_statistics();
722 stats["transport_packets_sent"] = transport_stats.packets_sent;
723 stats["transport_bytes_sent"] = transport_stats.bytes_sent;
724 stats["transport_send_failures"] = transport_stats.send_failures;
725 }
726
727 return stats;
728 }
729
730private:
// Join `lines` with '\n' and hand the result to the UDP transport as a single
// send() call, connecting first when the transport is not yet connected.
// Fails when no transport is set; connect/send errors are returned unchanged.
// NOTE(review): neither config_.max_batch_size nor any datagram size limit is
// applied, so large batches become one oversized payload — consider splitting.
// NOTE(review): source line 734 (presumably the error code) is not visible here.
731 common::VoidResult send_udp_batch(const std::vector<std::string>& lines) {
732 if (!transport_) {
733 return common::VoidResult::err(error_info(
735 "UDP transport not available",
736 "statsd_exporter"
737 ).to_common_error());
738 }
739
740 // Auto-connect if not already connected
741 if (!transport_->is_connected()) {
742 auto connect_result = transport_->connect(config_.endpoint, config_.port);
743 if (connect_result.is_err()) {
744 return connect_result;
745 }
746 }
747
748 // Combine lines into a single packet (newline-separated)
749 // StatsD typically accepts multiple metrics per packet
750 std::string batch;
751 for (const auto& line : lines) {
752 if (!batch.empty()) {
753 batch += '\n';
754 }
755 batch += line;
756 }
757
758 // Send the batch
759 return transport_->send(batch);
760 }
761
762 std::string sanitize_metric_name(const std::string& name) const {
763 std::string sanitized = name;
764 // Replace dots and spaces with underscores for StatsD
765 std::regex invalid_chars("[.\\s]+");
766 sanitized = std::regex_replace(sanitized, invalid_chars, "_");
767 return sanitized;
768 }
769
// Guess a StatsD metric type from substrings of the lower-cased name; the
// value is ignored. "time"/"duration"/"latency" map to timer, otherwise gauge.
// NOTE(review): the first branch's return (source line 777) is not visible
// here — presumably counter. If exported values are cumulative totals, sending
// them as StatsD counters ("c", an increment) on every export would over-count
// — confirm the intended semantics.
// NOTE(review): "time" also matches names such as "timestamp" or "uptime";
// ::tolower on a negative char value is undefined behaviour.
770 metric_type infer_metric_type(const std::string& name, double /*value*/) const {
771 // Simple heuristics for StatsD metric type inference
772 std::string lower_name = name;
773 std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), ::tolower);
774
775 if (lower_name.find("count") != std::string::npos ||
776 lower_name.find("total") != std::string::npos) {
778 } else if (lower_name.find("time") != std::string::npos ||
779 lower_name.find("duration") != std::string::npos ||
780 lower_name.find("latency") != std::string::npos) {
781 return metric_type::timer;
782 } else {
783 return metric_type::gauge; // Default to gauge
784 }
785 }
786};
787
796private:
798 std::unique_ptr<opentelemetry_metrics_adapter> otel_adapter_;
799 std::unique_ptr<http_transport> http_transport_;
800 std::unique_ptr<grpc_transport> grpc_transport_;
801 std::atomic<std::size_t> exported_metrics_{0};
802 std::atomic<std::size_t> failed_exports_{0};
803 bool started_{false};
804
805public:
816
// Constructor with caller-supplied transports (its declaration line is not
// visible in this listing). The OTel adapter is built from `resource`; both
// transports are moved in and either may be null.
825 const otel_resource& resource,
826 std::unique_ptr<http_transport> http_transport,
827 std::unique_ptr<grpc_transport> grpc_transport)
828 : config_(config)
829 , otel_adapter_(std::make_unique<opentelemetry_metrics_adapter>(resource))
830 , http_transport_(std::move(http_transport))
831 , grpc_transport_(std::move(grpc_transport)) {}
832
// Convert and send each record separately (one OTLP request per record).
// Stops at the first conversion or send failure and returns it; records sent
// before the failure are not added to exported_metrics_.
// NOTE(review): source lines 839, 849 and 858 are not visible in this listing
// (presumably failed_exports_ increments).
833 common::VoidResult export_metrics(const std::vector<monitoring_data>& metrics) override {
834 try {
835 for (const auto& data : metrics) {
836 // Convert to OpenTelemetry format
837 auto otel_result = otel_adapter_->convert_monitoring_data(data);
838 if (otel_result.is_err()) {
840 return common::VoidResult::err(error_info(monitoring_error_code::processing_failed,
841 "Failed to convert metrics to OTEL format: " + otel_result.error().message, "monitoring_system").to_common_error());
842 }
843
844 const auto& otel_metrics = otel_result.value();
845
846 // Send via appropriate OTLP protocol
847 auto send_result = send_otlp_batch(otel_metrics);
848 if (send_result.is_err()) {
850 return send_result;
851 }
852 }
853
854 exported_metrics_ += metrics.size();
855 return common::ok();
856
857 } catch (const std::exception& e) {
859 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
860 "OTLP metrics export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
861 }
862 }
863
// Convert a snapshot with the OTel adapter and send it in one OTLP request.
// Conversion errors become processing_failed; send errors are returned as-is.
// NOTE(review): source lines 869, 879, 883 and 887 are not visible in this
// listing (presumably counter updates).
864 common::VoidResult export_snapshot(const metrics_snapshot& snapshot) override {
865 try {
866 // Convert to OpenTelemetry format
867 auto otel_result = otel_adapter_->convert_metrics(snapshot);
868 if (otel_result.is_err()) {
870 return common::VoidResult::err(error_info(monitoring_error_code::processing_failed,
871 "Failed to convert snapshot to OTEL format: " + otel_result.error().message, "monitoring_system").to_common_error());
872 }
873
874 const auto& otel_metrics = otel_result.value();
875
876 // Send via appropriate OTLP protocol
877 auto send_result = send_otlp_batch(otel_metrics);
878 if (send_result.is_err()) {
880 return send_result;
881 }
882
884 return common::ok();
885
886 } catch (const std::exception& e) {
888 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
889 "OTLP snapshot export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
890 }
891 }
892
893 common::VoidResult start() override {
894 if (started_) {
895 return common::ok();
896 }
897
898 // gRPC transport connection is managed per-request
899 // HTTP transport is stateless
900 started_ = true;
901 return common::ok();
902 }
903
904 common::VoidResult stop() override {
905 if (!started_) {
906 return common::ok();
907 }
908
909 if (grpc_transport_) {
910 grpc_transport_->disconnect();
911 }
912
913 started_ = false;
914 return common::ok();
915 }
916
917 common::VoidResult flush() override {
918 // OTLP exporter typically sends immediately, so flush is a no-op
919 return common::ok();
920 }
921
922 common::VoidResult shutdown() override {
923 return stop();
924 }
925
// Report exported/failed counters and, under a condition, the gRPC transport
// counters (transport_requests_sent, transport_bytes_sent,
// transport_send_failures).
// NOTE(review): the guarding condition (source line 933) is not visible here —
// presumably "gRPC protocol selected and grpc_transport_ non-null". HTTP
// transport statistics are never reported.
926 std::unordered_map<std::string, std::size_t> get_stats() const override {
927 std::unordered_map<std::string, std::size_t> stats = {
928 {"exported_metrics", exported_metrics_.load()},
929 {"failed_exports", failed_exports_.load()}
930 };
931
932 // Add transport statistics based on protocol
934 auto transport_stats = grpc_transport_->get_statistics();
935 stats["transport_requests_sent"] = transport_stats.requests_sent;
936 stats["transport_bytes_sent"] = transport_stats.bytes_sent;
937 stats["transport_send_failures"] = transport_stats.send_failures;
938 }
939
940 return stats;
941 }
942
943private:
947
952
// Content-Type header for OTLP/HTTP requests, chosen from config_.format;
// unknown formats fall back to JSON.
// NOTE(review): the case labels (source lines 955, 957) are not visible here.
// serialize_metrics() always produces JSON, so "application/x-protobuf" would
// mislabel the body.
953 std::string get_content_type() const {
954 switch (config_.format) {
956 return "application/json";
958 return "application/x-protobuf";
959 default:
960 return "application/json";
961 }
962 }
963
964 common::VoidResult send_otlp_batch(const std::vector<otel_metric_data>& metrics) {
965 if (is_grpc_protocol()) {
966 return send_via_grpc(metrics);
967 } else {
968 return send_via_http(metrics);
969 }
970 }
971
// POST the serialized batch to "<endpoint>[:port]/v1/metrics".
// Fails when no HTTP transport is set, when the transport reports an error,
// or when the response status is outside 2xx.
// NOTE(review): the port is appended verbatim to config_.endpoint, so an
// endpoint that already has a port or path yields a malformed URL. Custom
// headers are applied after Content-Type and can override it.
// NOTE(review): source lines 975, 1006 and 1015 (presumably error codes) are
// not visible in this listing.
972 common::VoidResult send_via_http(const std::vector<otel_metric_data>& metrics) {
973 if (!http_transport_) {
974 return common::VoidResult::err(error_info(
976 "HTTP transport not available",
977 "otlp_metrics_exporter"
978 ).to_common_error());
979 }
980
981 // Build OTLP HTTP endpoint
982 std::string endpoint = config_.endpoint;
983 if (config_.port != 0) {
984 endpoint += ":" + std::to_string(config_.port);
985 }
986 endpoint += "/v1/metrics";
987
988 // Serialize metrics (serialize_metrics() always produces JSON, even when get_content_type() says protobuf)
989 std::vector<uint8_t> body = serialize_metrics(metrics);
990
991 http_request request;
992 request.url = endpoint;
993 request.method = "POST";
994 request.headers["Content-Type"] = get_content_type();
995 request.body = std::move(body);
996 request.timeout = config_.timeout;
997
998 // Add custom headers
999 for (const auto& [key, value] : config_.headers) {
1000 request.headers[key] = value;
1001 }
1002
1003 auto result = http_transport_->send(request);
1004 if (result.is_err()) {
1005 return common::VoidResult::err(error_info(
1007 "HTTP send failed: " + result.error().message,
1008 "otlp_metrics_exporter"
1009 ).to_common_error());
1010 }
1011
1012 const auto& response = result.value();
1013 if (response.status_code < 200 || response.status_code >= 300) {
1014 return common::VoidResult::err(error_info(
1016 "OTLP HTTP request failed with status " + std::to_string(response.status_code),
1017 "otlp_metrics_exporter"
1018 ).to_common_error());
1019 }
1020
1021 return common::ok();
1022 }
1023
// Send the serialized batch as a MetricsService/Export gRPC call, connecting
// first when needed. Fails when no gRPC transport is set, on connect/send
// errors, or when the response status code is non-zero.
// NOTE(review): config_.headers are not forwarded as gRPC metadata.
// NOTE(review): source lines 1027, 1053 and 1062 (presumably error codes) are
// not visible in this listing.
1024 common::VoidResult send_via_grpc(const std::vector<otel_metric_data>& metrics) {
1025 if (!grpc_transport_) {
1026 return common::VoidResult::err(error_info(
1028 "gRPC transport not available",
1029 "otlp_metrics_exporter"
1030 ).to_common_error());
1031 }
1032
1033 // Connect if not already connected
1034 if (!grpc_transport_->is_connected()) {
1035 auto connect_result = grpc_transport_->connect(config_.endpoint, config_.port);
1036 if (connect_result.is_err()) {
1037 return connect_result;
1038 }
1039 }
1040
1041 // Serialize metrics (NOTE(review): serialize_metrics() produces JSON, not protobuf)
1042 std::vector<uint8_t> body = serialize_metrics(metrics);
1043
1044 grpc_request request;
1045 request.service = "opentelemetry.proto.collector.metrics.v1.MetricsService";
1046 request.method = "Export";
1047 request.body = std::move(body);
1048 request.timeout = config_.timeout;
1049
1050 auto result = grpc_transport_->send(request);
1051 if (result.is_err()) {
1052 return common::VoidResult::err(error_info(
1054 "gRPC send failed: " + result.error().message,
1055 "otlp_metrics_exporter"
1056 ).to_common_error());
1057 }
1058
1059 const auto& response = result.value();
1060 if (response.status_code != 0) { // gRPC OK is 0
1061 return common::VoidResult::err(error_info(
1063 "OTLP gRPC request failed: " + response.status_message,
1064 "otlp_metrics_exporter"
1065 ).to_common_error());
1066 }
1067
1068 return common::ok();
1069 }
1070
1071 std::vector<uint8_t> serialize_metrics(const std::vector<otel_metric_data>& metrics) const {
1072 // Serialize metrics based on format
1073 // For JSON format, convert to JSON string
1074 // For protobuf format, serialize to protobuf bytes
1075 // This is a simplified implementation
1076 std::string json = "{\"resourceMetrics\":[";
1077
1078 bool first = true;
1079 for (const auto& metric : metrics) {
1080 if (!first) json += ",";
1081 first = false;
1082
1083 json += "{\"resource\":{},\"scopeMetrics\":[{\"metrics\":[{";
1084 json += "\"name\":\"" + metric.name + "\",";
1085 json += "\"gauge\":{\"dataPoints\":[{\"asDouble\":" +
1086 std::to_string(metric.value) + "}]}";
1087 json += "}]}]}";
1088 }
1089
1090 json += "]}";
1091
1092 return std::vector<uint8_t>(json.begin(), json.end());
1093 }
1094};
1095
1101public:
// Create the exporter matching config.format, or nullptr for unsupported
// formats. `resource` is only used by the OTLP exporter.
// NOTE(review): the case labels (source lines 1110-1111, 1114-1115,
// 1118-1120) are not visible in this listing. The config is not validated
// here (validate() is not called). The default resource version "2.0.0"
// should be checked against the project version.
1105 static std::unique_ptr<metric_exporter_interface> create_exporter(
1106 const metric_export_config& config,
1107 const otel_resource& resource = create_service_resource("monitoring_system", "2.0.0")) {
1108
1109 switch (config.format) {
1112 return std::make_unique<prometheus_exporter>(config);
1113
1116 return std::make_unique<statsd_exporter>(config);
1117
1121 return std::make_unique<otlp_metrics_exporter>(config, resource);
1122
1123 default:
1124 return nullptr;
1125 }
1126 }
1127
// List the export formats available for a backend name ("prometheus",
// "statsd" or "otlp"); any other name yields an empty vector.
// NOTE(review): the per-backend return statements (source lines 1133, 1135,
// 1137-1138) are not visible in this listing.
1131 static std::vector<metric_export_format> get_supported_formats(const std::string& backend) {
1132 if (backend == "prometheus") {
1134 } else if (backend == "statsd") {
1136 } else if (backend == "otlp") {
1139 }
1140 return {};
1141 }
1142};
1143
// Convenience factory: a prometheus_exporter whose config has only `port` and
// `job_name` set (plus source line 1154's setting, not visible here —
// presumably the format). Defaults: port 9090, job "monitoring_system".
1147inline std::unique_ptr<prometheus_exporter> create_prometheus_exporter(
1148 std::uint16_t port = 9090,
1149 const std::string& job_name = "monitoring_system") {
1150
1151 metric_export_config config;
1152 config.port = port;
1154 config.job_name = job_name;
1155 return std::make_unique<prometheus_exporter>(config);
1156}
1157
// Convenience factory: a statsd_exporter targeting host:port (defaults
// localhost:8125).
// NOTE(review): `datadog_format` is only used on source line 1169, which is
// not visible here — presumably it selects the DogStatsD format.
1161inline std::unique_ptr<statsd_exporter> create_statsd_exporter(
1162 const std::string& host = "localhost",
1163 std::uint16_t port = 8125,
1164 bool datadog_format = false) {
1165
1166 metric_export_config config;
1167 config.endpoint = host;
1168 config.port = port;
1170 return std::make_unique<statsd_exporter>(config);
1171}
1172
// Convenience factory: an otlp_metrics_exporter for `endpoint` with the given
// resource and format. The port stays 0, so any port must be part of `endpoint`.
// NOTE(review): the `format` parameter declaration (source line 1179) is not
// visible in this listing.
1176inline std::unique_ptr<otlp_metrics_exporter> create_otlp_metrics_exporter(
1177 const std::string& endpoint,
1178 const otel_resource& resource,
1180
1181 metric_export_config config;
1182 config.endpoint = endpoint;
1183 config.format = format;
1184 return std::make_unique<otlp_metrics_exporter>(config, resource);
1185}
1186
1187} } // namespace kcenon::monitoring
Abstract gRPC transport interface.
Abstract HTTP transport interface.
Factory for creating metric exporters.
static std::vector< metric_export_format > get_supported_formats(const std::string &backend)
Get supported formats for a specific backend.
static std::unique_ptr< metric_exporter_interface > create_exporter(const metric_export_config &config, const otel_resource &resource=create_service_resource("monitoring_system", "2.0.0"))
Create a metric exporter based on format.
Abstract interface for metric exporters.
virtual common::VoidResult stop()
Stop the exporter.
virtual common::VoidResult export_snapshot(const metrics_snapshot &snapshot)=0
Export a single metrics snapshot.
virtual common::VoidResult shutdown()=0
Shutdown the exporter.
virtual common::VoidResult start()
Start the exporter (for pull-based systems).
virtual common::VoidResult export_metrics(const std::vector< monitoring_data > &metrics)=0
Export a batch of metrics.
virtual common::VoidResult flush()=0
Flush any pending metrics.
virtual std::unordered_map< std::string, std::size_t > get_stats() const =0
Get exporter statistics.
Adapter for converting monitoring system metrics to OpenTelemetry format.
OpenTelemetry Protocol (OTLP) metrics exporter implementation.
std::unique_ptr< grpc_transport > grpc_transport_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
otlp_metrics_exporter(const metric_export_config &config, const otel_resource &resource, std::unique_ptr< http_transport > http_transport, std::unique_ptr< grpc_transport > grpc_transport)
Construct OTLP exporter with custom transports.
std::unique_ptr< opentelemetry_metrics_adapter > otel_adapter_
std::unique_ptr< http_transport > http_transport_
common::VoidResult export_metrics(const std::vector< monitoring_data > &metrics) override
Export a batch of metrics.
std::vector< uint8_t > serialize_metrics(const std::vector< otel_metric_data > &metrics) const
std::atomic< std::size_t > exported_metrics_
common::VoidResult start() override
Start the exporter (for pull-based systems).
common::VoidResult send_via_http(const std::vector< otel_metric_data > &metrics)
common::VoidResult send_via_grpc(const std::vector< otel_metric_data > &metrics)
common::VoidResult send_otlp_batch(const std::vector< otel_metric_data > &metrics)
common::VoidResult export_snapshot(const metrics_snapshot &snapshot) override
Export a single metrics snapshot.
otlp_metrics_exporter(const metric_export_config &config, const otel_resource &resource)
Construct OTLP exporter with default transports.
common::VoidResult shutdown() override
Shutdown the exporter.
common::VoidResult stop() override
Stop the exporter.
common::VoidResult flush() override
Flush any pending metrics.
Prometheus metric exporter implementation.
common::VoidResult flush() override
Flush any pending metrics.
std::atomic< std::size_t > exported_metrics_
std::atomic< std::size_t > failed_exports_
prometheus_exporter(const metric_export_config &config)
std::string get_metrics_text() const
Get current metrics in Prometheus format (for the HTTP endpoint).
std::atomic< std::size_t > scrape_requests_
metric_type infer_metric_type(const std::string &name, double) const
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
std::vector< prometheus_metric_data > convert_snapshot(const metrics_snapshot &snapshot) const
Convert metrics_snapshot to Prometheus format.
common::VoidResult shutdown() override
Shutdown the exporter.
common::VoidResult export_snapshot(const metrics_snapshot &snapshot) override
Export a single metrics snapshot.
std::vector< prometheus_metric_data > convert_monitoring_data(const monitoring_data &data) const
Convert monitoring_data to Prometheus format.
std::string sanitize_metric_name(const std::string &name) const
std::string sanitize_label_name(const std::string &name) const
common::VoidResult export_metrics(const std::vector< monitoring_data > &metrics) override
Export a batch of metrics.
std::vector< prometheus_metric_data > current_metrics_
StatsD metric exporter implementation.
std::atomic< std::size_t > failed_exports_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
statsd_exporter(const metric_export_config &config, std::unique_ptr< udp_transport > transport)
Construct StatsD exporter with custom UDP transport.
std::string sanitize_metric_name(const std::string &name) const
std::atomic< std::size_t > sent_packets_
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > exported_metrics_
std::unique_ptr< udp_transport > transport_
common::VoidResult stop() override
Stop the exporter.
std::vector< statsd_metric_data > convert_snapshot(const metrics_snapshot &snapshot) const
Convert metrics_snapshot to StatsD format.
common::VoidResult start() override
Start the exporter (for pull-based systems).
metric_type infer_metric_type(const std::string &name, double) const
common::VoidResult export_snapshot(const metrics_snapshot &snapshot) override
Export a single metrics snapshot.
common::VoidResult flush() override
Flush any pending metrics.
common::VoidResult send_udp_batch(const std::vector< std::string > &lines)
common::VoidResult export_metrics(const std::vector< monitoring_data > &metrics) override
Export a batch of metrics.
std::vector< statsd_metric_data > convert_monitoring_data(const monitoring_data &data) const
Convert monitoring_data to StatsD format.
statsd_exporter(const metric_export_config &config)
Construct StatsD exporter with default UDP transport.
Monitoring-system-specific error codes.
gRPC transport layer for OTLP exporters.
HTTP transport layer for trace and metric exporters.
Interface for components that expose monitoring metrics.
Core monitoring system interface definitions.
metric_type
Metric types supported by exporters.
@ timer
StatsD-specific timer metric.
@ gauge
Instantaneous value that can go up and down.
@ counter
Monotonically increasing counter.
@ summary
Pre-calculated quantiles and count/sum.
@ histogram
Distribution of values with buckets.
metric_export_format
Supported metric export formats.
@ prometheus_protobuf
Prometheus protocol buffers format.
@ statsd_plain
StatsD plain UDP format.
@ otlp_grpc
OTLP gRPC metrics protocol.
@ statsd_datadog
StatsD DataDog extension format.
@ prometheus_text
Prometheus text exposition format.
@ otlp_http_json
OTLP HTTP JSON metrics protocol.
@ otlp_http_protobuf
OTLP HTTP Protocol Buffers metrics.
std::unique_ptr< grpc_transport > create_default_grpc_transport()
Create default gRPC transport.
std::unique_ptr< http_transport > create_default_transport()
Create default HTTP transport.
std::unique_ptr< prometheus_exporter > create_prometheus_exporter(std::uint16_t port=9090, const std::string &job_name="monitoring_system")
Helper function to create a Prometheus exporter.
std::unique_ptr< udp_transport > create_default_udp_transport()
Create default UDP transport.
otel_resource create_service_resource(const std::string &service_name, const std::string &service_version="1.0.0", const std::string &service_namespace="")
Create OpenTelemetry resource with service information.
@ gauge
Instantaneous value that can go up and down.
@ counter
Monotonically increasing counter.
@ histogram
Distribution of values with buckets.
std::unique_ptr< otlp_metrics_exporter > create_otlp_metrics_exporter(const std::string &endpoint, const otel_resource &resource, metric_export_format format=metric_export_format::otlp_grpc)
Helper function to create an OTLP metrics exporter.
std::unique_ptr< statsd_exporter > create_statsd_exporter(const std::string &host="localhost", std::uint16_t port=8125, bool datadog_format=false)
Helper function to create a StatsD exporter.
OpenTelemetry compatibility layer for monitoring system integration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
gRPC request configuration.
std::chrono::milliseconds timeout
std::vector< uint8_t > body
HTTP request configuration.
std::chrono::milliseconds timeout
std::vector< uint8_t > body
std::unordered_map< std::string, std::string > headers
Configuration for metric exporters.
std::unordered_map< std::string, std::string > labels
Default labels/tags.
std::size_t max_batch_size
Maximum metrics per batch.
std::chrono::milliseconds push_interval
Push interval for push-based systems.
std::string instance_id
Instance identifier.
std::uint16_t port
Port number (for UDP/TCP).
std::size_t max_queue_size
Maximum queued metrics.
std::string endpoint
Endpoint URL or address.
std::string job_name
Prometheus job name.
std::chrono::milliseconds timeout
Request timeout.
std::unordered_map< std::string, std::string > headers
Custom HTTP headers.
bool enable_compression
Enable data compression.
common::VoidResult validate() const
Validate export configuration.
Basic metric structure for interface compatibility.
std::chrono::system_clock::time_point timestamp
std::variant< double, int64_t, std::string > value
std::unordered_map< std::string, std::string > tags
Complete snapshot of metrics at a point in time.
std::vector< metric_value > metrics
Container for monitoring metrics from a component.
std::chrono::system_clock::time_point get_timestamp() const
Get the timestamp.
const std::string & get_component_name() const
Get the component name.
const tag_map & get_tags() const
Get all tags.
const metric_map & get_metrics() const
Get all metrics.
OpenTelemetry resource representation.
Prometheus-specific metric representation.
std::chrono::system_clock::time_point timestamp
std::unordered_map< std::string, std::string > labels
std::string to_prometheus_text() const
Convert to Prometheus text format.
std::string escape_label_value(const std::string &label_value) const
StatsD-specific metric representation.
std::unordered_map< std::string, std::string > tags
std::string to_statsd_format(bool datadog_format=false) const
Convert to StatsD format.
UDP transport layer for metric exporters.