19 std::string_view server_id)
20 : server_id_(server_id)
23 socket_ = std::make_shared<internal::tcp_socket>(std::move(socket));
45 if (is_stopped_.load())
52 auto weak_self = weak_from_this();
53 socket_->set_receive_callback_view(
54 [weak_self](std::span<const uint8_t> data)
56 if (
auto self = weak_self.lock())
58 self->on_receive(data);
61 socket_->set_error_callback([weak_self](std::error_code ec)
63 if (
auto self = weak_self.lock())
70 socket_->start_read();
75 if (is_stopped_.exchange(
true))
88 std::lock_guard<std::mutex> lock(callback_mutex_);
89 if (disconnection_callback_)
93 disconnection_callback_(server_id_);
105 if (is_stopped_.load())
115 socket_->async_send(std::move(data),
116 [](std::error_code, std::size_t) {
123 if (is_stopped_.load())
128 "messaging_session");
135 "Data cannot be empty",
136 "messaging_session");
140 send_packet(std::move(data));
150 if (is_stopped_.load())
160 std::lock_guard<std::mutex> lock(queue_mutex_);
161 size_t queue_size = pending_messages_.size();
165 if (queue_size >= max_pending_messages_ * 2)
173 pending_messages_.emplace_back(data.begin(), data.end());
177 process_next_message();
187 std::lock_guard<std::mutex> lock(callback_mutex_);
213 std::lock_guard<std::mutex> lock(queue_mutex_);
214 if (pending_messages_.empty())
219 message = std::move(pending_messages_.front());
220 pending_messages_.pop_front();
225 std::lock_guard<std::mutex> lock(callback_mutex_);
226 if (receive_callback_)
241 std::lock_guard<std::mutex> lock(queue_mutex_);
242 if (!pending_messages_.empty())
251 std::function<
void(
const std::vector<uint8_t>&)> callback) ->
void
253 std::lock_guard<std::mutex> lock(callback_mutex_);
254 receive_callback_ = std::move(callback);
258 std::function<
void(
const std::string&)> callback) ->
void
260 std::lock_guard<std::mutex> lock(callback_mutex_);
261 disconnection_callback_ = std::move(callback);
265 std::function<
void(std::error_code)> callback) ->
void
267 std::lock_guard<std::mutex> lock(callback_mutex_);
268 error_callback_ = std::move(callback);
static void report_bytes_received(size_t bytes)
Report bytes received.
static void report_bytes_sent(size_t bytes)
Report bytes sent.
auto start_session() -> void
Starts the session: sets up read/error callbacks and begins reading data.
auto send(std::vector< uint8_t > &&data) -> VoidResult override
Sends data to the client.
auto process_next_message() -> void
Processes pending messages from the queue.
auto send_packet(std::vector< uint8_t > &&data) -> void
Sends data to the connected client, optionally using compression/encryption.
auto set_receive_callback(std::function< void(const std::vector< uint8_t > &)> callback) -> void
Sets the callback for received data.
auto on_error(std::error_code ec) -> void
Callback for handling socket errors from tcp_socket.
~messaging_session() noexcept override
Destructor; calls stop_session() if not already stopped.
auto on_receive(std::span< const uint8_t > data) -> void
Callback for when data arrives from the client.
messaging_session(asio::ip::tcp::socket socket, std::string_view server_id)
Constructs a session with a given socket and server_id.
auto set_disconnection_callback(std::function< void(const std::string &)> callback) -> void
Sets the callback for disconnection.
auto stop_session() -> void
Stops the session by closing the socket and marking the session as inactive.
auto set_error_callback(std::function< void(std::error_code)> callback) -> void
Sets the callback for errors.
std::shared_ptr< internal::tcp_socket > socket_
Logger system integration interface for network_system.
Messaging session managing bidirectional message exchange.
constexpr int invalid_argument
constexpr int connection_closed
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
Network system metrics definitions and reporting utilities.