Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
stream_manager.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
8{
9
10stream_manager::stream_manager(bool is_server, uint64_t initial_max_stream_data)
11 : is_server_{is_server}
12 , initial_max_stream_data_{initial_max_stream_data}
13 , next_local_bidi_id_{is_server ? stream_id_type::server_bidi : stream_id_type::client_bidi}
14 , next_local_uni_id_{is_server ? stream_id_type::server_uni : stream_id_type::client_uni}
15 , peer_bidi_type_{is_server ? stream_id_type::client_bidi : stream_id_type::server_bidi}
16 , peer_uni_type_{is_server ? stream_id_type::client_uni : stream_id_type::server_uni}
17{
18}
19
20// ============================================================================
21// Stream Creation
22// ============================================================================
23
25{
    // Creates the next locally-initiated bidirectional stream and returns its ID.
    // Fails with "Bidirectional stream limit reached" when
    // can_create_local_stream(true) reports that the peer's limit is exhausted.
    //
    // NOTE(review): the limit check below runs before streams_mutex_ is taken,
    // and can_create_local_stream() reads next_local_bidi_id_ without locking.
    // Two concurrent callers can both pass the check and together exceed the
    // limit - confirm whether callers are serialized externally.
26 if (!can_create_local_stream(true)) {
28 "Bidirectional stream limit reached",
29 "quic::stream_manager");
30 }
31
32 std::unique_lock lock(streams_mutex_);
33
    // IDs of one stream type are spaced 4 apart, so the counter advances by 4.
34 uint64_t stream_id = next_local_bidi_id_;
35 next_local_bidi_id_ += 4;
36
    // Second constructor argument is `true` here and `false` for peer-created
    // streams in get_or_create_stream - presumably an "is local" flag; verify
    // against the stream constructor.
37 auto new_stream = std::make_unique<stream>(stream_id, true, initial_max_stream_data_);
38 streams_[stream_id] = std::move(new_stream);
39
40 return ok(std::move(stream_id));
41}
42
44{
    // Creates the next locally-initiated unidirectional stream and returns its ID.
    // Fails with "Unidirectional stream limit reached" when
    // can_create_local_stream(false) reports that the peer's limit is exhausted.
    //
    // NOTE(review): the limit check below runs before streams_mutex_ is taken,
    // and can_create_local_stream() reads next_local_uni_id_ without locking.
    // Two concurrent callers can both pass the check and together exceed the
    // limit - confirm whether callers are serialized externally.
45 if (!can_create_local_stream(false)) {
47 "Unidirectional stream limit reached",
48 "quic::stream_manager");
49 }
50
51 std::unique_lock lock(streams_mutex_);
52
    // IDs of one stream type are spaced 4 apart, so the counter advances by 4.
53 uint64_t stream_id = next_local_uni_id_;
54 next_local_uni_id_ += 4;
55
    // Second constructor argument is `true` here and `false` for peer-created
    // streams in get_or_create_stream - presumably an "is local" flag; verify
    // against the stream constructor.
56 auto new_stream = std::make_unique<stream>(stream_id, true, initial_max_stream_data_);
57 streams_[stream_id] = std::move(new_stream);
58
59 return ok(std::move(stream_id));
60}
61
62// ============================================================================
63// Stream Access
64// ============================================================================
65
66auto stream_manager::get_stream(uint64_t stream_id) -> stream*
67{
68 std::shared_lock lock(streams_mutex_);
69 auto it = streams_.find(stream_id);
70 return (it != streams_.end()) ? it->second.get() : nullptr;
71}
72
73auto stream_manager::get_stream(uint64_t stream_id) const -> const stream*
74{
75 std::shared_lock lock(streams_mutex_);
76 auto it = streams_.find(stream_id);
77 return (it != streams_.end()) ? it->second.get() : nullptr;
78}
79
81{
    // Returns the stream with the given ID, creating it (and any lower-numbered
    // peer streams of the same type that do not exist yet) when the ID is a
    // valid, within-limit, peer-initiated stream. Errors are returned when
    // validate_stream_id() fails, when the ID is locally initiated but not
    // present, or when can_accept_peer_stream() rejects it.
    //
    // NOTE(review): the shared lock taken for the fast path is released before
    // the unique lock further down is acquired; validation and limit checks run
    // in between with no lock held. Confirm callers are serialized, otherwise
    // the peer bookkeeping below can race.
82 // First, try to get existing stream
83 {
84 std::shared_lock lock(streams_mutex_);
85 auto it = streams_.find(stream_id);
86 if (it != streams_.end()) {
87 return ok(it->second.get());
88 }
89 }
90
91 // Validate stream ID
92 auto validation = validate_stream_id(stream_id);
93 if (validation.is_err()) {
94 return error<stream*>(validation.error().code,
95 validation.error().message,
96 "quic::stream_manager");
97 }
98
99 // Check if this is a peer-initiated stream
100 if (is_local_stream(stream_id)) {
102 "Local stream must be explicitly created",
103 "quic::stream_manager");
104 }
105
106 // Check stream limits for peer-initiated stream
107 if (!can_accept_peer_stream(stream_id)) {
109 "Peer stream limit exceeded",
110 "quic::stream_manager");
111 }
112
113 // Create implicit streams as needed (RFC 9000 Section 2.1)
114 // When a stream is created, all lower-numbered streams of the same type
115 // must also be created
116 std::unique_lock lock(streams_mutex_);
117
118 const auto type = stream_id_type::get_type(stream_id);
119 const bool is_bidi = stream_id_type::is_bidirectional(stream_id);
120
121 // Determine the first stream ID of this type we need to create
    // Default is the lowest ID of this type (the type bits themselves). Once a
    // peer stream of this kind has been seen, creation resumes just past the
    // highest one recorded. The extra streams_.count() test covers the case
    // where the recorded highest ID is legitimately 0.
122 uint64_t first_id = type;
123 if (is_bidi) {
124 if (highest_peer_bidi_id_ > 0 || streams_.count(peer_bidi_type_) > 0) {
125 first_id = highest_peer_bidi_id_ + 4;
126 }
127 } else {
128 if (highest_peer_uni_id_ > 0 || streams_.count(peer_uni_type_) > 0) {
129 first_id = highest_peer_uni_id_ + 4;
130 }
131 }
132
133 // Create all streams up to and including the requested one
134 for (uint64_t id = first_id; id <= stream_id; id += 4) {
135 if (streams_.count(id) == 0) {
136 auto new_stream = std::make_unique<stream>(id, false, initial_max_stream_data_);
137 streams_[id] = std::move(new_stream);
138 }
139 }
140
141 // Update highest peer stream ID
142 if (is_bidi) {
143 if (stream_id > highest_peer_bidi_id_ || highest_peer_bidi_id_ == 0) {
144 highest_peer_bidi_id_ = stream_id;
145 }
146 } else {
147 if (stream_id > highest_peer_uni_id_ || highest_peer_uni_id_ == 0) {
148 highest_peer_uni_id_ = stream_id;
149 }
150 }
151
    // NOTE(review): operator[] default-inserts when the key is absent. If
    // stream_id is at or below the recorded highest peer ID but its entry was
    // already erased (remove_closed_streams), the loop above does not run, and
    // this line inserts a null unique_ptr and returns ok(nullptr). Later
    // iteration over streams_ would then dereference that null entry.
152 return ok(streams_[stream_id].get());
153}
154
155auto stream_manager::has_stream(uint64_t stream_id) const -> bool
156{
157 std::shared_lock lock(streams_mutex_);
158 return streams_.count(stream_id) > 0;
159}
160
161auto stream_manager::stream_ids() const -> std::vector<uint64_t>
162{
163 std::shared_lock lock(streams_mutex_);
164 std::vector<uint64_t> ids;
165 ids.reserve(streams_.size());
166 for (const auto& [id, _] : streams_) {
167 ids.push_back(id);
168 }
169 return ids;
170}
171
172auto stream_manager::stream_count() const -> size_t
173{
174 std::shared_lock lock(streams_mutex_);
175 return streams_.size();
176}
177
178// ============================================================================
179// Stream Limits
180// ============================================================================
181
186
191
196
201
202// ============================================================================
203// Stream Queries
204// ============================================================================
205
206auto stream_manager::streams_with_pending_data() -> std::vector<stream*>
207{
208 std::shared_lock lock(streams_mutex_);
209 std::vector<stream*> result;
210 for (auto& [_, s] : streams_) {
211 if (s->pending_bytes() > 0 || (s->fin_sent() && !s->is_fin_received())) {
212 result.push_back(s.get());
213 }
214 }
215 return result;
216}
217
219{
220 std::shared_lock lock(streams_mutex_);
221 std::vector<stream*> result;
222 for (auto& [_, s] : streams_) {
223 if (s->should_send_max_stream_data()) {
224 result.push_back(s.get());
225 }
226 }
227 return result;
228}
229
230void stream_manager::for_each_stream(const std::function<void(stream&)>& callback)
231{
232 std::shared_lock lock(streams_mutex_);
233 for (auto& [_, s] : streams_) {
234 callback(*s);
235 }
236}
237
238void stream_manager::for_each_stream(const std::function<void(const stream&)>& callback) const
239{
240 std::shared_lock lock(streams_mutex_);
241 for (const auto& [_, s] : streams_) {
242 callback(*s);
243 }
244}
245
246// ============================================================================
247// Stream Lifecycle
248// ============================================================================
249
251{
252 std::unique_lock lock(streams_mutex_);
253 size_t removed = 0;
254
255 for (auto it = streams_.begin(); it != streams_.end();) {
256 const auto& s = it->second;
257
258 // Check if stream is in terminal state on both sides
259 bool send_terminal = (s->send_state() == send_stream_state::data_recvd ||
260 s->send_state() == send_stream_state::reset_recvd);
261 bool recv_terminal = (s->recv_state() == recv_stream_state::data_read ||
262 s->recv_state() == recv_stream_state::reset_read);
263
264 // For unidirectional streams, only check the relevant side
265 if (s->is_unidirectional()) {
266 if (s->is_local()) {
267 // Local unidirectional: only check send side
268 if (send_terminal) {
269 it = streams_.erase(it);
270 ++removed;
271 continue;
272 }
273 } else {
274 // Peer unidirectional: only check receive side
275 if (recv_terminal) {
276 it = streams_.erase(it);
277 ++removed;
278 continue;
279 }
280 }
281 } else {
282 // Bidirectional: check both sides
283 if (send_terminal && recv_terminal) {
284 it = streams_.erase(it);
285 ++removed;
286 continue;
287 }
288 }
289
290 ++it;
291 }
292
293 return removed;
294}
295
296void stream_manager::close_all_streams(uint64_t error_code)
297{
298 std::unique_lock lock(streams_mutex_);
299 for (auto& [_, s] : streams_) {
300 if (s->can_send()) {
301 (void)s->reset(error_code);
302 }
303 }
304}
305
317
318// ============================================================================
319// Internal Helpers
320// ============================================================================
321
// Checks that a stream ID is plausible for this endpoint.
//
// Only locally-initiated IDs are actually checked: such an ID is rejected when
// it is at or beyond the next ID this manager would hand out for that
// direction (i.e. it was never created here). Peer-initiated IDs always pass.
//
// NOTE(review): next_local_bidi_id_ / next_local_uni_id_ are read here without
// taking streams_mutex_ - confirm callers are serialized.
322auto stream_manager::validate_stream_id(uint64_t stream_id) const -> VoidResult
323{
    // NOTE(review): `type` does not appear to be used in the visible lines of
    // this function - confirm and remove if so.
324 const auto type = stream_id_type::get_type(stream_id);
325
326 // Check if stream type is valid for the role
327 if (is_local_stream(stream_id)) {
328 // Local stream: check against our limits
329 const bool is_bidi = stream_id_type::is_bidirectional(stream_id);
330 if (is_bidi) {
331 if (stream_id >= next_local_bidi_id_) {
333 "Local bidi stream ID too high",
334 "quic::stream_manager");
335 }
336 } else {
337 if (stream_id >= next_local_uni_id_) {
339 "Local uni stream ID too high",
340 "quic::stream_manager");
341 }
342 }
343 }
344
345 return ok();
346}
347
348auto stream_manager::is_local_stream(uint64_t stream_id) const noexcept -> bool
349{
350 const bool is_server_initiated = stream_id_type::is_server_initiated(stream_id);
351 return is_server_initiated == is_server_;
352}
353
354auto stream_manager::can_create_local_stream(bool bidirectional) const -> bool
355{
356 if (bidirectional) {
357 // Check against peer's MAX_STREAMS_BIDI
358 const uint64_t current_count = next_local_bidi_id_ / 4;
359 return current_count < peer_max_streams_bidi_;
360 } else {
361 // Check against peer's MAX_STREAMS_UNI
362 const uint64_t current_count = (next_local_uni_id_ - 2) / 4;
363 return current_count < peer_max_streams_uni_;
364 }
365}
366
367auto stream_manager::can_accept_peer_stream(uint64_t stream_id) const -> bool
368{
369 const bool is_bidi = stream_id_type::is_bidirectional(stream_id);
370 const uint64_t stream_seq = stream_id_type::get_sequence(stream_id);
371
372 if (is_bidi) {
373 // Check against our MAX_STREAMS_BIDI
374 return stream_seq < local_max_streams_bidi_;
375 } else {
376 // Check against our MAX_STREAMS_UNI
377 return stream_seq < local_max_streams_uni_;
378 }
379}
380
381} // namespace kcenon::network::protocols::quic
auto remove_closed_streams() -> size_t
Remove closed/terminal streams.
auto stream_ids() const -> std::vector< uint64_t >
Get all active stream IDs.
auto has_stream(uint64_t stream_id) const -> bool
Check if a stream exists.
void close_all_streams(uint64_t error_code)
Close all streams with an error code.
auto get_stream(uint64_t stream_id) -> stream *
Get a stream by ID.
auto create_bidirectional_stream() -> Result< uint64_t >
Create a new locally-initiated bidirectional stream.
auto streams_needing_flow_control_update() -> std::vector< stream * >
Get streams that need MAX_STREAM_DATA updates.
stream_manager(bool is_server, uint64_t initial_max_stream_data=65536)
Construct a stream manager.
auto streams_with_pending_data() -> std::vector< stream * >
Get streams with pending data to send.
void set_peer_max_streams_uni(uint64_t max)
Set maximum unidirectional streams peer can initiate.
auto create_unidirectional_stream() -> Result< uint64_t >
Create a new locally-initiated unidirectional stream.
void set_local_max_streams_uni(uint64_t max)
Set our maximum unidirectional streams (advertised to peer)
auto can_accept_peer_stream(uint64_t stream_id) const -> bool
void set_local_max_streams_bidi(uint64_t max)
Set our maximum bidirectional streams (advertised to peer)
void reset()
Reset manager state (for connection close)
auto stream_count() const -> size_t
Get number of active streams.
void set_peer_max_streams_bidi(uint64_t max)
Set maximum bidirectional streams peer can initiate.
auto get_or_create_stream(uint64_t stream_id) -> Result< stream * >
Get or create a stream for a peer-initiated stream ID.
void for_each_stream(const std::function< void(stream &)> &callback)
Iterate over all streams.
auto is_local_stream(uint64_t stream_id) const noexcept -> bool
auto can_create_local_stream(bool bidirectional) const -> bool
auto validate_stream_id(uint64_t stream_id) const -> VoidResult
std::map< uint64_t, std::unique_ptr< stream > > streams_
QUIC stream implementation (RFC 9000 Sections 2-4)
Definition stream.h:147
constexpr auto is_server_initiated(uint64_t stream_id) noexcept -> bool
Check if stream is server-initiated.
Definition stream.h:52
constexpr auto is_bidirectional(uint64_t stream_id) noexcept -> bool
Check if stream is bidirectional.
Definition stream.h:60
constexpr auto get_type(uint64_t stream_id) noexcept -> uint64_t
Get stream type bits (0-3)
Definition stream.h:76
constexpr auto get_sequence(uint64_t stream_id) noexcept -> uint64_t
Get stream sequence number (stream_id >> 2)
Definition stream.h:84
@ error
Black hole detected, reset to base.
@ data_read
All data read by application (terminal)
@ reset_read
Reset acknowledged by application (terminal)
@ reset_recvd
Reset acknowledged by peer (terminal)
@ data_recvd
All data acknowledged (terminal)
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()