13#include <gtest/gtest.h>
40 test_dir_ = std::filesystem::temp_directory_path() /
"monitoring_e2e_test";
41 std::filesystem::create_directories(
test_dir_);
61 file_config.
type = storage_backend_type::file_json;
62 file_config.
path = (test_dir_ /
"metrics.json").
string();
66 memory_config.
type = storage_backend_type::memory_buffer;
69 auto file_backend = std::make_unique<file_storage_backend>(file_config);
70 auto memory_backend = std::make_unique<memory_storage_backend>(memory_config);
73 std::vector<metrics_snapshot> snapshots;
74 for (
int i = 0; i < 50; ++i) {
76 snapshot.
add_metric(
"metric_" + std::to_string(i), i * 1.5);
77 snapshots.push_back(snapshot);
81 std::vector<std::thread> threads;
82 std::atomic<int> success_count{0};
85 threads.emplace_back([&file_backend, &snapshots, &success_count]() {
86 for (
const auto& snapshot : snapshots) {
87 auto result = file_backend->store(snapshot);
88 if (result.is_ok()) success_count++;
93 threads.emplace_back([&memory_backend, &snapshots, &success_count]() {
94 for (
const auto& snapshot : snapshots) {
95 auto result = memory_backend->store(snapshot);
96 if (result.is_ok()) success_count++;
101 for (
auto& t : threads) {
105 EXPECT_EQ(success_count,
static_cast<int>(snapshots.size() * 2));
108 EXPECT_EQ(file_backend->size(), 50u);
109 EXPECT_EQ(memory_backend->size(), 50u);
112 auto file_result = file_backend->retrieve(0);
113 auto memory_result = memory_backend->retrieve(0);
115 EXPECT_TRUE(file_result.is_ok());
116 EXPECT_TRUE(memory_result.is_ok());
119 auto flush_file = file_backend->flush();
120 auto flush_memory = memory_backend->flush();
122 EXPECT_TRUE(flush_file.is_ok());
123 EXPECT_TRUE(flush_memory.is_ok());
136 auto init_result = otel_adapter->initialize();
137 ASSERT_TRUE(init_result.is_ok());
140 auto parent_span_result = tracer.
start_span(
"parent_operation",
"e2e_service");
141 ASSERT_TRUE(parent_span_result.is_ok());
142 auto parent_span = parent_span_result.value();
145 auto child_span_result = tracer.
start_child_span(*parent_span,
"child_operation");
146 ASSERT_TRUE(child_span_result.is_ok());
147 auto child_span = child_span_result.value();
150 child_span->tags[
"user_id"] =
"test_user";
151 child_span->tags[
"request_id"] =
"req_123";
154 child_span->status = trace_span::status_code::error;
155 child_span->status_message =
"Simulated error for testing";
162 std::vector<trace_span> spans;
163 spans.push_back(*parent_span);
164 spans.push_back(*child_span);
166 auto export_result = otel_adapter->export_spans(spans);
167 EXPECT_TRUE(export_result.is_ok());
170 auto stats = otel_adapter->get_stats();
171 EXPECT_GT(stats.pending_spans, 0u);
174 auto flush_result = otel_adapter->flush();
175 EXPECT_TRUE(flush_result.is_ok());
194 std::atomic<bool> service_healthy{
true};
200 if (service_healthy) {
207 health_mon.register_check(
"database", db_check);
216 health_mon.register_check(
"cache", cache_check);
219 auto initial_health = health_mon.check_health();
220 EXPECT_TRUE(initial_health.is_healthy());
223 service_healthy =
false;
226 int recovery_attempts = 0;
227 auto recovery_result = retry_exec.
execute(
228 [&service_healthy, &recovery_attempts]() -> kcenon::common::Result<bool> {
230 if (recovery_attempts >= 2) {
231 service_healthy =
true;
232 return kcenon::common::ok(
true);
234 return kcenon::common::make_error<bool>(
static_cast<int>(monitoring_error_code::operation_failed),
239 EXPECT_TRUE(recovery_result.is_ok());
240 EXPECT_GE(recovery_attempts, 2);
243 auto final_health = health_mon.check_health();
244 EXPECT_TRUE(final_health.is_healthy());
253 auto perf_monitor = std::make_shared<performance_monitor>(
"perf_test");
258 config.
strategy = adaptation_strategy::balanced;
266 EXPECT_TRUE(reg_result.is_ok());
269 auto start_result = adapter.
start();
270 EXPECT_TRUE(start_result.is_ok());
273 for (
int i = 0; i < 100; ++i) {
274 auto timer = perf_monitor->time_operation(
"test_op_" + std::to_string(i % 10));
275 std::this_thread::sleep_for(std::chrono::microseconds(100));
280 EXPECT_TRUE(stats_result.is_ok());
283 auto stop_result = adapter.
stop();
284 EXPECT_TRUE(stop_result.is_ok());
297 ft_config.
circuit_config.timeout = std::chrono::milliseconds(100);
304 std::atomic<int> call_count{0};
305 std::atomic<bool> should_fail{
true};
307 auto unreliable_operation = [&call_count, &should_fail]() -> kcenon::common::Result<bool> {
311 if (call_count <= 3 && should_fail) {
312 return kcenon::common::make_error<bool>(
static_cast<int>(monitoring_error_code::operation_failed),
313 "Simulated failure");
316 return kcenon::common::ok(
true);
320 auto ft_result = ft_manager.
execute(unreliable_operation);
321 EXPECT_TRUE(ft_result.is_ok());
328 cb_config.failure_threshold = 3;
329 cb_config.timeout = std::chrono::milliseconds(100);
333 for (
int i = 0; i < 3; ++i) {
335 EXPECT_FALSE(cb_result.is_ok());
339 EXPECT_EQ(breaker.get_state(), circuit_state::OPEN);
343 EXPECT_FALSE(open_result.is_ok());
346 std::this_thread::sleep_for(std::chrono::milliseconds(150));
354 EXPECT_TRUE(recovery_result.is_ok());
357 for (
int i = 0; i < 5; ++i) {
359 EXPECT_TRUE(stable_result.is_ok());
362 EXPECT_EQ(breaker.get_state(), circuit_state::CLOSED);
372 auto init_result = otel_adapter->initialize();
373 ASSERT_TRUE(init_result.is_ok());
376 std::vector<trace_span> test_spans;
377 for (
int i = 0; i < 10; ++i) {
379 span.
trace_id =
"trace_" + std::to_string(i);
380 span.
span_id =
"span_" + std::to_string(i);
382 span.
start_time = std::chrono::system_clock::now();
384 span.
tags[
"index"] = std::to_string(i);
385 test_spans.push_back(span);
389 auto export_result = otel_adapter->export_spans(test_spans);
390 EXPECT_TRUE(export_result.is_ok());
393 auto stats = otel_adapter->get_stats();
394 EXPECT_EQ(stats.pending_spans, test_spans.size());
400 test_data.
add_metric(
"request_count", 1000.0);
403 auto metrics_result = otel_adapter->export_metrics(test_data);
404 EXPECT_TRUE(metrics_result.is_ok());
407 stats = otel_adapter->get_stats();
408 EXPECT_GT(stats.pending_metrics, 0u);
411 auto flush_result = otel_adapter->flush();
412 EXPECT_TRUE(flush_result.is_ok());
415 stats = otel_adapter->get_stats();
416 EXPECT_EQ(stats.pending_spans, 0u);
417 EXPECT_EQ(stats.pending_metrics, 0u);
427 auto perf_monitor = std::make_shared<performance_monitor>(
"load_perf");
431 const int num_operations = 1000;
432 const int num_threads = 10;
435 auto start_time = std::chrono::steady_clock::now();
436 std::vector<std::thread> load_generators;
437 std::atomic<int> total_operations{0};
439 for (
int t = 0; t < num_threads; ++t) {
440 load_generators.emplace_back([&tracer, &total_operations, t]() {
441 std::random_device rd;
442 std::mt19937 gen(rd());
443 std::uniform_real_distribution<> dis(0.0, 100.0);
445 for (
int i = 0; i < 100; ++i) {
447 auto span_result = tracer.
start_span(
"load_test_" + std::to_string(t),
"load_service");
448 if (span_result.is_ok()) {
449 auto span = span_result.value();
450 span->tags[
"thread"] = std::to_string(t);
451 span->tags[
"value"] = std::to_string(dis(gen));
458 std::this_thread::sleep_for(std::chrono::microseconds(100));
465 std::thread monitor_thread([&health_mon, &start_time]() {
466 while (std::chrono::steady_clock::now() - start_time < std::chrono::seconds(5)) {
467 auto health = health_mon.check_health();
469 EXPECT_TRUE(health.is_operational());
470 std::this_thread::sleep_for(std::chrono::milliseconds(100));
475 for (
auto& t : load_generators) {
478 monitor_thread.join();
480 auto end_time = std::chrono::steady_clock::now();
481 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
484 EXPECT_EQ(total_operations, num_operations);
487 double throughput = (total_operations * 1000.0) / duration.count();
488 std::cout <<
"Load test throughput: " << throughput <<
" ops/sec" << std::endl;
491 EXPECT_GT(throughput, 100.0);
501 config.
type = storage_backend_type::memory_buffer;
503 auto storage = std::make_unique<memory_storage_backend>(config);
509 auto perf_monitor = std::make_shared<performance_monitor>(
"integration_perf");
516 auto timer = perf_monitor->time_operation(
"cpu_measurement");
517 std::this_thread::sleep_for(std::chrono::milliseconds(10));
521 auto metrics = perf_monitor->get_profiler().get_all_metrics();
522 for (
const auto&
metric : metrics) {
524 static_cast<double>(
metric.call_count));
532 auto store_result =
storage->store(snapshot);
533 EXPECT_TRUE(store_result.is_ok());
536 auto span_result = tracer.
start_span(
"cross_component_test",
"test_service");
537 ASSERT_TRUE(span_result.is_ok());
538 auto span = span_result.value();
541 span->tags[
"cpu_usage"] =
"45.0";
542 span->tags[
"memory_usage"] =
"60.0";
547 EXPECT_EQ(
storage->size(), 1u);
549 auto retrieved =
storage->retrieve(0);
550 EXPECT_TRUE(retrieved.is_ok());
553 auto cpu_metric = retrieved.value().get_metric(
"cpu_usage");
554 EXPECT_TRUE(cpu_metric.has_value());
555 EXPECT_DOUBLE_EQ(cpu_metric.value(), 45.0);
557 auto mem_metric = retrieved.value().get_metric(
"memory_usage");
558 EXPECT_TRUE(mem_metric.has_value());
559 EXPECT_DOUBLE_EQ(mem_metric.value(), 60.0);
Adaptive monitoring implementation that adjusts behavior based on system load.
Circuit breaker integration for monitoring_system.
std::filesystem::path test_dir_
Adaptive monitoring controller.
common::Result< bool > stop()
Stop adaptive monitoring.
common::Result< bool > start()
Start adaptive monitoring.
common::Result< adaptation_stats > get_collector_stats(const std::string &name) const
Get adaptation statistics for a collector.
common::Result< bool > register_collector(const std::string &name, std::shared_ptr< kcenon::monitoring::metrics_collector > collector, const adaptive_config &config={})
Register a collector for adaptive monitoring.
Distributed tracer for managing spans and traces.
common::Result< std::shared_ptr< trace_span > > start_span(const std::string &operation_name, const std::string &service_name="monitoring_system")
Start a new root span.
common::Result< bool > finish_span(std::shared_ptr< trace_span > span)
Finish a span.
common::Result< std::shared_ptr< trace_span > > start_child_span(const trace_span &parent, const std::string &operation_name)
Start a child span.
Fault tolerance manager template class.
common::Result< T > execute(Func &&func)
Execute a function with fault tolerance.
Fluent builder for creating functional_health_check instances.
health_check_builder & with_check(std::function< health_check_result()> func)
Set the callable that performs the health check.
health_check_builder & with_type(health_check_type type)
Set the health check type.
health_check_builder & with_name(const std::string &name)
Set the health check name.
Retry executor template class.
common::Result< T > execute(Func &&func)
Execute a function with retry logic.
Distributed tracing implementation for monitoring system.
Fault tolerance manager coordinating circuit breakers and retries.
Health monitoring with dependency graphs, auto-recovery, and statistics.
Core monitoring system interface definitions.
@ timer
StatsD-specific timer metric.
std::unique_ptr< opentelemetry_compatibility_layer > create_opentelemetry_compatibility_layer(const otel_resource &resource)
Create OpenTelemetry compatibility layer.
common::Result< T > execute_with_circuit_breaker(circuit_breaker &cb, const std::string &name, Func &&func)
Execute an operation through a circuit breaker.
health_monitor & global_health_monitor()
Get the global health monitor singleton instance.
common::resilience::circuit_breaker circuit_breaker
@ storage
Storage device sensor.
common::resilience::circuit_breaker_config circuit_breaker_config
OpenTelemetry compatibility layer for monitoring system integration.
Result pattern type definitions for monitoring system.
Retry strategies with backoff for monitoring operations.
Storage backend type definitions for metric persistence.
Adaptive configuration parameters.
double memory_warning_threshold
double high_sampling_rate
double idle_sampling_rate
adaptation_strategy strategy
Fault tolerance configuration.
bool enable_circuit_breaker
circuit_breaker_config circuit_config
static health_check_result unhealthy(const std::string &msg)
static health_check_result healthy(const std::string &msg="OK")
Basic metric structure for interface compatibility.
Complete snapshot of metrics at a point in time.
void add_metric(const std::string &name, double value)
Add a metric to the snapshot.
Container for monitoring metrics from a component.
void add_metric(const std::string &key, double value)
Add a numeric metric.
double backoff_multiplier
std::chrono::milliseconds initial_delay
storage_backend_type type
Trace span representing a unit of work in distributed tracing.
std::unordered_map< std::string, std::string > tags
std::chrono::system_clock::time_point end_time
std::string operation_name
std::chrono::system_clock::time_point start_time
TEST_F(IntegrationE2ETest, StorageBackendIntegration)
Trace data exporters for various distributed tracing systems.