Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
quic_client.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#define NETWORK_USE_EXPERIMENTAL
12
14{
15
17 : client_id_(client_id)
18{
19}
20
22{
24 {
25 (void)stop_client(); // Ignore result in destructor
26 }
27}
28
29auto messaging_quic_client::client_id() const -> const std::string&
30{
31 return client_id_;
32}
33
35{
36 return lifecycle_.is_running();
37}
38
40{
41 lifecycle_.wait_for_stop();
42}
43
44auto messaging_quic_client::start_client(std::string_view host,
45 unsigned short port) -> VoidResult
46{
47 // Use default config
48 return start_client(host, port, quic_client_config{});
49}
50
// Starts the client against host:port using the supplied configuration.
//
// Steps, in order:
//   1. Opens a "quic.client.start" tracing span, only when tracing is enabled.
//   2. Rejects the call with an error result when lifecycle_ already reports
//      running, or when host is empty.
//   3. Copies config into config_, marks the lifecycle as running, clears
//      is_connected_, and delegates to do_start_impl().
//   4. When do_start_impl() fails, the lifecycle is marked stopped again and the
//      error message is recorded on the span.
//
// Returns the result of do_start_impl(), or one of the early error results.
//
// NOTE(review): the is_running() check and the later set_running() call are two
// separate steps. Whether two concurrent callers can both pass the check depends
// on the lifecycle_ implementation, which is not visible here - confirm.
// NOTE(review): this listing skips its own lines 74 and 86. Those are presumably
// the error-code arguments, since error_void takes an int code first - confirm
// against the real source file.
51auto messaging_quic_client::start_client(std::string_view host,
52 unsigned short port,
53 const quic_client_config& config) -> VoidResult
54{
55 // Create tracing span for client start operation
56 auto span = tracing::is_tracing_enabled()
57 ? std::make_optional(tracing::trace_context::create_span("quic.client.start"))
58 : std::nullopt;
59 if (span)
60 {
61 span->set_attribute("net.peer.name", host)
62 .set_attribute("net.peer.port", static_cast<int64_t>(port))
63 .set_attribute("net.transport", "quic")
64 .set_attribute("client.id", client_id_);
65 }
66
// Guard: refuse to start a client that is already running.
67 if (lifecycle_.is_running())
68 {
69 if (span)
70 {
71 span->set_error("QUIC client is already running");
72 }
73 return error_void(
75 "QUIC client is already running",
76 "messaging_quic_client::start_client");
77 }
78
// Guard: an empty host is rejected before any state is modified.
79 if (host.empty())
80 {
81 if (span)
82 {
83 span->set_error("Host cannot be empty");
84 }
85 return error_void(
87 "Host cannot be empty",
88 "messaging_quic_client::start_client");
89 }
90
// State is committed before do_start_impl() runs, so the failure branch below
// has to roll the lifecycle back.
91 config_ = config;
92 lifecycle_.set_running();
93 is_connected_.store(false);
94
95 auto result = do_start_impl(host, port);
96 if (result.is_err())
97 {
// Roll back the running flag set above.
98 lifecycle_.mark_stopped();
99 if (span)
100 {
101 span->set_error(result.error().message);
102 }
103 }
104 else
105 {
106 if (span)
107 {
108 span->set_status(tracing::span_status::ok);
109 }
110 }
111
112 return result;
113}
114
116{
117 if (!lifecycle_.prepare_stop())
118 {
119 return ok(); // Already stopped or not running
120 }
121
122 is_connected_.store(false);
123 auto result = do_stop_impl();
124 lifecycle_.mark_stopped();
125
126 return result;
127}
128
// Builds the I/O machinery for a new connection attempt and kicks off the
// asynchronous connect.
//
// Steps, in order:
//   1. Drops any previous socket_ while holding socket_mutex_.
//   2. Creates a fresh asio::io_context plus a work guard on it.
//   3. Creates a two-worker basic_thread_pool when thread_pool_ is not set.
//   4. Submits a task that calls io_context_->run(). Exceptions derived from
//      std::exception are caught inside the task and logged.
//   5. Calls do_connect(), which only schedules the resolve. A successful
//      return from this function therefore does not mean the connection is
//      established.
//
// Returns ok() on success. Any std::exception thrown in the steps above is
// converted into an error result carrying the client id and host.
//
// NOTE(review): the catch block returns without resetting work_guard_ or
// stopping io_context_. If step 5 throws (do_connect calls shared_from_this()),
// the task submitted in step 4 appears to keep running, and stop_client()
// returns early when prepare_stop() reports nothing to stop - confirm whether
// cleanup is required on this path.
// NOTE(review): this listing skips its own lines 144, 153 and 176. Line 153 is
// presumably the opening of a logging macro call and line 176 the error-code
// argument - confirm against the real source file.
129auto messaging_quic_client::do_start_impl(std::string_view host,
130 unsigned short port) -> VoidResult
131{
132 try
133 {
// Scoped lock: release any socket left over from a previous run.
134 {
135 std::lock_guard<std::mutex> lock(socket_mutex_);
136 socket_.reset();
137 }
138
139 io_context_ = std::make_unique<asio::io_context>();
// The work guard is created so that run() does not return while the context
// has no pending handlers.
140 work_guard_ = std::make_unique<
141 asio::executor_work_guard<asio::io_context::executor_type>>(
142 asio::make_work_guard(*io_context_));
143
// Create a pool only when none is set yet.
145 if (!thread_pool_)
146 {
147 thread_pool_ = std::make_shared<integration::basic_thread_pool>(2);
148 }
149
// The lambda captures this by pointer. do_stop_impl() waits on
// io_context_future_ before it resets io_context_.
150 io_context_future_ = thread_pool_->submit([this]() {
151 try
152 {
154 "[messaging_quic_client] Starting io_context on thread pool");
155 io_context_->run();
156 NETWORK_LOG_INFO("[messaging_quic_client] io_context stopped");
157 }
158 catch (const std::exception& e)
159 {
160 NETWORK_LOG_ERROR("[messaging_quic_client] Exception in io_context: " +
161 std::string(e.what()));
162 }
163 });
164
165 do_connect(host, port);
166
167 NETWORK_LOG_INFO("[messaging_quic_client] started. ID=" + client_id_ +
168 " target=" + std::string(host) + ":" +
169 std::to_string(port));
170
171 return ok();
172 }
173 catch (const std::exception& e)
174 {
175 return error_void(
177 "Failed to start client: " + std::string(e.what()),
178 "messaging_quic_client::do_start_impl",
179 "Client ID: " + client_id_ + ", Host: " + std::string(host));
180 }
181}
182
184{
185 try
186 {
187 std::shared_ptr<internal::quic_socket> local_socket;
188 {
189 std::lock_guard<std::mutex> lock(socket_mutex_);
190 local_socket = std::move(socket_);
191 }
192 if (local_socket)
193 {
194 local_socket->stop_receive();
195 auto close_result = local_socket->close();
196 if (close_result.is_err())
197 {
198 NETWORK_LOG_WARN("[messaging_quic_client] Close error: " +
199 close_result.error().message);
200 }
201 }
202
203 if (work_guard_)
204 {
205 work_guard_.reset();
206 }
207 if (io_context_)
208 {
209 io_context_->stop();
210 }
211 if (io_context_future_.valid())
212 {
213 io_context_future_.wait();
214 }
215 thread_pool_.reset();
216 io_context_.reset();
217
218 handshake_complete_.store(false);
219
220 NETWORK_LOG_INFO("[messaging_quic_client] stopped.");
221 return ok();
222 }
223 catch (const std::exception& e)
224 {
225 return error_void(
227 "Failed to stop client: " + std::string(e.what()),
228 "messaging_quic_client::do_stop_impl",
229 "Client ID: " + client_id_);
230 }
231}
232
234{
235 return is_connected_.load(std::memory_order_acquire);
236}
237
239{
240 auto local_socket = get_socket();
241 if (!local_socket)
242 {
243 return false;
244 }
245 return local_socket->is_handshake_complete();
246}
247
// Sends a byte buffer on the default stream (default_stream_id_).
//
// Checks are applied in this order, each producing an error result:
//   1. the client must be running,
//   2. data must not be empty,
//   3. the client must be connected and a socket must be present.
// When all checks pass, data is moved into send_stream_data() on the socket and
// that result is returned.
//
// A "quic.client.send" tracing span is opened only when tracing is enabled.
// It records the payload size, the stream id and the client id, and is marked
// ok or error according to the outcome.
//
// NOTE(review): this listing skips its own lines 269, 282 and 296. Those are
// presumably the error-code arguments of error_void - confirm against the real
// source file.
248auto messaging_quic_client::send_packet(std::vector<uint8_t>&& data) -> VoidResult
249{
250 // Create tracing span for send operation
251 auto span = tracing::is_tracing_enabled()
252 ? std::make_optional(tracing::trace_context::create_span("quic.client.send"))
253 : std::nullopt;
254 if (span)
255 {
256 span->set_attribute("net.transport", "quic")
257 .set_attribute("message.size", static_cast<int64_t>(data.size()))
258 .set_attribute("quic.stream_id", static_cast<int64_t>(default_stream_id_))
259 .set_attribute("client.id", client_id_);
260 }
261
262 if (!is_running())
263 {
264 if (span)
265 {
266 span->set_error("Client is not running");
267 }
268 return error_void(
270 "Client is not running",
271 "messaging_quic_client::send_packet",
272 "Client ID: " + client_id_);
273 }
274
275 if (data.empty())
276 {
277 if (span)
278 {
279 span->set_error("Data cannot be empty");
280 }
281 return error_void(
283 "Data cannot be empty",
284 "messaging_quic_client::send_packet",
285 "Client ID: " + client_id_);
286 }
287
// get_socket() returns a shared_ptr copy taken under socket_mutex_, so the
// socket object stays alive for the rest of this call.
288 auto local_socket = get_socket();
289 if (!is_connected() || !local_socket)
290 {
291 if (span)
292 {
293 span->set_error("Client is not connected");
294 }
295 return error_void(
297 "Client is not connected",
298 "messaging_quic_client::send_packet",
299 "Client ID: " + client_id_);
300 }
301
302 auto result = local_socket->send_stream_data(default_stream_id_, std::move(data));
303 if (span)
304 {
305 if (result.is_err())
306 {
307 span->set_error(result.error().message);
308 }
309 else
310 {
311 span->set_status(tracing::span_status::ok);
312 }
313 }
314 return result;
315}
316
// Text overload: copies the characters of data into a byte vector and forwards
// it to the vector overload of send_packet().
//
// An empty view is rejected here, before any copy is made. Unlike the vector
// overload, this early rejection does not open a tracing span.
//
// NOTE(review): this listing skips its own line 322, presumably the error-code
// argument of error_void - confirm against the real source file.
317auto messaging_quic_client::send_packet(std::string_view data) -> VoidResult
318{
319 if (data.empty())
320 {
321 return error_void(
323 "Data cannot be empty",
324 "messaging_quic_client::send_packet",
325 "Client ID: " + client_id_);
326 }
327
// Byte-wise copy of the view. The vector is then handed over by move.
328 std::vector<uint8_t> byte_data(data.begin(), data.end());
329 return send_packet(std::move(byte_data));
330}
331
332auto messaging_quic_client::start(std::string_view host, uint16_t port) -> VoidResult
333{
334 return start_client(host, port);
335}
336
338{
339 return stop_client();
340}
341
342auto messaging_quic_client::send(std::vector<uint8_t>&& data) -> VoidResult
343{
344 return send_packet(std::move(data));
345}
346
348{
349 auto local_socket = get_socket();
350 if (!local_socket)
351 {
352 return error<uint64_t>(
354 "Client is not connected",
355 "messaging_quic_client::create_stream",
356 "Client ID: " + client_id_);
357 }
358
359 return local_socket->create_stream(false);
360}
361
363{
364 auto local_socket = get_socket();
365 if (!local_socket)
366 {
367 return error<uint64_t>(
369 "Client is not connected",
370 "messaging_quic_client::create_unidirectional_stream",
371 "Client ID: " + client_id_);
372 }
373
374 return local_socket->create_stream(true);
375}
376
378 std::vector<uint8_t>&& data,
379 bool fin) -> VoidResult
380{
381 // Create tracing span for stream send operation
382 auto span = tracing::is_tracing_enabled()
383 ? std::make_optional(tracing::trace_context::create_span("quic.stream.send"))
384 : std::nullopt;
385 if (span)
386 {
387 span->set_attribute("net.transport", "quic")
388 .set_attribute("message.size", static_cast<int64_t>(data.size()))
389 .set_attribute("quic.stream_id", static_cast<int64_t>(stream_id))
390 .set_attribute("quic.fin", fin)
391 .set_attribute("client.id", client_id_);
392 }
393
394 if (data.empty())
395 {
396 if (span)
397 {
398 span->set_error("Data cannot be empty");
399 }
400 return error_void(
402 "Data cannot be empty",
403 "messaging_quic_client::send_on_stream",
404 "Client ID: " + client_id_);
405 }
406
407 auto local_socket = get_socket();
408 if (!local_socket)
409 {
410 if (span)
411 {
412 span->set_error("Client is not connected");
413 }
414 return error_void(
416 "Client is not connected",
417 "messaging_quic_client::send_on_stream",
418 "Client ID: " + client_id_);
419 }
420
421 auto result = local_socket->send_stream_data(stream_id, std::move(data), fin);
422 if (span)
423 {
424 if (result.is_err())
425 {
426 span->set_error(result.error().message);
427 }
428 else
429 {
430 span->set_status(tracing::span_status::ok);
431 }
432 }
433 return result;
434}
435
437{
438 auto local_socket = get_socket();
439 if (!local_socket)
440 {
441 return error_void(
443 "Client is not connected",
444 "messaging_quic_client::close_stream",
445 "Client ID: " + client_id_);
446 }
447
448 return local_socket->close_stream(stream_id);
449}
450
452 const std::vector<std::string>& protocols) -> void
453{
454 config_.alpn_protocols = protocols;
455}
456
457auto messaging_quic_client::alpn_protocol() const -> std::optional<std::string>
458{
459 // TODO: Implement ALPN negotiation result retrieval
460 return std::nullopt;
461}
462
464{
465 return early_data_accepted_.load();
466}
467
469{
470 // TODO: Implement connection statistics retrieval
471 return quic_connection_stats{};
472}
473
// Schedules an asynchronous name resolution for host:port on io_context_ and,
// from the resolve handler, creates the QUIC socket and initiates the connection.
//
// This function returns as soon as the resolve is scheduled. All outcomes are
// reported later through on_connect / on_error / on_close.
//
// Lifetime: the resolve handler and every socket callback capture self (from
// shared_from_this()) next to the raw this pointer, so the client object stays
// alive while any of those handlers exists. The resolver is kept alive by being
// captured in its own handler.
//
// NOTE(review): shared_from_this() requires this object to be owned by a
// std::shared_ptr. The only caller visible here is do_start_impl, which converts
// std::exception into an error result.
// NOTE(review): this listing skips its own line 517, so the remaining
// constructor arguments of quic_socket are not visible here.
474auto messaging_quic_client::do_connect(std::string_view host,
475 unsigned short port) -> void
476{
477 auto resolver = std::make_shared<asio::ip::udp::resolver>(*io_context_);
478 auto self = shared_from_this();
479
480 NETWORK_LOG_INFO("[messaging_quic_client] Starting async resolve for " +
481 std::string(host) + ":" + std::to_string(port));
482
// host is a string_view, so owning std::string copies are passed to the
// resolver and captured as host_str for use inside the handler.
483 resolver->async_resolve(
484 std::string(host),
485 std::to_string(port),
486 [this, self, resolver, host_str = std::string(host)](
487 std::error_code ec,
488 asio::ip::udp::resolver::results_type results) {
489 NETWORK_LOG_INFO("[messaging_quic_client] Resolve callback invoked");
490 if (ec)
491 {
492 NETWORK_LOG_ERROR("[messaging_quic_client] Resolve error: " +
493 ec.message());
494 on_error(ec);
495 return;
496 }
497
498 if (results.empty())
499 {
500 NETWORK_LOG_ERROR("[messaging_quic_client] No endpoints resolved");
501 on_error(asio::error::host_not_found);
502 return;
503 }
504
505 NETWORK_LOG_INFO("[messaging_quic_client] Resolve successful, creating socket");
506
507 try
508 {
// Only the first resolved endpoint is tried.
509 auto endpoint = *results.begin();
510
511 // Create UDP socket
// NOTE(review): the socket is always opened for udp::v4(), regardless of the
// protocol of the resolved endpoint. Confirm the behaviour when the first
// result is an IPv6 address.
512 asio::ip::udp::socket udp_socket(*io_context_, asio::ip::udp::v4());
513
514 // Create QUIC socket as client
515 auto quic_sock = std::make_shared<internal::quic_socket>(
516 std::move(udp_socket),
518
519 // Set callbacks
520 quic_sock->set_stream_data_callback(
521 [this, self](uint64_t stream_id,
522 std::span<const uint8_t> data,
523 bool fin) {
524 on_stream_data(stream_id, data, fin);
525 });
526
527 quic_sock->set_connected_callback([this, self]() {
528 on_connect();
529 });
530
531 quic_sock->set_error_callback([this, self](std::error_code err) {
532 on_error(err);
533 });
534
535 quic_sock->set_close_callback(
536 [this, self](uint64_t error_code, const std::string& reason) {
537 on_close(error_code, reason);
538 });
539
540 // Store socket
// Published before connect() is called, so get_socket() can already return it
// when the connected callback runs.
541 {
542 std::lock_guard<std::mutex> lock(socket_mutex_);
543 socket_ = quic_sock;
544 }
545
546 // Connect to server
547 auto connect_result = quic_sock->connect(endpoint, host_str);
548 if (connect_result.is_err())
549 {
550 NETWORK_LOG_ERROR("[messaging_quic_client] Connect failed: " +
551 connect_result.error().message);
// The detailed failure is only logged. on_error always receives
// connection_refused.
552 on_error(std::make_error_code(std::errc::connection_refused));
553 return;
554 }
555
556 // Start receive loop
557 quic_sock->start_receive();
558
559 NETWORK_LOG_INFO("[messaging_quic_client] Connection initiated");
560 }
561 catch (const std::exception& e)
562 {
563 NETWORK_LOG_ERROR("[messaging_quic_client] Exception during connect: " +
564 std::string(e.what()));
565 on_error(std::make_error_code(std::errc::connection_refused));
566 }
567 });
568}
569
571{
572 NETWORK_LOG_INFO("[messaging_quic_client] Connected successfully.");
573 is_connected_.store(true, std::memory_order_release);
574 handshake_complete_.store(true);
575
576 // Create default stream for send_packet()
577 auto local_socket = get_socket();
578 if (local_socket)
579 {
580 auto stream_result = local_socket->create_stream(false);
581 if (stream_result.is_ok())
582 {
583 default_stream_id_ = stream_result.value();
584 NETWORK_LOG_DEBUG("[messaging_quic_client] Default stream created: " +
585 std::to_string(default_stream_id_));
586 }
587 }
588
589 // Invoke connected callback
590 invoke_connected_callback();
591}
592
594 std::span<const uint8_t> data,
595 bool fin) -> void
596{
597 if (!is_connected())
598 {
599 return;
600 }
601
602 NETWORK_LOG_DEBUG("[messaging_quic_client] Received " +
603 std::to_string(data.size()) +
604 " bytes on stream " + std::to_string(stream_id));
605
606 std::vector<uint8_t> data_copy(data.begin(), data.end());
607
608 // Invoke stream receive callback
609 invoke_stream_receive_callback(stream_id, data_copy, fin);
610
611 // Also invoke default receive callback for default stream
612 if (stream_id == default_stream_id_)
613 {
614 invoke_receive_callback(data_copy);
615 }
616}
617
618auto messaging_quic_client::on_error(std::error_code ec) -> void
619{
620 NETWORK_LOG_ERROR("[messaging_quic_client] Error: " + ec.message());
621
622 // Invoke error callback
623 invoke_error_callback(ec);
624
625 if (is_connected())
626 {
627 // Invoke disconnected callback
628 invoke_disconnected_callback();
629 }
630
631 is_connected_.store(false, std::memory_order_release);
632}
633
634auto messaging_quic_client::on_close(uint64_t error_code,
635 const std::string& reason) -> void
636{
637 NETWORK_LOG_INFO("[messaging_quic_client] Connection closed. Error code: " +
638 std::to_string(error_code) + ", reason: " + reason);
639
640 if (is_connected())
641 {
642 // Invoke disconnected callback
643 invoke_disconnected_callback();
644 }
645
646 is_connected_.store(false, std::memory_order_release);
647}
648
650 -> std::shared_ptr<internal::quic_socket>
651{
652 std::lock_guard<std::mutex> lock(socket_mutex_);
653 return socket_;
654}
655
656// =============================================================================
657// Callback invocation helpers
658// =============================================================================
659
660auto messaging_quic_client::invoke_receive_callback(const std::vector<uint8_t>& data) -> void
661{
662 callbacks_.invoke<to_index(callback_index::receive)>(data);
663}
664
666 const std::vector<uint8_t>& data,
667 bool fin) -> void
668{
669 callbacks_.invoke<to_index(callback_index::stream_receive)>(stream_id, data, fin);
670}
671
673{
674 callbacks_.invoke<to_index(callback_index::connected)>();
675}
676
678{
679 callbacks_.invoke<to_index(callback_index::disconnected)>();
680}
681
682auto messaging_quic_client::invoke_error_callback(std::error_code ec) -> void
683{
684 callbacks_.invoke<to_index(callback_index::error)>(ec);
685}
686
687// =============================================================================
688// Legacy callback setters
689// =============================================================================
690
692{
693 callbacks_.set<to_index(callback_index::stream_receive)>(std::move(callback));
694}
695
696// =============================================================================
697// i_quic_client interface callback implementations
698// =============================================================================
699
702{
703 callbacks_.set<to_index(callback_index::receive)>(std::move(callback));
704}
705
708{
709 callbacks_.set<to_index(callback_index::stream_receive)>(std::move(callback));
710}
711
714{
715 callbacks_.set<to_index(callback_index::connected)>(std::move(callback));
716}
717
720{
721 callbacks_.set<to_index(callback_index::disconnected)>(std::move(callback));
722}
723
726{
727 callbacks_.set<to_index(callback_index::error)>(std::move(callback));
728}
729
732{
733 session_ticket_cb_ = std::move(callback);
734}
735
738{
739 early_data_cb_ = std::move(callback);
740}
741
744{
745 early_data_accepted_cb_ = std::move(callback);
746}
747
748} // namespace kcenon::network::core
std::shared_ptr< internal::quic_socket > socket_
The QUIC socket.
auto set_disconnected_callback(interfaces::i_quic_client::disconnected_callback_t callback) -> void override
Sets the callback for disconnection (interface version).
auto stop() -> VoidResult override
Stops the QUIC client.
auto wait_for_stop() -> void override
Blocks until stop() is called.
auto send(std::vector< uint8_t > &&data) -> VoidResult override
Sends data on the default stream (interface version).
std::mutex socket_mutex_
Protects socket_ from data races.
auto do_stop_impl() -> VoidResult
QUIC-specific implementation of client stop.
auto on_connect() -> void
Callback invoked when connection is established.
messaging_quic_client(std::string_view client_id)
Constructs a QUIC client with a given identifier.
auto is_running() const -> bool override
Checks if the client is currently running.
auto create_unidirectional_stream() -> Result< uint64_t > override
Creates a new unidirectional stream (interface version).
auto on_close(uint64_t error_code, const std::string &reason) -> void
Callback for connection close.
~messaging_quic_client() noexcept override
Destructor; automatically calls stop_client() if running.
auto alpn_protocol() const -> std::optional< std::string > override
Gets the negotiated ALPN protocol (interface version).
auto is_handshake_complete() const -> bool override
Checks if TLS handshake is complete (interface version).
auto invoke_stream_receive_callback(uint64_t stream_id, const std::vector< uint8_t > &data, bool fin) -> void
Invokes the stream receive callback.
auto set_early_data_accepted_callback(interfaces::i_quic_client::early_data_accepted_callback_t callback) -> void override
Sets the callback for early data acceptance notification (interface version).
auto start_client(std::string_view host, unsigned short port) -> VoidResult
Starts the client with default configuration.
auto close_stream(uint64_t stream_id) -> VoidResult override
Closes a stream (interface version).
std::function< void(uint64_t, const std::vector< uint8_t > &, bool)> stream_receive_callback_t
Callback type for stream data (stream_id, data, fin)
auto invoke_connected_callback() -> void
Invokes the connected callback.
auto send_on_stream(uint64_t stream_id, std::vector< uint8_t > &&data, bool fin=false) -> VoidResult override
Sends data on a specific stream (interface version).
auto do_start_impl(std::string_view host, unsigned short port) -> VoidResult
QUIC-specific implementation of client start.
auto invoke_receive_callback(const std::vector< uint8_t > &data) -> void
Invokes the receive callback.
auto set_session_ticket_callback(interfaces::i_quic_client::session_ticket_callback_t callback) -> void override
Sets the callback for session tickets (interface version).
auto set_receive_callback(interfaces::i_quic_client::receive_callback_t callback) -> void override
Sets the callback for received data on default stream (interface version).
auto set_stream_callback(interfaces::i_quic_client::stream_callback_t callback) -> void override
Sets the callback for stream data (interface version).
auto send_packet(std::vector< uint8_t > &&data) -> VoidResult
Sends data on the default stream (stream 0).
auto on_stream_data(uint64_t stream_id, std::span< const uint8_t > data, bool fin) -> void
Callback for receiving stream data.
auto stop_client() -> VoidResult
Stops the client and releases all resources.
auto invoke_disconnected_callback() -> void
Invokes the disconnected callback.
auto on_error(std::error_code ec) -> void
Callback for handling errors.
auto do_connect(std::string_view host, unsigned short port) -> void
Internal connection implementation.
auto create_stream() -> Result< uint64_t > override
Creates a new bidirectional stream (interface version).
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback.
auto is_connected() const -> bool override
Checks if the client is connected (interface version).
auto set_connected_callback(interfaces::i_quic_client::connected_callback_t callback) -> void override
Sets the callback for connection established (interface version).
auto start(std::string_view host, uint16_t port) -> VoidResult override
Starts the QUIC client connecting to the specified server.
auto set_stream_receive_callback(stream_receive_callback_t callback) -> void
Sets the callback for stream data reception (all streams, legacy version).
auto is_early_data_accepted() const -> bool override
Checks if early data was accepted (interface version).
auto stats() const -> quic_connection_stats
Gets connection statistics.
auto client_id() const -> const std::string &
Returns the client identifier.
auto get_socket() const -> std::shared_ptr< internal::quic_socket >
Gets the internal socket with mutex protection.
auto set_alpn_protocols(const std::vector< std::string > &protocols) -> void override
Sets the ALPN protocols for negotiation (interface version).
std::atomic< bool > early_data_accepted_
Early data acceptance status.
auto set_error_callback(interfaces::i_quic_client::error_callback_t callback) -> void override
Sets the callback for errors (interface version).
auto set_early_data_callback(interfaces::i_quic_client::early_data_callback_t callback) -> void override
Sets the callback for early data production (interface version).
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void(bool accepted)> early_data_accepted_callback_t
Callback type for early data acceptance notification.
std::function< void()> connected_callback_t
Callback type for connection established.
std::function< std::vector< uint8_t >()> early_data_callback_t
Callback type for early data production.
std::function< void()> disconnected_callback_t
Callback type for disconnection.
std::function< void(const std::vector< uint8_t > &)> receive_callback_t
Callback type for received data on default stream.
std::function< void( std::vector< uint8_t > ticket_data, uint32_t lifetime_hint, uint32_t max_early_data)> session_ticket_callback_t
Callback type for session ticket received (for 0-RTT resumption)
std::function< void(uint64_t, const std::vector< uint8_t > &, bool)> stream_callback_t
Callback type for stream data (stream_id, data, is_fin)
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
auto is_running() const -> bool
Checks if the component is currently running.
tracing_config config
Definition exporters.cpp:29
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
@ ok
Operation completed successfully.
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
RAII span implementation for distributed tracing.
Configuration options for QUIC client.
Definition quic_client.h:51
Statistics for a QUIC connection.
Definition quic_client.h:97
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.