35#include <unordered_map>
41namespace kcenon {
namespace monitoring {
69 std::unordered_map<std::string, std::string>
headers;
78 "Export endpoint cannot be empty",
"monitoring_system").to_common_error());
83 "Timeout must be positive",
"monitoring_system").to_common_error());
88 "Batch size must be greater than 0",
"monitoring_system").to_common_error());
93 "Queue size must be at least batch size",
"monitoring_system").to_common_error());
112 std::vector<std::pair<std::string, std::string>>
tags;
119 std::ostringstream json;
121 json <<
"\"traceIdHigh\":0,";
122 json <<
"\"traceIdLow\":" << std::hash<std::string>{}(
trace_id) <<
",";
123 json <<
"\"spanId\":" << std::hash<std::string>{}(
span_id) <<
",";
126 json <<
"\"startTime\":" <<
start_time.count() <<
",";
127 json <<
"\"duration\":" <<
duration.count() <<
",";
130 json <<
"\"tags\":[";
132 for (
const auto& [key, value] :
tags) {
133 if (!first) json <<
",";
134 json <<
"{\"key\":\"" << key <<
"\",\"vType\":\"STRING\",\"vStr\":\"" << value <<
"\"}";
140 json <<
"\"process\":{\"serviceName\":\"" <<
service_name <<
"\",\"tags\":[";
143 if (!first) json <<
",";
144 json <<
"{\"key\":\"" << key <<
"\",\"vType\":\"STRING\",\"vStr\":\"" << value <<
"\"}";
177 std::unordered_map<std::string, std::string>
tags;
184 std::ostringstream json;
186 json <<
"\"traceId\":\"" <<
trace_id <<
"\",";
187 json <<
"\"id\":\"" <<
span_id <<
"\",";
189 json <<
"\"parentId\":\"" <<
parent_id <<
"\",";
191 json <<
"\"name\":\"" <<
name <<
"\",";
192 json <<
"\"kind\":\"" <<
kind <<
"\",";
193 json <<
"\"timestamp\":" <<
timestamp.count() <<
",";
194 json <<
"\"duration\":" <<
duration.count() <<
",";
205 json <<
"\"tags\":{";
207 for (
const auto& [key, value] :
tags) {
208 if (!first) json <<
",";
209 json <<
"\"" << key <<
"\":\"" << value <<
"\"";
215 json <<
",\"shared\":true";
243 virtual common::VoidResult
export_spans(
const std::vector<trace_span>& spans) = 0;
248 virtual common::VoidResult
flush() = 0;
258 virtual std::unordered_map<std::string, std::size_t>
get_stats()
const = 0;
297 auto start_epoch = span.
start_time.time_since_epoch();
298 jaeger_span.
start_time = std::chrono::duration_cast<std::chrono::microseconds>(start_epoch);
300 auto end_epoch = span.
end_time.time_since_epoch();
301 jaeger_span.
duration = std::chrono::duration_cast<std::chrono::microseconds>(end_epoch - start_epoch);
304 for (
const auto& [key, value] : span.
tags) {
305 jaeger_span.
tags.emplace_back(key, value);
314 common::VoidResult
export_spans(
const std::vector<trace_span>& spans)
override {
316 std::vector<jaeger_span_data> jaeger_spans;
317 jaeger_spans.reserve(spans.size());
319 for (
const auto& span : spans) {
324 common::VoidResult send_result = common::ok();
331 "Invalid Jaeger export format",
"monitoring_system").to_common_error());
334 if (send_result.is_ok()) {
343 }
catch (
const std::exception& e) {
346 "Jaeger export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
350 common::VoidResult
flush()
override {
359 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
370 std::ostringstream payload;
371 payload <<
"{\"data\":[{\"spans\":[";
373 for (
const auto& span : spans) {
374 if (!first) payload <<
",";
375 payload << span.to_thrift_json();
380 std::string body = payload.str();
386 request.
headers[
"Content-Type"] =
"application/x-thrift";
387 request.
headers[
"Accept"] =
"application/json";
391 request.
body = std::vector<uint8_t>(body.begin(), body.end());
401 std::vector<uint8_t> payload;
402 for (
const auto& span : spans) {
403 auto proto = span.to_protobuf();
404 payload.insert(payload.end(), proto.begin(), proto.end());
410 request.
headers[
"Content-Type"] =
"application/grpc+proto";
414 request.
body = payload;
421 std::size_t attempt = 0;
426 if (result.is_ok()) {
427 const auto& response = result.value();
428 if (response.status_code >= 200 && response.status_code < 300) {
432 if (response.status_code >= 500) {
435 std::this_thread::sleep_for(
delay);
442 "Jaeger export failed with status: " + std::to_string(response.status_code),
447 std::this_thread::sleep_for(
delay);
452 "Jaeger export failed after " + std::to_string(
max_retries_) +
" retries",
453 "monitoring_system").to_common_error());
493 auto start_epoch = span.
start_time.time_since_epoch();
494 zipkin_span.
timestamp = std::chrono::duration_cast<std::chrono::microseconds>(start_epoch);
496 auto end_epoch = span.
end_time.time_since_epoch();
497 zipkin_span.
duration = std::chrono::duration_cast<std::chrono::microseconds>(end_epoch - start_epoch);
500 auto kind_it = span.
tags.find(
"span.kind");
501 if (kind_it != span.
tags.end()) {
502 zipkin_span.
kind = kind_it->second;
504 zipkin_span.
kind =
"INTERNAL";
508 for (
const auto& [key, value] : span.
tags) {
509 if (key !=
"span.kind") {
510 zipkin_span.
tags[key] = value;
517 common::VoidResult
export_spans(
const std::vector<trace_span>& spans)
override {
519 std::vector<zipkin_span_data> zipkin_spans;
520 zipkin_spans.reserve(spans.size());
522 for (
const auto& span : spans) {
527 common::VoidResult send_result = common::ok();
534 "Invalid Zipkin export format",
"monitoring_system").to_common_error());
537 if (send_result.is_ok()) {
546 }
catch (
const std::exception& e) {
549 "Zipkin export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
553 common::VoidResult
flush()
override {
562 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
573 std::ostringstream payload;
576 for (
const auto& span : spans) {
577 if (!first) payload <<
",";
578 payload << span.to_json_v2();
583 std::string body = payload.str();
589 request.
headers[
"Content-Type"] =
"application/json";
590 request.
headers[
"Accept"] =
"application/json";
594 request.
body = std::vector<uint8_t>(body.begin(), body.end());
602 std::vector<uint8_t> payload;
603 for (
const auto& span : spans) {
604 auto proto = span.to_protobuf();
605 payload.insert(payload.end(), proto.begin(), proto.end());
611 request.
headers[
"Content-Type"] =
"application/x-protobuf";
615 request.
body = payload;
622 std::size_t attempt = 0;
627 if (result.is_ok()) {
628 const auto& response = result.value();
629 if (response.status_code >= 200 && response.status_code < 300) {
633 if (response.status_code >= 500) {
636 std::this_thread::sleep_for(
delay);
643 "Zipkin export failed with status: " + std::to_string(response.status_code),
648 std::this_thread::sleep_for(
delay);
653 "Zipkin export failed after " + std::to_string(
max_retries_) +
" retries",
654 "monitoring_system").to_common_error());
674 common::VoidResult
export_spans(
const std::vector<trace_span>& spans)
override {
678 if (otel_result.is_err()) {
681 "Failed to convert spans to OTEL format: " + otel_result.error().message,
"monitoring_system").
to_common_error());
684 const auto& otel_spans = otel_result.value();
687 common::VoidResult send_result = common::ok();
696 "Invalid OTLP export format",
"monitoring_system").to_common_error());
699 if (send_result.is_ok()) {
708 }
catch (
const std::exception& e) {
711 "OTLP export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
715 common::VoidResult
flush()
override {
724 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
771 return std::make_unique<jaeger_exporter>(config);
775 return std::make_unique<zipkin_exporter>(config);
780 return std::make_unique<otlp_exporter>(config, resource);
791 if (backend ==
"jaeger") {
793 }
else if (backend ==
"zipkin") {
795 }
else if (backend ==
"otlp") {
807 const std::string& endpoint,
813 return std::make_unique<jaeger_exporter>(config);
820 const std::string& endpoint,
826 return std::make_unique<zipkin_exporter>(config);
833 const std::string& endpoint,
840 return std::make_unique<otlp_exporter>(config, resource);
Jaeger trace exporter implementation.
std::chrono::milliseconds base_retry_delay_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
common::VoidResult flush() override
Flush any pending spans.
std::atomic< std::size_t > exported_spans_
common::VoidResult send_grpc_batch(const std::vector< jaeger_span_data > &spans)
std::unique_ptr< http_transport > transport_
common::VoidResult send_thrift_batch(const std::vector< jaeger_span_data > &spans)
std::atomic< std::size_t > failed_exports_
jaeger_span_data convert_span(const trace_span &span) const
Convert internal span to Jaeger format.
jaeger_exporter(const trace_export_config &config, std::unique_ptr< http_transport > transport)
common::VoidResult send_with_retry(const http_request &request)
trace_export_config config_
jaeger_exporter(const trace_export_config &config)
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > dropped_spans_
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
Adapter for converting monitoring system traces to OpenTelemetry format.
OpenTelemetry Protocol (OTLP) trace exporter implementation.
std::atomic< std::size_t > exported_spans_
common::VoidResult send_http_json_batch(const std::vector< otel_span_data > &spans)
std::atomic< std::size_t > failed_exports_
common::VoidResult shutdown() override
Shutdown the exporter.
common::VoidResult send_http_protobuf_batch(const std::vector< otel_span_data > &spans)
std::unique_ptr< opentelemetry_tracer_adapter > otel_adapter_
common::VoidResult send_grpc_batch(const std::vector< otel_span_data > &spans)
common::VoidResult flush() override
Flush any pending spans.
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
otlp_exporter(const trace_export_config &config, const otel_resource &resource)
std::atomic< std::size_t > dropped_spans_
trace_export_config config_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
Factory for creating trace exporters.
static std::vector< trace_export_format > get_supported_formats(const std::string &backend)
Get supported formats for a specific backend.
static std::unique_ptr< trace_exporter_interface > create_exporter(const trace_export_config &config, const otel_resource &resource=create_service_resource("monitoring_system", "2.0.0"))
Create a trace exporter based on format.
Abstract interface for trace exporters.
virtual common::VoidResult shutdown()=0
Shutdown the exporter.
virtual std::unordered_map< std::string, std::size_t > get_stats() const =0
Get exporter statistics.
virtual common::VoidResult flush()=0
Flush any pending spans.
virtual common::VoidResult export_spans(const std::vector< trace_span > &spans)=0
Export a batch of spans.
virtual ~trace_exporter_interface()=default
Zipkin trace exporter implementation.
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
std::unique_ptr< http_transport > transport_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > dropped_spans_
common::VoidResult flush() override
Flush any pending spans.
trace_export_config config_
common::VoidResult send_with_retry(const http_request &request)
zipkin_span_data convert_span(const trace_span &span) const
Convert internal span to Zipkin format.
std::chrono::milliseconds base_retry_delay_
std::atomic< std::size_t > failed_exports_
zipkin_exporter(const trace_export_config &config, std::unique_ptr< http_transport > transport)
std::atomic< std::size_t > exported_spans_
common::VoidResult send_json_batch(const std::vector< zipkin_span_data > &spans)
common::VoidResult send_protobuf_batch(const std::vector< zipkin_span_data > &spans)
zipkin_exporter(const trace_export_config &config)
Distributed tracing implementation for monitoring system.
Monitoring system specific error codes.
gRPC transport layer for OTLP exporters
HTTP transport layer for trace exporters.
@ otlp_grpc
OTLP gRPC metrics protocol.
@ otlp_http_json
OTLP HTTP JSON metrics protocol.
@ otlp_http_protobuf
OTLP HTTP Protocol Buffers metrics.
@ delay
Delay requests until resources are available.
std::unique_ptr< http_transport > create_default_transport()
Create default HTTP transport.
std::unique_ptr< zipkin_exporter > create_zipkin_exporter(const std::string &endpoint, trace_export_format format=trace_export_format::zipkin_json)
Helper function to create a Zipkin exporter.
trace_export_format
Supported trace export formats.
@ otlp_grpc
OTLP gRPC protocol.
@ zipkin_json
Zipkin JSON v2 format.
@ jaeger_grpc
Jaeger gRPC protocol.
@ jaeger_thrift
Jaeger Thrift protocol.
@ otlp_http_json
OTLP HTTP JSON protocol.
@ otlp_http_protobuf
OTLP HTTP Protocol Buffers.
@ zipkin_protobuf
Zipkin Protocol Buffers format.
otel_resource create_service_resource(const std::string &service_name, const std::string &service_version="1.0.0", const std::string &service_namespace="")
Create OpenTelemetry resource with service information.
std::unique_ptr< otlp_exporter > create_otlp_exporter(const std::string &endpoint, const otel_resource &resource, trace_export_format format=trace_export_format::otlp_grpc)
Helper function to create an OTLP exporter.
std::unique_ptr< jaeger_exporter > create_jaeger_exporter(const std::string &endpoint, trace_export_format format=trace_export_format::jaeger_grpc)
Helper function to create a Jaeger exporter.
OpenTelemetry compatibility layer for monitoring system integration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
HTTP request configuration.
std::chrono::milliseconds timeout
std::vector< uint8_t > body
std::unordered_map< std::string, std::string > headers
Jaeger-specific span representation.
std::string operation_name
std::string parent_span_id
std::vector< std::pair< std::string, std::string > > process_tags
std::string to_thrift_json() const
Convert to Jaeger Thrift format (JSON representation)
std::vector< std::pair< std::string, std::string > > tags
std::vector< uint8_t > to_protobuf() const
Convert to Jaeger protobuf format (stub)
std::chrono::microseconds duration
std::chrono::microseconds start_time
OpenTelemetry resource representation.
Configuration for trace exporters.
std::chrono::milliseconds timeout
Request timeout.
std::string endpoint
Endpoint URL.
bool enable_compression
Enable data compression.
std::chrono::milliseconds batch_timeout
Batch export timeout.
trace_export_format format
std::size_t max_queue_size
Maximum queued spans.
std::optional< std::string > service_name
Override service name.
common::VoidResult validate() const
Validate export configuration.
std::unordered_map< std::string, std::string > headers
Custom HTTP headers.
std::size_t max_batch_size
Maximum spans per batch.
Trace span representing a unit of work in distributed tracing.
std::string parent_span_id
std::unordered_map< std::string, std::string > tags
std::chrono::system_clock::time_point end_time
std::string operation_name
std::chrono::system_clock::time_point start_time
Zipkin-specific span representation.
std::string local_endpoint_service_name
std::unordered_map< std::string, std::string > tags
std::chrono::microseconds duration
std::string to_json_v2() const
Convert to Zipkin JSON v2 format.
std::vector< uint8_t > to_protobuf() const
Convert to Zipkin protobuf format (stub)
std::string remote_endpoint_service_name
std::chrono::microseconds timestamp