Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
trace_exporters.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
22#include "../core/error_codes.h"
25#include "http_transport.h"
26#include "grpc_transport.h"
29#include <vector>
30#include <string>
31#include <memory>
32#include <chrono>
33#include <optional>
34#include <functional>
35#include <atomic>
36#include <algorithm>
37#include <unordered_map>
38#include <sstream>
39#include <iomanip>
40#include <cmath>
41#include <thread>
42
43namespace kcenon { namespace monitoring {
44
58
64 std::string endpoint;
66 std::chrono::milliseconds timeout{30000};
67 std::chrono::milliseconds batch_timeout{5000};
68 std::size_t max_batch_size = 512;
69 std::size_t max_queue_size = 2048;
70 bool enable_compression = true;
71 std::unordered_map<std::string, std::string> headers;
72 std::optional<std::string> service_name;
73
77 common::VoidResult validate() const {
78 if (endpoint.empty()) {
79 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
80 "Export endpoint cannot be empty", "monitoring_system").to_common_error());
81 }
82
83 if (timeout.count() <= 0) {
84 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
85 "Timeout must be positive", "monitoring_system").to_common_error());
86 }
87
88 if (max_batch_size == 0) {
89 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
90 "Batch size must be greater than 0", "monitoring_system").to_common_error());
91 }
92
94 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
95 "Queue size must be at least batch size", "monitoring_system").to_common_error());
96 }
97
98 return common::ok();
99 }
100};
101
107 std::string trace_id;
108 std::string span_id;
109 std::string parent_span_id;
110 std::string operation_name;
111 std::string service_name;
112 std::chrono::microseconds start_time;
113 std::chrono::microseconds duration;
114 std::vector<std::pair<std::string, std::string>> tags;
115 std::vector<std::pair<std::string, std::string>> process_tags;
116
120 std::string to_thrift_json() const {
121 std::ostringstream json;
122 json << "{";
123 json << "\"traceIdHigh\":0,";
124 json << "\"traceIdLow\":" << std::hash<std::string>{}(trace_id) << ",";
125 json << "\"spanId\":" << std::hash<std::string>{}(span_id) << ",";
126 json << "\"parentSpanId\":" << (parent_span_id.empty() ? "0" : std::to_string(std::hash<std::string>{}(parent_span_id))) << ",";
127 json << "\"operationName\":\"" << operation_name << "\",";
128 json << "\"startTime\":" << start_time.count() << ",";
129 json << "\"duration\":" << duration.count() << ",";
130
131 // Tags
132 json << "\"tags\":[";
133 bool first = true;
134 for (const auto& [key, value] : tags) {
135 if (!first) json << ",";
136 json << "{\"key\":\"" << key << "\",\"vType\":\"STRING\",\"vStr\":\"" << value << "\"}";
137 first = false;
138 }
139 json << "],";
140
141 // Process
142 json << "\"process\":{\"serviceName\":\"" << service_name << "\",\"tags\":[";
143 first = true;
144 for (const auto& [key, value] : process_tags) {
145 if (!first) json << ",";
146 json << "{\"key\":\"" << key << "\",\"vType\":\"STRING\",\"vStr\":\"" << value << "\"}";
147 first = false;
148 }
149 json << "]}";
150
151 json << "}";
152 return json.str();
153 }
154
162 std::vector<uint8_t> to_protobuf() const {
169
170 if (!parent_span_id.empty()) {
172 ref.trace_id = out.trace_id;
175 ref.ref_type = 0; // CHILD_OF
176 out.references.push_back(std::move(ref));
177 }
178
179 // Timestamp / duration split into (seconds, nanos).
180 auto st_ns = std::chrono::nanoseconds(start_time);
181 out.start_time_seconds = static_cast<std::int64_t>(
182 std::chrono::duration_cast<std::chrono::seconds>(st_ns).count());
183 out.start_time_nanos = static_cast<std::int32_t>(
184 (st_ns - std::chrono::seconds(out.start_time_seconds)).count());
185
186 auto du_ns = std::chrono::nanoseconds(duration);
187 out.duration_seconds = static_cast<std::int64_t>(
188 std::chrono::duration_cast<std::chrono::seconds>(du_ns).count());
189 out.duration_nanos = static_cast<std::int32_t>(
190 (du_ns - std::chrono::seconds(out.duration_seconds)).count());
191
192 for (const auto& [key, value] : tags) {
194 kv.key = key;
196 kv.v_str = value;
197 out.tags.push_back(std::move(kv));
198 }
199
201 for (const auto& [key, value] : process_tags) {
203 kv.key = key;
205 kv.v_str = value;
206 out.proc.tags.push_back(std::move(kv));
207 }
208
209 return jaeger_proto::encode_span(out);
210 }
211};
212
217inline std::vector<uint8_t> encode_jaeger_batch(
218 const std::vector<jaeger_span_data>& spans,
219 const std::string& service_name) {
221 b.spans.reserve(spans.size());
222 for (const auto& s : spans) {
225 protobuf_wire::hex_to_bytes(s.trace_id), 16);
227 protobuf_wire::hex_to_bytes(s.span_id), 8);
228 ps.operation_name = s.operation_name;
229 if (!s.parent_span_id.empty()) {
231 ref.trace_id = ps.trace_id;
233 protobuf_wire::hex_to_bytes(s.parent_span_id), 8);
234 ref.ref_type = 0;
235 ps.references.push_back(std::move(ref));
236 }
237 auto st_ns = std::chrono::nanoseconds(s.start_time);
238 ps.start_time_seconds = static_cast<std::int64_t>(
239 std::chrono::duration_cast<std::chrono::seconds>(st_ns).count());
240 ps.start_time_nanos = static_cast<std::int32_t>(
241 (st_ns - std::chrono::seconds(ps.start_time_seconds)).count());
242 auto du_ns = std::chrono::nanoseconds(s.duration);
243 ps.duration_seconds = static_cast<std::int64_t>(
244 std::chrono::duration_cast<std::chrono::seconds>(du_ns).count());
245 ps.duration_nanos = static_cast<std::int32_t>(
246 (du_ns - std::chrono::seconds(ps.duration_seconds)).count());
247 for (const auto& [key, value] : s.tags) {
249 kv.key = key;
251 kv.v_str = value;
252 ps.tags.push_back(std::move(kv));
253 }
254 b.spans.push_back(std::move(ps));
255 }
256 b.proc.service_name = service_name;
257 b.has_process = !service_name.empty();
259}
260
266 std::string trace_id;
267 std::string span_id;
268 std::string parent_id;
269 std::string name;
270 std::string kind;
271 std::chrono::microseconds timestamp;
272 std::chrono::microseconds duration;
275 std::unordered_map<std::string, std::string> tags;
276 bool shared = false;
277
281 std::string to_json_v2() const {
282 std::ostringstream json;
283 json << "{";
284 json << "\"traceId\":\"" << trace_id << "\",";
285 json << "\"id\":\"" << span_id << "\",";
286 if (!parent_id.empty()) {
287 json << "\"parentId\":\"" << parent_id << "\",";
288 }
289 json << "\"name\":\"" << name << "\",";
290 json << "\"kind\":\"" << kind << "\",";
291 json << "\"timestamp\":" << timestamp.count() << ",";
292 json << "\"duration\":" << duration.count() << ",";
293
294 // Local endpoint
295 json << "\"localEndpoint\":{\"serviceName\":\"" << local_endpoint_service_name << "\"},";
296
297 // Remote endpoint (if set)
298 if (!remote_endpoint_service_name.empty()) {
299 json << "\"remoteEndpoint\":{\"serviceName\":\"" << remote_endpoint_service_name << "\"},";
300 }
301
302 // Tags
303 json << "\"tags\":{";
304 bool first = true;
305 for (const auto& [key, value] : tags) {
306 if (!first) json << ",";
307 json << "\"" << key << "\":\"" << value << "\"";
308 first = false;
309 }
310 json << "}";
311
312 if (shared) {
313 json << ",\"shared\":true";
314 }
315
316 json << "}";
317 return json.str();
318 }
319
326 std::vector<uint8_t> to_protobuf() const {
328 auto trace_bytes = protobuf_wire::hex_to_bytes(trace_id);
329 // Zipkin accepts 8 or 16 byte trace IDs; normalize the common
330 // shorter/empty/non-hex inputs to 8 bytes so the wire always carries
331 // a valid ID (Zipkin rejects spans without a trace ID).
332 if (trace_bytes.size() == 16) {
333 out.trace_id = std::move(trace_bytes);
334 } else {
335 out.trace_id = protobuf_wire::left_pad(trace_bytes, 8);
336 }
339 if (!parent_id.empty()) {
342 }
344 out.name = name;
345 out.timestamp = static_cast<std::uint64_t>(timestamp.count());
346 out.duration = static_cast<std::uint64_t>(duration.count());
349 out.shared = shared;
350 for (const auto& [key, value] : tags) {
351 out.tags.emplace_back(key, value);
352 }
353 return zipkin_proto::encode_span(out);
354 }
355};
356
362inline std::vector<uint8_t> encode_zipkin_list_of_spans(
363 const std::vector<zipkin_span_data>& spans) {
365 list.spans.reserve(spans.size());
366 for (const auto& s : spans) {
368 auto trace_bytes = protobuf_wire::hex_to_bytes(s.trace_id);
369 if (trace_bytes.size() == 16) {
370 ps.trace_id = std::move(trace_bytes);
371 } else {
372 ps.trace_id = protobuf_wire::left_pad(trace_bytes, 8);
373 }
375 protobuf_wire::hex_to_bytes(s.span_id), 8);
376 if (!s.parent_id.empty()) {
378 protobuf_wire::hex_to_bytes(s.parent_id), 8);
379 }
380 ps.kind = zipkin_proto::parse_kind(s.kind);
381 ps.name = s.name;
382 ps.timestamp = static_cast<std::uint64_t>(s.timestamp.count());
383 ps.duration = static_cast<std::uint64_t>(s.duration.count());
384 ps.local_endpoint.service_name = s.local_endpoint_service_name;
385 ps.remote_endpoint.service_name = s.remote_endpoint_service_name;
386 ps.shared = s.shared;
387 for (const auto& [key, value] : s.tags) {
388 ps.tags.emplace_back(key, value);
389 }
390 list.spans.push_back(std::move(ps));
391 }
393}
394
400public:
/// Virtual destructor so concrete exporters can be destroyed through a
/// pointer to this interface.
401 virtual ~trace_exporter_interface() = default;
402
/// Export a batch of spans.
/// @param spans Spans to export.
/// @return A VoidResult describing success or failure of the export.
406 virtual common::VoidResult export_spans(const std::vector<trace_span>& spans) = 0;
407
/// Flush any pending spans.
411 virtual common::VoidResult flush() = 0;
412
/// Shutdown the exporter.
416 virtual common::VoidResult shutdown() = 0;
417
/// Get exporter statistics.
/// @return A map from statistic name to its current count.
421 virtual std::unordered_map<std::string, std::size_t> get_stats() const = 0;
422};
423
432private:
434 std::unique_ptr<http_transport> transport_;
435 std::atomic<std::size_t> exported_spans_{0};
436 std::atomic<std::size_t> failed_exports_{0};
437 std::atomic<std::size_t> dropped_spans_{0};
438 std::size_t max_retries_{3};
439 std::chrono::milliseconds base_retry_delay_{100};
440
441public:
444
/// Construct a Jaeger exporter that sends through a caller-supplied HTTP
/// transport.
/// @param config    Export configuration; used to initialise config_.
/// @param transport HTTP transport; ownership is moved into transport_.
/// NOTE(review): send_with_retry dereferences transport_ without a null
/// check, so callers appear to be expected to pass a non-null transport.
445 jaeger_exporter(const trace_export_config& config, std::unique_ptr<http_transport> transport)
446 : config_(config), transport_(std::move(transport)) {}
447
452 jaeger_span_data jaeger_span;
453 jaeger_span.trace_id = span.trace_id;
454 jaeger_span.span_id = span.span_id;
455 jaeger_span.parent_span_id = span.parent_span_id;
456 jaeger_span.operation_name = span.operation_name;
457 jaeger_span.service_name = config_.service_name.value_or(span.service_name);
458
459 // Convert timestamps
460 auto start_epoch = span.start_time.time_since_epoch();
461 jaeger_span.start_time = std::chrono::duration_cast<std::chrono::microseconds>(start_epoch);
462
463 auto end_epoch = span.end_time.time_since_epoch();
464 jaeger_span.duration = std::chrono::duration_cast<std::chrono::microseconds>(end_epoch - start_epoch);
465
466 // Convert tags
467 for (const auto& [key, value] : span.tags) {
468 jaeger_span.tags.emplace_back(key, value);
469 }
470
471 // Add process tags
472 jaeger_span.process_tags.emplace_back("service.name", jaeger_span.service_name);
473
474 return jaeger_span;
475 }
476
477 common::VoidResult export_spans(const std::vector<trace_span>& spans) override {
478 try {
479 std::vector<jaeger_span_data> jaeger_spans;
480 jaeger_spans.reserve(spans.size());
481
482 for (const auto& span : spans) {
483 jaeger_spans.push_back(convert_span(span));
484 }
485
486 // Convert to appropriate format and send
487 common::VoidResult send_result = common::ok();
489 send_result = send_thrift_batch(jaeger_spans);
491 send_result = send_grpc_batch(jaeger_spans);
492 } else {
493 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
494 "Invalid Jaeger export format", "monitoring_system").to_common_error());
495 }
496
497 if (send_result.is_ok()) {
498 exported_spans_ += spans.size();
499 } else {
501 return send_result;
502 }
503
504 return common::ok();
505
506 } catch (const std::exception& e) {
508 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
509 "Jaeger export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
510 }
511 }
512
/// Flush any pending spans.
/// This method does no work and always returns success.
513 common::VoidResult flush() override {
514 // Jaeger exporter typically sends immediately, so flush is a no-op
515 return common::ok();
516 }
517
/// Shutdown the exporter.
/// Delegates to flush() and returns its result; no other teardown is
/// performed here.
518 common::VoidResult shutdown() override {
519 return flush();
520 }
521
522 std::unordered_map<std::string, std::size_t> get_stats() const override {
523 return {
524 {"exported_spans", exported_spans_.load()},
525 {"failed_exports", failed_exports_.load()},
526 {"dropped_spans", dropped_spans_.load()}
527 };
528 }
529
530private:
531 common::VoidResult send_thrift_batch(const std::vector<jaeger_span_data>& spans) {
532 // Build JSON payload for Thrift over HTTP
533 std::ostringstream payload;
534 payload << "{\"data\":[{\"spans\":[";
535 bool first = true;
536 for (const auto& span : spans) {
537 if (!first) payload << ",";
538 payload << span.to_thrift_json();
539 first = false;
540 }
541 payload << "]}]}";
542
543 std::string body = payload.str();
544
545 // Build HTTP request
546 http_request request;
547 request.url = config_.endpoint + "/api/traces";
548 request.method = "POST";
549 request.headers["Content-Type"] = "application/x-thrift";
550 request.headers["Accept"] = "application/json";
551 for (const auto& [key, value] : config_.headers) {
552 request.headers[key] = value;
553 }
554 request.body = std::vector<uint8_t>(body.begin(), body.end());
555 request.timeout = config_.timeout;
556
557 // Send with retry
558 return send_with_retry(request);
559 }
560
561 common::VoidResult send_grpc_batch(const std::vector<jaeger_span_data>& spans) {
562 // Serialize as a Jaeger api_v2 `Batch` protobuf message. Real gRPC
563 // transport would additionally wrap this in a 5-byte gRPC length
564 // prefix and multiplex over HTTP/2; when a gRPC transport is
565 // available the raw `Batch` bytes below can be used as the message
566 // payload directly.
567 std::string resolved_service;
568 if (config_.service_name) {
569 resolved_service = *config_.service_name;
570 } else if (!spans.empty()) {
571 resolved_service = spans.front().service_name;
572 }
573 auto payload = encode_jaeger_batch(spans, resolved_service);
574
575 http_request request;
576 request.url = config_.endpoint;
577 request.method = "POST";
578 request.headers["Content-Type"] = "application/x-protobuf";
579 for (const auto& [key, value] : config_.headers) {
580 request.headers[key] = value;
581 }
582 request.body = payload;
583 request.timeout = config_.timeout;
584
585 return send_with_retry(request);
586 }
587
588 common::VoidResult send_with_retry(const http_request& request) {
589 std::size_t attempt = 0;
590 std::chrono::milliseconds delay = base_retry_delay_;
591
592 while (attempt < max_retries_) {
593 auto result = transport_->send(request);
594 if (result.is_ok()) {
595 const auto& response = result.value();
596 if (response.status_code >= 200 && response.status_code < 300) {
597 return common::ok();
598 }
599 // Retry on 5xx errors
600 if (response.status_code >= 500) {
601 attempt++;
602 if (attempt < max_retries_) {
603 std::this_thread::sleep_for(delay);
604 delay *= 2; // Exponential backoff
605 }
606 continue;
607 }
608 // Non-retryable error
609 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
610 "Jaeger export failed with status: " + std::to_string(response.status_code),
611 "monitoring_system").to_common_error());
612 }
613 attempt++;
614 if (attempt < max_retries_) {
615 std::this_thread::sleep_for(delay);
616 delay *= 2;
617 }
618 }
619 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
620 "Jaeger export failed after " + std::to_string(max_retries_) + " retries",
621 "monitoring_system").to_common_error());
622 }
623};
624
633private:
635 std::unique_ptr<http_transport> transport_;
636 std::atomic<std::size_t> exported_spans_{0};
637 std::atomic<std::size_t> failed_exports_{0};
638 std::atomic<std::size_t> dropped_spans_{0};
639 std::size_t max_retries_{3};
640 std::chrono::milliseconds base_retry_delay_{100};
641
642public:
645
/// Construct a Zipkin exporter that sends through a caller-supplied HTTP
/// transport.
/// @param config    Export configuration; used to initialise config_.
/// @param transport HTTP transport; ownership is moved into transport_.
/// NOTE(review): send_with_retry dereferences transport_ without a null
/// check, so callers appear to be expected to pass a non-null transport.
646 zipkin_exporter(const trace_export_config& config, std::unique_ptr<http_transport> transport)
647 : config_(config), transport_(std::move(transport)) {}
648
653 zipkin_span_data zipkin_span;
654 zipkin_span.trace_id = span.trace_id;
655 zipkin_span.span_id = span.span_id;
656 zipkin_span.parent_id = span.parent_span_id;
657 zipkin_span.name = span.operation_name;
659
660 // Convert timestamps (Zipkin uses microseconds since epoch)
661 auto start_epoch = span.start_time.time_since_epoch();
662 zipkin_span.timestamp = std::chrono::duration_cast<std::chrono::microseconds>(start_epoch);
663
664 auto end_epoch = span.end_time.time_since_epoch();
665 zipkin_span.duration = std::chrono::duration_cast<std::chrono::microseconds>(end_epoch - start_epoch);
666
667 // Determine span kind from tags
668 auto kind_it = span.tags.find("span.kind");
669 if (kind_it != span.tags.end()) {
670 zipkin_span.kind = kind_it->second;
671 } else {
672 zipkin_span.kind = "INTERNAL";
673 }
674
675 // Convert tags (exclude special fields)
676 for (const auto& [key, value] : span.tags) {
677 if (key != "span.kind") {
678 zipkin_span.tags[key] = value;
679 }
680 }
681
682 return zipkin_span;
683 }
684
685 common::VoidResult export_spans(const std::vector<trace_span>& spans) override {
686 try {
687 std::vector<zipkin_span_data> zipkin_spans;
688 zipkin_spans.reserve(spans.size());
689
690 for (const auto& span : spans) {
691 zipkin_spans.push_back(convert_span(span));
692 }
693
694 // Convert to appropriate format and send
695 common::VoidResult send_result = common::ok();
697 send_result = send_json_batch(zipkin_spans);
699 send_result = send_protobuf_batch(zipkin_spans);
700 } else {
701 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
702 "Invalid Zipkin export format", "monitoring_system").to_common_error());
703 }
704
705 if (send_result.is_ok()) {
706 exported_spans_ += spans.size();
707 } else {
709 return send_result;
710 }
711
712 return common::ok();
713
714 } catch (const std::exception& e) {
716 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
717 "Zipkin export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
718 }
719 }
720
/// Flush any pending spans.
/// This method does no work and always returns success.
721 common::VoidResult flush() override {
722 // Zipkin exporter typically sends immediately, so flush is a no-op
723 return common::ok();
724 }
725
/// Shutdown the exporter.
/// Delegates to flush() and returns its result; no other teardown is
/// performed here.
726 common::VoidResult shutdown() override {
727 return flush();
728 }
729
730 std::unordered_map<std::string, std::size_t> get_stats() const override {
731 return {
732 {"exported_spans", exported_spans_.load()},
733 {"failed_exports", failed_exports_.load()},
734 {"dropped_spans", dropped_spans_.load()}
735 };
736 }
737
738private:
739 common::VoidResult send_json_batch(const std::vector<zipkin_span_data>& spans) {
740 // Build JSON array payload for Zipkin v2 API
741 std::ostringstream payload;
742 payload << "[";
743 bool first = true;
744 for (const auto& span : spans) {
745 if (!first) payload << ",";
746 payload << span.to_json_v2();
747 first = false;
748 }
749 payload << "]";
750
751 std::string body = payload.str();
752
753 // Build HTTP request
754 http_request request;
755 request.url = config_.endpoint + "/api/v2/spans";
756 request.method = "POST";
757 request.headers["Content-Type"] = "application/json";
758 request.headers["Accept"] = "application/json";
759 for (const auto& [key, value] : config_.headers) {
760 request.headers[key] = value;
761 }
762 request.body = std::vector<uint8_t>(body.begin(), body.end());
763 request.timeout = config_.timeout;
764
765 return send_with_retry(request);
766 }
767
768 common::VoidResult send_protobuf_batch(const std::vector<zipkin_span_data>& spans) {
769 // Serialize as a Zipkin `ListOfSpans` protobuf message. POST target
770 // accepts application/x-protobuf per Zipkin v2 API.
771 auto payload = encode_zipkin_list_of_spans(spans);
772
773 http_request request;
774 request.url = config_.endpoint + "/api/v2/spans";
775 request.method = "POST";
776 request.headers["Content-Type"] = "application/x-protobuf";
777 for (const auto& [key, value] : config_.headers) {
778 request.headers[key] = value;
779 }
780 request.body = payload;
781 request.timeout = config_.timeout;
782
783 return send_with_retry(request);
784 }
785
786 common::VoidResult send_with_retry(const http_request& request) {
787 std::size_t attempt = 0;
788 std::chrono::milliseconds delay = base_retry_delay_;
789
790 while (attempt < max_retries_) {
791 auto result = transport_->send(request);
792 if (result.is_ok()) {
793 const auto& response = result.value();
794 if (response.status_code >= 200 && response.status_code < 300) {
795 return common::ok();
796 }
797 // Retry on 5xx errors
798 if (response.status_code >= 500) {
799 attempt++;
800 if (attempt < max_retries_) {
801 std::this_thread::sleep_for(delay);
802 delay *= 2; // Exponential backoff
803 }
804 continue;
805 }
806 // Non-retryable error
807 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
808 "Zipkin export failed with status: " + std::to_string(response.status_code),
809 "monitoring_system").to_common_error());
810 }
811 attempt++;
812 if (attempt < max_retries_) {
813 std::this_thread::sleep_for(delay);
814 delay *= 2;
815 }
816 }
817 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
818 "Zipkin export failed after " + std::to_string(max_retries_) + " retries",
819 "monitoring_system").to_common_error());
820 }
821};
822
828private:
830 std::unique_ptr<opentelemetry_tracer_adapter> otel_adapter_;
831 std::atomic<std::size_t> exported_spans_{0};
832 std::atomic<std::size_t> failed_exports_{0};
833 std::atomic<std::size_t> dropped_spans_{0};
834
835public:
/// Construct an OTLP exporter.
/// @param config   Export configuration; used to initialise config_.
/// @param resource OpenTelemetry resource passed to the newly created
///                 opentelemetry_tracer_adapter held in otel_adapter_.
836 explicit otlp_exporter(const trace_export_config& config, const otel_resource& resource)
837 : config_(config), otel_adapter_(std::make_unique<opentelemetry_tracer_adapter>(resource)) {}
838
839 common::VoidResult export_spans(const std::vector<trace_span>& spans) override {
840 try {
841 // Convert to OpenTelemetry format first
842 auto otel_result = otel_adapter_->convert_spans(spans);
843 if (otel_result.is_err()) {
845 return common::VoidResult::err(error_info(monitoring_error_code::processing_failed,
846 "Failed to convert spans to OTEL format: " + otel_result.error().message, "monitoring_system").to_common_error());
847 }
848
849 const auto& otel_spans = otel_result.value();
850
851 // Send via appropriate OTLP protocol
852 common::VoidResult send_result = common::ok();
854 send_result = send_grpc_batch(otel_spans);
856 send_result = send_http_json_batch(otel_spans);
858 send_result = send_http_protobuf_batch(otel_spans);
859 } else {
860 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
861 "Invalid OTLP export format", "monitoring_system").to_common_error());
862 }
863
864 if (send_result.is_ok()) {
865 exported_spans_ += spans.size();
866 } else {
868 return send_result;
869 }
870
871 return common::ok();
872
873 } catch (const std::exception& e) {
875 return common::VoidResult::err(error_info(monitoring_error_code::operation_failed,
876 "OTLP export failed: " + std::string(e.what()), "monitoring_system").to_common_error());
877 }
878 }
879
/// Flush any pending spans.
/// This method does no work and always returns success.
880 common::VoidResult flush() override {
881 // OTLP exporter typically sends immediately, so flush is a no-op
882 return common::ok();
883 }
884
/// Shutdown the exporter.
/// Delegates to flush() and returns its result; no other teardown is
/// performed here.
885 common::VoidResult shutdown() override {
886 return flush();
887 }
888
889 std::unordered_map<std::string, std::size_t> get_stats() const override {
890 return {
891 {"exported_spans", exported_spans_.load()},
892 {"failed_exports", failed_exports_.load()},
893 {"dropped_spans", dropped_spans_.load()}
894 };
895 }
896
897private:
/// Placeholder for the OTLP gRPC send path.
/// Nothing is serialized or transmitted: the spans argument is ignored and
/// success is returned unconditionally.
/// NOTE(review): export_spans adds to exported_spans_ whenever the send
/// result is ok, so that counter grows even though this stub sends nothing -
/// confirm this is acceptable until a real client is wired in.
898 common::VoidResult send_grpc_batch(const std::vector<otel_span_data>& spans) {
899 // Simulate OTLP gRPC sending
900 // In real implementation, this would use OTLP gRPC client
901 (void)spans; // Suppress unused parameter warning
902 return common::ok();
903 }
904
/// Placeholder for the OTLP HTTP/JSON send path.
/// Nothing is serialized or transmitted: the spans argument is ignored and
/// success is returned unconditionally.
/// NOTE(review): export_spans adds to exported_spans_ whenever the send
/// result is ok, so that counter grows even though this stub sends nothing -
/// confirm this is acceptable until a real implementation exists.
905 common::VoidResult send_http_json_batch(const std::vector<otel_span_data>& spans) {
906 // Simulate OTLP HTTP JSON sending
907 // In real implementation, this would serialize OTEL spans to JSON and POST
908 (void)spans; // Suppress unused parameter warning
909 return common::ok();
910 }
911
/// Placeholder for the OTLP HTTP/protobuf send path.
/// Nothing is serialized or transmitted: the spans argument is ignored and
/// success is returned unconditionally.
/// NOTE(review): export_spans adds to exported_spans_ whenever the send
/// result is ok, so that counter grows even though this stub sends nothing -
/// confirm this is acceptable until a real implementation exists.
912 common::VoidResult send_http_protobuf_batch(const std::vector<otel_span_data>& spans) {
913 // Simulate OTLP HTTP protobuf sending
914 // In real implementation, this would serialize OTEL spans to protobuf and POST
915 (void)spans; // Suppress unused parameter warning
916 return common::ok();
917 }
918};
919
925public:
929 static std::unique_ptr<trace_exporter_interface> create_exporter(
930 const trace_export_config& config,
931 const otel_resource& resource = create_service_resource("monitoring_system", "2.0.0")) {
932
933 switch (config.format) {
936 return std::make_unique<jaeger_exporter>(config);
937
940 return std::make_unique<zipkin_exporter>(config);
941
945 return std::make_unique<otlp_exporter>(config, resource);
946
947 default:
948 return nullptr;
949 }
950 }
951
955 static std::vector<trace_export_format> get_supported_formats(const std::string& backend) {
956 if (backend == "jaeger") {
958 } else if (backend == "zipkin") {
960 } else if (backend == "otlp") {
963 }
964 return {};
965 }
966};
967
971inline std::unique_ptr<jaeger_exporter> create_jaeger_exporter(
972 const std::string& endpoint,
974
975 trace_export_config config;
976 config.endpoint = endpoint;
977 config.format = format;
978 return std::make_unique<jaeger_exporter>(config);
979}
980
984inline std::unique_ptr<zipkin_exporter> create_zipkin_exporter(
985 const std::string& endpoint,
987
988 trace_export_config config;
989 config.endpoint = endpoint;
990 config.format = format;
991 return std::make_unique<zipkin_exporter>(config);
992}
993
997inline std::unique_ptr<otlp_exporter> create_otlp_exporter(
998 const std::string& endpoint,
999 const otel_resource& resource,
1001
1002 trace_export_config config;
1003 config.endpoint = endpoint;
1004 config.format = format;
1005 return std::make_unique<otlp_exporter>(config, resource);
1006}
1007
1008} } // namespace kcenon::monitoring
Jaeger trace exporter implementation.
std::chrono::milliseconds base_retry_delay_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
common::VoidResult flush() override
Flush any pending spans.
std::atomic< std::size_t > exported_spans_
common::VoidResult send_grpc_batch(const std::vector< jaeger_span_data > &spans)
std::unique_ptr< http_transport > transport_
common::VoidResult send_thrift_batch(const std::vector< jaeger_span_data > &spans)
std::atomic< std::size_t > failed_exports_
jaeger_span_data convert_span(const trace_span &span) const
Convert internal span to Jaeger format.
jaeger_exporter(const trace_export_config &config, std::unique_ptr< http_transport > transport)
common::VoidResult send_with_retry(const http_request &request)
jaeger_exporter(const trace_export_config &config)
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > dropped_spans_
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
Adapter for converting monitoring system traces to OpenTelemetry format.
OpenTelemetry Protocol (OTLP) trace exporter implementation.
std::atomic< std::size_t > exported_spans_
common::VoidResult send_http_json_batch(const std::vector< otel_span_data > &spans)
std::atomic< std::size_t > failed_exports_
common::VoidResult shutdown() override
Shutdown the exporter.
common::VoidResult send_http_protobuf_batch(const std::vector< otel_span_data > &spans)
std::unique_ptr< opentelemetry_tracer_adapter > otel_adapter_
common::VoidResult send_grpc_batch(const std::vector< otel_span_data > &spans)
common::VoidResult flush() override
Flush any pending spans.
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
otlp_exporter(const trace_export_config &config, const otel_resource &resource)
std::atomic< std::size_t > dropped_spans_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
Factory for creating trace exporters.
static std::vector< trace_export_format > get_supported_formats(const std::string &backend)
Get supported formats for a specific backend.
static std::unique_ptr< trace_exporter_interface > create_exporter(const trace_export_config &config, const otel_resource &resource=create_service_resource("monitoring_system", "2.0.0"))
Create a trace exporter based on format.
Abstract interface for trace exporters.
virtual common::VoidResult shutdown()=0
Shutdown the exporter.
virtual std::unordered_map< std::string, std::size_t > get_stats() const =0
Get exporter statistics.
virtual common::VoidResult flush()=0
Flush any pending spans.
virtual common::VoidResult export_spans(const std::vector< trace_span > &spans)=0
Export a batch of spans.
Zipkin trace exporter implementation.
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
std::unique_ptr< http_transport > transport_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > dropped_spans_
common::VoidResult flush() override
Flush any pending spans.
common::VoidResult send_with_retry(const http_request &request)
zipkin_span_data convert_span(const trace_span &span) const
Convert internal span to Zipkin format.
std::chrono::milliseconds base_retry_delay_
std::atomic< std::size_t > failed_exports_
zipkin_exporter(const trace_export_config &config, std::unique_ptr< http_transport > transport)
std::atomic< std::size_t > exported_spans_
common::VoidResult send_json_batch(const std::vector< zipkin_span_data > &spans)
common::VoidResult send_protobuf_batch(const std::vector< zipkin_span_data > &spans)
zipkin_exporter(const trace_export_config &config)
Distributed tracing implementation for monitoring system.
Monitoring system specific error codes.
gRPC transport layer for OTLP exporters
HTTP transport layer for trace exporters.
Serialization/deserialization of Jaeger api_v2 model.proto messages.
std::vector< std::uint8_t > encode_batch(const batch &b)
std::vector< std::uint8_t > encode_span(const span &s)
std::vector< std::uint8_t > left_pad(const std::vector< std::uint8_t > &in, std::size_t width)
Left-pad bytes to a target width. Used to normalize 8-byte trace IDs to Jaeger's 16-byte on-wire widt...
std::vector< std::uint8_t > hex_to_bytes(const std::string &hex)
Decode a hexadecimal string into bytes. Odd-length strings are zero-padded on the left; non-hex chara...
span_kind parse_kind(const std::string &value)
Convert a textual Zipkin kind (e.g. "CLIENT") to its enum value.
std::vector< std::uint8_t > encode_list_of_spans(const list_of_spans &list)
std::vector< std::uint8_t > encode_span(const span &s)
@ otlp_grpc
OTLP gRPC metrics protocol.
@ otlp_http_json
OTLP HTTP JSON metrics protocol.
@ otlp_http_protobuf
OTLP HTTP Protocol Buffers metrics.
@ delay
Delay requests until resources are available.
std::unique_ptr< http_transport > create_default_transport()
Create default HTTP transport.
std::vector< uint8_t > encode_zipkin_list_of_spans(const std::vector< zipkin_span_data > &spans)
Encode a batch of Zipkin spans into a ListOfSpans protobuf message suitable for POST /api/v2/spans wi...
std::unique_ptr< zipkin_exporter > create_zipkin_exporter(const std::string &endpoint, trace_export_format format=trace_export_format::zipkin_json)
Helper function to create a Zipkin exporter.
trace_export_format
Supported trace export formats.
@ zipkin_json
Zipkin JSON v2 format.
@ jaeger_thrift
Jaeger Thrift protocol.
@ otlp_http_json
OTLP HTTP JSON protocol.
@ otlp_http_protobuf
OTLP HTTP Protocol Buffers.
@ zipkin_protobuf
Zipkin Protocol Buffers format.
otel_resource create_service_resource(const std::string &service_name, const std::string &service_version="1.0.0", const std::string &service_namespace="")
Create OpenTelemetry resource with service information.
std::unique_ptr< otlp_exporter > create_otlp_exporter(const std::string &endpoint, const otel_resource &resource, trace_export_format format=trace_export_format::otlp_grpc)
Helper function to create an OTLP exporter.
std::vector< uint8_t > encode_jaeger_batch(const std::vector< jaeger_span_data > &spans, const std::string &service_name)
Encode a batch of Jaeger spans (with shared process) into a Jaeger api_v2 Batch protobuf message.
std::unique_ptr< jaeger_exporter > create_jaeger_exporter(const std::string &endpoint, trace_export_format format=trace_export_format::jaeger_grpc)
Helper function to create a Jaeger exporter.
OpenTelemetry compatibility layer for monitoring system integration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
HTTP request configuration.
std::chrono::milliseconds timeout
std::vector< uint8_t > body
std::unordered_map< std::string, std::string > headers
std::vector< std::uint8_t > trace_id
std::vector< std::uint8_t > span_id
std::vector< span_ref > references
std::vector< std::uint8_t > trace_id
std::vector< std::uint8_t > span_id
Jaeger-specific span representation.
std::vector< std::pair< std::string, std::string > > process_tags
std::string to_thrift_json() const
Convert to Jaeger Thrift format (JSON representation)
std::vector< std::pair< std::string, std::string > > tags
std::vector< uint8_t > to_protobuf() const
Convert this span to a Jaeger api_v2 Span protobuf message.
std::chrono::microseconds duration
std::chrono::microseconds start_time
OpenTelemetry resource representation.
Configuration for trace exporters.
std::chrono::milliseconds timeout
Request timeout.
bool enable_compression
Enable data compression.
std::chrono::milliseconds batch_timeout
Batch export timeout.
std::size_t max_queue_size
Maximum queued spans.
std::optional< std::string > service_name
Override service name.
common::VoidResult validate() const
Validate export configuration.
std::unordered_map< std::string, std::string > headers
Custom HTTP headers.
std::size_t max_batch_size
Maximum spans per batch.
Trace span representing a unit of work in distributed tracing.
std::unordered_map< std::string, std::string > tags
std::chrono::system_clock::time_point end_time
std::chrono::system_clock::time_point start_time
std::vector< std::uint8_t > trace_id
std::vector< std::uint8_t > id
std::vector< std::uint8_t > parent_id
std::vector< std::pair< std::string, std::string > > tags
Zipkin-specific span representation.
std::unordered_map< std::string, std::string > tags
std::chrono::microseconds duration
std::string to_json_v2() const
Convert to Zipkin JSON v2 format.
std::vector< uint8_t > to_protobuf() const
Convert this span to a Zipkin Span protobuf message.
std::chrono::microseconds timestamp
Serialization/deserialization of Zipkin zipkin.proto messages.