Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
reliable_udp_client.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#define NETWORK_USE_EXPERIMENTAL
10
11#include <algorithm>
12#include <atomic>
13#include <chrono>
14#include <cstring>
15#include <future>
16#include <map>
17#include <mutex>
18#include <thread>
19
21{
22 // Packet header structure (12 bytes)
24 {
25 uint32_t sequence_number; // Packet sequence number
26 uint32_t ack_number; // Acknowledgment number
27 uint16_t flags; // Control flags
28 uint16_t data_length; // Payload length
29
30 static constexpr uint16_t FLAG_ACK = 0x01; // Acknowledgment packet
31 static constexpr uint16_t FLAG_DATA = 0x02; // Data packet
32 static constexpr uint16_t FLAG_SYN = 0x04; // Connection start
33 static constexpr uint16_t FLAG_FIN = 0x08; // Connection end
34
35 auto serialize() const -> std::vector<uint8_t>
36 {
37 std::vector<uint8_t> buffer(sizeof(packet_header));
38 std::memcpy(buffer.data(), this, sizeof(packet_header));
39 return buffer;
40 }
41
42 static auto deserialize(const std::vector<uint8_t>& buffer) -> packet_header
43 {
44 packet_header header{};
45 if (buffer.size() >= sizeof(packet_header))
46 {
47 std::memcpy(&header, buffer.data(), sizeof(packet_header));
48 }
49 return header;
50 }
51 };
52
53 // Packet info for retransmission tracking
55 {
56 std::vector<uint8_t> data;
57 std::chrono::steady_clock::time_point send_time;
60 };
61
63 {
64 public:
67 {
68 NETWORK_LOG_DEBUG("[reliable_udp_client::" + client_id_ +
69 "] Created with mode=" + std::to_string(static_cast<int>(mode)));
70 }
71
72 ~impl() noexcept
73 {
74 if (is_running_.load())
75 {
76 (void)stop_client();
77 }
78 }
79
80 auto start_client(std::string_view host, uint16_t port) -> VoidResult
81 {
82 std::lock_guard<std::mutex> lock(state_mutex_);
83
84 if (is_running_.load())
85 {
87 "Client is already running");
88 }
89
90 // Create underlying UDP client
91 udp_client_ = std::make_shared<messaging_udp_client>(client_id_);
92
93 // Set receive callback
94 udp_client_->set_receive_callback(
95 [this, self = shared_from_this()](const std::vector<uint8_t>& data,
96 const asio::ip::udp::endpoint&) {
98 });
99
100 // Set error callback
101 udp_client_->set_error_callback([this, self = shared_from_this()](
102 std::error_code ec) { handle_error(ec); });
103
104 // Start UDP client
105 auto result = udp_client_->start_client(host, port);
106 if (result.is_err())
107 {
108 return result;
109 }
110
111 is_running_.store(true);
112
113 // Start retransmission timer for reliable modes
115 {
117 }
118
119 NETWORK_LOG_INFO("[reliable_udp_client::" + client_id_ + "] Started successfully");
120 return ok();
121 }
122
124 {
125 std::lock_guard<std::mutex> lock(state_mutex_);
126
127 if (!is_running_.load())
128 {
129 return ok();
130 }
131
132 is_running_.store(false);
133
134 // Stop retransmission timer
136
137 // Stop underlying UDP client
138 if (udp_client_)
139 {
140 (void)udp_client_->stop_client();
141 udp_client_.reset();
142 }
143
144 // Clear pending packets
145 {
146 std::lock_guard<std::mutex> pending_lock(pending_mutex_);
147 pending_packets_.clear();
148 }
149
150 // Clear receive buffer
151 {
152 std::lock_guard<std::mutex> recv_lock(receive_mutex_);
153 receive_buffer_.clear();
154 }
155
156 NETWORK_LOG_INFO("[reliable_udp_client::" + client_id_ + "] Stopped");
157 return ok();
158 }
159
160 auto send_packet(std::vector<uint8_t>&& data) -> VoidResult
161 {
162 if (!is_running_.load())
163 {
165 "Client is not running");
166 }
167
168 switch (mode_)
169 {
171 return send_unreliable(std::move(data));
172
175 return send_reliable(std::move(data));
176
178 return send_sequenced(std::move(data));
179 }
180
181 return error_void(error_codes::common_errors::internal_error, "Invalid reliability mode");
182 }
183
184 auto wait_for_stop() -> void
185 {
186 if (udp_client_)
187 {
188 udp_client_->wait_for_stop();
189 }
190 }
191
192 auto set_receive_callback(std::function<void(const std::vector<uint8_t>&)> callback)
193 -> void
194 {
195 std::lock_guard<std::mutex> lock(callback_mutex_);
196 receive_callback_ = std::move(callback);
197 }
198
199 auto set_error_callback(std::function<void(std::error_code)> callback) -> void
200 {
201 std::lock_guard<std::mutex> lock(callback_mutex_);
202 error_callback_ = std::move(callback);
203 }
204
205 auto set_congestion_window(size_t packets) -> void { congestion_window_ = packets; }
206
207 auto set_max_retries(size_t retries) -> void { max_retries_ = retries; }
208
209 auto set_retransmission_timeout(uint32_t timeout_ms) -> void
210 {
211 retransmission_timeout_ms_ = timeout_ms;
212 }
213
215 {
216 std::lock_guard<std::mutex> lock(stats_mutex_);
217 return stats_;
218 }
219
220 auto is_running() const -> bool { return is_running_.load(); }
221
222 auto client_id() const -> const std::string& { return client_id_; }
223
224 auto mode() const -> reliability_mode { return mode_; }
225
226 auto shared_from_this() -> std::shared_ptr<impl>
227 {
228 // This is called from impl, not from reliable_udp_client
229 // We need to be careful about the shared_ptr lifetime
230 return std::shared_ptr<impl>(this, [](impl*) {
231 // No-op deleter since we don't own the impl
232 });
233 }
234
235 private:
236 // Unreliable send (pure UDP)
237 auto send_unreliable(std::vector<uint8_t>&& data) -> VoidResult
238 {
239 packet_header header{};
240 header.sequence_number = 0;
241 header.ack_number = 0;
242 header.flags = packet_header::FLAG_DATA;
243 header.data_length = static_cast<uint16_t>(data.size());
244
245 auto packet = create_packet(header, data);
246
247 return udp_client_->send(
248 std::move(packet), [this](std::error_code ec, std::size_t) {
249 if (!ec)
250 {
251 std::lock_guard<std::mutex> lock(stats_mutex_);
253 }
254 });
255 }
256
257 // Reliable send (with ACK and retransmission)
258 auto send_reliable(std::vector<uint8_t>&& data) -> VoidResult
259 {
260 std::lock_guard<std::mutex> lock(pending_mutex_);
261
262 // Check congestion window
264 {
266 "Congestion window full");
267 }
268
269 uint32_t seq = next_sequence_++;
270
271 packet_header header{};
272 header.sequence_number = seq;
273 header.ack_number = 0;
274 header.flags = packet_header::FLAG_DATA;
275 header.data_length = static_cast<uint16_t>(data.size());
276
277 auto packet = create_packet(header, data);
278
279 // Store for retransmission
280 packet_info info{};
281 info.data = packet;
282 info.send_time = std::chrono::steady_clock::now();
283 info.retransmit_count = 0;
284 info.sequence_number = seq;
285 pending_packets_[seq] = std::move(info);
286
287 // Send packet
288 return udp_client_->send(
289 std::move(packet), [this, seq](std::error_code ec, std::size_t) {
290 if (!ec)
291 {
292 std::lock_guard<std::mutex> lock(stats_mutex_);
294 }
295 else
296 {
297 // Remove from pending if send failed
298 std::lock_guard<std::mutex> pending_lock(pending_mutex_);
299 pending_packets_.erase(seq);
300 }
301 });
302 }
303
304 // Sequenced send (drop old packets)
305 auto send_sequenced(std::vector<uint8_t>&& data) -> VoidResult
306 {
307 uint32_t seq = next_sequence_++;
308
309 packet_header header{};
310 header.sequence_number = seq;
311 header.ack_number = 0;
312 header.flags = packet_header::FLAG_DATA;
313 header.data_length = static_cast<uint16_t>(data.size());
314
315 auto packet = create_packet(header, data);
316
317 return udp_client_->send(
318 std::move(packet), [this](std::error_code ec, std::size_t) {
319 if (!ec)
320 {
321 std::lock_guard<std::mutex> lock(stats_mutex_);
323 }
324 });
325 }
326
327 // Create packet with header + payload
328 auto create_packet(const packet_header& header, const std::vector<uint8_t>& payload)
329 -> std::vector<uint8_t>
330 {
331 std::vector<uint8_t> packet(sizeof(packet_header) + payload.size());
332 std::memcpy(packet.data(), &header, sizeof(packet_header));
333 if (!payload.empty())
334 {
335 std::memcpy(packet.data() + sizeof(packet_header), payload.data(),
336 payload.size());
337 }
338 return packet;
339 }
340
341 // Handle received packet
342 auto handle_received_packet(const std::vector<uint8_t>& data) -> void
343 {
344 if (data.size() < sizeof(packet_header))
345 {
346 NETWORK_LOG_WARN("[reliable_udp_client::" + client_id_ +
347 "] Received invalid packet (too small)");
348 return;
349 }
350
351 packet_header header{};
352 std::memcpy(&header, data.data(), sizeof(packet_header));
353
354 // Handle ACK packet
355 if (header.flags & packet_header::FLAG_ACK)
356 {
357 handle_ack(header.ack_number);
358 return;
359 }
360
361 // Handle data packet
362 if (header.flags & packet_header::FLAG_DATA)
363 {
364 // Send ACK for reliable modes
367 {
368 send_ack(header.sequence_number);
369 }
370
371 // Extract payload
372 std::vector<uint8_t> payload(data.begin() + sizeof(packet_header), data.end());
373
374 // Process based on mode
375 switch (mode_)
376 {
378 deliver_to_application(std::move(payload));
379 break;
380
382 handle_ordered_delivery(header.sequence_number, std::move(payload));
383 break;
384
386 deliver_to_application(std::move(payload));
387 break;
388
390 handle_sequenced_delivery(header.sequence_number, std::move(payload));
391 break;
392 }
393
394 {
395 std::lock_guard<std::mutex> lock(stats_mutex_);
397 }
398 }
399 }
400
401 // Handle ACK
402 auto handle_ack(uint32_t ack_number) -> void
403 {
404 std::lock_guard<std::mutex> lock(pending_mutex_);
405
406 auto it = pending_packets_.find(ack_number);
407 if (it != pending_packets_.end())
408 {
409 // Calculate RTT
410 auto now = std::chrono::steady_clock::now();
411 auto rtt = std::chrono::duration_cast<std::chrono::milliseconds>(
412 now - it->second.send_time)
413 .count();
414
415 // Update stats
416 {
417 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
419
420 // Update average RTT (exponential moving average)
421 if (stats_.average_rtt_ms == 0.0)
422 {
423 stats_.average_rtt_ms = static_cast<double>(rtt);
424 }
425 else
426 {
427 stats_.average_rtt_ms = 0.875 * stats_.average_rtt_ms + 0.125 * rtt;
428 }
429 }
430
431 // Remove from pending
432 pending_packets_.erase(it);
433
434 NETWORK_LOG_TRACE("[reliable_udp_client::" + client_id_ + "] Received ACK for seq=" +
435 std::to_string(ack_number) + ", RTT=" + std::to_string(rtt) + "ms");
436 }
437 }
438
439 // Send ACK
440 auto send_ack(uint32_t sequence_number) -> void
441 {
442 packet_header header{};
443 header.sequence_number = 0;
444 header.ack_number = sequence_number;
445 header.flags = packet_header::FLAG_ACK;
446 header.data_length = 0;
447
448 auto packet = create_packet(header, {});
449
450 udp_client_->send(std::move(packet), [this](std::error_code ec, std::size_t) {
451 if (!ec)
452 {
453 std::lock_guard<std::mutex> lock(stats_mutex_);
455 }
456 });
457 }
458
459 // Handle ordered delivery
460 auto handle_ordered_delivery(uint32_t sequence_number, std::vector<uint8_t>&& payload)
461 -> void
462 {
463 std::lock_guard<std::mutex> lock(receive_mutex_);
464
465 if (sequence_number == expected_sequence_)
466 {
467 // Deliver in-order packet
468 deliver_to_application(std::move(payload));
470
471 // Deliver any buffered packets that are now in order
472 while (true)
473 {
474 auto it = receive_buffer_.find(expected_sequence_);
475 if (it == receive_buffer_.end())
476 {
477 break;
478 }
479
480 deliver_to_application(std::move(it->second));
481 receive_buffer_.erase(it);
483 }
484 }
485 else if (sequence_number > expected_sequence_)
486 {
487 // Buffer out-of-order packet
488 receive_buffer_[sequence_number] = std::move(payload);
489
490 NETWORK_LOG_TRACE("[reliable_udp_client::" + client_id_ +
491 "] Buffered out-of-order packet seq=" +
492 std::to_string(sequence_number) +
493 " (expected=" + std::to_string(expected_sequence_) + ")");
494 }
495 else
496 {
497 // Duplicate or old packet, ignore
498 NETWORK_LOG_TRACE("[reliable_udp_client::" + client_id_ +
499 "] Dropped old packet seq=" + std::to_string(sequence_number));
500 }
501 }
502
503 // Handle sequenced delivery
504 auto handle_sequenced_delivery(uint32_t sequence_number, std::vector<uint8_t>&& payload)
505 -> void
506 {
507 if (sequence_number >= expected_sequence_)
508 {
509 expected_sequence_ = sequence_number + 1;
510 deliver_to_application(std::move(payload));
511 }
512 else
513 {
514 // Drop old packet
515 std::lock_guard<std::mutex> lock(stats_mutex_);
517
518 NETWORK_LOG_TRACE("[reliable_udp_client::" + client_id_ +
519 "] Dropped old packet in sequenced mode seq=" +
520 std::to_string(sequence_number));
521 }
522 }
523
524 // Deliver packet to application
525 auto deliver_to_application(std::vector<uint8_t>&& payload) -> void
526 {
527 std::lock_guard<std::mutex> lock(callback_mutex_);
529 {
530 receive_callback_(payload);
531 }
532 }
533
534 // Handle error
535 auto handle_error(std::error_code ec) -> void
536 {
537 NETWORK_LOG_ERROR("[reliable_udp_client::" + client_id_ + "] Error: " + ec.message());
538
539 std::lock_guard<std::mutex> lock(callback_mutex_);
540 if (error_callback_)
541 {
542 error_callback_(ec);
543 }
544 }
545
546 // Start retransmission timer
548 {
550 if (!thread_pool_) {
551 NETWORK_LOG_ERROR("[reliable_udp_client::" + client_id_ +
552 "] Failed to get thread pool for retransmission timer");
553 return;
554 }
555
556 stop_retransmission_.store(false);
557 retransmission_future_ = thread_pool_->submit([this]() {
558 while (is_running_.load() && !stop_retransmission_.load())
559 {
560 std::this_thread::sleep_for(
561 std::chrono::milliseconds(retransmission_timeout_ms_));
562 if (!stop_retransmission_.load()) {
564 }
565 }
566 });
567 }
568
569 // Stop retransmission timer
571 {
572 stop_retransmission_.store(true);
573 if (retransmission_future_.valid())
574 {
575 try {
577 } catch (...) {
578 NETWORK_LOG_ERROR("[reliable_udp_client::" + client_id_ +
579 "] Exception while waiting for retransmission timer to stop");
580 }
581 }
582 thread_pool_.reset();
583 }
584
585 // Check for packets needing retransmission
586 auto check_and_retransmit() -> void
587 {
588 std::lock_guard<std::mutex> lock(pending_mutex_);
589
590 auto now = std::chrono::steady_clock::now();
591
592 for (auto it = pending_packets_.begin(); it != pending_packets_.end();)
593 {
594 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
595 now - it->second.send_time)
596 .count();
597
598 if (elapsed >= retransmission_timeout_ms_)
599 {
600 // Check max retries
601 if (it->second.retransmit_count >= max_retries_)
602 {
603 NETWORK_LOG_WARN("[reliable_udp_client::" + client_id_ +
604 "] Packet seq=" + std::to_string(it->first) +
605 " exceeded max retries, dropping");
606
607 {
608 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
610 }
611
612 it = pending_packets_.erase(it);
613 continue;
614 }
615
616 // Retransmit
617 auto packet_copy = it->second.data;
618 it->second.send_time = now;
619 it->second.retransmit_count++;
620
621 {
622 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
624 }
625
626 NETWORK_LOG_TRACE("[reliable_udp_client::" + client_id_ +
627 "] Retransmitting seq=" + std::to_string(it->first) +
628 " (attempt " + std::to_string(it->second.retransmit_count) +
629 ")");
630
631 udp_client_->send(std::move(packet_copy),
632 [](std::error_code, std::size_t) {});
633 }
634
635 ++it;
636 }
637 }
638
639 private:
640 std::string client_id_;
642
643 std::atomic<bool> is_running_{false};
644 std::shared_ptr<messaging_udp_client> udp_client_;
645
646 // Sequence numbers
647 std::atomic<uint32_t> next_sequence_{1};
648 std::atomic<uint32_t> expected_sequence_{1};
649
650 // Pending packets (waiting for ACK)
651 mutable std::mutex pending_mutex_;
652 std::map<uint32_t, packet_info> pending_packets_;
653
654 // Receive buffer (for ordered delivery)
655 mutable std::mutex receive_mutex_;
656 std::map<uint32_t, std::vector<uint8_t>> receive_buffer_;
657
658 // Callbacks
659 mutable std::mutex callback_mutex_;
660 std::function<void(const std::vector<uint8_t>&)> receive_callback_;
661 std::function<void(std::error_code)> error_callback_;
662
663 // Configuration
665 size_t max_retries_{5};
667
668 // Statistics
669 mutable std::mutex stats_mutex_;
671
672 // Retransmission timer
673 std::shared_ptr<integration::thread_pool_interface> thread_pool_;
674 std::future<void> retransmission_future_;
675 std::atomic<bool> stop_retransmission_{false};
676
677 // State mutex
678 std::mutex state_mutex_;
679 };
680
681 // reliable_udp_client implementation
682
684 : pimpl_(std::make_unique<impl>(client_id, mode))
685 {
686 }
687
689
690 auto reliable_udp_client::start_client(std::string_view host, uint16_t port) -> VoidResult
691 {
692 return pimpl_->start_client(host, port);
693 }
694
695 auto reliable_udp_client::stop_client() -> VoidResult { return pimpl_->stop_client(); }
696
697 auto reliable_udp_client::send_packet(std::vector<uint8_t>&& data) -> VoidResult
698 {
699 return pimpl_->send_packet(std::move(data));
700 }
701
702 auto reliable_udp_client::wait_for_stop() -> void { pimpl_->wait_for_stop(); }
703
705 std::function<void(const std::vector<uint8_t>&)> callback) -> void
706 {
707 pimpl_->set_receive_callback(std::move(callback));
708 }
709
710 auto reliable_udp_client::set_error_callback(std::function<void(std::error_code)> callback)
711 -> void
712 {
713 pimpl_->set_error_callback(std::move(callback));
714 }
715
717 {
718 pimpl_->set_congestion_window(packets);
719 }
720
721 auto reliable_udp_client::set_max_retries(size_t retries) -> void
722 {
723 pimpl_->set_max_retries(retries);
724 }
725
726 auto reliable_udp_client::set_retransmission_timeout(uint32_t timeout_ms) -> void
727 {
728 pimpl_->set_retransmission_timeout(timeout_ms);
729 }
730
732 {
733 return pimpl_->get_stats();
734 }
735
736 auto reliable_udp_client::is_running() const -> bool { return pimpl_->is_running(); }
737
738 auto reliable_udp_client::client_id() const -> const std::string&
739 {
740 return pimpl_->client_id();
741 }
742
744
745} // namespace kcenon::network::core
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
auto create_packet(const packet_header &header, const std::vector< uint8_t > &payload) -> std::vector< uint8_t >
auto send_ack(uint32_t sequence_number) -> void
auto set_error_callback(std::function< void(std::error_code)> callback) -> void
auto send_unreliable(std::vector< uint8_t > &&data) -> VoidResult
std::shared_ptr< messaging_udp_client > udp_client_
std::function< void(const std::vector< uint8_t > &)> receive_callback_
std::function< void(std::error_code)> error_callback_
auto handle_ordered_delivery(uint32_t sequence_number, std::vector< uint8_t > &&payload) -> void
auto send_packet(std::vector< uint8_t > &&data) -> VoidResult
auto set_receive_callback(std::function< void(const std::vector< uint8_t > &)> callback) -> void
auto start_client(std::string_view host, uint16_t port) -> VoidResult
std::map< uint32_t, std::vector< uint8_t > > receive_buffer_
auto send_reliable(std::vector< uint8_t > &&data) -> VoidResult
auto set_retransmission_timeout(uint32_t timeout_ms) -> void
std::shared_ptr< integration::thread_pool_interface > thread_pool_
impl(std::string_view client_id, reliability_mode mode)
auto handle_sequenced_delivery(uint32_t sequence_number, std::vector< uint8_t > &&payload) -> void
auto send_sequenced(std::vector< uint8_t > &&data) -> VoidResult
auto handle_received_packet(const std::vector< uint8_t > &data) -> void
auto deliver_to_application(std::vector< uint8_t > &&payload) -> void
A UDP client with optional reliability layer for configurable delivery guarantees.
Definition udp.cppm:356
auto get_stats() const -> reliable_udp_stats
Returns current connection statistics.
reliable_udp_client(std::string_view client_id, reliability_mode mode=reliability_mode::reliable_ordered)
Constructs a reliable UDP client with specified mode.
auto set_max_retries(size_t retries) -> void
Sets maximum retransmission attempts before giving up.
auto wait_for_stop() -> void
Blocks until the client is stopped.
auto mode() const -> reliability_mode
Returns the current reliability mode.
~reliable_udp_client() noexcept
Destructor. Automatically stops the client if running.
auto send_packet(std::vector< uint8_t > &&data) -> VoidResult
Sends a packet with reliability handling based on mode.
auto set_receive_callback(std::function< void(const std::vector< uint8_t > &)> callback) -> void
Sets callback for received data.
auto set_error_callback(std::function< void(std::error_code)> callback) -> void
Sets callback for connection errors.
auto stop_client() -> VoidResult
Stops the client and releases resources.
auto set_congestion_window(size_t packets) -> void
Sets the congestion window size (maximum unacknowledged packets).
auto client_id() const -> const std::string &
Returns the client identifier.
auto set_retransmission_timeout(uint32_t timeout_ms) -> void
Sets retransmission timeout in milliseconds.
auto is_running() const -> bool
Checks if client is currently running.
Logger system integration interface for network_system.
#define NETWORK_LOG_TRACE(msg)
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
UDP client class.
reliability_mode
Defines the reliability level for UDP packet transmission.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
Global context for shared network system resources.
auto serialize() const -> std::vector< uint8_t >
static auto deserialize(const std::vector< uint8_t > &buffer) -> packet_header
std::chrono::steady_clock::time_point send_time
Statistics for monitoring reliable UDP connection performance.