Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
kcenon::thread::event_bus Class Reference

Event Bus for publish-subscribe pattern. More...

#include <event_bus.h>

Collaboration diagram for kcenon::thread::event_bus:
Collaboration graph

Classes

class  subscription
 Subscription handle for managing subscriptions. More...
 

Public Types

using handler_func = std::function<void(const std::any&)>
 Handler function type.
 

Public Member Functions

 event_bus (std::shared_ptr< thread_pool > pool=nullptr)
 Constructor.
 
template<typename Event >
void publish (const Event &event)
 Publish an event asynchronously.
 
template<typename Event >
void publish_sync (const Event &event)
 Publish an event synchronously.
 
template<typename Event >
subscription subscribe (std::function< void(const Event &)> handler)
 Subscribe to events of a specific type.
 
template<typename Event >
void clear_subscriptions ()
 Clear all subscriptions for a specific event type.
 
void clear_all_subscriptions ()
 Clear all subscriptions.
 
template<typename Event >
std::size_t subscriber_count () const
 Get the number of subscribers for an event type.
 

Static Public Member Functions

static event_businstance ()
 Get singleton instance.
 

Private Member Functions

void unsubscribe (std::type_index type, std::size_t id)
 Unsubscribe a specific handler.
 

Private Attributes

std::mutex mutex_
 
std::unordered_map< std::type_index, std::unordered_map< std::size_t, handler_func > > handlers_
 
std::shared_ptr< thread_poolthread_pool_
 
std::size_t next_handler_id_ {1}
 

Detailed Description

Event Bus for publish-subscribe pattern.

Provides asynchronous event distribution across system components

Definition at line 55 of file event_bus.h.

Member Typedef Documentation

◆ handler_func

using kcenon::thread::event_bus::handler_func = std::function<void(const std::any&)>

Handler function type.

Definition at line 60 of file event_bus.h.

Constructor & Destructor Documentation

◆ event_bus()

kcenon::thread::event_bus::event_bus ( std::shared_ptr< thread_pool > pool = nullptr)
inlineexplicit

Constructor.

Parameters
thread_poolOptional thread pool for async processing

Definition at line 123 of file event_bus.h.

124 : thread_pool_(pool) {
125 if (!thread_pool_) {
126 // Create default thread pool with 2 threads for event processing
127 thread_pool_ = thread_pool_builder("event_bus")
128 .with_workers(2)
129 .build_and_start();
130 }
131 }
std::shared_ptr< thread_pool > thread_pool_
Definition event_bus.h:276

References kcenon::thread::thread_pool_builder::build_and_start(), thread_pool_, and kcenon::thread::thread_pool_builder::with_workers().

Here is the call graph for this function:

Member Function Documentation

◆ clear_all_subscriptions()

void kcenon::thread::event_bus::clear_all_subscriptions ( )
inline

Clear all subscriptions.

Definition at line 229 of file event_bus.h.

229 {
230 std::lock_guard<std::mutex> lock(mutex_);
231 handlers_.clear();
232 }
std::unordered_map< std::type_index, std::unordered_map< std::size_t, handler_func > > handlers_
Definition event_bus.h:275

References handlers_, and mutex_.

◆ clear_subscriptions()

template<typename Event >
void kcenon::thread::event_bus::clear_subscriptions ( )
inline

Clear all subscriptions for a specific event type.

Template Parameters
EventEvent type

Definition at line 220 of file event_bus.h.

220 {
221 auto type = std::type_index(typeid(Event));
222 std::lock_guard<std::mutex> lock(mutex_);
223 handlers_.erase(type);
224 }

References handlers_, and mutex_.

◆ instance()

static event_bus & kcenon::thread::event_bus::instance ( )
inlinestatic

Get singleton instance.

Returns
Event bus instance

Definition at line 254 of file event_bus.h.

254 {
255 static event_bus instance;
256 return instance;
257 }
static event_bus & instance()
Get singleton instance.
Definition event_bus.h:254
event_bus(std::shared_ptr< thread_pool > pool=nullptr)
Constructor.
Definition event_bus.h:123

References instance().

Referenced by instance().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ publish()

template<typename Event >
void kcenon::thread::event_bus::publish ( const Event & event)
inline

Publish an event asynchronously.

Template Parameters
EventEvent type
Parameters
eventEvent to publish

Definition at line 139 of file event_bus.h.

139 {
140 auto type = std::type_index(typeid(Event));
141
142 // Get handlers snapshot to avoid holding lock during callbacks
143 std::vector<handler_func> handlers_copy;
144 {
145 std::lock_guard<std::mutex> lock(mutex_);
146 auto it = handlers_.find(type);
147 if (it != handlers_.end()) {
148 for (const auto& [id, handler] : it->second) {
149 handlers_copy.push_back(handler);
150 }
151 }
152 }
153
154 // Process handlers asynchronously
155 if (!handlers_copy.empty() && thread_pool_) {
156 thread_pool_->submit([handlers_copy, event]() {
157 for (const auto& handler : handlers_copy) {
158 try {
159 handler(std::any(event));
160 } catch (...) {
161 // Log error but continue with other handlers
162 }
163 }
164 });
165 }
166 }

References handlers_, mutex_, and thread_pool_.

◆ publish_sync()

template<typename Event >
void kcenon::thread::event_bus::publish_sync ( const Event & event)
inline

Publish an event synchronously.

Template Parameters
EventEvent type
Parameters
eventEvent to publish

Definition at line 174 of file event_bus.h.

174 {
175 auto type = std::type_index(typeid(Event));
176
177 std::lock_guard<std::mutex> lock(mutex_);
178 auto it = handlers_.find(type);
179 if (it != handlers_.end()) {
180 for (const auto& [id, handler] : it->second) {
181 try {
182 handler(std::any(event));
183 } catch (...) {
184 // Log error but continue with other handlers
185 }
186 }
187 }
188 }

References handlers_, and mutex_.

◆ subscribe()

template<typename Event >
subscription kcenon::thread::event_bus::subscribe ( std::function< void(const Event &)> handler)
inlinenodiscard

Subscribe to events of a specific type.

Template Parameters
EventEvent type to subscribe to
Parameters
handlerHandler function
Returns
Subscription handle

Definition at line 197 of file event_bus.h.

197 {
198 auto type = std::type_index(typeid(Event));
199 auto wrapped_handler = [handler](const std::any& any_event) {
200 try {
201 const auto& event = std::any_cast<const Event&>(any_event);
202 handler(event);
203 } catch (const std::bad_any_cast&) {
204 // Type mismatch - should not happen
205 }
206 };
207
208 std::lock_guard<std::mutex> lock(mutex_);
209 auto id = next_handler_id_++;
210 handlers_[type][id] = wrapped_handler;
211
212 return subscription(this, type, id);
213 }
std::size_t next_handler_id_
Definition event_bus.h:277

References handlers_, mutex_, and next_handler_id_.

◆ subscriber_count()

template<typename Event >
std::size_t kcenon::thread::event_bus::subscriber_count ( ) const
inline

Get the number of subscribers for an event type.

Template Parameters
EventEvent type
Returns
Number of subscribers

Definition at line 240 of file event_bus.h.

240 {
241 auto type = std::type_index(typeid(Event));
242 std::lock_guard<std::mutex> lock(mutex_);
243 auto it = handlers_.find(type);
244 if (it != handlers_.end()) {
245 return it->second.size();
246 }
247 return 0;
248 }

References handlers_, and mutex_.

◆ unsubscribe()

void kcenon::thread::event_bus::unsubscribe ( std::type_index type,
std::size_t id )
inlineprivate

Unsubscribe a specific handler.

Definition at line 263 of file event_bus.h.

263 {
264 std::lock_guard<std::mutex> lock(mutex_);
265 auto it = handlers_.find(type);
266 if (it != handlers_.end()) {
267 it->second.erase(id);
268 if (it->second.empty()) {
269 handlers_.erase(it);
270 }
271 }
272 }

References handlers_, and mutex_.

Referenced by kcenon::thread::event_bus::subscription::unsubscribe().

Here is the caller graph for this function:

Member Data Documentation

◆ handlers_

std::unordered_map<std::type_index, std::unordered_map<std::size_t, handler_func> > kcenon::thread::event_bus::handlers_
private

◆ mutex_

std::mutex kcenon::thread::event_bus::mutex_
mutableprivate

◆ next_handler_id_

std::size_t kcenon::thread::event_bus::next_handler_id_ {1}
private

Definition at line 277 of file event_bus.h.

277{1};

Referenced by subscribe().

◆ thread_pool_

std::shared_ptr<thread_pool> kcenon::thread::event_bus::thread_pool_
private

Definition at line 276 of file event_bus.h.

Referenced by event_bus(), and publish().


The documentation for this class was generated from the following file: