Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
quic_connection_adapter.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8#include <cstring>
9#include <random>
10
11#ifdef _WIN32
12#include <WinSock2.h>
13#include <ws2tcpip.h>
14#pragma comment(lib, "ws2_32.lib")
15#else
16#include <arpa/inet.h>
17#include <fcntl.h>
18#include <netdb.h>
19#include <netinet/in.h>
20#include <poll.h>
21#include <sys/socket.h>
22#include <unistd.h>
23#endif
24
26
27namespace {
28
29auto generate_random_cid() -> protocols::quic::connection_id
30{
31 std::random_device rd;
32 std::mt19937 gen(rd());
33 // Use unsigned short instead of uint8_t for MSVC compatibility
34 // C++ standard only allows short, int, long, long long and their unsigned versions
35 std::uniform_int_distribution<unsigned short> dist(0, 255);
36
37 std::vector<uint8_t> cid_bytes(8);
38 for (auto& byte : cid_bytes) {
39 byte = static_cast<uint8_t>(dist(gen));
40 }
41
42 return protocols::quic::connection_id(cid_bytes);
43}
44
/// Releases a socket descriptor with the platform's native close call
/// (closesocket on Windows, ::close elsewhere). The result is ignored.
auto close_socket(int fd) -> void
{
#ifndef _WIN32
    ::close(fd);
#else
    closesocket(fd);
#endif
}
53
/// Switches a socket descriptor to non-blocking mode.
/// @param fd Descriptor to modify.
/// @return true when the mode change succeeded, false otherwise.
auto set_nonblocking(int fd) -> bool
{
#ifndef _WIN32
    // Read the current status flags first so the existing ones are preserved.
    const int current_flags = fcntl(fd, F_GETFL, 0);
    if (current_flags != -1) {
        return fcntl(fd, F_SETFL, current_flags | O_NONBLOCK) != -1;
    }
    return false;
#else
    u_long enable = 1;
    return ioctlsocket(fd, FIONBIO, &enable) == 0;
#endif
}
67
68} // namespace
69
71 const protocol::quic::quic_config& config,
72 std::string_view connection_id)
73 : connection_id_(connection_id)
74 , config_(config)
75{
76}
77
82
83auto quic_connection_adapter::send(std::span<const std::byte> data) -> VoidResult
84{
85 if (!is_connected_.load()) {
86 return error_void(
87 static_cast<int>(std::errc::not_connected),
88 "QUIC connection is not established"
89 );
90 }
91
92 if (!quic_conn_) {
93 return error_void(
94 static_cast<int>(std::errc::not_connected),
95 "QUIC connection object is null"
96 );
97 }
98
99 // Queue data for sending on the default stream (stream 0)
100 std::vector<uint8_t> buffer(data.size());
101 std::memcpy(buffer.data(), data.data(), data.size());
102
103 {
104 std::lock_guard lock(send_queue_mutex_);
105 send_queue_.push_back(std::move(buffer));
106 }
107
108 return ok();
109}
110
111auto quic_connection_adapter::send(std::vector<uint8_t>&& data) -> VoidResult
112{
113 if (!is_connected_.load()) {
114 return error_void(
115 static_cast<int>(std::errc::not_connected),
116 "QUIC connection is not established"
117 );
118 }
119
120 if (!quic_conn_) {
121 return error_void(
122 static_cast<int>(std::errc::not_connected),
123 "QUIC connection object is null"
124 );
125 }
126
127 {
128 std::lock_guard lock(send_queue_mutex_);
129 send_queue_.push_back(std::move(data));
130 }
131
132 return ok();
133}
134
135auto quic_connection_adapter::is_connected() const noexcept -> bool
136{
137 return is_connected_.load();
138}
139
140auto quic_connection_adapter::id() const noexcept -> std::string_view
141{
142 return connection_id_;
143}
144
146{
147 std::lock_guard lock(endpoint_mutex_);
148 return remote_endpoint_;
149}
150
152{
153 std::lock_guard lock(endpoint_mutex_);
154 return local_endpoint_;
155}
156
158{
159 if (is_connected_.load() || running_.load()) {
160 return error_void(
161 static_cast<int>(std::errc::already_connected),
162 "Already connected or connecting"
163 );
164 }
165
166 {
167 std::lock_guard lock(endpoint_mutex_);
168 remote_endpoint_ = endpoint;
169 }
170
171 // Create UDP socket
172#ifdef _WIN32
173 WSADATA wsaData;
174 if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
175 return error_void(
176 static_cast<int>(std::errc::io_error),
177 "Failed to initialize Winsock"
178 );
179 }
180#endif
181
182 socket_fd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
183 if (socket_fd_ < 0) {
184 return error_void(
185 static_cast<int>(std::errc::io_error),
186 "Failed to create UDP socket"
187 );
188 }
189
190 if (!set_nonblocking(socket_fd_)) {
191 close_socket(socket_fd_);
192 socket_fd_ = -1;
193 return error_void(
194 static_cast<int>(std::errc::io_error),
195 "Failed to set socket to non-blocking mode"
196 );
197 }
198
199 // Create QUIC connection (client side)
200 auto initial_dcid = generate_random_cid();
201 quic_conn_ = std::make_unique<protocols::quic::connection>(false, initial_dcid);
202
203 // Set transport parameters from config
205 local_params.initial_max_data = config_.initial_max_data;
206 local_params.initial_max_stream_data_bidi_local = config_.initial_max_stream_data_bidi;
207 local_params.initial_max_stream_data_bidi_remote = config_.initial_max_stream_data_bidi;
208 local_params.initial_max_stream_data_uni = config_.initial_max_stream_data_uni;
209 local_params.initial_max_streams_bidi = config_.max_bidi_streams;
210 local_params.initial_max_streams_uni = config_.max_uni_streams;
211
212 if (config_.idle_timeout.count() > 0) {
213 local_params.max_idle_timeout =
214 static_cast<uint64_t>(config_.idle_timeout.count());
215 }
216
217 quic_conn_->set_local_params(local_params);
218
219 // Enable PMTUD if configured
220 if (config_.enable_pmtud) {
221 quic_conn_->enable_pmtud();
222 }
223
224 // Start handshake
225 std::string server_name = config_.server_name;
226 if (server_name.empty()) {
227 server_name = endpoint.host;
228 }
229
230 auto hs_result = quic_conn_->start_handshake(server_name);
231 if (!hs_result.is_ok()) {
232 quic_conn_.reset();
233 close_socket(socket_fd_);
234 socket_fd_ = -1;
235 return error_void(
236 static_cast<int>(std::errc::connection_refused),
237 "Failed to start QUIC handshake"
238 );
239 }
240
241 is_connecting_ = true;
242 running_ = true;
243 stop_requested_ = false;
244
245 // Start I/O thread
246 io_thread_ = std::thread(&quic_connection_adapter::io_thread_func, this);
247
248 return ok();
249}
250
251auto quic_connection_adapter::connect(std::string_view url) -> VoidResult
252{
253 auto [host, port] = parse_url(url);
254 if (host.empty() || port == 0) {
255 return error_void(
256 static_cast<int>(std::errc::invalid_argument),
257 "Invalid QUIC URL format (expected: quic://host:port or host:port)"
258 );
259 }
260
261 return connect(endpoint_info{host, port});
262}
263
264auto quic_connection_adapter::close() noexcept -> void
265{
266 stop_requested_ = true;
267
268 if (quic_conn_ && quic_conn_->is_established()) {
269 (void)quic_conn_->close(0, "Connection closed by client");
270 }
271
272 if (io_thread_.joinable()) {
273 io_thread_.join();
274 }
275
276 if (socket_fd_ >= 0) {
277 close_socket(socket_fd_);
278 socket_fd_ = -1;
279 }
280
281 quic_conn_.reset();
282 is_connected_ = false;
283 is_connecting_ = false;
284 running_ = false;
285
286 {
287 std::lock_guard lock(stop_mutex_);
288 }
289 stop_cv_.notify_all();
290}
291
293{
294 std::lock_guard lock(callbacks_mutex_);
295 callbacks_ = std::move(callbacks);
296}
297
299{
300 options_ = options;
301}
302
303auto quic_connection_adapter::set_timeout(std::chrono::milliseconds timeout) -> void
304{
305 options_.connect_timeout = timeout;
306}
307
308auto quic_connection_adapter::is_connecting() const noexcept -> bool
309{
310 return is_connecting_.load();
311}
312
314{
315 std::unique_lock lock(stop_mutex_);
316 stop_cv_.wait(lock, [this] { return !running_.load(); });
317}
318
320{
321 constexpr int POLL_TIMEOUT_MS = 10;
322 constexpr size_t MAX_PACKET_SIZE = 65535;
323
324 std::vector<uint8_t> recv_buffer(MAX_PACKET_SIZE);
325 sockaddr_in remote_addr{};
326 remote_addr.sin_family = AF_INET;
327
328 // Resolve remote address
329 {
330 std::lock_guard lock(endpoint_mutex_);
331 remote_addr.sin_port = htons(remote_endpoint_.port);
332
333 struct addrinfo hints{}, *result = nullptr;
334 hints.ai_family = AF_INET;
335 hints.ai_socktype = SOCK_DGRAM;
336
337 if (getaddrinfo(remote_endpoint_.host.c_str(), nullptr, &hints, &result) == 0
338 && result) {
339 remote_addr.sin_addr =
340 reinterpret_cast<sockaddr_in*>(result->ai_addr)->sin_addr;
341 freeaddrinfo(result);
342 } else {
343 inet_pton(AF_INET, remote_endpoint_.host.c_str(), &remote_addr.sin_addr);
344 }
345 }
346
347 while (!stop_requested_.load()) {
348 // Poll for incoming data
349#ifdef _WIN32
350 WSAPOLLFD pfd{};
351 pfd.fd = socket_fd_;
352 pfd.events = POLLIN;
353
354 int poll_result = WSAPoll(&pfd, 1, POLL_TIMEOUT_MS);
355#else
356 pollfd pfd{};
357 pfd.fd = socket_fd_;
358 pfd.events = POLLIN;
359
360 int poll_result = poll(&pfd, 1, POLL_TIMEOUT_MS);
361#endif
362
363 if (poll_result > 0 && (pfd.revents & POLLIN)) {
364 // Receive incoming packet
365 sockaddr_in from_addr{};
366 socklen_t from_len = sizeof(from_addr);
367
368 auto received = recvfrom(socket_fd_, reinterpret_cast<char*>(recv_buffer.data()),
369 static_cast<int>(recv_buffer.size()), 0,
370 reinterpret_cast<sockaddr*>(&from_addr), &from_len);
371
372 if (received > 0) {
373 std::span<const uint8_t> packet_data(recv_buffer.data(),
374 static_cast<size_t>(received));
375 auto result = quic_conn_->receive_packet(packet_data);
376 (void)result;
377
378 // Check for state changes
379 handle_state_change();
380 }
381 }
382
383 // Process outgoing data
384 process_outgoing();
385
386 // Check for timeouts
387 if (quic_conn_) {
388 auto timeout = quic_conn_->next_timeout();
389 if (timeout && std::chrono::steady_clock::now() >= *timeout) {
390 quic_conn_->on_timeout();
391 }
392 }
393
394 // Handle state changes
395 handle_state_change();
396
397 // Generate and send QUIC packets
398 if (quic_conn_) {
399 auto packets = quic_conn_->generate_packets();
400 for (const auto& packet : packets) {
401 sendto(socket_fd_, reinterpret_cast<const char*>(packet.data()),
402 static_cast<int>(packet.size()), 0,
403 reinterpret_cast<const sockaddr*>(&remote_addr),
404 sizeof(remote_addr));
405 }
406 }
407
408 // Check if connection is closed
409 if (quic_conn_ && quic_conn_->is_closed()) {
410 break;
411 }
412 }
413
414 running_ = false;
415 is_connected_ = false;
416 is_connecting_ = false;
417
418 // Notify callbacks
419 {
420 std::lock_guard lock(callbacks_mutex_);
421 if (callbacks_.on_disconnected) {
422 callbacks_.on_disconnected();
423 }
424 }
425
426 {
427 std::lock_guard lock(stop_mutex_);
428 }
429 stop_cv_.notify_all();
430}
431
433{
434 if (!quic_conn_ || !quic_conn_->is_established()) {
435 return;
436 }
437
438 std::deque<std::vector<uint8_t>> data_to_send;
439 {
440 std::lock_guard lock(send_queue_mutex_);
441 data_to_send.swap(send_queue_);
442 }
443
444 for (auto& data : data_to_send) {
445 // Get or create bidirectional stream
446 auto& stream_mgr = quic_conn_->streams();
447 auto stream_result = stream_mgr.create_bidirectional_stream();
448
449 if (stream_result.is_ok()) {
450 uint64_t stream_id = stream_result.value();
451 auto* stream = stream_mgr.get_stream(stream_id);
452 if (stream) {
453 (void)stream->write(data);
454 }
455 }
456 }
457}
458
460{
461 if (!quic_conn_ || !quic_conn_->is_established()) {
462 return;
463 }
464
465 auto& stream_mgr = quic_conn_->streams();
466
467 // Read data from all readable streams
468 for (uint64_t stream_id = 0; stream_id < 1000; stream_id += 4) {
469 auto* stream = stream_mgr.get_stream(stream_id);
470 if (stream && stream->has_data()) {
471 std::vector<uint8_t> buffer(4096);
472 auto read_result = stream->read(buffer);
473 if (read_result.is_ok() && read_result.value() > 0) {
474 buffer.resize(read_result.value());
475
476 std::lock_guard lock(callbacks_mutex_);
477 if (callbacks_.on_data) {
478 std::span<const std::byte> byte_span(
479 reinterpret_cast<const std::byte*>(buffer.data()),
480 buffer.size()
481 );
482 callbacks_.on_data(byte_span);
483 }
484 }
485 }
486 }
487}
488
490{
491 if (!quic_conn_) {
492 return;
493 }
494
495 bool was_connecting = is_connecting_.load();
496 bool is_now_established = quic_conn_->is_established();
497
498 if (was_connecting && is_now_established) {
499 is_connecting_ = false;
500 is_connected_ = true;
501
502 std::lock_guard lock(callbacks_mutex_);
503 if (callbacks_.on_connected) {
504 callbacks_.on_connected();
505 }
506 }
507
508 if (quic_conn_->is_draining() || quic_conn_->is_closed()) {
509 is_connected_ = false;
510
511 auto error_code = quic_conn_->close_error_code();
512 if (error_code && *error_code != 0) {
513 std::lock_guard lock(callbacks_mutex_);
514 if (callbacks_.on_error) {
515 callbacks_.on_error(std::make_error_code(std::errc::connection_reset));
516 }
517 }
518 }
519
520 // Process incoming data
521 process_incoming();
522}
523
524auto quic_connection_adapter::parse_url(std::string_view url)
525 -> std::pair<std::string, uint16_t>
526{
527 std::string url_str(url);
528
529 // Remove quic:// prefix if present
530 const std::string quic_prefix = "quic://";
531 if (url_str.substr(0, quic_prefix.size()) == quic_prefix) {
532 url_str = url_str.substr(quic_prefix.size());
533 }
534
535 // Find the last colon for port separation
536 auto colon_pos = url_str.rfind(':');
537 if (colon_pos == std::string::npos) {
538 return {"", 0};
539 }
540
541 std::string host = url_str.substr(0, colon_pos);
542 std::string port_str = url_str.substr(colon_pos + 1);
543
544 uint16_t port = 0;
545 try {
546 port = static_cast<uint16_t>(std::stoi(port_str));
547 } catch (...) {
548 return {"", 0};
549 }
550
551 return {host, port};
552}
553
554} // namespace kcenon::network::unified::adapters
auto local_endpoint() const noexcept -> endpoint_info override
Gets the local endpoint information.
auto connect(const endpoint_info &endpoint) -> VoidResult override
Connects to a remote endpoint using host/port.
auto is_connected() const noexcept -> bool override
Checks if the transport is currently connected.
auto set_timeout(std::chrono::milliseconds timeout) -> void override
Sets the connection timeout.
auto close() noexcept -> void override
Closes the connection gracefully.
~quic_connection_adapter() override
Destructor ensures proper cleanup.
auto parse_url(std::string_view url) -> std::pair< std::string, uint16_t >
Parse URL to extract host and port.
auto remote_endpoint() const noexcept -> endpoint_info override
Gets the remote endpoint information.
auto is_connecting() const noexcept -> bool override
Checks if the connection is in the process of connecting.
std::unique_ptr< protocols::quic::connection > quic_conn_
auto io_thread_func() -> void
I/O processing thread function.
auto set_options(connection_options options) -> void override
Sets connection options.
auto wait_for_stop() -> void override
Blocks until the component has stopped.
auto handle_state_change() -> void
Handle connection state changes.
auto set_callbacks(connection_callbacks callbacks) -> void override
Sets all connection callbacks at once.
auto send(std::span< const std::byte > data) -> VoidResult override
Sends raw data to the remote endpoint.
auto id() const noexcept -> std::string_view override
Gets the unique identifier for this transport/connection.
quic_connection_adapter(const protocol::quic::quic_config &config, std::string_view connection_id)
Constructs an adapter with QUIC configuration.
tracing_config config
Definition exporters.cpp:29
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Configuration options for QUIC connections.
Definition quic.h:45
QUIC transport parameters (RFC 9000 Section 18)
uint64_t max_idle_timeout
Maximum idle timeout in milliseconds (0 = disabled)
uint64_t initial_max_data
Initial maximum data for connection (default: 0)
uint64_t initial_max_streams_uni
Initial maximum unidirectional streams.
uint64_t initial_max_streams_bidi
Initial maximum bidirectional streams.
uint64_t initial_max_stream_data_bidi_remote
Initial maximum data for remotely-initiated bidirectional streams.
uint64_t initial_max_stream_data_bidi_local
Initial maximum data for locally-initiated bidirectional streams.
uint64_t initial_max_stream_data_uni
Initial maximum data for unidirectional streams.
Callback functions for connection events.
Definition types.h:154
Configuration options for connections.
Definition types.h:198
Network endpoint information (host/port or URL)
Definition types.h:56