Common System 0.2.0
Common interfaces and patterns for system integration
Loading...
Searching...
No Matches
event_bus.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
24#pragma once
25
26#include <string>
27#include <chrono>
28#include <cstdint>
29#include <functional>
30#include <memory>
31#include <mutex>
32#include <atomic>
33#include <unordered_map>
34#include <algorithm>
35#include <vector>
36#include <typeinfo>
37#include <typeindex>
38#include <concepts>
39
40// Include C++20 concepts for type validation
42
43// Provide a simple standalone implementation
44// Note: monitoring_system can extend or wrap this if needed
45
46namespace kcenon::common {
47namespace detail {
48 // Version identifier for ABI compatibility checking
49 // This ensures that object files compiled with different macro definitions
50 // cannot be linked together, preventing subtle runtime errors.
51 constexpr int event_bus_abi_version = 1; // Standalone implementation
52
/**
 * @brief Maps an event type to a numeric identifier.
 *
 * The identifier is std::hash applied to std::type_index(typeid(T)). It is
 * computed on the first call and cached in a function-local static, so every
 * later call for the same T returns the cached value.
 *
 * @note typeid is an RTTI facility. A hash value is not guaranteed to be
 *       collision-free, so two distinct types could in principle map to the
 *       same identifier.
 *
 * @tparam T Event type to identify.
 */
template<typename T>
struct event_type_id {
    /// @return The cached hash of std::type_index(typeid(T)).
    static size_t id() {
        static const size_t type_id =
            std::hash<std::type_index>{}(std::type_index(typeid(T)));
        return type_id;
    }
};
91} // namespace detail
92
93 // Public API - expose detail::event_type_id for backward compatibility
94 using detail::event_type_id;
95
/**
 * @brief Priority levels that can be passed to the publish functions.
 *
 * Enumerators are ordered from lowest (0) to highest (2).
 */
enum class event_priority {
    low = 0,
    normal = 1,
    high = 2
};
101
105 using subscription_id = uint64_t;
106
/**
 * @brief Generic event carrying a type label and a string payload.
 */
struct event {
    std::string type_;  ///< Label describing the kind of event.
    std::string data_;  ///< Payload associated with the event.

    /// Creates an event with an empty type and empty data.
    event() = default;

    /**
     * @brief Creates an event from a type label and an optional payload.
     * @param type Label stored in type_.
     * @param data Payload stored in data_; defaults to the empty string.
     */
    event(const std::string& type, const std::string& data = "")
        : type_(type), data_(data) {}

    /// Replaces the type label.
    void set_type(const std::string& type) { type_ = type; }
    /// Replaces the payload.
    void set_data(const std::string& data) { data_ = data; }
    /// @return A copy of the type label.
    std::string get_type() const { return type_; }
    /// @return A copy of the payload.
    std::string get_data() const { return data_; }
};
124
151 public:
161 using error_callback_t = std::function<void(const std::string& error_message,
162 size_t event_type_id,
163 uint64_t handler_id)>;
164
173 template<concepts::EventType EventType>
174 void publish(const EventType& evt, event_priority = event_priority::normal) {
175 // Snapshot handlers and error callback under the lock, then release
176 // before invoking handlers. This prevents reentrancy deadlock when
177 // a handler calls subscribe() or unsubscribe() on this bus.
178 std::vector<subscription_info> snapshot;
179 error_callback_t error_cb;
180 {
181 std::lock_guard<std::mutex> lock(mutex_);
182 auto type_id = event_type_id<EventType>::id();
183 auto range = handlers_.equal_range(type_id);
184 snapshot.reserve(std::distance(range.first, range.second));
185 for (auto it = range.first; it != range.second; ++it) {
186 snapshot.push_back(it->second);
187 }
188 error_cb = error_callback_;
189 }
190
191 // Get type identifier without RTTI
192 auto type_id = event_type_id<EventType>::id();
193
194 // Invoke handlers outside the lock — safe for reentrancy
195 for (auto& entry : snapshot) {
196 try {
197 // Type safety check: verify expected type matches
198 if (entry.expected_type_id != type_id) {
199 if (error_cb) {
200 error_cb("Type ID mismatch detected in event handler",
201 type_id, entry.id);
202 }
203 continue;
204 }
205
206 if (entry.handler) {
207 entry.handler(static_cast<const void*>(&evt));
208 }
209 } catch (const std::exception& e) {
210 if (error_cb) {
211 std::string error_msg = "Exception in event handler: ";
212 error_msg += e.what();
213 error_cb(error_msg, type_id, entry.id);
214 }
215 } catch (...) {
216 if (error_cb) {
217 error_cb("Unknown exception in event handler",
218 type_id, entry.id);
219 }
220 }
221 }
222 }
223
224 // For generic event (overload for type deduction)
226 // Snapshot handlers and error callback under the lock
227 std::vector<subscription_info> snapshot;
228 error_callback_t error_cb;
229 {
230 std::lock_guard<std::mutex> lock(mutex_);
231 auto type_id = event_type_id<event>::id();
232 auto range = handlers_.equal_range(type_id);
233 snapshot.reserve(std::distance(range.first, range.second));
234 for (auto it = range.first; it != range.second; ++it) {
235 snapshot.push_back(it->second);
236 }
237 error_cb = error_callback_;
238 }
239
240 auto type_id = event_type_id<event>::id();
241
242 // Invoke handlers outside the lock — safe for reentrancy
243 for (auto& entry : snapshot) {
244 try {
245 if (entry.expected_type_id != type_id) {
246 if (error_cb) {
247 error_cb("Type ID mismatch detected in event handler",
248 type_id, entry.id);
249 }
250 continue;
251 }
252
253 if (entry.handler) {
254 entry.handler(static_cast<const void*>(&evt));
255 }
256 } catch (const std::exception& e) {
257 if (error_cb) {
258 std::string error_msg = "Exception in event handler: ";
259 error_msg += e.what();
260 error_cb(error_msg, type_id, entry.id);
261 }
262 } catch (...) {
263 if (error_cb) {
264 error_cb("Unknown exception in event handler",
265 type_id, entry.id);
266 }
267 }
268 }
269 }
270
280 template<concepts::EventType EventType, concepts::EventHandler<EventType> HandlerFunc>
281 uint64_t subscribe(HandlerFunc&& func) {
282 std::lock_guard<std::mutex> lock(mutex_);
283
284 auto id = next_id_++;
285 auto type_id = event_type_id<EventType>::id();
286
288 info.id = id;
289 info.expected_type_id = type_id; // Store expected type ID for validation
290 // Wrap the handler in a type-erased function
291 info.handler = [f = std::function<void(const EventType&)>(std::forward<HandlerFunc>(func))]
292 (const void* evt_ptr) {
293 f(*static_cast<const EventType*>(evt_ptr));
294 };
295
296 handlers_.emplace(type_id, std::move(info));
297
298 return id;
299 }
300
301 // Non-template overload for generic event
302 uint64_t subscribe(std::function<void(const event&)>&& func) {
303 std::lock_guard<std::mutex> lock(mutex_);
304
305 auto id = next_id_++;
306 auto type_id = event_type_id<event>::id();
307
309 info.id = id;
310 info.expected_type_id = type_id; // Store expected type ID for validation
311 info.handler = [f = std::move(func)](const void* evt_ptr) {
312 f(*static_cast<const event*>(evt_ptr));
313 };
314
315 handlers_.emplace(type_id, std::move(info));
316
317 return id;
318 }
319
334 template<concepts::EventType EventType,
337 uint64_t subscribe_filtered(HandlerFunc&& func, FilterFunc&& filter) {
338 // Create a wrapped handler that includes the filtering logic
339 auto wrapped_handler = [f = std::forward<HandlerFunc>(func),
340 flt = std::forward<FilterFunc>(filter)]
341 (const EventType& evt) {
342 // Only call the handler if the filter passes
343 if (flt(evt)) {
344 f(evt);
345 }
346 };
347
348 // Subscribe with the wrapped handler
349 return subscribe<EventType>(std::move(wrapped_handler));
350 }
351
352 // Non-template overload for generic event filtering
354 std::function<void(const event&)>&& func,
355 std::function<bool(const event&)>&& filter) {
356 // Create a wrapped handler that includes the filtering logic
357 auto wrapped_handler = [f = std::move(func),
358 flt = std::move(filter)]
359 (const event& evt) {
360 // Only call the handler if the filter passes
361 if (flt(evt)) {
362 f(evt);
363 }
364 };
365
366 return subscribe(std::move(wrapped_handler));
367 }
368
369 void unsubscribe(uint64_t id) {
370 std::lock_guard<std::mutex> lock(mutex_);
371
372 // Find and remove the handler with this ID
373 for (auto it = handlers_.begin(); it != handlers_.end(); ) {
374 if (it->second.id == id) {
375 it = handlers_.erase(it);
376 } else {
377 ++it;
378 }
379 }
380 }
381
382 void start() { running_ = true; }
383 void stop() { running_ = false; }
384 bool is_running() const { return running_; }
385
408 std::lock_guard<std::mutex> lock(mutex_);
409 error_callback_ = std::move(callback);
410 }
411
416 std::lock_guard<std::mutex> lock(mutex_);
417 error_callback_ = nullptr;
418 }
419
426 return instance;
427 }
428
429 private:
/// Bookkeeping record stored for each registered handler.
struct subscription_info {
    uint64_t id = 0;                           ///< Subscription identifier.
    size_t expected_type_id = 0;               ///< Type ID the handler was registered for.
    std::function<void(const void*)> handler;  ///< Type-erased handler taking a pointer to the event.
};
435
436 mutable std::mutex mutex_;
437 std::unordered_multimap<size_t, subscription_info> handlers_;
438 std::atomic<uint64_t> next_id_{1};
439 std::atomic<bool> running_{true};
440 error_callback_t error_callback_; // Optional error callback for exception handling
441 };
442
444
445 template<typename T>
446 using event_handler = std::function<void(const T&)>;
447
455}
456
457// ABI compatibility check functions
458namespace kcenon::common {
459namespace detail {
469 inline constexpr int get_event_bus_abi_version() {
471 }
472} // namespace detail
473
493 inline bool verify_event_bus_abi(int expected_version) {
494 return detail::get_event_bus_abi_version() == expected_version;
495 }
496} // namespace kcenon::common
497
498// Common event types that can be used across modules
499namespace kcenon::common {
500namespace events {
501
/**
 * @brief Event published when a module starts.
 */
struct module_started_event {
    std::string module_name;                          ///< Name of the module.
    std::chrono::steady_clock::time_point timestamp;  ///< Steady-clock time at construction.

    /**
     * @brief Creates the event and stamps it with the current steady-clock time.
     * @param name Name of the module.
     */
    module_started_event(const std::string& name)
        : module_name(name),
          timestamp(std::chrono::steady_clock::now()) {}
};
514
/**
 * @brief Event published when a module stops.
 */
struct module_stopped_event {
    std::string module_name;                          ///< Name of the module.
    std::chrono::steady_clock::time_point timestamp;  ///< Steady-clock time at construction.

    /**
     * @brief Creates the event and stamps it with the current steady-clock time.
     * @param name Name of the module.
     */
    module_stopped_event(const std::string& name)
        : module_name(name),
          timestamp(std::chrono::steady_clock::now()) {}
};
527
/**
 * @brief Event published when an error occurs.
 */
struct error_event {
    std::string module_name;                          ///< Module reporting the error.
    std::string error_message;                        ///< Description of the error.
    int error_code;                                   ///< Numeric code supplied by the reporter.
    std::chrono::steady_clock::time_point timestamp;  ///< Steady-clock time at construction.

    /**
     * @brief Creates the event and stamps it with the current steady-clock time.
     * @param module Module reporting the error.
     * @param message Description of the error.
     * @param code Numeric error code.
     */
    error_event(const std::string& module, const std::string& message, int code)
        : module_name(module), error_message(message), error_code(code),
          timestamp(std::chrono::steady_clock::now()) {}
};
542
/**
 * @brief Event for publishing metrics.
 */
struct metric_event {
    std::string name;                                 ///< Metric name.
    double value;                                     ///< Metric value.
    std::string unit;                                 ///< Unit label; may be empty.
    std::chrono::steady_clock::time_point timestamp;  ///< Steady-clock time at construction.

    /**
     * @brief Creates the event and stamps it with the current steady-clock time.
     * @param n Metric name.
     * @param v Metric value.
     * @param u Unit label; defaults to the empty string.
     */
    metric_event(const std::string& n, double v, const std::string& u = "")
        : name(n), value(v), unit(u),
          timestamp(std::chrono::steady_clock::now()) {}
};
557
558} // namespace events
559} // namespace kcenon::common
Simple synchronous event bus for testing when monitoring is disabled.
Definition event_bus.h:150
std::function< void(const std::string &error_message, size_t event_type_id, uint64_t handler_id)> error_callback_t
Type for error callback function.
Definition event_bus.h:161
std::unordered_multimap< size_t, subscription_info > handlers_
Definition event_bus.h:437
void set_error_callback(error_callback_t callback)
Set the error callback for handler exceptions and type mismatches.
Definition event_bus.h:407
error_callback_t error_callback_
Definition event_bus.h:440
void unsubscribe(uint64_t id)
Definition event_bus.h:369
void clear_error_callback()
Clear the error callback.
Definition event_bus.h:415
uint64_t subscribe_filtered(HandlerFunc &&func, FilterFunc &&filter)
Subscribe to events with a filter function.
Definition event_bus.h:337
void publish(event &&evt, event_priority=event_priority::normal)
Definition event_bus.h:225
uint64_t subscribe_filtered(std::function< void(const event &)> &&func, std::function< bool(const event &)> &&filter)
Definition event_bus.h:353
std::atomic< bool > running_
Definition event_bus.h:439
uint64_t subscribe(HandlerFunc &&func)
Subscribe to events of a specific type.
Definition event_bus.h:281
std::atomic< uint64_t > next_id_
Definition event_bus.h:438
uint64_t subscribe(std::function< void(const event &)> &&func)
Definition event_bus.h:302
static simple_event_bus & instance()
Get the singleton instance (thread-safe via C++11 magic statics)
Definition event_bus.h:424
void publish(const EventType &evt, event_priority=event_priority::normal)
Publish an event to all subscribed handlers.
Definition event_bus.h:174
A callable that filters events based on criteria.
Definition event.h:93
A callable that can handle events of a specific type.
Definition event.h:71
A type that can be used as an event in the event bus.
Definition event.h:52
C++20 concepts for event bus types.
constexpr int event_bus_abi_version
Definition event_bus.h:51
constexpr int get_event_bus_abi_version()
Get the ABI version of the event_bus implementation.
Definition event_bus.h:469
Core interfaces.
Definition adapter.h:21
simple_event_bus & get_event_bus()
Access the global event bus instance.
Definition event_bus.h:452
uint64_t subscription_id
Type alias for subscription ID.
Definition event_bus.h:105
std::function< void(const T &)> event_handler
Definition event_bus.h:446
bool verify_event_bus_abi(int expected_version)
Verify ABI compatibility between modules.
Definition event_bus.h:493
Generate unique type ID for each event type using std::type_index.
Definition event_bus.h:84
Generic event structure for the event bus.
Definition event_bus.h:111
std::string get_type() const
Definition event_bus.h:121
std::string get_data() const
Definition event_bus.h:122
void set_type(const std::string &type)
Definition event_bus.h:119
void set_data(const std::string &data)
Definition event_bus.h:120
event(const std::string &type, const std::string &data="")
Definition event_bus.h:116
Event published when an error occurs.
Definition event_bus.h:532
std::chrono::steady_clock::time_point timestamp
Definition event_bus.h:536
error_event(const std::string &module, const std::string &message, int code)
Definition event_bus.h:538
Event for publishing metrics.
Definition event_bus.h:547
metric_event(const std::string &n, double v, const std::string &u="")
Definition event_bus.h:553
std::chrono::steady_clock::time_point timestamp
Definition event_bus.h:551
Event published when a module starts.
Definition event_bus.h:506
module_started_event(const std::string &name)
Definition event_bus.h:510
std::chrono::steady_clock::time_point timestamp
Definition event_bus.h:508
Event published when a module stops.
Definition event_bus.h:519
module_stopped_event(const std::string &name)
Definition event_bus.h:523
std::chrono::steady_clock::time_point timestamp
Definition event_bus.h:521
std::function< void(const void *)> handler
Definition event_bus.h:433