21#include <condition_variable>
29#include <unordered_map>
35namespace kcenon {
namespace monitoring {
50 timestamp(std::chrono::steady_clock::now()),
56 return static_cast<int>(
priority) <
static_cast<int>(
other.priority);
64 static std::atomic<uint64_t>
counter{0};
65 return counter.fetch_add(1, std::memory_order_relaxed);
74 std::function<void(
const std::any&)>
handler;
164 static std::shared_ptr<interface_event_bus>
instance() {
165 static std::shared_ptr<event_bus> singleton = [] {
166 auto bus = std::make_shared<event_bus>();
174 common::VoidResult
start()
override {
193 common::VoidResult
stop()
override {
209 if (worker.joinable()) {
245 auto& handler_list = it->second;
247 std::remove_if(handler_list.begin(), handler_list.end(),
249 return wrapper.id == token.get_handler_id();
254 if (handler_list.empty()) {
281 size_t total_subscribers = 0;
282 for (
const auto& [type, handlers] :
handlers_) {
283 total_subscribers += handlers.size();
299 std::any event)
override {
300 bool should_sleep =
false;
319 std::this_thread::sleep_for(std::chrono::milliseconds(1));
328 std::type_index event_type,
329 std::function<
void(
const std::any&)> handler,
335 handlers_[event_type].emplace_back(std::move(handler), priority, handler_id);
357 return it !=
handlers_.end() ? it->second.size() : 0;
368 [
this] { return !event_queue_.empty() || stop_requested_; });
375 std::vector<event_envelope> batch;
376 size_t batch_size = std::min(
size_t(10),
event_queue_.size());
378 for (
size_t i = 0; i < batch_size && !
event_queue_.empty(); ++i) {
387 for (
auto& envelope : batch) {
396 std::vector<event_envelope>
pending;
406 for (
auto& envelope :
pending) {
418 for (
const auto& wrapper : it->second) {
420 wrapper.handler(envelope.
payload);
421 }
catch (
const std::exception& e) {
436 std::unordered_map<std::type_index, std::vector<event_handler_wrapper>>
handlers_;
Priority levels for event processing.
Thread-safe event bus implementation.
void dispatch_event(const event_envelope &envelope)
common::VoidResult publish_event_impl(std::type_index event_type, std::any event) override
common::Result< subscription_token > subscribe_event_impl(std::type_index event_type, std::function< void(const std::any &)> handler, uint64_t handler_id, event_priority priority) override
std::atomic< bool > stop_requested_
void process_events_worker()
std::atomic< uint64_t > total_events_published_
event_bus(const config &cfg=config())
std::mutex handlers_mutex_
std::atomic< uint64_t > total_events_dropped_
common::VoidResult unsubscribe_event(const subscription_token &token) override
Unsubscribe from events using subscription token.
void process_all_pending()
common::VoidResult process_pending_events() override
Process all pending events synchronously.
size_t get_subscriber_count_impl(std::type_index event_type) const override
size_t get_pending_event_count() const override
Get pending event count.
bool is_active() const override
Check if event bus is active.
std::condition_variable queue_cv_
std::atomic< uint64_t > total_events_processed_
std::atomic< bool > is_running_
std::priority_queue< event_envelope > event_queue_
std::unordered_map< std::type_index, std::vector< event_handler_wrapper > > handlers_
std::vector< std::thread > workers_
static std::shared_ptr< interface_event_bus > instance()
common::VoidResult start() override
Start the event bus.
common::VoidResult clear_subscriptions_impl(std::type_index event_type) override
common::VoidResult stop() override
Stop the event bus.
Pure virtual interface for event bus implementation.
Token for managing event subscriptions.
std::type_index get_event_type() const
Monitoring system specific error codes.
Event bus interface for decoupled component communication.
@ counter
Monotonically increasing counter.
@ pending
Condition met, waiting for duration threshold.
Result pattern type definitions for monitoring system.
Extended error information with context.
size_t current_queue_size
bool is_back_pressure_active
Configuration for event bus.
bool enable_back_pressure
std::chrono::milliseconds processing_interval
size_t worker_thread_count
size_t back_pressure_threshold
Container for events with metadata.
static uint64_t generate_id()
std::chrono::steady_clock::time_point timestamp
event_envelope(std::type_index t, std::any p, event_priority pr)
bool operator<(const event_envelope &other) const
Wrapper for event handlers with metadata.
event_handler_wrapper(std::function< void(const std::any &)> h, event_priority p, uint64_t i)
std::function< void(const std::any &)> handler