12 using udp = asio::ip::udp;
15 : server_id_(server_id)
33 if (lifecycle_.is_running())
37 "UDP server is already running",
38 "messaging_udp_server::start_server");
42 lifecycle_.set_running();
44 auto result = do_start_impl(port);
47 lifecycle_.mark_stopped();
55 if (!lifecycle_.prepare_stop())
60 auto result = do_stop_impl();
62 lifecycle_.mark_stopped();
83 lifecycle_.wait_for_stop();
92 return start_server(port);
102 std::vector<uint8_t>&& data,
105 if (!lifecycle_.is_running())
109 "UDP server is not running",
110 "messaging_udp_server::send_to",
119 "Socket not available",
120 "messaging_udp_server::send_to",
128 asio::ip::udp::endpoint asio_endpoint(
129 asio::ip::make_address(endpoint.address),
133 socket_->async_send_to(std::move(data), asio_endpoint, std::move(handler));
136 catch (
const std::exception& e)
140 std::string(
"Failed to send datagram: ") + e.what(),
141 "messaging_udp_server::send_to",
142 "Target: " + endpoint.address +
":" + std::to_string(endpoint.port)
153 callbacks_.set<
to_index(callback_index::receive)>(
nullptr);
159 callbacks_.set<
to_index(callback_index::receive)>(
160 [callback = std::move(callback)](
161 const std::vector<uint8_t>& data,
162 const asio::ip::udp::endpoint& endpoint)
165 info.address = endpoint.address().to_string();
166 info.port = endpoint.port();
167 callback(data, info);
173 callbacks_.set<
to_index(callback_index::receive)>(std::move(callback));
178 callbacks_.set<
to_index(callback_index::error)>(std::move(callback));
190 io_context_ = std::make_unique<asio::io_context>();
193 asio::ip::udp::socket raw_socket(*io_context_, udp::endpoint(udp::v4(), port));
196 socket_ = std::make_shared<internal::udp_socket>(std::move(raw_socket));
199 auto receive_cb = get_receive_callback();
202 socket_->set_receive_callback(std::move(receive_cb));
205 auto error_cb = get_error_callback();
208 socket_->set_error_callback(std::move(error_cb));
212 socket_->start_receive();
218 NETWORK_LOG_WARN(
"[messaging_udp_server] network_context not initialized, creating temporary thread pool");
219 thread_pool_ = std::make_shared<integration::basic_thread_pool>(std::thread::hardware_concurrency());
223 io_context_future_ = thread_pool_->
submit(
232 catch (
const std::exception& e)
242 catch (
const std::system_error& e)
246 std::string(
"Failed to bind UDP socket: ") + e.what(),
247 "messaging_udp_server::do_start_impl",
248 "Port: " + std::to_string(port)
251 catch (
const std::exception& e)
255 std::string(
"Failed to start UDP server: ") + e.what(),
256 "messaging_udp_server::do_start_impl",
257 "Port: " + std::to_string(port)
269 socket_->stop_receive();
279 if (io_context_future_.valid())
282 io_context_future_.wait();
283 }
catch (
const std::exception& e) {
284 NETWORK_LOG_ERROR(
"[messaging_udp_server] Exception while waiting for io_context: " +
285 std::string(e.what()));
297 catch (
const std::exception& e)
301 std::string(
"Failed to stop UDP server: ") + e.what(),
302 "messaging_udp_server::do_stop_impl",
313 const std::vector<uint8_t>& data,
314 const asio::ip::udp::endpoint& endpoint) ->
void
316 callbacks_.invoke<
to_index(callback_index::receive)>(data, endpoint);
321 callbacks_.invoke<
to_index(callback_index::error)>(ec);
auto start_server(uint16_t port) -> VoidResult
Starts the server on the specified port.
auto stop_server() -> VoidResult
Stops the server and releases all resources.
auto do_start_impl(uint16_t port) -> VoidResult
UDP-specific implementation of server start.
auto is_running() const -> bool override
Checks if the server is currently running.
auto get_receive_callback() const -> receive_callback_t
Gets a copy of the receive callback.
utils::lifecycle_manager lifecycle_
~messaging_udp_server() noexcept override
Destructor. If the server is still running, stop_server() is invoked.
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback with the given error code.
std::function< void(const std::vector< uint8_t > &, const asio::ip::udp::endpoint &)> receive_callback_t
Callback type for received datagrams with sender endpoint.
auto stop() -> VoidResult override
Stops the UDP server.
auto start(uint16_t port) -> VoidResult override
Starts the UDP server on the specified port.
auto send_to(const interfaces::i_udp_server::endpoint_info &endpoint, std::vector< uint8_t > &&data, interfaces::i_udp_server::send_callback_t handler=nullptr) -> VoidResult override
Sends a datagram to the specified endpoint.
auto get_error_callback() const -> error_callback_t
Gets a copy of the error callback.
messaging_udp_server(std::string_view server_id)
Constructs a messaging_udp_server with an identifier.
auto set_error_callback(error_callback_t callback) -> void override
Sets the callback for errors.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
auto wait_for_stop() -> void override
Blocks until stop_server() is called.
auto set_receive_callback(interfaces::i_udp_server::receive_callback_t callback) -> void override
Sets the callback for received datagrams (interface version).
auto invoke_receive_callback(const std::vector< uint8_t > &data, const asio::ip::udp::endpoint &endpoint) -> void
Invokes the receive callback with the given data and endpoint.
auto do_stop_impl() -> VoidResult
UDP-specific implementation of server stop.
auto server_id() const -> const std::string &
Returns the server identifier.
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void( const std::vector< uint8_t > &, const endpoint_info &)> receive_callback_t
Callback type for received data (includes sender endpoint).
std::function< void(std::error_code, std::size_t)> send_callback_t
Callback type for send completion.
auto get() const -> std::tuple_element_t< Index, std::tuple< CallbackTypes... > >
Gets a callback at the specified index.
auto is_running() const -> bool
Checks if the component is currently running.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
constexpr int internal_error
constexpr int bind_failed
constexpr int server_already_running
constexpr int server_not_started
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
Global context for shared network system resources.
Endpoint information for UDP datagrams.