Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::event_bus Class Reference

Thread-safe event bus implementation. More...

#include <event_bus.h>

Inheritance diagram for kcenon::monitoring::event_bus:
Inheritance graph
Collaboration diagram for kcenon::monitoring::event_bus:
Collaboration graph

Classes

struct  stats
 

Public Types

using config = event_bus_config
 

Public Member Functions

 event_bus (const config &cfg=config())
 
 ~event_bus ()
 
common::VoidResult start () override
 Start the event bus.
 
common::VoidResult stop () override
 Stop the event bus.
 
bool is_active () const override
 Check if event bus is active.
 
size_t get_pending_event_count () const override
 Get pending event count.
 
common::VoidResult process_pending_events () override
 Process all pending events synchronously.
 
common::VoidResult unsubscribe_event (const subscription_token &token) override
 Unsubscribe from events using subscription token.
 
stats get_stats () const
 
- Public Member Functions inherited from kcenon::monitoring::interface_event_bus
virtual ~interface_event_bus ()=default
 
template<concepts::EventType E>
common::VoidResult publish_event (const E &event)
 Publish an event to all subscribers.
 
template<concepts::EventType E>
common::Result< subscription_token > subscribe_event (std::function< void(const E &)> handler, event_priority priority=event_priority::normal)
 Subscribe to events of a specific type.
 
template<concepts::EventType E, concepts::EventHandler< E > H>
common::Result< subscription_token > subscribe_event (H &&handler, event_priority priority=event_priority::normal)
 Subscribe to events with a callable handler.
 
template<concepts::EventType E>
common::VoidResult clear_subscriptions ()
 Clear all subscriptions for a specific event type.
 
template<concepts::EventType E>
size_t get_subscriber_count () const
 Get the number of subscribers for an event type.
 

Static Public Member Functions

static std::shared_ptr< interface_event_bus > instance ()
 

Protected Member Functions

common::VoidResult publish_event_impl (std::type_index event_type, std::any event) override
 
common::Result< subscription_token > subscribe_event_impl (std::type_index event_type, std::function< void(const std::any &)> handler, uint64_t handler_id, event_priority priority) override
 
common::VoidResult clear_subscriptions_impl (std::type_index event_type) override
 
size_t get_subscriber_count_impl (std::type_index event_type) const override
 
- Protected Member Functions inherited from kcenon::monitoring::interface_event_bus

Private Member Functions

void process_events_worker ()
 
void process_all_pending ()
 
void dispatch_event (const event_envelope &envelope)
 

Private Attributes

config config_
 
std::mutex bus_mutex_
 
std::mutex queue_mutex_
 
std::mutex handlers_mutex_
 
std::priority_queue< event_envelope > event_queue_
 
std::unordered_map< std::type_index, std::vector< event_handler_wrapper > > handlers_
 
std::vector< std::thread > workers_
 
std::condition_variable queue_cv_
 
std::atomic< bool > is_running_
 
std::atomic< bool > stop_requested_
 
std::atomic< uint64_t > total_events_published_
 
std::atomic< uint64_t > total_events_processed_
 
std::atomic< uint64_t > total_events_dropped_
 

Detailed Description

Thread-safe event bus implementation.

Provides high-performance event distribution with:

  • Priority-based event processing
  • Back-pressure management
  • Async and sync processing modes
  • Type-safe publish/subscribe
Thread Safety:
All public methods are thread-safe. The event bus uses multiple mutexes for fine-grained locking:
  • bus_mutex_ for start/stop operations
  • queue_mutex_ for event queue access
  • handlers_mutex_ for subscriber management

Definition at line 146 of file event_bus.h.

Member Typedef Documentation

◆ config

Constructor & Destructor Documentation

◆ event_bus()

kcenon::monitoring::event_bus::event_bus ( const config & cfg = config())
inline explicit
Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 150 of file event_bus.h.

151 : config_(cfg), is_running_(false), stop_requested_(false),
152 total_events_published_(0), total_events_processed_(0),
153 total_events_dropped_(0) {
154
155 if (config_.auto_start) {
156 start();
157 }
158 }
std::atomic< bool > stop_requested_
Definition event_bus.h:442
std::atomic< uint64_t > total_events_published_
Definition event_bus.h:444
std::atomic< uint64_t > total_events_dropped_
Definition event_bus.h:446
std::atomic< uint64_t > total_events_processed_
Definition event_bus.h:445
std::atomic< bool > is_running_
Definition event_bus.h:441
common::VoidResult start() override
Start the event bus.
Definition event_bus.h:174

References kcenon::monitoring::event_bus_config::auto_start, and config_.

◆ ~event_bus()

kcenon::monitoring::event_bus::~event_bus ( )
inline
Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 160 of file event_bus.h.

160 {
161 stop();
162 }
common::VoidResult stop() override
Stop the event bus.
Definition event_bus.h:193

References stop().

Here is the call graph for this function:

Member Function Documentation

◆ clear_subscriptions_impl()

common::VoidResult kcenon::monitoring::event_bus::clear_subscriptions_impl ( std::type_index event_type)
inline override protected virtual

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 347 of file event_bus.h.

347 {
348 std::lock_guard<std::mutex> lock(handlers_mutex_);
349 handlers_.erase(event_type);
350 return common::ok();
351 }
std::unordered_map< std::type_index, std::vector< event_handler_wrapper > > handlers_
Definition event_bus.h:436

References handlers_, and handlers_mutex_.

◆ dispatch_event()

void kcenon::monitoring::event_bus::dispatch_event ( const event_envelope & envelope)
inline private
Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 413 of file event_bus.h.

413 {
414 std::lock_guard<std::mutex> lock(handlers_mutex_);
415
416 auto it = handlers_.find(envelope.type);
417 if (it != handlers_.end()) {
418 for (const auto& wrapper : it->second) {
419 try {
420 wrapper.handler(envelope.payload);
421 } catch (const std::exception& e) {
422 (void)e; // Suppress unused variable warning
423 // Log error but continue processing
424 // In production, this would be logged properly
425 }
426 }
427 }
428 }

References handlers_, handlers_mutex_, kcenon::monitoring::event_envelope::payload, and kcenon::monitoring::event_envelope::type.

Referenced by process_all_pending(), and process_events_worker().

Here is the caller graph for this function:

◆ get_pending_event_count()

size_t kcenon::monitoring::event_bus::get_pending_event_count ( ) const
inline override virtual

Get pending event count.

Returns
Number of events waiting to be processed

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 228 of file event_bus.h.

228 {
229 std::lock_guard<std::mutex> lock(queue_mutex_);
230 return event_queue_.size();
231 }
std::priority_queue< event_envelope > event_queue_
Definition event_bus.h:435

References event_queue_, and queue_mutex_.

◆ get_stats()

stats kcenon::monitoring::event_bus::get_stats ( ) const
inline
Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 272 of file event_bus.h.

272 {
273 // Use scoped_lock to acquire both mutexes as a single unit.
274 // std::scoped_lock locks multiple mutexes with a deadlock-avoidance algorithm (as std::lock does),
275 // so the order in which queue_mutex_ and handlers_mutex_ are listed does not affect correctness.
276 std::scoped_lock lock(queue_mutex_, handlers_mutex_);
277
278 size_t queue_size = event_queue_.size();
279 bool back_pressure_active = queue_size >= config_.back_pressure_threshold;
280
281 size_t total_subscribers = 0;
282 for (const auto& [type, handlers] : handlers_) {
283 total_subscribers += handlers.size();
284 }
285
286 return {
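287 total_events_published_.load(),
288 total_events_processed_.load(),
289 total_events_dropped_.load(),
290 queue_size,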
291 total_subscribers,
292 back_pressure_active
293 };
294 }

References kcenon::monitoring::event_bus_config::back_pressure_threshold, config_, event_queue_, handlers_, handlers_mutex_, queue_mutex_, total_events_dropped_, total_events_processed_, and total_events_published_.

◆ get_subscriber_count_impl()

size_t kcenon::monitoring::event_bus::get_subscriber_count_impl ( std::type_index event_type) const
inline override protected virtual

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 354 of file event_bus.h.

354 {
355 std::lock_guard<std::mutex> lock(handlers_mutex_);
356 auto it = handlers_.find(event_type);
357 return it != handlers_.end() ? it->second.size() : 0;
358 }

References handlers_, and handlers_mutex_.

◆ instance()

static std::shared_ptr< interface_event_bus > kcenon::monitoring::event_bus::instance ( )
inline static
Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 164 of file event_bus.h.

164 {
165 static std::shared_ptr<event_bus> singleton = [] {
166 auto bus = std::make_shared<event_bus>();
167 (void)bus->start();
168 return bus;
169 }();
170 return singleton;
171 }

◆ is_active()

bool kcenon::monitoring::event_bus::is_active ( ) const
inline override virtual

Check if event bus is active.

Returns
True if active, false otherwise

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 222 of file event_bus.h.

222 {
223 std::lock_guard<std::mutex> lock(bus_mutex_);
224 return is_running_;
225 }

References bus_mutex_, and is_running_.

◆ process_all_pending()

void kcenon::monitoring::event_bus::process_all_pending ( )
inline private
Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 395 of file event_bus.h.

395 {
396 std::vector<event_envelope> pending;
397 {
398 std::lock_guard<std::mutex> lock(queue_mutex_);
399 pending.reserve(event_queue_.size());
400 while (!event_queue_.empty()) {
401 pending.push_back(event_queue_.top());
402 event_queue_.pop();
403 }
404 }
405
406 for (auto& envelope : pending) {
407 dispatch_event(envelope);
408 total_events_processed_.fetch_add(1);
409 }
410 }
void dispatch_event(const event_envelope &envelope)
Definition event_bus.h:413
@ pending
Condition met, waiting for duration threshold.

References dispatch_event(), event_queue_, kcenon::monitoring::pending, queue_mutex_, and total_events_processed_.

Referenced by process_pending_events(), and stop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ process_events_worker()

void kcenon::monitoring::event_bus::process_events_worker ( )
inline private
Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 362 of file event_bus.h.

362 {
363 while (!stop_requested_) {
364 std::unique_lock<std::mutex> lock(queue_mutex_);
365
366 // Wait for events or stop signal
367 queue_cv_.wait_for(lock, config_.processing_interval,
368 [this] { return !event_queue_.empty() || stop_requested_; });
369
370 if (stop_requested_ && event_queue_.empty()) {
371 break;
372 }
373
374 // Process a batch of events
375 std::vector<event_envelope> batch;
376 size_t batch_size = std::min(size_t(10), event_queue_.size());
377
378 for (size_t i = 0; i < batch_size && !event_queue_.empty(); ++i) {
379 // Copy element then pop to avoid const_cast UB
380 batch.push_back(event_queue_.top());
381 event_queue_.pop();
382 }
383
384 lock.unlock();
385
386 // Process the batch
387 for (auto& envelope : batch) {
388 dispatch_event(envelope);
389 total_events_processed_.fetch_add(1);
390 }
391 }
392 }
std::condition_variable queue_cv_
Definition event_bus.h:439
std::chrono::milliseconds processing_interval
Definition event_bus.h:90

References config_, dispatch_event(), event_queue_, kcenon::monitoring::event_bus_config::processing_interval, queue_cv_, queue_mutex_, stop_requested_, and total_events_processed_.

Referenced by start().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ process_pending_events()

common::VoidResult kcenon::monitoring::event_bus::process_pending_events ( )
inline override virtual

Process all pending events synchronously.

Returns
Result indicating success or failure

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 234 of file event_bus.h.

234 {
235 process_all_pending();
236 return common::ok();
237 }

References process_all_pending().

Here is the call graph for this function:

◆ publish_event_impl()

common::VoidResult kcenon::monitoring::event_bus::publish_event_impl ( std::type_index event_type,
std::any event )
inline override protected virtual

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 298 of file event_bus.h.

299 {
300 bool should_sleep = false;
301
302 {
303 std::lock_guard<std::mutex> lock(queue_mutex_);
304
305 if (config_.enable_back_pressure) {
306 const auto current_size = event_queue_.size();
307 if (current_size >= config_.max_queue_size) {
308 total_events_dropped_.fetch_add(1);
309 return common::VoidResult::err(error_info(monitoring_error_code::resource_exhausted, "Event queue is full").to_common_error());
310 }
311 should_sleep = current_size >= config_.back_pressure_threshold;
312 }
313
314 event_queue_.emplace(event_type, std::move(event), event_priority::normal);
315 total_events_published_.fetch_add(1);
316 }
317
318 if (should_sleep) {
319 std::this_thread::sleep_for(std::chrono::milliseconds(1));
320 }
321
322 queue_cv_.notify_one();
323 return common::ok();
324 }

References kcenon::monitoring::event_bus_config::back_pressure_threshold, config_, kcenon::monitoring::event_bus_config::enable_back_pressure, event_queue_, kcenon::monitoring::event_bus_config::max_queue_size, kcenon::monitoring::normal, queue_cv_, queue_mutex_, kcenon::monitoring::resource_exhausted, total_events_dropped_, and total_events_published_.

◆ start()

common::VoidResult kcenon::monitoring::event_bus::start ( )
inline override virtual

Start the event bus.

Returns
Result indicating success or failure

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 174 of file event_bus.h.

174 {
175 std::lock_guard<std::mutex> lock(bus_mutex_);
176
177 if (is_running_) {
178 return common::VoidResult::err(error_info(monitoring_error_code::already_started, "Event bus is already running").to_common_error());
179 }
180
181 stop_requested_ = false;
182 is_running_ = true;
183
184 // Start worker threads
185 for (size_t i = 0; i < config_.worker_thread_count; ++i) {
186 workers_.emplace_back(&event_bus::process_events_worker, this);
187 }
188
189 return common::ok();
190 }
std::vector< std::thread > workers_
Definition event_bus.h:438

References kcenon::monitoring::already_started, bus_mutex_, config_, is_running_, process_events_worker(), stop_requested_, kcenon::monitoring::event_bus_config::worker_thread_count, and workers_.

Here is the call graph for this function:

◆ stop()

common::VoidResult kcenon::monitoring::event_bus::stop ( )
inline override virtual

Stop the event bus.

Returns
Result indicating success or failure

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 193 of file event_bus.h.

193 {
194 {
195 std::lock_guard<std::mutex> lock(bus_mutex_);
196 if (!is_running_) {
197 return common::ok();
198 }
199
200 stop_requested_ = true;
201 is_running_ = false;
202 }
203
204 // Wake up all worker threads
205 queue_cv_.notify_all();
206
207 // Wait for all workers to finish
208 for (auto& worker : workers_) {
209 if (worker.joinable()) {
210 worker.join();
211 }
212 }
213 workers_.clear();
214
215 // Process any remaining events
216 process_all_pending();
217
218 return common::ok();
219 }

References bus_mutex_, is_running_, process_all_pending(), queue_cv_, stop_requested_, and workers_.

Referenced by ~event_bus().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ subscribe_event_impl()

common::Result< subscription_token > kcenon::monitoring::event_bus::subscribe_event_impl ( std::type_index event_type,
std::function< void(const std::any &)> handler,
uint64_t handler_id,
event_priority priority )
inline override protected virtual

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 327 of file event_bus.h.

331 {
332
333 std::lock_guard<std::mutex> lock(handlers_mutex_);
334
335 handlers_[event_type].emplace_back(std::move(handler), priority, handler_id);
336
337 // Sort handlers by priority
338 std::sort(handlers_[event_type].begin(), handlers_[event_type].end(),
339 [](const event_handler_wrapper& a, const event_handler_wrapper& b) {
340 return static_cast<int>(a.priority) > static_cast<int>(b.priority);
341 });
342
343 return subscription_token(event_type, handler_id);
344 }

References handlers_, handlers_mutex_, and kcenon::monitoring::event_handler_wrapper::priority.

◆ unsubscribe_event()

common::VoidResult kcenon::monitoring::event_bus::unsubscribe_event ( const subscription_token & token)
inline override virtual

Unsubscribe from events using subscription token.

Parameters
token – The subscription token
Returns
Result indicating success or failure

Implements kcenon::monitoring::interface_event_bus.

Examples
/home/runner/work/monitoring_system/monitoring_system/include/kcenon/monitoring/core/event_bus.h.

Definition at line 240 of file event_bus.h.

240 {
241 std::lock_guard<std::mutex> lock(handlers_mutex_);
242
243 auto it = handlers_.find(token.get_event_type());
244 if (it != handlers_.end()) {
245 auto& handler_list = it->second;
246 handler_list.erase(
247 std::remove_if(handler_list.begin(), handler_list.end(),
248 [&token](const event_handler_wrapper& wrapper) {
249 return wrapper.id == token.get_handler_id();
250 }),
251 handler_list.end()
252 );
253
254 if (handler_list.empty()) {
255 handlers_.erase(it);
256 }
257 }
258
259 return common::ok();
260 }

References kcenon::monitoring::subscription_token::get_event_type(), handlers_, and handlers_mutex_.

Here is the call graph for this function:

Member Data Documentation

◆ bus_mutex_

std::mutex kcenon::monitoring::event_bus::bus_mutex_
mutable private

◆ config_

◆ event_queue_

◆ handlers_

std::unordered_map<std::type_index, std::vector<event_handler_wrapper> > kcenon::monitoring::event_bus::handlers_
private

◆ handlers_mutex_

◆ is_running_

std::atomic<bool> kcenon::monitoring::event_bus::is_running_
private

◆ queue_cv_

std::condition_variable kcenon::monitoring::event_bus::queue_cv_
private

◆ queue_mutex_

◆ stop_requested_

std::atomic<bool> kcenon::monitoring::event_bus::stop_requested_
private

◆ total_events_dropped_

std::atomic<uint64_t> kcenon::monitoring::event_bus::total_events_dropped_
private

◆ total_events_processed_

std::atomic<uint64_t> kcenon::monitoring::event_bus::total_events_processed_
private

◆ total_events_published_

std::atomic<uint64_t> kcenon::monitoring::event_bus::total_events_published_
private

◆ workers_

std::vector<std::thread> kcenon::monitoring::event_bus::workers_
private

The documentation for this class was generated from the following file: