// Fragment: stress test — 15 publisher threads each publish 500
// performance_alert_events through a shared `bus`; a std::latch releases all
// publishers at once, the main thread polls `events_received` with a 5s
// deadline, unsubscribes, then checks no publish errored and the subscriber
// never over-counted (EXPECT_LE, not EQ — drops under load are tolerated).
// NOTE(review): extraction fragment — the TEST header, the alert-event
// construction, the subscriber lambda, and several interior lines are missing;
// the leading numerals on each line are extraction artifacts, not code.
41 const int num_publishers = 15;
42 const int events_per_publisher = 500;
44 std::atomic<int> events_received{0};
45 std::atomic<int> errors{0};
54 ASSERT_TRUE(token.is_ok());
56 std::vector<std::thread> threads;
// Latch so all publishers start publishing simultaneously (max contention).
57 std::latch sync_point(num_publishers);
59 for (
int i = 0; i < num_publishers; ++i) {
60 threads.emplace_back([&, thread_id = i]() {
61 sync_point.arrive_and_wait();
63 for (
int j = 0; j < events_per_publisher; ++j) {
66 performance_alert_event::alert_type::high_cpu_usage,
67 performance_alert_event::alert_severity::warning,
68 "thread_" + std::to_string(thread_id),
"Test message " + std::to_string(j));
70 auto result = bus->publish_event(
alert);
71 if (result.is_err()) {
79 std::this_thread::yield();
85 for (
auto& t : threads) {
// Bounded wait: poll until every published event was delivered or 5s elapse.
90 auto deadline = std::chrono::steady_clock::now() + 5s;
91 while (events_received.load() < num_publishers * events_per_publisher &&
92 std::chrono::steady_clock::now() < deadline) {
93 std::this_thread::yield();
96 bus->unsubscribe_event(token.value());
98 EXPECT_EQ(errors.load(), 0);
99 EXPECT_LE(events_received.load(), num_publishers * events_per_publisher);
// Fragment: 12 threads each publish 300 iterations mixing three event kinds
// (performance alert, resource event, thread-pool event) to one bus with one
// subscription token per kind. The final assertion only requires that at
// least ONE event of any kind arrived (EXPECT_GT > 0) — a liveness check,
// not a delivery-count check.
// NOTE(review): extraction fragment — TEST header, subscriptions, the
// `resource`/`pool` event constructions, and interior lines are missing;
// leading numerals are extraction artifacts.
104 const int num_threads = 12;
105 const int events_per_thread = 300;
107 std::atomic<int> perf_alerts{0};
108 std::atomic<int> resource_events{0};
109 std::atomic<int> thread_pool_events{0};
110 std::atomic<int> errors{0};
122 ASSERT_TRUE(perf_token.is_ok());
123 ASSERT_TRUE(resource_token.is_ok());
124 ASSERT_TRUE(pool_token.is_ok());
126 std::vector<std::thread> threads;
128 for (
int i = 0; i < num_threads; ++i) {
129 threads.emplace_back([&, thread_id = i]() {
130 for (
int j = 0; j < events_per_thread; ++j) {
136 performance_alert_event::alert_type::high_memory_usage,
137 performance_alert_event::alert_severity::info,
138 "component_" + std::to_string(thread_id),
"Test");
139 bus->publish_event(
alert);
147 bus->publish_event(resource);
156 bus->publish_event(pool);
167 for (
auto& t : threads) {
// Wait (max 5s) until at least one event of any type has been delivered.
172 auto deadline = std::chrono::steady_clock::now() + 5s;
173 while ((perf_alerts.load() + resource_events.load() + thread_pool_events.load() < 1) &&
174 std::chrono::steady_clock::now() < deadline) {
175 std::this_thread::yield();
178 bus->unsubscribe_event(perf_token.value());
179 bus->unsubscribe_event(resource_token.value());
180 bus->unsubscribe_event(pool_token.value());
182 EXPECT_EQ(errors.load(), 0);
183 EXPECT_GT(perf_alerts.load() + resource_events.load() + thread_pool_events.load(), 0);
// Fragment: fan-out test — 20 subscribers (one per-subscriber atomic counter
// in `subscriber_counts`), 5 publisher threads × 300 events. The main thread
// polls for up to 5s until every subscriber has received at least one event,
// then unsubscribes all tokens and checks that no publish errored.
// NOTE(review): extraction fragment — TEST header, subscription bodies, the
// `event` construction, and interior lines (including the loop break after
// `all_received`) are missing; leading numerals are extraction artifacts.
188 const int num_subscribers = 20;
189 const int num_publishers = 5;
190 const int events_per_publisher = 300;
192 std::vector<std::atomic<int>> subscriber_counts(num_subscribers);
193 std::vector<subscription_token> tokens;
194 std::atomic<int> errors{0};
197 for (
int i = 0; i < num_subscribers; ++i) {
201 ASSERT_TRUE(token.is_ok());
202 tokens.push_back(token.value());
205 std::vector<std::thread> threads;
208 for (
int i = 0; i < num_publishers; ++i) {
209 threads.emplace_back([&, pub_id = i]() {
210 for (
int j = 0; j < events_per_publisher; ++j) {
216 auto result = bus->publish_event(event);
217 if (result.is_err()) {
225 std::this_thread::yield();
231 for (
auto& t : threads) {
// Poll until every one of the 20 subscribers has seen >= 1 event (5s cap).
236 auto deadline = std::chrono::steady_clock::now() + 5s;
237 while (std::chrono::steady_clock::now() < deadline) {
238 bool all_received =
true;
239 for (
int i = 0; i < num_subscribers; ++i) {
240 if (subscriber_counts[i].load() == 0) {
241 all_received =
false;
248 std::this_thread::yield();
252 for (
auto& token : tokens) {
253 bus->unsubscribe_event(token);
256 EXPECT_EQ(errors.load(), 0);
// Fragment: churn test — 5 publishers keep publishing critical alerts while
// 10 "dynamic subscriber" threads repeatedly subscribe, hold the subscription
// for 20ms, unsubscribe, and pause 10ms, all gated by an atomic `running`
// flag. After 500ms the flag is cleared and all threads are joined; the only
// assertion is that nothing errored during the churn.
// NOTE(review): extraction fragment — TEST header, the subscribe call that
// yields `token`, and interior lines are missing; leading numerals are
// extraction artifacts.
261 const int num_publishers = 5;
262 const int num_dynamic_subscribers = 10;
263 const int events_per_publisher = 400;
265 std::atomic<bool> running{
true};
266 std::atomic<int> errors{0};
267 std::vector<std::thread> threads;
270 for (
int i = 0; i < num_publishers; ++i) {
271 threads.emplace_back([&]() {
// `running` lets the main thread cut publishers short at shutdown.
272 for (
int j = 0; j < events_per_publisher && running.load(); ++j) {
275 performance_alert_event::alert_type::threshold_exceeded,
276 performance_alert_event::alert_severity::critical,
"dynamic_test",
277 "Message " + std::to_string(j));
278 bus->publish_event(
alert);
// Small pacing delay so the subscriber churn overlaps the publishing.
282 std::this_thread::sleep_for(2ms);
288 for (
int i = 0; i < num_dynamic_subscribers; ++i) {
289 threads.emplace_back([&]() {
290 while (running.load()) {
298 std::this_thread::sleep_for(20ms);
299 bus->unsubscribe_event(token.value());
302 std::this_thread::sleep_for(10ms);
// Let the system churn for 500ms, then signal all threads to stop.
310 std::this_thread::sleep_for(500ms);
311 running.store(
false);
313 for (
auto& t : threads) {
317 EXPECT_EQ(errors.load(), 0);
// Fragment: priority-delivery test — three subscriptions (high/normal/low
// tokens with matching counters); 10 threads publish 200 warning alerts each.
// After joining, a fixed 200ms grace period allows deliveries to drain, then
// the test asserts each priority level received at least one event.
// NOTE(review): extraction fragment — TEST header, the three subscribe calls,
// and interior lines are missing; leading numerals are extraction artifacts.
// NOTE(review): the 200ms fixed sleep is a potential flake under load — the
// other fragments in this file use a deadline-poll instead; confirm intent.
322 const int num_threads = 10;
323 const int events_per_thread = 200;
325 std::atomic<int> high_priority{0};
326 std::atomic<int> normal_priority{0};
327 std::atomic<int> low_priority{0};
328 std::atomic<int> errors{0};
340 ASSERT_TRUE(high_token.is_ok());
341 ASSERT_TRUE(normal_token.is_ok());
342 ASSERT_TRUE(low_token.is_ok());
344 std::vector<std::thread> threads;
346 for (
int i = 0; i < num_threads; ++i) {
347 threads.emplace_back([&, thread_id = i]() {
348 for (
int j = 0; j < events_per_thread; ++j) {
351 performance_alert_event::alert_type::high_error_rate,
352 performance_alert_event::alert_severity::warning,
353 "thread_" + std::to_string(thread_id),
354 "Priority test " + std::to_string(j));
355 bus->publish_event(
alert);
363 for (
auto& t : threads) {
367 std::this_thread::sleep_for(200ms);
369 bus->unsubscribe_event(high_token.value());
370 bus->unsubscribe_event(normal_token.value());
371 bus->unsubscribe_event(low_token.value());
373 EXPECT_EQ(errors.load(), 0);
375 EXPECT_GT(high_priority.load(), 0);
376 EXPECT_GT(normal_priority.load(), 0);
377 EXPECT_GT(low_priority.load(), 0);
// Fragment: throughput benchmark — 20 threads × 1000 events, latch-released
// together; wall time is measured around publish+join and reported as
// events/sec on stdout. Correctness gate is only `errors == 0`.
// NOTE(review): extraction fragment — TEST header, the subscribe call, the
// `event` construction, and interior lines are missing; leading numerals are
// extraction artifacts.
// NOTE(review): if all publishing finishes in under 1ms, `duration.count()`
// is 0 and the throughput expression divides by zero — worth guarding.
382 const int num_threads = 20;
383 const int events_per_thread = 1000;
385 std::atomic<int> total_received{0};
386 std::atomic<int> errors{0};
391 ASSERT_TRUE(token.is_ok());
393 std::vector<std::thread> threads;
394 std::latch sync_point(num_threads);
396 auto start_time = std::chrono::high_resolution_clock::now();
398 for (
int i = 0; i < num_threads; ++i) {
399 threads.emplace_back([&, thread_id = i]() {
400 sync_point.arrive_and_wait();
402 for (
int j = 0; j < events_per_thread; ++j) {
409 bus->publish_event(event);
417 for (
auto& t : threads) {
421 auto end_time = std::chrono::high_resolution_clock::now();
422 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
// Fixed drain window before unsubscribing (deliveries may still be in flight).
424 std::this_thread::sleep_for(300ms);
426 bus->unsubscribe_event(token.value());
428 EXPECT_EQ(errors.load(), 0);
430 double throughput = (num_threads * events_per_thread * 1000.0) / duration.count();
431 std::cout <<
"Event throughput: " << throughput <<
" events/sec" << std::endl;
// Fragment: lifecycle soak test — 30 iterations, each constructing a fresh
// shared_ptr<event_bus>, adding 5 subscriptions, running 10 publisher threads
// × 100 operations, joining, and unsubscribing, accumulating failures into
// `total_errors`. Exercises repeated bus construction/teardown under load.
// NOTE(review): extraction fragment — TEST header, `config` setup, the
// subscribe call producing `token`, and interior lines are missing; leading
// numerals are extraction artifacts.
// NOTE(review): `token.value()` at the push_back has no visible `is_ok()`
// check in this fragment — confirm the missing lines assert success first.
436 const int num_iterations = 30;
437 const int threads_per_iteration = 10;
438 const int operations_per_thread = 100;
440 std::atomic<int> total_errors{0};
442 for (
int iteration = 0; iteration < num_iterations; ++iteration) {
448 auto test_bus = std::make_shared<event_bus>(config);
450 std::vector<std::thread> threads;
451 std::vector<subscription_token> tokens;
454 for (
int i = 0; i < 5; ++i) {
459 tokens.push_back(token.value());
464 for (
int i = 0; i < threads_per_iteration; ++i) {
465 threads.emplace_back([&]() {
466 for (
int j = 0; j < operations_per_thread; ++j) {
472 test_bus->publish_event(event);
480 for (
auto& t : threads) {
485 for (
auto& token : tokens) {
486 test_bus->unsubscribe_event(token);
493 EXPECT_EQ(total_errors.load(), 0);
// Fragment: profiler concurrency test — 16 latch-synchronized threads each
// record 1000 samples spread across 4 operation names ("operation_0..3" via
// thread_id % 4). Afterwards the test requires zero record errors and exactly
// 4 distinct metric entries in get_all_metrics().
// NOTE(review): extraction fragment — TEST header, profiler construction, the
// trailing record_sample argument(s), and interior lines are missing; leading
// numerals are extraction artifacts.
509 const int num_threads = 16;
510 const int samples_per_thread = 1000;
512 std::atomic<int> errors{0};
513 std::vector<std::thread> threads;
514 std::latch sync_point(num_threads);
516 for (
int i = 0; i < num_threads; ++i) {
517 threads.emplace_back([&, thread_id = i]() {
518 sync_point.arrive_and_wait();
520 for (
int j = 0; j < samples_per_thread; ++j) {
// Per-thread-distinct durations so samples are distinguishable.
522 auto duration = std::chrono::nanoseconds(j * 1000 + thread_id);
523 auto result = profiler.record_sample(
524 "operation_" + std::to_string(thread_id % 4), duration,
527 if (result.is_err()) {
537 for (
auto& t : threads) {
541 EXPECT_EQ(errors.load(), 0);
// thread_id % 4 yields exactly four distinct operation names.
544 auto all_metrics = profiler.get_all_metrics();
545 EXPECT_EQ(all_metrics.size(), 4);
// Fragment: reader/writer test on one profiler key — 8 writer threads record
// up to 500 samples each under "shared_op" while 4 reader threads poll
// get_metrics("shared_op") every 1ms until `running` is cleared. Writers
// (threads[0..num_writers)) are joined first, then readers are stopped and
// joined; the assertion is zero errors.
// NOTE(review): extraction fragment — TEST header, profiler construction, the
// join-call lines inside both join loops, and other interior lines are
// missing; leading numerals are extraction artifacts.
550 const int num_writers = 8;
551 const int num_readers = 4;
552 const int operations_per_thread = 500;
554 std::atomic<bool> running{
true};
555 std::atomic<int> errors{0};
556 std::vector<std::thread> threads;
559 for (
int i = 0; i < num_writers; ++i) {
560 threads.emplace_back([&, thread_id = i]() {
561 for (
int j = 0; j < operations_per_thread && running.load(); ++j) {
563 profiler.record_sample(
"shared_op",
564 std::chrono::nanoseconds(j * 100 + thread_id),
true);
573 for (
int i = 0; i < num_readers; ++i) {
574 threads.emplace_back([&]() {
575 while (running.load()) {
577 auto result = profiler.get_metrics(
"shared_op");
580 std::this_thread::sleep_for(1ms);
// Join writers first (indices [0, num_writers)), then stop the readers.
589 for (
int i = 0; i < num_writers; ++i) {
593 running.store(
false);
596 for (
int i = num_writers; i < static_cast<int>(threads.size()); ++i) {
600 EXPECT_EQ(errors.load(), 0);
// Fragment: mode-toggle race test — 8 latch-synchronized threads each run
// 1000 iterations that flip set_lock_free_mode (even j → true), read the mode
// back, and record a "toggle_test" sample, so recording races against mode
// switches. Passes if no operation errored.
// NOTE(review): extraction fragment — TEST header, profiler construction, and
// interior lines (including whatever uses `mode`) are missing; leading
// numerals are extraction artifacts.
605 const int num_threads = 8;
606 const int iterations = 1000;
608 std::atomic<int> errors{0};
609 std::vector<std::thread> threads;
610 std::latch sync_point(num_threads);
612 for (
int i = 0; i < num_threads; ++i) {
613 threads.emplace_back([&, thread_id = i]() {
614 sync_point.arrive_and_wait();
616 for (
int j = 0; j < iterations; ++j) {
619 profiler.set_lock_free_mode(j % 2 == 0);
622 bool mode = profiler.is_lock_free_mode();
626 profiler.record_sample(
"toggle_test",
627 std::chrono::nanoseconds(j + thread_id * 1000),
true);
635 for (
auto& t : threads) {
639 EXPECT_EQ(errors.load(), 0);
// Fragment: monitor threshold race test — 8 latch-synchronized threads each
// run 500 iterations that concurrently set CPU/memory/latency thresholds,
// read them back via get_thresholds(), and call check_thresholds(). Passes
// if no call errored; last-writer-wins on the threshold values is tolerated.
// NOTE(review): extraction fragment — TEST header, monitor construction, and
// interior lines (including whatever inspects `thresholds`/`result`) are
// missing; leading numerals are extraction artifacts.
649 const int num_threads = 8;
650 const int iterations = 500;
652 std::atomic<int> errors{0};
653 std::vector<std::thread> threads;
654 std::latch sync_point(num_threads);
656 for (
int i = 0; i < num_threads; ++i) {
657 threads.emplace_back([&, thread_id = i]() {
658 sync_point.arrive_and_wait();
660 for (
int j = 0; j < iterations; ++j) {
// Per-thread / per-iteration distinct values to maximize write interleaving.
663 monitor.set_cpu_threshold(50.0 + (thread_id * 5));
664 monitor.set_memory_threshold(60.0 + (j % 20));
665 monitor.set_latency_threshold(std::chrono::milliseconds(100 + j));
668 auto thresholds = monitor.get_thresholds();
672 auto result = monitor.check_thresholds();
681 for (
auto& t : threads) {
685 EXPECT_EQ(errors.load(), 0);
// Fragment: monitor collection test — 12 latch-synchronized threads × 300
// iterations each scope-time an operation (names "op_0..2" via thread_id % 3,
// with a 10µs sleep inside the timed scope) and call monitor.collect()
// concurrently. Afterwards the embedded profiler must hold exactly 3 metric
// entries and no call may have errored.
// NOTE(review): extraction fragment — TEST header, monitor construction, and
// interior lines are missing; leading numerals are extraction artifacts.
// NOTE(review): `timer` appears to be a scoped/RAII timing guard given
// time_operation's usage pattern here — confirm against the monitor API.
690 const int num_threads = 12;
691 const int operations_per_thread = 300;
693 std::atomic<int> errors{0};
694 std::vector<std::thread> threads;
695 std::latch sync_point(num_threads);
697 for (
int i = 0; i < num_threads; ++i) {
698 threads.emplace_back([&, thread_id = i]() {
699 sync_point.arrive_and_wait();
701 for (
int j = 0; j < operations_per_thread; ++j) {
705 auto timer = monitor.time_operation(
"op_" + std::to_string(thread_id % 3));
707 std::this_thread::sleep_for(std::chrono::microseconds(10));
712 auto result = monitor.collect();
722 for (
auto& t : threads) {
726 EXPECT_EQ(errors.load(), 0);
// thread_id % 3 yields exactly three distinct operation names.
729 auto& profiler = monitor.get_profiler();
730 auto all_metrics = profiler.get_all_metrics();
731 EXPECT_EQ(all_metrics.size(), 3);