37#include <unordered_map>
43namespace kcenon {
namespace monitoring {
71 std::unordered_map<std::string, std::string>
headers;
80 "Export endpoint cannot be empty",
"monitoring_system").to_common_error());
85 "Timeout must be positive",
"monitoring_system").to_common_error());
90 "Batch size must be greater than 0",
"monitoring_system").to_common_error());
95 "Queue size must be at least batch size",
"monitoring_system").to_common_error());
114 std::vector<std::pair<std::string, std::string>>
tags;
121 std::ostringstream json;
123 json <<
"\"traceIdHigh\":0,";
124 json <<
"\"traceIdLow\":" << std::hash<std::string>{}(
trace_id) <<
",";
125 json <<
"\"spanId\":" << std::hash<std::string>{}(
span_id) <<
",";
128 json <<
"\"startTime\":" <<
start_time.count() <<
",";
129 json <<
"\"duration\":" <<
duration.count() <<
",";
132 json <<
"\"tags\":[";
134 for (
const auto& [key, value] :
tags) {
135 if (!first) json <<
",";
136 json <<
"{\"key\":\"" << key <<
"\",\"vType\":\"STRING\",\"vStr\":\"" << value <<
"\"}";
142 json <<
"\"process\":{\"serviceName\":\"" <<
service_name <<
"\",\"tags\":[";
145 if (!first) json <<
",";
146 json <<
"{\"key\":\"" << key <<
"\",\"vType\":\"STRING\",\"vStr\":\"" << value <<
"\"}";
180 auto st_ns = std::chrono::nanoseconds(
start_time);
182 std::chrono::duration_cast<std::chrono::seconds>(st_ns).count());
186 auto du_ns = std::chrono::nanoseconds(
duration);
188 std::chrono::duration_cast<std::chrono::seconds>(du_ns).count());
192 for (
const auto& [key, value] :
tags) {
197 out.
tags.push_back(std::move(kv));
206 out.
proc.
tags.push_back(std::move(kv));
218 const std::vector<jaeger_span_data>& spans,
219 const std::string& service_name) {
221 b.
spans.reserve(spans.size());
222 for (
const auto& s : spans) {
229 if (!s.parent_span_id.empty()) {
237 auto st_ns = std::chrono::nanoseconds(s.start_time);
239 std::chrono::duration_cast<std::chrono::seconds>(st_ns).count());
242 auto du_ns = std::chrono::nanoseconds(s.duration);
244 std::chrono::duration_cast<std::chrono::seconds>(du_ns).count());
247 for (
const auto& [key, value] : s.tags) {
252 ps.
tags.push_back(std::move(kv));
254 b.
spans.push_back(std::move(ps));
275 std::unordered_map<std::string, std::string>
tags;
282 std::ostringstream json;
284 json <<
"\"traceId\":\"" <<
trace_id <<
"\",";
285 json <<
"\"id\":\"" <<
span_id <<
"\",";
287 json <<
"\"parentId\":\"" <<
parent_id <<
"\",";
289 json <<
"\"name\":\"" <<
name <<
"\",";
290 json <<
"\"kind\":\"" <<
kind <<
"\",";
291 json <<
"\"timestamp\":" <<
timestamp.count() <<
",";
292 json <<
"\"duration\":" <<
duration.count() <<
",";
303 json <<
"\"tags\":{";
305 for (
const auto& [key, value] :
tags) {
306 if (!first) json <<
",";
307 json <<
"\"" << key <<
"\":\"" << value <<
"\"";
313 json <<
",\"shared\":true";
332 if (trace_bytes.size() == 16) {
333 out.
trace_id = std::move(trace_bytes);
350 for (
const auto& [key, value] :
tags) {
351 out.
tags.emplace_back(key, value);
363 const std::vector<zipkin_span_data>& spans) {
365 list.
spans.reserve(spans.size());
366 for (
const auto& s : spans) {
369 if (trace_bytes.size() == 16) {
370 ps.
trace_id = std::move(trace_bytes);
376 if (!s.parent_id.empty()) {
382 ps.
timestamp =
static_cast<std::uint64_t
>(s.timestamp.count());
383 ps.
duration =
static_cast<std::uint64_t
>(s.duration.count());
387 for (
const auto& [key, value] : s.tags) {
388 ps.
tags.emplace_back(key, value);
390 list.
spans.push_back(std::move(ps));
406 virtual common::VoidResult
export_spans(
const std::vector<trace_span>& spans) = 0;
411 virtual common::VoidResult
flush() = 0;
421 virtual std::unordered_map<std::string, std::size_t>
get_stats()
const = 0;
460 auto start_epoch = span.
start_time.time_since_epoch();
461 jaeger_span.
start_time = std::chrono::duration_cast<std::chrono::microseconds>(start_epoch);
463 auto end_epoch = span.
end_time.time_since_epoch();
464 jaeger_span.
duration = std::chrono::duration_cast<std::chrono::microseconds>(end_epoch - start_epoch);
467 for (
const auto& [key, value] : span.
tags) {
468 jaeger_span.
tags.emplace_back(key, value);
477 common::VoidResult
export_spans(
const std::vector<trace_span>& spans)
override {
479 std::vector<jaeger_span_data> jaeger_spans;
480 jaeger_spans.reserve(spans.size());
482 for (
const auto& span : spans) {
487 common::VoidResult send_result = common::ok();
494 "Invalid Jaeger export format",
"monitoring_system").to_common_error());
497 if (send_result.is_ok()) {
506 }
catch (
const std::exception& e) {
509 "Jaeger export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
513 common::VoidResult
flush()
override {
522 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
533 std::ostringstream payload;
534 payload <<
"{\"data\":[{\"spans\":[";
536 for (
const auto& span : spans) {
537 if (!first) payload <<
",";
538 payload << span.to_thrift_json();
543 std::string body = payload.str();
549 request.
headers[
"Content-Type"] =
"application/x-thrift";
550 request.
headers[
"Accept"] =
"application/json";
554 request.
body = std::vector<uint8_t>(body.begin(), body.end());
567 std::string resolved_service;
570 }
else if (!spans.empty()) {
571 resolved_service = spans.front().service_name;
578 request.
headers[
"Content-Type"] =
"application/x-protobuf";
582 request.
body = payload;
589 std::size_t attempt = 0;
594 if (result.is_ok()) {
595 const auto& response = result.value();
596 if (response.status_code >= 200 && response.status_code < 300) {
600 if (response.status_code >= 500) {
603 std::this_thread::sleep_for(
delay);
610 "Jaeger export failed with status: " + std::to_string(response.status_code),
615 std::this_thread::sleep_for(
delay);
620 "Jaeger export failed after " + std::to_string(
max_retries_) +
" retries",
621 "monitoring_system").to_common_error());
661 auto start_epoch = span.
start_time.time_since_epoch();
662 zipkin_span.
timestamp = std::chrono::duration_cast<std::chrono::microseconds>(start_epoch);
664 auto end_epoch = span.
end_time.time_since_epoch();
665 zipkin_span.
duration = std::chrono::duration_cast<std::chrono::microseconds>(end_epoch - start_epoch);
668 auto kind_it = span.
tags.find(
"span.kind");
669 if (kind_it != span.
tags.end()) {
670 zipkin_span.
kind = kind_it->second;
672 zipkin_span.
kind =
"INTERNAL";
676 for (
const auto& [key, value] : span.
tags) {
677 if (key !=
"span.kind") {
678 zipkin_span.
tags[key] = value;
685 common::VoidResult
export_spans(
const std::vector<trace_span>& spans)
override {
687 std::vector<zipkin_span_data> zipkin_spans;
688 zipkin_spans.reserve(spans.size());
690 for (
const auto& span : spans) {
695 common::VoidResult send_result = common::ok();
702 "Invalid Zipkin export format",
"monitoring_system").to_common_error());
705 if (send_result.is_ok()) {
714 }
catch (
const std::exception& e) {
717 "Zipkin export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
721 common::VoidResult
flush()
override {
730 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
741 std::ostringstream payload;
744 for (
const auto& span : spans) {
745 if (!first) payload <<
",";
746 payload << span.to_json_v2();
751 std::string body = payload.str();
757 request.
headers[
"Content-Type"] =
"application/json";
758 request.
headers[
"Accept"] =
"application/json";
762 request.
body = std::vector<uint8_t>(body.begin(), body.end());
776 request.
headers[
"Content-Type"] =
"application/x-protobuf";
780 request.
body = payload;
787 std::size_t attempt = 0;
792 if (result.is_ok()) {
793 const auto& response = result.value();
794 if (response.status_code >= 200 && response.status_code < 300) {
798 if (response.status_code >= 500) {
801 std::this_thread::sleep_for(
delay);
808 "Zipkin export failed with status: " + std::to_string(response.status_code),
813 std::this_thread::sleep_for(
delay);
818 "Zipkin export failed after " + std::to_string(
max_retries_) +
" retries",
819 "monitoring_system").to_common_error());
839 common::VoidResult
export_spans(
const std::vector<trace_span>& spans)
override {
843 if (otel_result.is_err()) {
846 "Failed to convert spans to OTEL format: " + otel_result.error().message,
"monitoring_system").
to_common_error());
849 const auto& otel_spans = otel_result.value();
852 common::VoidResult send_result = common::ok();
861 "Invalid OTLP export format",
"monitoring_system").to_common_error());
864 if (send_result.is_ok()) {
873 }
catch (
const std::exception& e) {
876 "OTLP export failed: " + std::string(e.what()),
"monitoring_system").
to_common_error());
880 common::VoidResult
flush()
override {
889 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
936 return std::make_unique<jaeger_exporter>(config);
940 return std::make_unique<zipkin_exporter>(config);
945 return std::make_unique<otlp_exporter>(config, resource);
956 if (backend ==
"jaeger") {
958 }
else if (backend ==
"zipkin") {
960 }
else if (backend ==
"otlp") {
972 const std::string& endpoint,
978 return std::make_unique<jaeger_exporter>(config);
985 const std::string& endpoint,
991 return std::make_unique<zipkin_exporter>(config);
998 const std::string& endpoint,
1005 return std::make_unique<otlp_exporter>(config, resource);
Jaeger trace exporter implementation.
std::chrono::milliseconds base_retry_delay_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
common::VoidResult flush() override
Flush any pending spans.
std::atomic< std::size_t > exported_spans_
common::VoidResult send_grpc_batch(const std::vector< jaeger_span_data > &spans)
std::unique_ptr< http_transport > transport_
common::VoidResult send_thrift_batch(const std::vector< jaeger_span_data > &spans)
std::atomic< std::size_t > failed_exports_
jaeger_span_data convert_span(const trace_span &span) const
Convert internal span to Jaeger format.
jaeger_exporter(const trace_export_config &config, std::unique_ptr< http_transport > transport)
common::VoidResult send_with_retry(const http_request &request)
trace_export_config config_
jaeger_exporter(const trace_export_config &config)
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > dropped_spans_
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
Adapter for converting monitoring system traces to OpenTelemetry format.
OpenTelemetry Protocol (OTLP) trace exporter implementation.
std::atomic< std::size_t > exported_spans_
common::VoidResult send_http_json_batch(const std::vector< otel_span_data > &spans)
std::atomic< std::size_t > failed_exports_
common::VoidResult shutdown() override
Shutdown the exporter.
common::VoidResult send_http_protobuf_batch(const std::vector< otel_span_data > &spans)
std::unique_ptr< opentelemetry_tracer_adapter > otel_adapter_
common::VoidResult send_grpc_batch(const std::vector< otel_span_data > &spans)
common::VoidResult flush() override
Flush any pending spans.
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
otlp_exporter(const trace_export_config &config, const otel_resource &resource)
std::atomic< std::size_t > dropped_spans_
trace_export_config config_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
Factory for creating trace exporters.
static std::vector< trace_export_format > get_supported_formats(const std::string &backend)
Get supported formats for a specific backend.
static std::unique_ptr< trace_exporter_interface > create_exporter(const trace_export_config &config, const otel_resource &resource=create_service_resource("monitoring_system", "2.0.0"))
Create a trace exporter based on format.
Abstract interface for trace exporters.
virtual common::VoidResult shutdown()=0
Shutdown the exporter.
virtual std::unordered_map< std::string, std::size_t > get_stats() const =0
Get exporter statistics.
virtual common::VoidResult flush()=0
Flush any pending spans.
virtual common::VoidResult export_spans(const std::vector< trace_span > &spans)=0
Export a batch of spans.
virtual ~trace_exporter_interface()=default
Zipkin trace exporter implementation.
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
std::unique_ptr< http_transport > transport_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > dropped_spans_
common::VoidResult flush() override
Flush any pending spans.
trace_export_config config_
common::VoidResult send_with_retry(const http_request &request)
zipkin_span_data convert_span(const trace_span &span) const
Convert internal span to Zipkin format.
std::chrono::milliseconds base_retry_delay_
std::atomic< std::size_t > failed_exports_
zipkin_exporter(const trace_export_config &config, std::unique_ptr< http_transport > transport)
std::atomic< std::size_t > exported_spans_
common::VoidResult send_json_batch(const std::vector< zipkin_span_data > &spans)
common::VoidResult send_protobuf_batch(const std::vector< zipkin_span_data > &spans)
zipkin_exporter(const trace_export_config &config)
Distributed tracing implementation for monitoring system.
Monitoring system specific error codes.
gRPC transport layer for OTLP exporters
HTTP transport layer for trace exporters.
Serialization/deserialization of Jaeger api_v2 model.proto messages.
std::vector< std::uint8_t > encode_batch(const batch &b)
std::vector< std::uint8_t > encode_span(const span &s)
std::vector< std::uint8_t > left_pad(const std::vector< std::uint8_t > &in, std::size_t width)
Left-pad bytes to a target width. Used to normalize 8-byte trace IDs to Jaeger's 16-byte on-wire widt...
std::vector< std::uint8_t > hex_to_bytes(const std::string &hex)
Decode a hexadecimal string into bytes. Odd-length strings are zero-padded on the left; non-hex chara...
span_kind parse_kind(const std::string &value)
Convert a textual Zipkin kind (e.g. "CLIENT") to its enum value.
std::vector< std::uint8_t > encode_list_of_spans(const list_of_spans &list)
std::vector< std::uint8_t > encode_span(const span &s)
@ otlp_grpc
OTLP gRPC metrics protocol.
@ otlp_http_json
OTLP HTTP JSON metrics protocol.
@ otlp_http_protobuf
OTLP HTTP Protocol Buffers metrics.
@ delay
Delay requests until resources are available.
std::unique_ptr< http_transport > create_default_transport()
Create default HTTP transport.
std::vector< uint8_t > encode_zipkin_list_of_spans(const std::vector< zipkin_span_data > &spans)
Encode a batch of Zipkin spans into a ListOfSpans protobuf message suitable for POST /api/v2/spans wi...
std::unique_ptr< zipkin_exporter > create_zipkin_exporter(const std::string &endpoint, trace_export_format format=trace_export_format::zipkin_json)
Helper function to create a Zipkin exporter.
trace_export_format
Supported trace export formats.
@ otlp_grpc
OTLP gRPC protocol.
@ zipkin_json
Zipkin JSON v2 format.
@ jaeger_grpc
Jaeger gRPC protocol.
@ jaeger_thrift
Jaeger Thrift protocol.
@ otlp_http_json
OTLP HTTP JSON protocol.
@ otlp_http_protobuf
OTLP HTTP Protocol Buffers.
@ zipkin_protobuf
Zipkin Protocol Buffers format.
otel_resource create_service_resource(const std::string &service_name, const std::string &service_version="1.0.0", const std::string &service_namespace="")
Create OpenTelemetry resource with service information.
std::unique_ptr< otlp_exporter > create_otlp_exporter(const std::string &endpoint, const otel_resource &resource, trace_export_format format=trace_export_format::otlp_grpc)
Helper function to create an OTLP exporter.
std::vector< uint8_t > encode_jaeger_batch(const std::vector< jaeger_span_data > &spans, const std::string &service_name)
Encode a batch of Jaeger spans (with shared process) into a Jaeger api_v2 Batch protobuf message.
std::unique_ptr< jaeger_exporter > create_jaeger_exporter(const std::string &endpoint, trace_export_format format=trace_export_format::jaeger_grpc)
Helper function to create a Jaeger exporter.
OpenTelemetry compatibility layer for monitoring system integration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
HTTP request configuration.
std::chrono::milliseconds timeout
std::vector< uint8_t > body
std::unordered_map< std::string, std::string > headers
std::vector< span > spans
std::vector< key_value > tags
std::vector< std::uint8_t > trace_id
std::vector< std::uint8_t > span_id
std::vector< key_value > tags
std::vector< span_ref > references
std::int64_t duration_seconds
std::vector< std::uint8_t > trace_id
std::vector< std::uint8_t > span_id
std::int64_t start_time_seconds
std::string operation_name
std::int32_t duration_nanos
std::int32_t start_time_nanos
Jaeger-specific span representation.
std::string operation_name
std::string parent_span_id
std::vector< std::pair< std::string, std::string > > process_tags
std::string to_thrift_json() const
Convert to Jaeger Thrift format (JSON representation)
std::vector< std::pair< std::string, std::string > > tags
std::vector< uint8_t > to_protobuf() const
Convert this span to a Jaeger api_v2 Span protobuf message.
std::chrono::microseconds duration
std::chrono::microseconds start_time
OpenTelemetry resource representation.
Configuration for trace exporters.
std::chrono::milliseconds timeout
Request timeout.
std::string endpoint
Endpoint URL.
bool enable_compression
Enable data compression.
std::chrono::milliseconds batch_timeout
Batch export timeout.
trace_export_format format
std::size_t max_queue_size
Maximum queued spans.
std::optional< std::string > service_name
Override service name.
common::VoidResult validate() const
Validate export configuration.
std::unordered_map< std::string, std::string > headers
Custom HTTP headers.
std::size_t max_batch_size
Maximum spans per batch.
Trace span representing a unit of work in distributed tracing.
std::string parent_span_id
std::unordered_map< std::string, std::string > tags
std::chrono::system_clock::time_point end_time
std::string operation_name
std::chrono::system_clock::time_point start_time
std::vector< span > spans
std::vector< std::uint8_t > trace_id
std::vector< std::uint8_t > id
std::vector< std::uint8_t > parent_id
std::vector< std::pair< std::string, std::string > > tags
Zipkin-specific span representation.
std::string local_endpoint_service_name
std::unordered_map< std::string, std::string > tags
std::chrono::microseconds duration
std::string to_json_v2() const
Convert to Zipkin JSON v2 format.
std::vector< uint8_t > to_protobuf() const
Convert this span to a Zipkin Span protobuf message.
std::string remote_endpoint_service_name
std::chrono::microseconds timestamp
Serialization/deserialization of Zipkin zipkin.proto messages.