16 , max_send_offset_{initial_max_data}
17 , max_recv_offset_{initial_max_data}
18 , recv_window_size_{initial_max_data}
46 "Stream cannot send data in current state",
52 "Cannot write after FIN sent",
57 const auto available = available_send_window();
60 "Send window exhausted",
65 size_t to_write = std::min(data.size(), available);
66 send_buffer_.insert(send_buffer_.end(), data.begin(), data.begin() +
static_cast<std::ptrdiff_t
>(to_write));
73 return ok(std::move(to_write));
80 "Stream cannot be finished in current state",
101 "Stream already in terminal state",
105 reset_error_code_ = error_code;
107 send_buffer_.clear();
113 if (send_buffer_.empty() && !fin_sent_) {
118 if (send_buffer_.empty() && fin_sent_ && !fin_acked_) {
122 frame.offset = send_offset_;
127 if (send_buffer_.empty()) {
132 const auto window_available = available_send_window();
133 if (window_available == 0) {
139 constexpr size_t max_header_size = 1 + 8 + 8 + 8;
140 if (max_size <= max_header_size) {
144 const size_t max_payload = max_size - max_header_size;
145 const size_t to_send = std::min({send_buffer_.size(), window_available, max_payload});
149 frame.offset = send_offset_;
150 frame.data.reserve(to_send);
153 for (
size_t i = 0; i < to_send; ++i) {
154 frame.data.push_back(send_buffer_[i]);
158 send_buffer_.erase(send_buffer_.begin(), send_buffer_.begin() + to_send);
159 send_offset_ += to_send;
162 if (fin_sent_ && send_buffer_.empty()) {
196 if (is_unidirectional() && is_local_) {
199 "Cannot read from local unidirectional stream",
205 "Stream was reset by peer",
209 if (recv_ready_.empty()) {
210 if (recv_fin_ && recv_buffer_.empty()) {
213 return ok(
static_cast<size_t>(0));
215 return ok(
static_cast<size_t>(0));
218 size_t to_read = std::min(buffer.size(), recv_ready_.size());
219 for (
size_t i = 0; i < to_read; ++i) {
220 buffer[i] = recv_ready_[i];
223 recv_ready_.erase(recv_ready_.begin(), recv_ready_.begin() +
static_cast<std::ptrdiff_t
>(to_read));
228 return ok(std::move(to_read));
233 if (is_unidirectional() && is_local_) {
235 "Cannot stop sending on local unidirectional stream",
239 stop_sending_error_code_ = error_code;
246 if (is_unidirectional() && is_local_) {
248 "Cannot receive on local unidirectional stream",
260 if (final_size_.has_value()) {
261 if (offset + data.size() > *final_size_) {
263 "Data exceeds final size",
269 const uint64_t new_final_size = offset + data.size();
270 if (final_size_.has_value() && *final_size_ != new_final_size) {
272 "Final size changed",
275 final_size_ = new_final_size;
280 if (offset + data.size() > max_recv_offset_) {
282 "Received data exceeds flow control limit",
287 if (offset < recv_offset_) {
289 const size_t overlap =
static_cast<size_t>(recv_offset_ - offset);
290 if (overlap >= data.size()) {
296 offset = recv_offset_;
297 data = data.subspan(overlap);
300 if (offset == recv_offset_) {
302 recv_ready_.insert(recv_ready_.end(), data.begin(), data.end());
303 recv_offset_ += data.size();
309 recv_buffer_[offset] = std::vector<uint8_t>(data.begin(), data.end());
318 if (is_unidirectional() && is_local_) {
320 "Cannot receive reset on local unidirectional stream",
325 if (final_size_.has_value() && *final_size_ != final_size) {
327 "Reset final size differs from previously received",
331 final_size_ = final_size;
332 reset_error_code_ = error_code;
336 recv_buffer_.clear();
345 "Cannot handle STOP_SENDING in current state",
349 stop_sending_error_code_ = error_code;
385 const uint64_t threshold =
static_cast<uint64_t
>(
393 if (!should_send_max_stream_data()) {
398 max_recv_offset_ = recv_offset_ + recv_window_size_;
399 return max_recv_offset_;
494 default:
return "unknown";
507 default:
return "unknown";
auto should_send_max_stream_data() const noexcept -> bool
Check if MAX_STREAM_DATA frame should be sent.
auto finish() -> VoidResult
Mark stream as finished (send FIN)
std::optional< uint64_t > final_size_
auto stop_sending(uint64_t error_code) -> VoidResult
Signal that incoming data is no longer wanted.
auto generate_max_stream_data() -> std::optional< uint64_t >
Generate MAX_STREAM_DATA frame if needed.
auto receive_reset(uint64_t error_code, uint64_t final_size) -> VoidResult
Handle received RESET_STREAM frame.
uint64_t max_recv_offset_
void set_max_recv_data(uint64_t max)
Update our MAX_STREAM_DATA (peer's send limit)
auto is_unidirectional() const noexcept -> bool
Check if stream is unidirectional.
std::map< uint64_t, std::vector< uint8_t > > recv_buffer_
auto receive_data(uint64_t offset, std::span< const uint8_t > data, bool fin) -> VoidResult
Receive STREAM frame data.
auto has_data() const noexcept -> bool
Check if stream has data to read.
auto can_send() const noexcept -> bool
Check if stream can send data.
stream(uint64_t id, bool is_local, uint64_t initial_max_data=65536)
Construct a stream.
auto write(std::span< const uint8_t > data) -> Result< size_t >
Write data to stream.
void set_max_send_data(uint64_t max)
Set peer's MAX_STREAM_DATA (our send limit)
auto reset(uint64_t error_code) -> VoidResult
Reset the stream with error code.
auto available_send_window() const noexcept -> size_t
Get available send window.
uint64_t max_send_offset_
std::deque< uint8_t > recv_ready_
recv_stream_state recv_state_
void acknowledge_data(uint64_t offset, uint64_t length)
Acknowledge sent data.
std::deque< uint8_t > send_buffer_
static constexpr double window_update_threshold_
send_stream_state send_state_
uint64_t recv_window_size_
auto next_stream_frame(size_t max_size) -> std::optional< stream_frame >
Get next STREAM frame to send.
auto receive_stop_sending(uint64_t error_code) -> VoidResult
Handle received STOP_SENDING frame.
auto read(std::span< uint8_t > buffer) -> Result< size_t >
Read data from stream.
constexpr int stream_reset
constexpr int flow_control_error
constexpr int final_size_error
constexpr int stream_state_error
auto recv_state_to_string(recv_stream_state state) -> const char *
Get string representation of receive stream state.
@ error
Black hole detected, reset to base.
recv_stream_state
Stream state for receiving (RFC 9000 Section 3.2)
@ size_known
FIN received, final size known.
@ reset_recvd
RESET_STREAM received.
@ data_recvd
All data received.
@ data_read
All data read by application (terminal)
@ reset_read
Reset acknowledged by application (terminal)
auto send_state_to_string(send_stream_state state) -> const char *
Get string representation of send stream state.
std::variant< padding_frame, ping_frame, ack_frame, reset_stream_frame, stop_sending_frame, crypto_frame, new_token_frame, stream_frame, max_data_frame, max_stream_data_frame, max_streams_frame, data_blocked_frame, stream_data_blocked_frame, streams_blocked_frame, new_connection_id_frame, retire_connection_id_frame, path_challenge_frame, path_response_frame, connection_close_frame, handshake_done_frame > frame
Variant type holding any QUIC frame.
send_stream_state
Stream state for sending (RFC 9000 Section 3.1)
@ reset_sent
RESET_STREAM sent.
@ reset_recvd
Reset acknowledged by peer (terminal)
@ data_recvd
All data acknowledged (terminal)
@ ready
Stream is ready, can send data.
@ data_sent
All data sent, awaiting ACKs.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
STREAM frame (RFC 9000 Section 19.8)
uint64_t stream_id
Stream identifier.