Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
websocket_client.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
11
13{
// Shorthand for ASIO's TCP protocol type; used below for tcp::resolver,
// tcp::socket and tcp::endpoint.
14 using tcp = asio::ip::tcp;
15
// Constructs the client and stores `client_id` in client_id_.
// The body is empty: nothing is connected or started here.
16 messaging_ws_client::messaging_ws_client(std::string_view client_id)
17 : client_id_(client_id)
18 {
19 }
20
// NOTE(review): the signature line and the condition guarding the inner block
// are not shown in this listing; presumably this is the destructor body and
// the guard checks whether the client is still running — confirm against the
// original file.
22 {
24 {
// The result of stop_client() is deliberately discarded via the (void) cast.
25 (void)stop_client();
26 }
27 }
28
29 // ========================================================================
30 // Lifecycle Management
31 // ========================================================================
32
// NOTE(review): signature line not shown in this listing; presumably the
// start_client overload that takes a full configuration object named `config`.
34 {
// Keep the whole configuration, then delegate to the host/port/path overload
// using the endpoint fields of that same configuration.
35 config_ = config;
36 return start_client(config.host, config.port, config.path);
37 }
38
// Starts the client for the given host, port and path.
//
// Returns an error result when the lifecycle already reports running or when
// `host` is empty. Otherwise marks the lifecycle as running, clears the
// connected flag and returns whatever do_start_impl() returns; if that result
// is an error the lifecycle is marked stopped again.
//
// NOTE: the first argument of each error_void(...) call (the error code) is
// not shown in this listing.
39 auto messaging_ws_client::start_client(std::string_view host, uint16_t port,
40 std::string_view path) -> VoidResult
41 {
// NOTE(review): is_running() here and set_running() further down are two
// separate calls; confirm whether concurrent start_client() calls must be
// excluded.
42 if (lifecycle_.is_running())
43 {
44 return error_void(
46 "WebSocket client is already running",
47 "messaging_ws_client");
48 }
49
50 if (host.empty())
51 {
52 return error_void(
54 "Host cannot be empty",
55 "messaging_ws_client");
56 }
57
58 lifecycle_.set_running();
59 is_connected_.store(false);
60
61 auto result = do_start_impl(host, port, path);
62 if (result.is_err())
63 {
// Roll the lifecycle back so a later start attempt is not rejected as
// "already running".
64 lifecycle_.mark_stopped();
65 }
66
67 return result;
68 }
69
// NOTE(review): signature line not shown in this listing; presumably
// stop_client(), which returns a VoidResult.
71 {
// When prepare_stop() reports false there is nothing to do and success is
// returned. Presumably that means "not running or already stopping" — confirm
// against the lifecycle helper.
72 if (!lifecycle_.prepare_stop())
73 {
74 return ok();
75 }
76
77 is_connected_.store(false);
78
79 auto result = do_stop_impl();
80
// The lifecycle is marked stopped regardless of whether do_stop_impl()
// succeeded; its result is still handed back to the caller.
81 lifecycle_.mark_stopped();
82
83 return result;
84 }
85
// Returns a const reference to the identifier stored at construction.
86 auto messaging_ws_client::client_id() const -> const std::string&
87 {
88 return client_id_;
89 }
90
91 // ========================================================================
92 // i_network_component interface implementation
93 // ========================================================================
94
// NOTE(review): signature line not shown in this listing; presumably
// is_running(). Reports the lifecycle's running state.
96 {
97 return lifecycle_.is_running();
98 }
99
// NOTE(review): signature line not shown in this listing; presumably
// wait_for_stop(). Delegates entirely to the lifecycle helper.
101 {
102 lifecycle_.wait_for_stop();
103 }
104
105 // ========================================================================
106 // i_websocket_client interface implementation
107 // ========================================================================
108
// Interface entry point: forwards unchanged to start_client(host, port, path)
// and returns its result.
109 auto messaging_ws_client::start(std::string_view host, uint16_t port,
110 std::string_view path) -> VoidResult
111 {
112 return start_client(host, port, path);
113 }
114
// NOTE(review): signature line not shown in this listing; presumably the
// interface stop(). Forwards to stop_client() and returns its result.
116 {
117 return stop_client();
118 }
119
// NOTE(review): signature line not shown in this listing; presumably
// is_connected().
// Reads the connected flag with a relaxed load. The handshake and close
// handlers in this file store the flag with memory_order_release, so this
// load does not synchronize with them — confirm that callers only need the
// flag value itself and not visibility of related state.
121 {
122 return is_connected_.load(std::memory_order_relaxed);
123 }
124
// NOTE(review): the start of the signature, the `handler` parameter line and
// the beginning of the error-return expression are not shown in this listing;
// presumably this is send_text().
//
// Holds ws_socket_mutex_ for the whole call. With no socket object the visible
// part of the body yields a "WebSocket not connected" error; otherwise the
// message and handler are moved into async_send_text() and its result is
// returned.
//
// NOTE(review): only the existence of ws_socket_ is checked, not
// is_connected_. do_connect() assigns ws_socket_ before the handshake
// completes, so a send can be forwarded while the handshake is still in
// flight — confirm the socket wrapper tolerates that.
126 std::string&& message,
128 {
129 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
130 if (!ws_socket_)
131 {
133 "WebSocket not connected");
134 }
135
136 return ws_socket_->async_send_text(std::move(message), std::move(handler));
137 }
138
// NOTE(review): the start of the signature, the `handler` parameter line and
// the beginning of the error-return expression are not shown in this listing;
// presumably this is send_binary().
//
// Holds ws_socket_mutex_ for the whole call. With no socket object the visible
// part of the body yields a "WebSocket not connected" error; otherwise the
// data and handler are moved into async_send_binary() and its result is
// returned.
//
// NOTE(review): only the existence of ws_socket_ is checked, not
// is_connected_; ws_socket_ is assigned before the handshake completes.
140 std::vector<uint8_t>&& data,
142 {
143 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
144 if (!ws_socket_)
145 {
147 "WebSocket not connected");
148 }
149
150 return ws_socket_->async_send_binary(std::move(data), std::move(handler));
151 }
152
// Queues a ping frame carrying `payload` on the current socket.
//
// Holds ws_socket_mutex_ for the whole call. With no socket object the visible
// part of the body yields a "WebSocket not connected" error (the beginning of
// that return expression is not shown in this listing).
//
// The completion handler passed to async_send_ping() ignores its error_code,
// so the ok() returned here only means the ping was handed to the socket, not
// that it was transmitted.
153 auto messaging_ws_client::send_ping(std::vector<uint8_t>&& payload) -> VoidResult
154 {
155 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
156 if (!ws_socket_)
157 {
159 "WebSocket not connected");
160 }
161
162 ws_socket_->async_send_ping(std::move(payload), [](std::error_code) {});
163 return ok();
164 }
165
// Interface entry point: moves `payload` into send_ping() and returns its
// result.
166 auto messaging_ws_client::ping(std::vector<uint8_t>&& payload) -> VoidResult
167 {
168 return send_ping(std::move(payload));
169 }
170
// Requests a WebSocket close with the given status code and reason.
//
// Holds ws_socket_mutex_ for the whole call. With no socket object the visible
// part of the body yields a "WebSocket not connected" error (the beginning of
// that return expression is not shown in this listing).
//
// This function does not touch is_connected_ or lifecycle_. The completion
// handler ignores its error_code, so ok() only means the close was handed to
// the socket.
171 auto messaging_ws_client::close(uint16_t code, std::string_view reason) -> VoidResult
172 {
173 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
174 if (!ws_socket_)
175 {
177 "WebSocket not connected");
178 }
179
// NOTE(review): `code` is cast to internal::ws_close_code without any range
// check — confirm whether arbitrary caller-supplied values are acceptable.
// `reason` is copied into a std::string for the asynchronous call.
180 ws_socket_->async_close(static_cast<internal::ws_close_code>(code),
181 std::string(reason), [](std::error_code) {});
182 return ok();
183 }
184
185 // ========================================================================
186 // Interface callback setters
187 // ========================================================================
188
// NOTE(review): signature lines not shown in this listing; presumably
// set_text_callback(). Moves `callback` into the text_message slot of
// callbacks_.
191 {
192 callbacks_.set<to_index(callback_index::text_message)>(std::move(callback));
193 }
194
// NOTE(review): signature lines not shown in this listing; presumably
// set_binary_callback(). Moves `callback` into the binary_message slot of
// callbacks_.
197 {
198 callbacks_.set<to_index(callback_index::binary_message)>(std::move(callback));
199 }
200
// NOTE(review): signature lines not shown in this listing; presumably
// set_connected_callback(). Moves `callback` into the connected slot of
// callbacks_.
203 {
204 callbacks_.set<to_index(callback_index::connected)>(std::move(callback));
205 }
206
// NOTE(review): signature lines not shown in this listing; presumably
// set_disconnected_callback().
//
// The disconnected slot of callbacks_ stores a callable taking
// (internal::ws_close_code, const std::string&). A non-empty `callback` is
// wrapped in a lambda that converts the close code to uint16_t before calling
// it; an empty `callback` installs an empty disconnected_callback_t instead,
// so no wrapper around an empty callable is stored.
209 {
210 // Adapt interface callback to internal callback type
211 if (callback) {
212 callbacks_.set<to_index(callback_index::disconnected)>(
213 [callback = std::move(callback)](internal::ws_close_code code, const std::string& reason) {
214 callback(static_cast<uint16_t>(code), reason);
215 });
216 } else {
217 callbacks_.set<to_index(callback_index::disconnected)>(disconnected_callback_t{});
218 }
219 }
220
// NOTE(review): signature lines not shown in this listing; presumably
// set_error_callback(). Moves `callback` into the error slot of callbacks_.
223 {
224 callbacks_.set<to_index(callback_index::error)>(std::move(callback));
225 }
226
227 // ========================================================================
228 // Internal Implementation
229 // ========================================================================
230
// Sets up the I/O machinery and kicks off the connection.
//
// Steps: optionally record host/port/path in config_, create a fresh
// io_context with a work guard, obtain a thread pool, submit a task that runs
// the io_context, then call do_connect(). Returns ok() on success; any
// std::exception thrown along the way is turned into an error result whose
// message starts with "Failed to start client: " (the beginning of that return
// expression is not shown in this listing).
231 auto messaging_ws_client::do_start_impl(std::string_view host, uint16_t port,
232 std::string_view path) -> VoidResult
233 {
234 try
235 {
236 // Store config if not already set
// NOTE(review): when config_.host is already non-empty (for example left over
// from an earlier start), the host/port/path arguments are not stored, and
// do_connect() reads the endpoint from config_ — so the arguments passed to
// this call would be ignored. Confirm this is intended for restarts with a
// different endpoint.
237 if (config_.host.empty())
238 {
239 config_.host = std::string(host);
240 config_.port = port;
241 config_.path = std::string(path);
242 }
243
244 // Create io_context and work guard
245 io_context_ = std::make_unique<asio::io_context>();
246 work_guard_ = std::make_unique<
247 asio::executor_work_guard<asio::io_context::executor_type>>(
248 asio::make_work_guard(*io_context_));
249
250 // Get thread pool from network context
// NOTE: the statement that assigns thread_pool_ is not shown in this listing.
252 if (!thread_pool_) {
253 // Fallback: create a temporary thread pool if network_context is not initialized
254 NETWORK_LOG_WARN("[messaging_ws_client] network_context not initialized, creating temporary thread pool");
255 thread_pool_ = std::make_shared<integration::basic_thread_pool>(std::thread::hardware_concurrency());
256 }
257
258 // Start io_context on thread pool
// The task captures `this`; do_stop_impl() waits on io_context_future_ before
// returning, which is what keeps the object alive for the task on the normal
// stop path.
259 io_context_future_ = thread_pool_->submit([this]() {
260 try {
261 NETWORK_LOG_DEBUG("[messaging_ws_client] io_context started");
262 io_context_->run();
263 NETWORK_LOG_DEBUG("[messaging_ws_client] io_context stopped");
264 } catch (const std::exception& e) {
265 NETWORK_LOG_ERROR("[messaging_ws_client] Exception in io_context: " +
266 std::string(e.what()));
267 }
268 });
269
270 // Start async connection
// do_connect() runs on the calling thread; an exception it throws lands in the
// catch below.
271 do_connect();
272
273 NETWORK_LOG_INFO("[messaging_ws_client] Client started (ID: " +
274 client_id_ + ")");
275
276 return ok();
277 }
278 catch (const std::exception& e)
279 {
// NOTE(review): nothing visible on this path releases work_guard_, stops the
// io_context or waits on io_context_future_, although the run task may already
// have been submitted. Confirm that a failed start followed by another start
// cannot replace io_context_ while that task is still using it.
281 std::string("Failed to start client: ") + e.what());
282 }
283 }
284
// NOTE(review): signature line not shown in this listing; presumably
// do_stop_impl(), which returns a VoidResult.
//
// Requests a close on an open socket, releases the work guard, stops the
// io_context and waits for the task that runs it. Returns ok() on success; a
// std::exception is turned into an error result whose message starts with
// "Failed to stop client: " (the beginning of that return expression is not
// shown in this listing).
286 {
287 try
288 {
289 // Close WebSocket connection
290 {
291 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
292 if (ws_socket_ && ws_socket_->is_open())
293 {
// NOTE: the leading arguments of this async_close call are not shown in this
// listing; the visible completion handler ignores its error_code.
294 ws_socket_->async_close(
296 [](std::error_code) {});
297 }
298 }
299
300 // Stop io_context
// NOTE(review): the io_context is stopped immediately after the asynchronous
// close is requested, so the close may not have been processed yet — confirm
// whether a graceful close handshake is expected on stop.
// NOTE(review): ws_socket_ is not reset anywhere in the visible lines, so the
// null checks in the send/close functions still pass after a stop.
301 work_guard_.reset();
302 if (io_context_)
303 {
304 io_context_->stop();
305 }
306
307 // Wait for io_context task to complete
308 if (io_context_future_.valid())
309 {
310 try {
311 io_context_future_.wait();
312 } catch (const std::exception& e) {
313 NETWORK_LOG_ERROR("[messaging_ws_client] Exception while waiting for io_context: " +
314 std::string(e.what()));
315 }
316 }
317
318 NETWORK_LOG_INFO("[messaging_ws_client] Client stopped (ID: " +
319 client_id_ + ")");
320
321 return ok();
322 }
323 catch (const std::exception& e)
324 {
326 std::string("Failed to stop client: ") + e.what());
327 }
328 }
329
// NOTE(review): signature line not shown in this listing; presumably
// do_connect().
//
// Resolves config_.host / config_.port, starts an asynchronous TCP connect and,
// once connected, wraps the socket in a websocket_socket, installs the
// message/ping/close/error callbacks and starts the WebSocket handshake.
//
// Every lambda below captures raw `this`.
// NOTE(review): none of the failure paths here touch lifecycle_, so the client
// keeps reporting "running" after a failed connect or handshake — confirm that
// callers are expected to react to the error callback and call stop themselves.
331 {
// The resolve result is returned directly (no completion handler), so name
// resolution happens on the calling thread before this function returns.
332 tcp::resolver resolver(*io_context_);
333 auto endpoints = resolver.resolve(config_.host, std::to_string(config_.port));
334
// The socket is held in a shared_ptr and captured by the connect handler, which
// keeps it alive until that handler has run.
335 auto socket = std::make_shared<tcp::socket>(*io_context_);
336
337 asio::async_connect(
338 *socket, endpoints,
339 [this, socket](std::error_code ec, tcp::endpoint)
340 {
341 if (ec)
342 {
343 NETWORK_LOG_ERROR("[messaging_ws_client] Connection failed: " +
344 ec.message());
345 invoke_error_callback(ec);
346 return;
347 }
348
349 // Wrap in tcp_socket
350 auto tcp_sock = std::make_shared<internal::tcp_socket>(std::move(*socket));
351
352 // Wrap in websocket_socket
353 {
354 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
// NOTE(review): the meaning of the `true` argument is not visible here;
// presumably it selects client mode — confirm against websocket_socket.
355 ws_socket_ = std::make_shared<internal::websocket_socket>(tcp_sock, true);
356
357 // Set WebSocket callbacks
358 ws_socket_->set_message_callback(
359 [this](const internal::ws_message& msg) { on_message(msg); });
360
361 ws_socket_->set_ping_callback(
362 [this](const std::vector<uint8_t>& payload) { on_ping(payload); });
363
364 ws_socket_->set_close_callback(
365 [this](internal::ws_close_code code, const std::string& reason)
366 { on_close(code, reason); });
367
368 ws_socket_->set_error_callback(
369 [this](std::error_code ec) { on_error(ec); });
370
371 // Perform handshake
372 ws_socket_->async_handshake(
373 config_.host, config_.path, config_.port,
374 [this](std::error_code ec)
375 {
376 if (ec)
377 {
378 NETWORK_LOG_ERROR(
379 "[messaging_ws_client] Handshake failed: " + ec.message());
380 invoke_error_callback(ec);
381 return;
382 }
383
384 is_connected_.store(true, std::memory_order_release);
// NOTE: the beginning of this logging statement is not shown in this listing.
386 "[messaging_ws_client] Connected (ID: " +
387 client_id_ + ")");
388
389 // Start reading frames
// NOTE(review): this handler runs as a completion callback, so the lock_guard
// of the enclosing scope is not held here; ws_socket_ is dereferenced without
// ws_socket_mutex_ — confirm this cannot race with other users of ws_socket_.
390 ws_socket_->start_read();
391
392 // Invoke connected callback
393 invoke_connected_callback();
394 });
395 }
396 });
397 }
398
// Socket message handler: forwards the received message to
// invoke_message_callback().
399 auto messaging_ws_client::on_message(const internal::ws_message& msg) -> void
400 {
401 invoke_message_callback(msg);
402 }
403
// Socket ping handler. Does nothing unless config_.auto_pong is set; when it
// is, and a socket object exists, a copy of the received payload is sent back
// under ws_socket_mutex_. The completion handler ignores its error_code.
//
// NOTE(review): the automatic reply is issued through async_send_ping(), i.e.
// it sends another ping with the same payload. RFC 6455 section 5.5.3 expects
// a Pong frame in response to a Ping — confirm whether a pong-sending call was
// intended here, or whether the socket wrapper already answers pings itself.
404 auto messaging_ws_client::on_ping(const std::vector<uint8_t>& payload) -> void
405 {
406 if (config_.auto_pong)
407 {
408 std::lock_guard<std::mutex> lock(ws_socket_mutex_);
409 if (ws_socket_)
410 {
411 ws_socket_->async_send_ping(std::vector<uint8_t>(payload),
412 [](std::error_code) {});
413 }
414 }
415 }
416
// Socket close handler: clears the connected flag, logs the closure with the
// client ID, then passes the close code and reason to the disconnected
// callback. lifecycle_ is not changed here.
417 auto messaging_ws_client::on_close(internal::ws_close_code code,
418 const std::string& reason) -> void
419 {
420 is_connected_.store(false, std::memory_order_release);
421 NETWORK_LOG_INFO("[messaging_ws_client] Connection closed (ID: " +
422 client_id_ + ")");
423
424 invoke_disconnected_callback(code, reason);
425 }
426
// Socket error handler: logs the error message and forwards the error_code to
// the error callback. The connected flag and lifecycle_ are left unchanged.
427 auto messaging_ws_client::on_error(std::error_code ec) -> void
428 {
429 NETWORK_LOG_ERROR("[messaging_ws_client] Error: " + ec.message());
430 invoke_error_callback(ec);
431 }
432
433 // ========================================================================
434 // Internal Callback Helpers
435 // ========================================================================
436
// Dispatches a received message to the registered callbacks.
// The generic message slot is invoked first for every message; afterwards a
// text message is additionally passed (as text) to the text_message slot and a
// binary message (as bytes) to the binary_message slot. Any other message type
// reaches only the generic slot.
437 auto messaging_ws_client::invoke_message_callback(const internal::ws_message& msg) -> void
438 {
439 callbacks_.invoke<to_index(callback_index::message)>(msg);
440
441 if (msg.type == internal::ws_message_type::text)
442 {
443 callbacks_.invoke<to_index(callback_index::text_message)>(msg.as_text());
444 }
445 else if (msg.type == internal::ws_message_type::binary)
446 {
447 callbacks_.invoke<to_index(callback_index::binary_message)>(msg.as_binary());
448 }
449 }
450
// Invokes the connected slot of callbacks_ with no arguments.
451 auto messaging_ws_client::invoke_connected_callback() -> void
452 {
453 callbacks_.invoke<to_index(callback_index::connected)>();
454 }
455
// Invokes the disconnected slot of callbacks_ with the close code and reason.
456 auto messaging_ws_client::invoke_disconnected_callback(internal::ws_close_code code,
457 const std::string& reason) -> void
458 {
459 callbacks_.invoke<to_index(callback_index::disconnected)>(code, reason);
460 }
461
// Invokes the error slot of callbacks_ with the given error_code.
462 auto messaging_ws_client::invoke_error_callback(std::error_code ec) -> void
463 {
464 callbacks_.invoke<to_index(callback_index::error)>(ec);
465 }
466
467} // namespace kcenon::network::core
auto start(std::string_view host, uint16_t port, std::string_view path="/") -> VoidResult override
Starts the WebSocket client connecting to the specified endpoint.
auto set_text_callback(interfaces::i_websocket_client::text_callback_t callback) -> void override
Sets the callback for text messages (interface version).
auto do_start_impl(std::string_view host, uint16_t port, std::string_view path) -> VoidResult
WebSocket-specific implementation of client start.
auto is_running() const -> bool override
Checks if the client is currently running.
auto ping(std::vector< uint8_t > &&payload={}) -> VoidResult override
Sends a ping frame (interface version).
messaging_ws_client(std::string_view client_id)
Constructs a WebSocket client.
auto stop_client() -> VoidResult
Stops the client and releases all resources.
auto do_connect() -> void
Initiates async connection to the server.
auto set_connected_callback(interfaces::i_websocket_client::connected_callback_t callback) -> void override
Sets the callback for connection established (interface version).
auto stop() -> VoidResult override
Stops the WebSocket client.
auto wait_for_stop() -> void override
Blocks until stop() is called.
~messaging_ws_client() noexcept override
Destructor. Automatically stops the client if still running.
std::function< void(internal::ws_close_code, const std::string &)> disconnected_callback_t
Callback type for disconnection with close code.
auto set_binary_callback(interfaces::i_websocket_client::binary_callback_t callback) -> void override
Sets the callback for binary messages (interface version).
auto send_text(std::string &&message, interfaces::i_websocket_client::send_callback_t handler=nullptr) -> VoidResult override
Sends a text message (interface version).
auto is_connected() const -> bool override
Checks if the WebSocket connection is established.
auto set_disconnected_callback(interfaces::i_websocket_client::disconnected_callback_t callback) -> void override
Sets the callback for disconnection (interface version).
auto start_client(const ws_client_config &config) -> VoidResult
Starts the client with full configuration.
auto set_error_callback(interfaces::i_websocket_client::error_callback_t callback) -> void override
Sets the callback for errors (interface version).
auto close(uint16_t code=1000, std::string_view reason="") -> VoidResult override
Closes the WebSocket connection gracefully (interface version).
auto do_stop_impl() -> VoidResult
WebSocket-specific implementation of client stop.
auto send_binary(std::vector< uint8_t > &&data, interfaces::i_websocket_client::send_callback_t handler=nullptr) -> VoidResult override
Sends a binary message (interface version).
auto client_id() const -> const std::string &
Returns the client identifier.
auto send_ping(std::vector< uint8_t > &&payload={}) -> VoidResult
Sends a ping frame.
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void(std::error_code, std::size_t)> send_callback_t
Callback type for send completion.
std::function< void()> connected_callback_t
Callback type for connection established.
std::function< void(const std::vector< uint8_t > &)> binary_callback_t
Callback type for binary messages.
std::function< void(const std::string &)> text_callback_t
Callback type for text messages.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
std::function< void(uint16_t code, std::string_view reason)> disconnected_callback_t
Callback type for disconnection (with close code and reason).
auto is_running() const -> bool
Checks if the component is currently running.
tracing_config config
Definition exporters.cpp:29
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
@ text
Text message (UTF-8 encoded)
ws_close_code
WebSocket close status codes (RFC 6455 Section 7.4).
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Global context for shared network system resources.
Configuration for WebSocket client.
Represents a complete WebSocket message.