12 using udp = asio::ip::udp;
15 : client_id_(client_id)
33 if (lifecycle_.is_running())
37 "UDP client is already running",
38 "messaging_udp_client::start_client");
45 "Host cannot be empty",
46 "messaging_udp_client::start_client");
50 lifecycle_.set_running();
52 auto result = do_start_impl(host, port);
55 lifecycle_.mark_stopped();
63 if (!lifecycle_.prepare_stop())
68 auto result = do_stop_impl();
70 lifecycle_.mark_stopped();
91 lifecycle_.wait_for_stop();
100 return start_client(host, port);
105 return stop_client();
109 std::vector<uint8_t>&& data,
112 if (!lifecycle_.is_running())
116 "UDP client is not running",
117 "messaging_udp_client::send",
122 std::lock_guard<std::mutex> socket_lock(socket_mutex_);
127 "Socket not available",
128 "messaging_udp_client::send",
133 std::lock_guard<std::mutex> endpoint_lock(endpoint_mutex_);
134 socket_->async_send_to(std::move(data), target_endpoint_, std::move(handler));
141 if (!lifecycle_.is_running())
145 "UDP client is not running",
146 "messaging_udp_client::set_target",
154 udp::resolver resolver(*io_context_);
155 auto endpoints = resolver.resolve(udp::v4(), std::string(host), std::to_string(port));
157 if (endpoints.empty())
161 "Failed to resolve host",
162 "messaging_udp_client::set_target",
163 "Host: " + std::string(host)
167 std::lock_guard<std::mutex> lock(endpoint_mutex_);
168 target_endpoint_ = *endpoints.begin();
170 NETWORK_LOG_INFO(
"Target updated to " + std::string(host) +
":" + std::to_string(port));
174 catch (
const std::exception& e)
178 std::string(
"Failed to set target: ") + e.what(),
179 "messaging_udp_client::set_target",
180 "Host: " + std::string(host) +
":" + std::to_string(port)
191 callbacks_.set<
to_index(callback_index::receive)>(
nullptr);
197 callbacks_.set<
to_index(callback_index::receive)>(
198 [callback = std::move(callback)](
199 const std::vector<uint8_t>& data,
200 const asio::ip::udp::endpoint& endpoint)
203 info.address = endpoint.address().to_string();
204 info.port = endpoint.port();
205 callback(data, info);
211 callbacks_.set<
to_index(callback_index::receive)>(std::move(callback));
216 callbacks_.set<
to_index(callback_index::error)>(std::move(callback));
228 io_context_ = std::make_unique<asio::io_context>();
231 udp::resolver resolver(*io_context_);
232 auto endpoints = resolver.resolve(udp::v4(), std::string(host), std::to_string(port));
234 if (endpoints.empty())
238 "Failed to resolve host",
239 "messaging_udp_client::do_start_impl",
240 "Host: " + std::string(host)
245 std::lock_guard<std::mutex> lock(endpoint_mutex_);
246 target_endpoint_ = *endpoints.begin();
250 asio::ip::udp::socket raw_socket(*io_context_, udp::endpoint(udp::v4(), 0));
253 socket_ = std::make_shared<internal::udp_socket>(std::move(raw_socket));
256 auto receive_cb = get_receive_callback();
259 socket_->set_receive_callback(std::move(receive_cb));
262 auto error_cb = get_error_callback();
265 socket_->set_error_callback(std::move(error_cb));
269 socket_->start_receive();
275 NETWORK_LOG_WARN(
"[messaging_udp_client] network_context not initialized, creating temporary thread pool");
276 thread_pool_ = std::make_shared<integration::basic_thread_pool>(std::thread::hardware_concurrency());
280 io_context_future_ = thread_pool_->
submit(
289 catch (
const std::exception& e)
295 NETWORK_LOG_INFO(
"UDP client started targeting " + std::string(host) +
":" + std::to_string(port));
299 catch (
const std::system_error& e)
303 std::string(
"Failed to create UDP socket: ") + e.what(),
304 "messaging_udp_client::do_start_impl",
305 "Host: " + std::string(host) +
":" + std::to_string(port)
308 catch (
const std::exception& e)
312 std::string(
"Failed to start UDP client: ") + e.what(),
313 "messaging_udp_client::do_start_impl",
314 "Host: " + std::string(host) +
":" + std::to_string(port)
326 socket_->stop_receive();
336 if (io_context_future_.valid())
339 io_context_future_.wait();
340 }
catch (
const std::exception& e) {
341 NETWORK_LOG_ERROR(
"[messaging_udp_client] Exception while waiting for io_context: " +
342 std::string(e.what()));
354 catch (
const std::exception& e)
358 std::string(
"Failed to stop UDP client: ") + e.what(),
359 "messaging_udp_client::do_stop_impl",
370 const std::vector<uint8_t>& data,
371 const asio::ip::udp::endpoint& endpoint) ->
void
373 callbacks_.invoke<
to_index(callback_index::receive)>(data, endpoint);
378 callbacks_.invoke<
to_index(callback_index::error)>(ec);
auto invoke_receive_callback(const std::vector< uint8_t > &data, const asio::ip::udp::endpoint &endpoint) -> void
Invokes the receive callback with the given data and endpoint.
auto client_id() const -> const std::string &
Returns the client identifier.
auto send(std::vector< uint8_t > &&data, interfaces::i_udp_client::send_callback_t handler=nullptr) -> VoidResult override
Sends a datagram to the configured target endpoint.
auto wait_for_stop() -> void override
Blocks until stop_client() is called.
std::function< void(const std::vector< uint8_t > &, const asio::ip::udp::endpoint &)> receive_callback_t
Callback type for received datagrams with sender endpoint.
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback with the given error code.
utils::lifecycle_manager lifecycle_
auto set_target(std::string_view host, uint16_t port) -> VoidResult override
Changes the target endpoint for future sends.
auto stop_client() -> VoidResult
Stops the client and releases all resources.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
auto do_stop_impl() -> VoidResult
UDP-specific implementation of client stop.
auto get_receive_callback() const -> receive_callback_t
Gets a copy of the receive callback.
messaging_udp_client(std::string_view client_id)
Constructs a messaging_udp_client with an identifier.
~messaging_udp_client() noexcept override
Destructor. Automatically calls stop_client() if the client is still running.
auto stop() -> VoidResult override
Stops the UDP client.
auto do_start_impl(std::string_view host, uint16_t port) -> VoidResult
UDP-specific implementation of client start.
auto start(std::string_view host, uint16_t port) -> VoidResult override
Starts the UDP client targeting the specified endpoint.
auto is_running() const -> bool override
Checks if the client is currently running.
auto start_client(std::string_view host, uint16_t port) -> VoidResult
Starts the client by resolving target host and port.
auto set_error_callback(error_callback_t callback) -> void override
Sets the callback for errors.
auto get_error_callback() const -> error_callback_t
Gets a copy of the error callback.
auto set_receive_callback(interfaces::i_udp_client::receive_callback_t callback) -> void override
Sets the callback for received datagrams (interface version).
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get the current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void(std::error_code, std::size_t)> send_callback_t
Callback type for send completion.
std::function< void( const std::vector< uint8_t > &, const endpoint_info &)> receive_callback_t
Callback type for received data (includes sender endpoint).
auto get() const -> std::tuple_element_t< Index, std::tuple< CallbackTypes... > >
Gets a callback at the specified index.
auto is_running() const -> bool
Checks if the component is currently running.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
constexpr int invalid_argument
constexpr int internal_error
constexpr int already_exists
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
Global context for shared network system resources.
Endpoint information for UDP datagrams.