Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::otlp_grpc_exporter Class Reference

OTLP gRPC trace exporter. More...

#include <otlp_grpc_exporter.h>

Inheritance diagram for kcenon::monitoring::otlp_grpc_exporter:
Inheritance graph
Collaboration diagram for kcenon::monitoring::otlp_grpc_exporter:
Collaboration graph

Public Member Functions

 otlp_grpc_exporter (const otlp_grpc_config &config)
 Construct with configuration.
 
 otlp_grpc_exporter (const otlp_grpc_config &config, std::unique_ptr< grpc_transport > transport)
 Construct with configuration and custom transport.
 
common::VoidResult start ()
 Start the exporter.
 
common::VoidResult export_spans (const std::vector< trace_span > &spans) override
 Export a batch of spans.
 
common::VoidResult flush () override
 Flush pending exports.
 
common::VoidResult shutdown () override
 Shutdown the exporter.
 
std::unordered_map< std::string, std::size_t > get_stats () const override
 Get exporter statistics.
 
otlp_exporter_stats get_detailed_stats () const
 Get detailed statistics.
 
bool is_running () const
 Check if exporter is running.
 
const otlp_grpc_configconfig () const
 Get configuration.
 
- Public Member Functions inherited from kcenon::monitoring::trace_exporter_interface
virtual ~trace_exporter_interface ()=default
 

Private Member Functions

common::Result< grpc_responsesend_with_retry (const grpc_request &request)
 

Static Private Member Functions

static bool is_retryable_error (int status_code)
 
static std::pair< std::string, uint16_t > parse_endpoint (const std::string &endpoint)
 

Private Attributes

otlp_grpc_config config_
 
std::unique_ptr< grpc_transporttransport_
 
std::atomic< bool > running_ {false}
 
std::atomic< std::size_t > exported_spans_ {0}
 
std::atomic< std::size_t > dropped_spans_ {0}
 
std::atomic< std::size_t > failed_exports_ {0}
 
std::atomic< std::size_t > retries_ {0}
 
std::mutex stats_mutex_
 
std::chrono::microseconds total_export_time_ {0}
 

Detailed Description

OTLP gRPC trace exporter.

Exports trace spans to an OpenTelemetry-compatible backend via gRPC. Implements batching, retry with exponential backoff, and async export.

Definition at line 380 of file otlp_grpc_exporter.h.

Constructor & Destructor Documentation

◆ otlp_grpc_exporter() [1/2]

kcenon::monitoring::otlp_grpc_exporter::otlp_grpc_exporter ( const otlp_grpc_config & config)
inlineexplicit

Construct with configuration.

Parameters
configExporter configuration

Definition at line 397 of file otlp_grpc_exporter.h.

std::unique_ptr< grpc_transport > transport_
const otlp_grpc_config & config() const
Get configuration.
std::unique_ptr< grpc_transport > create_default_grpc_transport()
Create default gRPC transport.

◆ otlp_grpc_exporter() [2/2]

kcenon::monitoring::otlp_grpc_exporter::otlp_grpc_exporter ( const otlp_grpc_config & config,
std::unique_ptr< grpc_transport > transport )
inline

Construct with configuration and custom transport.

Parameters
configExporter configuration
transportCustom gRPC transport (for testing)

Definition at line 405 of file otlp_grpc_exporter.h.

408 : config_(config), transport_(std::move(transport)) {}

Member Function Documentation

◆ config()

const otlp_grpc_config & kcenon::monitoring::otlp_grpc_exporter::config ( ) const
inline

Get configuration.

Definition at line 550 of file otlp_grpc_exporter.h.

550 {
551 return config_;
552 }

References config_.

◆ export_spans()

common::VoidResult kcenon::monitoring::otlp_grpc_exporter::export_spans ( const std::vector< trace_span > & spans)
inlineoverridevirtual

Export a batch of spans.

Parameters
spansSpans to export
Returns
common::VoidResult indicating success or failure

Implements kcenon::monitoring::trace_exporter_interface.

Definition at line 438 of file otlp_grpc_exporter.h.

438 {
439 if (spans.empty()) {
440 return common::ok();
441 }
442
443 if (!transport_->is_connected()) {
444 dropped_spans_ += spans.size();
445 return common::VoidResult::err(error_info(
447 "Not connected to OTLP receiver",
448 "otlp_grpc_exporter"
449 ).to_common_error());
450 }
451
452 // Convert spans to OTLP format
454 spans,
458
459 // Prepare gRPC request
460 grpc_request request;
461 request.service = "opentelemetry.proto.collector.trace.v1.TraceService";
462 request.method = "Export";
463 request.body = std::move(payload);
464 request.timeout = config_.timeout;
465
466 // Add headers/metadata
467 for (const auto& [key, value] : config_.headers) {
468 request.metadata[key] = value;
469 }
470
471 // Send with retry
472 auto start_time = std::chrono::steady_clock::now();
473 auto send_result = send_with_retry(request);
474 auto end_time = std::chrono::steady_clock::now();
475
476 {
477 std::lock_guard<std::mutex> lock(stats_mutex_);
478 total_export_time_ += std::chrono::duration_cast<std::chrono::microseconds>(
479 end_time - start_time);
480 }
481
482 if (send_result.is_ok()) {
483 exported_spans_ += spans.size();
484 return common::ok();
485 } else {
487 dropped_spans_ += spans.size();
488 return common::VoidResult::err(error_info(
490 "Failed to export spans: " + send_result.error().message,
491 "otlp_grpc_exporter"
492 ).to_common_error());
493 }
494 }
std::chrono::microseconds total_export_time_
std::atomic< std::size_t > dropped_spans_
std::atomic< std::size_t > exported_spans_
common::Result< grpc_response > send_with_retry(const grpc_request &request)
std::atomic< std::size_t > failed_exports_
static std::vector< uint8_t > convert_to_otlp(const std::vector< trace_span > &spans, const std::string &service_name, const std::string &service_version, const std::unordered_map< std::string, std::string > &resource_attributes)
Convert spans to OTLP protobuf format.
std::string service_version
Service version.
std::unordered_map< std::string, std::string > resource_attributes
Resource attributes.
std::chrono::milliseconds timeout
Request timeout.
std::unordered_map< std::string, std::string > headers
Custom headers.

References kcenon::monitoring::grpc_request::body, config_, kcenon::monitoring::otlp_span_converter::convert_to_otlp(), dropped_spans_, exported_spans_, failed_exports_, kcenon::monitoring::otlp_grpc_config::headers, kcenon::monitoring::grpc_request::metadata, kcenon::monitoring::grpc_request::method, kcenon::monitoring::network_error, kcenon::monitoring::operation_failed, kcenon::monitoring::otlp_grpc_config::resource_attributes, send_with_retry(), kcenon::monitoring::grpc_request::service, kcenon::monitoring::otlp_grpc_config::service_name, kcenon::monitoring::otlp_grpc_config::service_version, stats_mutex_, kcenon::monitoring::grpc_request::timeout, kcenon::monitoring::otlp_grpc_config::timeout, kcenon::monitoring::error_info::to_common_error(), total_export_time_, and transport_.

Referenced by TEST_F(), TEST_F(), TEST_F(), and TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ flush()

common::VoidResult kcenon::monitoring::otlp_grpc_exporter::flush ( )
inlineoverridevirtual

Flush pending exports.

Implements kcenon::monitoring::trace_exporter_interface.

Definition at line 499 of file otlp_grpc_exporter.h.

499 {
500 // Synchronous exporter - nothing to flush
501 return common::ok();
502 }

◆ get_detailed_stats()

otlp_exporter_stats kcenon::monitoring::otlp_grpc_exporter::get_detailed_stats ( ) const
inline

Get detailed statistics.

Definition at line 528 of file otlp_grpc_exporter.h.

528 {
529 otlp_exporter_stats stats;
530 stats.spans_exported = exported_spans_.load();
531 stats.spans_dropped = dropped_spans_.load();
532 stats.export_failures = failed_exports_.load();
533 stats.retries = retries_.load();
534
535 std::lock_guard<std::mutex> lock(const_cast<std::mutex&>(stats_mutex_));
536 stats.total_export_time = total_export_time_;
537 return stats;
538 }

References dropped_spans_, kcenon::monitoring::otlp_exporter_stats::export_failures, exported_spans_, failed_exports_, kcenon::monitoring::otlp_exporter_stats::retries, retries_, kcenon::monitoring::otlp_exporter_stats::spans_dropped, kcenon::monitoring::otlp_exporter_stats::spans_exported, stats_mutex_, kcenon::monitoring::otlp_exporter_stats::total_export_time, and total_export_time_.

Referenced by TEST_F().

Here is the caller graph for this function:

◆ get_stats()

std::unordered_map< std::string, std::size_t > kcenon::monitoring::otlp_grpc_exporter::get_stats ( ) const
inlineoverridevirtual

Get exporter statistics.

Implements kcenon::monitoring::trace_exporter_interface.

Definition at line 516 of file otlp_grpc_exporter.h.

516 {
517 return {
518 {"exported_spans", exported_spans_.load()},
519 {"dropped_spans", dropped_spans_.load()},
520 {"failed_exports", failed_exports_.load()},
521 {"retries", retries_.load()}
522 };
523 }

References dropped_spans_, exported_spans_, failed_exports_, and retries_.

Referenced by TEST_F(), TEST_F(), and TEST_F().

Here is the caller graph for this function:

◆ is_retryable_error()

static bool kcenon::monitoring::otlp_grpc_exporter::is_retryable_error ( int status_code)
inlinestaticprivate

Definition at line 601 of file otlp_grpc_exporter.h.

601 {
602 // gRPC status codes that are retryable:
603 // 1 = CANCELLED, 4 = DEADLINE_EXCEEDED, 8 = RESOURCE_EXHAUSTED,
604 // 10 = ABORTED, 14 = UNAVAILABLE
605 return status_code == 1 || status_code == 4 || status_code == 8 ||
606 status_code == 10 || status_code == 14;
607 }

Referenced by send_with_retry().

Here is the caller graph for this function:

◆ is_running()

bool kcenon::monitoring::otlp_grpc_exporter::is_running ( ) const
inline

Check if exporter is running.

Definition at line 543 of file otlp_grpc_exporter.h.

543 {
544 return running_.load();
545 }

References running_.

Referenced by TEST_F().

Here is the caller graph for this function:

◆ parse_endpoint()

static std::pair< std::string, uint16_t > kcenon::monitoring::otlp_grpc_exporter::parse_endpoint ( const std::string & endpoint)
inlinestaticprivate

Definition at line 609 of file otlp_grpc_exporter.h.

609 {
610 std::string host = "localhost";
611 uint16_t port = 4317;
612
613 auto colon_pos = endpoint.rfind(':');
614 if (colon_pos != std::string::npos) {
615 host = endpoint.substr(0, colon_pos);
616 try {
617 port = static_cast<uint16_t>(std::stoi(endpoint.substr(colon_pos + 1)));
618 } catch (...) {
619 port = 4317;
620 }
621 } else {
622 host = endpoint;
623 }
624
625 return {host, port};
626 }

References kcenon::monitoring::host.

Referenced by start().

Here is the caller graph for this function:

◆ send_with_retry()

common::Result< grpc_response > kcenon::monitoring::otlp_grpc_exporter::send_with_retry ( const grpc_request & request)
inlineprivate

Definition at line 555 of file otlp_grpc_exporter.h.

555 {
556 std::size_t attempt = 0;
557 auto backoff = config_.initial_backoff;
558
559 while (attempt < config_.max_retry_attempts) {
560 auto result = transport_->send(request);
561
562 if (result.is_ok()) {
563 const auto& response = result.value();
564 // gRPC status 0 = OK
565 if (response.status_code == 0) {
566 return result;
567 }
568
569 // Check if error is retryable
570 if (is_retryable_error(response.status_code)) {
571 attempt++;
572 retries_++;
573 if (attempt < config_.max_retry_attempts) {
574 std::this_thread::sleep_for(backoff);
575 backoff = std::min(backoff * 2, config_.max_backoff);
576 }
577 continue;
578 }
579
580 // Non-retryable error
581 return common::make_error<grpc_response>(
583 "OTLP export failed with status: " +
584 std::to_string(response.status_code) + " - " + response.status_message);
585 }
586
587 // Transport error
588 attempt++;
589 retries_++;
590 if (attempt < config_.max_retry_attempts) {
591 std::this_thread::sleep_for(backoff);
592 backoff = std::min(backoff * 2, config_.max_backoff);
593 }
594 }
595
596 return common::make_error<grpc_response>(
598 "OTLP export failed after " + std::to_string(config_.max_retry_attempts) + " retries");
599 }
static bool is_retryable_error(int status_code)
std::size_t max_retry_attempts
Maximum retry attempts.
std::chrono::milliseconds initial_backoff
Initial retry backoff.
std::chrono::milliseconds max_backoff
Maximum retry backoff.

References config_, kcenon::monitoring::otlp_grpc_config::initial_backoff, is_retryable_error(), kcenon::monitoring::otlp_grpc_config::max_backoff, kcenon::monitoring::otlp_grpc_config::max_retry_attempts, kcenon::monitoring::operation_failed, retries_, and transport_.

Referenced by export_spans().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ shutdown()

common::VoidResult kcenon::monitoring::otlp_grpc_exporter::shutdown ( )
inlineoverridevirtual

Shutdown the exporter.

Implements kcenon::monitoring::trace_exporter_interface.

Definition at line 507 of file otlp_grpc_exporter.h.

507 {
508 running_ = false;
509 transport_->disconnect();
510 return common::ok();
511 }

References running_, and transport_.

Referenced by TEST_F().

Here is the caller graph for this function:

◆ start()

common::VoidResult kcenon::monitoring::otlp_grpc_exporter::start ( )
inline

Start the exporter.

Returns
common::VoidResult indicating success or failure

Definition at line 414 of file otlp_grpc_exporter.h.

414 {
415 auto validate_result = config_.validate();
416 if (validate_result.is_err()) {
417 return validate_result;
418 }
419
420 // Parse endpoint
421 auto [host, port] = parse_endpoint(config_.endpoint);
422
423 // Connect to server
424 auto connect_result = transport_->connect(host, port);
425 if (connect_result.is_err()) {
426 return connect_result;
427 }
428
429 running_ = true;
430 return common::ok();
431 }
static std::pair< std::string, uint16_t > parse_endpoint(const std::string &endpoint)
std::string endpoint
OTLP receiver endpoint.
common::VoidResult validate() const
Validate configuration.

References config_, kcenon::monitoring::otlp_grpc_config::endpoint, kcenon::monitoring::host, parse_endpoint(), running_, transport_, and kcenon::monitoring::otlp_grpc_config::validate().

Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ config_

otlp_grpc_config kcenon::monitoring::otlp_grpc_exporter::config_
private

Definition at line 382 of file otlp_grpc_exporter.h.

Referenced by config(), export_spans(), send_with_retry(), and start().

◆ dropped_spans_

std::atomic<std::size_t> kcenon::monitoring::otlp_grpc_exporter::dropped_spans_ {0}
private

Definition at line 386 of file otlp_grpc_exporter.h.

386{0};

Referenced by export_spans(), get_detailed_stats(), and get_stats().

◆ exported_spans_

std::atomic<std::size_t> kcenon::monitoring::otlp_grpc_exporter::exported_spans_ {0}
private

Definition at line 385 of file otlp_grpc_exporter.h.

385{0};

Referenced by export_spans(), get_detailed_stats(), and get_stats().

◆ failed_exports_

std::atomic<std::size_t> kcenon::monitoring::otlp_grpc_exporter::failed_exports_ {0}
private

Definition at line 387 of file otlp_grpc_exporter.h.

387{0};

Referenced by export_spans(), get_detailed_stats(), and get_stats().

◆ retries_

std::atomic<std::size_t> kcenon::monitoring::otlp_grpc_exporter::retries_ {0}
private

Definition at line 388 of file otlp_grpc_exporter.h.

388{0};

Referenced by get_detailed_stats(), get_stats(), and send_with_retry().

◆ running_

std::atomic<bool> kcenon::monitoring::otlp_grpc_exporter::running_ {false}
private

Definition at line 384 of file otlp_grpc_exporter.h.

384{false};

Referenced by is_running(), shutdown(), and start().

◆ stats_mutex_

std::mutex kcenon::monitoring::otlp_grpc_exporter::stats_mutex_
private

Definition at line 389 of file otlp_grpc_exporter.h.

Referenced by export_spans(), and get_detailed_stats().

◆ total_export_time_

std::chrono::microseconds kcenon::monitoring::otlp_grpc_exporter::total_export_time_ {0}
private

Definition at line 390 of file otlp_grpc_exporter.h.

390{0};

Referenced by export_spans(), and get_detailed_stats().

◆ transport_

std::unique_ptr<grpc_transport> kcenon::monitoring::otlp_grpc_exporter::transport_
private

Definition at line 383 of file otlp_grpc_exporter.h.

Referenced by export_spans(), send_with_retry(), shutdown(), and start().


The documentation for this class was generated from the following file: