Common System 0.2.0
Common interfaces and patterns for system integration
kcenon::common::simple_event_bus Class Reference

Simple synchronous event bus for testing when monitoring is disabled.

#include <event_bus.h>

[Collaboration diagram for kcenon::common::simple_event_bus]

Classes

struct  subscription_info
 

Public Types

using error_callback_t
 Type for error callback function.
 

Public Member Functions

template<concepts::EventType EventType>
void publish (const EventType &evt, event_priority=event_priority::normal)
 Publish an event to all subscribed handlers.
 
void publish (event &&evt, event_priority=event_priority::normal)
 
template<concepts::EventType EventType, concepts::EventHandler< EventType > HandlerFunc>
uint64_t subscribe (HandlerFunc &&func)
 Subscribe to events of a specific type.
 
uint64_t subscribe (std::function< void(const event &)> &&func)
 
template<concepts::EventType EventType, concepts::EventHandler< EventType > HandlerFunc, concepts::EventFilter< EventType > FilterFunc>
uint64_t subscribe_filtered (HandlerFunc &&func, FilterFunc &&filter)
 Subscribe to events with a filter function.
 
uint64_t subscribe_filtered (std::function< void(const event &)> &&func, std::function< bool(const event &)> &&filter)
 
void unsubscribe (uint64_t id)
 
void start ()
 
void stop ()
 
bool is_running () const
 
void set_error_callback (error_callback_t callback)
 Set the error callback for handler exceptions and type mismatches.
 
void clear_error_callback ()
 Clear the error callback.
 

Static Public Member Functions

static simple_event_bus & instance ()
 Get the singleton instance (thread-safe via C++11 magic statics)
 

Private Attributes

std::mutex mutex_
 
std::unordered_multimap< size_t, subscription_info > handlers_
 
std::atomic< uint64_t > next_id_ {1}
 
std::atomic< bool > running_ {true}
 
error_callback_t error_callback_
 

Detailed Description

Simple synchronous event bus for testing when monitoring is disabled.

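Thread-safety: This class is thread-safe. A mutex protects subscription management, the error callback, and the handler snapshot taken by publish(); the handlers themselves are invoked after the lock is released.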

Note: This implementation avoids RTTI by using template-based type IDs.

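simple_event_bus bus;
// Subscribe to an event type
auto id = bus.subscribe<events::module_started_event>(
    [](const events::module_started_event& e) {
        std::cout << "Module started: " << e.module_name << "\n";
    });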
// Publish an event
bus.publish(events::module_started_event{"network_system"});
// Unsubscribe when done
bus.unsubscribe(id);

Definition at line 150 of file event_bus.h.
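
The note about avoiding RTTI refers to event_type_id<T>::id(), whose implementation is not shown on this page. One common way to get a per-type ID without typeid is a function-local static per template instantiation. The sketch below illustrates that idea only; type_id_of and next_type_id are hypothetical names, and the library may use a different scheme.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

namespace sketch_type_id {

// Hands out a fresh number on every call; shared by all instantiations.
inline std::size_t next_type_id() {
    static std::atomic<std::size_t> counter{0};
    return counter.fetch_add(1) + 1;
}

// Hypothetical stand-in for event_type_id<T> (an assumption, not the
// library's code). Each T gets its own function-local static, which is
// initialized exactly once, so repeated calls for the same T agree.
template <typename T>
struct type_id_of {
    static std::size_t id() {
        static const std::size_t value = next_type_id();
        return value;
    }
};

struct started_event {};
struct stopped_event {};

// Returns true when IDs are stable per type and distinct across types.
inline bool demo() {
    const std::size_t a1 = type_id_of<started_event>::id();
    const std::size_t a2 = type_id_of<started_event>::id();
    const std::size_t b = type_id_of<stopped_event>::id();
    assert(a1 == a2);
    return a1 != b;
}

}  // namespace sketch_type_id
```

With a counter-based scheme like this one, the numeric values depend on which type is used first, so they are only meaningful within a single process run.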

Member Typedef Documentation

◆ error_callback_t

Initial value:
std::function<void(const std::string& error_message,
size_t event_type_id,
uint64_t handler_id)>

Type for error callback function.

This callback is invoked when an exception occurs in an event handler or when a type mismatch is detected. The callback receives:

  • error_message: Description of what went wrong
  • event_type_id: The type ID of the event being processed
  • handler_id: The subscription ID of the failing handler

Definition at line 161 of file event_bus.h.
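
As an illustration of the three callback arguments, the following sketch builds a callback with the documented signature that records each error instead of printing it. The alias is re-declared locally so the example compiles on its own; error_record and make_collecting_callback are hypothetical helpers, not part of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

namespace sketch_callback {

// Same signature as the error_callback_t alias documented above.
using error_callback_t = std::function<void(const std::string& error_message,
                                            std::size_t event_type_id,
                                            std::uint64_t handler_id)>;

// Hypothetical record type used only by this example.
struct error_record {
    std::string message;
    std::size_t event_type_id;
    std::uint64_t handler_id;
};

// Builds a callback that appends every reported error to `sink`.
// The caller must keep `sink` alive while the callback is in use, and
// `sink` is not synchronized here.
inline error_callback_t make_collecting_callback(std::vector<error_record>& sink) {
    return [&sink](const std::string& msg, std::size_t type_id, std::uint64_t id) {
        sink.push_back({msg, type_id, id});
    };
}

inline std::vector<error_record> demo() {
    std::vector<error_record> errors;
    error_callback_t cb = make_collecting_callback(errors);
    cb("Exception in event handler: boom", 7, 42);
    assert(errors.size() == 1);
    return errors;
}

}  // namespace sketch_callback
```

Because publish() invokes the callback after releasing its lock, a callback shared between publishing threads would need its own synchronization around the sink.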

Member Function Documentation

◆ clear_error_callback()

void kcenon::common::simple_event_bus::clear_error_callback ( )
inline

Clear the error callback.

Definition at line 415 of file event_bus.h.

415 {
416 std::lock_guard<std::mutex> lock(mutex_);
417 error_callback_ = nullptr;
418 }

References error_callback_, and mutex_.

◆ instance()

static simple_event_bus & kcenon::common::simple_event_bus::instance ( )
inline static

Get the singleton instance (thread-safe via C++11 magic statics)

Returns
Reference to the singleton instance

Definition at line 424 of file event_bus.h.

424 {
425 static simple_event_bus instance;
426 return instance;
427 }

References instance().

Referenced by kcenon::common::get_event_bus(), and instance().
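
The "magic statics" wording refers to the C++11 guarantee that a function-local static is initialized exactly once, even when several threads reach it concurrently. The sketch below shows the same pattern on a hypothetical registry class; the private constructor and deleted copy operations are choices made for the example and are not confirmed for simple_event_bus.

```cpp
#include <cassert>

namespace sketch_singleton {

// Hypothetical class used only to demonstrate the pattern.
class registry {
public:
    static registry& instance() {
        static registry self;  // constructed once, on the first call
        return self;
    }

    registry(const registry&) = delete;
    registry& operator=(const registry&) = delete;

    int counter = 0;

private:
    registry() = default;
};

// Returns true when both references observe the same object.
inline bool demo() {
    registry& a = registry::instance();
    registry& b = registry::instance();
    a.counter = 5;
    assert(&a == &b);
    return b.counter == 5;
}

}  // namespace sketch_singleton
```

The guarantee covers initialization only; later access to the object's state still needs the kind of locking the bus applies through mutex_.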


◆ is_running()

bool kcenon::common::simple_event_bus::is_running ( ) const
inline

Definition at line 384 of file event_bus.h.

384 { return running_; }

References running_.

◆ publish() [1/2]

template<concepts::EventType EventType>
void kcenon::common::simple_event_bus::publish ( const EventType & evt,
event_priority = event_priority::normal )
inline

Publish an event to all subscribed handlers.

Template Parameters
  EventType: The event type (must satisfy concepts::EventType)
Parameters
  evt: The event to publish
  priority: Event priority (default: normal)
Note
Uses C++20 concepts for compile-time type validation.

Definition at line 174 of file event_bus.h.

174 {
175 // Snapshot handlers and error callback under the lock, then release
176 // before invoking handlers. This prevents reentrancy deadlock when
177 // a handler calls subscribe() or unsubscribe() on this bus.
178 std::vector<subscription_info> snapshot;
179 error_callback_t error_cb;
180 {
181 std::lock_guard<std::mutex> lock(mutex_);
182 auto type_id = event_type_id<EventType>::id();
183 auto range = handlers_.equal_range(type_id);
184 snapshot.reserve(std::distance(range.first, range.second));
185 for (auto it = range.first; it != range.second; ++it) {
186 snapshot.push_back(it->second);
187 }
188 error_cb = error_callback_;
189 }
190
191 // Get type identifier without RTTI
192 auto type_id = event_type_id<EventType>::id();
193
194 // Invoke handlers outside the lock — safe for reentrancy
195 for (auto& entry : snapshot) {
196 try {
197 // Type safety check: verify expected type matches
198 if (entry.expected_type_id != type_id) {
199 if (error_cb) {
200 error_cb("Type ID mismatch detected in event handler",
201 type_id, entry.id);
202 }
203 continue;
204 }
205
206 if (entry.handler) {
207 entry.handler(static_cast<const void*>(&evt));
208 }
209 } catch (const std::exception& e) {
210 if (error_cb) {
211 std::string error_msg = "Exception in event handler: ";
212 error_msg += e.what();
213 error_cb(error_msg, type_id, entry.id);
214 }
215 } catch (...) {
216 if (error_cb) {
217 error_cb("Unknown exception in event handler",
218 type_id, entry.id);
219 }
220 }
221 }
222 }

References error_callback_, handlers_, kcenon::common::detail::event_type_id< T >::id(), and mutex_.
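
The listing above copies the matching handlers under the lock and invokes them afterwards. A reduced sketch of that snapshot-then-dispatch pattern follows, assuming a hypothetical mini_bus with a single int-valued event; it is meant to show why a handler can call subscribe() during dispatch without deadlocking, not to mirror the real class.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

namespace sketch_snapshot {

// Hypothetical single-event-type bus, reduced to the locking pattern.
class mini_bus {
public:
    using handler_t = std::function<void(int)>;

    std::uint64_t subscribe(handler_t h) {
        std::lock_guard<std::mutex> lock(mutex_);
        const std::uint64_t id = next_id_++;
        handlers_.push_back({id, std::move(h)});
        return id;
    }

    void publish(int value) {
        std::vector<entry> snapshot;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            snapshot = handlers_;  // copy while holding the lock
        }
        // The lock is released here, so a handler may call subscribe().
        for (auto& e : snapshot) {
            e.handler(value);
        }
    }

private:
    struct entry {
        std::uint64_t id;
        handler_t handler;
    };

    std::mutex mutex_;
    std::vector<entry> handlers_;
    std::uint64_t next_id_ = 1;
};

// The first handler subscribes another handler from inside the callback.
// Returns the total number of handler invocations over two publishes.
inline int demo() {
    mini_bus bus;
    int calls = 0;
    bus.subscribe([&bus, &calls](int) {
        ++calls;
        bus.subscribe([&calls](int) { ++calls; });
    });
    bus.publish(1);  // snapshot holds only the first handler
    bus.publish(2);  // snapshot holds the first handler and one added earlier
    assert(calls == 3);
    return calls;
}

}  // namespace sketch_snapshot
```

One consequence of this pattern is that changes made during a publish take effect on the next publish: a handler added mid-dispatch is not called for the current event, and a handler removed mid-dispatch may still be called once from the snapshot.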


◆ publish() [2/2]

void kcenon::common::simple_event_bus::publish ( event && evt,
event_priority = event_priority::normal )
inline

Definition at line 225 of file event_bus.h.

225 {
226 // Snapshot handlers and error callback under the lock
227 std::vector<subscription_info> snapshot;
228 error_callback_t error_cb;
229 {
230 std::lock_guard<std::mutex> lock(mutex_);
231 auto type_id = event_type_id<event>::id();
232 auto range = handlers_.equal_range(type_id);
233 snapshot.reserve(std::distance(range.first, range.second));
234 for (auto it = range.first; it != range.second; ++it) {
235 snapshot.push_back(it->second);
236 }
237 error_cb = error_callback_;
238 }
239
240 auto type_id = event_type_id<event>::id();
241
242 // Invoke handlers outside the lock — safe for reentrancy
243 for (auto& entry : snapshot) {
244 try {
245 if (entry.expected_type_id != type_id) {
246 if (error_cb) {
247 error_cb("Type ID mismatch detected in event handler",
248 type_id, entry.id);
249 }
250 continue;
251 }
252
253 if (entry.handler) {
254 entry.handler(static_cast<const void*>(&evt));
255 }
256 } catch (const std::exception& e) {
257 if (error_cb) {
258 std::string error_msg = "Exception in event handler: ";
259 error_msg += e.what();
260 error_cb(error_msg, type_id, entry.id);
261 }
262 } catch (...) {
263 if (error_cb) {
264 error_cb("Unknown exception in event handler",
265 type_id, entry.id);
266 }
267 }
268 }
269 }

References error_callback_, handlers_, kcenon::common::detail::event_type_id< T >::id(), and mutex_.


◆ set_error_callback()

void kcenon::common::simple_event_bus::set_error_callback ( error_callback_t callback)
inline

Set the error callback for handler exceptions and type mismatches.

This callback will be invoked whenever:

  • An exception is thrown by an event handler
  • A type ID mismatch is detected (should never happen in correct usage)

The error callback is useful for logging and debugging handler issues without stopping the processing of other handlers.

Parameters
  callback: The callback function to invoke on errors

Example usage:

bus.set_error_callback([](const std::string& msg, size_t type_id, uint64_t handler_id) {
std::cerr << "Event bus error [type=" << type_id
<< ", handler=" << handler_id << "]: " << msg << std::endl;
});

Definition at line 407 of file event_bus.h.

407 {
408 std::lock_guard<std::mutex> lock(mutex_);
409 error_callback_ = std::move(callback);
410 }

References error_callback_, and mutex_.
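
The statement that a failing handler does not stop the others can be seen in a reduced dispatch loop. The sketch below is a simplified stand-in for the try/catch structure in publish(); entry and dispatch_all are hypothetical names, and the handlers take no arguments to keep the example short.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

namespace sketch_errors {

using error_callback_t =
    std::function<void(const std::string&, std::size_t, std::uint64_t)>;

// Hypothetical subscription entry with an argument-free handler.
struct entry {
    std::uint64_t id;
    std::function<void()> handler;
};

// Runs every handler. A throwing handler is reported through on_error
// and skipped; the loop then continues. Returns how many completed.
inline int dispatch_all(const std::vector<entry>& entries, std::size_t type_id,
                        const error_callback_t& on_error) {
    int completed = 0;
    for (const auto& e : entries) {
        try {
            e.handler();
            ++completed;
        } catch (const std::exception& ex) {
            if (on_error) {
                on_error(std::string("Exception in event handler: ") + ex.what(),
                         type_id, e.id);
            }
        } catch (...) {
            if (on_error) {
                on_error("Unknown exception in event handler", type_id, e.id);
            }
        }
    }
    return completed;
}

// Three handlers, the middle one throws; returns the reported messages.
inline std::vector<std::string> demo() {
    std::vector<std::string> messages;
    std::vector<entry> entries;
    entries.push_back({1, [] {}});
    entries.push_back({2, [] { throw std::runtime_error("boom"); }});
    entries.push_back({3, [] {}});
    const int completed = dispatch_all(
        entries, 9,
        [&messages](const std::string& m, std::size_t, std::uint64_t) {
            messages.push_back(m);
        });
    assert(completed == 2);
    return messages;
}

}  // namespace sketch_errors
```

When no callback is set, the same structure swallows the exception silently, which matches the `if (error_cb)` checks in the publish() listings.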

◆ start()

void kcenon::common::simple_event_bus::start ( )
inline

Definition at line 382 of file event_bus.h.

382 { running_ = true; }

References running_.

◆ stop()

void kcenon::common::simple_event_bus::stop ( )
inline

Definition at line 383 of file event_bus.h.

383 { running_ = false; }

References running_.
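
On this page, running_ is referenced only by start(), stop(), and is_running(); the publish() listings do not consult it. Code that wants publishing to pause while the bus is stopped could therefore check the flag itself. The sketch below shows one way to do that, using a hypothetical run_flag class and publish_if_running helper rather than the real bus.

```cpp
#include <atomic>
#include <cassert>

namespace sketch_running {

// Hypothetical stand-in for the start()/stop()/is_running() trio.
class run_flag {
public:
    void start() { running_ = true; }
    void stop() { running_ = false; }
    bool is_running() const { return running_; }

private:
    std::atomic<bool> running_{true};  // starts in the running state
};

// Hypothetical caller-side guard: invokes `publish` only while running.
// Returns whether the publish action was executed.
template <typename Publish>
bool publish_if_running(const run_flag& flag, Publish&& publish) {
    if (!flag.is_running()) {
        return false;
    }
    publish();
    return true;
}

// Returns how many publish actions ran across start/stop transitions.
inline int demo() {
    run_flag flag;
    int published = 0;
    auto action = [&published] { ++published; };
    publish_if_running(flag, action);  // runs: flag starts as true
    flag.stop();
    publish_if_running(flag, action);  // skipped
    flag.start();
    publish_if_running(flag, action);  // runs again
    assert(published == 2);
    return published;
}

}  // namespace sketch_running
```

The check and the publish are two separate steps, so another thread can call stop() between them; the guard is advisory rather than a hard barrier.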

◆ subscribe() [1/2]

template<concepts::EventType EventType, concepts::EventHandler< EventType > HandlerFunc>
uint64_t kcenon::common::simple_event_bus::subscribe ( HandlerFunc && func)
inline

Subscribe to events of a specific type.

Template Parameters
  EventType: The event type (must satisfy concepts::EventType)
  HandlerFunc: The handler function type (must satisfy concepts::EventHandler)
Parameters
  func: The handler function to call when the event is published
Returns
Subscription ID for later unsubscription
Note
Uses C++20 concepts for compile-time type validation.

Definition at line 281 of file event_bus.h.

281 {
282 std::lock_guard<std::mutex> lock(mutex_);
283
284 auto id = next_id_++;
285 auto type_id = event_type_id<EventType>::id();
286
287 subscription_info info;
288 info.id = id;
289 info.expected_type_id = type_id; // Store expected type ID for validation
290 // Wrap the handler in a type-erased function
291 info.handler = [f = std::function<void(const EventType&)>(std::forward<HandlerFunc>(func))]
292 (const void* evt_ptr) {
293 f(*static_cast<const EventType*>(evt_ptr));
294 };
295
296 handlers_.emplace(type_id, std::move(info));
297
298 return id;
299 }

References handlers_, kcenon::common::detail::event_type_id< T >::id(), mutex_, and next_id_.
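
The listing stores every handler behind a `const void*` interface so that handlers for different event types can share one container. The following sketch isolates that type-erasure step; erase_type and module_started are hypothetical names chosen for the example.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

namespace sketch_erasure {

using erased_handler = std::function<void(const void*)>;

// Wraps a typed handler so it can be stored next to handlers for other
// event types. The cast back to Event is only valid when the pointer
// really refers to an Event, which is why the bus also stores and checks
// an expected type ID before calling the handler.
template <typename Event, typename Handler>
erased_handler erase_type(Handler&& h) {
    return [f = std::function<void(const Event&)>(std::forward<Handler>(h))](
               const void* p) { f(*static_cast<const Event*>(p)); };
}

// Hypothetical event type for the example.
struct module_started {
    std::string module_name;
};

// Round-trips one event through the erased interface.
inline std::string demo() {
    std::string seen;
    erased_handler h = erase_type<module_started>(
        [&seen](const module_started& e) { seen = e.module_name; });
    const module_started evt{"network_system"};
    h(static_cast<const void*>(&evt));
    assert(!seen.empty());
    return seen;
}

}  // namespace sketch_erasure
```

Passing a pointer to any other type through the erased handler would be undefined behavior, so the type ID comparison in publish() acts as the runtime safety net for this cast.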

Referenced by subscribe_filtered(), and subscribe_filtered().


◆ subscribe() [2/2]

uint64_t kcenon::common::simple_event_bus::subscribe ( std::function< void(const event &)> && func)
inline

Definition at line 302 of file event_bus.h.

302 {
303 std::lock_guard<std::mutex> lock(mutex_);
304
305 auto id = next_id_++;
306 auto type_id = event_type_id<event>::id();
307
308 subscription_info info;
309 info.id = id;
310 info.expected_type_id = type_id; // Store expected type ID for validation
311 info.handler = [f = std::move(func)](const void* evt_ptr) {
312 f(*static_cast<const event*>(evt_ptr));
313 };
314
315 handlers_.emplace(type_id, std::move(info));
316
317 return id;
318 }

References handlers_, kcenon::common::detail::event_type_id< T >::id(), mutex_, and next_id_.


◆ subscribe_filtered() [1/2]

template<concepts::EventType EventType, concepts::EventHandler< EventType > HandlerFunc, concepts::EventFilter< EventType > FilterFunc>
uint64_t kcenon::common::simple_event_bus::subscribe_filtered ( HandlerFunc && func,
FilterFunc && filter )
inline

Subscribe to events with a filter function.

Template Parameters
  EventType: The event type (must satisfy concepts::EventType)
  HandlerFunc: The handler function type (must satisfy concepts::EventHandler)
  FilterFunc: The filter function type (must satisfy concepts::EventFilter)
Parameters
  func: The handler function to call when the event passes the filter
  filter: The filter function to determine if the handler should be called
Returns
Subscription ID for later unsubscription
Note
The filter function is called before the handler. If it returns false, the handler is not invoked. This allows for efficient event filtering without creating multiple event types.
Uses C++20 concepts for compile-time type validation.

Definition at line 337 of file event_bus.h.

337 {
338 // Create a wrapped handler that includes the filtering logic
339 auto wrapped_handler = [f = std::forward<HandlerFunc>(func),
340 flt = std::forward<FilterFunc>(filter)]
341 (const EventType& evt) {
342 // Only call the handler if the filter passes
343 if (flt(evt)) {
344 f(evt);
345 }
346 };
347
348 // Subscribe with the wrapped handler
349 return subscribe<EventType>(std::move(wrapped_handler));
350 }

References subscribe().
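
Filtering is implemented by composing the filter and the handler into a single callable before subscribing. A standalone sketch of that composition follows, with a hypothetical with_filter helper and log_event type; it leaves out the bus so the wrapping step can be seen on its own.

```cpp
#include <cassert>
#include <functional>
#include <utility>

namespace sketch_filter {

// Combines a handler and a predicate into one callable. The handler
// runs only for events the predicate accepts.
template <typename Event, typename Handler, typename Filter>
std::function<void(const Event&)> with_filter(Handler&& h, Filter&& keep) {
    return [f = std::forward<Handler>(h),
            flt = std::forward<Filter>(keep)](const Event& e) {
        if (flt(e)) {
            f(e);
        }
    };
}

// Hypothetical event type for the example.
struct log_event {
    int level;
};

// Sends three events through a filter that accepts level >= 2.
// Returns how many reached the handler.
inline int demo() {
    int delivered = 0;
    auto handler = with_filter<log_event>(
        [&delivered](const log_event&) { ++delivered; },
        [](const log_event& e) { return e.level >= 2; });
    handler(log_event{1});  // rejected by the filter
    handler(log_event{2});  // delivered
    handler(log_event{3});  // delivered
    assert(delivered == 2);
    return delivered;
}

}  // namespace sketch_filter
```

Because the wrapper is an ordinary handler, the filtered subscription gets a normal subscription ID and is removed with unsubscribe() like any other.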


◆ subscribe_filtered() [2/2]

uint64_t kcenon::common::simple_event_bus::subscribe_filtered ( std::function< void(const event &)> && func,
std::function< bool(const event &)> && filter )
inline

Definition at line 353 of file event_bus.h.

355 {
356 // Create a wrapped handler that includes the filtering logic
357 auto wrapped_handler = [f = std::move(func),
358 flt = std::move(filter)]
359 (const event& evt) {
360 // Only call the handler if the filter passes
361 if (flt(evt)) {
362 f(evt);
363 }
364 };
365
366 return subscribe(std::move(wrapped_handler));
367 }

References subscribe().


◆ unsubscribe()

void kcenon::common::simple_event_bus::unsubscribe ( uint64_t id)
inline

Definition at line 369 of file event_bus.h.

369 {
370 std::lock_guard<std::mutex> lock(mutex_);
371
372 // Find and remove the handler with this ID
373 for (auto it = handlers_.begin(); it != handlers_.end(); ) {
374 if (it->second.id == id) {
375 it = handlers_.erase(it);
376 } else {
377 ++it;
378 }
379 }
380 }

References handlers_, and mutex_.
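
The map is keyed by event type ID rather than by subscription ID, so unsubscribe() walks every entry to find the matching one. The sketch below reproduces that erase-while-iterating loop on a hypothetical table of subscriptions, without the mutex, to show the iterator handling.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>

namespace sketch_unsubscribe {

// Hypothetical reduced subscription record.
struct subscription {
    std::uint64_t id;
};

using table = std::unordered_multimap<std::size_t, subscription>;

// Removes every entry whose subscription ID matches and returns the
// number of entries removed. An unknown ID removes nothing.
inline std::size_t remove_by_id(table& handlers, std::uint64_t id) {
    std::size_t removed = 0;
    for (auto it = handlers.begin(); it != handlers.end();) {
        if (it->second.id == id) {
            it = handlers.erase(it);  // erase returns the next iterator
            ++removed;
        } else {
            ++it;
        }
    }
    return removed;
}

// Returns the number of subscriptions left after the removals.
inline std::size_t demo() {
    table handlers;
    handlers.emplace(10, subscription{1});
    handlers.emplace(10, subscription{2});
    handlers.emplace(20, subscription{3});
    const std::size_t first = remove_by_id(handlers, 2);
    const std::size_t second = remove_by_id(handlers, 99);
    assert(first == 1);
    assert(second == 0);
    return handlers.size();
}

}  // namespace sketch_unsubscribe
```

The cost is linear in the total number of subscriptions across all event types, which is a reasonable trade-off for a bus described as a simple testing fallback.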

Member Data Documentation

◆ error_callback_

error_callback_t kcenon::common::simple_event_bus::error_callback_
private

Definition at line 440 of file event_bus.h.

Referenced by clear_error_callback(), publish(), publish(), and set_error_callback().

◆ handlers_

std::unordered_multimap<size_t, subscription_info> kcenon::common::simple_event_bus::handlers_
private

Definition at line 437 of file event_bus.h.

Referenced by publish(), publish(), subscribe(), subscribe(), and unsubscribe().

◆ mutex_

std::mutex kcenon::common::simple_event_bus::mutex_
mutable private

◆ next_id_

std::atomic<uint64_t> kcenon::common::simple_event_bus::next_id_ {1}
private

Definition at line 438 of file event_bus.h.

438 {1};

Referenced by subscribe(), and subscribe().

◆ running_

std::atomic<bool> kcenon::common::simple_event_bus::running_ {true}
private

Definition at line 439 of file event_bus.h.

439 {true};

Referenced by is_running(), start(), and stop().


The documentation for this class was generated from the following file: