Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
opentelemetry_adapter.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
18#include "../core/error_codes.h"
22#include <string>
23#include <vector>
24#include <unordered_map>
25#include <chrono>
26#include <memory>
27#include <functional>
28#include <mutex>
29
30namespace kcenon { namespace monitoring {
31
46
/**
 * @enum otel_span_kind
 * @brief OpenTelemetry span kinds.
 *
 * Every enumerator carries an explicit value so the numeric encoding does
 * not shift if entries are ever reordered.
 */
enum class otel_span_kind {
    unspecified = 0,  ///< No kind stated.
    internal    = 1,  ///< Selected by the "internal" value of the "span.kind" tag.
    server      = 2,  ///< Selected by the "server" value of the "span.kind" tag.
    client      = 3,  ///< Selected by the "client" value of the "span.kind" tag.
    producer    = 4,  ///< Selected by the "producer" value of the "span.kind" tag.
    consumer    = 5   ///< Selected by the "consumer" value of the "span.kind" tag.
};
59
/**
 * @enum otel_status_code
 * @brief OpenTelemetry status codes.
 *
 * Values are fixed explicitly; `unset` is the zero value.
 */
enum class otel_status_code {
    unset = 0,  ///< No status has been recorded.
    ok    = 1,  ///< Operation reported as successful.
    error = 2   ///< Operation reported as failed.
};
69
75 std::string key;
76 std::string value;
77
78 otel_attribute() = default;
79 otel_attribute(const std::string& k, const std::string& v) : key(k), value(v) {}
80
81 bool operator==(const otel_attribute& other) const {
82 return key == other.key && value == other.value;
83 }
84};
85
92 std::vector<otel_attribute> attributes;
93
94 void add_attribute(const std::string& key, const std::string& value) {
95 attributes.emplace_back(key, value);
96 }
97
98 common::Result<std::string> get_attribute(const std::string& key) const {
99 for (const auto& attr : attributes) {
100 if (attr.key == key) {
101 return common::Result<std::string>::ok(attr.value);
102 }
103 }
104 return common::Result<std::string>::err(error_info(monitoring_error_code::not_found,
105 "Attribute not found: " + key, "monitoring_system").to_common_error());
106 }
107};
108
114 std::string trace_id;
115 std::string span_id;
116 std::string trace_flags;
117 std::string trace_state;
118 bool is_valid{false};
119 bool is_remote{false};
120
121 otel_span_context() = default;
122 otel_span_context(const std::string& tid, const std::string& sid)
123 : trace_id(tid), span_id(sid), is_valid(true) {}
124};
125
133 std::string name;
136 std::string status_message;
137 std::chrono::system_clock::time_point start_time;
138 std::chrono::system_clock::time_point end_time;
139 std::vector<otel_attribute> attributes;
140 std::vector<std::string> events;
142
143 void add_attribute(const std::string& key, const std::string& value) {
144 attributes.emplace_back(key, value);
145 }
146
147 void add_event(const std::string& event) {
148 events.push_back(event);
149 }
150
151 bool is_ended() const {
152 return end_time != std::chrono::system_clock::time_point{};
153 }
154
155 std::chrono::microseconds duration() const {
156 if (!is_ended()) {
157 return std::chrono::microseconds{0};
158 }
159 return std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
160 }
161};
162
168 std::string name;
169 std::string description;
170 std::string unit;
171 double value{0.0};
172 std::vector<otel_attribute> attributes;
173 std::chrono::system_clock::time_point timestamp;
175
176 void add_attribute(const std::string& key, const std::string& attribute_value) {
177 attributes.emplace_back(key, attribute_value);
178 }
179};
180
186public:
188 : resource_(resource) {}
189
193 common::Result<otel_span_data> convert_span(const trace_span& span) {
194 otel_span_data otel_span;
195
196 // Convert basic span information
197 otel_span.name = span.operation_name;
198 otel_span.context = otel_span_context(span.trace_id, span.span_id);
199
200 if (!span.parent_span_id.empty()) {
202 }
203
204 otel_span.start_time = span.start_time;
205 otel_span.end_time = span.end_time;
206 otel_span.resource = resource_;
207
208 // First determine span kind from tags
209 auto kind_it = span.tags.find("span.kind");
210 if (kind_it != span.tags.end()) {
211 otel_span.kind = parse_span_kind(kind_it->second);
212 }
213
214 // Set status from tags
215 auto error_it = span.tags.find("error");
216 if (error_it != span.tags.end() && error_it->second == "true") {
218 auto error_msg_it = span.tags.find("error.message");
219 if (error_msg_it != span.tags.end()) {
220 otel_span.status_message = error_msg_it->second;
221 }
222 } else {
224 }
225
226 // Convert tags to attributes, excluding special OpenTelemetry fields
227 for (const auto& [key, value] : span.tags) {
228 // Skip special fields that are handled above
229 if (key != "span.kind" && key != "error" && key != "error.message") {
230 otel_span.add_attribute(key, value);
231 }
232 }
233
234 return common::Result<otel_span_data>::ok(std::move(otel_span));
235 }
236
240 common::Result<std::vector<otel_span_data>> convert_spans(const std::vector<trace_span>& spans) {
241 std::vector<otel_span_data> otel_spans;
242 otel_spans.reserve(spans.size());
243
244 for (const auto& span : spans) {
245 auto convert_result = convert_span(span);
246 if (convert_result.is_err()) {
247 return common::Result<std::vector<otel_span_data>>::err(error_info(monitoring_error_code::processing_failed,
248 "Failed to convert span: " + convert_result.error().message).to_common_error());
249 }
250 otel_spans.push_back(convert_result.value());
251 }
252
253 return common::Result<std::vector<otel_span_data>>::ok(std::move(otel_spans));
254 }
255
259 common::Result<otel_span_context> create_context(const trace_context& context) {
260 return common::Result<otel_span_context>::ok(otel_span_context(context.trace_id, context.span_id));
261 }
262
263private:
264 otel_span_kind parse_span_kind(const std::string& kind_str) {
265 if (kind_str == "server") return otel_span_kind::server;
266 if (kind_str == "client") return otel_span_kind::client;
267 if (kind_str == "producer") return otel_span_kind::producer;
268 if (kind_str == "consumer") return otel_span_kind::consumer;
269 if (kind_str == "internal") return otel_span_kind::internal;
271 }
272
274};
275
281public:
283 : resource_(resource) {}
284
288 common::Result<std::vector<otel_metric_data>> convert_metrics(const metrics_snapshot& snapshot) {
289 std::vector<otel_metric_data> otel_metrics;
290
291 for (const auto& metric_value : snapshot.metrics) {
296 metric.resource = resource_;
297
298 // Add tags as attributes
299 for (const auto& [tag_name, tag_value] : metric_value.tags) {
300 metric.add_attribute(tag_name, tag_value);
301 }
302
303 // Add common resource attributes
304 metric.add_attribute("service.name", resource_.get_attribute("service.name").value_or("unknown"));
305 metric.add_attribute("service.version", resource_.get_attribute("service.version").value_or("unknown"));
306
307 otel_metrics.push_back(std::move(metric));
308 }
309
310 return common::Result<std::vector<otel_metric_data>>::ok(std::move(otel_metrics));
311 }
312
316 common::Result<std::vector<otel_metric_data>> convert_monitoring_data(const monitoring_data& data) {
317 std::vector<otel_metric_data> otel_metrics;
318
319 for (const auto& [name, value] : data.get_metrics()) {
321 metric.name = name;
322 metric.value = value;
324 metric.resource = resource_;
325
326 // Add tags as attributes
327 for (const auto& [tag_name, tag_value] : data.get_tags()) {
328 metric.add_attribute(tag_name, tag_value);
329 }
330
331 otel_metrics.push_back(std::move(metric));
332 }
333
334 return common::Result<std::vector<otel_metric_data>>::ok(std::move(otel_metrics));
335 }
336
337private:
339};
340
346 std::string endpoint{"http://localhost:4317"};
347 std::string protocol{"grpc"}; // grpc, http/protobuf, http/json
348 std::chrono::milliseconds timeout{30000};
349 std::chrono::milliseconds export_interval{5000};
350 std::size_t max_batch_size{512};
351 std::unordered_map<std::string, std::string> headers;
353 std::string compression_type{"gzip"};
354
355 common::VoidResult validate() const {
356 if (endpoint.empty()) {
357 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
358 "Exporter endpoint cannot be empty", "monitoring_system").to_common_error());
359 }
360 if (protocol != "grpc" && protocol != "http/protobuf" && protocol != "http/json") {
361 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
362 "Invalid protocol: " + protocol, "monitoring_system").to_common_error());
363 }
364 if (timeout <= std::chrono::milliseconds(0)) {
365 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
366 "Timeout must be positive", "monitoring_system").to_common_error());
367 }
368 if (max_batch_size == 0) {
369 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
370 "Batch size must be positive", "monitoring_system").to_common_error());
371 }
372 return common::ok();
373 }
374};
375
381public:
383 : resource_(resource)
384 , tracer_adapter_(resource)
385 , metrics_adapter_(resource) {}
386
390 common::VoidResult initialize() {
391 std::lock_guard<std::mutex> lock(mutex_);
392 if (initialized_) {
393 return common::VoidResult::err(error_info(monitoring_error_code::already_exists,
394 "Compatibility layer already initialized", "monitoring_system").to_common_error());
395 }
396
397 initialized_ = true;
398 return common::ok();
399 }
400
404 common::VoidResult shutdown() {
405 std::lock_guard<std::mutex> lock(mutex_);
406 if (!initialized_) {
407 return common::ok();
408 }
409
410 // Flush any pending data without re-locking
411 // In a real implementation, this would send data to OpenTelemetry collectors
412 pending_spans_.clear();
413 pending_metrics_.clear();
414
415 initialized_ = false;
416 return common::ok();
417 }
418
422 common::VoidResult export_spans(const std::vector<trace_span>& spans) {
423 if (!initialized_) {
424 return common::VoidResult::err(error_info(monitoring_error_code::invalid_state,
425 "Compatibility layer not initialized", "monitoring_system").to_common_error());
426 }
427
428 auto convert_result = tracer_adapter_.convert_spans(spans);
429 if (convert_result.is_err()) {
430 return common::VoidResult::err(error_info(monitoring_error_code::processing_failed,
431 "Failed to convert spans: " + convert_result.error().message).to_common_error());
432 }
433
434 // Store converted spans for batching
435 std::lock_guard<std::mutex> lock(mutex_);
436 const auto& otel_spans = convert_result.value();
437 pending_spans_.insert(pending_spans_.end(), otel_spans.begin(), otel_spans.end());
438
439 return common::ok();
440 }
441
445 common::VoidResult export_metrics(const monitoring_data& data) {
446 if (!initialized_) {
447 return common::VoidResult::err(error_info(monitoring_error_code::invalid_state,
448 "Compatibility layer not initialized", "monitoring_system").to_common_error());
449 }
450
451 auto convert_result = metrics_adapter_.convert_monitoring_data(data);
452 if (convert_result.is_err()) {
453 return common::VoidResult::err(error_info(monitoring_error_code::processing_failed,
454 "Failed to convert metrics: " + convert_result.error().message).to_common_error());
455 }
456
457 // Store converted metrics for batching
458 std::lock_guard<std::mutex> lock(mutex_);
459 const auto& otel_metrics = convert_result.value();
460 pending_metrics_.insert(pending_metrics_.end(), otel_metrics.begin(), otel_metrics.end());
461
462 return common::ok();
463 }
464
468 common::VoidResult flush() {
469 std::lock_guard<std::mutex> lock(mutex_);
470
471 // In a real implementation, this would send data to OpenTelemetry collectors
472 // For now, we'll just clear the pending data
473 pending_spans_.clear();
474 pending_metrics_.clear();
475
476 return common::ok();
477 }
478
483 std::size_t spans_exported{0};
484 std::size_t metrics_exported{0};
485 std::size_t pending_spans{0};
486 std::size_t pending_metrics{0};
487 std::chrono::system_clock::time_point last_export;
488 std::size_t export_errors{0};
489 };
490
492 std::lock_guard<std::mutex> lock(mutex_);
494 stats.pending_spans = pending_spans_.size();
495 stats.pending_metrics = pending_metrics_.size();
500 return stats;
501 }
502
507 return resource_;
508 }
509
510private:
514
515 mutable std::mutex mutex_;
516 bool initialized_{false};
517
518 std::vector<otel_span_data> pending_spans_;
519 std::vector<otel_metric_data> pending_metrics_;
520
521 std::size_t spans_exported_{0};
522 std::size_t metrics_exported_{0};
523 std::chrono::system_clock::time_point last_export_;
524 std::size_t export_errors_{0};
525};
526
527// Factory functions
528
532inline otel_resource create_service_resource(const std::string& service_name,
533 const std::string& service_version = "1.0.0",
534 const std::string& service_namespace = "") {
535 otel_resource resource;
537 resource.add_attribute("service.name", service_name);
538 resource.add_attribute("service.version", service_version);
539 if (!service_namespace.empty()) {
540 resource.add_attribute("service.namespace", service_namespace);
541 }
542 resource.add_attribute("telemetry.sdk.name", "monitoring_system");
543 resource.add_attribute("telemetry.sdk.version", "0.5.0");
544 resource.add_attribute("telemetry.sdk.language", "cpp");
545
546 return resource;
547}
548
552inline std::unique_ptr<opentelemetry_compatibility_layer>
554 return std::make_unique<opentelemetry_compatibility_layer>(resource);
555}
556
560inline std::unique_ptr<opentelemetry_compatibility_layer>
561create_opentelemetry_compatibility_layer(const std::string& service_name,
562 const std::string& service_version = "1.0.0") {
563 auto resource = create_service_resource(service_name, service_version);
565}
566
567} } // namespace kcenon::monitoring
const otel_resource & get_resource() const
Get resource information.
common::VoidResult initialize()
Initialize the compatibility layer.
common::VoidResult export_metrics(const monitoring_data &data)
Export metrics using OpenTelemetry format.
common::VoidResult flush()
Flush pending data to exporters.
common::VoidResult shutdown()
Shutdown the compatibility layer.
common::VoidResult export_spans(const std::vector< trace_span > &spans)
Export spans using OpenTelemetry format.
Adapter for converting monitoring system metrics to OpenTelemetry format.
common::Result< std::vector< otel_metric_data > > convert_metrics(const metrics_snapshot &snapshot)
Convert metrics snapshot to OpenTelemetry metric data.
common::Result< std::vector< otel_metric_data > > convert_monitoring_data(const monitoring_data &data)
Convert monitoring data to OpenTelemetry metric data.
Adapter for converting monitoring system traces to OpenTelemetry format.
opentelemetry_tracer_adapter(const otel_resource &resource)
otel_span_kind parse_span_kind(const std::string &kind_str)
common::Result< otel_span_data > convert_span(const trace_span &span)
Convert internal span to OpenTelemetry span data.
common::Result< otel_span_context > create_context(const trace_context &context)
Create OpenTelemetry context from internal trace context.
common::Result< std::vector< otel_span_data > > convert_spans(const std::vector< trace_span > &spans)
Convert multiple spans to OpenTelemetry format.
Distributed tracing implementation for monitoring system.
Monitoring system specific error codes.
Interface for components that expose monitoring metrics.
Core monitoring system interface definitions.
std::unique_ptr< opentelemetry_compatibility_layer > create_opentelemetry_compatibility_layer(const otel_resource &resource)
Create OpenTelemetry compatibility layer.
otel_resource_type
OpenTelemetry resource types.
otel_resource create_service_resource(const std::string &service_name, const std::string &service_version="1.0.0", const std::string &service_namespace="")
Create OpenTelemetry resource with service information.
otel_status_code
OpenTelemetry status codes.
otel_span_kind
OpenTelemetry span kinds.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Represents a single metric value with metadata.
std::chrono::system_clock::time_point timestamp
std::unordered_map< std::string, std::string > tags
Basic metric structure for interface compatibility.
std::chrono::system_clock::time_point timestamp
std::variant< double, int64_t, std::string > value
Complete snapshot of metrics at a point in time.
std::vector< metric_value > metrics
Container for monitoring metrics from a component.
std::chrono::system_clock::time_point get_timestamp() const
Get the timestamp.
const tag_map & get_tags() const
Get all tags.
const metric_map & get_metrics() const
Get all metrics.
Configuration for OpenTelemetry exporters.
std::unordered_map< std::string, std::string > headers
OpenTelemetry attribute representation.
bool operator==(const otel_attribute &other) const
otel_attribute(const std::string &k, const std::string &v)
OpenTelemetry metric data representation.
void add_attribute(const std::string &key, const std::string &attribute_value)
std::vector< otel_attribute > attributes
std::chrono::system_clock::time_point timestamp
OpenTelemetry resource representation.
common::Result< std::string > get_attribute(const std::string &key) const
std::vector< otel_attribute > attributes
void add_attribute(const std::string &key, const std::string &value)
otel_span_context(const std::string &tid, const std::string &sid)
OpenTelemetry span data representation.
std::chrono::microseconds duration() const
std::chrono::system_clock::time_point start_time
void add_attribute(const std::string &key, const std::string &value)
std::vector< otel_attribute > attributes
std::chrono::system_clock::time_point end_time
void add_event(const std::string &event)
Trace context for propagation across service boundaries.
Trace span representing a unit of work in distributed tracing.
std::unordered_map< std::string, std::string > tags
std::chrono::system_clock::time_point end_time
std::chrono::system_clock::time_point start_time