Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
thread_safety_tests.cpp File Reference
#include <gtest/gtest.h>
#include <kcenon/monitoring/core/event_bus.h>
#include <kcenon/monitoring/core/event_types.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <latch>
#include <thread>
#include <vector>
#include <kcenon/monitoring/core/performance_monitor.h>
Include dependency graph for thread_safety_tests.cpp:

Go to the source code of this file.

Classes

class  MonitoringThreadSafetyTest
 
class  PerformanceProfilerThreadSafetyTest
 
class  PerformanceMonitorThreadSafetyTest
 

Functions

 TEST_F (MonitoringThreadSafetyTest, ConcurrentEventPublication)
 
 TEST_F (MonitoringThreadSafetyTest, MultipleEventTypesConcurrent)
 
 TEST_F (MonitoringThreadSafetyTest, MultipleSubscribersConcurrent)
 
 TEST_F (MonitoringThreadSafetyTest, DynamicSubscriptionChanges)
 
 TEST_F (MonitoringThreadSafetyTest, EventPriorityConcurrent)
 
 TEST_F (MonitoringThreadSafetyTest, HighVolumeStressTest)
 
 TEST_F (MonitoringThreadSafetyTest, MemorySafetyTest)
 
 TEST_F (PerformanceProfilerThreadSafetyTest, ConcurrentSampleRecording)
 
 TEST_F (PerformanceProfilerThreadSafetyTest, ConcurrentReadWrite)
 
 TEST_F (PerformanceProfilerThreadSafetyTest, ConcurrentLockFreeModeToggle)
 
 TEST_F (PerformanceMonitorThreadSafetyTest, ConcurrentThresholdModification)
 
 TEST_F (PerformanceMonitorThreadSafetyTest, ConcurrentProfilingOperations)
 

Function Documentation

◆ TEST_F() [1/12]

TEST_F ( MonitoringThreadSafetyTest ,
ConcurrentEventPublication  )

Definition at line 40 of file thread_safety_tests.cpp.

40 {
41 const int num_publishers = 15;
42 const int events_per_publisher = 500;
43
44 std::atomic<int> events_received{0};
45 std::atomic<int> errors{0};
46
47 // Subscribe to performance alerts
48 auto token =
49 bus->subscribe_event<performance_alert_event>([&](const performance_alert_event& e) {
50 (void)e; // Suppress unused parameter warning on MSVC
51 ++events_received;
52 });
53
54 ASSERT_TRUE(token.is_ok());
55
56 std::vector<std::thread> threads;
57 std::latch sync_point(num_publishers);
58
59 for (int i = 0; i < num_publishers; ++i) {
60 threads.emplace_back([&, thread_id = i]() {
61 sync_point.arrive_and_wait();
62
63 for (int j = 0; j < events_per_publisher; ++j) {
64 try {
66 performance_alert_event::alert_type::high_cpu_usage,
67 performance_alert_event::alert_severity::warning,
68 "thread_" + std::to_string(thread_id), "Test message " + std::to_string(j));
69
70 auto result = bus->publish_event(alert);
71 if (result.is_err()) {
72 ++errors;
73 }
74 } catch (...) {
75 ++errors;
76 }
77
78 if (j % 50 == 0) {
79 std::this_thread::yield();
80 }
81 }
82 });
83 }
84
85 for (auto& t : threads) {
86 t.join();
87 }
88
89 // Wait for event processing with timeout polling
90 auto deadline = std::chrono::steady_clock::now() + 5s;
91 while (events_received.load() < num_publishers * events_per_publisher &&
92 std::chrono::steady_clock::now() < deadline) {
93 std::this_thread::yield();
94 }
95
96 bus->unsubscribe_event(token.value());
97
98 EXPECT_EQ(errors.load(), 0);
99 EXPECT_LE(events_received.load(), num_publishers * events_per_publisher);
100}
Event for performance-related alerts.
Core alert data structure.

◆ TEST_F() [2/12]

TEST_F ( MonitoringThreadSafetyTest ,
DynamicSubscriptionChanges  )

Definition at line 260 of file thread_safety_tests.cpp.

260 {
261 const int num_publishers = 5;
262 const int num_dynamic_subscribers = 10;
263 const int events_per_publisher = 400;
264
265 std::atomic<bool> running{true};
266 std::atomic<int> errors{0};
267 std::vector<std::thread> threads;
268
269 // Publishers
270 for (int i = 0; i < num_publishers; ++i) {
271 threads.emplace_back([&]() {
272 for (int j = 0; j < events_per_publisher && running.load(); ++j) {
273 try {
275 performance_alert_event::alert_type::threshold_exceeded,
276 performance_alert_event::alert_severity::critical, "dynamic_test",
277 "Message " + std::to_string(j));
278 bus->publish_event(alert);
279 } catch (...) {
280 ++errors;
281 }
282 std::this_thread::sleep_for(2ms);
283 }
284 });
285 }
286
287 // Dynamic subscribers
288 for (int i = 0; i < num_dynamic_subscribers; ++i) {
289 threads.emplace_back([&]() {
290 while (running.load()) {
291 try {
292 auto token = bus->subscribe_event<performance_alert_event>(
293 [](const performance_alert_event&) {
294 // Process event
295 });
296
297 if (token.is_ok()) {
298 std::this_thread::sleep_for(20ms);
299 bus->unsubscribe_event(token.value());
300 }
301
302 std::this_thread::sleep_for(10ms);
303 } catch (...) {
304 ++errors;
305 }
306 }
307 });
308 }
309
310 std::this_thread::sleep_for(500ms);
311 running.store(false);
312
313 for (auto& t : threads) {
314 t.join();
315 }
316
317 EXPECT_EQ(errors.load(), 0);
318}

◆ TEST_F() [3/12]

TEST_F ( MonitoringThreadSafetyTest ,
EventPriorityConcurrent  )

Definition at line 321 of file thread_safety_tests.cpp.

321 {
322 const int num_threads = 10;
323 const int events_per_thread = 200;
324
325 std::atomic<int> high_priority{0};
326 std::atomic<int> normal_priority{0};
327 std::atomic<int> low_priority{0};
328 std::atomic<int> errors{0};
329
330 // Subscribe with different priorities
331 auto high_token = bus->subscribe_event<performance_alert_event>(
332 [&](const performance_alert_event&) { ++high_priority; }, event_priority::high);
333
334 auto normal_token = bus->subscribe_event<performance_alert_event>(
335 [&](const performance_alert_event&) { ++normal_priority; }, event_priority::normal);
336
337 auto low_token = bus->subscribe_event<performance_alert_event>(
338 [&](const performance_alert_event&) { ++low_priority; }, event_priority::low);
339
340 ASSERT_TRUE(high_token.is_ok());
341 ASSERT_TRUE(normal_token.is_ok());
342 ASSERT_TRUE(low_token.is_ok());
343
344 std::vector<std::thread> threads;
345
346 for (int i = 0; i < num_threads; ++i) {
347 threads.emplace_back([&, thread_id = i]() {
348 for (int j = 0; j < events_per_thread; ++j) {
349 try {
351 performance_alert_event::alert_type::high_error_rate,
352 performance_alert_event::alert_severity::warning,
353 "thread_" + std::to_string(thread_id),
354 "Priority test " + std::to_string(j));
355 bus->publish_event(alert);
356 } catch (...) {
357 ++errors;
358 }
359 }
360 });
361 }
362
363 for (auto& t : threads) {
364 t.join();
365 }
366
367 std::this_thread::sleep_for(200ms);
368
369 bus->unsubscribe_event(high_token.value());
370 bus->unsubscribe_event(normal_token.value());
371 bus->unsubscribe_event(low_token.value());
372
373 EXPECT_EQ(errors.load(), 0);
374 // All priorities should receive events
375 EXPECT_GT(high_priority.load(), 0);
376 EXPECT_GT(normal_priority.load(), 0);
377 EXPECT_GT(low_priority.load(), 0);
378}

◆ TEST_F() [4/12]

TEST_F ( MonitoringThreadSafetyTest ,
HighVolumeStressTest  )

Definition at line 381 of file thread_safety_tests.cpp.

381 {
382 const int num_threads = 20;
383 const int events_per_thread = 1000;
384
385 std::atomic<int> total_received{0};
386 std::atomic<int> errors{0};
387
388 auto token = bus->subscribe_event<logging_metric_event>(
389 [&](const logging_metric_event&) { ++total_received; });
390
391 ASSERT_TRUE(token.is_ok());
392
393 std::vector<std::thread> threads;
394 std::latch sync_point(num_threads);
395
396 auto start_time = std::chrono::high_resolution_clock::now();
397
398 for (int i = 0; i < num_threads; ++i) {
399 threads.emplace_back([&, thread_id = i]() {
400 sync_point.arrive_and_wait();
401
402 for (int j = 0; j < events_per_thread; ++j) {
403 try {
405 stats.total_logs = j;
406 stats.error_count = j % 10;
407 logging_metric_event event("logger_" + std::to_string(thread_id), stats);
408
409 bus->publish_event(event);
410 } catch (...) {
411 ++errors;
412 }
413 }
414 });
415 }
416
417 for (auto& t : threads) {
418 t.join();
419 }
420
421 auto end_time = std::chrono::high_resolution_clock::now();
422 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
423
424 std::this_thread::sleep_for(300ms);
425
426 bus->unsubscribe_event(token.value());
427
428 EXPECT_EQ(errors.load(), 0);
429
430 double throughput = (num_threads * events_per_thread * 1000.0) / duration.count();
431 std::cout << "Event throughput: " << throughput << " events/sec" << std::endl;
432}
Event containing logging system metrics.
Definition event_types.h:79

References kcenon::monitoring::logging_metric_event::logging_stats::error_count, and kcenon::monitoring::logging_metric_event::logging_stats::total_logs.

◆ TEST_F() [5/12]

TEST_F ( MonitoringThreadSafetyTest ,
MemorySafetyTest  )

Definition at line 435 of file thread_safety_tests.cpp.

435 {
436 const int num_iterations = 30;
437 const int threads_per_iteration = 10;
438 const int operations_per_thread = 100;
439
440 std::atomic<int> total_errors{0};
441
442 for (int iteration = 0; iteration < num_iterations; ++iteration) {
443 event_bus::config config;
444 config.max_queue_size = 1000;
445 config.worker_thread_count = 2;
446 config.auto_start = true;
447
448 auto test_bus = std::make_shared<event_bus>(config);
449
450 std::vector<std::thread> threads;
451 std::vector<subscription_token> tokens;
452
453 // Subscribe
454 for (int i = 0; i < 5; ++i) {
455 auto token = test_bus->subscribe_event<system_resource_event>(
456 [](const system_resource_event&) {});
457
458 if (token.is_ok()) {
459 tokens.push_back(token.value());
460 }
461 }
462
463 // Worker threads
464 for (int i = 0; i < threads_per_iteration; ++i) {
465 threads.emplace_back([&]() {
466 for (int j = 0; j < operations_per_thread; ++j) {
467 try {
469 stats.cpu_usage_percent = static_cast<double>(j);
470 system_resource_event event(stats);
471
472 test_bus->publish_event(event);
473 } catch (...) {
474 ++total_errors;
475 }
476 }
477 });
478 }
479
480 for (auto& t : threads) {
481 t.join();
482 }
483
484 // Unsubscribe
485 for (auto& token : tokens) {
486 test_bus->unsubscribe_event(token);
487 }
488
489 test_bus->stop();
490 // Bus destructor called here
491 }
492
493 EXPECT_EQ(total_errors.load(), 0);
494}
Event containing system resource metrics.
Configuration for event bus.
Definition event_bus.h:87

References kcenon::monitoring::event_bus_config::auto_start, kcenon::monitoring::system_resource_event::resource_stats::cpu_usage_percent, kcenon::monitoring::event_bus_config::max_queue_size, and kcenon::monitoring::event_bus_config::worker_thread_count.

◆ TEST_F() [6/12]

TEST_F ( MonitoringThreadSafetyTest ,
MultipleEventTypesConcurrent  )

Definition at line 103 of file thread_safety_tests.cpp.

103 {
104 const int num_threads = 12;
105 const int events_per_thread = 300;
106
107 std::atomic<int> perf_alerts{0};
108 std::atomic<int> resource_events{0};
109 std::atomic<int> thread_pool_events{0};
110 std::atomic<int> errors{0};
111
112 // Subscribe to different event types
113 auto perf_token = bus->subscribe_event<performance_alert_event>(
114 [&](const performance_alert_event&) { ++perf_alerts; });
115
116 auto resource_token = bus->subscribe_event<system_resource_event>(
117 [&](const system_resource_event&) { ++resource_events; });
118
119 auto pool_token = bus->subscribe_event<thread_pool_metric_event>(
120 [&](const thread_pool_metric_event&) { ++thread_pool_events; });
121
122 ASSERT_TRUE(perf_token.is_ok());
123 ASSERT_TRUE(resource_token.is_ok());
124 ASSERT_TRUE(pool_token.is_ok());
125
126 std::vector<std::thread> threads;
127
128 for (int i = 0; i < num_threads; ++i) {
129 threads.emplace_back([&, thread_id = i]() {
130 for (int j = 0; j < events_per_thread; ++j) {
131 try {
132 // Publish different event types
133 switch (j % 3) {
134 case 0: {
136 performance_alert_event::alert_type::high_memory_usage,
137 performance_alert_event::alert_severity::info,
138 "component_" + std::to_string(thread_id), "Test");
139 bus->publish_event(alert);
140 break;
141 }
142 case 1: {
144 stats.cpu_usage_percent = 50.0;
145 stats.memory_used_bytes = 1024 * 1024;
146 system_resource_event resource(stats);
147 bus->publish_event(resource);
148 break;
149 }
150 case 2: {
152 stats.active_threads = 4;
153 stats.queued_tasks = 10;
154 thread_pool_metric_event pool("pool_" + std::to_string(thread_id),
155 stats);
156 bus->publish_event(pool);
157 break;
158 }
159 }
160 } catch (...) {
161 ++errors;
162 }
163 }
164 });
165 }
166
167 for (auto& t : threads) {
168 t.join();
169 }
170
171 // Wait for event processing with timeout polling
172 auto deadline = std::chrono::steady_clock::now() + 5s;
173 while ((perf_alerts.load() + resource_events.load() + thread_pool_events.load() < 1) &&
174 std::chrono::steady_clock::now() < deadline) {
175 std::this_thread::yield();
176 }
177
178 bus->unsubscribe_event(perf_token.value());
179 bus->unsubscribe_event(resource_token.value());
180 bus->unsubscribe_event(pool_token.value());
181
182 EXPECT_EQ(errors.load(), 0);
183 EXPECT_GT(perf_alerts.load() + resource_events.load() + thread_pool_events.load(), 0);
184}
Event containing thread pool metrics.
Definition event_types.h:49

References kcenon::monitoring::thread_pool_metric_event::thread_pool_stats::active_threads, kcenon::monitoring::system_resource_event::resource_stats::cpu_usage_percent, kcenon::monitoring::system_resource_event::resource_stats::memory_used_bytes, and kcenon::monitoring::thread_pool_metric_event::thread_pool_stats::queued_tasks.

◆ TEST_F() [7/12]

TEST_F ( MonitoringThreadSafetyTest ,
MultipleSubscribersConcurrent  )

Definition at line 187 of file thread_safety_tests.cpp.

187 {
188 const int num_subscribers = 20;
189 const int num_publishers = 5;
190 const int events_per_publisher = 300;
191
192 std::vector<std::atomic<int>> subscriber_counts(num_subscribers);
193 std::vector<subscription_token> tokens;
194 std::atomic<int> errors{0};
195
196 // Register subscribers
197 for (int i = 0; i < num_subscribers; ++i) {
198 auto token = bus->subscribe_event<system_resource_event>(
199 [&, sub_id = i](const system_resource_event&) { ++subscriber_counts[sub_id]; });
200
201 ASSERT_TRUE(token.is_ok());
202 tokens.push_back(token.value());
203 }
204
205 std::vector<std::thread> threads;
206
207 // Publishers
208 for (int i = 0; i < num_publishers; ++i) {
209 threads.emplace_back([&, pub_id = i]() {
210 for (int j = 0; j < events_per_publisher; ++j) {
211 try {
213 stats.cpu_usage_percent = static_cast<double>(pub_id * 10 + j);
214 system_resource_event event(stats);
215
216 auto result = bus->publish_event(event);
217 if (result.is_err()) {
218 ++errors;
219 }
220 } catch (...) {
221 ++errors;
222 }
223
224 if (j % 30 == 0) {
225 std::this_thread::yield();
226 }
227 }
228 });
229 }
230
231 for (auto& t : threads) {
232 t.join();
233 }
234
235 // Brief wait for event processing
236 auto deadline = std::chrono::steady_clock::now() + 5s;
237 while (std::chrono::steady_clock::now() < deadline) {
238 bool all_received = true;
239 for (int i = 0; i < num_subscribers; ++i) {
240 if (subscriber_counts[i].load() == 0) {
241 all_received = false;
242 break;
243 }
244 }
245 if (all_received) {
246 break;
247 }
248 std::this_thread::yield();
249 }
250
251 // Unsubscribe all
252 for (auto& token : tokens) {
253 bus->unsubscribe_event(token);
254 }
255
256 EXPECT_EQ(errors.load(), 0);
257}

References kcenon::monitoring::system_resource_event::resource_stats::cpu_usage_percent.

◆ TEST_F() [8/12]

TEST_F ( PerformanceMonitorThreadSafetyTest ,
ConcurrentProfilingOperations  )

Definition at line 689 of file thread_safety_tests.cpp.

689 {
690 const int num_threads = 12;
691 const int operations_per_thread = 300;
692
693 std::atomic<int> errors{0};
694 std::vector<std::thread> threads;
695 std::latch sync_point(num_threads);
696
697 for (int i = 0; i < num_threads; ++i) {
698 threads.emplace_back([&, thread_id = i]() {
699 sync_point.arrive_and_wait();
700
701 for (int j = 0; j < operations_per_thread; ++j) {
702 try {
703 // Use scoped timer
704 {
705 auto timer = monitor.time_operation("op_" + std::to_string(thread_id % 3));
706 // Simulate work
707 std::this_thread::sleep_for(std::chrono::microseconds(10));
708 } // Timer records on destruction
709
710 // Collect metrics periodically
711 if (j % 50 == 0) {
712 auto result = monitor.collect();
713 (void)result;
714 }
715 } catch (...) {
716 ++errors;
717 }
718 }
719 });
720 }
721
722 for (auto& t : threads) {
723 t.join();
724 }
725
726 EXPECT_EQ(errors.load(), 0);
727
728 // Verify all operations were recorded
729 auto& profiler = monitor.get_profiler();
730 auto all_metrics = profiler.get_all_metrics();
731 EXPECT_EQ(all_metrics.size(), 3); // 3 unique operations
732}
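Note: Doxygen cross-linked the local variable `timer` (the scoped timer returned by `monitor.time_operation()`) to the unrelated enumerator `kcenon::monitoring::timer` ("StatsD-specific timer metric"). The "References kcenon::monitoring::timer." entry for this test is therefore a name collision, not a real use of that enumerator.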

◆ TEST_F() [9/12]

TEST_F ( PerformanceMonitorThreadSafetyTest ,
ConcurrentThresholdModification  )

Definition at line 648 of file thread_safety_tests.cpp.

648 {
649 const int num_threads = 8;
650 const int iterations = 500;
651
652 std::atomic<int> errors{0};
653 std::vector<std::thread> threads;
654 std::latch sync_point(num_threads);
655
656 for (int i = 0; i < num_threads; ++i) {
657 threads.emplace_back([&, thread_id = i]() {
658 sync_point.arrive_and_wait();
659
660 for (int j = 0; j < iterations; ++j) {
661 try {
662 // Set thresholds
663 monitor.set_cpu_threshold(50.0 + (thread_id * 5));
664 monitor.set_memory_threshold(60.0 + (j % 20));
665 monitor.set_latency_threshold(std::chrono::milliseconds(100 + j));
666
667 // Read thresholds
668 auto thresholds = monitor.get_thresholds();
669 (void)thresholds;
670
671 // Check thresholds
672 auto result = monitor.check_thresholds();
673 (void)result;
674 } catch (...) {
675 ++errors;
676 }
677 }
678 });
679 }
680
681 for (auto& t : threads) {
682 t.join();
683 }
684
685 EXPECT_EQ(errors.load(), 0);
686}

◆ TEST_F() [10/12]

TEST_F ( PerformanceProfilerThreadSafetyTest ,
ConcurrentLockFreeModeToggle  )

Definition at line 604 of file thread_safety_tests.cpp.

604 {
605 const int num_threads = 8;
606 const int iterations = 1000;
607
608 std::atomic<int> errors{0};
609 std::vector<std::thread> threads;
610 std::latch sync_point(num_threads);
611
612 for (int i = 0; i < num_threads; ++i) {
613 threads.emplace_back([&, thread_id = i]() {
614 sync_point.arrive_and_wait();
615
616 for (int j = 0; j < iterations; ++j) {
617 try {
618 // Toggle mode
619 profiler.set_lock_free_mode(j % 2 == 0);
620
621 // Read mode
622 bool mode = profiler.is_lock_free_mode();
623 (void)mode;
624
625 // Record sample
626 profiler.record_sample("toggle_test",
627 std::chrono::nanoseconds(j + thread_id * 1000), true);
628 } catch (...) {
629 ++errors;
630 }
631 }
632 });
633 }
634
635 for (auto& t : threads) {
636 t.join();
637 }
638
639 EXPECT_EQ(errors.load(), 0);
640}

◆ TEST_F() [11/12]

TEST_F ( PerformanceProfilerThreadSafetyTest ,
ConcurrentReadWrite  )

Definition at line 549 of file thread_safety_tests.cpp.

549 {
550 const int num_writers = 8;
551 const int num_readers = 4;
552 const int operations_per_thread = 500;
553
554 std::atomic<bool> running{true};
555 std::atomic<int> errors{0};
556 std::vector<std::thread> threads;
557
558 // Writers
559 for (int i = 0; i < num_writers; ++i) {
560 threads.emplace_back([&, thread_id = i]() {
561 for (int j = 0; j < operations_per_thread && running.load(); ++j) {
562 try {
563 profiler.record_sample("shared_op",
564 std::chrono::nanoseconds(j * 100 + thread_id), true);
565 } catch (...) {
566 ++errors;
567 }
568 }
569 });
570 }
571
572 // Readers
573 for (int i = 0; i < num_readers; ++i) {
574 threads.emplace_back([&]() {
575 while (running.load()) {
576 try {
577 auto result = profiler.get_metrics("shared_op");
578 // Result may be err if not yet recorded
579 (void)result;
580 std::this_thread::sleep_for(1ms);
581 } catch (...) {
582 ++errors;
583 }
584 }
585 });
586 }
587
588 // Let writers complete
589 for (int i = 0; i < num_writers; ++i) {
590 threads[i].join();
591 }
592
593 running.store(false);
594
595 // Join readers
596 for (int i = num_writers; i < static_cast<int>(threads.size()); ++i) {
597 threads[i].join();
598 }
599
600 EXPECT_EQ(errors.load(), 0);
601}

◆ TEST_F() [12/12]

TEST_F ( PerformanceProfilerThreadSafetyTest ,
ConcurrentSampleRecording  )

Definition at line 508 of file thread_safety_tests.cpp.

508 {
509 const int num_threads = 16;
510 const int samples_per_thread = 1000;
511
512 std::atomic<int> errors{0};
513 std::vector<std::thread> threads;
514 std::latch sync_point(num_threads);
515
516 for (int i = 0; i < num_threads; ++i) {
517 threads.emplace_back([&, thread_id = i]() {
518 sync_point.arrive_and_wait();
519
520 for (int j = 0; j < samples_per_thread; ++j) {
521 try {
522 auto duration = std::chrono::nanoseconds(j * 1000 + thread_id);
523 auto result = profiler.record_sample(
524 "operation_" + std::to_string(thread_id % 4), duration,
525 j % 10 != 0 // 10% failure rate
526 );
527 if (result.is_err()) {
528 ++errors;
529 }
530 } catch (...) {
531 ++errors;
532 }
533 }
534 });
535 }
536
537 for (auto& t : threads) {
538 t.join();
539 }
540
541 EXPECT_EQ(errors.load(), 0);
542
543 // Verify data integrity
544 auto all_metrics = profiler.get_all_metrics();
545 EXPECT_EQ(all_metrics.size(), 4); // 4 unique operations
546}