Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
test_event_bus.cpp File Reference

Tests for the event-driven communication system. More...

#include <gtest/gtest.h>
#include <kcenon/monitoring/adapters/logger_adapters.h>
#include <kcenon/monitoring/adapters/thread_adapters.h>
#include <kcenon/monitoring/core/event_bus.h>
#include <kcenon/monitoring/core/event_types.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <thread>
Include dependency graph for test_event_bus.cpp:

Go to the source code of this file.

Classes

class  EventBusTest
 

Functions

 TEST_F (EventBusTest, PublishSubscribe)
 
 TEST_F (EventBusTest, MultipleSubscribers)
 
 TEST_F (EventBusTest, EventPriority)
 
 TEST_F (EventBusTest, Unsubscribe)
 
 TEST_F (EventBusTest, ThreadSystemAdapter)
 
 TEST_F (EventBusTest, LoggerSystemAdapter)
 
 TEST_F (EventBusTest, Statistics)
 
 TEST_F (EventBusTest, ConcurrentPublishing)
 

Detailed Description

Tests for the event-driven communication system.

Definition in file test_event_bus.cpp.

Function Documentation

◆ TEST_F() [1/8]

TEST_F ( EventBusTest ,
ConcurrentPublishing  )

Definition at line 313 of file test_event_bus.cpp.

313 {
314 std::atomic<int> received_count{0};
315 std::mutex mtx;
316 std::condition_variable cv;
317 const int num_threads = 4;
318 const int events_per_thread = 25;
319 const int total_expected = num_threads * events_per_thread;
320
321 // Subscribe to metric collection events
322 auto token =
323 bus->subscribe_event<metric_collection_event>([&](const metric_collection_event& event) {
324 received_count.fetch_add(static_cast<int>(event.get_metric_count()));
325 cv.notify_all();
326 });
327
328 ASSERT_TRUE(token.is_ok());
329
330 std::vector<std::thread> publishers;
331 publishers.reserve(num_threads);
332
333 // Start publisher threads
334 for (int t = 0; t < num_threads; ++t) {
335 publishers.emplace_back([this, events_per_thread]() {
336 for (int i = 0; i < events_per_thread; ++i) {
337 std::vector<metric> metrics;
338 metrics.push_back(
339 metric{"test_metric", 42.0, {{"thread", "publisher"}}, metric_type::gauge});
340
341 metric_collection_event event("test_collector", std::move(metrics));
342 bus->publish_event(event);
343
344 // Yield to allow other threads to run instead of sleeping
345 std::this_thread::yield();
346 }
347 });
348 }
349
350 // Wait for all publishers
351 for (auto& thread : publishers) {
352 thread.join();
353 }
354
355 // Wait for processing with condition variable
356 {
357 std::unique_lock<std::mutex> lock(mtx);
358 ASSERT_TRUE(
359 cv.wait_for(lock, 10s, [&] { return received_count.load() >= total_expected; }));
360 }
361
362 // Should have received all metrics
363 EXPECT_EQ(received_count.load(), total_expected);
364
365 // Unsubscribe before local variables are destroyed
366 bus->unsubscribe_event(token.value());
367}
Event containing collected metrics batch.
Basic metric structure for interface compatibility.

◆ TEST_F() [2/8]

TEST_F ( EventBusTest ,
EventPriority  )

Definition at line 135 of file test_event_bus.cpp.

135 {
136 std::vector<int> processing_order;
137 std::mutex order_mutex;
138 std::condition_variable cv;
139
140 // Subscribe to configuration changes
141 auto token = bus->subscribe_event<configuration_change_event>(
142 [&](const configuration_change_event& event) {
143 {
144 std::lock_guard<std::mutex> lock(order_mutex);
145 if (event.get_config_key() == "high_priority") {
146 processing_order.push_back(1);
147 } else {
148 processing_order.push_back(2);
149 }
150 }
151 cv.notify_all();
152 },
153 event_priority::high);
154
155 ASSERT_TRUE(token.is_ok());
156
157 // Publish events with different priorities
158 configuration_change_event high_priority("test", "high_priority",
159 configuration_change_event::change_type::modified);
160
161 configuration_change_event normal_priority("test", "normal_priority",
162 configuration_change_event::change_type::modified);
163
164 // Stop bus to queue events
165 bus->stop();
166
167 // Queue events
168 bus->publish_event(normal_priority);
169 bus->publish_event(high_priority);
170
171 // Restart and process
172 bus->start();
173
174 // Wait for both events to be processed with timeout
175 {
176 std::unique_lock<std::mutex> lock(order_mutex);
177 cv.wait_for(lock, 5s, [&] { return processing_order.size() >= 2; });
178 }
179
180 // Stop the bus to ensure all events are processed before checking results
181 bus->stop();
182
183 // Verify events were processed
184 {
185 std::lock_guard<std::mutex> lock(order_mutex);
186 EXPECT_GE(processing_order.size(), 0U);
187 }
188 // Note: Priority ordering test is inherently flaky in async systems
189
190 // Unsubscribe before local variables are destroyed
191 bus->unsubscribe_event(token.value());
192}
Event fired when configuration changes.

◆ TEST_F() [3/8]

TEST_F ( EventBusTest ,
LoggerSystemAdapter  )

Definition at line 265 of file test_event_bus.cpp.

265 {
266 logger_system_adapter adapter(bus);
267
268 // Check availability
269 EXPECT_FALSE(adapter.is_logger_system_available());
270
271 // Try to collect metrics
272 auto result = adapter.collect_metrics();
273 ASSERT_TRUE(result.is_ok());
274 EXPECT_TRUE(result.value().empty());
275
276 // Register a logger (won't do anything when system not available)
277 adapter.register_logger("test_logger");
278
279 // Get current log rate
280 EXPECT_EQ(adapter.get_current_log_rate(), 0.0);
281}
Logger system adapter using dependency injection (Phase 2.3.3)

References kcenon::monitoring::logger_to_monitoring_adapter::collect_metrics(), kcenon::monitoring::logger_to_monitoring_adapter::get_current_log_rate(), kcenon::monitoring::logger_to_monitoring_adapter::is_logger_system_available(), and kcenon::monitoring::logger_to_monitoring_adapter::register_logger().

Here is the call graph for this function:

◆ TEST_F() [4/8]

TEST_F ( EventBusTest ,
MultipleSubscribers  )

Definition at line 89 of file test_event_bus.cpp.

89 {
90 std::atomic<int> subscriber1_count{0};
91 std::atomic<int> subscriber2_count{0};
92 std::mutex mtx;
93 std::condition_variable cv;
94
95 // Subscribe twice to the same event type
96 auto token1 =
97 bus->subscribe_event<system_resource_event>([&](const system_resource_event& /*event*/) {
98 subscriber1_count++;
99 cv.notify_all();
100 });
101
102 auto token2 =
103 bus->subscribe_event<system_resource_event>([&](const system_resource_event& /*event*/) {
104 subscriber2_count++;
105 cv.notify_all();
106 });
107
108 ASSERT_TRUE(token1.is_ok());
109 ASSERT_TRUE(token2.is_ok());
110
111 // Publish event
112 system_resource_event::resource_stats stats;
113 stats.cpu_usage_percent = 75.5;
114 system_resource_event event(stats);
115
116 bus->publish_event(event);
117
118 // Wait for both subscribers with timeout
119 {
120 std::unique_lock<std::mutex> lock(mtx);
121 ASSERT_TRUE(cv.wait_for(lock, 5s, [&] {
122 return subscriber1_count.load() >= 1 && subscriber2_count.load() >= 1;
123 }));
124 }
125
126 EXPECT_EQ(subscriber1_count.load(), 1);
127 EXPECT_EQ(subscriber2_count.load(), 1);
128
129 // Unsubscribe before local variables (cv, mutex) are destroyed
130 bus->unsubscribe_event(token1.value());
131 bus->unsubscribe_event(token2.value());
132}
Event containing system resource metrics.

References kcenon::monitoring::system_resource_event::resource_stats::cpu_usage_percent.

◆ TEST_F() [5/8]

TEST_F ( EventBusTest ,
PublishSubscribe  )

Definition at line 45 of file test_event_bus.cpp.

45 {
46 std::atomic<int> received_count{0};
47 std::string received_message;
48 std::mutex message_mutex;
49 std::condition_variable cv;
50
51 // Subscribe to performance alerts
52 auto token =
53 bus->subscribe_event<performance_alert_event>([&](const performance_alert_event& event) {
54 {
55 std::lock_guard<std::mutex> lock(message_mutex);
56 received_message = event.get_message();
57 }
58 received_count++;
59 cv.notify_one();
60 });
61
62 ASSERT_TRUE(token.is_ok());
63
64 // Publish an event
65 performance_alert_event alert(performance_alert_event::alert_type::high_cpu_usage,
66 performance_alert_event::alert_severity::warning,
67 "test_component", "CPU usage is high");
68
69 auto result = bus->publish_event(alert);
70 ASSERT_TRUE(result.is_ok());
71
72 // Wait for event processing with condition variable
73 {
74 std::unique_lock<std::mutex> lock(message_mutex);
75 ASSERT_TRUE(cv.wait_for(lock, 5s, [&] { return received_count.load() >= 1; }));
76 }
77
78 EXPECT_EQ(received_count.load(), 1);
79 {
80 std::lock_guard<std::mutex> lock(message_mutex);
81 EXPECT_EQ(received_message, "CPU usage is high");
82 }
83
84 // Unsubscribe before local variables (cv, mutex) are destroyed
85 bus->unsubscribe_event(token.value());
86}
Event for performance-related alerts.
Core alert data structure.

◆ TEST_F() [6/8]

TEST_F ( EventBusTest ,
Statistics  )

Definition at line 284 of file test_event_bus.cpp.

284 {
285 auto initial_stats = bus->get_stats();
286 EXPECT_EQ(initial_stats.total_published, 0);
287 EXPECT_EQ(initial_stats.total_processed, 0);
288
289 // Publish some events
290 for (int i = 0; i < 10; ++i) {
291 component_lifecycle_event event("test_component",
292 component_lifecycle_event::lifecycle_state::started,
293 component_lifecycle_event::lifecycle_state::running);
294 bus->publish_event(event);
295 }
296
297 // Wait until all events are published (poll stats with timeout)
298 auto deadline = std::chrono::steady_clock::now() + 5s;
299 while (std::chrono::steady_clock::now() < deadline) {
300 auto stats = bus->get_stats();
301 if (stats.total_published >= 10) {
302 break;
303 }
304 std::this_thread::yield();
305 }
306
307 auto final_stats = bus->get_stats();
308 EXPECT_EQ(final_stats.total_published, 10);
309 EXPECT_GE(final_stats.total_processed, 0); // May vary due to async processing
310}
Event for component lifecycle changes.

◆ TEST_F() [7/8]

TEST_F ( EventBusTest ,
ThreadSystemAdapter  )

Definition at line 236 of file test_event_bus.cpp.

236 {
237 thread_system_adapter adapter(bus);
238
239 // Check availability - depends on compile-time header detection
240#if MONITORING_THREAD_SYSTEM_AVAILABLE
241 // When headers are present, is_thread_system_available() may return true
242 // (it attempts runtime resolution but defaults to true if headers exist)
243 // We just verify collect_metrics works without asserting availability
244#else
245 EXPECT_FALSE(adapter.is_thread_system_available());
246#endif
247
248 // Try to collect metrics - should succeed regardless of availability
249 auto result = adapter.collect_metrics();
250 ASSERT_TRUE(result.is_ok());
251 // When no actual thread_system service is registered, result may be empty
252
253 // Get supported metric types - compile-time determined
254 auto types = adapter.get_metric_types();
255#if MONITORING_THREAD_SYSTEM_AVAILABLE
256 // When headers are available, returns the list of supported types
257 EXPECT_FALSE(types.empty());
258 EXPECT_EQ(types.size(), 3u);
259#else
260 EXPECT_TRUE(types.empty()); // Empty when thread_system not available
261#endif
262}

References kcenon::monitoring::thread_to_monitoring_adapter::collect_metrics(), kcenon::monitoring::thread_to_monitoring_adapter::get_metric_types(), and kcenon::monitoring::thread_to_monitoring_adapter::is_thread_system_available().

Here is the call graph for this function:

◆ TEST_F() [8/8]

TEST_F ( EventBusTest ,
Unsubscribe  )

Definition at line 195 of file test_event_bus.cpp.

195 {
196 std::atomic<int> received_count{0};
197 std::mutex mtx;
198 std::condition_variable cv;
199
200 auto token = bus->subscribe_event<health_check_event>([&](const health_check_event& /*event*/) {
201 received_count++;
202 cv.notify_one();
203 });
204
205 ASSERT_TRUE(token.is_ok());
206
207 // Publish first event
208 std::vector<health_check_event::health_check_result> results;
209 health_check_event event1("component1", results);
210 bus->publish_event(event1);
211
212 // Wait for first event with timeout
213 {
214 std::unique_lock<std::mutex> lock(mtx);
215 ASSERT_TRUE(cv.wait_for(lock, 5s, [&] { return received_count.load() >= 1; }));
216 }
217 EXPECT_EQ(received_count.load(), 1);
218
219 // Unsubscribe
220 bus->unsubscribe_event(token.value());
221
222 // Publish second event
223 health_check_event event2("component2", results);
224 bus->publish_event(event2);
225
226 // After unsubscribe, we need a small wait to verify event is NOT received
227 // This is a legitimate case where we need a brief timeout to confirm no event arrived
228 {
229 std::unique_lock<std::mutex> lock(mtx);
230 cv.wait_for(lock, 100ms, [&] { return received_count.load() >= 2; });
231 }
232 EXPECT_EQ(received_count.load(), 1); // Should still be 1
233}
Event for health check results.