Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
messaging_server.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
15
17{
18
19 using tcp = asio::ip::tcp;
20
21 messaging_server::messaging_server(const std::string& server_id)
22 : server_id_(server_id)
23 {
24 }
25
 // ~messaging_server(): destructor. Per the member list this is declared
 // noexcept; the signature line was lost in doc extraction.
 // Stops the server if still running, then force-releases the acceptor,
 // cleanup timer, work guard and io_context so that resources partially
 // created by a failed start_server() are also freed. Never throws.
 27 {
 28 if (is_running())
 29 {
 30 (void)stop_server();
 31 }
 32 // Additional cleanup for resources not managed by lifecycle manager.
 33 try
 34 {
 35 // Ensure all resources are cleaned up even if stop_server() returned early
 36 // This handles the case where start_server() failed and is_running() is false
 37 // but resources were partially created
 38 // Lock to prevent race with do_accept() accessing the acceptor
 39 {
 40 std::lock_guard<std::mutex> lock(acceptor_mutex_);
 41 if (acceptor_)
 42 {
 43 asio::error_code ec;
 44 // error_code overloads: close/cancel failures are deliberately ignored here
 45 acceptor_->cancel(ec);
 46 if (acceptor_->is_open())
 47 {
 48 acceptor_->close(ec);
 49 }
 50 acceptor_.reset();
 51 }
 52 }
 53
 // NOTE(review): the guard line here (presumably "if (cleanup_timer_)")
 // was dropped during doc extraction — confirm against the real source.
 54 {
 55 cleanup_timer_->cancel();
 56 cleanup_timer_.reset();
 57 }
 58
 59 if (work_guard_)
 60 {
 61 work_guard_.reset();
 62 }
 63
 64 if (io_context_)
 65 {
 66 io_context_->stop();
 67 io_context_.reset();
 68 }
 69 }
 70 catch (...)
 71 {
 72 // Destructor must not throw - swallow all exceptions
 73 }
 74 }
75
76 // =========================================================================
77 // Lifecycle Management
78 // =========================================================================
79
80 auto messaging_server::start_server(unsigned short port) -> VoidResult
81 {
82 // Create tracing span for server start operation
83 auto span = tracing::is_tracing_enabled()
84 ? std::make_optional(tracing::trace_context::create_span("tcp.server.start"))
85 : std::nullopt;
86 if (span)
87 {
88 span->set_attribute("net.host.port", static_cast<int64_t>(port))
89 .set_attribute("net.transport", "tcp")
90 .set_attribute("server.id", server_id_);
91 }
92
93 // Use base class do_start which handles lifecycle management
94 auto result = do_start(port);
95 if (result.is_err())
96 {
97 if (span)
98 {
99 span->set_error(result.error().message);
100 }
101 }
102 else
103 {
104 if (span)
105 {
106 span->set_status(tracing::span_status::ok);
107 }
108 }
109
110 return result;
111 }
112
 // stop_server() -> VoidResult (signature line lost in doc extraction;
 // declared as "auto stop_server() -> VoidResult" in the member list).
 // Delegates to the base-class do_stop(), which drives do_stop_impl().
 114 {
 115 // Use base class do_stop which handles lifecycle management and calls on_stopped()
 116 return do_stop();
 117 }
118
 // on_stopped() -> void (signature line lost in doc extraction).
 // Hook called by the base class after a stop completes; intentionally empty.
 120 {
 121 // No-op for server - no disconnection callback at server level
 122 }
123
124 // Note: wait_for_stop() and is_running() are inherited from startable_base
125
126 auto messaging_server::server_id() const -> const std::string&
127 {
128 return server_id_;
129 }
130
131 // =========================================================================
132 // Callback Setters
133 // =========================================================================
134
 // set_connection_callback(connection_callback_t callback) -> void
 // (signature line lost in doc extraction). Stores the new-connection
 // callback in the callback manager slot for callback_index::connection.
 136 {
 137 callbacks_.set<to_index(callback_index::connection)>(std::move(callback));
 138 }
139
 // set_disconnection_callback(disconnection_callback_t callback) -> void
 // (signature line lost in doc extraction). Stores the disconnection
 // callback in the slot for callback_index::disconnection.
 141 {
 142 callbacks_.set<to_index(callback_index::disconnection)>(std::move(callback));
 143 }
144
 // set_receive_callback(receive_callback_t callback) -> void
 // (signature line lost in doc extraction). Stores the received-data
 // callback in the slot for callback_index::receive.
 146 {
 147 callbacks_.set<to_index(callback_index::receive)>(std::move(callback));
 148 }
149
 // set_error_callback(error_callback_t callback) -> void
 // (signature line lost in doc extraction). Stores the session-error
 // callback in the slot for callback_index::error.
 151 {
 152 callbacks_.set<to_index(callback_index::error)>(std::move(callback));
 153 }
154
155 // =========================================================================
156 // Internal Callback Helpers
157 // =========================================================================
158
 // get_connection_callback() const -> connection_callback_t
 // (signature line lost in doc extraction). Returns a copy of the stored
 // connection callback so it can be invoked without holding locks.
 160 {
 161 return callbacks_.get<to_index(callback_index::connection)>();
 162 }
163
 // get_disconnection_callback() const -> disconnection_callback_t
 // (signature line lost in doc extraction). Returns a copy of the stored
 // disconnection callback.
 165 {
 166 return callbacks_.get<to_index(callback_index::disconnection)>();
 167 }
168
 // get_receive_callback() const -> receive_callback_t
 // (signature line lost in doc extraction). Returns a copy of the stored
 // receive callback.
 170 {
 171 return callbacks_.get<to_index(callback_index::receive)>();
 172 }
173
 // get_error_callback() const -> error_callback_t
 // (signature line lost in doc extraction). Returns a copy of the stored
 // error callback.
 175 {
 176 return callbacks_.get<to_index(callback_index::error)>();
 177 }
178
 // invoke_connection_callback(std::shared_ptr<session::messaging_session>)
 // -> void. NOTE(review): the first half of the signature line (function
 // name and qualifier) was dropped in doc extraction — only the parameter
 // continuation below survives. Forwards the session to the registered
 // connection callback via the callback manager.
 180 std::shared_ptr<session::messaging_session> session) -> void
 181 {
 182 callbacks_.invoke<to_index(callback_index::connection)>(std::move(session));
 183 }
184
185 // =========================================================================
186 // Internal Implementation Methods
187 // =========================================================================
188
 // TCP-specific start implementation called from the base-class do_start().
 // Creates io_context + work guard + acceptor + cleanup timer, kicks off
 // the accept loop and the periodic cleanup, then hands the io_context to
 // the shared io_context_thread_manager. On any exception, partially
 // created resources are released before returning an error result.
 189 auto messaging_server::do_start_impl(unsigned short port) -> VoidResult
 190 {
 191 // This method handles TCP-specific initialization.
 192 try
 193 {
 194 // Create io_context and acceptor
 195 io_context_ = std::make_shared<asio::io_context>();
 196 // Create work guard to keep io_context running
 197 work_guard_ = std::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(
 198 asio::make_work_guard(*io_context_)
 199 );
 200 acceptor_ = std::make_unique<tcp::acceptor>(
 201 *io_context_, tcp::endpoint(tcp::v4(), port));
 202
 203 // Create cleanup timer
 204 cleanup_timer_ = std::make_unique<asio::steady_timer>(*io_context_);
 205
 206 // Begin accepting connections
 207 do_accept();
 208
 209 // Start periodic cleanup timer
 210 start_cleanup_timer();
 211
 212 // Use io_context_thread_manager for unified thread management
 // NOTE(review): the line preceding ".run_io_context(...)" was lost in doc
 // extraction — presumably "io_context_future_ = io_context_thread_manager::instance()"
 // (do_stop_impl waits on io_context_future_). Confirm against real source.
 214 .run_io_context(io_context_, "messaging_server:" + server_id());
 215
 216 NETWORK_LOG_INFO("[messaging_server] Started listening on port " + std::to_string(port));
 217 return ok();
 218 }
 219 catch (const std::system_error& e)
 220 {
 221 // Cleanup partially created resources to prevent memory corruption
 222 // Resources may have been created before the exception was thrown
 223 acceptor_.reset();
 224 cleanup_timer_.reset();
 225 work_guard_.reset();
 226 io_context_.reset();
 227
 228 // Check for specific error codes (ASIO uses asio::error category)
 229 if (e.code() == asio::error::address_in_use ||
 230 e.code() == std::errc::address_in_use)
 231 {
 // NOTE(review): the first argument of each error_void() call below (an
 // error-code constant) was a hyperlink and is missing from this extraction.
 232 return error_void(
 234 "Failed to bind to port: address already in use",
 235 "messaging_server::do_start",
 236 "Port: " + std::to_string(port)
 237 );
 238 }
 239 else if (e.code() == asio::error::access_denied ||
 240 e.code() == std::errc::permission_denied)
 241 {
 242 return error_void(
 244 "Failed to bind to port: permission denied",
 245 "messaging_server::do_start",
 246 "Port: " + std::to_string(port)
 247 );
 248 }
 249
 250 return error_void(
 252 "Failed to start server: " + std::string(e.what()),
 253 "messaging_server::do_start",
 254 "Port: " + std::to_string(port)
 255 );
 256 }
 257 catch (const std::exception& e)
 258 {
 259 // Cleanup partially created resources to prevent memory corruption
 260 acceptor_.reset();
 261 cleanup_timer_.reset();
 262 work_guard_.reset();
 263 io_context_.reset();
 264
 265 return error_void(
 267 "Failed to start server: " + std::string(e.what()),
 268 "messaging_server::do_start",
 269 "Port: " + std::to_string(port)
 270 );
 271 }
 272 }
273
 // do_stop_impl() -> VoidResult (signature line lost in doc extraction).
 // TCP-specific shutdown, carefully ordered: close acceptor -> cancel
 // timer -> stop sessions -> release work guard -> stop io_context ->
 // wait for the io_context task -> drain remaining handlers -> clear
 // sessions -> release resources. Sessions are cleared only after all
 // async work has finished so pending callbacks never see dead objects.
 275 {
 276 // This method handles TCP-specific cleanup.
 277 try
 278 {
 279 // Step 1: Cancel and close the acceptor to stop accepting new connections
 280 // Lock to prevent race with do_accept() accessing the acceptor
 281 {
 282 std::lock_guard<std::mutex> lock(acceptor_mutex_);
 283 if (acceptor_)
 284 {
 285 asio::error_code ec;
 286 // Cancel pending async_accept operations to prevent memory leaks
 287 acceptor_->cancel(ec);
 288 if (acceptor_->is_open())
 289 {
 290 acceptor_->close(ec);
 291 }
 292 }
 293 }
 294
 295 // Cancel cleanup timer
 296 if (cleanup_timer_)
 297 {
 298 cleanup_timer_->cancel();
 299 }
 300
 301 // Step 2: Stop all active sessions (close sockets, stop reading)
 302 // NOTE: Do NOT clear sessions here - they may still be referenced by
 303 // pending async callbacks. We must wait for io_context to finish first.
 304 {
 305 std::lock_guard<std::mutex> lock(sessions_mutex_);
 306 for (auto& sess : sessions_)
 307 {
 308 if (sess)
 309 {
 310 sess->stop_session();
 311 }
 312 }
 313 }
 314
 315 // Step 3: Release work guard to allow io_context to finish
 316 if (work_guard_)
 317 {
 318 work_guard_.reset();
 319 }
 320
 321 // Step 4: Stop io_context via unified manager
 322 if (io_context_)
 323 {
 // NOTE(review): the body line was lost in doc extraction — per the member
 // list it is "io_context_thread_manager::instance().stop_io_context(io_context_);".
 // Confirm against the real source.
 325 }
 326
 327 // Step 5: Wait for io_context task to complete
 328 // This ensures all pending async callbacks have finished executing
 329 if (io_context_future_.valid())
 330 {
 331 io_context_future_.wait();
 332 }
 333
 334 // Step 5.5: Run remaining handlers to clean up pending async operations.
 335 // When io_context::stop() is called, pending handlers may not have
 336 // executed yet. Running poll() ensures all handlers are invoked and
 337 // their captured resources (resolver/socket) are properly destroyed
 338 // before io_context is destroyed, preventing heap corruption.
 339 if (io_context_)
 340 {
 341 try
 342 {
 343 io_context_->restart();
 344 io_context_->poll();
 345 }
 346 catch (...)
 347 {
 348 // Ignore exceptions during cleanup
 349 }
 350 }
 351
 352 // Step 6: NOW it's safe to clear sessions - all async operations are done
 353 {
 354 std::lock_guard<std::mutex> lock(sessions_mutex_);
 355 sessions_.clear();
 356 }
 357
 358 // Step 7: Release resources explicitly to ensure cleanup
 359 acceptor_.reset();
 360 cleanup_timer_.reset();
 361 io_context_.reset();
 362
 363 NETWORK_LOG_INFO("[messaging_server] Stopped.");
 364 return ok();
 365 }
 366 catch (const std::exception& e)
 367 {
 368 return error_void(
 370 "Failed to stop server: " + std::string(e.what()),
 371 "messaging_server::do_stop",
 372 "Server ID: " + server_id()
 373 );
 374 }
 375 }
376
 // do_accept() -> void (signature line lost in doc extraction).
 // Posts one async_accept; on completion on_accept() re-arms the loop.
 // shared_from_this() is captured so the server outlives the pending op.
 378 {
 379 // Lock to prevent race with do_stop() closing the acceptor
 380 std::lock_guard<std::mutex> lock(acceptor_mutex_);
 381
 382 // Early return if server is stopping or acceptor is invalid
 383 if (!is_running() || !acceptor_ || !acceptor_->is_open())
 384 {
 385 return;
 386 }
 387
 388 auto self = shared_from_this();
 389 acceptor_->async_accept(
 390 [this, self](std::error_code ec, tcp::socket sock)
 391 { on_accept(ec, std::move(sock)); });
 392 }
393
 // Completion handler for async_accept. On success: prunes dead sessions,
 // wraps the socket in a messaging_session, wires the server's callbacks
 // into the session (via lock-free copies to avoid lock-order inversion),
 // tracks and starts the session, then re-arms the accept loop.
 394 auto messaging_server::on_accept(std::error_code ec, tcp::socket socket)
 395 -> void
 396 {
 397 if (!is_running())
 398 {
 399 return;
 400 }
 401
 402 // Create tracing span for accept operation
 403 auto span = tracing::is_tracing_enabled()
 404 ? std::make_optional(tracing::trace_context::create_span("tcp.server.accept"))
 405 : std::nullopt;
 406 if (span)
 407 {
 408 span->set_attribute("net.transport", "tcp")
 409 .set_attribute("server.id", server_id_);
 410 }
 411
 412 if (ec)
 413 {
 414 if (span)
 415 {
 416 span->set_error(ec.message());
 417 }
 418 NETWORK_LOG_ERROR("[messaging_server] Accept error: " + ec.message());
 419#if KCENON_WITH_COMMON_SYSTEM
 420 // Record connection error
 421 connection_errors_.fetch_add(1, std::memory_order_relaxed);
 422 if (monitor_) {
 423 monitor_->record_metric("connection_errors", static_cast<double>(connection_errors_.load()));
 424 }
 425#endif
 426 // Report metrics
 // NOTE(review): the call on the next original line was lost in doc
 // extraction — likely "report_connection_failed(...)" per the member list.
 428 return;
 429 }
 430
 431 // Add peer information to span
 432 if (span)
 433 {
 434 try
 435 {
 436 auto endpoint = socket.remote_endpoint();
 437 span->set_attribute("net.peer.ip", endpoint.address().to_string())
 438 .set_attribute("net.peer.port", static_cast<int64_t>(endpoint.port()));
 439 }
 440 catch (...)
 441 {
 442 // Ignore errors getting endpoint info
 443 }
 444 }
 445
 446 // Report successful connection
 // NOTE(review): the call here was lost in doc extraction — likely
 // "report_connection_accepted()" per the member list. Confirm.
 448
 449 // Cleanup dead sessions before adding new one
 450 cleanup_dead_sessions();
 451
 452 // Create a new messaging_session
 453 auto new_session = std::make_shared<kcenon::network::session::messaging_session>(
 454 std::move(socket), server_id());
 455
 456 // Set up session callbacks to forward to server callbacks
 457 // Use base class getters to avoid direct member access
 458 auto self = shared_from_this();
 459
 460 // Get copies of callbacks via thread-safe getters from base class
 461 auto local_receive_callback = get_receive_callback();
 462 auto local_disconnection_callback = get_disconnection_callback();
 463 auto local_error_callback = get_error_callback();
 464
 465 // Set up callbacks without holding any locks to avoid lock-order-inversion
 466 // Lambdas use captured local callbacks directly to avoid locking callback_mutex_
 467 if (local_receive_callback)
 468 {
 469 new_session->set_receive_callback(
 470 [self, new_session, local_receive_callback](const std::vector<uint8_t>& data)
 471 {
 472 // Call the captured callback directly without locking to prevent deadlock
 473 local_receive_callback(new_session, data);
 474 });
 475 }
 476
 477 if (local_disconnection_callback)
 478 {
 479 new_session->set_disconnection_callback(
 480 [self, local_disconnection_callback](const std::string& session_id)
 481 {
 482 // Call the captured callback directly without locking to prevent deadlock
 483 local_disconnection_callback(session_id);
 484 });
 485 }
 486
 487 if (local_error_callback)
 488 {
 489 new_session->set_error_callback(
 490 [self, new_session, local_error_callback](std::error_code ec)
 491 {
 492 // Call the captured callback directly without locking to prevent deadlock
 493 local_error_callback(new_session, ec);
 494 });
 495 }
 496
 497 // Track it in our sessions_ vector (protected by mutex)
 498 {
 499 std::lock_guard<std::mutex> lock(sessions_mutex_);
 500 sessions_.push_back(new_session);
 501 }
 502
 503 // Start the session
 504 new_session->start_session();
 505
 506 // Invoke connection callback via base class helper
 507 invoke_connection_callback(new_session);
 508
 509#if KCENON_WITH_COMMON_SYSTEM
 510 // Record active connections metric
 511 {
 512 std::lock_guard<std::mutex> lock(sessions_mutex_);
 513 if (monitor_) {
 514 monitor_->record_metric("active_connections", static_cast<double>(sessions_.size()));
 515 }
 516 }
 517#endif
 518
 519 // Report active connections to metrics system
 520 {
 521 std::lock_guard<std::mutex> lock(sessions_mutex_);
 // NOTE(review): the call inside this scope was lost in doc extraction —
 // likely "report_active_connections(sessions_.size())" per the member list.
 523 }
 524
 525 // Mark span as successful before accepting next connection
 526 if (span)
 527 {
 528 span->set_status(tracing::span_status::ok);
 529 }
 530
 531 // Accept next connection
 532 do_accept();
 533 }
534
 // cleanup_dead_sessions() -> void (signature line lost in doc extraction).
 // Erase-remove of stopped sessions from sessions_ under sessions_mutex_,
 // then updates the active_connections metric when a monitor is attached.
 536 {
 537 std::lock_guard<std::mutex> lock(sessions_mutex_);
 538
 539 // Remove sessions that have been stopped
 540 sessions_.erase(
 541 std::remove_if(sessions_.begin(), sessions_.end(),
 542 [](const auto& session) {
 543 return session && session->is_stopped();
 544 }),
 545 sessions_.end()
 546 );
 547
 548 NETWORK_LOG_DEBUG("[messaging_server] Cleaned up dead sessions. Active: " +
 549 std::to_string(sessions_.size()));
 550
 551#if KCENON_WITH_COMMON_SYSTEM
 552 // Update active connections metric after cleanup
 553 if (monitor_) {
 554 monitor_->record_metric("active_connections", static_cast<double>(sessions_.size()));
 555 }
 556#endif
 557 }
558
 // start_cleanup_timer() -> void (signature line lost in doc extraction).
 // Arms a 30-second timer that runs cleanup_dead_sessions() and then
 // reschedules itself while the server is running. Capturing self keeps
 // the server alive for the duration of the pending wait.
 560 {
 561 if (!cleanup_timer_ || !is_running())
 562 {
 563 return;
 564 }
 565
 566 // Schedule cleanup every 30 seconds
 567 cleanup_timer_->expires_after(std::chrono::seconds(30));
 568
 569 auto self = shared_from_this();
 570 cleanup_timer_->async_wait(
 571 [this, self](const std::error_code& ec)
 572 {
 573 if (!ec && is_running())
 574 {
 575 cleanup_dead_sessions();
 576 start_cleanup_timer(); // Reschedule
 577 }
 578 }
 579 );
 580 }
581
582#if KCENON_WITH_COMMON_SYSTEM
583 auto messaging_server::set_monitor(kcenon::common::interfaces::IMonitor* monitor) -> void
584 {
585 monitor_ = monitor;
586 }
587
588 auto messaging_server::get_monitor() const -> kcenon::common::interfaces::IMonitor*
589 {
590 return monitor_;
591 }
592#endif
593
594 // Callback setters are inherited from messaging_server_base
595
596} // namespace kcenon::network::core
auto on_stopped() -> void
Called after stop operation completes. No-op for server (no disconnection callback at server level).
auto invoke_connection_callback(std::shared_ptr< session::messaging_session > session) -> void
Invokes the connection callback with the given session.
auto set_disconnection_callback(disconnection_callback_t callback) -> void
Sets the callback for client disconnections.
~messaging_server() noexcept
Destructor. If the server is still running, stop_server() is invoked.
std::unique_ptr< asio::ip::tcp::acceptor > acceptor_
auto get_error_callback() const -> error_callback_t
Gets a copy of the error callback.
std::unique_ptr< asio::steady_timer > cleanup_timer_
Timer for periodic cleanup of stopped sessions.
std::function< void(std::shared_ptr< session::messaging_session >, std::error_code)> error_callback_t
Callback type for errors.
auto cleanup_dead_sessions() -> void
Removes stopped sessions from the sessions vector.
auto do_accept() -> void
Initiates an asynchronous accept operation (async_accept).
std::function< void(std::shared_ptr< session::messaging_session >)> connection_callback_t
Callback type for new connection.
std::shared_ptr< asio::io_context > io_context_
auto get_disconnection_callback() const -> disconnection_callback_t
Gets a copy of the disconnection callback.
std::function< void(std::shared_ptr< session::messaging_session >, const std::vector< uint8_t > &)> receive_callback_t
Callback type for received data.
auto start_cleanup_timer() -> void
Starts a periodic timer that triggers session cleanup.
auto get_receive_callback() const -> receive_callback_t
Gets a copy of the receive callback.
std::unique_ptr< asio::executor_work_guard< asio::io_context::executor_type > > work_guard_
auto do_stop_impl() -> VoidResult
TCP-specific implementation of server stop.
bool is_running() const noexcept
Check if the server is running.
auto start_server(unsigned short port) -> VoidResult
Starts the server on the specified port.
auto set_error_callback(error_callback_t callback) -> void
Sets the callback for session errors.
auto on_accept(std::error_code ec, asio::ip::tcp::socket socket) -> void
Handler called when an asynchronous accept finishes.
auto stop_server() -> VoidResult
Stops the server and closes all connections.
messaging_server(const std::string &server_id)
Constructs a messaging_server identified by the given server_id string.
auto set_receive_callback(receive_callback_t callback) -> void
Sets the callback for received messages.
std::function< void(const std::string &)> disconnection_callback_t
Callback type for disconnection.
std::mutex acceptor_mutex_
Mutex protecting access to acceptor_ for thread-safe operations.
auto set_connection_callback(connection_callback_t callback) -> void
Sets the callback for new client connections.
auto do_start_impl(unsigned short port) -> VoidResult
TCP-specific implementation of server start.
auto server_id() const -> const std::string &
Returns the server identifier.
auto get_connection_callback() const -> connection_callback_t
Gets a copy of the connection callback.
void stop_io_context(std::shared_ptr< asio::io_context > io_context)
Stop an io_context managed by this manager.
static io_context_thread_manager & instance()
Get the singleton instance.
std::future< void > run_io_context(std::shared_ptr< asio::io_context > io_context, const std::string &component_name="")
Run an io_context on the shared thread pool.
static void report_connection_failed(const std::string &reason)
Report a failed connection attempt.
static void report_connection_accepted()
Report a new connection accepted.
static void report_active_connections(size_t count)
Report active connections count.
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
auto get() const -> std::tuple_element_t< Index, std::tuple< CallbackTypes... > >
Gets a callback at the specified index.
Feature flags for network_system.
Unified io_context thread management for network components.
Logger system integration interface for network_system.
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
TCP server implementation.
Messaging session managing bidirectional message exchange.
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
@ ok
Operation completed successfully.
constexpr auto to_index(E e) noexcept -> std::size_t
Helper to convert enum to std::size_t for callback_manager access.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Network system metrics definitions and reporting utilities.
RAII span implementation for distributed tracing.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.