10#include <gtest/gtest.h>
18#include <condition_variable>
22using namespace std::chrono_literals;
32 bus = std::make_shared<event_bus>(config);
41 std::shared_ptr<event_bus>
bus;
46 std::atomic<int> received_count{0};
47 std::string received_message;
48 std::mutex message_mutex;
49 std::condition_variable cv;
55 std::lock_guard<std::mutex> lock(message_mutex);
56 received_message =
event.get_message();
62 ASSERT_TRUE(token.is_ok());
66 performance_alert_event::alert_severity::warning,
67 "test_component",
"CPU usage is high");
69 auto result = bus->publish_event(
alert);
70 ASSERT_TRUE(result.is_ok());
74 std::unique_lock<std::mutex> lock(message_mutex);
75 ASSERT_TRUE(cv.wait_for(lock, 5s, [&] { return received_count.load() >= 1; }));
78 EXPECT_EQ(received_count.load(), 1);
80 std::lock_guard<std::mutex> lock(message_mutex);
81 EXPECT_EQ(received_message,
"CPU usage is high");
85 bus->unsubscribe_event(token.value());
90 std::atomic<int> subscriber1_count{0};
91 std::atomic<int> subscriber2_count{0};
93 std::condition_variable cv;
108 ASSERT_TRUE(token1.is_ok());
109 ASSERT_TRUE(token2.is_ok());
116 bus->publish_event(event);
120 std::unique_lock<std::mutex> lock(mtx);
121 ASSERT_TRUE(cv.wait_for(lock, 5s, [&] {
122 return subscriber1_count.load() >= 1 && subscriber2_count.load() >= 1;
126 EXPECT_EQ(subscriber1_count.load(), 1);
127 EXPECT_EQ(subscriber2_count.load(), 1);
130 bus->unsubscribe_event(token1.value());
131 bus->unsubscribe_event(token2.value());
136 std::vector<int> processing_order;
137 std::mutex order_mutex;
138 std::condition_variable cv;
144 std::lock_guard<std::mutex> lock(order_mutex);
145 if (event.get_config_key() ==
"high_priority") {
146 processing_order.push_back(1);
148 processing_order.push_back(2);
153 event_priority::high);
155 ASSERT_TRUE(token.is_ok());
159 configuration_change_event::change_type::modified);
162 configuration_change_event::change_type::modified);
168 bus->publish_event(normal_priority);
169 bus->publish_event(high_priority);
176 std::unique_lock<std::mutex> lock(order_mutex);
177 cv.wait_for(lock, 5s, [&] {
return processing_order.size() >= 2; });
185 std::lock_guard<std::mutex> lock(order_mutex);
186 EXPECT_GE(processing_order.size(), 0U);
191 bus->unsubscribe_event(token.value());
196 std::atomic<int> received_count{0};
198 std::condition_variable cv;
205 ASSERT_TRUE(token.is_ok());
208 std::vector<health_check_event::health_check_result> results;
210 bus->publish_event(event1);
214 std::unique_lock<std::mutex> lock(mtx);
215 ASSERT_TRUE(cv.wait_for(lock, 5s, [&] { return received_count.load() >= 1; }));
217 EXPECT_EQ(received_count.load(), 1);
220 bus->unsubscribe_event(token.value());
224 bus->publish_event(event2);
229 std::unique_lock<std::mutex> lock(mtx);
230 cv.wait_for(lock, 100ms, [&] {
return received_count.load() >= 2; });
232 EXPECT_EQ(received_count.load(), 1);
240#if MONITORING_THREAD_SYSTEM_AVAILABLE
250 ASSERT_TRUE(result.is_ok());
255#if MONITORING_THREAD_SYSTEM_AVAILABLE
257 EXPECT_FALSE(types.empty());
258 EXPECT_EQ(types.size(), 3u);
260 EXPECT_TRUE(types.empty());
273 ASSERT_TRUE(result.is_ok());
274 EXPECT_TRUE(result.value().empty());
285 auto initial_stats = bus->get_stats();
286 EXPECT_EQ(initial_stats.total_published, 0);
287 EXPECT_EQ(initial_stats.total_processed, 0);
290 for (
int i = 0; i < 10; ++i) {
292 component_lifecycle_event::lifecycle_state::started,
293 component_lifecycle_event::lifecycle_state::running);
294 bus->publish_event(event);
298 auto deadline = std::chrono::steady_clock::now() + 5s;
299 while (std::chrono::steady_clock::now() < deadline) {
300 auto stats = bus->get_stats();
301 if (stats.total_published >= 10) {
304 std::this_thread::yield();
307 auto final_stats = bus->get_stats();
308 EXPECT_EQ(final_stats.total_published, 10);
309 EXPECT_GE(final_stats.total_processed, 0);
314 std::atomic<int> received_count{0};
316 std::condition_variable cv;
317 const int num_threads = 4;
318 const int events_per_thread = 25;
319 const int total_expected = num_threads * events_per_thread;
324 received_count.fetch_add(
static_cast<int>(event.get_metric_count()));
328 ASSERT_TRUE(token.is_ok());
330 std::vector<std::thread> publishers;
331 publishers.reserve(num_threads);
334 for (
int t = 0; t < num_threads; ++t) {
335 publishers.emplace_back([
this, events_per_thread]() {
336 for (
int i = 0; i < events_per_thread; ++i) {
337 std::vector<metric> metrics;
339 metric{
"test_metric", 42.0, {{
"thread",
"publisher"}}, metric_type::gauge});
342 bus->publish_event(event);
345 std::this_thread::yield();
351 for (
auto& thread : publishers) {
357 std::unique_lock<std::mutex> lock(mtx);
359 cv.wait_for(lock, 10s, [&] { return received_count.load() >= total_expected; }));
363 EXPECT_EQ(received_count.load(), total_expected);
366 bus->unsubscribe_event(token.value());
std::shared_ptr< event_bus > bus
Event for component lifecycle changes.
Event fired when configuration changes.
Event for health check results.
Logger system adapter using dependency injection (Phase 2.3.3)
common::Result< std::vector< metric > > collect_metrics()
Collect metrics from logger if available (Phase 2.3.3)
common::VoidResult register_logger(const std::string &)
Register a logger instance by name.
bool is_logger_system_available() const
Check if logger is available.
double get_current_log_rate() const
Get current log rate (if logger supports metrics)
Event containing collected metrics batch.
Event containing system resource metrics.
bool is_thread_system_available() const
common::Result< std::vector< metric > > collect_metrics()
std::vector< std::string > get_metric_types() const
Lightweight event bus implementation for monitoring system.
Common event type definitions for monitoring system.
Consolidated logger system adapters for monitoring_system.
Core alert data structure.
Configuration for event bus.
size_t worker_thread_count
Basic metric structure for interface compatibility.
TEST_F(EventBusTest, PublishSubscribe)
Consolidated thread system adapters for monitoring_system.