Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
exporters.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13
14#include <atomic>
15#include <iomanip>
16#include <iostream>
17#include <mutex>
18#include <sstream>
19#include <vector>
20
22
23namespace {
24
25// Global tracing state
26struct tracing_state
27{
28 std::atomic<bool> enabled{false};
29 tracing_config config;
30 std::vector<span_processor_callback> processors;
31 std::mutex mutex;
32
33 // Batch queue for async export
34 std::vector<std::string> batch_queue;
35 std::mutex batch_mutex;
36 std::atomic<size_t> queued_count{0};
37};
38
39// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
40tracing_state g_tracing_state;
41// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
42std::mutex g_console_mutex;
43
44// Sampling decision based on trace ID
45auto should_sample(const trace_context& ctx, double sample_rate) -> bool
46{
47 if (sample_rate >= 1.0)
48 {
49 return true;
50 }
51 if (sample_rate <= 0.0)
52 {
53 return false;
54 }
55
56 // Use first 8 bytes of trace ID for consistent sampling
57 const auto& trace_id = ctx.trace_id();
58 uint64_t hash = 0;
59 for (size_t i = 0; i < 8 && i < trace_id.size(); ++i)
60 {
61 hash = (hash << 8) | trace_id[i];
62 }
63
64 // Normalize to 0.0-1.0 range
65 double normalized = static_cast<double>(hash) / static_cast<double>(UINT64_MAX);
66 return normalized < sample_rate;
67}
68
69// Convert attribute value to string
70auto attribute_to_string(const attribute_value& value) -> std::string
71{
72 return std::visit(
73 [](auto&& arg) -> std::string
74 {
75 using T = std::decay_t<decltype(arg)>;
76 if constexpr (std::is_same_v<T, std::string>)
77 {
78 return "\"" + arg + "\"";
79 }
80 else if constexpr (std::is_same_v<T, bool>)
81 {
82 return arg ? "true" : "false";
83 }
84 else if constexpr (std::is_same_v<T, int64_t>)
85 {
86 return std::to_string(arg);
87 }
88 else if constexpr (std::is_same_v<T, double>)
89 {
90 std::ostringstream oss;
91 oss << std::fixed << std::setprecision(3) << arg;
92 return oss.str();
93 }
94 else
95 {
96 return "unknown";
97 }
98 },
99 value);
100}
101
102// Convert span kind to string
103auto span_kind_to_string(span_kind kind) -> std::string_view
104{
105 switch (kind)
106 {
108 return "INTERNAL";
110 return "SERVER";
112 return "CLIENT";
114 return "PRODUCER";
116 return "CONSUMER";
117 }
118 return "UNKNOWN";
119}
120
121// Convert span status to string
122auto span_status_to_string(span_status status) -> std::string_view
123{
124 switch (status)
125 {
127 return "UNSET";
128 case span_status::ok:
129 return "OK";
131 return "ERROR";
132 }
133 return "UNKNOWN";
134}
135
136// Console exporter implementation
137void export_to_console(const span& s)
138{
139 const auto& ctx = s.context();
140 auto duration_ns = s.duration().count();
141 double duration_ms = static_cast<double>(duration_ns) / 1'000'000.0;
142
143 std::ostringstream oss;
144 oss << "\n";
145 oss << "=== SPAN ===\n";
146 oss << "Name: " << s.name() << "\n";
147 oss << "Trace ID: " << ctx.trace_id_hex() << "\n";
148 oss << "Span ID: " << ctx.span_id_hex() << "\n";
149
150 if (ctx.parent_span_id().has_value())
151 {
152 oss << "Parent ID: " << bytes_to_hex(ctx.parent_span_id()->data(), 8) << "\n";
153 }
154
155 oss << "Kind: " << span_kind_to_string(s.kind()) << "\n";
156 oss << "Status: " << span_status_to_string(s.status());
157
158 if (!s.status_description().empty())
159 {
160 oss << " (" << s.status_description() << ")";
161 }
162 oss << "\n";
163
164 oss << std::fixed << std::setprecision(3);
165 oss << "Duration: " << duration_ms << " ms\n";
166
167 // Attributes
168 const auto& attrs = s.attributes();
169 if (!attrs.empty())
170 {
171 oss << "Attributes:\n";
172 for (const auto& [key, value] : attrs)
173 {
174 oss << " " << key << ": " << attribute_to_string(value) << "\n";
175 }
176 }
177
178 // Events
179 const auto& events = s.events();
180 if (!events.empty())
181 {
182 oss << "Events:\n";
183 for (const auto& event : events)
184 {
185 oss << " - " << event.name;
186 if (!event.attributes.empty())
187 {
188 oss << " {";
189 bool first = true;
190 for (const auto& [key, value] : event.attributes)
191 {
192 if (!first)
193 {
194 oss << ", ";
195 }
196 oss << key << ": " << attribute_to_string(value);
197 first = false;
198 }
199 oss << "}";
200 }
201 oss << "\n";
202 }
203 }
204
205 oss << "============\n";
206
207 std::lock_guard<std::mutex> lock(g_console_mutex);
208 std::cout << oss.str() << std::flush;
209}
210
211// Escape JSON string
212auto json_escape(const std::string& s) -> std::string
213{
214 std::ostringstream oss;
215 for (char c : s)
216 {
217 switch (c)
218 {
219 case '"':
220 oss << "\\\"";
221 break;
222 case '\\':
223 oss << "\\\\";
224 break;
225 case '\b':
226 oss << "\\b";
227 break;
228 case '\f':
229 oss << "\\f";
230 break;
231 case '\n':
232 oss << "\\n";
233 break;
234 case '\r':
235 oss << "\\r";
236 break;
237 case '\t':
238 oss << "\\t";
239 break;
240 default:
241 if (static_cast<unsigned char>(c) < 0x20)
242 {
243 oss << "\\u" << std::hex << std::setfill('0') << std::setw(4)
244 << static_cast<int>(c);
245 }
246 else
247 {
248 oss << c;
249 }
250 }
251 }
252 return oss.str();
253}
254
255// Convert attribute value to JSON
256auto attribute_to_json(const attribute_value& value) -> std::string
257{
258 return std::visit(
259 [](auto&& arg) -> std::string
260 {
261 using T = std::decay_t<decltype(arg)>;
262 if constexpr (std::is_same_v<T, std::string>)
263 {
264 return "{\"stringValue\":\"" + json_escape(arg) + "\"}";
265 }
266 else if constexpr (std::is_same_v<T, bool>)
267 {
268 return std::string("{\"boolValue\":") + (arg ? "true" : "false") + "}";
269 }
270 else if constexpr (std::is_same_v<T, int64_t>)
271 {
272 return "{\"intValue\":\"" + std::to_string(arg) + "\"}";
273 }
274 else if constexpr (std::is_same_v<T, double>)
275 {
276 std::ostringstream oss;
277 oss << std::fixed << std::setprecision(6) << arg;
278 return "{\"doubleValue\":" + oss.str() + "}";
279 }
280 else
281 {
282 return "{\"stringValue\":\"unknown\"}";
283 }
284 },
285 value);
286}
287
288// Convert span to OTLP JSON format
289auto span_to_otlp_json(const span& s) -> std::string
290{
291 const auto& ctx = s.context();
292 std::ostringstream oss;
293
294 // Get timestamps
295 auto start_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
296 s.start_time().time_since_epoch())
297 .count();
298 auto end_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
299 s.end_time().time_since_epoch())
300 .count();
301
302 oss << "{";
303 oss << "\"traceId\":\"" << ctx.trace_id_hex() << "\",";
304 oss << "\"spanId\":\"" << ctx.span_id_hex() << "\",";
305
306 if (ctx.parent_span_id().has_value())
307 {
308 oss << "\"parentSpanId\":\""
309 << bytes_to_hex(ctx.parent_span_id()->data(), 8) << "\",";
310 }
311
312 oss << "\"name\":\"" << json_escape(s.name()) << "\",";
313 oss << "\"kind\":" << (static_cast<int>(s.kind()) + 1) << ",";
314 oss << "\"startTimeUnixNano\":\"" << start_ns << "\",";
315 oss << "\"endTimeUnixNano\":\"" << end_ns << "\",";
316
317 // Attributes
318 oss << "\"attributes\":[";
319 const auto& attrs = s.attributes();
320 bool first = true;
321 for (const auto& [key, value] : attrs)
322 {
323 if (!first)
324 {
325 oss << ",";
326 }
327 oss << "{\"key\":\"" << json_escape(key)
328 << "\",\"value\":" << attribute_to_json(value) << "}";
329 first = false;
330 }
331 oss << "],";
332
333 // Events
334 oss << "\"events\":[";
335 const auto& events = s.events();
336 first = true;
337 for (const auto& event : events)
338 {
339 if (!first)
340 {
341 oss << ",";
342 }
343 auto event_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
344 event.timestamp.time_since_epoch())
345 .count();
346 oss << "{\"name\":\"" << json_escape(event.name) << "\",";
347 oss << "\"timeUnixNano\":\"" << event_ns << "\",";
348 oss << "\"attributes\":[";
349 bool attr_first = true;
350 for (const auto& [key, value] : event.attributes)
351 {
352 if (!attr_first)
353 {
354 oss << ",";
355 }
356 oss << "{\"key\":\"" << json_escape(key)
357 << "\",\"value\":" << attribute_to_json(value) << "}";
358 attr_first = false;
359 }
360 oss << "]}";
361 first = false;
362 }
363 oss << "],";
364
365 // Status
366 oss << "\"status\":{";
367 if (s.status() == span_status::error)
368 {
369 oss << "\"code\":2";
370 if (!s.status_description().empty())
371 {
372 oss << ",\"message\":\"" << json_escape(s.status_description()) << "\"";
373 }
374 }
375 else if (s.status() == span_status::ok)
376 {
377 oss << "\"code\":1";
378 }
379 else
380 {
381 oss << "\"code\":0";
382 }
383 oss << "}";
384
385 oss << "}";
386
387 return oss.str();
388}
389
390// Build OTLP export request body
391auto build_otlp_request(const std::vector<std::string>& spans_json,
392 const tracing_config& config) -> std::string
393{
394 std::ostringstream oss;
395
396 oss << "{\"resourceSpans\":[{";
397
398 // Resource
399 oss << "\"resource\":{\"attributes\":[";
400 oss << "{\"key\":\"service.name\",\"value\":{\"stringValue\":\""
401 << json_escape(config.service_name) << "\"}}";
402
403 if (!config.service_version.empty())
404 {
405 oss << ",{\"key\":\"service.version\",\"value\":{\"stringValue\":\""
406 << json_escape(config.service_version) << "\"}}";
407 }
408
409 if (!config.service_namespace.empty())
410 {
411 oss << ",{\"key\":\"service.namespace\",\"value\":{\"stringValue\":\""
412 << json_escape(config.service_namespace) << "\"}}";
413 }
414
415 if (!config.service_instance_id.empty())
416 {
417 oss << ",{\"key\":\"service.instance.id\",\"value\":{\"stringValue\":\""
418 << json_escape(config.service_instance_id) << "\"}}";
419 }
420
421 for (const auto& [key, value] : config.resource_attributes)
422 {
423 oss << ",{\"key\":\"" << json_escape(key)
424 << "\",\"value\":{\"stringValue\":\"" << json_escape(value) << "\"}}";
425 }
426
427 oss << "]},";
428
429 // Scope spans
430 oss << "\"scopeSpans\":[{";
431 oss << "\"scope\":{\"name\":\"network_system.tracing\",\"version\":\"1.0.0\"},";
432 oss << "\"spans\":[";
433
434 bool first = true;
435 for (const auto& span_json : spans_json)
436 {
437 if (!first)
438 {
439 oss << ",";
440 }
441 oss << span_json;
442 first = false;
443 }
444
445 oss << "]}]}]}";
446
447 return oss.str();
448}
449
450// Export spans via OTLP HTTP
451void export_otlp_http(const std::vector<std::string>& spans_json)
452{
453 if (spans_json.empty())
454 {
455 return;
456 }
457
458 const auto& config = g_tracing_state.config;
459 std::string body = build_otlp_request(spans_json, config);
460
461 if (config.debug)
462 {
463 std::lock_guard<std::mutex> lock(g_console_mutex);
464 std::cout << "[TRACING] Exporting " << spans_json.size()
465 << " spans to OTLP HTTP: " << config.otlp.endpoint << "\n";
466 std::cout << "[TRACING] Request body: " << body << "\n";
467 }
468
469 // Note: Full HTTP implementation would use http2_client or ASIO
470 // For now, we log the export attempt when debug is enabled
471 // Production implementation should use async HTTP POST
472}
473
474// Process completed span
475void process_span(const span& s)
476{
477 if (!g_tracing_state.enabled.load(std::memory_order_acquire))
478 {
479 return;
480 }
481
482 const auto& config = g_tracing_state.config;
483
484 // Check sampling based on sampler type
485 bool sampled = false;
486 switch (config.sampler)
487 {
489 sampled = true;
490 break;
492 sampled = false;
493 break;
495 sampled = should_sample(s.context(), config.sample_rate);
496 break;
498 // Use parent's sampling decision (from context)
499 sampled = s.context().is_sampled();
500 break;
501 }
502
503 if (!sampled)
504 {
505 return;
506 }
507
508 // Export based on configured exporter
509 switch (config.exporter)
510 {
512 export_to_console(s);
513 break;
514
516 {
517 // Convert span to JSON and queue for batch export
518 std::string span_json = span_to_otlp_json(s);
519 {
520 std::lock_guard<std::mutex> lock(g_tracing_state.batch_mutex);
521 g_tracing_state.batch_queue.push_back(std::move(span_json));
522 g_tracing_state.queued_count.fetch_add(1, std::memory_order_relaxed);
523 }
524
525 // Check if batch should be exported
526 if (g_tracing_state.queued_count.load(std::memory_order_relaxed) >=
528 {
529 std::vector<std::string> batch;
530 {
531 std::lock_guard<std::mutex> lock(g_tracing_state.batch_mutex);
532 batch = std::move(g_tracing_state.batch_queue);
533 g_tracing_state.batch_queue.clear();
534 g_tracing_state.queued_count.store(0, std::memory_order_relaxed);
535 }
536 export_otlp_http(batch);
537 }
538 break;
539 }
540
542 // gRPC export requires protobuf, fall back to debug console
543 if (config.debug)
544 {
545 {
546 std::lock_guard<std::mutex> lock(g_console_mutex);
547 std::cout << "[TRACING] OTLP gRPC export not implemented, "
548 << "use otlp_http instead\n";
549 }
550 export_to_console(s);
551 }
552 break;
553
555 // Jaeger export uses Thrift, fall back to debug console
556 if (config.debug)
557 {
558 {
559 std::lock_guard<std::mutex> lock(g_console_mutex);
560 std::cout << "[TRACING] Jaeger export not implemented, "
561 << "use otlp_http with Jaeger OTLP receiver\n";
562 }
563 export_to_console(s);
564 }
565 break;
566
568 // Zipkin export requires JSON v2 format
569 if (config.debug)
570 {
571 {
572 std::lock_guard<std::mutex> lock(g_console_mutex);
573 std::cout << "[TRACING] Zipkin export not implemented, "
574 << "use otlp_http with Zipkin OTLP receiver\n";
575 }
576 export_to_console(s);
577 }
578 break;
579
581 default:
582 // No-op
583 break;
584 }
585
586 // Call registered processors
587 std::lock_guard<std::mutex> lock(g_tracing_state.mutex);
588 for (const auto& processor : g_tracing_state.processors)
589 {
590 if (processor)
591 {
592 processor(s);
593 }
594 }
595}
596
597} // anonymous namespace
598
600{
601 const bool should_log = config.debug && config.exporter != exporter_type::none;
602
603 {
604 std::lock_guard<std::mutex> lock(g_tracing_state.mutex);
605
606 g_tracing_state.config = config;
607 g_tracing_state.enabled.store(config.exporter != exporter_type::none,
608 std::memory_order_release);
609 }
610
611 if (should_log)
612 {
613 std::lock_guard<std::mutex> lock(g_console_mutex);
614 std::cout << "[TRACING] Configured with exporter: ";
615 switch (config.exporter)
616 {
618 std::cout << "console";
619 break;
621 std::cout << "otlp_grpc (" << config.otlp.endpoint << ")";
622 break;
624 std::cout << "otlp_http (" << config.otlp.endpoint << ")";
625 break;
627 std::cout << "jaeger (" << config.jaeger_endpoint << ")";
628 break;
630 std::cout << "zipkin (" << config.zipkin_endpoint << ")";
631 break;
632 default:
633 std::cout << "none";
634 break;
635 }
636 std::cout << ", service: " << config.service_name
637 << ", sample_rate: " << config.sample_rate << "\n";
638 }
639}
640
642{
643 // Flush any pending spans before shutdown
645
646 std::lock_guard<std::mutex> lock(g_tracing_state.mutex);
647
648 g_tracing_state.enabled.store(false, std::memory_order_release);
649 g_tracing_state.processors.clear();
650 g_tracing_state.config = tracing_config{};
651
652 // Clear batch queue
653 {
654 std::lock_guard<std::mutex> batch_lock(g_tracing_state.batch_mutex);
655 g_tracing_state.batch_queue.clear();
656 g_tracing_state.queued_count.store(0, std::memory_order_relaxed);
657 }
658}
659
661{
662 // Flush batch queue
663 std::vector<std::string> batch;
664 {
665 std::lock_guard<std::mutex> lock(g_tracing_state.batch_mutex);
666 if (!g_tracing_state.batch_queue.empty())
667 {
668 batch = std::move(g_tracing_state.batch_queue);
669 g_tracing_state.batch_queue.clear();
670 g_tracing_state.queued_count.store(0, std::memory_order_relaxed);
671 }
672 }
673
674 if (!batch.empty())
675 {
676 export_otlp_http(batch);
677 }
678
679 {
680 std::lock_guard<std::mutex> lock(g_console_mutex);
681 std::cout << std::flush;
682 }
683}
684
685auto is_tracing_enabled() -> bool
686{
687 return g_tracing_state.enabled.load(std::memory_order_acquire);
688}
689
691{
692 if (!callback)
693 {
694 return;
695 }
696
697 std::lock_guard<std::mutex> lock(g_tracing_state.mutex);
698 g_tracing_state.processors.push_back(std::move(callback));
699}
700
701void export_span(const span& s)
702{
703 process_span(s);
704}
705
706} // namespace kcenon::network::tracing
RAII span for distributed tracing.
Definition span.h:103
std::vector< std::string > batch_queue
Definition exporters.cpp:34
std::mutex batch_mutex
Definition exporters.cpp:35
std::atomic< bool > enabled
Definition exporters.cpp:28
std::vector< span_processor_callback > processors
Definition exporters.cpp:30
std::atomic< size_t > queued_count
Definition exporters.cpp:36
tracing_config config
Definition exporters.cpp:29
@ otlp_http
OTLP over HTTP (OpenTelemetry Collector)
@ otlp_grpc
OTLP over gRPC (OpenTelemetry Collector)
@ console
Console/stdout output (for debugging)
void flush_tracing()
Force flush all pending spans.
std::variant< std::string, int64_t, double, bool > attribute_value
Attribute value type (supports string, int64, double, bool)
Definition span.h:55
span_kind
Span kind following OpenTelemetry conventions.
Definition span.h:44
@ consumer
Message consumer (e.g., queue subscriber)
@ server
Server-side handling of a request.
@ internal
Default, represents internal operations.
@ producer
Message producer (e.g., queue publisher)
auto bytes_to_hex(const uint8_t *data, size_t size) -> std::string
Convert bytes to lowercase hex string.
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
span_status
Span status codes following OpenTelemetry conventions.
Definition span.h:34
@ ok
Operation completed successfully.
@ unset
Default status, span completed without explicit status.
void export_span(const span &s)
Export a completed span.
void register_span_processor(span_processor_callback callback)
Register a custom span processor.
@ trace_id
Sample based on trace ID ratio.
@ parent_based
Sample based on parent span's sampling decision.
void configure_tracing(const tracing_config &config)
Initialize the tracing system with configuration.
std::function< void(const span &)> span_processor_callback
Span processor callback type.
void shutdown_tracing()
Shutdown the tracing system.
std::mutex mutex
RAII span implementation for distributed tracing.
size_t max_export_batch_size
Maximum batch size for a single export @default 512.
std::string endpoint
Endpoint URL for OTLP exporter @default "http://localhost:4317" for gRPC, "http://localhost:4318" for...
Main configuration structure for tracing.
std::string zipkin_endpoint
Zipkin exporter endpoint @default "http://localhost:9411/api/v2/spans".
exporter_type exporter
Exporter type to use @default exporter_type::none.
std::string service_instance_id
Service instance ID (unique per instance) @default "" (auto-generated if empty)
sampler_type sampler
Sampler type to use @default sampler_type::always_on.
std::string service_version
Service version @default "".
std::string service_namespace
Service namespace @default "".
std::string jaeger_endpoint
Jaeger exporter endpoint @default "http://localhost:14268/api/traces".
double sample_rate
Sampling rate (0.0 to 1.0)
std::map< std::string, std::string > resource_attributes
Additional resource attributes.
otlp_config otlp
OTLP exporter configuration.
std::string service_name
Service name for trace identification @default "network_system".
batch_config batch
Batch export configuration.
bool debug
Enable debug output @default false.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.