18using tcp = asio::ip::tcp;
22 : client_id_(client_id) {
38 span->set_attribute(
"net.peer.name", host)
39 .set_attribute(
"net.peer.port",
static_cast<int64_t
>(port))
40 .set_attribute(
"net.transport",
"tcp")
41 .set_attribute(
"client.id", client_id_);
46 span->set_error(
"Host cannot be empty");
49 "Host cannot be empty",
50 "messaging_client::start_client");
54 is_connected_.store(
false, std::memory_order_release);
57 auto result = do_start(host, port);
58 if (result.is_err()) {
60 span->set_error(result.error().message);
80 callbacks_.invoke<
to_index(callback_index::disconnected)>();
99 span->set_attribute(
"net.transport",
"tcp")
100 .set_attribute(
"message.size",
static_cast<int64_t
>(data.size()))
101 .set_attribute(
"client.id", client_id_);
104 if (!is_connected()) {
106 span->set_error(
"Client is not connected");
109 "Client is not connected",
110 "messaging_client::send_packet",
111 "Client ID: " + client_id_);
116 span->set_error(
"Data cannot be empty");
119 "Data cannot be empty",
120 "messaging_client::send_packet");
123 auto result = do_send_impl(std::move(data));
125 if (result.is_err()) {
126 span->set_error(result.error().message);
135 callbacks_.set<
to_index(callback_index::receive)>(std::move(callback));
139 callbacks_.set<
to_index(callback_index::connected)>(std::move(callback));
143 callbacks_.set<
to_index(callback_index::disconnected)>(std::move(callback));
147 callbacks_.set<
to_index(callback_index::error)>(std::move(callback));
155 return start_client(host, port);
159 return stop_client();
163 return send_packet(std::move(data));
167 observer_ = observer;
171 set_receive_callback([observer](
const std::vector<uint8_t>& data) {
172 observer->on_receive(data);
175 set_connected_callback([observer]() {
176 observer->on_connected();
179 set_disconnected_callback([observer]() {
180 observer->on_disconnected();
183 set_error_callback([observer](std::error_code ec) {
184 observer->on_error(ec);
194 is_connected_.store(
connected, std::memory_order_release);
198 callbacks_.invoke<
to_index(callback_index::receive)>(data);
202 callbacks_.invoke<
to_index(callback_index::connected)>();
206 callbacks_.invoke<
to_index(callback_index::disconnected)>();
210 callbacks_.invoke<
to_index(callback_index::error)>(ec);
218 std::lock_guard<std::mutex> lock(socket_mutex_);
227 io_context_ = std::shared_ptr<asio::io_context>(
228 new asio::io_context(),
229 [](asio::io_context*) { });
231 work_guard_ = std::make_unique<
232 asio::executor_work_guard<asio::io_context::executor_type>>(
233 asio::make_work_guard(*io_context_));
239 do_connect(host, port);
247 }
catch (
const std::exception &e) {
249 "Failed to start client: " + std::string(e.what()),
250 "messaging_client::do_start",
251 "Client ID: " + client_id() +
252 ", Host: " + std::string(host));
263 is_connected_.store(
false, std::memory_order_release);
267 std::shared_ptr<internal::tcp_socket> local_socket;
269 std::lock_guard<std::mutex> lock(socket_mutex_);
270 local_socket = std::move(socket_);
275 local_socket->close();
282 std::shared_ptr<asio::ip::tcp::resolver> resolver_to_cancel;
283 std::shared_ptr<asio::ip::tcp::socket> socket_to_close;
285 std::lock_guard<std::mutex> lock(pending_mutex_);
286 resolver_to_cancel = pending_resolver_;
287 socket_to_close = pending_socket_;
290 if (resolver_to_cancel || socket_to_close) {
292 auto close_promise = std::make_shared<std::promise<void>>();
293 auto close_future = close_promise->get_future();
295 asio::post(*io_context_,
296 [resolver_to_cancel, socket_to_close, close_promise]() {
297 if (resolver_to_cancel) {
298 resolver_to_cancel->cancel();
300 if (socket_to_close) {
308 socket_to_close->close(ec);
310 close_promise->set_value();
314 close_future.wait_for(std::chrono::milliseconds(100));
334 if (io_context_future_.valid()) {
335 io_context_future_.wait();
340 std::lock_guard<std::mutex> lock(pending_mutex_);
341 pending_resolver_.reset();
342 pending_socket_.reset();
349 }
catch (
const std::exception &e) {
351 "Failed to stop client: " + std::string(e.what()),
352 "messaging_client::do_stop",
353 "Client ID: " + client_id());
367 set_connected(
false);
370 invoke_error_callback(ec);
381 auto resolver = std::make_shared<tcp::resolver>(*io_context_);
383 std::lock_guard<std::mutex> lock(pending_mutex_);
384 pending_resolver_ = resolver;
386 auto self = shared_from_this();
392 resolver->async_resolve(
393 std::string(host), std::to_string(port),
394 [self, resolver](std::error_code ec,
395 tcp::resolver::results_type results) {
398 if (self->is_stop_initiated()) {
405 std::lock_guard<std::mutex> lock(self->pending_mutex_);
406 if (self->pending_resolver_.get() == resolver.get()) {
407 self->pending_resolver_.reset();
411 self->on_connection_failed(ec);
416 if (self->is_stop_initiated()) {
422 auto raw_socket = std::make_shared<tcp::socket>(*self->io_context_);
424 std::lock_guard<std::mutex> lock(self->pending_mutex_);
425 self->pending_socket_ = raw_socket;
428 *raw_socket, results,
430 raw_socket](std::error_code connect_ec,
431 [[maybe_unused]]
const tcp::endpoint &endpoint) {
434 if (self->is_stop_initiated()) {
441 std::lock_guard<std::mutex> lock(self->pending_mutex_);
442 if (self->pending_socket_.get() == raw_socket.get()) {
443 self->pending_socket_.reset();
447 self->on_connection_failed(connect_ec);
452 if (self->is_stop_initiated()) {
458 std::lock_guard<std::mutex> lock(self->socket_mutex_);
459 self->socket_ = std::make_shared<internal::tcp_socket>(
460 std::move(*raw_socket));
462 self->on_connect(connect_ec);
477 invoke_connected_callback();
481 auto self = shared_from_this();
482 auto local_socket = get_socket();
485 local_socket->set_receive_callback_view(
486 [self](std::span<const uint8_t> chunk) { self->on_receive(chunk); });
487 local_socket->set_error_callback(
488 [self](std::error_code err) { self->on_error(err); });
493 asio::post(*io_context_, [local_socket]() {
495 local_socket->start_read();
505 auto local_socket = get_socket();
509 "Socket not available",
510 "messaging_client::do_send",
511 "Client ID: " + client_id());
517 local_socket->async_send(std::move(data),
518 [](std::error_code, std::size_t) {
528 if (!is_connected()) {
534 std::vector<uint8_t> data_copy(data.begin(), data.end());
535 invoke_receive_callback(data_copy);
543 invoke_error_callback(ec);
546 if (is_connected()) {
547 invoke_disconnected_callback();
552 set_connected(
false);
556 -> std::shared_ptr<internal::tcp_socket> {
auto set_connected(bool connected) -> void
Sets the connected state.
auto do_start_impl(std::string_view host, unsigned short port) -> VoidResult
TCP-specific implementation of client start.
auto client_id() const -> const std::string &
Returns the client identifier.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
auto do_send_impl(std::vector< uint8_t > &&data) -> VoidResult
TCP-specific implementation of data send.
auto get_socket() const -> std::shared_ptr< internal::tcp_socket >
auto is_connected() const -> bool override
Checks if the client is connected to the server (IProtocolClient interface).
auto invoke_connected_callback() -> void
Invokes the connected callback.
auto start_client(std::string_view host, unsigned short port) -> VoidResult
Starts the client and connects to the specified host and port.
auto start(std::string_view host, uint16_t port) -> VoidResult override
Starts the client and connects to the specified server (IProtocolClient interface).
auto stop_client() -> VoidResult
Stops the client and disconnects from the server.
auto set_error_callback(error_callback_t callback) -> void override
Sets the callback for errors.
std::shared_ptr< internal::tcp_socket > socket_
auto send_packet(std::vector< uint8_t > &&data) -> VoidResult
Sends data to the connected server.
~messaging_client() noexcept
Destructor; automatically calls stop_client() if the client is still running.
std::atomic< bool > is_connected_
auto set_observer(std::shared_ptr< interfaces::connection_observer > observer) -> void override
Sets the connection observer for unified event handling.
auto send(std::vector< uint8_t > &&data) -> VoidResult override
Sends data to the connected server (IProtocolClient interface).
auto invoke_receive_callback(const std::vector< uint8_t > &data) -> void
Invokes the receive callback.
auto set_disconnected_callback(disconnected_callback_t callback) -> void override
Sets the callback for disconnection.
std::function< void(const std::vector< uint8_t > &)> receive_callback_t
Callback type for received data.
auto on_error(std::error_code ec) -> void
Callback for handling socket errors from tcp_socket.
auto invoke_disconnected_callback() -> void
Invokes the disconnected callback.
std::function< void()> connected_callback_t
Callback type for connection established.
messaging_client(std::string_view client_id)
Constructs a client with a given client_id used for logging or identification.
auto set_receive_callback(receive_callback_t callback) -> void override
Sets the callback for received data.
auto on_connection_failed(std::error_code ec) -> void
Handles connection failure during async resolve or connect.
auto on_connect(std::error_code ec) -> void
Callback invoked upon completion of an async connect.
std::function< void()> disconnected_callback_t
Callback type for disconnection.
auto is_running() const -> bool override
Checks if the client is currently running.
auto on_stopped() -> void
Called after stop operation completes. Invokes the disconnected callback.
auto stop() -> VoidResult override
Stops the client and closes the connection (IProtocolClient interface).
auto do_stop_impl() -> VoidResult
TCP-specific implementation of client stop.
auto on_receive(std::span< const uint8_t > data) -> void
Callback for receiving data from the tcp_socket.
auto do_connect(std::string_view host, unsigned short port) -> void
Internally attempts to resolve and connect to the remote host:port.
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback.
auto set_connected_callback(connected_callback_t callback) -> void override
Sets the callback for connection established.
void stop_io_context(std::shared_ptr< asio::io_context > io_context)
Stop an io_context managed by this manager.
static io_context_thread_manager & instance()
Get the singleton instance.
std::future< void > run_io_context(std::shared_ptr< asio::io_context > io_context, const std::string &component_name="")
Run an io_context on the shared thread pool.
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
Unified io_context thread management for network components.
Logger system integration interface for network_system.
TCP client implementation.
constexpr int invalid_argument
constexpr int internal_error
constexpr int connection_closed
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
@ ok
Operation completed successfully.
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
RAII span implementation for distributed tracing.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.