Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
otlp_grpc_exporter.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
21#include "../core/error_codes.h"
23#include "trace_exporters.h"
24#include "grpc_transport.h"
25#include <vector>
26#include <string>
27#include <memory>
28#include <chrono>
29#include <atomic>
30#include <mutex>
31#include <queue>
32#include <thread>
33#include <condition_variable>
34
35namespace kcenon { namespace monitoring {
36
// Configuration for otlp_grpc_exporter.
// NOTE(review): the struct header and doc comments (listing lines 36-41) are not shown here.
// OTLP receiver endpoint ("host:port"); otlp_grpc_exporter::start() splits it via parse_endpoint().
42 std::string endpoint = "localhost:4317";
// Per-request deadline, copied into every grpc_request built by export_spans().
43 std::chrono::milliseconds timeout{10000};
// Batching limits. NOTE(review): only max_batch_size is checked (by validate()); none of
// these three fields is read by the exporter code visible in this listing.
44 std::chrono::milliseconds batch_timeout{5000};
45 std::size_t max_batch_size = 512;
46 std::size_t max_queue_size = 2048;
// Total number of send attempts per export in send_with_retry(), including the first one.
// A value of 0 means no request is ever sent.
47 std::size_t max_retry_attempts = 3;
// Delay between attempts: starts at initial_backoff and doubles after each failure,
// capped at max_backoff.
48 std::chrono::milliseconds initial_backoff{100};
49 std::chrono::milliseconds max_backoff{10000};
// TLS settings. NOTE(review): not read anywhere in this listing; confirm the transport uses them.
50 bool use_tls = false;
51 std::string certificate_path;
// Service identity; presumably forwarded to convert_to_otlp() as the service.name /
// service.version resource attributes (that call site is only partly visible here).
52 std::string service_name = "monitoring_system";
53 std::string service_version = "2.0.0";
// Extra gRPC metadata entries added to every export request.
54 std::unordered_map<std::string, std::string> headers;
// Additional resource attributes (presumably forwarded to convert_to_otlp(); confirm).
55 std::unordered_map<std::string, std::string> resource_attributes;
56
// Validate the configuration.
// Returns an error for an empty endpoint, a non-positive timeout, or a zero max_batch_size;
// returns ok otherwise.
// NOTE(review): max_retry_attempts == 0 passes validation even though send_with_retry() then
// never sends. Backoff values are not checked either. Listing lines 63, 70 and 77 (one
// argument of each error_info call) are missing here.
60 common::VoidResult validate() const {
61 if (endpoint.empty()) {
62 return common::VoidResult::err(error_info(
64 "OTLP endpoint cannot be empty",
65 "otlp_grpc_config"
66 ).to_common_error());
67 }
68 if (timeout.count() <= 0) {
69 return common::VoidResult::err(error_info(
71 "Timeout must be positive",
72 "otlp_grpc_config"
73 ).to_common_error());
74 }
75 if (max_batch_size == 0) {
76 return common::VoidResult::err(error_info(
78 "Batch size must be greater than 0",
79 "otlp_grpc_config"
80 ).to_common_error());
81 }
82 return common::ok();
83 }
84};
85
// Snapshot of exporter counters, filled by otlp_grpc_exporter::get_detailed_stats().
// NOTE(review): the struct header (listing lines 86-90) is not shown here.
// Spans acknowledged by the receiver (copied from exported_spans_).
91 std::size_t spans_exported{0};
// Spans discarded because of no connection or a failed export (copied from dropped_spans_).
92 std::size_t spans_dropped{0};
// Copied from failed_exports_.
93 std::size_t export_failures{0};
// Copied from retries_ (counts failed attempts; see send_with_retry).
94 std::size_t retries{0};
// NOTE(review): not assigned in the visible part of get_detailed_stats(); may always be 0.
95 std::size_t batches_sent{0};
// Cumulative time spent sending exports, in microseconds.
96 std::chrono::microseconds total_export_time{0};
97};
98
107public:
116 static std::vector<uint8_t> convert_to_otlp(
117 const std::vector<trace_span>& spans,
118 const std::string& service_name,
119 const std::string& service_version,
120 const std::unordered_map<std::string, std::string>& resource_attributes) {
121
122 // OTLP ExportTraceServiceRequest structure:
123 // - resource_spans (repeated)
124 // - resource
125 // - attributes
126 // - scope_spans (repeated)
127 // - scope
128 // - spans (repeated)
129
130 std::vector<uint8_t> payload;
131
132 // Build minimal OTLP-compatible protobuf structure
133 // Note: Full implementation would use generated protobuf code
134
135 // For now, we serialize to a simplified format that OpenTelemetry
136 // Collector can process. Production use should integrate actual
137 // OTLP proto definitions.
138
139 // Write field 1 (resource_spans) - length-delimited
140 auto resource_spans_data = build_resource_spans(
141 spans, service_name, service_version, resource_attributes);
142
143 // Protobuf wire format: (field_number << 3) | wire_type
144 // Field 1, wire type 2 (length-delimited) = 0x0A
145 payload.push_back(0x0A);
146 write_varint(payload, resource_spans_data.size());
147 payload.insert(payload.end(),
148 resource_spans_data.begin(), resource_spans_data.end());
149
150 return payload;
151 }
152
153private:
154 static std::vector<uint8_t> build_resource_spans(
155 const std::vector<trace_span>& spans,
156 const std::string& service_name,
157 const std::string& service_version,
158 const std::unordered_map<std::string, std::string>& resource_attributes) {
159
160 std::vector<uint8_t> data;
161
162 // Field 1: resource
163 auto resource_data = build_resource(service_name, service_version, resource_attributes);
164 data.push_back(0x0A); // Field 1, length-delimited
165 write_varint(data, resource_data.size());
166 data.insert(data.end(), resource_data.begin(), resource_data.end());
167
168 // Field 2: scope_spans
169 auto scope_spans_data = build_scope_spans(spans);
170 data.push_back(0x12); // Field 2, length-delimited
171 write_varint(data, scope_spans_data.size());
172 data.insert(data.end(), scope_spans_data.begin(), scope_spans_data.end());
173
174 return data;
175 }
176
177 static std::vector<uint8_t> build_resource(
178 const std::string& service_name,
179 const std::string& service_version,
180 const std::unordered_map<std::string, std::string>& extra_attributes) {
181
182 std::vector<uint8_t> data;
183
184 // Field 1: attributes (repeated)
185 // service.name attribute
186 auto attr1 = build_key_value("service.name", service_name);
187 data.push_back(0x0A);
188 write_varint(data, attr1.size());
189 data.insert(data.end(), attr1.begin(), attr1.end());
190
191 // service.version attribute
192 auto attr2 = build_key_value("service.version", service_version);
193 data.push_back(0x0A);
194 write_varint(data, attr2.size());
195 data.insert(data.end(), attr2.begin(), attr2.end());
196
197 // Extra attributes
198 for (const auto& [key, value] : extra_attributes) {
199 auto attr = build_key_value(key, value);
200 data.push_back(0x0A);
201 write_varint(data, attr.size());
202 data.insert(data.end(), attr.begin(), attr.end());
203 }
204
205 return data;
206 }
207
208 static std::vector<uint8_t> build_scope_spans(
209 const std::vector<trace_span>& spans) {
210
211 std::vector<uint8_t> data;
212
213 // Field 1: scope (InstrumentationScope)
214 auto scope_data = build_scope("monitoring_system", "2.0.0");
215 data.push_back(0x0A);
216 write_varint(data, scope_data.size());
217 data.insert(data.end(), scope_data.begin(), scope_data.end());
218
219 // Field 2: spans (repeated)
220 for (const auto& span : spans) {
221 auto span_data = build_span(span);
222 data.push_back(0x12);
223 write_varint(data, span_data.size());
224 data.insert(data.end(), span_data.begin(), span_data.end());
225 }
226
227 return data;
228 }
229
230 static std::vector<uint8_t> build_scope(
231 const std::string& name,
232 const std::string& version) {
233
234 std::vector<uint8_t> data;
235
236 // Field 1: name
237 data.push_back(0x0A);
238 write_varint(data, name.size());
239 data.insert(data.end(), name.begin(), name.end());
240
241 // Field 2: version
242 data.push_back(0x12);
243 write_varint(data, version.size());
244 data.insert(data.end(), version.begin(), version.end());
245
246 return data;
247 }
248
249 static std::vector<uint8_t> build_span(const trace_span& span) {
250 std::vector<uint8_t> data;
251
252 // Field 1: trace_id (16 bytes)
253 auto trace_id_bytes = hex_to_bytes(span.trace_id, 16);
254 data.push_back(0x0A);
255 write_varint(data, trace_id_bytes.size());
256 data.insert(data.end(), trace_id_bytes.begin(), trace_id_bytes.end());
257
258 // Field 2: span_id (8 bytes)
259 auto span_id_bytes = hex_to_bytes(span.span_id, 8);
260 data.push_back(0x12);
261 write_varint(data, span_id_bytes.size());
262 data.insert(data.end(), span_id_bytes.begin(), span_id_bytes.end());
263
264 // Field 4: parent_span_id (8 bytes, optional)
265 if (!span.parent_span_id.empty()) {
266 auto parent_id_bytes = hex_to_bytes(span.parent_span_id, 8);
267 data.push_back(0x22);
268 write_varint(data, parent_id_bytes.size());
269 data.insert(data.end(), parent_id_bytes.begin(), parent_id_bytes.end());
270 }
271
272 // Field 5: name
273 data.push_back(0x2A);
274 write_varint(data, span.operation_name.size());
275 data.insert(data.end(), span.operation_name.begin(), span.operation_name.end());
276
277 // Field 6: kind (SPAN_KIND_INTERNAL = 1)
278 data.push_back(0x30);
279 data.push_back(0x01);
280
281 // Field 7: start_time_unix_nano (fixed64)
282 auto start_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
283 span.start_time.time_since_epoch()).count();
284 data.push_back(0x39); // Field 7, wire type 1 (64-bit)
285 write_fixed64(data, static_cast<uint64_t>(start_ns));
286
287 // Field 8: end_time_unix_nano (fixed64)
288 auto end_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
289 span.end_time.time_since_epoch()).count();
290 data.push_back(0x41); // Field 8, wire type 1 (64-bit)
291 write_fixed64(data, static_cast<uint64_t>(end_ns));
292
293 // Field 9: attributes (repeated)
294 for (const auto& [key, value] : span.tags) {
295 auto attr = build_key_value(key, value);
296 data.push_back(0x4A);
297 write_varint(data, attr.size());
298 data.insert(data.end(), attr.begin(), attr.end());
299 }
300
301 return data;
302 }
303
304 static std::vector<uint8_t> build_key_value(
305 const std::string& key,
306 const std::string& value) {
307
308 std::vector<uint8_t> data;
309
310 // Field 1: key
311 data.push_back(0x0A);
312 write_varint(data, key.size());
313 data.insert(data.end(), key.begin(), key.end());
314
315 // Field 2: value (AnyValue)
316 auto any_value = build_any_value_string(value);
317 data.push_back(0x12);
318 write_varint(data, any_value.size());
319 data.insert(data.end(), any_value.begin(), any_value.end());
320
321 return data;
322 }
323
324 static std::vector<uint8_t> build_any_value_string(const std::string& value) {
325 std::vector<uint8_t> data;
326 // Field 1: string_value
327 data.push_back(0x0A);
328 write_varint(data, value.size());
329 data.insert(data.end(), value.begin(), value.end());
330 return data;
331 }
332
/**
 * @brief Append @p value to @p data as a protobuf base-128 varint.
 *
 * Each byte carries 7 bits, least-significant group first. The high bit marks
 * "more bytes follow". Zero encodes as a single 0x00 byte.
 */
static void write_varint(std::vector<uint8_t>& data, uint64_t value) {
    do {
        uint8_t group = static_cast<uint8_t>(value & 0x7Fu);
        value >>= 7;
        if (value != 0) {
            group = static_cast<uint8_t>(group | 0x80u);
        }
        data.push_back(group);
    } while (value != 0);
}
340
/**
 * @brief Append @p value as a protobuf fixed64: exactly 8 bytes, little-endian.
 */
static void write_fixed64(std::vector<uint8_t>& data, uint64_t value) {
    for (unsigned shift = 0; shift < 64; shift += 8) {
        data.push_back(static_cast<uint8_t>((value >> shift) & 0xFFu));
    }
}
347
/**
 * @brief Decode a hex string into exactly @p expected_size bytes.
 *
 * Characters are consumed in pairs; a trailing odd nibble is ignored. Each pair
 * must consist of two hex digits (either case). Any other pair, including signs,
 * whitespace, or partial digits such as "-1" or "1g", decodes to 0x00. The
 * previous std::stoul-based parse accepted those inputs and produced bogus
 * bytes. A short result is left-padded with zeros; a long one is truncated to
 * its first @p expected_size bytes.
 *
 * @param hex           Hex-encoded identifier (e.g. a trace or span id).
 * @param expected_size Required output length in bytes.
 * @return Decoded bytes, always of size @p expected_size.
 */
static std::vector<uint8_t> hex_to_bytes(const std::string& hex, std::size_t expected_size) {
    // Value of a single hex digit, or -1 if the character is not one.
    const auto nibble = [](char c) -> int {
        if (c >= '0' && c <= '9') return c - '0';
        if (c >= 'a' && c <= 'f') return c - 'a' + 10;
        if (c >= 'A' && c <= 'F') return c - 'A' + 10;
        return -1;
    };

    std::vector<uint8_t> bytes;
    bytes.reserve(expected_size);

    for (std::size_t i = 0; i + 1 < hex.size(); i += 2) {
        const int hi = nibble(hex[i]);
        const int lo = nibble(hex[i + 1]);
        bytes.push_back((hi < 0 || lo < 0)
                            ? uint8_t{0}
                            : static_cast<uint8_t>((hi << 4) | lo));
    }

    // Left-pad (single insert) or truncate to the expected width.
    if (bytes.size() < expected_size) {
        bytes.insert(bytes.begin(), expected_size - bytes.size(), uint8_t{0});
    } else if (bytes.size() > expected_size) {
        bytes.resize(expected_size);
    }
    return bytes;
}
371};
372
381private:
383 std::unique_ptr<grpc_transport> transport_;
384 std::atomic<bool> running_{false};
385 std::atomic<std::size_t> exported_spans_{0};
386 std::atomic<std::size_t> dropped_spans_{0};
387 std::atomic<std::size_t> failed_exports_{0};
388 std::atomic<std::size_t> retries_{0};
389 std::mutex stats_mutex_;
390 std::chrono::microseconds total_export_time_{0};
391
392public:
399
// Construct with a configuration and a caller-supplied transport (the constructor's
// first line, listing line 406, is not shown here).
// Takes ownership of `transport`. It is not null-checked, yet start(), export_spans()
// and shutdown() all dereference transport_, so callers must pass a non-null transport.
407 std::unique_ptr<grpc_transport> transport)
408 : config_(config), transport_(std::move(transport)) {}
409
414 common::VoidResult start() {
415 auto validate_result = config_.validate();
416 if (validate_result.is_err()) {
417 return validate_result;
418 }
419
420 // Parse endpoint
421 auto [host, port] = parse_endpoint(config_.endpoint);
422
423 // Connect to server
424 auto connect_result = transport_->connect(host, port);
425 if (connect_result.is_err()) {
426 return connect_result;
427 }
428
429 running_ = true;
430 return common::ok();
431 }
432
// Export one batch of spans synchronously.
// An empty batch returns ok without touching the transport. Otherwise the spans are
// encoded into an OTLP TraceService/Export request, config_.headers are attached as
// gRPC metadata, and the request is sent via send_with_retry(). On success the batch
// is added to exported_spans_; on any failure (no connection or send error) the whole
// batch is added to dropped_spans_ and an error is returned.
// NOTE(review): running_ is not consulted here; only transport_->is_connected() gates
// the export.
438 common::VoidResult export_spans(const std::vector<trace_span>& spans) override {
439 if (spans.empty()) {
440 return common::ok();
441 }
442
// The whole batch counts as dropped when the transport is not connected.
443 if (!transport_->is_connected()) {
444 dropped_spans_ += spans.size();
445 return common::VoidResult::err(error_info(
447 "Not connected to OTLP receiver",
448 "otlp_grpc_exporter"
449 ).to_common_error());
450 }
451
452 // Convert spans to OTLP format
// NOTE(review): listing lines 453 and 455-457 are missing. They declare `payload`,
// presumably via otlp_span_converter::convert_to_otlp(spans, ...); confirm.
454 spans,
458
459 // Prepare gRPC request
460 grpc_request request;
461 request.service = "opentelemetry.proto.collector.trace.v1.TraceService";
462 request.method = "Export";
463 request.body = std::move(payload);
464 request.timeout = config_.timeout;
465
466 // Add headers/metadata
467 for (const auto& [key, value] : config_.headers) {
468 request.metadata[key] = value;
469 }
470
471 // Send with retry
// Elapsed time includes any backoff sleeps inside send_with_retry() and is
// recorded for both successful and failed exports.
472 auto start_time = std::chrono::steady_clock::now();
473 auto send_result = send_with_retry(request);
474 auto end_time = std::chrono::steady_clock::now();
475
476 {
477 std::lock_guard<std::mutex> lock(stats_mutex_);
478 total_export_time_ += std::chrono::duration_cast<std::chrono::microseconds>(
479 end_time - start_time);
480 }
481
482 if (send_result.is_ok()) {
483 exported_spans_ += spans.size();
484 return common::ok();
485 } else {
// NOTE(review): listing line 486 is missing here.
487 dropped_spans_ += spans.size();
488 return common::VoidResult::err(error_info(
490 "Failed to export spans: " + send_result.error().message,
491 "otlp_grpc_exporter"
492 ).to_common_error());
493 }
494 }
495
499 common::VoidResult flush() override {
500 // Synchronous exporter - nothing to flush
501 return common::ok();
502 }
503
507 common::VoidResult shutdown() override {
508 running_ = false;
509 transport_->disconnect();
510 return common::ok();
511 }
512
516 std::unordered_map<std::string, std::size_t> get_stats() const override {
517 return {
518 {"exported_spans", exported_spans_.load()},
519 {"dropped_spans", dropped_spans_.load()},
520 {"failed_exports", failed_exports_.load()},
521 {"retries", retries_.load()}
522 };
523 }
524
// Body of get_detailed_stats(): copies the atomic counters into an otlp_exporter_stats
// snapshot. NOTE(review): the signature and the declaration of `stats` (listing lines
// 525-529) and line 536 are not shown here.
// The counters are loaded before the lock is taken, so the snapshot is not
// consistent across fields.
530 stats.spans_exported = exported_spans_.load();
531 stats.spans_dropped = dropped_spans_.load();
532 stats.export_failures = failed_exports_.load();
533 stats.retries = retries_.load();
534
// The const_cast lets this const member function lock stats_mutex_. Declaring the
// mutex `mutable` would make the cast unnecessary and avoid UB if the exporter
// object itself is const.
535 std::lock_guard<std::mutex> lock(const_cast<std::mutex&>(stats_mutex_));
537 return stats;
538 }
539
543 bool is_running() const {
544 return running_.load();
545 }
546
550 const otlp_grpc_config& config() const {
551 return config_;
552 }
553
554private:
// Send `request`, retrying with exponential backoff.
// Makes at most config_.max_retry_attempts attempts in total (the first send counts),
// so a value of 0 sends nothing and goes straight to the final error.
// - A response with status_code 0 is returned as success.
// - Retryable gRPC statuses and transport-level errors sleep (blocking the calling
//   thread) and retry. The delay starts at initial_backoff and doubles up to max_backoff.
// - Any other status returns an error immediately.
// NOTE(review): retries_ is incremented after every failed attempt, including the last
// one, so it counts failed attempts rather than retries actually issued. The final error
// message says "retries" but reports the attempt limit. Transport error details are
// discarded. Listing lines 582 and 597 (one make_error argument each) are missing.
555 common::Result<grpc_response> send_with_retry(const grpc_request& request) {
556 std::size_t attempt = 0;
557 auto backoff = config_.initial_backoff;
558
559 while (attempt < config_.max_retry_attempts) {
560 auto result = transport_->send(request);
561
562 if (result.is_ok()) {
563 const auto& response = result.value();
564 // gRPC status 0 = OK
565 if (response.status_code == 0) {
566 return result;
567 }
568
569 // Check if error is retryable
570 if (is_retryable_error(response.status_code)) {
571 attempt++;
572 retries_++;
// No sleep after the final attempt; the loop then exits to the error below.
573 if (attempt < config_.max_retry_attempts) {
574 std::this_thread::sleep_for(backoff);
575 backoff = std::min(backoff * 2, config_.max_backoff);
576 }
577 continue;
578 }
579
580 // Non-retryable error
581 return common::make_error<grpc_response>(
583 "OTLP export failed with status: " +
584 std::to_string(response.status_code) + " - " + response.status_message);
585 }
586
587 // Transport error
// Transport failures are always retried; their error payload is not inspected.
588 attempt++;
589 retries_++;
590 if (attempt < config_.max_retry_attempts) {
591 std::this_thread::sleep_for(backoff);
592 backoff = std::min(backoff * 2, config_.max_backoff);
593 }
594 }
595
596 return common::make_error<grpc_response>(
598 "OTLP export failed after " + std::to_string(config_.max_retry_attempts) + " retries");
599 }
600
/**
 * @brief Whether a gRPC status code should trigger another OTLP export attempt.
 *
 * Follows the OTLP/gRPC specification's failure table. Retryable codes:
 * - CANCELLED (1)
 * - DEADLINE_EXCEEDED (4)
 * - ABORTED (10)
 * - OUT_OF_RANGE (11)
 * - UNAVAILABLE (14)
 * - DATA_LOSS (15)
 *
 * RESOURCE_EXHAUSTED (8) is still treated as retryable, as before. The spec allows
 * that only when the server signals recovery via RetryInfo, which is not inspected
 * here. All other codes, including OK (0), are not retryable.
 *
 * @param status_code Numeric gRPC status code.
 * @return true if the export may be retried.
 */
static bool is_retryable_error(int status_code) {
    switch (status_code) {
        case 1:   // CANCELLED
        case 4:   // DEADLINE_EXCEEDED
        case 8:   // RESOURCE_EXHAUSTED
        case 10:  // ABORTED
        case 11:  // OUT_OF_RANGE
        case 14:  // UNAVAILABLE
        case 15:  // DATA_LOSS
            return true;
        default:
            return false;
    }
}
608
/**
 * @brief Split an OTLP endpoint into host and port.
 *
 * Accepts "host", "host:port", and URL forms such as "http://host:port/". Any
 * "scheme://" prefix and any trailing path are stripped. The port is the text
 * after the last ':'. When it is missing, unparsable, or outside 1..65535, the
 * OTLP/gRPC default 4317 is used. Out-of-range values used to wrap silently
 * through the uint16_t cast (e.g. 70000 became 4464).
 *
 * @param endpoint Endpoint string from the configuration.
 * @return {host, port}.
 */
static std::pair<std::string, uint16_t> parse_endpoint(const std::string& endpoint) {
    constexpr uint16_t default_port = 4317;

    std::string authority = endpoint;

    // "http://host:4317" -> "host:4317"
    const auto scheme_end = authority.find("://");
    if (scheme_end != std::string::npos) {
        authority.erase(0, scheme_end + 3);
    }
    // "host:4317/some/path" -> "host:4317"
    const auto path_start = authority.find('/');
    if (path_start != std::string::npos) {
        authority.erase(path_start);
    }

    const auto colon_pos = authority.rfind(':');
    if (colon_pos == std::string::npos) {
        return {authority, default_port};
    }

    std::string host = authority.substr(0, colon_pos);
    uint16_t port = default_port;
    try {
        const int parsed = std::stoi(authority.substr(colon_pos + 1));
        if (parsed > 0 && parsed <= 65535) {
            port = static_cast<uint16_t>(parsed);
        }
    } catch (...) {
        // Deliberate best-effort: a non-numeric or overflowing port keeps the default.
    }
    return {host, port};
}
627};
628
634inline std::unique_ptr<otlp_grpc_exporter> create_otlp_grpc_exporter(
635 const std::string& endpoint = "localhost:4317") {
636
637 otlp_grpc_config config;
638 config.endpoint = endpoint;
639 return std::make_unique<otlp_grpc_exporter>(config);
640}
641
647inline std::unique_ptr<otlp_grpc_exporter> create_otlp_grpc_exporter(
648 const otlp_grpc_config& config) {
649
650 return std::make_unique<otlp_grpc_exporter>(config);
651}
652
653} } // namespace kcenon::monitoring
common::VoidResult flush() override
Flush pending exports.
std::chrono::microseconds total_export_time_
common::VoidResult shutdown() override
Shutdown the exporter.
otlp_exporter_stats get_detailed_stats() const
Get detailed statistics.
static bool is_retryable_error(int status_code)
std::atomic< std::size_t > dropped_spans_
otlp_grpc_exporter(const otlp_grpc_config &config, std::unique_ptr< grpc_transport > transport)
Construct with configuration and custom transport.
common::VoidResult start()
Start the exporter.
bool is_running() const
Check if exporter is running.
std::unique_ptr< grpc_transport > transport_
otlp_grpc_exporter(const otlp_grpc_config &config)
Construct with configuration.
std::atomic< std::size_t > exported_spans_
common::Result< grpc_response > send_with_retry(const grpc_request &request)
const otlp_grpc_config & config() const
Get configuration.
std::unordered_map< std::string, std::size_t > get_stats() const override
Get exporter statistics.
std::atomic< std::size_t > failed_exports_
static std::pair< std::string, uint16_t > parse_endpoint(const std::string &endpoint)
common::VoidResult export_spans(const std::vector< trace_span > &spans) override
Export a batch of spans.
Converts internal spans to OTLP wire format.
static std::vector< uint8_t > build_span(const trace_span &span)
static std::vector< uint8_t > convert_to_otlp(const std::vector< trace_span > &spans, const std::string &service_name, const std::string &service_version, const std::unordered_map< std::string, std::string > &resource_attributes)
Convert spans to OTLP protobuf format.
static std::vector< uint8_t > build_scope(const std::string &name, const std::string &version)
static std::vector< uint8_t > build_resource_spans(const std::vector< trace_span > &spans, const std::string &service_name, const std::string &service_version, const std::unordered_map< std::string, std::string > &resource_attributes)
static void write_fixed64(std::vector< uint8_t > &data, uint64_t value)
static std::vector< uint8_t > build_any_value_string(const std::string &value)
static std::vector< uint8_t > build_key_value(const std::string &key, const std::string &value)
static std::vector< uint8_t > build_scope_spans(const std::vector< trace_span > &spans)
static std::vector< uint8_t > hex_to_bytes(const std::string &hex, std::size_t expected_size)
static std::vector< uint8_t > build_resource(const std::string &service_name, const std::string &service_version, const std::unordered_map< std::string, std::string > &extra_attributes)
static void write_varint(std::vector< uint8_t > &data, uint64_t value)
Abstract interface for trace exporters.
Distributed tracing implementation for monitoring system.
Monitoring system specific error codes.
gRPC transport layer for OTLP exporters
std::unique_ptr< grpc_transport > create_default_grpc_transport()
Create default gRPC transport.
std::unique_ptr< otlp_grpc_exporter > create_otlp_grpc_exporter(const std::string &endpoint="localhost:4317")
Create OTLP gRPC exporter with default configuration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
gRPC request configuration
std::chrono::milliseconds timeout
std::unordered_map< std::string, std::string > metadata
std::vector< uint8_t > body
std::chrono::microseconds total_export_time
Total export time (microseconds for precision)
Configuration for OTLP gRPC exporter.
std::string certificate_path
TLS certificate path.
std::string service_version
Service version.
std::unordered_map< std::string, std::string > resource_attributes
Resource attributes.
std::string endpoint
OTLP receiver endpoint.
std::size_t max_retry_attempts
Maximum number of send attempts per export, including the first attempt.
std::chrono::milliseconds batch_timeout
Batch export timeout.
std::size_t max_queue_size
Maximum queued spans.
common::VoidResult validate() const
Validate configuration.
std::size_t max_batch_size
Maximum spans per batch.
std::chrono::milliseconds initial_backoff
Initial retry backoff.
std::chrono::milliseconds max_backoff
Maximum retry backoff.
std::chrono::milliseconds timeout
Request timeout.
std::unordered_map< std::string, std::string > headers
Custom headers.
Trace span representing a unit of work in distributed tracing.
std::unordered_map< std::string, std::string > tags
std::chrono::system_clock::time_point end_time
std::chrono::system_clock::time_point start_time
Trace data exporters for various distributed tracing systems.