14 using tcp = asio::ip::tcp;
17 : client_id_(client_id)
42 if (lifecycle_.is_running())
46 "WebSocket client is already running",
47 "messaging_ws_client");
54 "Host cannot be empty",
55 "messaging_ws_client");
58 lifecycle_.set_running();
59 is_connected_.store(
false);
61 auto result = do_start_impl(host, port, path);
64 lifecycle_.mark_stopped();
72 if (!lifecycle_.prepare_stop())
77 is_connected_.store(
false);
79 auto result = do_stop_impl();
81 lifecycle_.mark_stopped();
102 lifecycle_.wait_for_stop();
112 return start_client(host, port, path);
117 return stop_client();
129 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
133 "WebSocket not connected");
136 return ws_socket_->async_send_text(std::move(
message), std::move(handler));
140 std::vector<uint8_t>&& data,
143 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
147 "WebSocket not connected");
150 return ws_socket_->async_send_binary(std::move(data), std::move(handler));
155 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
159 "WebSocket not connected");
162 ws_socket_->async_send_ping(std::move(payload), [](std::error_code) {});
168 return send_ping(std::move(payload));
173 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
177 "WebSocket not connected");
181 std::string(reason), [](std::error_code) {});
192 callbacks_.set<
to_index(callback_index::text_message)>(std::move(callback));
198 callbacks_.set<
to_index(callback_index::binary_message)>(std::move(callback));
204 callbacks_.set<
to_index(callback_index::connected)>(std::move(callback));
212 callbacks_.set<
to_index(callback_index::disconnected)>(
214 callback(
static_cast<uint16_t
>(code), reason);
224 callbacks_.set<
to_index(callback_index::error)>(std::move(callback));
237 if (config_.host.empty())
239 config_.host = std::string(host);
241 config_.path = std::string(path);
245 io_context_ = std::make_unique<asio::io_context>();
246 work_guard_ = std::make_unique<
247 asio::executor_work_guard<asio::io_context::executor_type>>(
248 asio::make_work_guard(*io_context_));
254 NETWORK_LOG_WARN(
"[messaging_ws_client] network_context not initialized, creating temporary thread pool");
255 thread_pool_ = std::make_shared<integration::basic_thread_pool>(std::thread::hardware_concurrency());
259 io_context_future_ = thread_pool_->
submit([
this]() {
264 }
catch (
const std::exception& e) {
266 std::string(e.what()));
278 catch (
const std::exception& e)
281 std::string(
"Failed to start client: ") + e.what());
291 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
292 if (ws_socket_ && ws_socket_->is_open())
294 ws_socket_->async_close(
296 [](std::error_code) {});
308 if (io_context_future_.valid())
311 io_context_future_.wait();
312 }
catch (
const std::exception& e) {
313 NETWORK_LOG_ERROR(
"[messaging_ws_client] Exception while waiting for io_context: " +
314 std::string(e.what()));
323 catch (
const std::exception& e)
326 std::string(
"Failed to stop client: ") + e.what());
332 tcp::resolver resolver(*io_context_);
333 auto endpoints = resolver.resolve(config_.host, std::to_string(config_.port));
335 auto socket = std::make_shared<tcp::socket>(*io_context_);
339 [
this, socket](std::error_code ec, tcp::endpoint)
345 invoke_error_callback(ec);
350 auto tcp_sock = std::make_shared<internal::tcp_socket>(std::move(*socket));
354 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
355 ws_socket_ = std::make_shared<internal::websocket_socket>(tcp_sock,
true);
358 ws_socket_->set_message_callback(
361 ws_socket_->set_ping_callback(
362 [
this](
const std::vector<uint8_t>& payload) { on_ping(payload); });
364 ws_socket_->set_close_callback(
366 { on_close(code, reason); });
368 ws_socket_->set_error_callback(
369 [
this](std::error_code ec) { on_error(ec); });
372 ws_socket_->async_handshake(
373 config_.host, config_.path, config_.port,
374 [
this](std::error_code ec)
379 "[messaging_ws_client] Handshake failed: " + ec.message());
380 invoke_error_callback(ec);
384 is_connected_.store(
true, std::memory_order_release);
386 "[messaging_ws_client] Connected (ID: " +
390 ws_socket_->start_read();
393 invoke_connected_callback();
401 invoke_message_callback(msg);
404 auto messaging_ws_client::on_ping(
const std::vector<uint8_t>& payload) ->
void
406 if (config_.auto_pong)
408 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
411 ws_socket_->async_send_ping(std::vector<uint8_t>(payload),
412 [](std::error_code) {});
418 const std::string& reason) ->
void
420 is_connected_.store(
false, std::memory_order_release);
424 invoke_disconnected_callback(code, reason);
427 auto messaging_ws_client::on_error(std::error_code ec) ->
void
430 invoke_error_callback(ec);
439 callbacks_.invoke<
to_index(callback_index::message)>(msg);
443 callbacks_.invoke<
to_index(callback_index::text_message)>(msg.as_text());
447 callbacks_.invoke<
to_index(callback_index::binary_message)>(msg.as_binary());
451 auto messaging_ws_client::invoke_connected_callback() ->
void
453 callbacks_.invoke<
to_index(callback_index::connected)>();
457 const std::string& reason) ->
void
459 callbacks_.invoke<
to_index(callback_index::disconnected)>(code, reason);
462 auto messaging_ws_client::invoke_error_callback(std::error_code ec) ->
void
464 callbacks_.invoke<
to_index(callback_index::error)>(ec);
utils::lifecycle_manager lifecycle_
auto start(std::string_view host, uint16_t port, std::string_view path="/") -> VoidResult override
Starts the WebSocket client connecting to the specified endpoint.
auto set_text_callback(interfaces::i_websocket_client::text_callback_t callback) -> void override
Sets the callback for text messages (interface version).
auto do_start_impl(std::string_view host, uint16_t port, std::string_view path) -> VoidResult
WebSocket-specific implementation of client start.
auto is_running() const -> bool override
Checks if the client is currently running.
auto ping(std::vector< uint8_t > &&payload={}) -> VoidResult override
Sends a ping frame (interface version).
messaging_ws_client(std::string_view client_id)
Constructs a WebSocket client.
auto stop_client() -> VoidResult
Stops the client and releases all resources.
auto do_connect() -> void
Initiates async connection to the server.
auto set_connected_callback(interfaces::i_websocket_client::connected_callback_t callback) -> void override
Sets the callback for connection established (interface version).
auto stop() -> VoidResult override
Stops the WebSocket client.
auto wait_for_stop() -> void override
Blocks until stop() is called.
~messaging_ws_client() noexcept override
Destructor. Automatically stops the client if still running.
std::function< void(internal::ws_close_code, const std::string &)> disconnected_callback_t
Callback type for disconnection with close code.
auto set_binary_callback(interfaces::i_websocket_client::binary_callback_t callback) -> void override
Sets the callback for binary messages (interface version).
auto send_text(std::string &&message, interfaces::i_websocket_client::send_callback_t handler=nullptr) -> VoidResult override
Sends a text message (interface version).
std::atomic< bool > is_connected_
auto is_connected() const -> bool override
Checks if the WebSocket connection is established.
auto set_disconnected_callback(interfaces::i_websocket_client::disconnected_callback_t callback) -> void override
Sets the callback for disconnection (interface version).
auto start_client(const ws_client_config &config) -> VoidResult
Starts the client with full configuration.
auto set_error_callback(interfaces::i_websocket_client::error_callback_t callback) -> void override
Sets the callback for errors (interface version).
auto close(uint16_t code=1000, std::string_view reason="") -> VoidResult override
Closes the WebSocket connection gracefully (interface version).
auto do_stop_impl() -> VoidResult
WebSocket-specific implementation of client stop.
auto send_binary(std::vector< uint8_t > &&data, interfaces::i_websocket_client::send_callback_t handler=nullptr) -> VoidResult override
Sends a binary message (interface version).
auto client_id() const -> const std::string &
Returns the client identifier.
auto send_ping(std::vector< uint8_t > &&payload={}) -> VoidResult
Sends a ping frame.
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void(std::error_code, std::size_t)> send_callback_t
Callback type for send completion.
std::function< void()> connected_callback_t
Callback type for connection established.
std::function< void(const std::vector< uint8_t > &)> binary_callback_t
Callback type for binary messages.
std::function< void(const std::string &)> text_callback_t
Callback type for text messages.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
std::function< void(uint16_t code, std::string_view reason)> disconnected_callback_t
Callback type for disconnection (with close code and reason).
auto is_running() const -> bool
Checks if the component is currently running.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
constexpr int invalid_argument
constexpr int internal_error
constexpr int already_exists
constexpr int connection_closed
constexpr int connection_failed
text
Text message (UTF-8 encoded).
ws_close_code
WebSocket close status codes (RFC 6455 Section 7.4).
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
Global context for shared network system resources.
Configuration for WebSocket client.
Represents a complete WebSocket message.