Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
messaging_udp_client.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
9
11{
12 using udp = asio::ip::udp;
13
// Constructor: initializes the client_id_ member from the client_id argument;
// the body itself does nothing.
// NOTE(review): the signature line (listing line 14) is absent from this
// extraction; the index at the bottom of the page gives it as
// messaging_udp_client(std::string_view client_id).
15 : client_id_(client_id)
16 {
17 }
18
// Destructor body (the index lists ~messaging_udp_client() noexcept override).
// NOTE(review): listing lines 19 (signature) and 21 are absent from this
// extraction. Line 21 is presumably the "still running" guard mentioned in
// the index description -- confirm against the real source.
20 {
22 {
// The result of stop_client() is deliberately discarded via the (void) cast.
23 (void)stop_client();
24 }
25 }
26
27 // ========================================================================
28 // Lifecycle Management
29 // ========================================================================
30
// Starts the client for the given target.
//
// host: target host name/address; an empty value is rejected with an error.
// port: target port, forwarded unchanged to do_start_impl().
// Returns: an error result if the client is already running or host is
//          empty; otherwise whatever do_start_impl() returns.
//
// NOTE(review): listing lines 36 and 44 are absent from this extraction.
// error_void() takes an int code as its first parameter (see the index), so
// those lines presumably carried the error-code arguments -- confirm.
// NOTE(review): the is_running() check and set_running() are separate steps;
// whether two concurrent callers can both pass the check depends on
// lifecycle_, which is not visible here.
31 auto messaging_udp_client::start_client(std::string_view host, uint16_t port) -> VoidResult
32 {
33 if (lifecycle_.is_running())
34 {
35 return error_void(
37 "UDP client is already running",
38 "messaging_udp_client::start_client");
39 }
40
41 if (host.empty())
42 {
43 return error_void(
45 "Host cannot be empty",
46 "messaging_udp_client::start_client");
47 }
48
49 // Mark as running before starting
50 lifecycle_.set_running();
51
52 auto result = do_start_impl(host, port);
// Roll the running flag back if startup failed so is_running() does not
// report a client that never came up.
53 if (result.is_err())
54 {
55 lifecycle_.mark_stopped();
56 }
57
58 return result;
59 }
60
// Body of stop_client() (index: auto stop_client() -> VoidResult).
// NOTE(review): the signature line (listing line 61) is absent from this
// extraction.
//
// Returns ok() without doing anything when lifecycle_.prepare_stop() returns
// false; otherwise returns the result of do_stop_impl().
62 {
63 if (!lifecycle_.prepare_stop())
64 {
65 return ok(); // Not running or already stopping
66 }
67
68 auto result = do_stop_impl();
69
// mark_stopped() is called regardless of whether do_stop_impl() reported an
// error.
70 lifecycle_.mark_stopped();
71
72 return result;
73 }
74
// Returns a const reference to the stored client_id_ member (no copy is made).
75 auto messaging_udp_client::client_id() const -> const std::string&
76 {
77 return client_id_;
78 }
79
80 // ========================================================================
81 // i_network_component interface implementation
82 // ========================================================================
83
// Body of is_running() (index: auto is_running() const -> bool override).
// NOTE(review): the signature line (listing line 84) is absent from this
// extraction.
// Reports the running state held by lifecycle_.
85 {
86 return lifecycle_.is_running();
87 }
88
// Body of wait_for_stop() (index: auto wait_for_stop() -> void override).
// NOTE(review): the signature line (listing line 89) is absent from this
// extraction.
// Delegates entirely to lifecycle_.wait_for_stop().
90 {
91 lifecycle_.wait_for_stop();
92 }
93
94 // ========================================================================
95 // i_udp_client interface implementation
96 // ========================================================================
97
// i_udp_client entry point: forwards host and port unchanged to
// start_client() and returns its result.
98 auto messaging_udp_client::start(std::string_view host, uint16_t port) -> VoidResult
99 {
100 return start_client(host, port);
101 }
102
// Body of stop() (index: auto stop() -> VoidResult override).
// NOTE(review): the signature line (listing line 103) is absent from this
// extraction.
// Forwards to stop_client() and returns its result.
104 {
105 return stop_client();
106 }
107
// send(): hands a datagram to the socket for asynchronous transmission to the
// current target endpoint.
// NOTE(review): listing lines 108 and 110 (first and last signature lines)
// and lines 115 and 126 (presumably the error_void() code arguments) are
// absent from this extraction. The index gives the signature as
// send(std::vector<uint8_t>&& data, send_callback_t handler = nullptr).
//
// Returns an error result when the client is not running or socket_ is null;
// otherwise ok() once async_send_to() has been called. ok() therefore does not
// by itself say the datagram was sent -- per the index, handler is the
// "send completion" callback.
109 std::vector<uint8_t>&& data,
111 {
112 if (!lifecycle_.is_running())
113 {
114 return error_void(
116 "UDP client is not running",
117 "messaging_udp_client::send",
118 ""
119 );
120 }
121
// Lock order in this function: socket_mutex_ first, then endpoint_mutex_.
// NOTE(review): the start/stop bodies visible in this listing assign and
// reset socket_ without taking socket_mutex_ -- confirm whether that is safe
// against a concurrent send().
122 std::lock_guard<std::mutex> socket_lock(socket_mutex_);
123 if (!socket_)
124 {
125 return error_void(
127 "Socket not available",
128 "messaging_udp_client::send",
129 ""
130 );
131 }
132
// endpoint_mutex_ guards target_endpoint_, which set_target() may replace.
133 std::lock_guard<std::mutex> endpoint_lock(endpoint_mutex_);
134 socket_->async_send_to(std::move(data), target_endpoint_, std::move(handler));
135
136 return ok();
137 }
138
// Replaces the target endpoint used by subsequent sends.
//
// host / port: new target; resolved here with a resolver restricted to
//              udp::v4().
// Returns: an error result if the client is not running, if resolution yields
//          no endpoints, or if any std::exception is thrown (its what() text
//          is included in the message); ok() after target_endpoint_ has been
//          updated.
//
// NOTE(review): listing lines 144, 160 and 177 are absent from this
// extraction; they presumably carried the int error-code arguments that
// error_void() takes first -- confirm.
139 auto messaging_udp_client::set_target(std::string_view host, uint16_t port) -> VoidResult
140 {
141 if (!lifecycle_.is_running())
142 {
143 return error_void(
145 "UDP client is not running",
146 "messaging_udp_client::set_target",
147 ""
148 );
149 }
150
151 try
152 {
153 // Resolve new target endpoint
// NOTE(review): io_context_ is dereferenced without a null check or lock; the
// stop path resets it, so this relies on the is_running() check above still
// holding -- confirm.
154 udp::resolver resolver(*io_context_);
155 auto endpoints = resolver.resolve(udp::v4(), std::string(host), std::to_string(port));
156
157 if (endpoints.empty())
158 {
159 return error_void(
161 "Failed to resolve host",
162 "messaging_udp_client::set_target",
163 "Host: " + std::string(host)
164 );
165 }
166
// Only the first resolved endpoint is used. The lock stays held until the end
// of the try block, so the log call below runs under endpoint_mutex_.
167 std::lock_guard<std::mutex> lock(endpoint_mutex_);
168 target_endpoint_ = *endpoints.begin();
169
170 NETWORK_LOG_INFO("Target updated to " + std::string(host) + ":" + std::to_string(port));
171
172 return ok();
173 }
174 catch (const std::exception& e)
175 {
176 return error_void(
178 std::string("Failed to set target: ") + e.what(),
179 "messaging_udp_client::set_target",
180 "Host: " + std::string(host) + ":" + std::to_string(port)
181 );
182 }
183 }
184
// Body of the interface-level set_receive_callback() (index:
// set_receive_callback(interfaces::i_udp_client::receive_callback_t callback)).
// NOTE(review): the signature lines (listing lines 185-186) and line 202 are
// absent from this extraction. Line 202 presumably declares the local `info`
// object (an endpoint_info, judging by the index) -- confirm.
//
// An empty callback clears the stored receive callback. Otherwise the callback
// is wrapped so that the stored callable accepts an asio::ip::udp::endpoint
// and passes the caller an `info` value instead.
187 {
188 if (!callback)
189 {
190 // Clear the callback
191 callbacks_.set<to_index(callback_index::receive)>(nullptr);
192 return;
193 }
194
195 // Adapt the interface callback to the internal callback type
196 // Convert asio::ip::udp::endpoint to endpoint_info
197 callbacks_.set<to_index(callback_index::receive)>(
// The user callback is moved into the closure, so the stored wrapper owns it.
198 [callback = std::move(callback)](
199 const std::vector<uint8_t>& data,
200 const asio::ip::udp::endpoint& endpoint)
201 {
// The sender address is converted to its string form; the port is copied as is.
203 info.address = endpoint.address().to_string();
204 info.port = endpoint.port();
205 callback(data, info);
206 });
207 }
208
// Body of a setter that stores `callback` directly in the receive slot of
// callbacks_, with no adaptation.
// NOTE(review): the signature line (listing line 209) is absent from this
// extraction; this is presumably the setter taking the asio-endpoint
// receive_callback_t listed in the index -- confirm name and parameter type.
210 {
211 callbacks_.set<to_index(callback_index::receive)>(std::move(callback));
212 }
213
// Body of set_error_callback() (index:
// set_error_callback(error_callback_t callback) -> void override).
// NOTE(review): the signature line (listing line 214) is absent from this
// extraction.
// Stores `callback` in the error slot of callbacks_.
215 {
216 callbacks_.set<to_index(callback_index::error)>(std::move(callback));
217 }
218
219 // ========================================================================
220 // Internal Implementation Methods
221 // ========================================================================
222
// UDP-specific start: creates the io_context, resolves the target, opens and
// wraps the socket, installs the currently registered callbacks, starts
// receiving, and submits io_context_->run() to the thread pool.
//
// host / port: target to resolve (udp::v4() only).
// Returns: ok() on success; an error result if resolution yields no endpoints
//          or if an exception is thrown (std::system_error and other
//          std::exception types get different messages).
//
// NOTE(review): listing lines 237, 272, 302 and 311 are absent from this
// extraction. Lines 237/302/311 presumably carried error_void() code
// arguments; line 272 presumably assigned thread_pool_ from
// network_context::instance().get_thread_pool() (both appear in the index) --
// confirm against the real source.
// NOTE(review): none of the failure paths below release io_context_ or
// socket_, and the visible caller only calls lifecycle_.mark_stopped() on
// error. Confirm that a failed start followed by another start is safe.
223 auto messaging_udp_client::do_start_impl(std::string_view host, uint16_t port) -> VoidResult
224 {
225 try
226 {
227 // Create io_context
228 io_context_ = std::make_unique<asio::io_context>();
229
230 // Resolve target endpoint
231 udp::resolver resolver(*io_context_);
232 auto endpoints = resolver.resolve(udp::v4(), std::string(host), std::to_string(port));
233
234 if (endpoints.empty())
235 {
236 return error_void(
238 "Failed to resolve host",
239 "messaging_udp_client::do_start_impl",
240 "Host: " + std::string(host)
241 );
242 }
243
// Inner scope so endpoint_mutex_ is released before the socket is created.
244 {
245 std::lock_guard<std::mutex> lock(endpoint_mutex_);
246 target_endpoint_ = *endpoints.begin();
247 }
248
249 // Create UDP socket (bind to any port)
250 asio::ip::udp::socket raw_socket(*io_context_, udp::endpoint(udp::v4(), 0));
251
252 // Wrap in our udp_socket
// NOTE(review): socket_ is assigned here without socket_mutex_, which send()
// takes before reading socket_ -- confirm.
253 socket_ = std::make_shared<internal::udp_socket>(std::move(raw_socket));
254
255 // Set callbacks to socket
// The callbacks are read once here and handed to the socket only if non-empty.
// NOTE(review): the setters visible in this listing only update callbacks_;
// confirm whether callbacks registered after start reach the socket.
256 auto receive_cb = get_receive_callback();
257 if (receive_cb)
258 {
259 socket_->set_receive_callback(std::move(receive_cb));
260 }
261
262 auto error_cb = get_error_callback();
263 if (error_cb)
264 {
265 socket_->set_error_callback(std::move(error_cb));
266 }
267
268 // Start receiving
269 socket_->start_receive();
270
271 // Get thread pool from network context
273 if (!thread_pool_) {
274 // Fallback: create a temporary thread pool if network_context is not initialized
275 NETWORK_LOG_WARN("[messaging_udp_client] network_context not initialized, creating temporary thread pool");
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the
// value is not computable; confirm basic_thread_pool handles a size of 0.
276 thread_pool_ = std::make_shared<integration::basic_thread_pool>(std::thread::hardware_concurrency());
277 }
278
279 // Start io_context on thread pool
// The task captures `this`, so the object must stay alive until the task
// finishes; the stop body visible in this listing waits on io_context_future_
// before resetting io_context_.
280 io_context_future_ = thread_pool_->submit(
281 [this]()
282 {
283 try
284 {
285 NETWORK_LOG_DEBUG("[messaging_udp_client] io_context started");
286 io_context_->run();
287 NETWORK_LOG_DEBUG("[messaging_udp_client] io_context stopped");
288 }
// std::exception escaping run() is logged here and not rethrown.
289 catch (const std::exception& e)
290 {
291 NETWORK_LOG_ERROR(std::string("Worker thread exception: ") + e.what());
292 }
293 });
294
295 NETWORK_LOG_INFO("UDP client started targeting " + std::string(host) + ":" + std::to_string(port));
296
297 return ok();
298 }
// std::system_error is caught first and reported as a socket-creation failure.
299 catch (const std::system_error& e)
300 {
301 return error_void(
303 std::string("Failed to create UDP socket: ") + e.what(),
304 "messaging_udp_client::do_start_impl",
305 "Host: " + std::string(host) + ":" + std::to_string(port)
306 );
307 }
308 catch (const std::exception& e)
309 {
310 return error_void(
312 std::string("Failed to start UDP client: ") + e.what(),
313 "messaging_udp_client::do_start_impl",
314 "Host: " + std::string(host) + ":" + std::to_string(port)
315 );
316 }
317 }
318
// Body of do_stop_impl() (index: auto do_stop_impl() -> VoidResult).
// NOTE(review): the signature line (listing line 319) and line 357
// (presumably the error_void() code argument) are absent from this extraction.
//
// Order of operations: stop receiving, stop the io_context, wait for the
// submitted run() task to finish, then release socket_ and io_context_.
// Returns ok() on success, or an error result carrying the what() text if a
// std::exception reaches the outer handler.
320 {
321 try
322 {
323 // Stop receiving
324 if (socket_)
325 {
326 socket_->stop_receive();
327 }
328
329 // Stop io_context
330 if (io_context_)
331 {
332 io_context_->stop();
333 }
334
335 // Wait for io_context task to complete
336 if (io_context_future_.valid())
337 {
// A failure while waiting is logged and swallowed so cleanup below still runs.
338 try {
339 io_context_future_.wait();
340 } catch (const std::exception& e) {
341 NETWORK_LOG_ERROR("[messaging_udp_client] Exception while waiting for io_context: " +
342 std::string(e.what()));
343 }
344 }
345
346 // Clean up
// socket_ is released before io_context_: the socket was constructed on that
// io_context (see do_start_impl), so this order should be kept.
// NOTE(review): socket_ is reset here without socket_mutex_, which send()
// holds while using socket_ -- confirm.
347 socket_.reset();
348 io_context_.reset();
349
350 NETWORK_LOG_INFO("UDP client stopped");
351
352 return ok();
353 }
354 catch (const std::exception& e)
355 {
356 return error_void(
358 std::string("Failed to stop UDP client: ") + e.what(),
359 "messaging_udp_client::do_stop_impl",
360 ""
361 );
362 }
363 }
364
365 // ========================================================================
366 // Internal Callback Helpers
367 // ========================================================================
368
// invoke_receive_callback(): invokes the receive slot of callbacks_ with the
// received bytes and the sender endpoint.
// NOTE(review): the first signature line (listing line 369) is absent from
// this extraction; the index gives the full signature.
370 const std::vector<uint8_t>& data,
371 const asio::ip::udp::endpoint& endpoint) -> void
372 {
373 callbacks_.invoke<to_index(callback_index::receive)>(data, endpoint);
374 }
375
// Invokes the error slot of callbacks_ with the given error code.
// ec: the error code passed through unchanged to the callback.
376 auto messaging_udp_client::invoke_error_callback(std::error_code ec) -> void
377 {
378 callbacks_.invoke<to_index(callback_index::error)>(ec);
379 }
380
// Body of get_receive_callback() (index:
// auto get_receive_callback() const -> receive_callback_t).
// NOTE(review): the signature line (listing line 381) is absent from this
// extraction.
// Returns the callable currently stored in the receive slot of callbacks_.
382 {
383 return callbacks_.get<to_index(callback_index::receive)>();
384 }
385
// Body of get_error_callback() (index:
// auto get_error_callback() const -> error_callback_t).
// NOTE(review): the signature line (listing line 386) is absent from this
// extraction.
// Returns the callable currently stored in the error slot of callbacks_.
387 {
388 return callbacks_.get<to_index(callback_index::error)>();
389 }
390
391} // namespace kcenon::network::core
auto invoke_receive_callback(const std::vector< uint8_t > &data, const asio::ip::udp::endpoint &endpoint) -> void
Invokes the receive callback with the given data and endpoint.
auto client_id() const -> const std::string &
Returns the client identifier.
auto send(std::vector< uint8_t > &&data, interfaces::i_udp_client::send_callback_t handler=nullptr) -> VoidResult override
Sends a datagram to the configured target endpoint.
auto wait_for_stop() -> void override
Blocks until stop_client() is called.
std::function< void(const std::vector< uint8_t > &, const asio::ip::udp::endpoint &)> receive_callback_t
Callback type for received datagrams with sender endpoint.
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback with the given error code.
auto set_target(std::string_view host, uint16_t port) -> VoidResult override
Changes the target endpoint for future sends.
auto stop_client() -> VoidResult
Stops the client and releases all resources.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
auto do_stop_impl() -> VoidResult
UDP-specific implementation of client stop.
auto get_receive_callback() const -> receive_callback_t
Gets a copy of the receive callback.
messaging_udp_client(std::string_view client_id)
Constructs a messaging_udp_client with an identifier.
~messaging_udp_client() noexcept override
Destructor. Automatically calls stop_client() if the client is still running.
auto stop() -> VoidResult override
Stops the UDP client.
auto do_start_impl(std::string_view host, uint16_t port) -> VoidResult
UDP-specific implementation of client start.
auto start(std::string_view host, uint16_t port) -> VoidResult override
Starts the UDP client targeting the specified endpoint.
auto is_running() const -> bool override
Checks if the client is currently running.
auto start_client(std::string_view host, uint16_t port) -> VoidResult
Starts the client by resolving target host and port.
auto set_error_callback(error_callback_t callback) -> void override
Sets the callback for errors.
auto get_error_callback() const -> error_callback_t
Gets a copy of the error callback.
auto set_receive_callback(interfaces::i_udp_client::receive_callback_t callback) -> void override
Sets the callback for received datagrams (interface version).
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void(std::error_code, std::size_t)> send_callback_t
Callback type for send completion.
std::function< void( const std::vector< uint8_t > &, const endpoint_info &)> receive_callback_t
Callback type for received data (includes sender endpoint).
auto get() const -> std::tuple_element_t< Index, std::tuple< CallbackTypes... > >
Gets a callback at the specified index.
auto is_running() const -> bool
Checks if the component is currently running.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
UDP client class.
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Global context for shared network system resources.
Endpoint information for UDP datagrams.