Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
event_bus.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
11#pragma once
12
13#include <any>
14#include <functional>
15#include <memory>
16#include <mutex>
17#include <typeindex>
18#include <unordered_map>
19#include <vector>
22
23namespace kcenon::thread {
24
29public:
30 virtual ~event_base() = default;
31
36 virtual std::string type_name() const = 0;
37
42 std::chrono::steady_clock::time_point timestamp() const {
43 return timestamp_;
44 }
45
46protected:
47 std::chrono::steady_clock::time_point timestamp_{std::chrono::steady_clock::now()};
48};
49
55class event_bus {
56public:
60 using handler_func = std::function<void(const std::any&)>;
61
66 public:
67 subscription() = default;
68 subscription(event_bus* bus, std::type_index type, std::size_t id)
69 : bus_(bus), type_(type), id_(id) {}
70
74 void unsubscribe() {
75 if (bus_) {
77 bus_ = nullptr;
78 }
79 }
80
84 bool is_active() const {
85 return bus_ != nullptr;
86 }
87
88 // Move operations
89 subscription(subscription&& other) noexcept
90 : bus_(other.bus_), type_(other.type_), id_(other.id_) {
91 other.bus_ = nullptr;
92 }
93
94 subscription& operator=(subscription&& other) noexcept {
95 if (this != &other) {
97 bus_ = other.bus_;
98 type_ = other.type_;
99 id_ = other.id_;
100 other.bus_ = nullptr;
101 }
102 return *this;
103 }
104
105 // Disable copy
106 subscription(const subscription&) = delete;
108
110 unsubscribe();
111 }
112
113 private:
114 event_bus* bus_{nullptr};
115 std::type_index type_{typeid(void)};
116 std::size_t id_{0};
117 };
118
123 explicit event_bus(std::shared_ptr<thread_pool> pool = nullptr)
124 : thread_pool_(pool) {
125 if (!thread_pool_) {
126 // Create default thread pool with 2 threads for event processing
127 thread_pool_ = thread_pool_builder("event_bus")
128 .with_workers(2)
130 }
131 }
132
138 template<typename Event>
139 void publish(const Event& event) {
140 auto type = std::type_index(typeid(Event));
141
142 // Get handlers snapshot to avoid holding lock during callbacks
143 std::vector<handler_func> handlers_copy;
144 {
145 std::lock_guard<std::mutex> lock(mutex_);
146 auto it = handlers_.find(type);
147 if (it != handlers_.end()) {
148 for (const auto& [id, handler] : it->second) {
149 handlers_copy.push_back(handler);
150 }
151 }
152 }
153
154 // Process handlers asynchronously
155 if (!handlers_copy.empty() && thread_pool_) {
156 thread_pool_->submit([handlers_copy, event]() {
157 for (const auto& handler : handlers_copy) {
158 try {
159 handler(std::any(event));
160 } catch (...) {
161 // Log error but continue with other handlers
162 }
163 }
164 });
165 }
166 }
167
173 template<typename Event>
174 void publish_sync(const Event& event) {
175 auto type = std::type_index(typeid(Event));
176
177 std::lock_guard<std::mutex> lock(mutex_);
178 auto it = handlers_.find(type);
179 if (it != handlers_.end()) {
180 for (const auto& [id, handler] : it->second) {
181 try {
182 handler(std::any(event));
183 } catch (...) {
184 // Log error but continue with other handlers
185 }
186 }
187 }
188 }
189
196 template<typename Event>
197 [[nodiscard]] subscription subscribe(std::function<void(const Event&)> handler) {
198 auto type = std::type_index(typeid(Event));
199 auto wrapped_handler = [handler](const std::any& any_event) {
200 try {
201 const auto& event = std::any_cast<const Event&>(any_event);
202 handler(event);
203 } catch (const std::bad_any_cast&) {
204 // Type mismatch - should not happen
205 }
206 };
207
208 std::lock_guard<std::mutex> lock(mutex_);
209 auto id = next_handler_id_++;
210 handlers_[type][id] = wrapped_handler;
211
212 return subscription(this, type, id);
213 }
214
219 template<typename Event>
221 auto type = std::type_index(typeid(Event));
222 std::lock_guard<std::mutex> lock(mutex_);
223 handlers_.erase(type);
224 }
225
230 std::lock_guard<std::mutex> lock(mutex_);
231 handlers_.clear();
232 }
233
239 template<typename Event>
240 std::size_t subscriber_count() const {
241 auto type = std::type_index(typeid(Event));
242 std::lock_guard<std::mutex> lock(mutex_);
243 auto it = handlers_.find(type);
244 if (it != handlers_.end()) {
245 return it->second.size();
246 }
247 return 0;
248 }
249
254 static event_bus& instance() {
255 static event_bus instance;
256 return instance;
257 }
258
259private:
263 void unsubscribe(std::type_index type, std::size_t id) {
264 std::lock_guard<std::mutex> lock(mutex_);
265 auto it = handlers_.find(type);
266 if (it != handlers_.end()) {
267 it->second.erase(id);
268 if (it->second.empty()) {
269 handlers_.erase(it);
270 }
271 }
272 }
273
274 mutable std::mutex mutex_;
275 std::unordered_map<std::type_index, std::unordered_map<std::size_t, handler_func>> handlers_;
276 std::shared_ptr<thread_pool> thread_pool_;
277 std::size_t next_handler_id_{1};
278};
279
280// Common event types
281
286 std::string system_name;
287
288 explicit system_startup_event(std::string name)
289 : system_name(std::move(name)) {}
290
291 std::string type_name() const override {
292 return "SystemStartupEvent";
293 }
294};
295
300 std::string system_name;
301 std::string reason;
302
303 system_shutdown_event(std::string name, std::string reason_msg = "")
304 : system_name(std::move(name)), reason(std::move(reason_msg)) {}
305
306 std::string type_name() const override {
307 return "SystemShutdownEvent";
308 }
309};
310
315 std::string config_path;
316 std::any old_value;
317 std::any new_value;
318
319 config_changed_event(std::string path, std::any old_val, std::any new_val)
320 : config_path(std::move(path))
321 , old_value(std::move(old_val))
322 , new_value(std::move(new_val)) {}
323
324 std::string type_name() const override {
325 return "ConfigChangedEvent";
326 }
327};
328
333 enum class severity { info, warning, critical };
334
336 std::string message;
338
339 performance_alert_event(severity lvl, std::string msg, double value)
340 : level(lvl), message(std::move(msg)), metric_value(value) {}
341
342 std::string type_name() const override {
343 return "PerformanceAlertEvent";
344 }
345};
346
347} // namespace kcenon::thread
Event base class for type safety.
Definition event_bus.h:28
virtual std::string type_name() const =0
Get event type name.
std::chrono::steady_clock::time_point timestamp_
Definition event_bus.h:47
std::chrono::steady_clock::time_point timestamp() const
Get event timestamp.
Definition event_bus.h:42
virtual ~event_base()=default
Subscription handle for managing subscriptions.
Definition event_bus.h:65
subscription(subscription &&other) noexcept
Definition event_bus.h:89
bool is_active() const
Check if subscription is active.
Definition event_bus.h:84
subscription & operator=(const subscription &)=delete
void unsubscribe()
Unsubscribe from events.
Definition event_bus.h:74
subscription(const subscription &)=delete
subscription(event_bus *bus, std::type_index type, std::size_t id)
Definition event_bus.h:68
subscription & operator=(subscription &&other) noexcept
Definition event_bus.h:94
Event Bus for publish-subscribe pattern.
Definition event_bus.h:55
void clear_all_subscriptions()
Clear all subscriptions.
Definition event_bus.h:229
void clear_subscriptions()
Clear all subscriptions for a specific event type.
Definition event_bus.h:220
void publish(const Event &event)
Publish an event asynchronously.
Definition event_bus.h:139
static event_bus & instance()
Get singleton instance.
Definition event_bus.h:254
event_bus(std::shared_ptr< thread_pool > pool=nullptr)
Constructor.
Definition event_bus.h:123
subscription subscribe(std::function< void(const Event &)> handler)
Subscribe to events of a specific type.
Definition event_bus.h:197
std::function< void(const std::any &)> handler_func
Handler function type.
Definition event_bus.h:60
std::unordered_map< std::type_index, std::unordered_map< std::size_t, handler_func > > handlers_
Definition event_bus.h:275
std::size_t subscriber_count() const
Get the number of subscribers for an event type.
Definition event_bus.h:240
void unsubscribe(std::type_index type, std::size_t id)
Unsubscribe a specific handler.
Definition event_bus.h:263
void publish_sync(const Event &event)
Publish an event synchronously.
Definition event_bus.h:174
std::size_t next_handler_id_
Definition event_bus.h:277
std::shared_ptr< thread_pool > thread_pool_
Definition event_bus.h:276
Fluent builder for creating and configuring thread pools.
thread_pool_builder & with_workers(std::size_t count)
Sets the number of worker threads.
std::shared_ptr< thread_pool > build_and_start()
Builds the pool and starts it immediately.
Core thread pool implementation with work stealing and auto-scaling.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Configuration changed event.
Definition event_bus.h:314
config_changed_event(std::string path, std::any old_val, std::any new_val)
Definition event_bus.h:319
std::string type_name() const override
Get event type name.
Definition event_bus.h:324
performance_alert_event(severity lvl, std::string msg, double value)
Definition event_bus.h:339
std::string type_name() const override
Get event type name.
Definition event_bus.h:342
system_shutdown_event(std::string name, std::string reason_msg="")
Definition event_bus.h:303
std::string type_name() const override
Get event type name.
Definition event_bus.h:306
system_startup_event(std::string name)
Definition event_bus.h:288
std::string type_name() const override
Get event type name.
Definition event_bus.h:291
Fluent builder for creating and configuring thread pools.