11 : is_server_{is_server}
12 , initial_max_stream_data_{initial_max_stream_data}
13 , next_local_bidi_id_{is_server ? stream_id_type::server_bidi : stream_id_type::client_bidi}
14 , next_local_uni_id_{is_server ? stream_id_type::server_uni : stream_id_type::client_uni}
15 , peer_bidi_type_{is_server ? stream_id_type::client_bidi : stream_id_type::server_bidi}
16 , peer_uni_type_{is_server ? stream_id_type::client_uni : stream_id_type::server_uni}
26 if (!can_create_local_stream(
true)) {
28 "Bidirectional stream limit reached",
29 "quic::stream_manager");
32 std::unique_lock lock(streams_mutex_);
34 uint64_t stream_id = next_local_bidi_id_;
35 next_local_bidi_id_ += 4;
37 auto new_stream = std::make_unique<stream>(stream_id,
true, initial_max_stream_data_);
38 streams_[stream_id] = std::move(new_stream);
40 return ok(std::move(stream_id));
45 if (!can_create_local_stream(
false)) {
47 "Unidirectional stream limit reached",
48 "quic::stream_manager");
51 std::unique_lock lock(streams_mutex_);
53 uint64_t stream_id = next_local_uni_id_;
54 next_local_uni_id_ += 4;
56 auto new_stream = std::make_unique<stream>(stream_id,
true, initial_max_stream_data_);
57 streams_[stream_id] = std::move(new_stream);
59 return ok(std::move(stream_id));
68 std::shared_lock lock(streams_mutex_);
69 auto it = streams_.find(stream_id);
70 return (it != streams_.end()) ? it->second.get() :
nullptr;
75 std::shared_lock lock(streams_mutex_);
76 auto it = streams_.find(stream_id);
77 return (it != streams_.end()) ? it->second.get() :
nullptr;
84 std::shared_lock lock(streams_mutex_);
85 auto it = streams_.find(stream_id);
86 if (it != streams_.end()) {
87 return ok(it->second.get());
92 auto validation = validate_stream_id(stream_id);
93 if (validation.is_err()) {
95 validation.error().message,
96 "quic::stream_manager");
100 if (is_local_stream(stream_id)) {
102 "Local stream must be explicitly created",
103 "quic::stream_manager");
107 if (!can_accept_peer_stream(stream_id)) {
109 "Peer stream limit exceeded",
110 "quic::stream_manager");
116 std::unique_lock lock(streams_mutex_);
122 uint64_t first_id = type;
124 if (highest_peer_bidi_id_ > 0 || streams_.count(peer_bidi_type_) > 0) {
125 first_id = highest_peer_bidi_id_ + 4;
128 if (highest_peer_uni_id_ > 0 || streams_.count(peer_uni_type_) > 0) {
129 first_id = highest_peer_uni_id_ + 4;
134 for (uint64_t
id = first_id;
id <= stream_id;
id += 4) {
135 if (streams_.count(
id) == 0) {
136 auto new_stream = std::make_unique<stream>(
id,
false, initial_max_stream_data_);
137 streams_[id] = std::move(new_stream);
143 if (stream_id > highest_peer_bidi_id_ || highest_peer_bidi_id_ == 0) {
144 highest_peer_bidi_id_ = stream_id;
147 if (stream_id > highest_peer_uni_id_ || highest_peer_uni_id_ == 0) {
148 highest_peer_uni_id_ = stream_id;
152 return ok(streams_[stream_id].get());
157 std::shared_lock lock(streams_mutex_);
158 return streams_.count(stream_id) > 0;
164 std::vector<uint64_t> ids;
166 for (
const auto& [
id, _] :
streams_) {
208 std::shared_lock lock(streams_mutex_);
209 std::vector<stream*> result;
210 for (
auto& [_, s] : streams_) {
211 if (s->pending_bytes() > 0 || (s->fin_sent() && !s->is_fin_received())) {
212 result.push_back(s.get());
220 std::shared_lock lock(streams_mutex_);
221 std::vector<stream*> result;
222 for (
auto& [_, s] : streams_) {
223 if (s->should_send_max_stream_data()) {
224 result.push_back(s.get());
241 for (
const auto& [_, s] :
streams_) {
252 std::unique_lock lock(streams_mutex_);
255 for (
auto it = streams_.begin(); it != streams_.end();) {
256 const auto& s = it->second;
265 if (s->is_unidirectional()) {
269 it = streams_.erase(it);
276 it = streams_.erase(it);
283 if (send_terminal && recv_terminal) {
284 it = streams_.erase(it);
301 (void)s->reset(error_code);
327 if (is_local_stream(stream_id)) {
331 if (stream_id >= next_local_bidi_id_) {
333 "Local bidi stream ID too high",
334 "quic::stream_manager");
337 if (stream_id >= next_local_uni_id_) {
339 "Local uni stream ID too high",
340 "quic::stream_manager");
351 return is_server_initiated == is_server_;
358 const uint64_t current_count = next_local_bidi_id_ / 4;
359 return current_count < peer_max_streams_bidi_;
362 const uint64_t current_count = (next_local_uni_id_ - 2) / 4;
363 return current_count < peer_max_streams_uni_;
374 return stream_seq < local_max_streams_bidi_;
377 return stream_seq < local_max_streams_uni_;
auto remove_closed_streams() -> size_t
Remove closed/terminal streams.
auto stream_ids() const -> std::vector< uint64_t >
Get all active stream IDs.
auto has_stream(uint64_t stream_id) const -> bool
Check if a stream exists.
void close_all_streams(uint64_t error_code)
Close all streams with an error code.
uint64_t local_max_streams_bidi_
auto get_stream(uint64_t stream_id) -> stream *
Get a stream by ID.
auto create_bidirectional_stream() -> Result< uint64_t >
Create a new locally-initiated bidirectional stream.
uint64_t highest_peer_bidi_id_
auto streams_needing_flow_control_update() -> std::vector< stream * >
Get streams that need MAX_STREAM_DATA updates.
uint64_t peer_max_streams_uni_
stream_manager(bool is_server, uint64_t initial_max_stream_data=65536)
Construct a stream manager.
auto streams_with_pending_data() -> std::vector< stream * >
Get streams with pending data to send.
void set_peer_max_streams_uni(uint64_t max)
Set maximum unidirectional streams peer can initiate.
auto create_unidirectional_stream() -> Result< uint64_t >
Create a new locally-initiated unidirectional stream.
void set_local_max_streams_uni(uint64_t max)
Set our maximum unidirectional streams (advertised to peer)
std::shared_mutex streams_mutex_
uint64_t local_max_streams_uni_
auto can_accept_peer_stream(uint64_t stream_id) const -> bool
void set_local_max_streams_bidi(uint64_t max)
Set our maximum bidirectional streams (advertised to peer)
void reset()
Reset manager state (for connection close)
auto stream_count() const -> size_t
Get number of active streams.
uint64_t peer_max_streams_bidi_
void set_peer_max_streams_bidi(uint64_t max)
Set maximum bidirectional streams peer can initiate.
auto get_or_create_stream(uint64_t stream_id) -> Result< stream * >
Get or create a stream for a peer-initiated stream ID.
void for_each_stream(const std::function< void(stream &)> &callback)
Iterate over all streams.
auto is_local_stream(uint64_t stream_id) const noexcept -> bool
auto can_create_local_stream(bool bidirectional) const -> bool
auto validate_stream_id(uint64_t stream_id) const -> VoidResult
uint64_t next_local_uni_id_
std::map< uint64_t, std::unique_ptr< stream > > streams_
uint64_t highest_peer_uni_id_
uint64_t next_local_bidi_id_
QUIC stream implementation (RFC 9000 Sections 2-4)
constexpr int stream_limit_exceeded
constexpr int invalid_stream_id
constexpr int stream_not_found
constexpr uint64_t client_uni
constexpr auto is_server_initiated(uint64_t stream_id) noexcept -> bool
Check if stream is server-initiated.
constexpr uint64_t server_uni
constexpr auto is_bidirectional(uint64_t stream_id) noexcept -> bool
Check if stream is bidirectional.
constexpr auto get_type(uint64_t stream_id) noexcept -> uint64_t
Get stream type bits (0-3)
constexpr uint64_t client_bidi
constexpr uint64_t server_bidi
constexpr auto get_sequence(uint64_t stream_id) noexcept -> uint64_t
Get stream sequence number (stream_id >> 2)
@ error
Black hole detected, reset to base.
@ data_read
All data read by application (terminal)
@ reset_read
Reset acknowledged by application (terminal)
@ reset_recvd
Reset acknowledged by peer (terminal)
@ data_recvd
All data acknowledged (terminal)
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")