15 using tcp = asio::ip::tcp;
25 std::shared_ptr<internal::websocket_socket> ws_socket,
26 const std::string& remote_addr,
27 const std::string& ws_path =
"/")
36 std::function<
void(std::error_code, std::size_t)> handler)
50 std::function<
void(std::error_code, std::size_t)> handler)
59 return ws_socket_->async_send_binary(std::move(data), std::move(handler));
71 ws_socket_->async_close(code, reason, [](std::error_code) {});
79 [[nodiscard]]
auto path() const -> std::string_view {
return path_; }
87 auto get_socket() -> std::shared_ptr<internal::websocket_socket>
119 return pimpl_->connection_id();
124 return pimpl_->is_connected();
129 return pimpl_->send_binary(std::move(data),
nullptr);
139 return pimpl_->send_text(std::move(
message),
nullptr);
144 return pimpl_->send_binary(std::move(data),
nullptr);
159 return pimpl_->remote_endpoint();
167 : server_id_(server_id)
191 if (lifecycle_.is_running())
195 "WebSocket server is already running",
196 "messaging_ws_server");
199 lifecycle_.set_running();
201 auto result = do_start_impl(port, path);
204 lifecycle_.mark_stopped();
212 if (!lifecycle_.prepare_stop())
217 auto result = do_stop_impl();
219 lifecycle_.mark_stopped();
240 lifecycle_.wait_for_stop();
249 return start_server(port);
254 return stop_server();
275 callbacks_.set<
to_index(callback_index::connection)>(
276 [callback = std::move(callback)](std::shared_ptr<ws_connection> conn) {
290 callbacks_.set<
to_index(callback_index::disconnection)>(
291 [callback = std::move(callback)](
const std::string& session_id,
293 const std::string& reason) {
294 callback(session_id,
static_cast<uint16_t
>(code), reason);
306 callbacks_.set<
to_index(callback_index::text_message)>(
307 [callback = std::move(callback)](std::shared_ptr<ws_connection> conn,
321 callbacks_.set<
to_index(callback_index::binary_message)>(
322 [callback = std::move(callback)](std::shared_ptr<ws_connection> conn,
323 const std::vector<uint8_t>& data) {
324 callback(conn->id(), data);
336 callbacks_.set<
to_index(callback_index::error)>(
337 [callback = std::move(callback)](
const std::string& session_id, std::error_code ec) {
338 callback(session_id, ec);
354 if (config_.port == 0 || config_.port != port)
357 config_.path = std::string(path);
363 session_mgr_ = std::make_shared<ws_session_manager>(sess_config);
366 io_context_ = std::make_unique<asio::io_context>();
367 work_guard_ = std::make_unique<
368 asio::executor_work_guard<asio::io_context::executor_type>>(
369 asio::make_work_guard(*io_context_));
372 acceptor_ = std::make_unique<tcp::acceptor>(
373 *io_context_, tcp::endpoint(tcp::v4(), config_.port));
379 NETWORK_LOG_WARN(
"[messaging_ws_server] network_context not initialized, creating temporary thread pool");
380 thread_pool_ = std::make_shared<integration::basic_thread_pool>(std::thread::hardware_concurrency());
384 io_context_future_ = thread_pool_->
submit([
this]() {
389 }
catch (
const std::exception& e) {
391 std::string(e.what()));
399 std::to_string(config_.port) +
" (ID: " +
404 catch (
const std::exception& e)
407 std::string(
"Failed to start server: ") + e.what());
418 auto conns = session_mgr_->get_all_connections();
419 for (
auto& conn : conns)
423 session_mgr_->clear_all_connections();
429 std::lock_guard<std::mutex> lock(acceptor_mutex_);
444 if (io_context_future_.valid())
447 io_context_future_.wait();
448 }
catch (
const std::exception& e) {
449 NETWORK_LOG_ERROR(
"[messaging_ws_server] Exception while waiting for io_context: " +
450 std::string(e.what()));
459 catch (
const std::exception& e)
462 std::string(
"Failed to stop server: ") + e.what());
473 auto conns = session_mgr_->get_all_connections();
474 for (
auto& conn : conns)
476 (void)conn->send_text(std::string(
message));
487 auto conns = session_mgr_->get_all_connections();
488 for (
auto& conn : conns)
490 (void)conn->send_binary(std::vector<uint8_t>(data));
495 -> std::shared_ptr<ws_connection>
501 return session_mgr_->get_connection(connection_id);
510 return session_mgr_->get_all_connection_ids();
516 std::lock_guard<std::mutex> lock(acceptor_mutex_);
519 if (!lifecycle_.is_running() || !acceptor_ || !acceptor_->is_open())
524 auto socket = std::make_shared<tcp::socket>(*io_context_);
526 acceptor_->async_accept(*socket,
527 [
this, socket](std::error_code ec)
531 handle_new_connection(socket);
536 "[messaging_ws_server] Accept error: " +
541 if (lifecycle_.is_running())
551 if (!session_mgr_ || !session_mgr_->can_accept_connection())
558 auto remote_ep = socket->remote_endpoint();
559 std::string conn_id = remote_ep.address().to_string() +
":" +
560 std::to_string(remote_ep.port());
561 std::string remote_addr = conn_id;
564 auto tcp_sock = std::make_shared<internal::tcp_socket>(std::move(*socket));
568 std::make_shared<internal::websocket_socket>(tcp_sock,
false);
571 auto conn_impl = std::make_shared<ws_connection_impl>(conn_id, ws_socket, remote_addr, config_.path);
572 auto conn = std::make_shared<ws_connection>(conn_impl);
575 ws_socket->set_message_callback(
577 { on_message(conn, msg); });
579 ws_socket->set_ping_callback(
580 [
this, ws_socket](
const std::vector<uint8_t>& payload)
582 if (config_.auto_pong)
584 ws_socket->async_send_ping(std::vector<uint8_t>(payload),
585 [](std::error_code) {});
589 ws_socket->set_close_callback(
591 const std::string& reason)
592 { on_close(conn_id, code, reason); });
594 ws_socket->set_error_callback(
595 [
this, conn_id](std::error_code ec) { on_error(conn_id, ec); });
598 ws_socket->async_accept(
599 [
this, conn_id, conn](std::error_code ec)
609 auto added_id = session_mgr_->add_connection(conn, conn_id);
610 if (added_id.empty())
613 "[messaging_ws_server] Failed to add connection");
621 auto ws_sock = conn->get_impl()->get_socket();
624 ws_sock->start_read();
628 invoke_connection_callback(conn);
635 invoke_message_callback(conn, msg);
640 const std::string& reason) ->
void
645 auto conn = session_mgr_->get_connection(conn_id);
648 conn->get_impl()->invalidate();
650 session_mgr_->remove_connection(conn_id);
655 invoke_disconnection_callback(conn_id, code, reason);
659 std::error_code ec) ->
void
662 "): " + ec.message());
663 invoke_error_callback(conn_id, ec);
672 callbacks_.invoke<
to_index(callback_index::connection)>(conn);
677 const std::string& reason) ->
void
679 callbacks_.invoke<
to_index(callback_index::disconnection)>(conn_id, code, reason);
685 callbacks_.invoke<
to_index(callback_index::message)>(conn, msg);
689 callbacks_.invoke<
to_index(callback_index::text_message)>(conn, msg.as_text());
693 callbacks_.invoke<
to_index(callback_index::binary_message)>(conn, msg.as_binary());
698 std::error_code ec) ->
void
700 callbacks_.invoke<
to_index(callback_index::error)>(conn_id, ec);
utils::lifecycle_manager lifecycle_
auto do_accept() -> void
Starts accepting new connections.
auto set_binary_callback(interfaces::i_websocket_server::binary_callback_t callback) -> void override
Sets the callback for binary messages (interface version).
std::function< void(std::shared_ptr< ws_connection >)> connection_callback_t
Callback type for new connections.
~messaging_ws_server() noexcept override
Destructor. Automatically stops the server if still running.
auto broadcast_binary(const std::vector< uint8_t > &data) -> void
Broadcasts a binary message to all connected clients.
std::shared_ptr< ws_session_manager > session_mgr_
std::function< void(const std::string &, std::error_code)> error_callback_t
Callback type for errors.
auto set_disconnection_callback(interfaces::i_websocket_server::disconnection_callback_t callback) -> void override
Sets the callback for disconnections (interface version).
auto get_all_connections() -> std::vector< std::string >
Gets all connection IDs.
auto set_connection_callback(interfaces::i_websocket_server::connection_callback_t callback) -> void override
Sets the callback for new connections (interface version).
auto handle_new_connection(std::shared_ptr< asio::ip::tcp::socket > socket) -> void
Handles a new connection.
auto do_stop_impl() -> VoidResult
WebSocket-specific implementation of server stop.
auto wait_for_stop() -> void override
Blocks until stop() is called.
auto invoke_message_callback(std::shared_ptr< ws_connection > conn, const internal::ws_message &msg) -> void
Invokes the message callback.
auto invoke_error_callback(const std::string &conn_id, std::error_code ec) -> void
Invokes the error callback.
auto is_running() const -> bool override
Checks if the server is currently running.
auto set_error_callback(interfaces::i_websocket_server::error_callback_t callback) -> void override
Sets the callback for errors (interface version).
auto invoke_connection_callback(std::shared_ptr< ws_connection > conn) -> void
Invokes the connection callback.
auto stop_server() -> VoidResult
Stops the server and releases all resources.
auto do_start_impl(uint16_t port, std::string_view path) -> VoidResult
WebSocket-specific implementation of server start.
auto start(uint16_t port) -> VoidResult override
Starts the WebSocket server on the specified port.
auto start_server(const ws_server_config &config) -> VoidResult
Starts the server with full configuration.
auto connection_count() const -> size_t override
Gets the number of active WebSocket connections.
auto on_error(const std::string &conn_id, std::error_code ec) -> void
Handles errors.
auto server_id() const -> const std::string &
Returns the server identifier.
auto on_message(std::shared_ptr< ws_connection > conn, const internal::ws_message &msg) -> void
Handles received WebSocket messages.
std::function< void(std::shared_ptr< ws_connection >, const std::string &)> text_message_callback_t
Callback type for text messages.
auto invoke_disconnection_callback(const std::string &conn_id, internal::ws_close_code code, const std::string &reason) -> void
Invokes the disconnection callback.
messaging_ws_server(std::string_view server_id)
Constructs a WebSocket server.
auto set_text_callback(interfaces::i_websocket_server::text_callback_t callback) -> void override
Sets the callback for text messages (interface version).
std::function< void(std::shared_ptr< ws_connection >, const std::vector< uint8_t > &)> binary_message_callback_t
Callback type for binary messages.
auto get_connection(const std::string &connection_id) -> std::shared_ptr< ws_connection >
Gets a connection by ID.
auto broadcast_text(const std::string &message) -> void
Broadcasts a text message to all connected clients.
std::function< void(const std::string &, internal::ws_close_code, const std::string &)> disconnection_callback_t
Callback type for disconnections.
auto on_close(const std::string &conn_id, internal::ws_close_code code, const std::string &reason) -> void
Handles connection close.
auto stop() -> VoidResult override
Stops the WebSocket server.
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
std::string connection_id_
auto invalidate() -> void
auto remote_endpoint() const -> std::string
auto send_binary(std::vector< uint8_t > &&data, std::function< void(std::error_code, std::size_t)> handler) -> VoidResult
ws_connection_impl(const std::string &conn_id, std::shared_ptr< internal::websocket_socket > ws_socket, const std::string &remote_addr, const std::string &ws_path="/")
auto get_socket() -> std::shared_ptr< internal::websocket_socket >
auto send_text(std::string &&message, std::function< void(std::error_code, std::size_t)> handler) -> VoidResult
auto close(internal::ws_close_code code, const std::string &reason) -> VoidResult
std::shared_ptr< internal::websocket_socket > ws_socket_
auto connection_id() const -> const std::string &
std::string remote_address_
auto path() const -> std::string_view
auto is_connected() const -> bool
auto is_connected() const -> bool override
Checks if the session is currently connected.
std::shared_ptr< ws_connection_impl > pimpl_
ws_connection(std::shared_ptr< class ws_connection_impl > impl)
auto send_text(std::string &&message) -> VoidResult override
Sends a text message to the client.
auto send_binary(std::vector< uint8_t > &&data) -> VoidResult override
Sends a binary message to the client.
auto path() const -> std::string_view override
Gets the requested path from the handshake.
auto close() -> void override
Closes the session.
auto id() const -> std::string_view override
Gets the unique identifier for this session.
auto send(std::vector< uint8_t > &&data) -> VoidResult override
Sends data to the client.
auto remote_endpoint() const -> std::string
Gets the remote endpoint address.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void(std::string_view, const std::vector< uint8_t > &)> binary_callback_t
Callback type for binary messages (session_id, data)
std::function< void(std::string_view, std::error_code)> error_callback_t
Callback type for errors (session_id, error)
std::function< void(std::string_view, const std::string &)> text_callback_t
Callback type for text messages (session_id, message)
std::function< void( std::string_view session_id, uint16_t code, std::string_view reason)> disconnection_callback_t
Callback type for disconnections (with close code and reason)
std::function< void(std::shared_ptr< i_websocket_session >)> connection_callback_t
Callback type for new connections.
auto is_running() const -> bool
Checks if the component is currently running.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
constexpr int internal_error
constexpr int already_exists
constexpr int bind_failed
constexpr int connection_closed
@ text
Text message (UTF-8 encoded)
ws_close_code
WebSocket close status codes (RFC 6455 Section 7.4).
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
Global context for shared network system resources.
Configuration for session management.
Configuration for WebSocket server.
Represents a complete WebSocket message.