Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
messaging_session.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <span>
8
12
13// Use nested namespace definition (C++17)
15{
16
17 // Use string_view in constructor for efficiency (C++17)
18 messaging_session::messaging_session(asio::ip::tcp::socket socket,
19 std::string_view server_id)
20 : server_id_(server_id)
21 {
22 // Create the tcp_socket wrapper
23 socket_ = std::make_shared<internal::tcp_socket>(std::move(socket));
24 }
25
27 {
28 try
29 {
30 // Call stop_session to clean up resources
32 }
33 catch (...)
34 {
35 // Destructor must not throw - swallow all exceptions
36 }
37 }
38
40 {
41 // NOTE: No logging in session methods to prevent heap corruption during
42 // static destruction. These methods may run in async handlers when
43 // GlobalLoggerRegistry is already destroyed.
44
45 if (is_stopped_.load())
46 {
47 return;
48 }
49
50 // Set up callbacks with weak_ptr to avoid circular reference
51 // Use span-based callback for zero-copy receive path (no per-read allocation)
52 auto weak_self = weak_from_this();
53 socket_->set_receive_callback_view(
54 [weak_self](std::span<const uint8_t> data)
55 {
56 if (auto self = weak_self.lock())
57 {
58 self->on_receive(data);
59 }
60 });
61 socket_->set_error_callback([weak_self](std::error_code ec)
62 {
63 if (auto self = weak_self.lock())
64 {
65 self->on_error(ec);
66 }
67 });
68
69 // Begin reading
70 socket_->start_read();
71 }
72
74 {
75 if (is_stopped_.exchange(true))
76 {
77 return;
78 }
79 // Close socket safely using atomic close() method
80 // This prevents data races between close and async read operations
81 if (socket_)
82 {
83 socket_->close();
84 }
85
86 // Invoke disconnection callback if set
87 {
88 std::lock_guard<std::mutex> lock(callback_mutex_);
89 if (disconnection_callback_)
90 {
91 try
92 {
93 disconnection_callback_(server_id_);
94 }
95 catch (...)
96 {
97 // Silently ignore exceptions - no logging during potential static destruction
98 }
99 }
100 }
101 }
102
103 auto messaging_session::send_packet(std::vector<uint8_t>&& data) -> void
104 {
105 if (is_stopped_.load())
106 {
107 return;
108 }
109
110 // Report bytes sent metric
112
113 // Send data directly without pipeline transformation
114 // NOTE: No logging in async callbacks to prevent heap corruption during static destruction
115 socket_->async_send(std::move(data),
116 [](std::error_code, std::size_t) {
117 // Silently ignore errors - no logging during potential static destruction
118 });
119 }
120
// Sends data to the client.
//
// Returns an error result when the session is already stopped or when `data`
// is empty; otherwise forwards the buffer to send_packet() and returns ok().
// ok() only means the data was handed to send_packet(); send_packet() ignores
// the completion result of the asynchronous send.
//
// NOTE(review): error_void() is documented as taking a leading `int code`
// argument, but no code argument is visible in the two calls below -
// presumably those lines were dropped from this listing; confirm against the
// original source before editing.
121 auto messaging_session::send(std::vector<uint8_t>&& data) -> VoidResult
122 {
// Reject sends on a stopped session.
123 if (is_stopped_.load())
124 {
125 return error_void(
127 "Session is closed",
128 "messaging_session");
129 }
130
// Reject empty payloads.
131 if (data.empty())
132 {
133 return error_void(
135 "Data cannot be empty",
136 "messaging_session");
137 }
138
139 // Delegate to send_packet for actual sending
140 send_packet(std::move(data));
141
142 return ok();
143 }
144
145 auto messaging_session::on_receive(std::span<const uint8_t> data) -> void
146 {
147 // NOTE: No logging in async handlers to prevent heap corruption during
148 // static destruction.
149
150 if (is_stopped_.load())
151 {
152 return;
153 }
154
155 // Report bytes received metric
157
158 // Check queue size before adding
159 {
160 std::lock_guard<std::mutex> lock(queue_mutex_);
161 size_t queue_size = pending_messages_.size();
162
163 // Apply backpressure if queue is getting full
164 // If queue is severely overflowing, disconnect the session
165 if (queue_size >= max_pending_messages_ * 2)
166 {
167 stop_session();
168 return;
169 }
170
171 // Copy span data into vector only when adding to queue (single copy point)
172 // This is the only allocation in the receive path
173 pending_messages_.emplace_back(data.begin(), data.end());
174 }
175
176 // Process messages from queue
177 process_next_message();
178 }
179
180 auto messaging_session::on_error(std::error_code ec) -> void
181 {
182 // NOTE: No logging in async handlers to prevent heap corruption during
183 // static destruction.
184
185 // Invoke error callback if set
186 {
187 std::lock_guard<std::mutex> lock(callback_mutex_);
188 if (error_callback_)
189 {
190 try
191 {
192 error_callback_(ec);
193 }
194 catch (...)
195 {
196 // Silently ignore exceptions - no logging during potential static destruction
197 }
198 }
199 }
200
201 stop_session();
202 }
203
205 {
206 // NOTE: No logging in message processing to prevent heap corruption during
207 // static destruction.
208
209 std::vector<uint8_t> message;
210
211 // Dequeue one message
212 {
213 std::lock_guard<std::mutex> lock(queue_mutex_);
214 if (pending_messages_.empty())
215 {
216 return;
217 }
218
219 message = std::move(pending_messages_.front());
220 pending_messages_.pop_front();
221 }
222
223 // Invoke receive callback if set
224 {
225 std::lock_guard<std::mutex> lock(callback_mutex_);
226 if (receive_callback_)
227 {
228 try
229 {
230 receive_callback_(message);
231 }
232 catch (...)
233 {
234 // Silently ignore exceptions - no logging during potential static destruction
235 }
236 }
237 }
238
239 // If there are more messages, process them asynchronously
240 // to avoid blocking the receive thread
241 std::lock_guard<std::mutex> lock(queue_mutex_);
242 if (!pending_messages_.empty())
243 {
244 // In a production system, you might want to post this to a thread pool
245 // or use a work queue to process messages asynchronously
246 // For now, we just process one message per call
247 }
248 }
249
251 std::function<void(const std::vector<uint8_t>&)> callback) -> void
252 {
253 std::lock_guard<std::mutex> lock(callback_mutex_);
254 receive_callback_ = std::move(callback);
255 }
256
258 std::function<void(const std::string&)> callback) -> void
259 {
260 std::lock_guard<std::mutex> lock(callback_mutex_);
261 disconnection_callback_ = std::move(callback);
262 }
263
265 std::function<void(std::error_code)> callback) -> void
266 {
267 std::lock_guard<std::mutex> lock(callback_mutex_);
268 error_callback_ = std::move(callback);
269 }
270
271} // namespace kcenon::network::session
static void report_bytes_received(size_t bytes)
Report bytes received.
static void report_bytes_sent(size_t bytes)
Report bytes sent.
auto start_session() -> void
Starts the session: sets up read/error callbacks and begins reading data.
auto send(std::vector< uint8_t > &&data) -> VoidResult override
Sends data to the client.
auto process_next_message() -> void
Processes pending messages from the queue.
auto send_packet(std::vector< uint8_t > &&data) -> void
Sends data to the connected client directly, without pipeline transformation (no compression/encryption is applied in this implementation).
auto set_receive_callback(std::function< void(const std::vector< uint8_t > &)> callback) -> void
Sets the callback for received data.
auto on_error(std::error_code ec) -> void
Callback for handling socket errors from tcp_socket.
~messaging_session() noexcept override
Destructor; calls stop_session() if not already stopped.
auto on_receive(std::span< const uint8_t > data) -> void
Callback for when data arrives from the client.
messaging_session(asio::ip::tcp::socket socket, std::string_view server_id)
Constructs a session with a given socket and server_id.
auto set_disconnection_callback(std::function< void(const std::string &)> callback) -> void
Sets the callback for disconnection.
auto stop_session() -> void
Stops the session by marking it as inactive, closing the socket, and invoking the disconnection callback if one is set.
auto set_error_callback(std::function< void(std::error_code)> callback) -> void
Sets the callback for errors.
std::shared_ptr< internal::tcp_socket > socket_
Logger system integration interface for network_system.
Messaging session managing bidirectional message exchange.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Network system metrics definitions and reporting utilities.