Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
quic_session.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
10
12{
13
14 quic_session::quic_session(std::shared_ptr<internal::quic_socket> socket,
15 std::string_view session_id)
16 : session_id_(session_id), socket_(std::move(socket))
17 {
18 NETWORK_LOG_DEBUG("[quic_session] Created session: " + session_id_);
19 }
20
22 {
23 try
24 {
25 if (is_active_.load())
26 {
27 auto result = close(0);
28 (void)result; // Ignore result in destructor
29 }
30 }
31 catch (...)
32 {
33 // Destructor must not throw
34 }
35 }
36
37 auto quic_session::session_id() const -> const std::string&
38 {
39 return session_id_;
40 }
41
42 auto quic_session::remote_endpoint() const -> asio::ip::udp::endpoint
43 {
44 std::lock_guard<std::mutex> lock(socket_mutex_);
45 if (socket_)
46 {
47 return socket_->remote_endpoint();
48 }
49 return {};
50 }
51
52 auto quic_session::is_active() const noexcept -> bool
53 {
54 return is_active_.load(std::memory_order_relaxed);
55 }
56
57 auto quic_session::send(std::vector<uint8_t>&& data) -> VoidResult
58 {
59 if (!is_active_.load())
60 {
62 "Session is not active",
63 "quic_session");
64 }
65
66 std::lock_guard<std::mutex> lock(socket_mutex_);
67 if (!socket_)
68 {
70 "Socket is null",
71 "quic_session");
72 }
73
74 // Report bytes sent metric
76
77 return socket_->send_stream_data(default_stream_id_, std::move(data), false);
78 }
79
80 auto quic_session::send(std::string_view data) -> VoidResult
81 {
82 std::vector<uint8_t> bytes(data.begin(), data.end());
83 return send(std::move(bytes));
84 }
85
86 auto quic_session::send_on_stream(uint64_t stream_id,
87 std::vector<uint8_t>&& data,
88 bool fin) -> VoidResult
89 {
90 if (!is_active_.load())
91 {
93 "Session is not active",
94 "quic_session");
95 }
96
97 std::lock_guard<std::mutex> lock(socket_mutex_);
98 if (!socket_)
99 {
101 "Socket is null",
102 "quic_session");
103 }
104
106
107 return socket_->send_stream_data(stream_id, std::move(data), fin);
108 }
109
111 {
112 if (!is_active_.load())
113 {
115 "Session is not active",
116 "quic_session");
117 }
118
119 std::lock_guard<std::mutex> lock(socket_mutex_);
120 if (!socket_)
121 {
123 "Socket is null",
124 "quic_session");
125 }
126
127 return socket_->create_stream(false);
128 }
129
131 {
132 if (!is_active_.load())
133 {
135 "Session is not active",
136 "quic_session");
137 }
138
139 std::lock_guard<std::mutex> lock(socket_mutex_);
140 if (!socket_)
141 {
143 "Socket is null",
144 "quic_session");
145 }
146
147 return socket_->create_stream(true);
148 }
149
150 auto quic_session::close_stream(uint64_t stream_id) -> VoidResult
151 {
152 if (!is_active_.load())
153 {
155 "Session is not active",
156 "quic_session");
157 }
158
159 std::lock_guard<std::mutex> lock(socket_mutex_);
160 if (!socket_)
161 {
163 "Socket is null",
164 "quic_session");
165 }
166
167 return socket_->close_stream(stream_id);
168 }
169
170 auto quic_session::close(uint64_t error_code) -> VoidResult
171 {
172 if (!is_active_.exchange(false))
173 {
174 return ok();
175 }
176
177 NETWORK_LOG_INFO("[quic_session] Closing session: " + session_id_);
178
179 VoidResult result = ok();
180
181 {
182 std::lock_guard<std::mutex> lock(socket_mutex_);
183 if (socket_)
184 {
185 result = socket_->close(error_code, "Session closed");
186 }
187 }
188
189 // Invoke close callback
190 {
191 std::lock_guard<std::mutex> lock(callback_mutex_);
192 if (close_callback_)
193 {
194 try
195 {
196 close_callback_();
197 }
198 catch (const std::exception& e)
199 {
200 NETWORK_LOG_ERROR("[quic_session] Exception in close callback: "
201 + std::string(e.what()));
202 }
203 }
204 }
205
206 return result;
207 }
208
209 auto quic_session::stats() const -> core::quic_connection_stats
210 {
211 // Return empty stats for now - will be implemented with actual QUIC stats
213 }
214
216 std::function<void(const std::vector<uint8_t>&)> callback) -> void
217 {
218 std::lock_guard<std::mutex> lock(callback_mutex_);
219 receive_callback_ = std::move(callback);
220 }
221
223 std::function<void(uint64_t, const std::vector<uint8_t>&, bool)> callback)
224 -> void
225 {
226 std::lock_guard<std::mutex> lock(callback_mutex_);
227 stream_receive_callback_ = std::move(callback);
228 }
229
230 auto quic_session::set_close_callback(std::function<void()> callback) -> void
231 {
232 std::lock_guard<std::mutex> lock(callback_mutex_);
233 close_callback_ = std::move(callback);
234 }
235
237 {
238 if (!is_active_.load())
239 {
240 return;
241 }
242
243 auto weak_self = weak_from_this();
244
245 std::lock_guard<std::mutex> lock(socket_mutex_);
246 if (!socket_)
247 {
248 return;
249 }
250
251 // Set up stream data callback
252 socket_->set_stream_data_callback(
253 [weak_self](uint64_t stream_id, std::span<const uint8_t> data, bool fin)
254 {
255 if (auto self = weak_self.lock())
256 {
257 self->on_stream_data(stream_id, data, fin);
258 }
259 });
260
261 // Set up error callback
262 socket_->set_error_callback([weak_self](std::error_code ec)
263 {
264 if (auto self = weak_self.lock())
265 {
266 self->on_error(ec);
267 }
268 });
269
270 // Set up close callback
271 socket_->set_close_callback(
272 [weak_self](uint64_t error_code, const std::string& reason)
273 {
274 if (auto self = weak_self.lock())
275 {
276 self->on_close(error_code, reason);
277 }
278 });
279
280 NETWORK_LOG_INFO("[quic_session] Started session: " + session_id_);
281 }
282
283 auto quic_session::handle_packet(std::span<const uint8_t> data) -> void
284 {
285 // This is called by the server when a packet arrives for this session
286 // The actual packet handling is done by quic_socket
287 std::lock_guard<std::mutex> lock(socket_mutex_);
288 if (socket_)
289 {
290 // The socket will process the packet internally
291 // No direct call needed as socket handles its own receive loop
292 }
293 }
294
296 const protocols::quic::connection_id& conn_id) const -> bool
297 {
298 std::lock_guard<std::mutex> lock(socket_mutex_);
299 if (!socket_)
300 {
301 return false;
302 }
303 return socket_->local_connection_id() == conn_id ||
304 socket_->remote_connection_id() == conn_id;
305 }
306
307 auto quic_session::on_stream_data(uint64_t stream_id,
308 std::span<const uint8_t> data,
309 bool fin) -> void
310 {
311 if (!is_active_.load())
312 {
313 return;
314 }
315
316 // Report bytes received metric
318
319 NETWORK_LOG_DEBUG("[quic_session] Received " + std::to_string(data.size())
320 + " bytes on stream " + std::to_string(stream_id));
321
322 std::vector<uint8_t> data_copy(data.begin(), data.end());
323
324 std::lock_guard<std::mutex> lock(callback_mutex_);
325
326 // Call stream-specific callback if set
327 if (stream_receive_callback_)
328 {
329 try
330 {
331 stream_receive_callback_(stream_id, data_copy, fin);
332 }
333 catch (const std::exception& e)
334 {
336 "[quic_session] Exception in stream receive callback: "
337 + std::string(e.what()));
338 }
339 }
340
341 // Also call default receive callback for default stream
342 if (stream_id == default_stream_id_ && receive_callback_)
343 {
344 try
345 {
346 receive_callback_(data_copy);
347 }
348 catch (const std::exception& e)
349 {
350 NETWORK_LOG_ERROR("[quic_session] Exception in receive callback: "
351 + std::string(e.what()));
352 }
353 }
354 }
355
356 auto quic_session::on_error(std::error_code ec) -> void
357 {
358 NETWORK_LOG_ERROR("[quic_session] Error on session " + session_id_ + ": "
359 + ec.message());
360
361 // Close the session on error
362 auto result = close(1);
363 (void)result;
364 }
365
366 auto quic_session::on_close(uint64_t error_code, const std::string& reason)
367 -> void
368 {
369 NETWORK_LOG_INFO("[quic_session] Session " + session_id_ + " closed: "
370 + reason + " (code: " + std::to_string(error_code) + ")");
371
372 is_active_.store(false);
373
374 // Invoke close callback
375 {
376 std::lock_guard<std::mutex> lock(callback_mutex_);
377 if (close_callback_)
378 {
379 try
380 {
381 close_callback_();
382 }
383 catch (const std::exception& e)
384 {
385 NETWORK_LOG_ERROR("[quic_session] Exception in close callback: "
386 + std::string(e.what()));
387 }
388 }
389 }
390 }
391
392} // namespace kcenon::network::session
static void report_bytes_received(size_t bytes)
Report bytes received.
static void report_bytes_sent(size_t bytes)
Report bytes sent.
QUIC Connection ID (RFC 9000 Section 5.1)
auto close() -> void override
Closes the session (interface version).
auto close_stream(uint64_t stream_id) -> VoidResult override
Closes a stream (interface version).
auto matches_connection_id(const protocols::quic::connection_id &conn_id) const -> bool
Check if this session matches a connection ID.
auto set_receive_callback(std::function< void(const std::vector< uint8_t > &)> callback) -> void
Set callback for received data on the default stream.
auto on_error(std::error_code ec) -> void
auto handle_packet(std::span< const uint8_t > data) -> void
Handle incoming packet (called by server).
auto create_stream() -> Result< uint64_t > override
Creates a new bidirectional stream (interface version).
auto set_stream_receive_callback(std::function< void(uint64_t, const std::vector< uint8_t > &, bool)> callback) -> void
Set callback for stream data reception.
quic_session(std::shared_ptr< internal::quic_socket > socket, std::string_view session_id)
Constructs a QUIC session with an existing socket.
auto send_on_stream(uint64_t stream_id, std::vector< uint8_t > &&data, bool fin=false) -> VoidResult override
Sends data on a specific stream (interface version).
~quic_session() noexcept
Destructor; closes the session if still active.
auto create_unidirectional_stream() -> Result< uint64_t > override
Creates a new unidirectional stream (interface version).
auto on_stream_data(uint64_t stream_id, std::span< const uint8_t > data, bool fin) -> void
auto session_id() const -> const std::string &
Get the unique session identifier.
auto start_session() -> void
Start receiving data (called by server).
auto remote_endpoint() const -> asio::ip::udp::endpoint
Get the remote endpoint address.
auto is_active() const noexcept -> bool
Check if the session is currently active.
auto send(std::string_view data) -> VoidResult
Send string data on the default stream.
std::shared_ptr< internal::quic_socket > socket_
auto stats() const -> core::quic_connection_stats
Get connection statistics.
auto set_close_callback(std::function< void()> callback) -> void
Set callback when session closes.
auto on_close(uint64_t error_code, const std::string &reason) -> void
Logger system integration interface for network_system.
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Network system metrics definitions and reporting utilities.
QUIC-specific session with stream multiplexing.
Statistics for a QUIC connection.
Definition quic_client.h:97