Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
kcenon::network::session::messaging_session Class Reference

Manages a single connected client session on the server side, providing asynchronous read/write operations and pipeline transformations. More...

#include <messaging_session.h>

Inheritance diagram for kcenon::network::session::messaging_session:
Inheritance graph
Collaboration diagram for kcenon::network::session::messaging_session:
Collaboration graph

Public Member Functions

 messaging_session (asio::ip::tcp::socket socket, std::string_view server_id)
 Constructs a session with a given socket and server_id.
 
 ~messaging_session () noexcept override
 Destructor; calls stop_session() if not already stopped.
 
auto id () const -> std::string_view override
 Gets the unique identifier for this session.
 
auto is_connected () const -> bool override
 Checks if the session is currently connected.
 
auto send (std::vector< uint8_t > &&data) -> VoidResult override
 Sends data to the client.
 
auto close () -> void override
 Closes the session.
 
auto start_session () -> void
 Starts the session: sets up read/error callbacks and begins reading data.
 
auto stop_session () -> void
 Stops the session by closing the socket and marking the session as inactive.
 
auto is_stopped () const noexcept -> bool
 Checks if the session has been stopped.
 
auto send_packet (std::vector< uint8_t > &&data) -> void
 Sends data to the connected client, optionally using compression/encryption.
 
auto set_receive_callback (std::function< void(const std::vector< uint8_t > &)> callback) -> void
 Sets the callback for received data.
 
auto set_disconnection_callback (std::function< void(const std::string &)> callback) -> void
 Sets the callback for disconnection.
 
auto set_error_callback (std::function< void(std::error_code)> callback) -> void
 Sets the callback for errors.
 
auto server_id () const -> const std::string &
 Gets the server identifier.
 
- Public Member Functions inherited from kcenon::network::interfaces::i_session
virtual ~i_session ()=default
 Virtual destructor for proper cleanup.
 
 i_session (const i_session &)=delete
 
i_sessionoperator= (const i_session &)=delete
 
 i_session (i_session &&)=delete
 
i_sessionoperator= (i_session &&)=delete
 

Private Member Functions

auto on_receive (std::span< const uint8_t > data) -> void
 Callback for when data arrives from the client.
 
auto on_error (std::error_code ec) -> void
 Callback for handling socket errors from tcp_socket.
 
auto process_next_message () -> void
 Processes pending messages from the queue.
 

Private Attributes

std::string server_id_
 
std::shared_ptr< internal::tcp_socketsocket_
 
std::atomic< bool > is_stopped_
 
std::deque< std::vector< uint8_t > > pending_messages_
 Queue of pending received messages awaiting processing.
 
std::mutex queue_mutex_
 Mutex protecting access to pending_messages_ queue.
 
std::function< void(const std::vector< uint8_t > &)> receive_callback_
 Callbacks for session events.
 
std::function< void(const std::string &)> disconnection_callback_
 
std::function< void(std::error_code)> error_callback_
 
std::mutex callback_mutex_
 Mutex protecting callback access.
 

Static Private Attributes

static constexpr size_t max_pending_messages_ = 1000
 Maximum number of pending messages before applying backpressure.
 

Additional Inherited Members

- Protected Member Functions inherited from kcenon::network::interfaces::i_session
 i_session ()=default
 Default constructor (only for derived classes)
 

Detailed Description

Manages a single connected client session on the server side, providing asynchronous read/write operations and pipeline transformations.

This class implements the i_session interface, providing a composition-based design for TCP sessions. It replaces the previous inheritance-heavy approach with a cleaner interface-based model.

Responsibilities

  • Owns a tcp_socket for non-blocking I/O.
  • Optionally applies compression/encryption via pipeline_ before sending, and can do the reverse upon receiving data (if needed).
  • Provides callbacks (on_receive, on_error) for data handling and error detection.

Lifecycle

  • Constructed with an accepted asio::ip::tcp::socket.
  • start_session() sets up callbacks and begins socket_->start_read().
  • stop_session() closes the underlying socket, stopping further I/O.

Thread Safety

  • All public methods are thread-safe.
  • Session state (is_stopped_) is protected by atomic operations.
  • Pipeline mode flags are protected by mode_mutex_.
  • Socket operations are serialized through ASIO's io_context.

Interface Compliance

This class implements interfaces::i_session for composition-based usage.

Definition at line 77 of file messaging_session.h.

Constructor & Destructor Documentation

◆ messaging_session()

kcenon::network::session::messaging_session::messaging_session ( asio::ip::tcp::socket socket,
std::string_view server_id )

Constructs a session with a given socket and server_id.

Parameters
socketThe asio::ip::tcp::socket (already connected).
server_idAn identifier for this server instance or context.

Definition at line 18 of file messaging_session.cpp.

21 {
22 // Create the tcp_socket wrapper
23 socket_ = std::make_shared<internal::tcp_socket>(std::move(socket));
24 }
auto server_id() const -> const std::string &
Gets the server identifier.
std::shared_ptr< internal::tcp_socket > socket_

References socket_.

◆ ~messaging_session()

kcenon::network::session::messaging_session::~messaging_session ( )
overridenoexcept

Destructor; calls stop_session() if not already stopped.

Definition at line 26 of file messaging_session.cpp.

27 {
28 try
29 {
30 // Call stop_session to clean up resources
32 }
33 catch (...)
34 {
35 // Destructor must not throw - swallow all exceptions
36 }
37 }
auto stop_session() -> void
Stops the session by closing the socket and marking the session as inactive.

References stop_session().

Here is the call graph for this function:

Member Function Documentation

◆ close()

auto kcenon::network::session::messaging_session::close ( ) -> void
inlineoverridevirtual

Closes the session.

Implements i_session::close(). Delegates to stop_session().

Implements kcenon::network::interfaces::i_session.

Definition at line 136 of file messaging_session.h.

136{ stop_session(); }

References stop_session().

Here is the call graph for this function:

◆ id()

auto kcenon::network::session::messaging_session::id ( ) const -> std::string_view
inlinenodiscardoverridevirtual

Gets the unique identifier for this session.

Returns
A string view of the session ID.

Implements i_session::id(). Returns the server_id that was provided during construction.

Implements kcenon::network::interfaces::i_session.

Definition at line 107 of file messaging_session.h.

107 {
108 return server_id_;
109 }

References server_id_.

◆ is_connected()

auto kcenon::network::session::messaging_session::is_connected ( ) const -> bool
inlinenodiscardoverridevirtual

Checks if the session is currently connected.

Returns
true if connected, false otherwise.

Implements i_session::is_connected(). Returns the opposite of is_stopped().

Implements kcenon::network::interfaces::i_session.

Definition at line 118 of file messaging_session.h.

118 {
119 return !is_stopped_.load(std::memory_order_acquire);
120 }

References is_stopped_.

◆ is_stopped()

auto kcenon::network::session::messaging_session::is_stopped ( ) const -> bool
inlinenodiscardnoexcept

Checks if the session has been stopped.

Returns
true if the session is stopped, false otherwise.

Definition at line 158 of file messaging_session.h.

158 {
159 return is_stopped_.load(std::memory_order_relaxed);
160 }

References is_stopped_.

◆ on_error()

auto kcenon::network::session::messaging_session::on_error ( std::error_code ec) -> void
private

Callback for handling socket errors from tcp_socket.

Parameters
ecThe std::error_code describing the error.

By default, logs the error and calls stop_session().

Definition at line 180 of file messaging_session.cpp.

181 {
182 // NOTE: No logging in async handlers to prevent heap corruption during
183 // static destruction.
184
185 // Invoke error callback if set
186 {
187 std::lock_guard<std::mutex> lock(callback_mutex_);
188 if (error_callback_)
189 {
190 try
191 {
192 error_callback_(ec);
193 }
194 catch (...)
195 {
196 // Silently ignore exceptions - no logging during potential static destruction
197 }
198 }
199 }
200
201 stop_session();
202 }
std::function< void(std::error_code)> error_callback_
std::mutex callback_mutex_
Mutex protecting callback access.

◆ on_receive()

auto kcenon::network::session::messaging_session::on_receive ( std::span< const uint8_t > data) -> void
private

Callback for when data arrives from the client.

Parameters
dataA span view containing a chunk of received bytes.

Zero-Copy Performance

The span provides a non-owning view directly into the socket's internal read buffer, avoiding per-read vector allocations.

Lifetime Contract

  • The span is valid only until this callback returns.
  • Data must be copied into pending_messages_ for retention.

Override or extend the logic here to parse messages, handle commands, etc. If decompression/decryption is needed, apply pipeline_ accordingly.

Definition at line 145 of file messaging_session.cpp.

146 {
147 // NOTE: No logging in async handlers to prevent heap corruption during
148 // static destruction.
149
150 if (is_stopped_.load())
151 {
152 return;
153 }
154
155 // Report bytes received metric
157
158 // Check queue size before adding
159 {
160 std::lock_guard<std::mutex> lock(queue_mutex_);
161 size_t queue_size = pending_messages_.size();
162
163 // Apply backpressure if queue is getting full
164 // If queue is severely overflowing, disconnect the session
165 if (queue_size >= max_pending_messages_ * 2)
166 {
167 stop_session();
168 return;
169 }
170
171 // Copy span data into vector only when adding to queue (single copy point)
172 // This is the only allocation in the receive path
173 pending_messages_.emplace_back(data.begin(), data.end());
174 }
175
176 // Process messages from queue
178 }
static void report_bytes_received(size_t bytes)
Report bytes received.
std::deque< std::vector< uint8_t > > pending_messages_
Queue of pending received messages awaiting processing.
static constexpr size_t max_pending_messages_
Maximum number of pending messages before applying backpressure.
auto process_next_message() -> void
Processes pending messages from the queue.
std::mutex queue_mutex_
Mutex protecting access to pending_messages_ queue.

References kcenon::network::metrics::metric_reporter::report_bytes_received().

Here is the call graph for this function:

◆ process_next_message()

auto kcenon::network::session::messaging_session::process_next_message ( ) -> void
private

Processes pending messages from the queue.

This method dequeues and processes messages one at a time. Implement actual message handling logic here.

Definition at line 204 of file messaging_session.cpp.

205 {
206 // NOTE: No logging in message processing to prevent heap corruption during
207 // static destruction.
208
209 std::vector<uint8_t> message;
210
211 // Dequeue one message
212 {
213 std::lock_guard<std::mutex> lock(queue_mutex_);
214 if (pending_messages_.empty())
215 {
216 return;
217 }
218
219 message = std::move(pending_messages_.front());
220 pending_messages_.pop_front();
221 }
222
223 // Invoke receive callback if set
224 {
225 std::lock_guard<std::mutex> lock(callback_mutex_);
227 {
228 try
229 {
231 }
232 catch (...)
233 {
234 // Silently ignore exceptions - no logging during potential static destruction
235 }
236 }
237 }
238
239 // If there are more messages, process them asynchronously
240 // to avoid blocking the receive thread
241 std::lock_guard<std::mutex> lock(queue_mutex_);
242 if (!pending_messages_.empty())
243 {
244 // In a production system, you might want to post this to a thread pool
245 // or use a work queue to process messages asynchronously
246 // For now, we just process one message per call
247 }
248 }
std::function< void(const std::vector< uint8_t > &)> receive_callback_
Callbacks for session events.

References kcenon::network::message.

◆ send()

auto kcenon::network::session::messaging_session::send ( std::vector< uint8_t > && data) -> VoidResult
nodiscardoverridevirtual

Sends data to the client.

Parameters
dataThe data to send.
Returns
VoidResult indicating success or failure.

Implements i_session::send(). Wraps send_packet() with Result type.

Implements kcenon::network::interfaces::i_session.

Definition at line 121 of file messaging_session.cpp.

122 {
123 if (is_stopped_.load())
124 {
125 return error_void(
127 "Session is closed",
128 "messaging_session");
129 }
130
131 if (data.empty())
132 {
133 return error_void(
135 "Data cannot be empty",
136 "messaging_session");
137 }
138
139 // Delegate to send_packet for actual sending
140 send_packet(std::move(data));
141
142 return ok();
143 }
auto send_packet(std::vector< uint8_t > &&data) -> void
Sends data to the connected client, optionally using compression/encryption.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()

References kcenon::network::error_codes::network_system::connection_closed, kcenon::network::error_void(), kcenon::network::error_codes::common_errors::invalid_argument, and kcenon::network::ok().

Here is the call graph for this function:

◆ send_packet()

auto kcenon::network::session::messaging_session::send_packet ( std::vector< uint8_t > && data) -> void

Sends data to the connected client, optionally using compression/encryption.

Parameters
dataThe raw bytes to transmit (moved for efficiency).

Notes

  • If compress_mode_ or encrypt_mode_ is true, the data will be processed by the pipeline's compress/encrypt functions before writing.
  • Data is moved (not copied) to avoid memory allocation overhead.
  • After calling this function, the original vector will be empty.

Definition at line 103 of file messaging_session.cpp.

104 {
105 if (is_stopped_.load())
106 {
107 return;
108 }
109
110 // Report bytes sent metric
112
113 // Send data directly without pipeline transformation
114 // NOTE: No logging in async callbacks to prevent heap corruption during static destruction
115 socket_->async_send(std::move(data),
116 [](std::error_code, std::size_t) {
117 // Silently ignore errors - no logging during potential static destruction
118 });
119 }
static void report_bytes_sent(size_t bytes)
Report bytes sent.

References kcenon::network::metrics::metric_reporter::report_bytes_sent().

Here is the call graph for this function:

◆ server_id()

auto kcenon::network::session::messaging_session::server_id ( ) const -> const std::string&
inlinenodiscard

Gets the server identifier.

Returns
The server_id string.

Definition at line 210 of file messaging_session.h.

210 {
211 return server_id_;
212 }

References server_id_.

◆ set_disconnection_callback()

auto kcenon::network::session::messaging_session::set_disconnection_callback ( std::function< void(const std::string &)> callback) -> void

Sets the callback for disconnection.

Parameters
callbackFunction called when the session stops.

The callback receives the server_id as identification.

Definition at line 257 of file messaging_session.cpp.

259 {
260 std::lock_guard<std::mutex> lock(callback_mutex_);
261 disconnection_callback_ = std::move(callback);
262 }
std::function< void(const std::string &)> disconnection_callback_

◆ set_error_callback()

auto kcenon::network::session::messaging_session::set_error_callback ( std::function< void(std::error_code)> callback) -> void

Sets the callback for errors.

Parameters
callbackFunction called when an error occurs.

The callback receives the error code.

Definition at line 264 of file messaging_session.cpp.

266 {
267 std::lock_guard<std::mutex> lock(callback_mutex_);
268 error_callback_ = std::move(callback);
269 }

◆ set_receive_callback()

auto kcenon::network::session::messaging_session::set_receive_callback ( std::function< void(const std::vector< uint8_t > &)> callback) -> void

Sets the callback for received data.

Parameters
callbackFunction called when data is received.

The callback receives a const reference to the received data. It is invoked on the I/O thread, so keep processing minimal or dispatch to a worker thread.

Definition at line 250 of file messaging_session.cpp.

252 {
253 std::lock_guard<std::mutex> lock(callback_mutex_);
254 receive_callback_ = std::move(callback);
255 }

◆ start_session()

auto kcenon::network::session::messaging_session::start_session ( ) -> void

Starts the session: sets up read/error callbacks and begins reading data.

Definition at line 39 of file messaging_session.cpp.

40 {
41 // NOTE: No logging in session methods to prevent heap corruption during
42 // static destruction. These methods may run in async handlers when
43 // GlobalLoggerRegistry is already destroyed.
44
45 if (is_stopped_.load())
46 {
47 return;
48 }
49
50 // Set up callbacks with weak_ptr to avoid circular reference
51 // Use span-based callback for zero-copy receive path (no per-read allocation)
52 auto weak_self = weak_from_this();
53 socket_->set_receive_callback_view(
54 [weak_self](std::span<const uint8_t> data)
55 {
56 if (auto self = weak_self.lock())
57 {
58 self->on_receive(data);
59 }
60 });
61 socket_->set_error_callback([weak_self](std::error_code ec)
62 {
63 if (auto self = weak_self.lock())
64 {
65 self->on_error(ec);
66 }
67 });
68
69 // Begin reading
70 socket_->start_read();
71 }

◆ stop_session()

auto kcenon::network::session::messaging_session::stop_session ( ) -> void

Stops the session by closing the socket and marking the session as inactive.

Definition at line 73 of file messaging_session.cpp.

74 {
75 if (is_stopped_.exchange(true))
76 {
77 return;
78 }
79 // Close socket safely using atomic close() method
80 // This prevents data races between close and async read operations
81 if (socket_)
82 {
83 socket_->close();
84 }
85
86 // Invoke disconnection callback if set
87 {
88 std::lock_guard<std::mutex> lock(callback_mutex_);
90 {
91 try
92 {
94 }
95 catch (...)
96 {
97 // Silently ignore exceptions - no logging during potential static destruction
98 }
99 }
100 }
101 }

Referenced by close(), and ~messaging_session().

Here is the caller graph for this function:

Member Data Documentation

◆ callback_mutex_

std::mutex kcenon::network::session::messaging_session::callback_mutex_
mutableprivate

Mutex protecting callback access.

Definition at line 287 of file messaging_session.h.

◆ disconnection_callback_

std::function<void(const std::string&)> kcenon::network::session::messaging_session::disconnection_callback_
private

Definition at line 281 of file messaging_session.h.

◆ error_callback_

std::function<void(std::error_code)> kcenon::network::session::messaging_session::error_callback_
private

Definition at line 282 of file messaging_session.h.

◆ is_stopped_

std::atomic<bool> kcenon::network::session::messaging_session::is_stopped_
private
Initial value:
{
false
}

Indicates whether this session is stopped.

Definition at line 255 of file messaging_session.h.

255 {
256 false
257 };

Referenced by is_connected(), and is_stopped().

◆ max_pending_messages_

size_t kcenon::network::session::messaging_session::max_pending_messages_ = 1000
staticconstexprprivate

Maximum number of pending messages before applying backpressure.

When this limit is reached, a warning is logged. If doubled, the session is disconnected.

Definition at line 275 of file messaging_session.h.

◆ pending_messages_

std::deque<std::vector<uint8_t> > kcenon::network::session::messaging_session::pending_messages_
private

Queue of pending received messages awaiting processing.

Definition at line 262 of file messaging_session.h.

◆ queue_mutex_

std::mutex kcenon::network::session::messaging_session::queue_mutex_
mutableprivate

Mutex protecting access to pending_messages_ queue.

Definition at line 267 of file messaging_session.h.

◆ receive_callback_

std::function<void(const std::vector<uint8_t>&)> kcenon::network::session::messaging_session::receive_callback_
private

Callbacks for session events.

Definition at line 280 of file messaging_session.h.

◆ server_id_

std::string kcenon::network::session::messaging_session::server_id_
private

Identifier for the server side.

Definition at line 250 of file messaging_session.h.

Referenced by id(), and server_id().

◆ socket_

std::shared_ptr<internal::tcp_socket> kcenon::network::session::messaging_session::socket_
private

The wrapped TCP socket for this session.

Definition at line 253 of file messaging_session.h.

Referenced by messaging_session().


The documentation for this class was generated from the following files: