Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
test_integration_e2e.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5
#include <atomic>
#include <chrono>
#include <cstddef>
#include <filesystem>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

using namespace kcenon::monitoring;
35
36class IntegrationE2ETest : public ::testing::Test {
37protected:
38 void SetUp() override {
39 // Create temp directory for test outputs
40 test_dir_ = std::filesystem::temp_directory_path() / "monitoring_e2e_test";
41 std::filesystem::create_directories(test_dir_);
42 }
43
44 void TearDown() override {
45 // Cleanup test directory
46 if (std::filesystem::exists(test_dir_)) {
47 std::filesystem::remove_all(test_dir_);
48 }
49 }
50
51 std::filesystem::path test_dir_;
52};
53
58TEST_F(IntegrationE2ETest, StorageBackendIntegration) {
59 // 1. Create multiple storage backends
60 storage_config file_config;
61 file_config.type = storage_backend_type::file_json;
62 file_config.path = (test_dir_ / "metrics.json").string();
63 file_config.max_capacity = 100;
64
65 storage_config memory_config;
66 memory_config.type = storage_backend_type::memory_buffer;
67 memory_config.max_capacity = 100;
68
69 auto file_backend = std::make_unique<file_storage_backend>(file_config);
70 auto memory_backend = std::make_unique<memory_storage_backend>(memory_config);
71
72 // 2. Create test data
73 std::vector<metrics_snapshot> snapshots;
74 for (int i = 0; i < 50; ++i) {
75 metrics_snapshot snapshot;
76 snapshot.add_metric("metric_" + std::to_string(i), i * 1.5);
77 snapshots.push_back(snapshot);
78 }
79
80 // 3. Store data in both backends concurrently
81 std::vector<std::thread> threads;
82 std::atomic<int> success_count{0};
83
84 // Thread for file backend
85 threads.emplace_back([&file_backend, &snapshots, &success_count]() {
86 for (const auto& snapshot : snapshots) {
87 auto result = file_backend->store(snapshot);
88 if (result.is_ok()) success_count++;
89 }
90 });
91
92 // Thread for memory backend
93 threads.emplace_back([&memory_backend, &snapshots, &success_count]() {
94 for (const auto& snapshot : snapshots) {
95 auto result = memory_backend->store(snapshot);
96 if (result.is_ok()) success_count++;
97 }
98 });
99
100 // 4. Wait for completion
101 for (auto& t : threads) {
102 t.join();
103 }
104
105 EXPECT_EQ(success_count, static_cast<int>(snapshots.size() * 2));
106
107 // 5. Verify data consistency
108 EXPECT_EQ(file_backend->size(), 50u);
109 EXPECT_EQ(memory_backend->size(), 50u);
110
111 // 6. Test retrieval
112 auto file_result = file_backend->retrieve(0);
113 auto memory_result = memory_backend->retrieve(0);
114
115 EXPECT_TRUE(file_result.is_ok());
116 EXPECT_TRUE(memory_result.is_ok());
117
118 // 7. Test flush
119 auto flush_file = file_backend->flush();
120 auto flush_memory = memory_backend->flush();
121
122 EXPECT_TRUE(flush_file.is_ok());
123 EXPECT_TRUE(flush_memory.is_ok());
124}
125
130TEST_F(IntegrationE2ETest, DistributedTracingE2E) {
131 // 1. Setup tracing components
132 distributed_tracer tracer;
133 auto otel_adapter = create_opentelemetry_compatibility_layer("e2e_service", "1.0.0");
134
135 // 2. Initialize OTEL adapter
136 auto init_result = otel_adapter->initialize();
137 ASSERT_TRUE(init_result.is_ok());
138
139 // 3. Create parent span
140 auto parent_span_result = tracer.start_span("parent_operation", "e2e_service");
141 ASSERT_TRUE(parent_span_result.is_ok());
142 auto parent_span = parent_span_result.value();
143
144 // 4. Create child span with parent context
145 auto child_span_result = tracer.start_child_span(*parent_span, "child_operation");
146 ASSERT_TRUE(child_span_result.is_ok());
147 auto child_span = child_span_result.value();
148
149 // 5. Add tags directly (trace_span uses tags map)
150 child_span->tags["user_id"] = "test_user";
151 child_span->tags["request_id"] = "req_123";
152
153 // 6. Set error status
154 child_span->status = trace_span::status_code::error;
155 child_span->status_message = "Simulated error for testing";
156
157 // 7. Finish spans
158 tracer.finish_span(child_span);
159 tracer.finish_span(parent_span);
160
161 // 8. Export spans through OTEL adapter
162 std::vector<trace_span> spans;
163 spans.push_back(*parent_span);
164 spans.push_back(*child_span);
165
166 auto export_result = otel_adapter->export_spans(spans);
167 EXPECT_TRUE(export_result.is_ok());
168
169 // 9. Verify stats
170 auto stats = otel_adapter->get_stats();
171 EXPECT_GT(stats.pending_spans, 0u);
172
173 // 10. Flush
174 auto flush_result = otel_adapter->flush();
175 EXPECT_TRUE(flush_result.is_ok());
176}
177
182TEST_F(IntegrationE2ETest, HealthMonitoringWithRecovery) {
183 // 1. Setup health monitoring
184 auto& health_mon = global_health_monitor();
185
186 // Use retry_executor with retry_config for retry logic
187 retry_config cfg;
188 cfg.max_attempts = 3;
189 cfg.initial_delay = std::chrono::milliseconds(10);
190 cfg.backoff_multiplier = 2.0;
191 retry_executor<bool> retry_exec("recovery_executor", cfg);
192
193 // 2. Register health checks using health_check_builder
194 std::atomic<bool> service_healthy{true};
195
196 auto db_check = health_check_builder()
197 .with_name("database")
198 .with_type(health_check_type::liveness)
199 .with_check([&service_healthy]() {
200 if (service_healthy) {
201 return health_check_result::healthy("Database connection OK");
202 } else {
203 return health_check_result::unhealthy("Database connection failed");
204 }
205 })
206 .build();
207 health_mon.register_check("database", db_check);
208
209 auto cache_check = health_check_builder()
210 .with_name("cache")
211 .with_type(health_check_type::liveness)
212 .with_check([]() {
213 return health_check_result::healthy("Cache service running");
214 })
215 .build();
216 health_mon.register_check("cache", cache_check);
217
218 // 3. Initial health check - should be healthy
219 auto initial_health = health_mon.check_health();
220 EXPECT_TRUE(initial_health.is_healthy());
221
222 // 4. Simulate failure
223 service_healthy = false;
224
225 // 5. Attempt recovery with retry logic
226 int recovery_attempts = 0;
227 auto recovery_result = retry_exec.execute(
228 [&service_healthy, &recovery_attempts]() -> kcenon::common::Result<bool> {
229 recovery_attempts++;
230 if (recovery_attempts >= 2) {
231 service_healthy = true;
232 return kcenon::common::ok(true);
233 }
234 return kcenon::common::make_error<bool>(static_cast<int>(monitoring_error_code::operation_failed),
235 "Still recovering");
236 }
237 );
238
239 EXPECT_TRUE(recovery_result.is_ok());
240 EXPECT_GE(recovery_attempts, 2);
241
242 // 6. Verify health restored
243 auto final_health = health_mon.check_health();
244 EXPECT_TRUE(final_health.is_healthy());
245}
246
251TEST_F(IntegrationE2ETest, PerformanceAdaptiveMonitoring) {
252 // 1. Setup performance monitoring
253 auto perf_monitor = std::make_shared<performance_monitor>("perf_test");
254 adaptive_monitor adapter;
255
256 // 2. Configure adaptation
257 adaptive_config config;
258 config.strategy = adaptation_strategy::balanced;
259 config.high_threshold = 70.0;
260 config.memory_warning_threshold = 80.0;
261 config.high_sampling_rate = 0.2;
262 config.idle_sampling_rate = 1.0;
263
264 // 3. Register collector with adaptive monitor
265 auto reg_result = adapter.register_collector("perf_test", perf_monitor, config);
266 EXPECT_TRUE(reg_result.is_ok());
267
268 // 4. Start adaptive monitoring
269 auto start_result = adapter.start();
270 EXPECT_TRUE(start_result.is_ok());
271
272 // 5. Record some metrics
273 for (int i = 0; i < 100; ++i) {
274 auto timer = perf_monitor->time_operation("test_op_" + std::to_string(i % 10));
275 std::this_thread::sleep_for(std::chrono::microseconds(100));
276 }
277
278 // 6. Get adaptation stats
279 auto stats_result = adapter.get_collector_stats("perf_test");
280 EXPECT_TRUE(stats_result.is_ok());
281
282 // 7. Stop monitoring
283 auto stop_result = adapter.stop();
284 EXPECT_TRUE(stop_result.is_ok());
285}
286
291TEST_F(IntegrationE2ETest, CircuitBreakerAndRetry) {
292 // 1. Setup resilience components with fault_tolerance_manager
293 fault_tolerance_config ft_config;
294 ft_config.enable_circuit_breaker = true;
295 ft_config.enable_retry = true;
296 ft_config.circuit_config.failure_threshold = 3;
297 ft_config.circuit_config.timeout = std::chrono::milliseconds(100);
298 ft_config.retry_cfg.max_attempts = 5;
299 ft_config.retry_cfg.initial_delay = std::chrono::milliseconds(10);
300
301 fault_tolerance_manager<bool> ft_manager("test_manager", ft_config);
302
303 // 2. Simulate component with intermittent failures
304 std::atomic<int> call_count{0};
305 std::atomic<bool> should_fail{true};
306
307 auto unreliable_operation = [&call_count, &should_fail]() -> kcenon::common::Result<bool> {
308 call_count++;
309
310 // Fail first 3 calls, then succeed
311 if (call_count <= 3 && should_fail) {
312 return kcenon::common::make_error<bool>(static_cast<int>(monitoring_error_code::operation_failed),
313 "Simulated failure");
314 }
315
316 return kcenon::common::ok(true);
317 };
318
319 // 3. Test fault tolerance execution
320 auto ft_result = ft_manager.execute(unreliable_operation);
321 EXPECT_TRUE(ft_result.is_ok());
322
323 // 4. Reset and test standalone circuit breaker
324 call_count = 0;
325 should_fail = true;
326
327 circuit_breaker_config cb_config;
328 cb_config.failure_threshold = 3;
329 cb_config.timeout = std::chrono::milliseconds(100);
330 circuit_breaker breaker(cb_config);
331
332 // Trigger circuit breaker with failures
333 for (int i = 0; i < 3; ++i) {
334 auto cb_result = execute_with_circuit_breaker<bool>(breaker, "test_breaker", unreliable_operation);
335 EXPECT_FALSE(cb_result.is_ok());
336 }
337
338 // Circuit should be open
339 EXPECT_EQ(breaker.get_state(), circuit_state::OPEN);
340
341 // Further calls should fail fast (circuit open)
342 auto open_result = execute_with_circuit_breaker<bool>(breaker, "test_breaker", unreliable_operation);
343 EXPECT_FALSE(open_result.is_ok());
344
345 // 5. Wait for circuit recovery
346 std::this_thread::sleep_for(std::chrono::milliseconds(150));
347
348 // Allow success for recovery
349 should_fail = false;
350 call_count = 0;
351
352 // Circuit should transition to half-open and then closed
353 auto recovery_result = execute_with_circuit_breaker<bool>(breaker, "test_breaker", unreliable_operation);
354 EXPECT_TRUE(recovery_result.is_ok());
355
356 // After several successes, circuit should be closed
357 for (int i = 0; i < 5; ++i) {
358 auto stable_result = execute_with_circuit_breaker<bool>(breaker, "test_breaker", unreliable_operation);
359 EXPECT_TRUE(stable_result.is_ok());
360 }
361
362 EXPECT_EQ(breaker.get_state(), circuit_state::CLOSED);
363}
364
369TEST_F(IntegrationE2ETest, ExportPipelineIntegration) {
370 // 1. Setup OTEL adapter
371 auto otel_adapter = create_opentelemetry_compatibility_layer("export_test", "1.0.0");
372 auto init_result = otel_adapter->initialize();
373 ASSERT_TRUE(init_result.is_ok());
374
375 // 2. Create sample traces
376 std::vector<trace_span> test_spans;
377 for (int i = 0; i < 10; ++i) {
378 trace_span span;
379 span.trace_id = "trace_" + std::to_string(i);
380 span.span_id = "span_" + std::to_string(i);
381 span.operation_name = "operation_" + std::to_string(i);
382 span.start_time = std::chrono::system_clock::now();
383 span.end_time = span.start_time + std::chrono::milliseconds(100);
384 span.tags["index"] = std::to_string(i);
385 test_spans.push_back(span);
386 }
387
388 // 3. Export spans
389 auto export_result = otel_adapter->export_spans(test_spans);
390 EXPECT_TRUE(export_result.is_ok());
391
392 // 4. Verify export stats
393 auto stats = otel_adapter->get_stats();
394 EXPECT_EQ(stats.pending_spans, test_spans.size());
395
396 // 5. Create sample metrics
397 monitoring_data test_data("export_test");
398 test_data.add_metric("cpu_usage", 75.0);
399 test_data.add_metric("memory_usage", 60.0);
400 test_data.add_metric("request_count", 1000.0);
401
402 // 6. Export metrics
403 auto metrics_result = otel_adapter->export_metrics(test_data);
404 EXPECT_TRUE(metrics_result.is_ok());
405
406 // 7. Verify combined stats
407 stats = otel_adapter->get_stats();
408 EXPECT_GT(stats.pending_metrics, 0u);
409
410 // 8. Flush all pending data
411 auto flush_result = otel_adapter->flush();
412 EXPECT_TRUE(flush_result.is_ok());
413
414 // 9. Verify flush completed
415 stats = otel_adapter->get_stats();
416 EXPECT_EQ(stats.pending_spans, 0u);
417 EXPECT_EQ(stats.pending_metrics, 0u);
418}
419
424TEST_F(IntegrationE2ETest, FullSystemLoadTest) {
425 // 1. Setup components
426 distributed_tracer tracer;
427 auto perf_monitor = std::make_shared<performance_monitor>("load_perf");
428 auto& health_mon = global_health_monitor();
429
430 // 2. Configure for high load
431 const int num_operations = 1000;
432 const int num_threads = 10;
433
434 // 3. Generate load
435 auto start_time = std::chrono::steady_clock::now();
436 std::vector<std::thread> load_generators;
437 std::atomic<int> total_operations{0};
438
439 for (int t = 0; t < num_threads; ++t) {
440 load_generators.emplace_back([&tracer, &total_operations, t]() {
441 std::random_device rd;
442 std::mt19937 gen(rd());
443 std::uniform_real_distribution<> dis(0.0, 100.0);
444
445 for (int i = 0; i < 100; ++i) {
446 // Create span
447 auto span_result = tracer.start_span("load_test_" + std::to_string(t), "load_service");
448 if (span_result.is_ok()) {
449 auto span = span_result.value();
450 span->tags["thread"] = std::to_string(t);
451 span->tags["value"] = std::to_string(dis(gen));
452 tracer.finish_span(span);
453 total_operations++;
454 }
455
456 // Small delay to prevent overwhelming
457 if (i % 10 == 0) {
458 std::this_thread::sleep_for(std::chrono::microseconds(100));
459 }
460 }
461 });
462 }
463
464 // 4. Monitor while load is running
465 std::thread monitor_thread([&health_mon, &start_time]() {
466 while (std::chrono::steady_clock::now() - start_time < std::chrono::seconds(5)) {
467 auto health = health_mon.check_health();
468 // System should remain operational under load
469 EXPECT_TRUE(health.is_operational());
470 std::this_thread::sleep_for(std::chrono::milliseconds(100));
471 }
472 });
473
474 // 5. Wait for completion
475 for (auto& t : load_generators) {
476 t.join();
477 }
478 monitor_thread.join();
479
480 auto end_time = std::chrono::steady_clock::now();
481 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
482
483 // 6. Verify performance
484 EXPECT_EQ(total_operations, num_operations);
485
486 // Calculate throughput
487 double throughput = (total_operations * 1000.0) / duration.count();
488 std::cout << "Load test throughput: " << throughput << " ops/sec" << std::endl;
489
490 // Should achieve reasonable throughput
491 EXPECT_GT(throughput, 100.0); // At least 100 ops/sec
492}
493
498TEST_F(IntegrationE2ETest, CrossComponentIntegration) {
499 // 1. Create storage backend
500 storage_config config;
501 config.type = storage_backend_type::memory_buffer;
502 config.max_capacity = 1000;
503 auto storage = std::make_unique<memory_storage_backend>(config);
504
505 // 2. Create tracer
506 distributed_tracer tracer;
507
508 // 3. Create performance monitor
509 auto perf_monitor = std::make_shared<performance_monitor>("integration_perf");
510
511 // 4. Create metrics snapshot and record performance
512 metrics_snapshot snapshot;
513
514 // Add performance metrics using scoped timer
515 {
516 auto timer = perf_monitor->time_operation("cpu_measurement");
517 std::this_thread::sleep_for(std::chrono::milliseconds(10));
518 }
519
520 // Get profiler metrics and add to snapshot
521 auto metrics = perf_monitor->get_profiler().get_all_metrics();
522 for (const auto& metric : metrics) {
523 snapshot.add_metric(metric.operation_name + "_count",
524 static_cast<double>(metric.call_count));
525 }
526
527 // Add some direct metrics
528 snapshot.add_metric("cpu_usage", 45.0);
529 snapshot.add_metric("memory_usage", 60.0);
530
531 // 5. Store snapshot
532 auto store_result = storage->store(snapshot);
533 EXPECT_TRUE(store_result.is_ok());
534
535 // 6. Create trace span
536 auto span_result = tracer.start_span("cross_component_test", "test_service");
537 ASSERT_TRUE(span_result.is_ok());
538 auto span = span_result.value();
539
540 // 7. Add metrics to span as tags
541 span->tags["cpu_usage"] = "45.0";
542 span->tags["memory_usage"] = "60.0";
543
544 tracer.finish_span(span);
545
546 // 8. Verify storage
547 EXPECT_EQ(storage->size(), 1u);
548
549 auto retrieved = storage->retrieve(0);
550 EXPECT_TRUE(retrieved.is_ok());
551
552 // 9. Verify metrics in retrieved snapshot
553 auto cpu_metric = retrieved.value().get_metric("cpu_usage");
554 EXPECT_TRUE(cpu_metric.has_value());
555 EXPECT_DOUBLE_EQ(cpu_metric.value(), 45.0);
556
557 auto mem_metric = retrieved.value().get_metric("memory_usage");
558 EXPECT_TRUE(mem_metric.has_value());
559 EXPECT_DOUBLE_EQ(mem_metric.value(), 60.0);
560}
561
562// Note: main() function is provided by GTest framework or other test files
Adaptive monitoring implementation that adjusts behavior based on system load.
Circuit breaker integration for monitoring_system.
std::filesystem::path test_dir_
Adaptive monitoring controller.
common::Result< bool > stop()
Stop adaptive monitoring.
common::Result< bool > start()
Start adaptive monitoring.
common::Result< adaptation_stats > get_collector_stats(const std::string &name) const
Get adaptation statistics for a collector.
common::Result< bool > register_collector(const std::string &name, std::shared_ptr< kcenon::monitoring::metrics_collector > collector, const adaptive_config &config={})
Register a collector for adaptive monitoring.
Distributed tracer for managing spans and traces.
common::Result< std::shared_ptr< trace_span > > start_span(const std::string &operation_name, const std::string &service_name="monitoring_system")
Start a new root span.
common::Result< bool > finish_span(std::shared_ptr< trace_span > span)
Finish a span.
common::Result< std::shared_ptr< trace_span > > start_child_span(const trace_span &parent, const std::string &operation_name)
Start a child span.
Fault tolerance manager template class.
common::Result< T > execute(Func &&func)
Execute a function with fault tolerance.
Fluent builder for creating functional_health_check instances.
health_check_builder & with_check(std::function< health_check_result()> func)
Set the callable that performs the health check.
health_check_builder & with_type(health_check_type type)
Set the health check type.
health_check_builder & with_name(const std::string &name)
Set the health check name.
Retry executor template class.
common::Result< T > execute(Func &&func)
Execute a function with retry logic.
Distributed tracing implementation for monitoring system.
Fault tolerance manager coordinating circuit breakers and retries.
Health monitoring with dependency graphs, auto-recovery, and statistics.
Core monitoring system interface definitions.
@ timer
StatsD-specific timer metric.
std::unique_ptr< opentelemetry_compatibility_layer > create_opentelemetry_compatibility_layer(const otel_resource &resource)
Create OpenTelemetry compatibility layer.
common::Result< T > execute_with_circuit_breaker(circuit_breaker &cb, const std::string &name, Func &&func)
Execute an operation through a circuit breaker.
health_monitor & global_health_monitor()
Get the global health monitor singleton instance.
common::resilience::circuit_breaker circuit_breaker
@ storage
Storage device sensor.
common::resilience::circuit_breaker_config circuit_breaker_config
OpenTelemetry compatibility layer for monitoring system integration.
Performance monitoring and profiling implementation.
Result pattern type definitions for monitoring system.
Retry strategies with backoff for monitoring operations.
Storage backend type definitions for metric persistence.
Adaptive configuration parameters.
static health_check_result unhealthy(const std::string &msg)
static health_check_result healthy(const std::string &msg="OK")
Basic metric structure for interface compatibility.
Complete snapshot of metrics at a point in time.
void add_metric(const std::string &name, double value)
Add a metric to the snapshot.
Container for monitoring metrics from a component.
void add_metric(const std::string &key, double value)
Add a numeric metric.
std::chrono::milliseconds initial_delay
Trace span representing a unit of work in distributed tracing.
std::unordered_map< std::string, std::string > tags
std::chrono::system_clock::time_point end_time
std::chrono::system_clock::time_point start_time
TEST_F(IntegrationE2ETest, StorageBackendIntegration)
Trace data exporters for various distributed tracing systems.