7#define NETWORK_USE_EXPERIMENTAL
23 : server_id_(server_id)
47 lifecycle_.wait_for_stop();
60 if (lifecycle_.is_running())
64 "QUIC server is already running",
65 "messaging_quic_server::start_server");
69 lifecycle_.set_running();
71 auto result = do_start_impl(port);
74 lifecycle_.mark_stopped();
82 if (!lifecycle_.is_running())
86 "QUIC server is not running",
87 "messaging_quic_server::stop_server");
90 if (!lifecycle_.prepare_stop())
94 "QUIC server is already stopping",
95 "messaging_quic_server::stop_server");
98 auto result = do_stop_impl();
99 lifecycle_.mark_stopped();
106 return start_server(port);
111 return stop_server();
124 io_context_ = std::make_unique<asio::io_context>();
125 work_guard_ = std::make_unique<
126 asio::executor_work_guard<asio::io_context::executor_type>>(
127 asio::make_work_guard(*io_context_));
130 udp_socket_ = std::make_unique<asio::ip::udp::socket>(
132 asio::ip::udp::endpoint(asio::ip::udp::v4(), port));
135 cleanup_timer_ = std::make_unique<asio::steady_timer>(*io_context_);
141 start_cleanup_timer();
147 thread_pool_ = std::make_shared<integration::basic_thread_pool>(2);
151 io_context_future_ = thread_pool_->
submit(
157 "[messaging_quic_server] Starting io_context on thread pool");
160 "[messaging_quic_server] io_context stopped");
162 catch (
const std::exception& e)
165 "[messaging_quic_server] Exception in io_context: "
166 + std::string(e.what()));
171 + std::to_string(port));
174 catch (
const std::system_error& e)
176 if (e.code() == asio::error::address_in_use ||
177 e.code() == std::errc::address_in_use)
180 "Failed to bind to port: address already in use",
181 "messaging_quic_server::do_start_impl",
182 "Port: " + std::to_string(port));
184 else if (e.code() == asio::error::access_denied ||
185 e.code() == std::errc::permission_denied)
188 "Failed to bind to port: permission denied",
189 "messaging_quic_server::do_start_impl",
190 "Port: " + std::to_string(port));
194 "Failed to start server: " + std::string(e.what()),
195 "messaging_quic_server::do_start_impl",
196 "Port: " + std::to_string(port));
198 catch (
const std::exception& e)
201 "Failed to start server: " + std::string(e.what()),
202 "messaging_quic_server::do_start_impl",
203 "Port: " + std::to_string(port));
214 cleanup_timer_->cancel();
221 udp_socket_->cancel(ec);
222 if (udp_socket_->is_open())
224 udp_socket_->close(ec);
244 if (io_context_future_.valid())
246 io_context_future_.wait();
251 cleanup_timer_.reset();
252 thread_pool_.reset();
258 catch (
const std::exception& e)
261 "Failed to stop server: " + std::string(e.what()),
262 "messaging_quic_server::do_stop_impl",
263 "Server ID: " + server_id_);
268 -> std::vector<std::shared_ptr<session::quic_session>>
271 std::vector<std::shared_ptr<session::quic_session>> result;
273 for (
const auto& [
id, session] :
sessions_)
275 result.push_back(session);
281 -> std::shared_ptr<session::quic_session>
283 std::shared_lock<std::shared_mutex> lock(sessions_mutex_);
284 auto it = sessions_.find(session_id);
285 if (it != sessions_.end())
302 std::shared_ptr<session::quic_session> session;
304 std::unique_lock<std::shared_mutex> lock(sessions_mutex_);
305 auto it = sessions_.find(session_id);
306 if (it == sessions_.end())
310 "messaging_quic_server::disconnect_session",
311 "Session ID: " + session_id);
313 session = it->second;
319 return session->close(error_code);
326 std::vector<std::shared_ptr<session::quic_session>> sessions_to_close;
328 std::unique_lock<std::shared_mutex> lock(sessions_mutex_);
329 sessions_to_close.reserve(sessions_.size());
330 for (
auto& [
id, session] : sessions_)
332 sessions_to_close.push_back(session);
337 for (
auto& session : sessions_to_close)
341 auto result = session->close(error_code);
350 auto sessions_list = sessions();
351 for (
auto& session : sessions_list)
353 if (session && session->is_active())
355 std::vector<uint8_t> data_copy(data);
356 auto result = session->send(std::move(data_copy));
360 + session->session_id() +
": "
361 + result.error().message);
369 const std::vector<std::string>& session_ids,
372 for (
const auto& session_id : session_ids)
374 auto session = get_session(session_id);
375 if (session && session->is_active())
377 std::vector<uint8_t> data_copy(data);
378 auto result = session->send(std::move(data_copy));
382 + session_id +
": " + result.error().message);
389#if KCENON_WITH_COMMON_SYSTEM
390auto messaging_quic_server::set_monitor(
391 kcenon::common::interfaces::IMonitor* monitor) ->
void
396auto messaging_quic_server::get_monitor() const
397 ->
kcenon::common::interfaces::IMonitor*
405 if (!is_running() || !udp_socket_)
410 auto self = shared_from_this();
411 udp_socket_->async_receive_from(
412 asio::buffer(recv_buffer_),
414 [
this, self](std::error_code ec, std::size_t bytes_received)
423 if (ec != asio::error::operation_aborted)
426 "[messaging_quic_server] Receive error: " + ec.message());
428 invoke_error_callback(ec);
434 std::span<const uint8_t> packet_data(recv_buffer_.data(),
436 handle_packet(packet_data, recv_endpoint_);
444 const asio::ip::udp::endpoint& from)
454 if (header_result.is_err())
457 + from.address().to_string());
466 dcid = hdr.dest_conn_id;
468 header_result.value().first);
471 auto session = find_or_create_session(dcid, from);
475 "[messaging_quic_server] Could not find or create session for packet");
480 session->handle_packet(data);
485 const asio::ip::udp::endpoint& endpoint)
486 -> std::shared_ptr<session::quic_session>
490 std::shared_lock<std::shared_mutex> lock(sessions_mutex_);
491 for (
const auto& [
id, session] : sessions_)
493 if (session->matches_connection_id(dcid))
501 if (session_count() >= config_.max_connections)
504 "[messaging_quic_server] Connection limit reached, rejecting new connection");
509 auto session_id = generate_session_id();
512 asio::ip::udp::socket session_socket(
513 *io_context_, asio::ip::udp::endpoint(asio::ip::udp::v4(), 0));
514 session_socket.connect(endpoint);
516 auto quic_socket = std::make_shared<internal::quic_socket>(
520 if (!config_.cert_file.empty() && !config_.key_file.empty())
523 quic_socket->accept(config_.cert_file, config_.key_file);
524 if (accept_result.is_err())
527 "[messaging_quic_server] Failed to accept connection: "
528 + accept_result.error().message);
534 std::make_shared<session::quic_session>(quic_socket, session_id);
537 auto self = weak_from_this();
539 session->set_receive_callback(
540 [self, session_id](
const std::vector<uint8_t>& data)
542 if (
auto server = self.lock())
544 auto sess = server->get_session(session_id);
547 server->invoke_receive_callback(sess, data);
552 session->set_stream_receive_callback(
553 [self, session_id](uint64_t stream_id,
554 const std::vector<uint8_t>& data,
557 if (
auto server = self.lock())
559 auto sess = server->get_session(session_id);
562 server->invoke_stream_receive_callback(sess, stream_id, data, fin);
567 session->set_close_callback(
570 if (
auto server = self.lock())
572 server->on_session_close(session_id);
578 std::unique_lock<std::shared_mutex> lock(sessions_mutex_);
579 sessions_[session_id] = session;
583 session->start_session();
590 invoke_connection_callback(session);
592#if KCENON_WITH_COMMON_SYSTEM
595 monitor_->record_metric(
"active_connections",
596 static_cast<double>(session_count()));
601 + session_id +
" from " + endpoint.address().to_string());
608 auto counter = session_counter_.fetch_add(1);
609 std::ostringstream oss;
610 oss << server_id_ <<
"-" << counter;
617 std::shared_ptr<session::quic_session> session;
619 std::unique_lock<std::shared_mutex> lock(sessions_mutex_);
620 auto it = sessions_.find(session_id);
621 if (it != sessions_.end())
623 session = it->second;
634 invoke_disconnection_callback(session);
636#if KCENON_WITH_COMMON_SYSTEM
639 monitor_->record_metric(
"active_connections",
640 static_cast<double>(session_count()));
651 if (!cleanup_timer_ || !is_running())
657 cleanup_timer_->expires_after(std::chrono::seconds(30));
659 auto self = shared_from_this();
660 cleanup_timer_->async_wait(
661 [
this, self](
const std::error_code& ec)
663 if (!ec && is_running())
665 cleanup_dead_sessions();
666 start_cleanup_timer();
673 std::vector<std::string> dead_session_ids;
676 std::shared_lock<std::shared_mutex> lock(sessions_mutex_);
677 for (
const auto& [
id, session] : sessions_)
679 if (!session || !session->is_active())
681 dead_session_ids.push_back(
id);
686 for (
const auto&
id : dead_session_ids)
688 on_session_close(
id);
691 if (!dead_session_ids.empty())
694 "[messaging_quic_server] Cleaned up "
695 + std::to_string(dead_session_ids.size())
696 +
" dead sessions. Active: " + std::to_string(session_count()));
705 std::shared_ptr<session::quic_session> session) ->
void
707 callbacks_.invoke<
to_index(callback_index::connection)>(session);
711 std::shared_ptr<session::quic_session> session) ->
void
713 callbacks_.invoke<
to_index(callback_index::disconnection)>(session);
717 std::shared_ptr<session::quic_session> session,
718 const std::vector<uint8_t>& data) ->
void
720 callbacks_.invoke<
to_index(callback_index::receive)>(session, data);
724 std::shared_ptr<session::quic_session> session,
726 const std::vector<uint8_t>& data,
729 callbacks_.invoke<
to_index(callback_index::stream_receive)>(session, stream_id, data, fin);
734 callbacks_.invoke<
to_index(callback_index::error)>(ec);
743 callbacks_.set<
to_index(callback_index::connection)>(std::move(callback));
748 callbacks_.set<
to_index(callback_index::disconnection)>(std::move(callback));
753 callbacks_.set<
to_index(callback_index::receive)>(std::move(callback));
758 callbacks_.set<
to_index(callback_index::stream_receive)>(std::move(callback));
763 callbacks_.set<
to_index(callback_index::error)>(std::move(callback));
774 set_connection_callback(
775 [cb = std::move(callback)](std::shared_ptr<session::quic_session> session) {
787 set_disconnection_callback(
788 [cb = std::move(callback)](std::shared_ptr<session::quic_session> session) {
791 cb(session->session_id());
799 set_receive_callback(
800 [cb = std::move(callback)](std::shared_ptr<session::quic_session> session,
801 const std::vector<uint8_t>& data) {
804 cb(session->session_id(), data);
812 set_stream_receive_callback(
813 [cb = std::move(callback)](std::shared_ptr<session::quic_session> session,
815 const std::vector<uint8_t>& data,
819 cb(session->session_id(), stream_id, data, fin);
831 interface_error_cb_ = std::move(callback);
auto cleanup_dead_sessions() -> void
auto invoke_error_callback(std::error_code ec) -> void
Invokes the error callback.
auto is_running() const -> bool override
Checks if the server is currently running.
std::function< void(std::error_code)> error_callback_t
Callback type for errors.
auto on_session_close(const std::string &session_id) -> void
utils::lifecycle_manager lifecycle_
auto stop_server() -> VoidResult
Stops the server and releases all resources.
auto disconnect_all(uint64_t error_code=0) -> void
Disconnect all active sessions.
messaging_quic_server(std::string_view server_id)
Constructs a QUIC server with a given identifier.
auto sessions() const -> std::vector< std::shared_ptr< session::quic_session > >
Get all active sessions.
auto stop() -> VoidResult override
Stops the QUIC server.
auto disconnect_session(const std::string &session_id, uint64_t error_code=0) -> VoidResult
Disconnect a specific session.
auto broadcast(std::vector< uint8_t > &&data) -> VoidResult
Send data to all connected clients.
auto handle_packet(std::span< const uint8_t > data, const asio::ip::udp::endpoint &from) -> void
auto invoke_connection_callback(std::shared_ptr< session::quic_session > session) -> void
Invokes the connection callback.
auto start(uint16_t port) -> VoidResult override
Starts the QUIC server on the specified port.
auto set_stream_receive_callback(stream_receive_callback_t callback) -> void
Sets the callback for stream data reception (legacy version).
auto start_receive() -> void
std::shared_mutex sessions_mutex_
auto set_receive_callback(interfaces::i_quic_server::receive_callback_t callback) -> void override
Sets the callback for received data on default stream (interface version).
auto set_connection_callback(interfaces::i_quic_server::connection_callback_t callback) -> void override
Sets the callback for new connections (interface version).
auto session_count() const -> size_t
Get the number of active sessions.
auto start_server(unsigned short port) -> VoidResult
Start the server with default configuration.
std::function< void(std::shared_ptr< session::quic_session >, uint64_t, const std::vector< uint8_t > &, bool)> stream_receive_callback_t
Callback type for stream data (session, stream_id, data, fin)
auto do_start_impl(unsigned short port) -> VoidResult
QUIC-specific implementation of server start.
auto invoke_receive_callback(std::shared_ptr< session::quic_session > session, const std::vector< uint8_t > &data) -> void
Invokes the receive callback.
std::function< void(std::shared_ptr< session::quic_session >, const std::vector< uint8_t > &)> receive_callback_t
Callback type for received data (session, data)
auto generate_session_id() -> std::string
auto invoke_disconnection_callback(std::shared_ptr< session::quic_session > session) -> void
Invokes the disconnection callback.
std::function< void(std::shared_ptr< session::quic_session >)> disconnection_callback_t
Callback type for disconnections.
auto wait_for_stop() -> void override
Blocks until stop() is called.
std::map< std::string, std::shared_ptr< session::quic_session > > sessions_
auto get_session(const std::string &session_id) -> std::shared_ptr< session::quic_session >
Get a session by its ID.
~messaging_quic_server() noexcept override
Destructor; automatically calls stop_server() if running.
auto start_cleanup_timer() -> void
auto set_disconnection_callback(interfaces::i_quic_server::disconnection_callback_t callback) -> void override
Sets the callback for disconnections (interface version).
auto connection_count() const -> size_t override
Gets the number of active QUIC connections (interface version).
auto server_id() const -> const std::string &
Returns the server identifier.
auto do_stop_impl() -> VoidResult
QUIC-specific implementation of server stop.
auto find_or_create_session(const protocols::quic::connection_id &dcid, const asio::ip::udp::endpoint &endpoint) -> std::shared_ptr< session::quic_session >
auto invoke_stream_receive_callback(std::shared_ptr< session::quic_session > session, uint64_t stream_id, const std::vector< uint8_t > &data, bool fin) -> void
Invokes the stream receive callback.
auto set_stream_callback(interfaces::i_quic_server::stream_callback_t callback) -> void override
Sets the callback for stream data (interface version).
auto set_error_callback(interfaces::i_quic_server::error_callback_t callback) -> void override
Sets the callback for errors (interface version).
auto multicast(const std::vector< std::string > &session_ids, std::vector< uint8_t > &&data) -> VoidResult
Send data to specific sessions.
std::function< void(std::shared_ptr< session::quic_session >)> connection_callback_t
Callback type for new connections.
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
virtual std::future< void > submit(std::function< void()> task)=0
Submit a task to the thread pool.
std::function< void( std::string_view, uint64_t, const std::vector< uint8_t > &, bool)> stream_callback_t
Callback type for stream data (session_id, stream_id, data, is_fin)
std::function< void(std::string_view, const std::vector< uint8_t > &)> receive_callback_t
Callback type for default stream data (session_id, data)
std::function< void(std::string_view, std::error_code)> error_callback_t
Callback type for errors (session_id, error)
std::function< void(std::shared_ptr< i_quic_session >)> connection_callback_t
Callback type for new connections.
std::function< void(std::string_view)> disconnection_callback_t
Callback type for disconnections (session_id)
static void report_connection_accepted()
Report a new connection accepted.
static void report_active_connections(size_t count)
Report active connections count.
QUIC Connection ID (RFC 9000 Section 5.1)
static auto parse_header(std::span< const uint8_t > data) -> Result< std::pair< packet_header, size_t > >
Parse a packet header (without header protection removal)
auto is_running() const -> bool
Checks if the component is currently running.
Feature flags for network_system.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
constexpr int internal_error
constexpr int bind_failed
constexpr int server_already_running
constexpr int server_not_started
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
Network system metrics definitions and reporting utilities.
QUIC-specific session with stream multiplexing.
Configuration options for QUIC server.