5#define NETWORK_USE_EXPERIMENTAL
17 : client_id_(client_id)
41 lifecycle_.wait_for_stop();
61 span->set_attribute(
"net.peer.name", host)
62 .set_attribute(
"net.peer.port",
static_cast<int64_t
>(port))
63 .set_attribute(
"net.transport",
"quic")
64 .set_attribute(
"client.id", client_id_);
67 if (lifecycle_.is_running())
71 span->set_error(
"QUIC client is already running");
75 "QUIC client is already running",
76 "messaging_quic_client::start_client");
83 span->set_error(
"Host cannot be empty");
87 "Host cannot be empty",
88 "messaging_quic_client::start_client");
92 lifecycle_.set_running();
93 is_connected_.store(
false);
95 auto result = do_start_impl(host, port);
98 lifecycle_.mark_stopped();
101 span->set_error(result.error().message);
117 if (!lifecycle_.prepare_stop())
122 is_connected_.store(
false);
123 auto result = do_stop_impl();
124 lifecycle_.mark_stopped();
135 std::lock_guard<std::mutex> lock(socket_mutex_);
139 io_context_ = std::make_unique<asio::io_context>();
140 work_guard_ = std::make_unique<
141 asio::executor_work_guard<asio::io_context::executor_type>>(
142 asio::make_work_guard(*io_context_));
147 thread_pool_ = std::make_shared<integration::basic_thread_pool>(2);
150 io_context_future_ = thread_pool_->
submit([
this]() {
154 "[messaging_quic_client] Starting io_context on thread pool");
158 catch (
const std::exception& e)
161 std::string(e.what()));
165 do_connect(host, port);
168 " target=" + std::string(host) +
":" +
169 std::to_string(port));
173 catch (
const std::exception& e)
177 "Failed to start client: " + std::string(e.what()),
178 "messaging_quic_client::do_start_impl",
179 "Client ID: " + client_id_ +
", Host: " + std::string(host));
187 std::shared_ptr<internal::quic_socket> local_socket;
189 std::lock_guard<std::mutex> lock(socket_mutex_);
190 local_socket = std::move(socket_);
194 local_socket->stop_receive();
195 auto close_result = local_socket->close();
196 if (close_result.is_err())
199 close_result.error().message);
211 if (io_context_future_.valid())
213 io_context_future_.wait();
215 thread_pool_.reset();
218 handshake_complete_.store(
false);
223 catch (
const std::exception& e)
227 "Failed to stop client: " + std::string(e.what()),
228 "messaging_quic_client::do_stop_impl",
229 "Client ID: " + client_id_);
245 return local_socket->is_handshake_complete();
256 span->set_attribute(
"net.transport",
"quic")
257 .set_attribute(
"message.size",
static_cast<int64_t
>(data.size()))
258 .set_attribute(
"quic.stream_id",
static_cast<int64_t
>(default_stream_id_))
259 .set_attribute(
"client.id", client_id_);
266 span->set_error(
"Client is not running");
270 "Client is not running",
271 "messaging_quic_client::send_packet",
272 "Client ID: " + client_id_);
279 span->set_error(
"Data cannot be empty");
283 "Data cannot be empty",
284 "messaging_quic_client::send_packet",
285 "Client ID: " + client_id_);
288 auto local_socket = get_socket();
289 if (!is_connected() || !local_socket)
293 span->set_error(
"Client is not connected");
297 "Client is not connected",
298 "messaging_quic_client::send_packet",
299 "Client ID: " + client_id_);
302 auto result = local_socket->send_stream_data(default_stream_id_, std::move(data));
307 span->set_error(result.error().message);
323 "Data cannot be empty",
324 "messaging_quic_client::send_packet",
325 "Client ID: " + client_id_);
328 std::vector<uint8_t> byte_data(data.begin(), data.end());
329 return send_packet(std::move(byte_data));
334 return start_client(host, port);
339 return stop_client();
344 return send_packet(std::move(data));
349 auto local_socket = get_socket();
354 "Client is not connected",
355 "messaging_quic_client::create_stream",
356 "Client ID: " + client_id_);
359 return local_socket->create_stream(
false);
364 auto local_socket = get_socket();
369 "Client is not connected",
370 "messaging_quic_client::create_unidirectional_stream",
371 "Client ID: " + client_id_);
374 return local_socket->create_stream(
true);
378 std::vector<uint8_t>&& data,
387 span->set_attribute(
"net.transport",
"quic")
388 .set_attribute(
"message.size",
static_cast<int64_t
>(data.size()))
389 .set_attribute(
"quic.stream_id",
static_cast<int64_t
>(stream_id))
390 .set_attribute(
"quic.fin", fin)
391 .set_attribute(
"client.id", client_id_);
398 span->set_error(
"Data cannot be empty");
402 "Data cannot be empty",
403 "messaging_quic_client::send_on_stream",
404 "Client ID: " + client_id_);
407 auto local_socket = get_socket();
412 span->set_error(
"Client is not connected");
416 "Client is not connected",
417 "messaging_quic_client::send_on_stream",
418 "Client ID: " + client_id_);
421 auto result = local_socket->send_stream_data(stream_id, std::move(data), fin);
426 span->set_error(result.error().message);
438 auto local_socket = get_socket();
443 "Client is not connected",
444 "messaging_quic_client::close_stream",
445 "Client ID: " + client_id_);
448 return local_socket->close_stream(stream_id);
452 const std::vector<std::string>& protocols) ->
void
454 config_.alpn_protocols = protocols;
475 unsigned short port) ->
void
477 auto resolver = std::make_shared<asio::ip::udp::resolver>(*io_context_);
478 auto self = shared_from_this();
481 std::string(host) +
":" + std::to_string(port));
483 resolver->async_resolve(
485 std::to_string(port),
486 [
this, self, resolver, host_str = std::string(host)](
488 asio::ip::udp::resolver::results_type results) {
501 on_error(asio::error::host_not_found);
505 NETWORK_LOG_INFO(
"[messaging_quic_client] Resolve successful, creating socket");
509 auto endpoint = *results.begin();
512 asio::ip::udp::socket udp_socket(*io_context_, asio::ip::udp::v4());
515 auto quic_sock = std::make_shared<internal::quic_socket>(
516 std::move(udp_socket),
520 quic_sock->set_stream_data_callback(
521 [
this, self](uint64_t stream_id,
522 std::span<const uint8_t> data,
524 on_stream_data(stream_id, data, fin);
527 quic_sock->set_connected_callback([
this, self]() {
531 quic_sock->set_error_callback([
this, self](std::error_code err) {
535 quic_sock->set_close_callback(
536 [
this, self](uint64_t error_code,
const std::string& reason) {
537 on_close(error_code, reason);
542 std::lock_guard<std::mutex> lock(socket_mutex_);
547 auto connect_result = quic_sock->connect(endpoint, host_str);
548 if (connect_result.is_err())
551 connect_result.error().message);
552 on_error(std::make_error_code(std::errc::connection_refused));
557 quic_sock->start_receive();
561 catch (
const std::exception& e)
564 std::string(e.what()));
565 on_error(std::make_error_code(std::errc::connection_refused));
573 is_connected_.store(
true, std::memory_order_release);
574 handshake_complete_.store(
true);
577 auto local_socket = get_socket();
580 auto stream_result = local_socket->create_stream(
false);
581 if (stream_result.is_ok())
583 default_stream_id_ = stream_result.value();
585 std::to_string(default_stream_id_));
590 invoke_connected_callback();
594 std::span<const uint8_t> data,
603 std::to_string(data.size()) +
604 " bytes on stream " + std::to_string(stream_id));
606 std::vector<uint8_t> data_copy(data.begin(), data.end());
609 invoke_stream_receive_callback(stream_id, data_copy, fin);
612 if (stream_id == default_stream_id_)
614 invoke_receive_callback(data_copy);
623 invoke_error_callback(ec);
628 invoke_disconnected_callback();
631 is_connected_.store(
false, std::memory_order_release);
635 const std::string& reason) ->
void
637 NETWORK_LOG_INFO(
"[messaging_quic_client] Connection closed. Error code: " +
638 std::to_string(error_code) +
", reason: " + reason);
643 invoke_disconnected_callback();
646 is_connected_.store(
false, std::memory_order_release);
650 -> std::shared_ptr<internal::quic_socket>
662 callbacks_.invoke<
to_index(callback_index::receive)>(data);
666 const std::vector<uint8_t>& data,
669 callbacks_.invoke<
to_index(callback_index::stream_receive)>(stream_id, data, fin);
674 callbacks_.invoke<
to_index(callback_index::connected)>();
679 callbacks_.invoke<
to_index(callback_index::disconnected)>();
684 callbacks_.invoke<
to_index(callback_index::error)>(ec);
693 callbacks_.set<
to_index(callback_index::stream_receive)>(std::move(callback));
703 callbacks_.set<
to_index(callback_index::receive)>(std::move(callback));
709 callbacks_.set<
to_index(callback_index::stream_receive)>(std::move(callback));
715 callbacks_.set<
to_index(callback_index::connected)>(std::move(callback));
721 callbacks_.set<
to_index(callback_index::disconnected)>(std::move(callback));
727 callbacks_.set<
to_index(callback_index::error)>(std::move(callback));
733 session_ticket_cb_ = std::move(callback);
739 early_data_cb_ = std::move(callback);
745 early_data_accepted_cb_ = std::move(callback);
std::shared_ptr< internal::quic_socket > socket_
The QUIC socket.
auto set_disconnected_callback(interfaces::i_quic_client::disconnected_callback_t callback) -> void override
Sets the callback for disconnection (interface version).
auto stop() -> VoidResult override
Stops the QUIC client.
auto wait_for_stop() -> void override
Blocks until stop() is called.
auto send(std::vector< uint8_t > &&data) -> VoidResult override
Sends data on the default stream (interface version).
std::mutex socket_mutex_
Protects socket_ from data races.
auto do_stop_impl() -> VoidResult
QUIC-specific implementation of client stop.
auto on_connect() -> void
Callback invoked when connection is established.
messaging_quic_client(std::string_view client_id)
Constructs a QUIC client with a given identifier.
auto is_running() const -> bool override
Checks if the client is currently running.
auto create_unidirectional_stream() -> Result< uint64_t > override
Creates a new unidirectional stream (interface version).
auto on_close(uint64_t error_code, const std::string &reason) -> void
Callback for connection close.
~messaging_quic_client() noexcept override
Destructor; automatically calls stop_client() if running.
auto alpn_protocol() const -> std::optional< std::string > override
Gets the negotiated ALPN protocol (interface version).
auto is_handshake_complete() const -> bool override
Checks if TLS handshake is complete (interface version).
auto invoke_stream_receive_callback(uint64_t stream_id, const std::vector< uint8_t > &data, bool fin) -> void
Invokes the stream receive callback.
auto set_early_data_accepted_callback(interfaces::i_quic_client::early_data_accepted_callback_t callback) -> void override
Sets the callback for early data acceptance notification (interface version).
auto start_client(std::string_view host, unsigned short port) -> VoidResult
Starts the client with default configuration.
auto close_stream(uint64_t stream_id) -> VoidResult override
Closes a stream (interface version).
std::function< void(uint64_t, const std::vector< uint8_t > &, bool)> stream_receive_callback_t
Callback type for stream data (stream_id, data, fin)
auto invoke_connected_callback() -> void
Invokes the connected callback.
auto send_on_stream(uint64_t stream_id, std::vector< uint8_t > &&data, bool fin=false) -> VoidResult override
Sends data on a specific stream (interface version).
auto do_start_impl(std::string_view host, unsigned short port) -> VoidResult
QUIC-specific implementation of client start.
auto invoke_receive_callback(const std::vector< uint8_t > &data) -> void
Invokes the receive callback.
auto set_session_ticket_callback(interfaces::i_quic_client::session_ticket_callback_t callback) -> void override
Sets the callback for session tickets (interface version).
auto set_receive_callback(interfaces::i_quic_client::receive_callback_t callback) -> void override
Sets the callback for received data on default stream (interface version).
auto set_stream_callback(interfaces::i_quic_client::stream_callback_t callback) -> void override
Sets the callback for stream data (interface version).
auto send_packet(std::vector< uint8_t > &&data) -> VoidResult
Send data on the default stream (stream 0).
auto on_stream_data(uint64_t stream_id, std::span< const uint8_t > data, bool fin) -> void
Callback for receiving stream data.
auto stop_client() -> VoidResult
Stops the client and releases all resources.
auto invoke_disconnected_callback() -> void
Invokes the disconnected callback.
auto on_error(std::error_code ec) -> void
Callback for handling errors.
auto do_connect(std::string_view host, unsigned short port) -> void
Internal connection implementation.
auto create_stream() -> Result< uint64_t > override
Creates a new bidirectional stream (interface version).
std::atomic< bool > is_connected_
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback.
auto is_connected() const -> bool override
Checks if the client is connected (interface version).
auto set_connected_callback(interfaces::i_quic_client::connected_callback_t callback) -> void override
Sets the callback for connection established (interface version).
auto start(std::string_view host, uint16_t port) -> VoidResult override
Starts the QUIC client connecting to the specified server.
auto set_stream_receive_callback(stream_receive_callback_t callback) -> void
Sets the callback for stream data reception (all streams, legacy version).
auto is_early_data_accepted() const -> bool override
Checks if early data was accepted (interface version).
auto stats() const -> quic_connection_stats
Get connection statistics.
auto client_id() const -> const std::string &
Returns the client identifier.
auto get_socket() const -> std::shared_ptr< internal::quic_socket >
Get the internal socket with mutex protection.
auto set_alpn_protocols(const std::vector< std::string > &protocols) -> void override
Sets the ALPN protocols for negotiation (interface version).
std::atomic< bool > early_data_accepted_
Early data acceptance status.
auto set_error_callback(interfaces::i_quic_client::error_callback_t callback) -> void override
Sets the callback for errors (interface version).
utils::lifecycle_manager lifecycle_
auto set_early_data_callback(interfaces::i_quic_client::early_data_callback_t callback) -> void override
Sets the callback for early data production (interface version).
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void(bool accepted)> early_data_accepted_callback_t
Callback type for early data acceptance notification.
std::function< void()> connected_callback_t
Callback type for connection established.
std::function< std::vector< uint8_t >()> early_data_callback_t
Callback type for early data production.
std::function< void()> disconnected_callback_t
Callback type for disconnection.
std::function< void(const std::vector< uint8_t > &)> receive_callback_t
Callback type for received data on default stream.
std::function< void( std::vector< uint8_t > ticket_data, uint32_t lifetime_hint, uint32_t max_early_data)> session_ticket_callback_t
Callback type for session ticket received (for 0-RTT resumption)
std::function< void(uint64_t, const std::vector< uint8_t > &, bool)> stream_callback_t
Callback type for stream data (stream_id, data, is_fin)
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
auto is_running() const -> bool
Checks if the component is currently running.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
constexpr int invalid_argument
constexpr int internal_error
constexpr int already_exists
constexpr int connection_closed
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
@ ok
Operation completed successfully.
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
RAII span implementation for distributed tracing.
Configuration options for QUIC client.
Statistics for a QUIC connection.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.