Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
event_bus.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5
6#pragma once
7
17#include <algorithm>
18#include <any>
19#include <atomic>
20#include <chrono>
21#include <condition_variable>
22#include <cstdint>
23#include <functional>
24#include <memory>
25#include <mutex>
26#include <queue>
27#include <thread>
28#include <typeindex>
29#include <unordered_map>
30#include <vector>
32#include "result_types.h"
33#include "error_codes.h"
34
35namespace kcenon { namespace monitoring {
36
42 std::type_index type;
43 std::any payload;
45 std::chrono::steady_clock::time_point timestamp;
46 uint64_t id;
47
48 event_envelope(std::type_index t, std::any p, event_priority pr)
49 : type(t), payload(std::move(p)), priority(pr),
50 timestamp(std::chrono::steady_clock::now()),
51 id(generate_id()) {}
52
53 bool operator<(const event_envelope& other) const {
54 // Higher priority events come first
55 if (priority != other.priority) {
56 return static_cast<int>(priority) < static_cast<int>(other.priority);
57 }
58 // For same priority, older events come first
59 return timestamp > other.timestamp;
60 }
61
62private:
63 static uint64_t generate_id() {
64 static std::atomic<uint64_t> counter{0};
65 return counter.fetch_add(1, std::memory_order_relaxed);
66 }
67};
68
74 std::function<void(const std::any&)> handler;
76 uint64_t id;
77
78 event_handler_wrapper(std::function<void(const std::any&)> h,
79 event_priority p, uint64_t i)
80 : handler(std::move(h)), priority(p), id(i) {}
81};
82
88 size_t max_queue_size = 10000;
90 std::chrono::milliseconds processing_interval{10};
91 bool auto_start = false;
94};
95
147public:
149
150 explicit event_bus(const config& cfg = config())
151 : config_(cfg), is_running_(false), stop_requested_(false),
154
155 if (config_.auto_start) {
156 start();
157 }
158 }
159
161 stop();
162 }
163
164 static std::shared_ptr<interface_event_bus> instance() {
165 static std::shared_ptr<event_bus> singleton = [] {
166 auto bus = std::make_shared<event_bus>();
167 (void)bus->start();
168 return bus;
169 }();
170 return singleton;
171 }
172
173 // Start the event bus
174 common::VoidResult start() override {
175 std::lock_guard<std::mutex> lock(bus_mutex_);
176
177 if (is_running_) {
178 return common::VoidResult::err(error_info(monitoring_error_code::already_started, "Event bus is already running").to_common_error());
179 }
180
181 stop_requested_ = false;
182 is_running_ = true;
183
184 // Start worker threads
185 for (size_t i = 0; i < config_.worker_thread_count; ++i) {
186 workers_.emplace_back(&event_bus::process_events_worker, this);
187 }
188
189 return common::ok();
190 }
191
192 // Stop the event bus
193 common::VoidResult stop() override {
194 {
195 std::lock_guard<std::mutex> lock(bus_mutex_);
196 if (!is_running_) {
197 return common::ok();
198 }
199
200 stop_requested_ = true;
201 is_running_ = false;
202 }
203
204 // Wake up all worker threads
205 queue_cv_.notify_all();
206
207 // Wait for all workers to finish
208 for (auto& worker : workers_) {
209 if (worker.joinable()) {
210 worker.join();
211 }
212 }
213 workers_.clear();
214
215 // Process any remaining events
217
218 return common::ok();
219 }
220
221 // Check if event bus is active
222 bool is_active() const override {
223 std::lock_guard<std::mutex> lock(bus_mutex_);
224 return is_running_;
225 }
226
227 // Get pending event count
228 size_t get_pending_event_count() const override {
229 std::lock_guard<std::mutex> lock(queue_mutex_);
230 return event_queue_.size();
231 }
232
233 // Process all pending events synchronously
234 common::VoidResult process_pending_events() override {
236 return common::ok();
237 }
238
239 // Unsubscribe from events
240 common::VoidResult unsubscribe_event(const subscription_token& token) override {
241 std::lock_guard<std::mutex> lock(handlers_mutex_);
242
243 auto it = handlers_.find(token.get_event_type());
244 if (it != handlers_.end()) {
245 auto& handler_list = it->second;
246 handler_list.erase(
247 std::remove_if(handler_list.begin(), handler_list.end(),
248 [&token](const event_handler_wrapper& wrapper) {
249 return wrapper.id == token.get_handler_id();
250 }),
251 handler_list.end()
252 );
253
254 if (handler_list.empty()) {
255 handlers_.erase(it);
256 }
257 }
258
259 return common::ok();
260 }
261
262 // Get statistics
271
272 stats get_stats() const {
273 // Use scoped_lock to acquire both mutexes atomically with consistent ordering
274 // This prevents deadlock by acquiring locks in a deterministic order
275 // Lock ordering: queue_mutex before handlers_mutex (alphabetical for consistency)
276 std::scoped_lock lock(queue_mutex_, handlers_mutex_);
277
278 size_t queue_size = event_queue_.size();
279 bool back_pressure_active = queue_size >= config_.back_pressure_threshold;
280
281 size_t total_subscribers = 0;
282 for (const auto& [type, handlers] : handlers_) {
283 total_subscribers += handlers.size();
284 }
285
286 return {
290 queue_size,
291 total_subscribers,
292 back_pressure_active
293 };
294 }
295
296protected:
297 // Publish event implementation
298 common::VoidResult publish_event_impl(std::type_index event_type,
299 std::any event) override {
300 bool should_sleep = false;
301
302 {
303 std::lock_guard<std::mutex> lock(queue_mutex_);
304
306 const auto current_size = event_queue_.size();
307 if (current_size >= config_.max_queue_size) {
308 total_events_dropped_.fetch_add(1);
309 return common::VoidResult::err(error_info(monitoring_error_code::resource_exhausted, "Event queue is full").to_common_error());
310 }
311 should_sleep = current_size >= config_.back_pressure_threshold;
312 }
313
314 event_queue_.emplace(event_type, std::move(event), event_priority::normal);
315 total_events_published_.fetch_add(1);
316 }
317
318 if (should_sleep) {
319 std::this_thread::sleep_for(std::chrono::milliseconds(1));
320 }
321
322 queue_cv_.notify_one();
323 return common::ok();
324 }
325
326 // Subscribe event implementation
327 common::Result<subscription_token> subscribe_event_impl(
328 std::type_index event_type,
329 std::function<void(const std::any&)> handler,
330 uint64_t handler_id,
331 event_priority priority) override {
332
333 std::lock_guard<std::mutex> lock(handlers_mutex_);
334
335 handlers_[event_type].emplace_back(std::move(handler), priority, handler_id);
336
337 // Sort handlers by priority
338 std::sort(handlers_[event_type].begin(), handlers_[event_type].end(),
339 [](const event_handler_wrapper& a, const event_handler_wrapper& b) {
340 return static_cast<int>(a.priority) > static_cast<int>(b.priority);
341 });
342
343 return subscription_token(event_type, handler_id);
344 }
345
346 // Clear subscriptions implementation
347 common::VoidResult clear_subscriptions_impl(std::type_index event_type) override {
348 std::lock_guard<std::mutex> lock(handlers_mutex_);
349 handlers_.erase(event_type);
350 return common::ok();
351 }
352
353 // Get subscriber count implementation
354 size_t get_subscriber_count_impl(std::type_index event_type) const override {
355 std::lock_guard<std::mutex> lock(handlers_mutex_);
356 auto it = handlers_.find(event_type);
357 return it != handlers_.end() ? it->second.size() : 0;
358 }
359
360private:
361 // Process events worker thread
363 while (!stop_requested_) {
364 std::unique_lock<std::mutex> lock(queue_mutex_);
365
366 // Wait for events or stop signal
368 [this] { return !event_queue_.empty() || stop_requested_; });
369
370 if (stop_requested_ && event_queue_.empty()) {
371 break;
372 }
373
374 // Process a batch of events
375 std::vector<event_envelope> batch;
376 size_t batch_size = std::min(size_t(10), event_queue_.size());
377
378 for (size_t i = 0; i < batch_size && !event_queue_.empty(); ++i) {
379 // Copy element then pop to avoid const_cast UB
380 batch.push_back(event_queue_.top());
381 event_queue_.pop();
382 }
383
384 lock.unlock();
385
386 // Process the batch
387 for (auto& envelope : batch) {
388 dispatch_event(envelope);
389 total_events_processed_.fetch_add(1);
390 }
391 }
392 }
393
394 // Process all pending events
396 std::vector<event_envelope> pending;
397 {
398 std::lock_guard<std::mutex> lock(queue_mutex_);
399 pending.reserve(event_queue_.size());
400 while (!event_queue_.empty()) {
401 pending.push_back(event_queue_.top());
402 event_queue_.pop();
403 }
404 }
405
406 for (auto& envelope : pending) {
407 dispatch_event(envelope);
408 total_events_processed_.fetch_add(1);
409 }
410 }
411
412 // Dispatch event to handlers
413 void dispatch_event(const event_envelope& envelope) {
414 std::lock_guard<std::mutex> lock(handlers_mutex_);
415
416 auto it = handlers_.find(envelope.type);
417 if (it != handlers_.end()) {
418 for (const auto& wrapper : it->second) {
419 try {
420 wrapper.handler(envelope.payload);
421 } catch (const std::exception& e) {
422 (void)e; // Suppress unused variable warning
423 // Log error but continue processing
424 // In production, this would be logged properly
425 }
426 }
427 }
428 }
429
431 mutable std::mutex bus_mutex_;
432 mutable std::mutex queue_mutex_;
433 mutable std::mutex handlers_mutex_;
434
435 std::priority_queue<event_envelope> event_queue_;
436 std::unordered_map<std::type_index, std::vector<event_handler_wrapper>> handlers_;
437
438 std::vector<std::thread> workers_;
439 std::condition_variable queue_cv_;
440
441 std::atomic<bool> is_running_;
442 std::atomic<bool> stop_requested_;
443
444 std::atomic<uint64_t> total_events_published_;
445 std::atomic<uint64_t> total_events_processed_;
446 std::atomic<uint64_t> total_events_dropped_;
447};
448
449} } // namespace kcenon::monitoring
Priority levels for event processing.
Thread-safe event bus implementation.
Definition event_bus.h:146
void dispatch_event(const event_envelope &envelope)
Definition event_bus.h:413
common::VoidResult publish_event_impl(std::type_index event_type, std::any event) override
Definition event_bus.h:298
common::Result< subscription_token > subscribe_event_impl(std::type_index event_type, std::function< void(const std::any &)> handler, uint64_t handler_id, event_priority priority) override
Definition event_bus.h:327
std::atomic< bool > stop_requested_
Definition event_bus.h:442
std::atomic< uint64_t > total_events_published_
Definition event_bus.h:444
event_bus(const config &cfg=config())
Definition event_bus.h:150
std::atomic< uint64_t > total_events_dropped_
Definition event_bus.h:446
common::VoidResult unsubscribe_event(const subscription_token &token) override
Unsubscribe from events using subscription token.
Definition event_bus.h:240
common::VoidResult process_pending_events() override
Process all pending events synchronously.
Definition event_bus.h:234
size_t get_subscriber_count_impl(std::type_index event_type) const override
Definition event_bus.h:354
size_t get_pending_event_count() const override
Get pending event count.
Definition event_bus.h:228
bool is_active() const override
Check if event bus is active.
Definition event_bus.h:222
std::condition_variable queue_cv_
Definition event_bus.h:439
std::atomic< uint64_t > total_events_processed_
Definition event_bus.h:445
std::atomic< bool > is_running_
Definition event_bus.h:441
std::priority_queue< event_envelope > event_queue_
Definition event_bus.h:435
std::unordered_map< std::type_index, std::vector< event_handler_wrapper > > handlers_
Definition event_bus.h:436
std::vector< std::thread > workers_
Definition event_bus.h:438
static std::shared_ptr< interface_event_bus > instance()
Definition event_bus.h:164
common::VoidResult start() override
Start the event bus.
Definition event_bus.h:174
common::VoidResult clear_subscriptions_impl(std::type_index event_type) override
Definition event_bus.h:347
common::VoidResult stop() override
Stop the event bus.
Definition event_bus.h:193
Pure virtual interface for event bus implementation.
Token for managing event subscriptions.
Monitoring system specific error codes.
Event bus interface for decoupled component communication.
@ counter
Monotonically increasing counter.
@ pending
Condition met, waiting for duration threshold.
Result pattern type definitions for monitoring system.
Extended error information with context.
Configuration for event bus.
Definition event_bus.h:87
std::chrono::milliseconds processing_interval
Definition event_bus.h:90
Container for events with metadata.
Definition event_bus.h:41
std::chrono::steady_clock::time_point timestamp
Definition event_bus.h:45
event_envelope(std::type_index t, std::any p, event_priority pr)
Definition event_bus.h:48
bool operator<(const event_envelope &other) const
Definition event_bus.h:53
Wrapper for event handlers with metadata.
Definition event_bus.h:73
event_handler_wrapper(std::function< void(const std::any &)> h, event_priority p, uint64_t i)
Definition event_bus.h:78
std::function< void(const std::any &)> handler
Definition event_bus.h:74