33#include <condition_variable>
35namespace kcenon {
namespace monitoring {
54 std::unordered_map<std::string, std::string>
headers;
64 "OTLP endpoint cannot be empty",
71 "Timeout must be positive",
78 "Batch size must be greater than 0",
117 const std::vector<trace_span>& spans,
118 const std::string& service_name,
119 const std::string& service_version,
120 const std::unordered_map<std::string, std::string>& resource_attributes) {
130 std::vector<uint8_t> payload;
141 spans, service_name, service_version, resource_attributes);
145 payload.push_back(0x0A);
147 payload.insert(payload.end(),
148 resource_spans_data.begin(), resource_spans_data.end());
155 const std::vector<trace_span>& spans,
156 const std::string& service_name,
157 const std::string& service_version,
158 const std::unordered_map<std::string, std::string>& resource_attributes) {
160 std::vector<uint8_t> data;
163 auto resource_data =
build_resource(service_name, service_version, resource_attributes);
164 data.push_back(0x0A);
166 data.insert(data.end(), resource_data.begin(), resource_data.end());
170 data.push_back(0x12);
172 data.insert(data.end(), scope_spans_data.begin(), scope_spans_data.end());
178 const std::string& service_name,
179 const std::string& service_version,
180 const std::unordered_map<std::string, std::string>& extra_attributes) {
182 std::vector<uint8_t> data;
187 data.push_back(0x0A);
189 data.insert(data.end(), attr1.begin(), attr1.end());
193 data.push_back(0x0A);
195 data.insert(data.end(), attr2.begin(), attr2.end());
198 for (
const auto& [key, value] : extra_attributes) {
200 data.push_back(0x0A);
202 data.insert(data.end(), attr.begin(), attr.end());
209 const std::vector<trace_span>& spans) {
211 std::vector<uint8_t> data;
214 auto scope_data =
build_scope(
"monitoring_system",
"2.0.0");
215 data.push_back(0x0A);
217 data.insert(data.end(), scope_data.begin(), scope_data.end());
220 for (
const auto& span : spans) {
222 data.push_back(0x12);
224 data.insert(data.end(), span_data.begin(), span_data.end());
231 const std::string& name,
232 const std::string& version) {
234 std::vector<uint8_t> data;
237 data.push_back(0x0A);
239 data.insert(data.end(), name.begin(), name.end());
242 data.push_back(0x12);
244 data.insert(data.end(), version.begin(), version.end());
250 std::vector<uint8_t> data;
254 data.push_back(0x0A);
256 data.insert(data.end(), trace_id_bytes.begin(), trace_id_bytes.end());
260 data.push_back(0x12);
262 data.insert(data.end(), span_id_bytes.begin(), span_id_bytes.end());
267 data.push_back(0x22);
269 data.insert(data.end(), parent_id_bytes.begin(), parent_id_bytes.end());
273 data.push_back(0x2A);
278 data.push_back(0x30);
279 data.push_back(0x01);
282 auto start_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
284 data.push_back(0x39);
288 auto end_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
289 span.
end_time.time_since_epoch()).count();
290 data.push_back(0x41);
294 for (
const auto& [key, value] : span.
tags) {
296 data.push_back(0x4A);
298 data.insert(data.end(), attr.begin(), attr.end());
305 const std::string& key,
306 const std::string& value) {
308 std::vector<uint8_t> data;
311 data.push_back(0x0A);
313 data.insert(data.end(), key.begin(), key.end());
317 data.push_back(0x12);
319 data.insert(data.end(), any_value.begin(), any_value.end());
325 std::vector<uint8_t> data;
327 data.push_back(0x0A);
329 data.insert(data.end(), value.begin(), value.end());
334 while (value >= 0x80) {
335 data.push_back(
static_cast<uint8_t
>(value | 0x80));
338 data.push_back(
static_cast<uint8_t
>(value));
342 for (
int i = 0; i < 8; ++i) {
343 data.push_back(
static_cast<uint8_t
>(value & 0xFF));
348 static std::vector<uint8_t>
hex_to_bytes(
const std::string& hex, std::size_t expected_size) {
349 std::vector<uint8_t> bytes;
350 bytes.reserve(expected_size);
352 for (std::size_t i = 0; i + 1 < hex.size(); i += 2) {
353 auto byte_str = hex.substr(i, 2);
355 bytes.push_back(
static_cast<uint8_t
>(std::stoul(byte_str,
nullptr, 16)));
362 while (bytes.size() < expected_size) {
363 bytes.insert(bytes.begin(), 0);
365 if (bytes.size() > expected_size) {
366 bytes.resize(expected_size);
407 std::unique_ptr<grpc_transport> transport)
416 if (validate_result.is_err()) {
417 return validate_result;
425 if (connect_result.is_err()) {
426 return connect_result;
438 common::VoidResult
export_spans(
const std::vector<trace_span>& spans)
override {
447 "Not connected to OTLP receiver",
449 ).to_common_error());
461 request.
service =
"opentelemetry.proto.collector.trace.v1.TraceService";
462 request.
method =
"Export";
463 request.
body = std::move(payload);
472 auto start_time = std::chrono::steady_clock::now();
474 auto end_time = std::chrono::steady_clock::now();
479 end_time - start_time);
482 if (send_result.is_ok()) {
490 "Failed to export spans: " + send_result.error().message,
499 common::VoidResult
flush()
override {
516 std::unordered_map<std::string, std::size_t>
get_stats()
const override {
535 std::lock_guard<std::mutex> lock(
const_cast<std::mutex&
>(
stats_mutex_));
556 std::size_t attempt = 0;
562 if (result.is_ok()) {
563 const auto& response = result.value();
565 if (response.status_code == 0) {
574 std::this_thread::sleep_for(backoff);
581 return common::make_error<grpc_response>(
583 "OTLP export failed with status: " +
584 std::to_string(response.status_code) +
" - " + response.status_message);
591 std::this_thread::sleep_for(backoff);
596 return common::make_error<grpc_response>(
605 return status_code == 1 || status_code == 4 || status_code == 8 ||
606 status_code == 10 || status_code == 14;
609 static std::pair<std::string, uint16_t>
parse_endpoint(
const std::string& endpoint) {
610 std::string
host =
"localhost";
611 uint16_t port = 4317;
613 auto colon_pos = endpoint.rfind(
':');
614 if (colon_pos != std::string::npos) {
615 host = endpoint.substr(0, colon_pos);
617 port =
static_cast<uint16_t
>(std::stoi(endpoint.substr(colon_pos + 1)));
635 const std::string& endpoint =
"localhost:4317") {
639 return std::make_unique<otlp_grpc_exporter>(config);
650 return std::make_unique<otlp_grpc_exporter>(config);
OTLP gRPC trace exporter.
common::VoidResult flush() override
Flush pending exports.
std::chrono::microseconds total_export_time_
common::VoidResult shutdown() override
Shutdown the exporter.
std::atomic< std::size_t > retries_
otlp_exporter_stats get_detailed_stats() const
Get detailed statistics.
static bool is_retryable_error(int status_code)
std::atomic< std::size_t > dropped_spans_
otlp_grpc_exporter(const otlp_grpc_config &config, std::unique_ptr< grpc_transport > transport)
Construct with configuration and custom transport.
common::VoidResult start()
Start the exporter.
bool is_running() const
Check if exporter is running.
std::unique_ptr< grpc_transport > transport_
otlp_grpc_exporter(const otlp_grpc_config &config)
Construct with configuration.
std::atomic< std::size_t > exported_spans_
common::Result< grpc_response > send_with_retry(const grpc_request &request)
const otlp_grpc_config & config() const
Get configuration.
std::atomic< bool > running_
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
std::atomic< std::size_t > failed_exports_
static std::pair< std::string, uint16_t > parse_endpoint(const std::string &endpoint)
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
Converts internal spans to OTLP wire format.
static std::vector< uint8_t > build_span(const trace_span &span)
static std::vector< uint8_t > convert_to_otlp(const std::vector< trace_span > &spans, const std::string &service_name, const std::string &service_version, const std::unordered_map< std::string, std::string > &resource_attributes)
Convert spans to OTLP protobuf format.
static std::vector< uint8_t > build_scope(const std::string &name, const std::string &version)
static std::vector< uint8_t > build_resource_spans(const std::vector< trace_span > &spans, const std::string &service_name, const std::string &service_version, const std::unordered_map< std::string, std::string > &resource_attributes)
static void write_fixed64(std::vector< uint8_t > &data, uint64_t value)
static std::vector< uint8_t > build_any_value_string(const std::string &value)
static std::vector< uint8_t > build_key_value(const std::string &key, const std::string &value)
static std::vector< uint8_t > build_scope_spans(const std::vector< trace_span > &spans)
static std::vector< uint8_t > hex_to_bytes(const std::string &hex, std::size_t expected_size)
static std::vector< uint8_t > build_resource(const std::string &service_name, const std::string &service_version, const std::unordered_map< std::string, std::string > &extra_attributes)
static void write_varint(std::vector< uint8_t > &data, uint64_t value)
Abstract interface for trace exporters.
Distributed tracing implementation for monitoring system.
Monitoring system specific error codes.
gRPC transport layer for OTLP exporters
std::unique_ptr< grpc_transport > create_default_grpc_transport()
Create default gRPC transport.
std::unique_ptr< otlp_grpc_exporter > create_otlp_grpc_exporter(const std::string &endpoint="localhost:4317")
Create OTLP gRPC exporter with default configuration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
gRPC request configuration
std::chrono::milliseconds timeout
std::unordered_map< std::string, std::string > metadata
std::vector< uint8_t > body
Statistics for OTLP exporter.
std::chrono::microseconds total_export_time
Total export time (microseconds for precision)
std::size_t spans_exported
std::size_t export_failures
std::size_t spans_dropped
Configuration for OTLP gRPC exporter.
std::string certificate_path
TLS certificate path.
std::string service_version
Service version.
std::unordered_map< std::string, std::string > resource_attributes
Resource attributes.
std::string endpoint
OTLP receiver endpoint.
std::size_t max_retry_attempts
Maximum retry attempts.
std::chrono::milliseconds batch_timeout
Batch export timeout.
std::string service_name
Service name.
std::size_t max_queue_size
Maximum queued spans.
common::VoidResult validate() const
Validate configuration.
std::size_t max_batch_size
Maximum spans per batch.
std::chrono::milliseconds initial_backoff
Initial retry backoff.
std::chrono::milliseconds max_backoff
Maximum retry backoff.
std::chrono::milliseconds timeout
Request timeout.
std::unordered_map< std::string, std::string > headers
Custom headers.
Trace span representing a unit of work in distributed tracing.
std::string parent_span_id
std::unordered_map< std::string, std::string > tags
std::chrono::system_clock::time_point end_time
std::string operation_name
std::chrono::system_clock::time_point start_time
Trace data exporters for various distributed tracing systems.