Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
otlp_writer.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7#include <kcenon/common/patterns/result.h>
8
9#include <algorithm>
10#include <chrono>
11#include <ctime>
12#include <iomanip>
13#include <iostream>
14#include <sstream>
15#include <thread>
16
17#ifdef LOGGER_HAS_OTLP
18#include <opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h>
19#include <opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h>
20#include <opentelemetry/logs/provider.h>
21#include <opentelemetry/sdk/logs/logger_provider.h>
22#include <opentelemetry/sdk/logs/simple_log_record_processor.h>
23#include <opentelemetry/sdk/resource/resource.h>
24#include <opentelemetry/sdk/resource/semantic_conventions.h>
25#endif
26
27namespace kcenon::logger {
28
29#ifdef LOGGER_HAS_OTLP
33class otlp_writer::otel_impl {
34public:
35 std::shared_ptr<opentelemetry::logs::LoggerProvider> provider;
36 std::shared_ptr<opentelemetry::logs::Logger> logger;
37
38 otel_impl(const config& cfg) {
39 namespace logs_sdk = opentelemetry::sdk::logs;
40 namespace otlp_exporter = opentelemetry::exporter::otlp;
41 namespace resource = opentelemetry::sdk::resource;
42
43 // Build resource attributes
44 resource::ResourceAttributes resource_attrs;
45 resource_attrs.SetAttribute(
46 resource::SemanticConventions::kServiceName,
47 cfg.service_name.empty() ? "unknown_service" : cfg.service_name);
48
49 if (!cfg.service_version.empty()) {
50 resource_attrs.SetAttribute(
51 resource::SemanticConventions::kServiceVersion,
52 cfg.service_version);
53 }
54
55 if (!cfg.service_namespace.empty()) {
56 resource_attrs.SetAttribute(
57 resource::SemanticConventions::kServiceNamespace,
58 cfg.service_namespace);
59 }
60
61 if (!cfg.service_instance_id.empty()) {
62 resource_attrs.SetAttribute(
63 resource::SemanticConventions::kServiceInstanceId,
64 cfg.service_instance_id);
65 }
66
67 // Add custom attributes
68 for (const auto& [key, value] : cfg.resource_attributes) {
69 resource_attrs.SetAttribute(key, value);
70 }
71
72 auto resource_ptr = resource::Resource::Create(resource_attrs);
73
74 // Create exporter based on protocol
75 std::unique_ptr<opentelemetry::sdk::logs::LogRecordExporter> exporter;
76
77 if (cfg.protocol == protocol_type::http) {
78 otlp_exporter::OtlpHttpLogRecordExporterOptions http_opts;
79 http_opts.url = cfg.endpoint;
80 http_opts.timeout = cfg.timeout;
81 http_opts.ssl_insecure_skip_verify = !cfg.use_tls;
82
83 for (const auto& [key, value] : cfg.headers) {
84 http_opts.http_headers.insert({key, value});
85 }
86
87 exporter = std::make_unique<otlp_exporter::OtlpHttpLogRecordExporter>(http_opts);
88 } else {
89 otlp_exporter::OtlpGrpcLogRecordExporterOptions grpc_opts;
90 grpc_opts.endpoint = cfg.endpoint;
91 grpc_opts.timeout = cfg.timeout;
92 grpc_opts.use_ssl_credentials = cfg.use_tls;
93
94 for (const auto& [key, value] : cfg.headers) {
95 grpc_opts.metadata.insert({key, value});
96 }
97
98 exporter = std::make_unique<otlp_exporter::OtlpGrpcLogRecordExporter>(grpc_opts);
99 }
100
101 // Create processor and provider
102 auto processor = std::make_unique<logs_sdk::SimpleLogRecordProcessor>(
103 std::move(exporter));
104
105 provider = std::make_shared<logs_sdk::LoggerProvider>(
106 std::move(processor), resource_ptr);
107
108 logger = provider->GetLogger(cfg.service_name, "", "");
109 }
110};
111#endif
112
114 : config_(cfg) {
115
116#ifdef LOGGER_HAS_OTLP
117 otel_impl_ = std::make_unique<otel_impl>(cfg);
118#endif
119
120 // Start background export thread
121 running_.store(true);
122 export_thread_ = std::make_unique<std::thread>([this]() {
124 });
125}
126
128 // Stop background thread
129 running_.store(false);
130
131 // Wake up export thread
132 {
133 std::lock_guard<std::mutex> lock(queue_mutex_);
134 queue_cv_.notify_one();
135 }
136
137 // Wait for thread to finish
138 if (export_thread_ && export_thread_->joinable()) {
139 export_thread_->join();
140 }
141
142 // Export remaining logs
143 std::vector<log_entry> remaining;
144 {
145 std::lock_guard<std::mutex> lock(queue_mutex_);
146 while (!queue_.empty()) {
147 remaining.push_back(std::move(queue_.front()));
148 queue_.pop();
149 }
150 }
151
152 if (!remaining.empty()) {
153 export_batch(remaining);
154 }
155}
156
158 {
159 std::lock_guard<std::mutex> lock(queue_mutex_);
160
161 // Check queue size limit
162 if (queue_.size() >= config_.max_queue_size) {
163 stats_.logs_dropped.fetch_add(1, std::memory_order_relaxed);
164 return common::ok(); // Drop silently to avoid backpressure
165 }
166
167 // Create a copy of the entry since log_entry is move-only
168 if (entry.location) {
169 queue_.emplace(entry.level,
170 entry.message.to_string(),
171 entry.location->file.to_string(),
172 entry.location->line,
173 entry.location->function.to_string(),
174 entry.timestamp);
175 } else {
176 queue_.emplace(entry.level,
177 entry.message.to_string(),
178 entry.timestamp);
179 }
180
181 // Copy OTEL context to the queued entry
182 auto& queued_entry = queue_.back();
183 queued_entry.otel_ctx = entry.otel_ctx.has_value() ? entry.otel_ctx : otlp::otel_context_storage::get();
184
185 // Wake up export thread if batch size reached
186 if (queue_.size() >= config_.max_batch_size) {
187 queue_cv_.notify_one();
188 }
189 }
190
191 return common::ok();
192}
193
198
200 return healthy_.load(std::memory_order_acquire);
201}
202
204 return export_stats{
205 stats_.logs_exported.load(std::memory_order_relaxed),
206 stats_.logs_dropped.load(std::memory_order_relaxed),
207 stats_.export_success.load(std::memory_order_relaxed),
208 stats_.export_failures.load(std::memory_order_relaxed),
209 stats_.retries.load(std::memory_order_relaxed),
212 };
213}
214
216 std::vector<log_entry> batch;
217
218 {
219 std::lock_guard<std::mutex> lock(queue_mutex_);
220 while (!queue_.empty()) {
221 batch.push_back(std::move(queue_.front()));
222 queue_.pop();
223 }
224 }
225
226 if (!batch.empty()) {
227 export_batch(batch);
228 }
229}
230
232 while (running_.load(std::memory_order_acquire)) {
233 std::vector<log_entry> batch;
234
235 {
236 std::unique_lock<std::mutex> lock(queue_mutex_);
237
238 // Wait for batch size or timeout
239 queue_cv_.wait_for(lock, config_.flush_interval, [this] {
240 return !running_.load(std::memory_order_acquire) ||
241 queue_.size() >= config_.max_batch_size;
242 });
243
244 if (!running_.load(std::memory_order_acquire) && queue_.empty()) {
245 break;
246 }
247
248 // Collect batch
249 while (!queue_.empty() && batch.size() < config_.max_batch_size) {
250 batch.push_back(std::move(queue_.front()));
251 queue_.pop();
252 }
253 }
254
255 // Export batch
256 if (!batch.empty()) {
257 export_batch(batch);
258 }
259 }
260}
261
262bool otlp_writer::export_batch(const std::vector<log_entry>& batch) {
263 bool success = false;
264 std::size_t retries = 0;
265 auto delay = config_.retry_delay;
266
267 while (!success && retries <= config_.max_retries) {
268 if (retries > 0) {
269 stats_.retries.fetch_add(1, std::memory_order_relaxed);
270 std::this_thread::sleep_for(delay);
271 delay *= 2; // Exponential backoff
272 }
273
274#ifdef LOGGER_HAS_OTLP
275 success = export_with_otel_sdk(batch);
276#else
277 success = export_with_http(batch);
278#endif
279
280 ++retries;
281 }
282
283 if (success) {
284 stats_.logs_exported.fetch_add(batch.size(), std::memory_order_relaxed);
285 stats_.export_success.fetch_add(1, std::memory_order_relaxed);
286 stats_.last_export = std::chrono::system_clock::now();
287 healthy_.store(true, std::memory_order_release);
288 } else {
289 stats_.logs_dropped.fetch_add(batch.size(), std::memory_order_relaxed);
290 stats_.export_failures.fetch_add(1, std::memory_order_relaxed);
291 stats_.last_error = std::chrono::system_clock::now();
292 healthy_.store(false, std::memory_order_release);
293 }
294
295 return success;
296}
297
298int otlp_writer::to_otlp_severity(common::interfaces::log_level level) {
299 // OTLP Severity Numbers (per OpenTelemetry specification)
300 // https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
301 switch (level) {
302 case common::interfaces::log_level::trace:
303 return 1; // TRACE
304 case common::interfaces::log_level::debug:
305 return 5; // DEBUG
306 case common::interfaces::log_level::info:
307 return 9; // INFO
308 case common::interfaces::log_level::warn:
309 return 13; // WARN
310 case common::interfaces::log_level::error:
311 return 17; // ERROR
312 case common::interfaces::log_level::fatal:
313 return 21; // FATAL
314 default:
315 return 9; // INFO as default
316 }
317}
318
319#ifdef LOGGER_HAS_OTLP
320bool otlp_writer::export_with_otel_sdk(const std::vector<log_entry>& batch) {
321 if (!otel_impl_ || !otel_impl_->logger) {
322 return false;
323 }
324
325 try {
326 for (const auto& entry : batch) {
327 // Convert log_level from logger_system to common::interfaces
328 auto level = static_cast<common::interfaces::log_level>(static_cast<int>(entry.level));
329
330 // Create log record through OpenTelemetry SDK
331 auto severity = static_cast<opentelemetry::logs::Severity>(
332 to_otlp_severity(level));
333
334 // Build attributes
335 std::map<std::string, opentelemetry::common::AttributeValue> attrs;
336
337 if (entry.location) {
338 std::string file = entry.location->file.to_string();
339 std::string function = entry.location->function.to_string();
340
341 if (!file.empty()) {
342 attrs["code.filepath"] = file;
343 }
344 if (entry.location->line > 0) {
345 attrs["code.lineno"] = static_cast<int64_t>(entry.location->line);
346 }
347 if (!function.empty()) {
348 attrs["code.function"] = function;
349 }
350 }
351
352 // Add OTEL context if present
353 if (entry.otel_ctx) {
354 attrs["trace_id"] = entry.otel_ctx->trace_id;
355 attrs["span_id"] = entry.otel_ctx->span_id;
356 if (!entry.otel_ctx->trace_flags.empty()) {
357 attrs["trace_flags"] = entry.otel_ctx->trace_flags;
358 }
359 }
360
361 // Emit log record
362 otel_impl_->logger->EmitLogRecord(
363 severity,
364 entry.message.to_string(),
365 attrs,
366 entry.timestamp
367 );
368 }
369
370 return true;
371 } catch (const std::exception&) {
372 return false;
373 }
374}
375#else
376bool otlp_writer::export_with_http(const std::vector<log_entry>& batch) {
377 // Fallback implementation without OpenTelemetry SDK
378 // This is a minimal HTTP export implementation for demonstration
379 // In production, you would use a proper HTTP client library
380
381 // Build JSON payload following OTLP/HTTP JSON format
382 std::ostringstream json;
383 json << R"({"resourceLogs":[{"resource":{"attributes":[)";
384
385 // Add service name attribute
386 json << R"({"key":"service.name","value":{"stringValue":")"
387 << config_.service_name << R"("}}]},)";
388
389 json << R"("scopeLogs":[{"logRecords":[)";
390
391 bool first = true;
392 for (const auto& entry : batch) {
393 if (!first) json << ",";
394 first = false;
395
396 // Convert log_level from logger_system to common::interfaces
397 auto level = static_cast<common::interfaces::log_level>(static_cast<int>(entry.level));
398
399 // Convert timestamp to nanoseconds
400 auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
401 entry.timestamp.time_since_epoch()).count();
402
403 json << R"({"timeUnixNano":")" << ns << R"(",)";
404 json << R"("severityNumber":)" << to_otlp_severity(level) << ",";
405 json << R"("body":{"stringValue":")" << escape_json(entry.message.to_string()) << R"("},)";
406
407 // Add attributes
408 json << R"("attributes":[)";
409 bool first_attr = true;
410
411 if (entry.location) {
412 std::string file = entry.location->file.to_string();
413
414 if (!file.empty()) {
415 if (!first_attr) json << ",";
416 first_attr = false;
417 json << R"({"key":"code.filepath","value":{"stringValue":")"
418 << escape_json(file) << R"("}})";
419 }
420
421 if (entry.location->line > 0) {
422 if (!first_attr) json << ",";
423 first_attr = false;
424 json << R"({"key":"code.lineno","value":{"intValue":")"
425 << entry.location->line << R"("}})";
426 }
427 }
428
429 if (entry.otel_ctx) {
430 if (!first_attr) json << ",";
431 first_attr = false;
432 json << R"({"key":"trace_id","value":{"stringValue":")"
433 << entry.otel_ctx->trace_id << R"("}},)";
434 json << R"({"key":"span_id","value":{"stringValue":")"
435 << entry.otel_ctx->span_id << R"("}})";
436 }
437
438 json << "]}";
439 }
440
441 json << "]}]}]}";
442
443 // OTLP HTTP transport is not available (LOGGER_HAS_OTLP not defined).
444 // The JSON payload was built but cannot be sent without an HTTP client.
445 // Return false so callers know export did not succeed.
446 static bool warned = false;
447 if (!warned)
448 {
449 std::cerr << "[logger_system] WARNING: OTLP export configured but no HTTP transport "
450 << "available. Build with LOGGER_ENABLE_OTLP=ON and OpenTelemetry SDK, "
451 << "or provide an HTTP client library. Log data is NOT being exported.\n";
452 warned = true;
453 }
454 return false;
455}
456
457std::string otlp_writer::escape_json(const std::string& str) {
458 std::ostringstream result;
459 for (char c : str) {
460 switch (c) {
461 case '"': result << "\\\""; break;
462 case '\\': result << "\\\\"; break;
463 case '\b': result << "\\b"; break;
464 case '\f': result << "\\f"; break;
465 case '\n': result << "\\n"; break;
466 case '\r': result << "\\r"; break;
467 case '\t': result << "\\t"; break;
468 default:
469 if (static_cast<unsigned char>(c) < 0x20) {
470 result << "\\u" << std::hex << std::setfill('0')
471 << std::setw(4) << static_cast<int>(c);
472 } else {
473 result << c;
474 }
475 }
476 }
477 return result.str();
478}
479#endif
480
481} // namespace kcenon::logger
static std::optional< otel_context > get()
Get the OpenTelemetry context for the current thread.
export_stats get_stats() const
Get export statistics.
~otlp_writer() override
Destructor - flushes remaining logs.
bool export_with_http(const std::vector< log_entry > &batch)
std::unique_ptr< std::thread > export_thread_
static int to_otlp_severity(common::interfaces::log_level level)
std::atomic< bool > running_
std::atomic< bool > healthy_
void force_export()
Force immediate export of current batch.
static std::string escape_json(const std::string &str)
std::condition_variable queue_cv_
common::VoidResult flush() override
Flush pending logs.
std::queue< log_entry > queue_
bool is_healthy() const override
Check if writer is healthy.
bool export_batch(const std::vector< log_entry > &batch)
common::VoidResult write(const log_entry &entry) override
Write a log entry with OTEL context.
otlp_writer(const config &cfg)
Constructor with configuration.
std::string to_string() const
Convert to std::string.
VoidResult ok()
@ json
JSON structured format.
OpenTelemetry context structure for trace correlation kcenon.
OpenTelemetry Protocol (OTLP) log writer for observability kcenon.
Represents a single log entry with all associated metadata.
Definition log_entry.h:155
std::optional< source_location > location
Optional source code location information.
Definition log_entry.h:183
std::optional< otlp::otel_context > otel_ctx
Optional OpenTelemetry context for trace correlation.
Definition log_entry.h:204
log_level level
Severity level of the log message.
Definition log_entry.h:162
small_string_256 message
The actual log message.
Definition log_entry.h:169
std::chrono::system_clock::time_point timestamp
Timestamp when the log entry was created.
Definition log_entry.h:175
Configuration for OTLP writer.
Definition otlp_writer.h:89
std::string service_name
Service name (required for resource attributes)
std::size_t max_retries
Number of retry attempts on failure.
std::chrono::milliseconds flush_interval
Maximum time to wait before flushing batch.
std::size_t max_queue_size
Maximum queue size (logs dropped if exceeded)
std::size_t max_batch_size
Maximum batch size before forced flush.
std::chrono::milliseconds retry_delay
Initial retry delay (doubled on each retry)
Statistics snapshot for OTLP export (copyable)
std::chrono::system_clock::time_point last_error
std::chrono::system_clock::time_point last_export