Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
messaging_client.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
11#include <chrono>
12#include <span>
13#include <string_view>
14
15// Use nested namespace definition (C++17)
16namespace kcenon::network::core {
17
18using tcp = asio::ip::tcp;
19
20// Use string_view for better efficiency (C++17)
21messaging_client::messaging_client(std::string_view client_id)
22 : client_id_(client_id) {
23}
24
26 if (is_running()) {
27 (void)stop_client();
28 }
29}
30
/// Starts the client: rejects an empty @p host, clears the connected flag and
/// then delegates to do_start(). When tracing is enabled, a "tcp.client.start"
/// span carries the peer attributes and the final status of the attempt.
/// @param host Peer host name; checked for emptiness before do_start() runs.
/// @param port Peer TCP port, recorded on the span and forwarded to do_start().
/// @return The result of do_start(). The empty-host branch is expected to
///         return an error early (see the review note inside that branch).
31auto messaging_client::start_client(std::string_view host, unsigned short port)
32 -> VoidResult {
33 // Create tracing span for client start operation
// `span` is an optional: it only holds a span when tracing is enabled, so
// every use below is guarded by `if (span)`.
34 auto span = tracing::is_tracing_enabled()
35 ? std::make_optional(tracing::trace_context::create_span("tcp.client.start"))
36 : std::nullopt;
37 if (span) {
38 span->set_attribute("net.peer.name", host)
39 .set_attribute("net.peer.port", static_cast<int64_t>(port))
40 .set_attribute("net.transport", "tcp")
41 .set_attribute("client.id", client_id_);
42 }
43
44 if (host.empty()) {
45 if (span) {
46 span->set_error("Host cannot be empty");
47 }
// NOTE(review): listing line 48 is missing from this copy. The two string
// arguments below presumably belong to a `return error_void(<code>, ...)`
// call; restore the statement opener and its error code from the real file.
49 "Host cannot be empty",
50 "messaging_client::start_client");
51 }
52
53 // Reset connection state before start
54 is_connected_.store(false, std::memory_order_release);
55
56 // Use base class do_start which handles lifecycle management
57 auto result = do_start(host, port);
// Mirror the outcome onto the span; the result itself is returned untouched.
58 if (result.is_err()) {
59 if (span) {
60 span->set_error(result.error().message);
61 }
62 } else {
63 if (span) {
64 span->set_status(tracing::span_status::ok);
65 }
66 }
67
68 return result;
69}
70
72 // Use base class do_stop which handles lifecycle management and calls on_stopped()
73 // Note: do_stop() sets stop_initiated_ first, then calls do_stop_impl() which
74 // handles is_connected_ reset and async operation cleanup in the correct order.
75 return do_stop();
76}
77
79 // Invoke disconnected callback after stop completes
80 callbacks_.invoke<to_index(callback_index::disconnected)>();
81}
82
83// Note: wait_for_stop() and is_running() are inherited from startable_base
84
85auto messaging_client::is_connected() const -> bool {
86 return is_connected_.load(std::memory_order_acquire);
87}
88
89auto messaging_client::client_id() const -> const std::string& {
90 return client_id_;
91}
92
/// Sends @p data to the peer after checking that the client is connected and
/// the payload is non-empty, then hands the buffer to do_send_impl(). When
/// tracing is enabled, a "tcp.client.send" span records the payload size, the
/// client id and the final status.
/// @param data Payload; moved into do_send_impl() once both checks pass.
/// @return The result of do_send_impl(). The two validation branches are
///         expected to return an error early (see the review notes inside them).
93auto messaging_client::send_packet(std::vector<uint8_t>&& data) -> VoidResult {
94 // Create tracing span for send operation
// `span` only holds a value when tracing is enabled; all uses are guarded.
95 auto span = tracing::is_tracing_enabled()
96 ? std::make_optional(tracing::trace_context::create_span("tcp.client.send"))
97 : std::nullopt;
98 if (span) {
99 span->set_attribute("net.transport", "tcp")
100 .set_attribute("message.size", static_cast<int64_t>(data.size()))
101 .set_attribute("client.id", client_id_);
102 }
103
104 if (!is_connected()) {
105 if (span) {
106 span->set_error("Client is not connected");
107 }
// NOTE(review): listing line 108 is missing from this copy. The arguments
// below presumably belong to a `return error_void(<code>, ...)` call; restore
// the statement opener and its error code from the real file.
109 "Client is not connected",
110 "messaging_client::send_packet",
111 "Client ID: " + client_id_);
112 }
113
114 if (data.empty()) {
115 if (span) {
116 span->set_error("Data cannot be empty");
117 }
// NOTE(review): listing line 118 is missing from this copy; same situation as
// the not-connected branch above (presumably `return error_void(<code>, ...)`).
119 "Data cannot be empty",
120 "messaging_client::send_packet");
121 }
122
// `data` is moved from here on and must not be read again in this function.
123 auto result = do_send_impl(std::move(data));
124 if (span) {
125 if (result.is_err()) {
126 span->set_error(result.error().message);
127 } else {
128 span->set_status(tracing::span_status::ok);
129 }
130 }
131 return result;
132}
133
135 callbacks_.set<to_index(callback_index::receive)>(std::move(callback));
136}
137
139 callbacks_.set<to_index(callback_index::connected)>(std::move(callback));
140}
141
143 callbacks_.set<to_index(callback_index::disconnected)>(std::move(callback));
144}
145
147 callbacks_.set<to_index(callback_index::error)>(std::move(callback));
148}
149
150// ===========================================================================
151// IProtocolClient Interface Implementation
152// ===========================================================================
153
154auto messaging_client::start(std::string_view host, uint16_t port) -> VoidResult {
155 return start_client(host, port);
156}
157
159 return stop_client();
160}
161
162auto messaging_client::send(std::vector<uint8_t>&& data) -> VoidResult {
163 return send_packet(std::move(data));
164}
165
166auto messaging_client::set_observer(std::shared_ptr<interfaces::connection_observer> observer) -> void {
167 observer_ = observer;
168
169 // Bridge observer methods to existing callbacks
170 if (observer_) {
171 set_receive_callback([observer](const std::vector<uint8_t>& data) {
172 observer->on_receive(data);
173 });
174
175 set_connected_callback([observer]() {
176 observer->on_connected();
177 });
178
179 set_disconnected_callback([observer]() {
180 observer->on_disconnected();
181 });
182
183 set_error_callback([observer](std::error_code ec) {
184 observer->on_error(ec);
185 });
186 }
187}
188
189// ===========================================================================
190// Internal Callback Helpers
191// ===========================================================================
192
194 is_connected_.store(connected, std::memory_order_release);
195}
196
197auto messaging_client::invoke_receive_callback(const std::vector<uint8_t>& data) -> void {
198 callbacks_.invoke<to_index(callback_index::receive)>(data);
199}
200
202 callbacks_.invoke<to_index(callback_index::connected)>();
203}
204
206 callbacks_.invoke<to_index(callback_index::disconnected)>();
207}
208
209auto messaging_client::invoke_error_callback(std::error_code ec) -> void {
210 callbacks_.invoke<to_index(callback_index::error)>(ec);
211}
212
213// TCP-specific implementation of client start
/// TCP-specific start step: drops any previous socket, creates a fresh
/// io_context plus a work guard for it, hands the io_context to the thread
/// manager to run, and kicks off the asynchronous connect via do_connect().
/// @param host Peer host name, forwarded to do_connect().
/// @param port Peer TCP port, forwarded to do_connect().
/// @return ok() once the connect has been initiated. This does not mean the
///         connection is established; completion is reported later through
///         on_connect() / on_connection_failed(). If a std::exception is
///         thrown, the catch block is expected to return an error (see the
///         review note there).
214auto messaging_client::do_start_impl(std::string_view host, unsigned short port)
215 -> VoidResult {
216 try {
// Drop any socket left from an earlier run; socket_ is guarded by socket_mutex_.
217 {
218 std::lock_guard<std::mutex> lock(socket_mutex_);
219 socket_.reset();
220 }
221
222 // Use Intentional Leak pattern for io_context to prevent heap corruption
223 // during static destruction phase. The no-op deleter ensures io_context
224 // survives until process termination, avoiding issues when pending async
225 // operations' handlers reference io_context internals.
226 // Memory impact: ~few KB per client (reclaimed by OS on process termination)
227 io_context_ = std::shared_ptr<asio::io_context>(
228 new asio::io_context(),
229 [](asio::io_context*) { /* no-op deleter - intentional leak */ });
230 // Create work guard to keep io_context running
231 work_guard_ = std::make_unique<
232 asio::executor_work_guard<asio::io_context::executor_type>>(
233 asio::make_work_guard(*io_context_));
234
235 // Use io_context_thread_manager for unified thread management
// NOTE(review): listing line 236 is missing from this copy. It presumably
// reads `io_context_future_ = ...io_context_thread_manager::instance()`, since
// run_io_context() returns a std::future<void> and the stop path waits on
// io_context_future_ -- confirm against the real file.
237 .run_io_context(io_context_, "messaging_client:" + client_id());
238
239 do_connect(host, port);
240
241 // NOTE: No logging here to prevent heap corruption during static destruction.
242 // When common_system uses async logging, log messages may still be queued
243 // when static destruction begins, causing heap corruption when the logging
244 // thread accesses destroyed GlobalLoggerRegistry.
245
246 return ok();
247 } catch (const std::exception &e) {
// NOTE(review): listing line 248 is missing from this copy. The arguments
// below presumably belong to a `return error_void(<code>, ...)` call; restore
// the statement opener and its error code from the real file.
249 "Failed to start client: " + std::string(e.what()),
250 "messaging_client::do_start",
251 "Client ID: " + client_id() +
252 ", Host: " + std::string(host));
253 }
254}
255
256// TCP-specific implementation of client stop
258 // NOTE: No logging in do_stop to prevent heap corruption during static
259 // destruction. This method may be called from destructor when
260 // GlobalLoggerRegistry is already destroyed.
261
262 // Mark as disconnected first (stop_initiated_ is already set by base class do_stop())
263 is_connected_.store(false, std::memory_order_release);
264
265 try {
266 // Swap out socket with mutex protection and close outside lock
267 std::shared_ptr<internal::tcp_socket> local_socket;
268 {
269 std::lock_guard<std::mutex> lock(socket_mutex_);
270 local_socket = std::move(socket_);
271 }
272 // Close socket safely using tcp_socket::close() which posts to the socket's
273 // executor for thread-safety with async operations.
274 if (local_socket) {
275 local_socket->close();
276 }
277
278 // Cancel pending connection operations by posting to io_context.
279 // This ensures socket operations happen on the same thread as async_connect,
280 // preventing data races detected by ThreadSanitizer.
281 if (io_context_) {
282 std::shared_ptr<asio::ip::tcp::resolver> resolver_to_cancel;
283 std::shared_ptr<asio::ip::tcp::socket> socket_to_close;
284 {
285 std::lock_guard<std::mutex> lock(pending_mutex_);
286 resolver_to_cancel = pending_resolver_;
287 socket_to_close = pending_socket_;
288 }
289
290 if (resolver_to_cancel || socket_to_close) {
291 // Use promise/future to wait for the close operation to complete
292 auto close_promise = std::make_shared<std::promise<void>>();
293 auto close_future = close_promise->get_future();
294
295 asio::post(*io_context_,
296 [resolver_to_cancel, socket_to_close, close_promise]() {
297 if (resolver_to_cancel) {
298 resolver_to_cancel->cancel();
299 }
300 if (socket_to_close) {
301 asio::error_code ec;
302 // Only use close() - do NOT call cancel() on pending_socket_.
303 // When async_connect() is in progress, the socket may be open but
304 // not fully registered with the reactor. Calling cancel() on such
305 // a socket causes SEGV in epoll_reactor::cancel_ops() when accessing
306 // uninitialized op_queue. close() is sufficient as it cancels all
307 // pending async operations and is safe in any socket state.
308 socket_to_close->close(ec);
309 }
310 close_promise->set_value();
311 });
312
313 // Wait with timeout in case io_context is already stopped
314 close_future.wait_for(std::chrono::milliseconds(100));
315 }
316 }
317
318 // Release work guard to allow io_context to finish
319 if (work_guard_) {
320 work_guard_.reset();
321 }
322
323 // Stop io_context BEFORE waiting. This ensures io_context::run() returns
324 // promptly without waiting for pending async operations to complete.
325 // This is safe because io_context uses intentional leak pattern (no-op deleter),
326 // so any handlers that don't run won't cause heap corruption since
327 // io_context won't be destroyed.
328 if (io_context_) {
330 io_context_);
331 }
332
333 // Wait for io_context task to complete - should return immediately after stop()
334 if (io_context_future_.valid()) {
335 io_context_future_.wait();
336 }
337
338 // Clear pending resources after io_context is stopped
339 {
340 std::lock_guard<std::mutex> lock(pending_mutex_);
341 pending_resolver_.reset();
342 pending_socket_.reset();
343 }
344 // Note: io_context uses intentional leak pattern (no-op deleter),
345 // so reset() just clears the shared_ptr without destroying io_context
346 io_context_.reset();
347
348 return ok();
349 } catch (const std::exception &e) {
351 "Failed to stop client: " + std::string(e.what()),
352 "messaging_client::do_stop",
353 "Client ID: " + client_id());
354 }
355}
356
357// wait_for_stop() is provided by base class
358
359auto messaging_client::on_connection_failed(std::error_code ec) -> void {
360 // NOTE: No logging here to prevent heap corruption during static destruction.
361 // When common_system's GlobalLoggerRegistry is destroyed before this callback
362 // completes, any logging (even with is_logging_safe() check) causes heap
363 // corruption because the check cannot synchronize with common_system's
364 // static destruction order.
365
366 // Mark as not connected (but keep is_running_ true so destructor calls stop_client)
367 set_connected(false);
368
369 // Invoke error callback using base class method
370 invoke_error_callback(ec);
371
372 // Note: Do NOT release work_guard_ or signal stop_promise_ here.
373 // The destructor will call stop_client() which handles all cleanup properly.
374 // This avoids race conditions between callback cleanup and destructor cleanup.
375}
376
/// Starts the asynchronous resolve -> connect chain on io_context_.
///
/// Flow, as implemented below:
///  1. A resolver is created and published in pending_resolver_ (under
///     pending_mutex_) so the stop path can reach it.
///  2. async_resolve runs; its handler returns immediately if a stop has been
///     initiated, otherwise it clears pending_resolver_ (only if it still
///     points at this resolver) and reports errors via on_connection_failed().
///  3. On successful resolution a raw tcp::socket is created, published in
///     pending_socket_, and async_connect is started over the resolved results.
///  4. The connect handler applies the same stop / clear / error checks, then
///     wraps the connected socket in internal::tcp_socket (stored in socket_
///     under socket_mutex_) and calls on_connect().
///
/// Both handlers capture a shared_ptr from shared_from_this(), so this
/// function must be called on an object that is owned by a shared_ptr.
/// @param host Peer host name; copied into a std::string for async_resolve.
/// @param port Peer TCP port; converted to its decimal string for async_resolve.
377auto messaging_client::do_connect(std::string_view host, unsigned short port)
378 -> void {
379 // Use resolver to get endpoints
380 // Store resolver as member so we can cancel it during stop_client()
381 auto resolver = std::make_shared<tcp::resolver>(*io_context_);
382 {
383 std::lock_guard<std::mutex> lock(pending_mutex_);
384 pending_resolver_ = resolver;
385 }
386 auto self = shared_from_this();
387
388 // NOTE: No logging in async handlers to prevent heap corruption during
389 // static destruction. The is_logging_safe() check cannot synchronize with
390 // common_system's static destruction order.
391
392 resolver->async_resolve(
393 std::string(host), std::to_string(port),
394 [self, resolver](std::error_code ec,
395 tcp::resolver::results_type results) {
396 // Check if stop was initiated before proceeding with socket operations.
397 // This prevents race conditions when do_stop() is called during connection.
398 if (self->is_stop_initiated()) {
399 return;
400 }
401
402 // Clear pending resolver only if it's still the same resolver
403 // (prevents race when rapid connect/disconnect overwrites pending_resolver_)
404 {
405 std::lock_guard<std::mutex> lock(self->pending_mutex_);
406 if (self->pending_resolver_.get() == resolver.get()) {
407 self->pending_resolver_.reset();
408 }
409 }
410 if (ec) {
411 self->on_connection_failed(ec);
412 return;
413 }
414
415 // Check again after clearing resolver
416 if (self->is_stop_initiated()) {
417 return;
418 }
419
420 // Attempt to connect to one of the resolved endpoints
421 // Store socket as member so we can cancel it during stop_client()
422 auto raw_socket = std::make_shared<tcp::socket>(*self->io_context_);
423 {
424 std::lock_guard<std::mutex> lock(self->pending_mutex_);
425 self->pending_socket_ = raw_socket;
426 }
427 asio::async_connect(
428 *raw_socket, results,
429 [self,
430 raw_socket](std::error_code connect_ec,
431 [[maybe_unused]] const tcp::endpoint &endpoint) {
432 // Check if stop was initiated before proceeding with socket operations.
433 // This prevents race conditions when do_stop() is called during connection.
434 if (self->is_stop_initiated()) {
435 return;
436 }
437
438 // Clear pending socket only if it's still the same socket
439 // (prevents race when rapid connect/disconnect overwrites pending_socket_)
440 {
441 std::lock_guard<std::mutex> lock(self->pending_mutex_);
442 if (self->pending_socket_.get() == raw_socket.get()) {
443 self->pending_socket_.reset();
444 }
445 }
446 if (connect_ec) {
447 self->on_connection_failed(connect_ec);
448 return;
449 }
450
451 // Check again after clearing socket
452 if (self->is_stop_initiated()) {
453 return;
454 }
455
456 // On success, wrap it in our tcp_socket with mutex protection
// The raw socket is moved from here; raw_socket must not be used afterwards.
457 {
458 std::lock_guard<std::mutex> lock(self->socket_mutex_);
459 self->socket_ = std::make_shared<internal::tcp_socket>(
460 std::move(*raw_socket));
461 }
// connect_ec is known to be clear at this point (checked above).
462 self->on_connect(connect_ec);
463 });
464 });
465}
466
467auto messaging_client::on_connect(std::error_code ec) -> void {
468 // NOTE: No logging in async handlers to prevent heap corruption during
469 // static destruction. The is_logging_safe() check cannot synchronize with
470 // common_system's static destruction order.
471 if (ec) {
472 return;
473 }
474 set_connected(true);
475
476 // Invoke connected callback using base class method
477 invoke_connected_callback();
478
479 // set callbacks and start read loop with mutex protection
480 // Use span-based callback for zero-copy receive path (no per-read allocation)
481 auto self = shared_from_this();
482 auto local_socket = get_socket();
483
484 if (local_socket) {
485 local_socket->set_receive_callback_view(
486 [self](std::span<const uint8_t> chunk) { self->on_receive(chunk); });
487 local_socket->set_error_callback(
488 [self](std::error_code err) { self->on_error(err); });
489
490 // Post start_read() to io_context to ensure tcp_socket is fully initialized
491 // This prevents SEGV in async_read_some when socket descriptor state is incomplete
492 if (io_context_) {
493 asio::post(*io_context_, [local_socket]() {
494 if (local_socket) {
495 local_socket->start_read();
496 }
497 });
498 }
499 }
500}
501
502// TCP-specific implementation of data send
/// TCP-specific send step: takes a snapshot of the current socket and hands
/// @p data to its async_send().
/// @param data Payload; moved into async_send().
/// @return ok() once the buffer has been handed to async_send(). The
///         completion handler discards its error_code, so a later transmission
///         failure is NOT reported through this return value. When no socket
///         is available the function is expected to return an error (see the
///         review note in that branch).
503auto messaging_client::do_send_impl(std::vector<uint8_t> &&data) -> VoidResult {
504 // Get a local copy of socket with mutex protection
505 auto local_socket = get_socket();
506
507 if (!local_socket) {
// NOTE(review): listing line 508 is missing from this copy. The arguments
// below presumably belong to a `return error_void(<code>, ...)` call; restore
// the statement opener and its error code from the real file.
509 "Socket not available",
510 "messaging_client::do_send",
511 "Client ID: " + client_id());
512 }
513
514 // Send data directly without pipeline transformation
515 // NOTE: No logging in send callbacks to prevent heap corruption during
516 // static destruction.
517 local_socket->async_send(std::move(data),
518 [](std::error_code, std::size_t) {
519 // Silently ignore errors - no logging during potential static destruction
520 });
521
522 return ok();
523}
524
525auto messaging_client::on_receive(std::span<const uint8_t> data) -> void {
526 // NOTE: No logging in async handlers to prevent heap corruption during
527 // static destruction.
528 if (!is_connected()) {
529 return;
530 }
531
532 // Invoke receive callback using base class method
533 // Copy span to vector only at API boundary to maintain external callback compatibility
534 std::vector<uint8_t> data_copy(data.begin(), data.end());
535 invoke_receive_callback(data_copy);
536}
537
538auto messaging_client::on_error(std::error_code ec) -> void {
539 // NOTE: No logging in async handlers to prevent heap corruption during
540 // static destruction.
541
542 // Invoke error callback using base class method
543 invoke_error_callback(ec);
544
545 // Invoke disconnected callback if was connected
546 if (is_connected()) {
547 invoke_disconnected_callback();
548 }
549
550 // Mark connection as lost using base class method
551 // Note: We don't modify is_running_ here - the base class stop_client() handles that
552 set_connected(false);
553}
554
556 -> std::shared_ptr<internal::tcp_socket> {
557 std::lock_guard<std::mutex> lock(socket_mutex_);
558 return socket_;
559}
560
561} // namespace kcenon::network::core
auto set_connected(bool connected) -> void
Sets the connected state.
auto do_start_impl(std::string_view host, unsigned short port) -> VoidResult
TCP-specific implementation of client start.
auto client_id() const -> const std::string &
Returns the client identifier.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
auto do_send_impl(std::vector< uint8_t > &&data) -> VoidResult
TCP-specific implementation of data send.
auto get_socket() const -> std::shared_ptr< internal::tcp_socket >
auto is_connected() const -> bool override
Checks if the client is connected to the server (IProtocolClient interface).
auto invoke_connected_callback() -> void
Invokes the connected callback.
auto start_client(std::string_view host, unsigned short port) -> VoidResult
Starts the client and connects to the specified host and port.
auto start(std::string_view host, uint16_t port) -> VoidResult override
Starts the client and connects to the specified server (IProtocolClient interface).
auto stop_client() -> VoidResult
Stops the client and disconnects from the server.
auto set_error_callback(error_callback_t callback) -> void override
Sets the callback for errors.
std::shared_ptr< internal::tcp_socket > socket_
auto send_packet(std::vector< uint8_t > &&data) -> VoidResult
Sends data to the connected server.
~messaging_client() noexcept
Destructor; automatically calls stop_client() if the client is still running.
auto set_observer(std::shared_ptr< interfaces::connection_observer > observer) -> void override
Sets the connection observer for unified event handling.
auto send(std::vector< uint8_t > &&data) -> VoidResult override
Sends data to the connected server (IProtocolClient interface).
auto invoke_receive_callback(const std::vector< uint8_t > &data) -> void
Invokes the receive callback.
auto set_disconnected_callback(disconnected_callback_t callback) -> void override
Sets the callback for disconnection.
std::function< void(const std::vector< uint8_t > &)> receive_callback_t
Callback type for received data.
auto on_error(std::error_code ec) -> void
Callback for handling socket errors from tcp_socket.
auto invoke_disconnected_callback() -> void
Invokes the disconnected callback.
std::function< void()> connected_callback_t
Callback type for connection established.
messaging_client(std::string_view client_id)
Constructs a client with a given client_id used for logging or identification.
auto set_receive_callback(receive_callback_t callback) -> void override
Sets the callback for received data.
auto on_connection_failed(std::error_code ec) -> void
Handles connection failure during async resolve or connect.
auto on_connect(std::error_code ec) -> void
Callback invoked upon completion of an async connect.
std::function< void()> disconnected_callback_t
Callback type for disconnection.
auto is_running() const -> bool override
Checks if the client is currently running.
auto on_stopped() -> void
Called after stop operation completes. Invokes the disconnected callback.
auto stop() -> VoidResult override
Stops the client and closes the connection (IProtocolClient interface).
auto do_stop_impl() -> VoidResult
TCP-specific implementation of client stop.
auto on_receive(std::span< const uint8_t > data) -> void
Callback for receiving data from the tcp_socket.
auto do_connect(std::string_view host, unsigned short port) -> void
Internally attempts to resolve and connect to the remote host:port.
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback.
auto set_connected_callback(connected_callback_t callback) -> void override
Sets the callback for connection established.
void stop_io_context(std::shared_ptr< asio::io_context > io_context)
Stop an io_context managed by this manager.
static io_context_thread_manager & instance()
Get the singleton instance.
std::future< void > run_io_context(std::shared_ptr< asio::io_context > io_context, const std::string &component_name="")
Run an io_context on the shared thread pool.
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
Unified io_context thread management for network components.
Logger system integration interface for network_system.
TCP client implementation.
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
@ ok
Operation completed successfully.
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
RAII span implementation for distributed tracing.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.