19 using tcp = asio::ip::tcp;
22 : server_id_(server_id)
88 span->set_attribute(
"net.host.port",
static_cast<int64_t
>(port))
89 .set_attribute(
"net.transport",
"tcp")
90 .set_attribute(
"server.id", server_id_);
94 auto result = do_start(port);
99 span->set_error(result.error().message);
137 callbacks_.set<
to_index(callback_index::connection)>(std::move(callback));
142 callbacks_.set<
to_index(callback_index::disconnection)>(std::move(callback));
147 callbacks_.set<
to_index(callback_index::receive)>(std::move(callback));
152 callbacks_.set<
to_index(callback_index::error)>(std::move(callback));
180 std::shared_ptr<session::messaging_session> session) ->
void
182 callbacks_.invoke<
to_index(callback_index::connection)>(std::move(session));
195 io_context_ = std::make_shared<asio::io_context>();
197 work_guard_ = std::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(
198 asio::make_work_guard(*io_context_)
200 acceptor_ = std::make_unique<tcp::acceptor>(
201 *io_context_, tcp::endpoint(tcp::v4(), port));
204 cleanup_timer_ = std::make_unique<asio::steady_timer>(*io_context_);
210 start_cleanup_timer();
216 NETWORK_LOG_INFO(
"[messaging_server] Started listening on port " + std::to_string(port));
219 catch (
const std::system_error& e)
224 cleanup_timer_.reset();
229 if (e.code() == asio::error::address_in_use ||
230 e.code() == std::errc::address_in_use)
234 "Failed to bind to port: address already in use",
235 "messaging_server::do_start",
236 "Port: " + std::to_string(port)
239 else if (e.code() == asio::error::access_denied ||
240 e.code() == std::errc::permission_denied)
244 "Failed to bind to port: permission denied",
245 "messaging_server::do_start",
246 "Port: " + std::to_string(port)
252 "Failed to start server: " + std::string(e.what()),
253 "messaging_server::do_start",
254 "Port: " + std::to_string(port)
257 catch (
const std::exception& e)
261 cleanup_timer_.reset();
267 "Failed to start server: " + std::string(e.what()),
268 "messaging_server::do_start",
269 "Port: " + std::to_string(port)
282 std::lock_guard<std::mutex> lock(acceptor_mutex_);
287 acceptor_->cancel(ec);
288 if (acceptor_->is_open())
290 acceptor_->close(ec);
298 cleanup_timer_->cancel();
305 std::lock_guard<std::mutex> lock(sessions_mutex_);
306 for (
auto& sess : sessions_)
310 sess->stop_session();
329 if (io_context_future_.valid())
331 io_context_future_.wait();
343 io_context_->restart();
354 std::lock_guard<std::mutex> lock(sessions_mutex_);
360 cleanup_timer_.reset();
366 catch (
const std::exception& e)
370 "Failed to stop server: " + std::string(e.what()),
371 "messaging_server::do_stop",
372 "Server ID: " + server_id()
380 std::lock_guard<std::mutex> lock(acceptor_mutex_);
383 if (!is_running() || !acceptor_ || !acceptor_->is_open())
388 auto self = shared_from_this();
389 acceptor_->async_accept(
390 [
this, self](std::error_code ec, tcp::socket sock)
391 { on_accept(ec, std::move(sock)); });
408 span->set_attribute(
"net.transport",
"tcp")
409 .set_attribute(
"server.id", server_id_);
416 span->set_error(ec.message());
419#if KCENON_WITH_COMMON_SYSTEM
421 connection_errors_.fetch_add(1, std::memory_order_relaxed);
423 monitor_->record_metric(
"connection_errors",
static_cast<double>(connection_errors_.load()));
436 auto endpoint = socket.remote_endpoint();
437 span->set_attribute(
"net.peer.ip", endpoint.address().to_string())
438 .set_attribute(
"net.peer.port",
static_cast<int64_t
>(endpoint.port()));
450 cleanup_dead_sessions();
453 auto new_session = std::make_shared<kcenon::network::session::messaging_session>(
454 std::move(socket), server_id());
458 auto self = shared_from_this();
461 auto local_receive_callback = get_receive_callback();
462 auto local_disconnection_callback = get_disconnection_callback();
463 auto local_error_callback = get_error_callback();
467 if (local_receive_callback)
469 new_session->set_receive_callback(
470 [self, new_session, local_receive_callback](
const std::vector<uint8_t>& data)
473 local_receive_callback(new_session, data);
477 if (local_disconnection_callback)
479 new_session->set_disconnection_callback(
480 [self, local_disconnection_callback](
const std::string& session_id)
483 local_disconnection_callback(session_id);
487 if (local_error_callback)
489 new_session->set_error_callback(
490 [self, new_session, local_error_callback](std::error_code ec)
493 local_error_callback(new_session, ec);
499 std::lock_guard<std::mutex> lock(sessions_mutex_);
500 sessions_.push_back(new_session);
504 new_session->start_session();
507 invoke_connection_callback(new_session);
509#if KCENON_WITH_COMMON_SYSTEM
512 std::lock_guard<std::mutex> lock(sessions_mutex_);
514 monitor_->record_metric(
"active_connections",
static_cast<double>(sessions_.size()));
521 std::lock_guard<std::mutex> lock(sessions_mutex_);
537 std::lock_guard<std::mutex> lock(sessions_mutex_);
541 std::remove_if(sessions_.begin(), sessions_.end(),
542 [](
const auto& session) {
543 return session && session->is_stopped();
549 std::to_string(sessions_.size()));
551#if KCENON_WITH_COMMON_SYSTEM
554 monitor_->record_metric(
"active_connections",
static_cast<double>(sessions_.size()));
561 if (!cleanup_timer_ || !is_running())
567 cleanup_timer_->expires_after(std::chrono::seconds(30));
569 auto self = shared_from_this();
570 cleanup_timer_->async_wait(
571 [
this, self](
const std::error_code& ec)
573 if (!ec && is_running())
575 cleanup_dead_sessions();
576 start_cleanup_timer();
582#if KCENON_WITH_COMMON_SYSTEM
583 auto messaging_server::set_monitor(kcenon::common::interfaces::IMonitor* monitor) ->
void
588 auto messaging_server::get_monitor() const ->
kcenon::common::interfaces::IMonitor*
auto on_stopped() -> void
Called after stop operation completes. No-op for server (no disconnection callback at server level).
auto invoke_connection_callback(std::shared_ptr< session::messaging_session > session) -> void
Invokes the connection callback with the given session.
auto set_disconnection_callback(disconnection_callback_t callback) -> void
Sets the callback for client disconnections.
~messaging_server() noexcept
Destructor. If the server is still running, stop_server() is invoked.
std::unique_ptr< asio::ip::tcp::acceptor > acceptor_
auto get_error_callback() const -> error_callback_t
Gets a copy of the error callback.
std::unique_ptr< asio::steady_timer > cleanup_timer_
Timer for periodic cleanup of stopped sessions.
std::function< void(std::shared_ptr< session::messaging_session >, std::error_code)> error_callback_t
Callback type for errors.
auto cleanup_dead_sessions() -> void
Removes stopped sessions from the sessions vector.
auto do_accept() -> void
Initiates an asynchronous accept operation (async_accept).
std::function< void(std::shared_ptr< session::messaging_session >)> connection_callback_t
Callback type for new client connections.
std::shared_ptr< asio::io_context > io_context_
auto get_disconnection_callback() const -> disconnection_callback_t
Gets a copy of the disconnection callback.
std::function< void(std::shared_ptr< session::messaging_session >, const std::vector< uint8_t > &)> receive_callback_t
Callback type for received data.
auto start_cleanup_timer() -> void
Starts a periodic timer that triggers session cleanup.
auto get_receive_callback() const -> receive_callback_t
Gets a copy of the receive callback.
std::unique_ptr< asio::executor_work_guard< asio::io_context::executor_type > > work_guard_
auto do_stop_impl() -> VoidResult
TCP-specific implementation of server stop.
bool is_running() const noexcept
Check if the server is running.
auto start_server(unsigned short port) -> VoidResult
Starts the server on the specified port.
auto set_error_callback(error_callback_t callback) -> void
Sets the callback for session errors.
auto on_accept(std::error_code ec, asio::ip::tcp::socket socket) -> void
Handler called when an asynchronous accept finishes.
auto stop_server() -> VoidResult
Stops the server and closes all connections.
messaging_server(const std::string &server_id)
Constructs a messaging_server identified by the given server_id string.
auto set_receive_callback(receive_callback_t callback) -> void
Sets the callback for received messages.
std::function< void(const std::string &)> disconnection_callback_t
Callback type for disconnection.
std::mutex acceptor_mutex_
Mutex protecting access to acceptor_ for thread-safe operations.
auto set_connection_callback(connection_callback_t callback) -> void
Sets the callback for new client connections.
auto do_start_impl(unsigned short port) -> VoidResult
TCP-specific implementation of server start.
auto server_id() const -> const std::string &
Returns the server identifier.
auto get_connection_callback() const -> connection_callback_t
Gets a copy of the connection callback.
void stop_io_context(std::shared_ptr< asio::io_context > io_context)
Stop an io_context managed by this manager.
static io_context_thread_manager & instance()
Get the singleton instance.
std::future< void > run_io_context(std::shared_ptr< asio::io_context > io_context, const std::string &component_name="")
Run an io_context on the shared thread pool.
static void report_connection_failed(const std::string &reason)
Report a failed connection attempt.
static void report_connection_accepted()
Report a new connection accepted.
static void report_active_connections(size_t count)
Report active connections count.
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
auto get() const -> std::tuple_element_t< Index, std::tuple< CallbackTypes... > >
Gets a callback at the specified index.
Feature flags for network_system.
Unified io_context thread management for network components.
Logger system integration interface for network_system.
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
TCP server implementation.
Messaging session managing bidirectional message exchange.
constexpr int internal_error
constexpr int bind_failed
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
@ ok
Operation completed successfully.
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
Network system metrics definitions and reporting utilities.
RAII span implementation for distributed tracing.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.