7#include <kcenon/common/patterns/result.h>
18#include <opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h>
19#include <opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h>
20#include <opentelemetry/logs/provider.h>
21#include <opentelemetry/sdk/logs/logger_provider.h>
22#include <opentelemetry/sdk/logs/simple_log_record_processor.h>
23#include <opentelemetry/sdk/resource/resource.h>
24#include <opentelemetry/sdk/resource/semantic_conventions.h>
33class otlp_writer::otel_impl {
35 std::shared_ptr<opentelemetry::logs::LoggerProvider> provider;
36 std::shared_ptr<opentelemetry::logs::Logger>
logger;
38 otel_impl(
const config& cfg) {
39 namespace logs_sdk = opentelemetry::sdk::logs;
40 namespace otlp_exporter = opentelemetry::exporter::otlp;
41 namespace resource = opentelemetry::sdk::resource;
44 resource::ResourceAttributes resource_attrs;
45 resource_attrs.SetAttribute(
46 resource::SemanticConventions::kServiceName,
47 cfg.service_name.empty() ?
"unknown_service" : cfg.service_name);
49 if (!cfg.service_version.empty()) {
50 resource_attrs.SetAttribute(
51 resource::SemanticConventions::kServiceVersion,
55 if (!cfg.service_namespace.empty()) {
56 resource_attrs.SetAttribute(
57 resource::SemanticConventions::kServiceNamespace,
58 cfg.service_namespace);
61 if (!cfg.service_instance_id.empty()) {
62 resource_attrs.SetAttribute(
63 resource::SemanticConventions::kServiceInstanceId,
64 cfg.service_instance_id);
68 for (
const auto& [key, value] : cfg.resource_attributes) {
69 resource_attrs.SetAttribute(key, value);
72 auto resource_ptr = resource::Resource::Create(resource_attrs);
75 std::unique_ptr<opentelemetry::sdk::logs::LogRecordExporter> exporter;
77 if (cfg.protocol == protocol_type::http) {
78 otlp_exporter::OtlpHttpLogRecordExporterOptions http_opts;
79 http_opts.url = cfg.endpoint;
80 http_opts.timeout = cfg.timeout;
81 http_opts.ssl_insecure_skip_verify = !cfg.use_tls;
83 for (
const auto& [key, value] : cfg.headers) {
84 http_opts.http_headers.insert({key, value});
87 exporter = std::make_unique<otlp_exporter::OtlpHttpLogRecordExporter>(http_opts);
89 otlp_exporter::OtlpGrpcLogRecordExporterOptions grpc_opts;
90 grpc_opts.endpoint = cfg.endpoint;
91 grpc_opts.timeout = cfg.timeout;
92 grpc_opts.use_ssl_credentials = cfg.use_tls;
94 for (
const auto& [key, value] : cfg.headers) {
95 grpc_opts.metadata.insert({key, value});
98 exporter = std::make_unique<otlp_exporter::OtlpGrpcLogRecordExporter>(grpc_opts);
102 auto processor = std::make_unique<logs_sdk::SimpleLogRecordProcessor>(
103 std::move(exporter));
105 provider = std::make_shared<logs_sdk::LoggerProvider>(
106 std::move(processor), resource_ptr);
108 logger = provider->GetLogger(cfg.service_name,
"",
"");
116#ifdef LOGGER_HAS_OTLP
117 otel_impl_ = std::make_unique<otel_impl>(cfg);
143 std::vector<log_entry> remaining;
147 remaining.push_back(std::move(
queue_.front()));
152 if (!remaining.empty()) {
173 entry.
location->function.to_string(),
182 auto& queued_entry =
queue_.back();
200 return healthy_.load(std::memory_order_acquire);
216 std::vector<log_entry> batch;
221 batch.push_back(std::move(
queue_.front()));
226 if (!batch.empty()) {
232 while (
running_.load(std::memory_order_acquire)) {
233 std::vector<log_entry> batch;
240 return !running_.load(std::memory_order_acquire) ||
241 queue_.size() >= config_.max_batch_size;
244 if (!
running_.load(std::memory_order_acquire) &&
queue_.empty()) {
250 batch.push_back(std::move(
queue_.front()));
256 if (!batch.empty()) {
264 std::size_t retries = 0;
270 std::this_thread::sleep_for(delay);
274#ifdef LOGGER_HAS_OTLP
275 success = export_with_otel_sdk(batch);
287 healthy_.store(
true, std::memory_order_release);
292 healthy_.store(
false, std::memory_order_release);
302 case common::interfaces::log_level::trace:
304 case common::interfaces::log_level::debug:
306 case common::interfaces::log_level::info:
308 case common::interfaces::log_level::warn:
310 case common::interfaces::log_level::error:
312 case common::interfaces::log_level::fatal:
319#ifdef LOGGER_HAS_OTLP
320bool otlp_writer::export_with_otel_sdk(
const std::vector<log_entry>& batch) {
321 if (!otel_impl_ || !otel_impl_->logger) {
326 for (
const auto& entry : batch) {
328 auto level =
static_cast<common::interfaces::log_level
>(
static_cast<int>(entry.level));
331 auto severity =
static_cast<opentelemetry::logs::Severity
>(
335 std::map<std::string, opentelemetry::common::AttributeValue> attrs;
337 if (entry.location) {
338 std::string file = entry.location->file.to_string();
339 std::string function = entry.location->function.to_string();
342 attrs[
"code.filepath"] = file;
344 if (entry.location->line > 0) {
345 attrs[
"code.lineno"] =
static_cast<int64_t
>(entry.location->line);
347 if (!function.empty()) {
348 attrs[
"code.function"] = function;
353 if (entry.otel_ctx) {
354 attrs[
"trace_id"] = entry.otel_ctx->trace_id;
355 attrs[
"span_id"] = entry.otel_ctx->span_id;
356 if (!entry.otel_ctx->trace_flags.empty()) {
357 attrs[
"trace_flags"] = entry.otel_ctx->trace_flags;
362 otel_impl_->logger->EmitLogRecord(
364 entry.message.to_string(),
371 }
catch (
const std::exception&) {
382 std::ostringstream
json;
383 json << R
"({"resourceLogs":[{"resource":{"attributes":[)";
386 json << R
"({"key":"service.name","value":{"stringValue":")"
389 json << R"("scopeLogs":[{"logRecords":[)";
392 for (
const auto& entry : batch) {
393 if (!first)
json <<
",";
397 auto level =
static_cast<common::interfaces::log_level
>(
static_cast<int>(entry.level));
400 auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
401 entry.timestamp.time_since_epoch()).count();
403 json << R
"({"timeUnixNano":")" << ns << R"(",)";
405 json << R
"("body":{"stringValue":")" << escape_json(entry.message.to_string()) << R"("},)";
408 json << R
"("attributes":[)";
409 bool first_attr =
true;
411 if (entry.location) {
412 std::string file = entry.location->file.to_string();
415 if (!first_attr)
json <<
",";
417 json << R
"({"key":"code.filepath","value":{"stringValue":")"
421 if (entry.location->line > 0) {
422 if (!first_attr)
json <<
",";
424 json << R
"({"key":"code.lineno","value":{"intValue":")"
425 << entry.location->line << R"("}})";
429 if (entry.otel_ctx) {
430 if (!first_attr)
json <<
",";
432 json << R
"({"key":"trace_id","value":{"stringValue":")"
433 << entry.otel_ctx->trace_id << R"("}},)";
434 json << R"({"key":"span_id","value":{"stringValue":")"
435 << entry.otel_ctx->span_id << R"("}})";
446 static bool warned =
false;
449 std::cerr <<
"[logger_system] WARNING: OTLP export configured but no HTTP transport "
450 <<
"available. Build with LOGGER_ENABLE_OTLP=ON and OpenTelemetry SDK, "
451 <<
"or provide an HTTP client library. Log data is NOT being exported.\n";
458 std::ostringstream
result;
461 case '"':
result <<
"\\\"";
break;
462 case '\\':
result <<
"\\\\";
break;
463 case '\b':
result <<
"\\b";
break;
464 case '\f':
result <<
"\\f";
break;
465 case '\n':
result <<
"\\n";
break;
466 case '\r':
result <<
"\\r";
break;
467 case '\t':
result <<
"\\t";
break;
469 if (
static_cast<unsigned char>(c) < 0x20) {
470 result <<
"\\u" << std::hex << std::setfill(
'0')
471 << std::setw(4) <<
static_cast<int>(c);
static std::optional< otel_context > get()
Get the OpenTelemetry context for the current thread.
export_stats get_stats() const
Get export statistics.
void export_thread_func()
~otlp_writer() override
Destructor - flushes remaining logs.
bool export_with_http(const std::vector< log_entry > &batch)
std::unique_ptr< std::thread > export_thread_
static int to_otlp_severity(common::interfaces::log_level level)
std::atomic< bool > running_
std::atomic< bool > healthy_
void force_export()
Force immediate export of current batch.
static std::string escape_json(const std::string &str)
std::condition_variable queue_cv_
common::VoidResult flush() override
Flush pending logs.
std::queue< log_entry > queue_
bool is_healthy() const override
Check if writer is healthy.
bool export_batch(const std::vector< log_entry > &batch)
common::VoidResult write(const log_entry &entry) override
Write a log entry with OTEL context.
otlp_writer(const config &cfg)
Constructor with configuration.
std::string to_string() const
Convert to std::string.
@ json
JSON structured format.
OpenTelemetry context structure for trace correlation kcenon.
OpenTelemetry Protocol (OTLP) log writer for observability kcenon.
Represents a single log entry with all associated metadata.
std::optional< source_location > location
Optional source code location information.
std::optional< otlp::otel_context > otel_ctx
Optional OpenTelemetry context for trace correlation.
log_level level
Severity level of the log message.
small_string_256 message
The actual log message.
std::chrono::system_clock::time_point timestamp
Timestamp when the log entry was created.
Configuration for OTLP writer.
std::string service_name
Service name (required for resource attributes)
std::size_t max_retries
Number of retry attempts on failure.
std::chrono::milliseconds flush_interval
Maximum time to wait before flushing batch.
std::size_t max_queue_size
Maximum queue size (logs dropped if exceeded)
std::size_t max_batch_size
Maximum batch size before forced flush.
std::chrono::milliseconds retry_delay
Initial retry delay (doubled on each retry)
Statistics snapshot for OTLP export (copyable)
std::atomic< uint64_t > export_failures
std::chrono::system_clock::time_point last_error
std::atomic< uint64_t > logs_dropped
std::chrono::system_clock::time_point last_export
std::atomic< uint64_t > logs_exported
std::atomic< uint64_t > export_success
std::atomic< uint64_t > retries