Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
messaging_bridge.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13// Suppress deprecation warnings from thread_system headers
14#pragma clang diagnostic push
15#pragma clang diagnostic ignored "-Wdeprecated-declarations"
16
18
22#include <atomic>
23#include <mutex>
24
25#if KCENON_WITH_CONTAINER_SYSTEM
26#include "container.h"
27#endif
28
30
32public:
34 uint64_t messages_sent = 0;
35 uint64_t messages_received = 0;
36 uint64_t bytes_sent = 0;
37 uint64_t bytes_received = 0;
38 uint64_t connections_active = 0;
39 std::chrono::milliseconds avg_latency{0};
40 std::chrono::steady_clock::time_point start_time;
41 };
42
43 impl() : initialized_(false) {
44 metrics_.start_time = std::chrono::steady_clock::now();
45 }
46
47 std::atomic<bool> initialized_{false};
48 mutable std::mutex metrics_mutex_;
51
52#if KCENON_WITH_CONTAINER_SYSTEM
53 std::shared_ptr<container_module::value_container> active_container_;
54 std::function<void(const container_module::value_container&)> container_handler_;
55#endif
56
57#if KCENON_WITH_THREAD_SYSTEM
58 std::shared_ptr<kcenon::thread::thread_pool> thread_pool_;
59#endif
60 std::shared_ptr<thread_pool_interface> thread_pool_interface_;
61};
62
63messaging_bridge::messaging_bridge() : pimpl_(std::make_unique<impl>()) {
64 pimpl_->metrics_.start_time = std::chrono::steady_clock::now();
65}
66
72
73// INetworkBridge interface implementation
74
76 std::lock_guard<std::mutex> lock(pimpl_->metrics_mutex_);
77
78 if (pimpl_->initialized_.load()) {
79 return error_void(
81 "messaging_bridge already initialized",
82 "messaging_bridge::initialize");
83 }
84
85 // Process configuration properties
86 auto enabled_it = config.properties.find("enabled");
87 if (enabled_it != config.properties.end() && enabled_it->second == "false") {
88 return error_void(
90 "Bridge is disabled in configuration",
91 "messaging_bridge::initialize");
92 }
93
94 // Initialize bridge metrics
96 pimpl_->bridge_metrics_.last_activity = std::chrono::steady_clock::now();
98
99 pimpl_->initialized_.store(true);
100 return ok();
101}
102
104 std::lock_guard<std::mutex> lock(pimpl_->metrics_mutex_);
105
106 if (!pimpl_->initialized_.load()) {
107 return ok(); // Already shut down, idempotent
108 }
109
110 pimpl_->initialized_.store(false);
112
113 return ok();
114}
115
117 return pimpl_->initialized_.load();
118}
119
121 std::lock_guard<std::mutex> lock(pimpl_->metrics_mutex_);
122
123 // Update bridge metrics from performance metrics
125 pimpl_->bridge_metrics_.last_activity = std::chrono::steady_clock::now();
126 pimpl_->bridge_metrics_.custom_metrics["messages_sent"] = static_cast<double>(pimpl_->metrics_.messages_sent);
127 pimpl_->bridge_metrics_.custom_metrics["messages_received"] = static_cast<double>(pimpl_->metrics_.messages_received);
128 pimpl_->bridge_metrics_.custom_metrics["bytes_sent"] = static_cast<double>(pimpl_->metrics_.bytes_sent);
129 pimpl_->bridge_metrics_.custom_metrics["bytes_received"] = static_cast<double>(pimpl_->metrics_.bytes_received);
130 pimpl_->bridge_metrics_.custom_metrics["connections_active"] = static_cast<double>(pimpl_->metrics_.connections_active);
131 pimpl_->bridge_metrics_.custom_metrics["avg_latency_ms"] = static_cast<double>(pimpl_->metrics_.avg_latency.count());
132
133 return pimpl_->bridge_metrics_;
134}
135
136// messaging_bridge-specific methods (maintained for backward compatibility)
137
138std::shared_ptr<kcenon::network::core::messaging_server> messaging_bridge::create_server(
139 const std::string& server_id
140) {
141 return std::make_shared<kcenon::network::core::messaging_server>(server_id);
142}
143
144std::shared_ptr<kcenon::network::core::messaging_client> messaging_bridge::create_client(
145 const std::string& client_id
146) {
147 return std::make_shared<kcenon::network::core::messaging_client>(client_id);
148}
149
150#if KCENON_WITH_CONTAINER_SYSTEM
151void messaging_bridge::set_container(
152 std::shared_ptr<container_module::value_container> container
153) {
154 pimpl_->active_container_ = container;
155}
156
157void messaging_bridge::set_container_message_handler(
158 std::function<void(const container_module::value_container&)> handler
159) {
160 pimpl_->container_handler_ = handler;
161}
162#endif
163
164#if KCENON_WITH_THREAD_SYSTEM
165void messaging_bridge::set_thread_pool(
166 std::shared_ptr<kcenon::thread::thread_pool> pool
167) {
168 pimpl_->thread_pool_ = pool;
169}
170#endif
171
173 std::lock_guard<std::mutex> lock(pimpl_->metrics_mutex_);
175 pimpl_->metrics_.start_time = std::chrono::steady_clock::now();
176}
177
179 std::shared_ptr<thread_pool_interface> pool
180) {
182}
183
184std::shared_ptr<thread_pool_interface> messaging_bridge::get_thread_pool_interface() const {
186 // Return the global thread integration manager's pool
188 }
190}
191
192} // namespace kcenon::network::integration
193
194#pragma clang diagnostic pop
std::shared_ptr< thread_pool_interface > thread_pool_interface_
BridgeMetrics get_metrics() const override
Get current metrics.
VoidResult initialize(const BridgeConfig &config) override
Initialize the bridge with configuration.
bool is_initialized() const override
Check if the bridge is initialized.
std::shared_ptr< kcenon::network::core::messaging_client > create_client(const std::string &client_id)
Create a messaging client with messaging_system compatible API.
std::shared_ptr< kcenon::network::core::messaging_server > create_server(const std::string &server_id)
Create a messaging server with messaging_system compatible API.
void set_thread_pool_interface(std::shared_ptr< thread_pool_interface > pool)
Set thread pool using the integration interface.
void reset_metrics()
Reset performance metrics.
VoidResult shutdown() override
Shutdown the bridge.
std::shared_ptr< thread_pool_interface > get_thread_pool_interface() const
Get the thread pool interface.
static thread_integration_manager & instance()
Get the singleton instance.
std::shared_ptr< thread_pool_interface > get_thread_pool()
Get the current thread pool.
Feature flags for network_system.
tracing_config config
Definition exporters.cpp:29
Bridge for messaging_system compatibility implementing INetworkBridge.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Network-specific error and result type definitions.
Configuration for bridge initialization.
Metrics and health information for a bridge.
std::chrono::steady_clock::time_point last_activity
Timestamp of last activity or health check.
std::map< std::string, double > custom_metrics
Bridge-specific custom metrics.
bool is_healthy
Overall health status of the bridge.
Thread system integration interface for network_system.