Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
safe_event_dispatcher.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5
13#pragma once
14
15#include "event_bus.h"
16#include <exception>
17#include <functional>
18#include <string>
19
20namespace kcenon { namespace monitoring {
21
27 uint64_t handler_id;
28 std::string error_message;
29 std::type_index event_type;
30 std::chrono::steady_clock::time_point timestamp;
31};
32
64public:
65 using error_callback = std::function<void(const handler_error_info&)>;
66
67 explicit safe_event_dispatcher(std::shared_ptr<interface_event_bus> bus = nullptr)
68 : bus_(bus ? bus : event_bus::instance())
71 {
72 }
73
80 template<typename EventType, typename Handler>
81 uint64_t subscribe_safe(Handler&& handler, event_priority priority = event_priority::normal) {
82 // Wrap handler with exception handling
83 auto safe_handler = [this, h = std::forward<Handler>(handler), id = next_id_++]
84 (const EventType& event) {
85 try {
86 h(event);
87 } catch (const std::exception& e) {
88 handle_exception(id, typeid(EventType), e.what());
89 } catch (...) {
90 handle_exception(id, typeid(EventType), "Unknown exception");
91 }
92 };
93
94 return bus_->subscribe<EventType>(std::move(safe_handler));
95 }
96
103 template<typename EventType>
104 bool publish_safe(const EventType& event, event_priority priority = event_priority::normal) {
105 try {
106 bus_->publish(event, priority);
107 return true;
108 } catch (const std::exception& e) {
109 // Log error
110 if (error_callback_) {
112 0, // No specific handler
113 std::string("Publish failed: ") + e.what(),
114 typeid(EventType),
115 std::chrono::steady_clock::now()
116 };
118 }
119
120 // Add to dead letter queue
121 add_to_dead_letter_queue(typeid(EventType), e.what());
122
123 return false;
124 } catch (...) {
125 if (error_callback_) {
127 0,
128 "Publish failed: Unknown exception",
129 typeid(EventType),
130 std::chrono::steady_clock::now()
131 };
133 }
134
135 add_to_dead_letter_queue(typeid(EventType), "Unknown exception");
136 return false;
137 }
138 }
139
144 std::lock_guard<std::mutex> lock(error_mutex_);
145 error_callback_ = std::move(callback);
146 }
147
151 std::vector<handler_error_info> get_handler_errors() const {
152 std::lock_guard<std::mutex> lock(error_mutex_);
153 return recent_errors_;
154 }
155
159 uint64_t get_total_exceptions() const {
160 return total_exceptions_.load(std::memory_order_acquire);
161 }
162
167 std::lock_guard<std::mutex> lock(error_mutex_);
168 recent_errors_.clear();
171 }
172
177 void set_circuit_breaker_threshold(size_t threshold) {
178 circuit_breaker_threshold_ = threshold;
179 }
180
185 std::lock_guard<std::mutex> lock(dlq_mutex_);
186 return dead_letter_queue_.size();
187 }
188
192 template<typename RecoveryHandler>
193 void process_dead_letters(RecoveryHandler&& handler) {
194 std::lock_guard<std::mutex> lock(dlq_mutex_);
195
196 for (auto& dlq_entry : dead_letter_queue_) {
197 try {
198 handler(dlq_entry);
199 } catch (...) {
200 // Recovery handler failed, skip
201 }
202 }
203
204 dead_letter_queue_.clear();
205 }
206
210 std::shared_ptr<interface_event_bus> get_bus() const {
211 return bus_;
212 }
213
214private:
216 std::type_index event_type;
217 std::string error_message;
218 std::chrono::steady_clock::time_point timestamp;
219 };
220
221 void handle_exception(uint64_t handler_id,
222 std::type_index event_type,
223 const std::string& error_msg) {
224 total_exceptions_.fetch_add(1, std::memory_order_relaxed);
225
226 std::lock_guard<std::mutex> lock(error_mutex_);
227
228 // Record error
230 handler_id,
231 error_msg,
232 event_type,
233 std::chrono::steady_clock::now()
234 };
235
236 recent_errors_.push_back(info);
237
238 // Keep only last 100 errors
239 if (recent_errors_.size() > 100) {
240 recent_errors_.erase(recent_errors_.begin());
241 }
242
243 // Track failure count
244 auto& count = failed_handler_counts_[handler_id];
245 count++;
246
247 // Circuit breaker check
248 if (count >= circuit_breaker_threshold_) {
249 // Handler has failed too many times
250 // In production, we would disable this handler
251 if (error_callback_) {
253 }
254 }
255
256 // Notify error callback
257 if (error_callback_) {
259 }
260 }
261
262 void add_to_dead_letter_queue(std::type_index event_type,
263 const std::string& error_msg) {
264 std::lock_guard<std::mutex> lock(dlq_mutex_);
265
266 dead_letter_entry entry{
267 event_type,
268 error_msg,
269 std::chrono::steady_clock::now()
270 };
271
272 dead_letter_queue_.push_back(entry);
273
274 // Limit dead letter queue size
275 if (dead_letter_queue_.size() > 1000) {
277 }
278 }
279
280private:
281 std::shared_ptr<interface_event_bus> bus_;
282 std::atomic<uint64_t> next_id_{1};
283 std::atomic<uint64_t> total_exceptions_{0};
284
285 mutable std::mutex error_mutex_;
286 std::vector<handler_error_info> recent_errors_;
287 std::unordered_map<uint64_t, size_t> failed_handler_counts_;
290
291 mutable std::mutex dlq_mutex_;
292 std::vector<dead_letter_entry> dead_letter_queue_;
293};
294
295} } // namespace kcenon::monitoring
Priority levels for event processing.
Thread-safe event bus implementation.
Definition event_bus.h:146
Event dispatcher with exception handling and error recovery.
bool publish_safe(const EventType &event, event_priority priority=event_priority::normal)
Publish event with error recovery.
uint64_t subscribe_safe(Handler &&handler, event_priority priority=event_priority::normal)
Subscribe to events with automatic exception handling.
void process_dead_letters(RecoveryHandler &&handler)
Process dead letter queue with recovery handler.
std::vector< handler_error_info > recent_errors_
void handle_exception(uint64_t handler_id, std::type_index event_type, const std::string &error_msg)
void set_circuit_breaker_threshold(size_t threshold)
Set circuit breaker threshold.
void set_error_callback(error_callback callback)
Set callback for handler errors.
std::shared_ptr< interface_event_bus > get_bus() const
Get underlying event bus.
std::function< void(const handler_error_info &)> error_callback
safe_event_dispatcher(std::shared_ptr< interface_event_bus > bus=nullptr)
void add_to_dead_letter_queue(std::type_index event_type, const std::string &error_msg)
std::vector< handler_error_info > get_handler_errors() const
Get recent handler errors.
std::vector< dead_letter_entry > dead_letter_queue_
std::unordered_map< uint64_t, size_t > failed_handler_counts_
uint64_t get_total_exceptions() const
Get total exception count.
std::shared_ptr< interface_event_bus > bus_
size_t get_dead_letter_queue_size() const
Get dead letter queue size.
Lightweight event bus implementation for monitoring system.
@ info
Informational, no action required.
Information about handler execution errors.
std::chrono::steady_clock::time_point timestamp
std::string error_message
std::chrono::steady_clock::time_point timestamp
std::type_index event_type