Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
stream.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8#include <cstring>
9
11{
12
13stream::stream(uint64_t id, bool is_local, uint64_t initial_max_data)
14 : id_{id}
15 , is_local_{is_local}
16 , max_send_offset_{initial_max_data}
17 , max_recv_offset_{initial_max_data}
18 , recv_window_size_{initial_max_data}
19{
20}
21
22// ============================================================================
23// Send Side
24// ============================================================================
25
26auto stream::can_send() const noexcept -> bool
27{
28 if (is_unidirectional() && !is_local_) {
29 // Cannot send on peer-initiated unidirectional stream
30 return false;
31 }
32
33 switch (send_state_) {
36 return true;
37 default:
38 return false;
39 }
40}
41
42auto stream::write(std::span<const uint8_t> data) -> Result<size_t>
43{
44 if (!can_send()) {
46 "Stream cannot send data in current state",
47 "quic::stream");
48 }
49
50 if (fin_sent_) {
52 "Cannot write after FIN sent",
53 "quic::stream");
54 }
55
56 // Check flow control
57 const auto available = available_send_window();
58 if (available == 0) {
60 "Send window exhausted",
61 "quic::stream");
62 }
63
64 // Write up to available window
65 size_t to_write = std::min(data.size(), available);
66 send_buffer_.insert(send_buffer_.end(), data.begin(), data.begin() + static_cast<std::ptrdiff_t>(to_write));
67
68 // Update state
69 if (send_state_ == send_stream_state::ready && !send_buffer_.empty()) {
70 send_state_ = send_stream_state::send;
71 }
72
73 return ok(std::move(to_write));
74}
75
77{
78 if (!can_send()) {
80 "Stream cannot be finished in current state",
81 "quic::stream");
82 }
83
84 if (fin_sent_) {
86 "FIN already sent",
87 "quic::stream");
88 }
89
90 fin_sent_ = true;
91 update_send_state();
92 return ok();
93}
94
95auto stream::reset(uint64_t error_code) -> VoidResult
96{
97 if (send_state_ == send_stream_state::reset_sent ||
98 send_state_ == send_stream_state::reset_recvd ||
99 send_state_ == send_stream_state::data_recvd) {
101 "Stream already in terminal state",
102 "quic::stream");
103 }
104
105 reset_error_code_ = error_code;
106 send_state_ = send_stream_state::reset_sent;
107 send_buffer_.clear();
108 return ok();
109}
110
111auto stream::next_stream_frame(size_t max_size) -> std::optional<stream_frame>
112{
113 if (send_buffer_.empty() && !fin_sent_) {
114 return std::nullopt;
115 }
116
117 // Check if we have pending data or need to send FIN
118 if (send_buffer_.empty() && fin_sent_ && !fin_acked_) {
119 // Send FIN-only frame
121 frame.stream_id = id_;
122 frame.offset = send_offset_;
123 frame.fin = true;
124 return frame;
125 }
126
127 if (send_buffer_.empty()) {
128 return std::nullopt;
129 }
130
131 // Calculate how much we can send (considering flow control)
132 const auto window_available = available_send_window();
133 if (window_available == 0) {
134 return std::nullopt;
135 }
136
137 // Reserve space for frame header (stream_id + offset + length as varints)
138 // Maximum varint size is 8 bytes each, plus 1 byte for frame type
139 constexpr size_t max_header_size = 1 + 8 + 8 + 8;
140 if (max_size <= max_header_size) {
141 return std::nullopt;
142 }
143
144 const size_t max_payload = max_size - max_header_size;
145 const size_t to_send = std::min({send_buffer_.size(), window_available, max_payload});
146
148 frame.stream_id = id_;
149 frame.offset = send_offset_;
150 frame.data.reserve(to_send);
151
152 // Copy data from buffer
153 for (size_t i = 0; i < to_send; ++i) {
154 frame.data.push_back(send_buffer_[i]);
155 }
156
157 // Remove sent data from buffer
158 send_buffer_.erase(send_buffer_.begin(), send_buffer_.begin() + to_send);
159 send_offset_ += to_send;
160
161 // Set FIN if this is the last data and FIN was requested
162 if (fin_sent_ && send_buffer_.empty()) {
163 frame.fin = true;
164 }
165
166 return frame;
167}
168
169void stream::acknowledge_data(uint64_t offset, uint64_t length)
170{
171 // Track acknowledgments
172 if (offset + length > acked_offset_) {
173 // Simple implementation: assume in-order ACKs
174 acked_offset_ = offset + length;
175 }
176
177 // Check if FIN is acknowledged
178 if (fin_sent_ && send_buffer_.empty() && acked_offset_ >= send_offset_) {
179 fin_acked_ = true;
180 }
181
183}
184
185// ============================================================================
186// Receive Side
187// ============================================================================
188
189auto stream::has_data() const noexcept -> bool
190{
191 return !recv_ready_.empty();
192}
193
194auto stream::read(std::span<uint8_t> buffer) -> Result<size_t>
195{
196 if (is_unidirectional() && is_local_) {
197 // Cannot receive on locally-initiated unidirectional stream
199 "Cannot read from local unidirectional stream",
200 "quic::stream");
201 }
202
203 if (recv_state_ == recv_stream_state::reset_recvd) {
205 "Stream was reset by peer",
206 "quic::stream");
207 }
208
209 if (recv_ready_.empty()) {
210 if (recv_fin_ && recv_buffer_.empty()) {
211 // All data has been read and FIN was received
212 recv_state_ = recv_stream_state::data_read;
213 return ok(static_cast<size_t>(0));
214 }
215 return ok(static_cast<size_t>(0));
216 }
217
218 size_t to_read = std::min(buffer.size(), recv_ready_.size());
219 for (size_t i = 0; i < to_read; ++i) {
220 buffer[i] = recv_ready_[i];
221 }
222
223 recv_ready_.erase(recv_ready_.begin(), recv_ready_.begin() + static_cast<std::ptrdiff_t>(to_read));
224
225 // Update receive state
226 update_recv_state();
227
228 return ok(std::move(to_read));
229}
230
231auto stream::stop_sending(uint64_t error_code) -> VoidResult
232{
233 if (is_unidirectional() && is_local_) {
235 "Cannot stop sending on local unidirectional stream",
236 "quic::stream");
237 }
238
239 stop_sending_error_code_ = error_code;
240 return ok();
241}
242
243auto stream::receive_data(uint64_t offset, std::span<const uint8_t> data, bool fin)
244 -> VoidResult
245{
246 if (is_unidirectional() && is_local_) {
248 "Cannot receive on local unidirectional stream",
249 "quic::stream");
250 }
251
252 if (recv_state_ == recv_stream_state::reset_recvd ||
253 recv_state_ == recv_stream_state::reset_read) {
255 "Stream was reset",
256 "quic::stream");
257 }
258
259 // Check for final size violations
260 if (final_size_.has_value()) {
261 if (offset + data.size() > *final_size_) {
263 "Data exceeds final size",
264 "quic::stream");
265 }
266 }
267
268 if (fin) {
269 const uint64_t new_final_size = offset + data.size();
270 if (final_size_.has_value() && *final_size_ != new_final_size) {
272 "Final size changed",
273 "quic::stream");
274 }
275 final_size_ = new_final_size;
276 recv_fin_ = true;
277 }
278
279 // Check flow control
280 if (offset + data.size() > max_recv_offset_) {
282 "Received data exceeds flow control limit",
283 "quic::stream");
284 }
285
286 // Handle data placement
287 if (offset < recv_offset_) {
288 // Overlapping retransmission
289 const size_t overlap = static_cast<size_t>(recv_offset_ - offset);
290 if (overlap >= data.size()) {
291 // Entirely duplicate, ignore
292 update_recv_state();
293 return ok();
294 }
295 // Trim leading overlap
296 offset = recv_offset_;
297 data = data.subspan(overlap);
298 }
299
300 if (offset == recv_offset_) {
301 // Contiguous data - append directly
302 recv_ready_.insert(recv_ready_.end(), data.begin(), data.end());
303 recv_offset_ += data.size();
304
305 // Try to reassemble any buffered data
306 reassemble_data();
307 } else {
308 // Gap in data - buffer for later
309 recv_buffer_[offset] = std::vector<uint8_t>(data.begin(), data.end());
310 }
311
312 update_recv_state();
313 return ok();
314}
315
316auto stream::receive_reset(uint64_t error_code, uint64_t final_size) -> VoidResult
317{
318 if (is_unidirectional() && is_local_) {
320 "Cannot receive reset on local unidirectional stream",
321 "quic::stream");
322 }
323
324 // Check final size consistency
325 if (final_size_.has_value() && *final_size_ != final_size) {
327 "Reset final size differs from previously received",
328 "quic::stream");
329 }
330
331 final_size_ = final_size;
332 reset_error_code_ = error_code;
333 recv_state_ = recv_stream_state::reset_recvd;
334
335 // Clear buffered data
336 recv_buffer_.clear();
337
338 return ok();
339}
340
341auto stream::receive_stop_sending(uint64_t error_code) -> VoidResult
342{
343 if (!can_send()) {
345 "Cannot handle STOP_SENDING in current state",
346 "quic::stream");
347 }
348
349 stop_sending_error_code_ = error_code;
350 // The sender SHOULD reset the stream in response
351 return ok();
352}
353
354// ============================================================================
355// Flow Control
356// ============================================================================
357
359{
360 if (max > max_send_offset_) {
361 max_send_offset_ = max;
362 }
363}
364
365auto stream::available_send_window() const noexcept -> size_t
366{
367 const uint64_t sent = send_offset_ + send_buffer_.size();
368 if (sent >= max_send_offset_) {
369 return 0;
370 }
371 return static_cast<size_t>(max_send_offset_ - sent);
372}
373
375{
376 if (max > max_recv_offset_) {
377 max_recv_offset_ = max;
378 }
379}
380
381auto stream::should_send_max_stream_data() const noexcept -> bool
382{
383 // Send update when we've consumed more than threshold of the window
384 const uint64_t consumed = recv_offset_;
385 const uint64_t threshold = static_cast<uint64_t>(
387
388 return consumed > (max_recv_offset_ - recv_window_size_ + threshold);
389}
390
391auto stream::generate_max_stream_data() -> std::optional<uint64_t>
392{
393 if (!should_send_max_stream_data()) {
394 return std::nullopt;
395 }
396
397 // Increase the window
398 max_recv_offset_ = recv_offset_ + recv_window_size_;
399 return max_recv_offset_;
400}
401
402// ============================================================================
403// Internal Helpers
404// ============================================================================
405
407{
408 while (!recv_buffer_.empty()) {
409 auto it = recv_buffer_.find(recv_offset_);
410 if (it == recv_buffer_.end()) {
411 break; // Gap in data
412 }
413
414 recv_ready_.insert(recv_ready_.end(), it->second.begin(), it->second.end());
415 recv_offset_ += it->second.size();
416 recv_buffer_.erase(it);
417 }
418}
419
421{
422 switch (send_state_) {
424 if (!send_buffer_.empty()) {
426 }
427 break;
428
430 if (fin_sent_ && send_buffer_.empty()) {
432 }
433 break;
434
436 if (fin_acked_) {
438 }
439 break;
440
442 // Wait for acknowledgment
443 break;
444
447 // Terminal states
448 break;
449 }
450}
451
453{
454 switch (recv_state_) {
456 if (recv_fin_) {
458 }
459 break;
460
462 if (final_size_.has_value() && recv_offset_ >= *final_size_) {
464 }
465 break;
466
468 if (recv_ready_.empty()) {
470 }
471 break;
472
476 // Terminal states
477 break;
478 }
479}
480
481// ============================================================================
482// State String Conversion
483// ============================================================================
484
485auto send_state_to_string(send_stream_state state) -> const char*
486{
487 switch (state) {
488 case send_stream_state::ready: return "ready";
489 case send_stream_state::send: return "send";
490 case send_stream_state::data_sent: return "data_sent";
491 case send_stream_state::reset_sent: return "reset_sent";
492 case send_stream_state::reset_recvd: return "reset_recvd";
493 case send_stream_state::data_recvd: return "data_recvd";
494 default: return "unknown";
495 }
496}
497
498auto recv_state_to_string(recv_stream_state state) -> const char*
499{
500 switch (state) {
501 case recv_stream_state::recv: return "recv";
502 case recv_stream_state::size_known: return "size_known";
503 case recv_stream_state::data_recvd: return "data_recvd";
504 case recv_stream_state::reset_recvd: return "reset_recvd";
505 case recv_stream_state::data_read: return "data_read";
506 case recv_stream_state::reset_read: return "reset_read";
507 default: return "unknown";
508 }
509}
510
511} // namespace kcenon::network::protocols::quic
auto should_send_max_stream_data() const noexcept -> bool
Check if MAX_STREAM_DATA frame should be sent.
Definition stream.cpp:381
auto finish() -> VoidResult
Mark stream as finished (send FIN)
Definition stream.cpp:76
std::optional< uint64_t > final_size_
Definition stream.h:398
auto stop_sending(uint64_t error_code) -> VoidResult
Signal that incoming data is no longer wanted.
Definition stream.cpp:231
auto generate_max_stream_data() -> std::optional< uint64_t >
Generate MAX_STREAM_DATA frame if needed.
Definition stream.cpp:391
auto receive_reset(uint64_t error_code, uint64_t final_size) -> VoidResult
Handle received RESET_STREAM frame.
Definition stream.cpp:316
void set_max_recv_data(uint64_t max)
Update our MAX_STREAM_DATA (peer's send limit)
Definition stream.cpp:374
auto is_unidirectional() const noexcept -> bool
Check if stream is unidirectional.
Definition stream.h:184
std::map< uint64_t, std::vector< uint8_t > > recv_buffer_
Definition stream.h:394
auto receive_data(uint64_t offset, std::span< const uint8_t > data, bool fin) -> VoidResult
Receive STREAM frame data.
Definition stream.cpp:243
auto has_data() const noexcept -> bool
Check if stream has data to read.
Definition stream.cpp:189
auto can_send() const noexcept -> bool
Check if stream can send data.
Definition stream.cpp:26
stream(uint64_t id, bool is_local, uint64_t initial_max_data=65536)
Construct a stream.
Definition stream.cpp:13
auto write(std::span< const uint8_t > data) -> Result< size_t >
Write data to stream.
Definition stream.cpp:42
void set_max_send_data(uint64_t max)
Set peer's MAX_STREAM_DATA (our send limit)
Definition stream.cpp:358
auto reset(uint64_t error_code) -> VoidResult
Reset the stream with error code.
Definition stream.cpp:95
auto available_send_window() const noexcept -> size_t
Get available send window.
Definition stream.cpp:365
std::deque< uint8_t > recv_ready_
Definition stream.h:395
void acknowledge_data(uint64_t offset, uint64_t length)
Acknowledge sent data.
Definition stream.cpp:169
std::deque< uint8_t > send_buffer_
Definition stream.h:386
static constexpr double window_update_threshold_
Definition stream.h:406
auto next_stream_frame(size_t max_size) -> std::optional< stream_frame >
Get next STREAM frame to send.
Definition stream.cpp:111
auto receive_stop_sending(uint64_t error_code) -> VoidResult
Handle received STOP_SENDING frame.
Definition stream.cpp:341
auto read(std::span< uint8_t > buffer) -> Result< size_t >
Read data from stream.
Definition stream.cpp:194
auto recv_state_to_string(recv_stream_state state) -> const char *
Get string representation of receive stream state.
Definition stream.cpp:498
@ error
Black hole detected, reset to base.
recv_stream_state
Stream state for receiving (RFC 9000 Section 3.2)
Definition stream.h:115
@ size_known
FIN received, final size known.
@ data_read
All data read by application (terminal)
@ reset_read
Reset acknowledged by application (terminal)
auto send_state_to_string(send_stream_state state) -> const char *
Get string representation of send stream state.
Definition stream.cpp:485
std::variant< padding_frame, ping_frame, ack_frame, reset_stream_frame, stop_sending_frame, crypto_frame, new_token_frame, stream_frame, max_data_frame, max_stream_data_frame, max_streams_frame, data_blocked_frame, stream_data_blocked_frame, streams_blocked_frame, new_connection_id_frame, retire_connection_id_frame, path_challenge_frame, path_response_frame, connection_close_frame, handshake_done_frame > frame
Variant type holding any QUIC frame.
send_stream_state
Stream state for sending (RFC 9000 Section 3.1)
Definition stream.h:102
@ reset_recvd
Reset acknowledged by peer (terminal)
@ data_recvd
All data acknowledged (terminal)
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
STREAM frame (RFC 9000 Section 19.8)