Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
messaging_udp_server.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
9
11{
12 using udp = asio::ip::udp;
13
// Constructor (its signature line is absent from this listing).
// Initializes server_id_ from the server_id argument; the body is empty.
15 : server_id_(server_id)
16 {
17 }
18
// Destructor (its signature and the guard condition on listing line 21 are
// absent from this listing). Inside the guarded branch stop_server() is called
// and its result is explicitly discarded with the (void) cast.
20 {
22 {
23 (void)stop_server();
24 }
25 }
26
27 // ========================================================================
28 // Lifecycle Management
29 // ========================================================================
30
// start_server(port) (signature line absent from this listing).
// Returns an error if lifecycle_ already reports running; otherwise marks the
// lifecycle as running, delegates to do_start_impl(port), and rolls the
// lifecycle back with mark_stopped() when do_start_impl reports an error.
// Returns whatever do_start_impl returned.
32 {
33 if (lifecycle_.is_running())
34 {
// The first argument of error_void (the error code, listing line 36) is
// absent from this listing.
35 return error_void(
37 "UDP server is already running",
38 "messaging_udp_server::start_server");
39 }
40
// NOTE(review): is_running() above and set_running() below are two separate
// calls; whether two concurrent start_server() calls can both pass the check
// depends on lifecycle_'s implementation, which is not visible here — confirm.
41 // Mark as running before starting
42 lifecycle_.set_running();
43
44 auto result = do_start_impl(port);
45 if (result.is_err())
46 {
// Undo the running flag set above so a later start attempt is not rejected.
47 lifecycle_.mark_stopped();
48 }
49
50 return result;
51 }
52
// stop_server() (signature line absent from this listing).
// Returns ok() without doing anything when lifecycle_.prepare_stop() returns
// false; otherwise runs do_stop_impl(), marks the lifecycle stopped, and
// returns do_stop_impl's result.
54 {
55 if (!lifecycle_.prepare_stop())
56 {
57 return ok(); // Not running or already stopping
58 }
59
60 auto result = do_stop_impl();
61
// mark_stopped() is called regardless of whether do_stop_impl succeeded.
62 lifecycle_.mark_stopped();
63
64 return result;
65 }
66
// Returns a const reference to the stored server identifier (server_id_).
67 auto messaging_udp_server::server_id() const -> const std::string&
68 {
69 return server_id_;
70 }
71
72 // ========================================================================
73 // i_network_component interface implementation
74 // ========================================================================
75
// is_running() (signature line absent from this listing).
// Forwards to lifecycle_.is_running().
77 {
78 return lifecycle_.is_running();
79 }
80
// wait_for_stop() (signature line absent from this listing).
// Forwards to lifecycle_.wait_for_stop(); the blocking behavior is defined by
// lifecycle_, which is not visible in this file.
82 {
83 lifecycle_.wait_for_stop();
84 }
85
86 // ========================================================================
87 // i_udp_server interface implementation
88 // ========================================================================
89
// start(port) (signature line absent from this listing).
// Interface entry point that simply forwards to start_server(port).
91 {
92 return start_server(port);
93 }
94
// stop() (signature line absent from this listing).
// Interface entry point that simply forwards to stop_server().
96 {
97 return stop_server();
98 }
99
102 std::vector<uint8_t>&& data,
104 {
// send_to: hands a datagram to socket_ for asynchronous transmission.
// Only part of the signature survives in this listing; the body uses the
// parameters `endpoint` (address + port), `data`, and `handler`.
// Returns an error when the server is not running, when socket_ is null, or
// when an exception is caught while building the endpoint / issuing the send;
// otherwise returns ok() right after async_send_to has been called.
// The error-code first argument of each error_void call below (listing lines
// 108, 118, 139) is absent from this listing.
105 if (!lifecycle_.is_running())
106 {
107 return error_void(
109 "UDP server is not running",
110 "messaging_udp_server::send_to",
111 ""
112 );
113 }
114
115 if (!socket_)
116 {
117 return error_void(
119 "Socket not available",
120 "messaging_udp_server::send_to",
121 ""
122 );
123 }
124
125 try
126 {
127 // Convert endpoint_info to asio::ip::udp::endpoint
// NOTE(review): presumably make_address throws on an unparsable address
// string, which is why this sits inside the try block — confirm against Asio.
128 asio::ip::udp::endpoint asio_endpoint(
129 asio::ip::make_address(endpoint.address),
130 endpoint.port
131 );
132
// Ownership of both the payload and the completion handler is moved into the
// socket wrapper; ok() below only means the send was issued, not completed.
133 socket_->async_send_to(std::move(data), asio_endpoint, std::move(handler));
134 return ok();
135 }
136 catch (const std::exception& e)
137 {
138 return error_void(
140 std::string("Failed to send datagram: ") + e.what(),
141 "messaging_udp_server::send_to",
142 "Target: " + endpoint.address + ":" + std::to_string(endpoint.port)
143 );
144 }
145 }
146
// set_receive_callback, interface version (signature line absent from this
// listing). A null callback clears the stored receive callback. A non-null
// callback is wrapped in an adapter that translates the asio endpoint into an
// `info` object (address string + port) before forwarding the datagram.
149 {
150 if (!callback)
151 {
152 // Clear the callback
153 callbacks_.set<to_index(callback_index::receive)>(nullptr);
154 return;
155 }
156
157 // Adapt the interface callback to the internal callback type
158 // Convert asio::ip::udp::endpoint to endpoint_info
159 callbacks_.set<to_index(callback_index::receive)>(
160 [callback = std::move(callback)](
161 const std::vector<uint8_t>& data,
162 const asio::ip::udp::endpoint& endpoint)
163 {
// The declaration of `info` (listing line 164) is absent from this listing.
165 info.address = endpoint.address().to_string();
166 info.port = endpoint.port();
167 callback(data, info);
168 });
169 }
170
// Signature line absent from this listing. Stores `callback` directly in the
// receive slot of callbacks_ with no adaptation.
// NOTE(review): presumably the overload taking the asio-endpoint based
// receive_callback_t — confirm against the header.
172 {
173 callbacks_.set<to_index(callback_index::receive)>(std::move(callback));
174 }
175
// set_error_callback(callback) (signature line absent from this listing).
// Moves `callback` into the error slot of callbacks_.
177 {
178 callbacks_.set<to_index(callback_index::error)>(std::move(callback));
179 }
180
181 // ========================================================================
182 // Internal Implementation Methods
183 // ========================================================================
184
// do_start_impl(port) (signature line absent from this listing).
// Creates the io_context, binds an IPv4 UDP socket to `port`, wraps it in
// internal::udp_socket, installs the currently registered callbacks, starts
// receiving, and submits io_context_->run() to thread_pool_.
// Returns ok() on success; any std::system_error or std::exception thrown
// along the way is converted into an error result.
// The error-code first argument of each error_void call below (listing lines
// 245, 254) is absent from this listing.
// NOTE(review): the catch handlers return an error without resetting
// io_context_ or socket_, and start_server() only calls mark_stopped() on
// failure, so a partially constructed socket/io_context may be left behind
// after a failed start — confirm whether that is intended.
186 {
187 try
188 {
189 // Create io_context
190 io_context_ = std::make_unique<asio::io_context>();
191
192 // Create and bind UDP socket
193 asio::ip::udp::socket raw_socket(*io_context_, udp::endpoint(udp::v4(), port));
194
195 // Wrap in our udp_socket
196 socket_ = std::make_shared<internal::udp_socket>(std::move(raw_socket));
197
// The callbacks are read once, here. The setters in this file only update
// callbacks_, so a callback registered after start is not forwarded to
// socket_ by any code shown in this listing.
198 // Set callbacks to socket
199 auto receive_cb = get_receive_callback();
200 if (receive_cb)
201 {
202 socket_->set_receive_callback(std::move(receive_cb));
203 }
204
205 auto error_cb = get_error_callback();
206 if (error_cb)
207 {
208 socket_->set_error_callback(std::move(error_cb));
209 }
210
211 // Start receiving
212 socket_->start_receive();
213
// The statement that assigns thread_pool_ (listing line 215) is absent from
// this listing.
214 // Get thread pool from network context
216 if (!thread_pool_) {
217 // Fallback: create a temporary thread pool if network_context is not initialized
218 NETWORK_LOG_WARN("[messaging_udp_server] network_context not initialized, creating temporary thread pool");
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0;
// whether basic_thread_pool tolerates a size of 0 is not visible here — confirm.
219 thread_pool_ = std::make_shared<integration::basic_thread_pool>(std::thread::hardware_concurrency());
220 }
221
222 // Start io_context on thread pool
// The task captures `this`; do_stop_impl() waits on io_context_future_ before
// resetting io_context_, which keeps the captured pointer usable for the
// lifetime of the task on the normal stop path.
223 io_context_future_ = thread_pool_->submit(
224 [this]()
225 {
226 try
227 {
228 NETWORK_LOG_DEBUG("[messaging_udp_server] io_context started");
229 io_context_->run();
230 NETWORK_LOG_DEBUG("[messaging_udp_server] io_context stopped");
231 }
232 catch (const std::exception& e)
233 {
// Exceptions escaping run() are logged here and not rethrown out of the task.
234 NETWORK_LOG_ERROR(std::string("Worker thread exception: ") + e.what());
235 }
236 });
237
238 NETWORK_LOG_INFO("UDP server started on port " + std::to_string(port));
239
240 return ok();
241 }
// std::system_error is caught first and reported as a bind failure; note that
// this handler covers every std::system_error thrown inside the try block, not
// only the one from socket construction.
242 catch (const std::system_error& e)
243 {
244 return error_void(
246 std::string("Failed to bind UDP socket: ") + e.what(),
247 "messaging_udp_server::do_start_impl",
248 "Port: " + std::to_string(port)
249 );
250 }
251 catch (const std::exception& e)
252 {
253 return error_void(
255 std::string("Failed to start UDP server: ") + e.what(),
256 "messaging_udp_server::do_start_impl",
257 "Port: " + std::to_string(port)
258 );
259 }
260 }
261
// do_stop_impl() (signature line absent from this listing).
// Tears the server down in this order: stop receiving on socket_, stop
// io_context_, wait for the task submitted in do_start_impl to finish, then
// release socket_ and io_context_. thread_pool_ is not reset here.
// Returns ok() on success, or an error result if a std::exception escapes.
// The error-code first argument of the error_void call below (listing line
// 300) is absent from this listing.
263 {
264 try
265 {
266 // Stop receiving
267 if (socket_)
268 {
269 socket_->stop_receive();
270 }
271
272 // Stop io_context
273 if (io_context_)
274 {
275 io_context_->stop();
276 }
277
278 // Join worker thread
279 if (io_context_future_.valid())
280 {
// A failure while waiting is logged and swallowed so that the cleanup below
// still runs.
281 try {
282 io_context_future_.wait();
283 } catch (const std::exception& e) {
284 NETWORK_LOG_ERROR("[messaging_udp_server] Exception while waiting for io_context: " +
285 std::string(e.what()));
286 }
287 }
288
// Reset only after the wait above, because the submitted task dereferences
// io_context_.
289 // Clean up
290 socket_.reset();
291 io_context_.reset();
292
293 NETWORK_LOG_INFO("UDP server stopped");
294
295 return ok();
296 }
297 catch (const std::exception& e)
298 {
299 return error_void(
301 std::string("Failed to stop UDP server: ") + e.what(),
302 "messaging_udp_server::do_stop_impl",
303 ""
304 );
305 }
306 }
307
308 // ========================================================================
309 // Internal Callback Helpers
310 // ========================================================================
311
313 const std::vector<uint8_t>& data,
314 const asio::ip::udp::endpoint& endpoint) -> void
315 {
// invoke_receive_callback (first signature line absent from this listing):
// forwards the datagram and its sender endpoint to the receive slot of
// callbacks_. Behavior when no callback is registered is defined by
// callbacks_.invoke, which is not visible in this file.
316 callbacks_.invoke<to_index(callback_index::receive)>(data, endpoint);
317 }
318
// Forwards the error code `ec` to the error slot of callbacks_.
// Behavior when no callback is registered is defined by callbacks_.invoke,
// which is not visible in this file.
319 auto messaging_udp_server::invoke_error_callback(std::error_code ec) -> void
320 {
321 callbacks_.invoke<to_index(callback_index::error)>(ec);
322 }
323
// get_receive_callback() (signature line absent from this listing).
// Returns the value stored in the receive slot of callbacks_.
325 {
326 return callbacks_.get<to_index(callback_index::receive)>();
327 }
328
// get_error_callback() (signature line absent from this listing).
// Returns the value stored in the error slot of callbacks_.
330 {
331 return callbacks_.get<to_index(callback_index::error)>();
332 }
333
334} // namespace kcenon::network::core
auto start_server(uint16_t port) -> VoidResult
Starts the server on the specified port.
auto stop_server() -> VoidResult
Stops the server and releases all resources.
auto do_start_impl(uint16_t port) -> VoidResult
UDP-specific implementation of server start.
auto is_running() const -> bool override
Checks if the server is currently running.
auto get_receive_callback() const -> receive_callback_t
Gets a copy of the receive callback.
~messaging_udp_server() noexcept override
Destructor. If the server is still running, stop_server() is invoked.
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback with the given error code.
std::function< void(const std::vector< uint8_t > &, const asio::ip::udp::endpoint &)> receive_callback_t
Callback type for received datagrams with sender endpoint.
auto stop() -> VoidResult override
Stops the UDP server.
auto start(uint16_t port) -> VoidResult override
Starts the UDP server on the specified port.
auto send_to(const interfaces::i_udp_server::endpoint_info &endpoint, std::vector< uint8_t > &&data, interfaces::i_udp_server::send_callback_t handler=nullptr) -> VoidResult override
Sends a datagram to the specified endpoint.
auto get_error_callback() const -> error_callback_t
Gets a copy of the error callback.
messaging_udp_server(std::string_view server_id)
Constructs a messaging_udp_server with an identifier.
auto set_error_callback(error_callback_t callback) -> void override
Sets the callback for errors.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
auto wait_for_stop() -> void override
Blocks until stop_server() is called.
auto set_receive_callback(interfaces::i_udp_server::receive_callback_t callback) -> void override
Sets the callback for received datagrams (interface version).
auto invoke_receive_callback(const std::vector< uint8_t > &data, const asio::ip::udp::endpoint &endpoint) -> void
Invokes the receive callback with the given data and endpoint.
auto do_stop_impl() -> VoidResult
UDP-specific implementation of server stop.
auto server_id() const -> const std::string &
Returns the server identifier.
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void( const std::vector< uint8_t > &, const endpoint_info &)> receive_callback_t
Callback type for received data (includes sender endpoint)
std::function< void(std::error_code, std::size_t)> send_callback_t
Callback type for send completion.
auto get() const -> std::tuple_element_t< Index, std::tuple< CallbackTypes... > >
Gets a callback at the specified index.
auto is_running() const -> bool
Checks if the component is currently running.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
UDP server class.
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Global context for shared network system resources.
Endpoint information for UDP datagrams.