Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::safe_event_dispatcher Class Reference

Event dispatcher with exception handling and error recovery. More...

#include <safe_event_dispatcher.h>

Collaboration diagram for kcenon::monitoring::safe_event_dispatcher:
Collaboration graph

Classes

struct  dead_letter_entry
 

Public Types

using error_callback = std::function<void(const handler_error_info&)>
 

Public Member Functions

 safe_event_dispatcher (std::shared_ptr< interface_event_bus > bus=nullptr)
 
template<typename EventType , typename Handler >
uint64_t subscribe_safe (Handler &&handler, event_priority priority=event_priority::normal)
 Subscribe to events with automatic exception handling.
 
template<typename EventType >
bool publish_safe (const EventType &event, event_priority priority=event_priority::normal)
 Publish event with error recovery.
 
void set_error_callback (error_callback callback)
 Set callback for handler errors.
 
std::vector< handler_error_info > get_handler_errors () const
 Get recent handler errors.
 
uint64_t get_total_exceptions () const
 Get total exception count.
 
void clear_errors ()
 Clear error history.
 
void set_circuit_breaker_threshold (size_t threshold)
 Set circuit breaker threshold.
 
size_t get_dead_letter_queue_size () const
 Get dead letter queue size.
 
template<typename RecoveryHandler >
void process_dead_letters (RecoveryHandler &&handler)
 Process dead letter queue with recovery handler.
 
std::shared_ptr< interface_event_bus > get_bus () const
 Get underlying event bus.
 

Private Member Functions

void handle_exception (uint64_t handler_id, std::type_index event_type, const std::string &error_msg)
 
void add_to_dead_letter_queue (std::type_index event_type, const std::string &error_msg)
 

Private Attributes

std::shared_ptr< interface_event_bus > bus_
 
std::atomic< uint64_t > next_id_ {1}
 
std::atomic< uint64_t > total_exceptions_ {0}
 
std::mutex error_mutex_
 
std::vector< handler_error_info > recent_errors_
 
std::unordered_map< uint64_t, size_t > failed_handler_counts_
 
error_callback error_callback_
 
size_t circuit_breaker_threshold_
 
std::mutex dlq_mutex_
 
std::vector< dead_letter_entry > dead_letter_queue_
 

Detailed Description

Event dispatcher with exception handling and error recovery.

Wraps event_bus to provide:

  • Exception isolation (one handler's failure doesn't affect others)
  • Error logging and metrics
  • Circuit breaker for repeatedly failing handlers
  • Dead letter queue for failed events

Production Safety

auto dispatcher = std::make_shared<safe_event_dispatcher>();
// Subscribe with automatic exception handling
dispatcher->subscribe_safe<my_event>([](const my_event& evt) {
    // This might throw, but won't crash the bus
    process_event(evt);
});
// Publish events safely
dispatcher->publish_safe(my_event{});
// Monitor failed handlers
auto errors = dispatcher->get_handler_errors();
for (const auto& error : errors) {
    log_error("Handler {} failed: {}", error.handler_id, error.error_message);
}

Definition at line 63 of file safe_event_dispatcher.h.

Member Typedef Documentation

◆ error_callback

Definition at line 65 of file safe_event_dispatcher.h.

Constructor & Destructor Documentation

◆ safe_event_dispatcher()

kcenon::monitoring::safe_event_dispatcher::safe_event_dispatcher ( std::shared_ptr< interface_event_bus > bus = nullptr)
inline explicit

Definition at line 67 of file safe_event_dispatcher.h.

68 : bus_(bus ? bus : event_bus::instance())
71 {
72 }
static std::shared_ptr< interface_event_bus > instance()
Definition event_bus.h:164
std::shared_ptr< interface_event_bus > bus_

Member Function Documentation

◆ add_to_dead_letter_queue()

void kcenon::monitoring::safe_event_dispatcher::add_to_dead_letter_queue ( std::type_index event_type,
const std::string & error_msg )
inline private

Definition at line 262 of file safe_event_dispatcher.h.

263 {
264 std::lock_guard<std::mutex> lock(dlq_mutex_);
265
266 dead_letter_entry entry{
267 event_type,
268 error_msg,
269 std::chrono::steady_clock::now()
270 };
271
272 dead_letter_queue_.push_back(entry);
273
274 // Limit dead letter queue size
275 if (dead_letter_queue_.size() > 1000) {
277 }
278 }
std::vector< dead_letter_entry > dead_letter_queue_

References dead_letter_queue_, and dlq_mutex_.

Referenced by publish_safe().

Here is the caller graph for this function:

◆ clear_errors()

void kcenon::monitoring::safe_event_dispatcher::clear_errors ( )
inline

Clear error history.

Definition at line 166 of file safe_event_dispatcher.h.

166 {
167 std::lock_guard<std::mutex> lock(error_mutex_);
168 recent_errors_.clear();
171 }
std::vector< handler_error_info > recent_errors_
std::unordered_map< uint64_t, size_t > failed_handler_counts_

References error_mutex_, failed_handler_counts_, recent_errors_, and total_exceptions_.

◆ get_bus()

std::shared_ptr< interface_event_bus > kcenon::monitoring::safe_event_dispatcher::get_bus ( ) const
inline

Get underlying event bus.

Definition at line 210 of file safe_event_dispatcher.h.

210 {
211 return bus_;
212 }

References bus_.

◆ get_dead_letter_queue_size()

size_t kcenon::monitoring::safe_event_dispatcher::get_dead_letter_queue_size ( ) const
inline

Get dead letter queue size.

Definition at line 184 of file safe_event_dispatcher.h.

184 {
185 std::lock_guard<std::mutex> lock(dlq_mutex_);
186 return dead_letter_queue_.size();
187 }

References dead_letter_queue_, and dlq_mutex_.

◆ get_handler_errors()

std::vector< handler_error_info > kcenon::monitoring::safe_event_dispatcher::get_handler_errors ( ) const
inline

Get recent handler errors.

Definition at line 151 of file safe_event_dispatcher.h.

151 {
152 std::lock_guard<std::mutex> lock(error_mutex_);
153 return recent_errors_;
154 }

References error_mutex_, and recent_errors_.

◆ get_total_exceptions()

uint64_t kcenon::monitoring::safe_event_dispatcher::get_total_exceptions ( ) const
inline

Get total exception count.

Definition at line 159 of file safe_event_dispatcher.h.

159 {
160 return total_exceptions_.load(std::memory_order_acquire);
161 }

References total_exceptions_.

◆ handle_exception()

void kcenon::monitoring::safe_event_dispatcher::handle_exception ( uint64_t handler_id,
std::type_index event_type,
const std::string & error_msg )
inline private

Definition at line 221 of file safe_event_dispatcher.h.

223 {
224 total_exceptions_.fetch_add(1, std::memory_order_relaxed);
225
226 std::lock_guard<std::mutex> lock(error_mutex_);
227
228 // Record error
229 handler_error_info info{
230 handler_id,
231 error_msg,
232 event_type,
233 std::chrono::steady_clock::now()
234 };
235
236 recent_errors_.push_back(info);
237
238 // Keep only last 100 errors
239 if (recent_errors_.size() > 100) {
240 recent_errors_.erase(recent_errors_.begin());
241 }
242
243 // Track failure count
244 auto& count = failed_handler_counts_[handler_id];
245 count++;
246
247 // Circuit breaker check
248 if (count >= circuit_breaker_threshold_) {
249 // Handler has failed too many times
250 // In production, we would disable this handler
251 if (error_callback_) {
253 }
254 }
255
256 // Notify error callback
257 if (error_callback_) {
259 }
260 }
@ info
Informational, no action required.

References circuit_breaker_threshold_, error_callback_, error_mutex_, failed_handler_counts_, kcenon::monitoring::info, recent_errors_, and total_exceptions_.

Referenced by subscribe_safe().

Here is the caller graph for this function:

◆ process_dead_letters()

template<typename RecoveryHandler >
void kcenon::monitoring::safe_event_dispatcher::process_dead_letters ( RecoveryHandler && handler)
inline

Process dead letter queue with recovery handler.

Definition at line 193 of file safe_event_dispatcher.h.

193 {
194 std::lock_guard<std::mutex> lock(dlq_mutex_);
195
196 for (auto& dlq_entry : dead_letter_queue_) {
197 try {
198 handler(dlq_entry);
199 } catch (...) {
200 // Recovery handler failed, skip
201 }
202 }
203
204 dead_letter_queue_.clear();
205 }

References dead_letter_queue_, and dlq_mutex_.

◆ publish_safe()

template<typename EventType >
bool kcenon::monitoring::safe_event_dispatcher::publish_safe ( const EventType & event,
event_priority priority = event_priority::normal )
inline

Publish event with error recovery.

Parameters
event     Event to publish
priority  Event priority
Returns
true if published successfully

Definition at line 104 of file safe_event_dispatcher.h.

104 {
105 try {
106 bus_->publish(event, priority);
107 return true;
108 } catch (const std::exception& e) {
109 // Log error
110 if (error_callback_) {
111 handler_error_info info{
112 0, // No specific handler
113 std::string("Publish failed: ") + e.what(),
114 typeid(EventType),
115 std::chrono::steady_clock::now()
116 };
118 }
119
120 // Add to dead letter queue
121 add_to_dead_letter_queue(typeid(EventType), e.what());
122
123 return false;
124 } catch (...) {
125 if (error_callback_) {
126 handler_error_info info{
127 0,
128 "Publish failed: Unknown exception",
129 typeid(EventType),
130 std::chrono::steady_clock::now()
131 };
133 }
134
135 add_to_dead_letter_queue(typeid(EventType), "Unknown exception");
136 return false;
137 }
138 }
void add_to_dead_letter_queue(std::type_index event_type, const std::string &error_msg)

References add_to_dead_letter_queue(), bus_, error_callback_, and kcenon::monitoring::info.

Here is the call graph for this function:

◆ set_circuit_breaker_threshold()

void kcenon::monitoring::safe_event_dispatcher::set_circuit_breaker_threshold ( size_t threshold)
inline

Set circuit breaker threshold.

Parameters
threshold  Number of failures before handler is disabled

Definition at line 177 of file safe_event_dispatcher.h.

177 {
178 circuit_breaker_threshold_ = threshold;
179 }

References circuit_breaker_threshold_.

◆ set_error_callback()

void kcenon::monitoring::safe_event_dispatcher::set_error_callback ( error_callback callback)
inline

Set callback for handler errors.

Definition at line 143 of file safe_event_dispatcher.h.

143 {
144 std::lock_guard<std::mutex> lock(error_mutex_);
145 error_callback_ = std::move(callback);
146 }

References error_callback_, and error_mutex_.

◆ subscribe_safe()

template<typename EventType , typename Handler >
uint64_t kcenon::monitoring::safe_event_dispatcher::subscribe_safe ( Handler && handler,
event_priority priority = event_priority::normal )
inline

Subscribe to events with automatic exception handling.

Parameters
handler   Event handler function
priority  Handler priority
Returns
Subscription ID

Definition at line 81 of file safe_event_dispatcher.h.

81 {
82 // Wrap handler with exception handling
83 auto safe_handler = [this, h = std::forward<Handler>(handler), id = next_id_++]
84 (const EventType& event) {
85 try {
86 h(event);
87 } catch (const std::exception& e) {
88 handle_exception(id, typeid(EventType), e.what());
89 } catch (...) {
90 handle_exception(id, typeid(EventType), "Unknown exception");
91 }
92 };
93
94 return bus_->subscribe<EventType>(std::move(safe_handler));
95 }
void handle_exception(uint64_t handler_id, std::type_index event_type, const std::string &error_msg)

References bus_, handle_exception(), and next_id_.

Here is the call graph for this function:

Member Data Documentation

◆ bus_

std::shared_ptr<interface_event_bus> kcenon::monitoring::safe_event_dispatcher::bus_
private

Definition at line 281 of file safe_event_dispatcher.h.

Referenced by get_bus(), publish_safe(), and subscribe_safe().

◆ circuit_breaker_threshold_

size_t kcenon::monitoring::safe_event_dispatcher::circuit_breaker_threshold_
private

Definition at line 289 of file safe_event_dispatcher.h.

Referenced by handle_exception(), and set_circuit_breaker_threshold().

◆ dead_letter_queue_

std::vector<dead_letter_entry> kcenon::monitoring::safe_event_dispatcher::dead_letter_queue_
private

◆ dlq_mutex_

std::mutex kcenon::monitoring::safe_event_dispatcher::dlq_mutex_
mutable private

◆ error_callback_

error_callback kcenon::monitoring::safe_event_dispatcher::error_callback_
private

Definition at line 288 of file safe_event_dispatcher.h.

Referenced by handle_exception(), publish_safe(), and set_error_callback().

◆ error_mutex_

std::mutex kcenon::monitoring::safe_event_dispatcher::error_mutex_
mutable private

◆ failed_handler_counts_

std::unordered_map<uint64_t, size_t> kcenon::monitoring::safe_event_dispatcher::failed_handler_counts_
private

Definition at line 287 of file safe_event_dispatcher.h.

Referenced by clear_errors(), and handle_exception().

◆ next_id_

std::atomic<uint64_t> kcenon::monitoring::safe_event_dispatcher::next_id_ {1}
private

Definition at line 282 of file safe_event_dispatcher.h.

282{1};

Referenced by subscribe_safe().

◆ recent_errors_

std::vector<handler_error_info> kcenon::monitoring::safe_event_dispatcher::recent_errors_
private

Definition at line 286 of file safe_event_dispatcher.h.

Referenced by clear_errors(), get_handler_errors(), and handle_exception().

◆ total_exceptions_

std::atomic<uint64_t> kcenon::monitoring::safe_event_dispatcher::total_exceptions_ {0}
private

Definition at line 283 of file safe_event_dispatcher.h.

283{0};

Referenced by clear_errors(), get_total_exceptions(), and handle_exception().


The documentation for this class was generated from the following file: