Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
http2_client.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
10
11#include <algorithm>
12#include <thread>
13
15{
16 // http2_response implementation
17 auto http2_response::get_header(const std::string& name) const -> std::optional<std::string>
18 {
19 std::string lower_name = name;
20 std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(),
21 [](unsigned char c) { return std::tolower(c); });
22
23 for (const auto& header : headers)
24 {
25 std::string lower_header_name = header.name;
26 std::transform(lower_header_name.begin(), lower_header_name.end(),
27 lower_header_name.begin(),
28 [](unsigned char c) { return std::tolower(c); });
29
30 if (lower_header_name == lower_name)
31 {
32 return header.value;
33 }
34 }
35 return std::nullopt;
36 }
37
38 auto http2_response::get_body_string() const -> std::string
39 {
40 return std::string(body.begin(), body.end());
41 }
42
43 // http2_client implementation
44 http2_client::http2_client(std::string_view client_id)
45 : client_id_(client_id)
46 , encoder_(local_settings_.header_table_size)
47 , decoder_(local_settings_.header_table_size)
48 {
49 }
50
52 {
53 try
54 {
55 if (is_connected_)
56 {
57 disconnect();
58 }
59 }
60 catch (...)
61 {
62 // Suppress exceptions in destructor
63 }
64 }
65
66 auto http2_client::connect(const std::string& host, unsigned short port) -> VoidResult
67 {
68 // Create tracing span for connect operation
69 auto span = tracing::is_tracing_enabled()
70 ? std::make_optional(tracing::trace_context::create_span("http2.client.connect"))
71 : std::nullopt;
72 if (span)
73 {
74 span->set_attribute("net.peer.name", host)
75 .set_attribute("net.peer.port", static_cast<int64_t>(port))
76 .set_attribute("net.transport", "tcp")
77 .set_attribute("http.flavor", "2.0")
78 .set_attribute("client.id", client_id_);
79 }
80
81 if (is_connected_)
82 {
83 if (span)
84 {
85 span->set_error("Already connected");
86 }
88 "Already connected", "http2_client::connect");
89 }
90
91 if (host.empty())
92 {
93 if (span)
94 {
95 span->set_error("Host cannot be empty");
96 }
98 "Host cannot be empty", "http2_client::connect");
99 }
100
101 host_ = host;
102 port_ = port;
103
104 try
105 {
106 // Create I/O context
107 io_context_ = std::make_unique<asio::io_context>();
108
109 // Create SSL context with TLS 1.3
110 ssl_context_ = std::make_unique<asio::ssl::context>(asio::ssl::context::tlsv13_client);
111
112 // Set ALPN for HTTP/2
113 SSL_CTX* native_ctx = ssl_context_->native_handle();
114 static const unsigned char alpn_protos[] = { 2, 'h', '2' };
115 SSL_CTX_set_alpn_protos(native_ctx, alpn_protos, sizeof(alpn_protos));
116
117 // Verify peer certificate
118 ssl_context_->set_default_verify_paths();
119 ssl_context_->set_verify_mode(asio::ssl::verify_peer);
120
121 // Create socket
122 asio::ip::tcp::resolver resolver(*io_context_);
123 auto endpoints = resolver.resolve(host, std::to_string(port));
124
125 socket_ = std::make_unique<asio::ssl::stream<asio::ip::tcp::socket>>(
126 *io_context_, *ssl_context_);
127
128 // Set SNI hostname
129 SSL_set_tlsext_host_name(socket_->native_handle(), host.c_str());
130
131 // Connect TCP
132 asio::connect(socket_->lowest_layer(), endpoints);
133
134 // Perform SSL handshake
135 socket_->handshake(asio::ssl::stream_base::client);
136
137 // Verify ALPN negotiation
138 const unsigned char* alpn_result = nullptr;
139 unsigned int alpn_len = 0;
140 SSL_get0_alpn_selected(socket_->native_handle(), &alpn_result, &alpn_len);
141
142 if (alpn_len != 2 || alpn_result == nullptr ||
143 std::string(reinterpret_cast<const char*>(alpn_result), alpn_len) != "h2")
144 {
145 socket_->lowest_layer().close();
146 if (span)
147 {
148 span->set_error("Server does not support HTTP/2 via ALPN");
149 }
151 "Server does not support HTTP/2 via ALPN",
152 "http2_client::connect");
153 }
154
155 is_connected_ = true;
156 is_running_ = true;
157
158 // Send connection preface
159 auto preface_result = send_connection_preface();
160 if (preface_result.is_err())
161 {
162 is_connected_ = false;
163 is_running_ = false;
164 if (span)
165 {
166 span->set_error(preface_result.error().message);
167 }
168 return preface_result;
169 }
170
171 // Send initial SETTINGS
172 auto settings_result = send_settings();
173 if (settings_result.is_err())
174 {
175 is_connected_ = false;
176 is_running_ = false;
177 if (span)
178 {
179 span->set_error(settings_result.error().message);
180 }
181 return settings_result;
182 }
183
184 // Start I/O thread
185 work_guard_ = std::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(
186 asio::make_work_guard(*io_context_));
187
188 io_future_ = std::async(std::launch::async, [this]() { run_io(); });
189
190 if (span)
191 {
192 span->set_status(tracing::span_status::ok);
193 }
194 return ok();
195 }
196 catch (const std::exception& e)
197 {
198 is_connected_ = false;
199 is_running_ = false;
200 if (span)
201 {
202 span->set_error(std::string("Connection failed: ") + e.what());
203 }
205 std::string("Connection failed: ") + e.what(),
206 "http2_client::connect");
207 }
208 }
209
211 {
212 if (!is_connected_)
213 {
214 return ok();
215 }
216
217 try
218 {
219 // Send GOAWAY frame
220 if (socket_ && socket_->lowest_layer().is_open())
221 {
222 goaway_frame goaway(next_stream_id_ - 2,
223 static_cast<uint32_t>(error_code::no_error));
224 send_frame(goaway);
225 }
226 }
227 catch (...)
228 {
229 // Ignore errors during GOAWAY
230 }
231
232 stop_io();
233
234 is_connected_ = false;
235
236 return ok();
237 }
238
239 auto http2_client::is_connected() const -> bool
240 {
242 }
243
244 auto http2_client::get(const std::string& path,
245 const std::vector<http_header>& headers)
247 {
248 return send_request("GET", path, headers, {});
249 }
250
251 auto http2_client::post(const std::string& path,
252 const std::string& body,
253 const std::vector<http_header>& headers)
255 {
256 std::vector<uint8_t> body_bytes(body.begin(), body.end());
257 return send_request("POST", path, headers, body_bytes);
258 }
259
260 auto http2_client::post(const std::string& path,
261 const std::vector<uint8_t>& body,
262 const std::vector<http_header>& headers)
264 {
265 return send_request("POST", path, headers, body);
266 }
267
268 auto http2_client::put(const std::string& path,
269 const std::string& body,
270 const std::vector<http_header>& headers)
272 {
273 std::vector<uint8_t> body_bytes(body.begin(), body.end());
274 return send_request("PUT", path, headers, body_bytes);
275 }
276
277 auto http2_client::del(const std::string& path,
278 const std::vector<http_header>& headers)
280 {
281 return send_request("DELETE", path, headers, {});
282 }
283
284 auto http2_client::set_timeout(std::chrono::milliseconds timeout) -> void
285 {
286 timeout_ = timeout;
287 }
288
289 auto http2_client::get_timeout() const -> std::chrono::milliseconds
290 {
291 return timeout_;
292 }
293
295 {
296 return local_settings_;
297 }
298
300 {
301 local_settings_ = settings;
302 encoder_.set_max_table_size(settings.header_table_size);
303 decoder_.set_max_table_size(settings.header_table_size);
304 }
305
306 auto http2_client::start_stream(const std::string& path,
307 const std::vector<http_header>& headers,
308 std::function<void(std::vector<uint8_t>)> on_data,
309 std::function<void(std::vector<http_header>)> on_headers,
310 std::function<void(int)> on_complete)
312 {
313 if (!is_connected())
314 {
316 "Not connected",
317 "http2_client::start_stream");
318 }
319
320 // Create stream with callbacks
321 http2_stream& stream = create_stream();
322 stream.state = stream_state::open;
323 stream.is_streaming = true;
324 stream.on_data = std::move(on_data);
325 stream.on_headers = std::move(on_headers);
326 stream.on_complete = std::move(on_complete);
327
328 // Build headers for POST (streaming requests use POST)
329 auto request_headers = build_headers("POST", path, headers);
330 stream.request_headers = request_headers;
331
332 // Encode headers with HPACK
333 auto encoded_headers = encoder_.encode(request_headers);
334
335 // Send HEADERS frame (not end_stream since we might send more data)
336 headers_frame hf(stream.stream_id, std::move(encoded_headers), false, true);
337
338 auto send_result = send_frame(hf);
339 if (send_result.is_err())
340 {
341 close_stream(stream.stream_id);
342 const auto& err = send_result.error();
343 return error<uint32_t>(err.code, err.message,
344 "http2_client::start_stream",
345 get_error_details(err));
346 }
347
348 uint32_t result_id = stream.stream_id;
349 return ok(std::move(result_id));
350 }
351
352 auto http2_client::write_stream(uint32_t stream_id,
353 const std::vector<uint8_t>& data,
354 bool end_stream) -> VoidResult
355 {
356 if (!is_connected())
357 {
359 "Not connected",
360 "http2_client::write_stream");
361 }
362
363 auto* stream = get_stream(stream_id);
364 if (!stream)
365 {
367 "Stream not found",
368 "http2_client::write_stream");
369 }
370
371 if (stream->state == stream_state::closed ||
372 stream->state == stream_state::half_closed_local)
373 {
375 "Stream not writable",
376 "http2_client::write_stream");
377 }
378
379 // Send DATA frame
380 data_frame df(stream_id, std::vector<uint8_t>(data), end_stream);
381 auto send_result = send_frame(df);
382 if (send_result.is_err())
383 {
384 return send_result;
385 }
386
387 if (end_stream)
388 {
389 stream->state = stream_state::half_closed_local;
390 }
391
392 return ok();
393 }
394
396 {
397 if (!is_connected())
398 {
400 "Not connected",
401 "http2_client::close_stream_writer");
402 }
403
404 auto* stream = get_stream(stream_id);
405 if (!stream)
406 {
408 "Stream not found",
409 "http2_client::close_stream_writer");
410 }
411
412 if (stream->state == stream_state::closed ||
413 stream->state == stream_state::half_closed_local)
414 {
415 return ok(); // Already closed
416 }
417
418 // Send empty DATA frame with END_STREAM
419 data_frame df(stream_id, {}, true);
420 auto send_result = send_frame(df);
421 if (send_result.is_err())
422 {
423 return send_result;
424 }
425
426 stream->state = stream_state::half_closed_local;
427 return ok();
428 }
429
430 auto http2_client::cancel_stream(uint32_t stream_id) -> VoidResult
431 {
432 if (!is_connected())
433 {
435 "Not connected",
436 "http2_client::cancel_stream");
437 }
438
439 auto* stream = get_stream(stream_id);
440 if (!stream)
441 {
443 "Stream not found",
444 "http2_client::cancel_stream");
445 }
446
447 if (stream->state == stream_state::closed)
448 {
449 return ok(); // Already closed
450 }
451
452 // Send RST_STREAM frame
453 rst_stream_frame rsf(stream_id, static_cast<uint32_t>(error_code::cancel));
454 auto send_result = send_frame(rsf);
455 if (send_result.is_err())
456 {
457 return send_result;
458 }
459
460 stream->state = stream_state::closed;
461 return ok();
462 }
463
464 // Private methods
465
467 {
468 try
469 {
470 asio::write(*socket_,
471 asio::buffer(CONNECTION_PREFACE.data(), CONNECTION_PREFACE.size()));
472 return ok();
473 }
474 catch (const std::exception& e)
475 {
477 std::string("Failed to send connection preface: ") + e.what(),
478 "http2_client::send_connection_preface");
479 }
480 }
481
483 {
484 std::vector<setting_parameter> params = {
485 {static_cast<uint16_t>(setting_identifier::header_table_size),
486 local_settings_.header_table_size},
487 {static_cast<uint16_t>(setting_identifier::enable_push),
488 local_settings_.enable_push ? 1u : 0u},
489 {static_cast<uint16_t>(setting_identifier::max_concurrent_streams),
490 local_settings_.max_concurrent_streams},
491 {static_cast<uint16_t>(setting_identifier::initial_window_size),
492 local_settings_.initial_window_size},
493 {static_cast<uint16_t>(setting_identifier::max_frame_size),
494 local_settings_.max_frame_size},
495 {static_cast<uint16_t>(setting_identifier::max_header_list_size),
496 local_settings_.max_header_list_size}
497 };
498
499 settings_frame settings(params, false);
500 return send_frame(settings);
501 }
502
504 {
505 if (frame.is_ack())
506 {
507 // Our settings were acknowledged
508 return ok();
509 }
510
511 // Apply remote settings
512 for (const auto& param : frame.settings())
513 {
514 switch (static_cast<setting_identifier>(param.identifier))
515 {
517 remote_settings_.header_table_size = param.value;
518 encoder_.set_max_table_size(param.value);
519 break;
521 remote_settings_.enable_push = (param.value != 0);
522 break;
524 remote_settings_.max_concurrent_streams = param.value;
525 break;
527 remote_settings_.initial_window_size = param.value;
528 break;
530 remote_settings_.max_frame_size = param.value;
531 break;
533 remote_settings_.max_header_list_size = param.value;
534 break;
535 }
536 }
537
538 return send_settings_ack();
539 }
540
542 {
543 settings_frame ack({}, true);
544 return send_frame(ack);
545 }
546
548 {
549 if (!socket_ || !socket_->lowest_layer().is_open())
550 {
552 "Connection closed", "http2_client::send_frame");
553 }
554
555 try
556 {
557 auto data = f.serialize();
558 asio::write(*socket_, asio::buffer(data));
559 return ok();
560 }
561 catch (const std::exception& e)
562 {
564 std::string("Failed to send frame: ") + e.what(),
565 "http2_client::send_frame");
566 }
567 }
568
570 {
571 if (!socket_ || !socket_->lowest_layer().is_open())
572 {
574 "Connection closed",
575 "http2_client::read_frame");
576 }
577
578 try
579 {
580 // Read frame header (9 bytes)
581 std::vector<uint8_t> header_buf(FRAME_HEADER_SIZE);
582 asio::read(*socket_, asio::buffer(header_buf));
583
584 auto header_result = frame_header::parse(header_buf);
585 if (header_result.is_err())
586 {
587 const auto& err = header_result.error();
588 return error<std::unique_ptr<frame>>(err.code, err.message,
589 "http2_client::read_frame",
590 get_error_details(err));
591 }
592
593 const auto& header = header_result.value();
594
595 // Read frame payload
596 std::vector<uint8_t> payload(header.length);
597 if (header.length > 0)
598 {
599 asio::read(*socket_, asio::buffer(payload));
600 }
601
602 // Combine header and payload for parsing
603 std::vector<uint8_t> frame_data;
604 frame_data.reserve(FRAME_HEADER_SIZE + payload.size());
605 frame_data.insert(frame_data.end(), header_buf.begin(), header_buf.end());
606 frame_data.insert(frame_data.end(), payload.begin(), payload.end());
607
608 return frame::parse(frame_data);
609 }
610 catch (const std::exception& e)
611 {
613 std::string("Failed to read frame: ") + e.what(),
614 "http2_client::read_frame");
615 }
616 }
617
618 auto http2_client::process_frame(std::unique_ptr<frame> f) -> VoidResult
619 {
620 if (!f)
621 {
623 "Null frame", "http2_client::process_frame");
624 }
625
626 switch (f->header().type)
627 {
629 if (auto* sf = dynamic_cast<settings_frame*>(f.get()))
630 {
631 return handle_settings_frame(*sf);
632 }
633 break;
634
636 if (auto* hf = dynamic_cast<headers_frame*>(f.get()))
637 {
638 return handle_headers_frame(*hf);
639 }
640 break;
641
642 case frame_type::data:
643 if (auto* df = dynamic_cast<data_frame*>(f.get()))
644 {
645 return handle_data_frame(*df);
646 }
647 break;
648
650 if (auto* rf = dynamic_cast<rst_stream_frame*>(f.get()))
651 {
652 return handle_rst_stream_frame(*rf);
653 }
654 break;
655
657 if (auto* gf = dynamic_cast<goaway_frame*>(f.get()))
658 {
659 return handle_goaway_frame(*gf);
660 }
661 break;
662
664 if (auto* wf = dynamic_cast<window_update_frame*>(f.get()))
665 {
666 return handle_window_update_frame(*wf);
667 }
668 break;
669
670 case frame_type::ping:
671 if (auto* pf = dynamic_cast<ping_frame*>(f.get()))
672 {
673 return handle_ping_frame(*pf);
674 }
675 break;
676
677 default:
678 // Ignore unknown frame types
679 break;
680 }
681
682 return ok();
683 }
684
686 {
687 uint32_t id = next_stream_id_.fetch_add(2);
688 return id;
689 }
690
691 auto http2_client::get_stream(uint32_t stream_id) -> http2_stream*
692 {
693 std::lock_guard<std::mutex> lock(streams_mutex_);
694 auto it = streams_.find(stream_id);
695 if (it != streams_.end())
696 {
697 return &it->second;
698 }
699 return nullptr;
700 }
701
703 {
704 std::lock_guard<std::mutex> lock(streams_mutex_);
705 uint32_t stream_id = allocate_stream_id();
706 auto& stream = streams_[stream_id];
707 stream.stream_id = stream_id;
708 stream.state = stream_state::idle;
709 stream.window_size = remote_settings_.initial_window_size;
710 return stream;
711 }
712
713 auto http2_client::close_stream(uint32_t stream_id) -> void
714 {
715 std::lock_guard<std::mutex> lock(streams_mutex_);
716 auto it = streams_.find(stream_id);
717 if (it != streams_.end())
718 {
719 it->second.state = stream_state::closed;
720 }
721 }
722
723 auto http2_client::send_request(const std::string& method,
724 const std::string& path,
725 const std::vector<http_header>& headers,
726 const std::vector<uint8_t>& body)
728 {
729 // Create tracing span for HTTP request
730 auto span = tracing::is_tracing_enabled()
731 ? std::make_optional(tracing::trace_context::create_span("http2.request"))
732 : std::nullopt;
733 if (span)
734 {
735 span->set_attribute("http.method", method)
736 .set_attribute("http.url", "https://" + host_ + ":" + std::to_string(port_) + path)
737 .set_attribute("http.flavor", "2.0")
738 .set_attribute("http.request.body_size", static_cast<int64_t>(body.size()))
739 .set_attribute("net.peer.name", host_)
740 .set_attribute("net.peer.port", static_cast<int64_t>(port_));
741 }
742
743 if (!is_connected())
744 {
745 if (span)
746 {
747 span->set_error("Not connected");
748 }
750 "Not connected",
751 "http2_client::send_request");
752 }
753
754 // Create stream
755 http2_stream& stream = create_stream();
756 stream.state = stream_state::open;
757
758 if (span)
759 {
760 span->set_attribute("http2.stream_id", static_cast<int64_t>(stream.stream_id));
761 }
762
763 // Build headers
764 auto request_headers = build_headers(method, path, headers);
765 stream.request_headers = request_headers;
766 stream.request_body = body;
767
768 // Encode headers with HPACK
769 auto encoded_headers = encoder_.encode(request_headers);
770
771 // Send HEADERS frame
772 bool end_stream = body.empty();
773 headers_frame hf(stream.stream_id, std::move(encoded_headers), end_stream, true);
774
775 auto send_result = send_frame(hf);
776 if (send_result.is_err())
777 {
778 close_stream(stream.stream_id);
779 const auto& err = send_result.error();
780 if (span)
781 {
782 span->set_error(err.message);
783 }
784 return error<http2_response>(err.code, err.message,
785 "http2_client::send_request",
786 get_error_details(err));
787 }
788
789 // Send DATA frame if body exists
790 if (!body.empty())
791 {
792 data_frame df(stream.stream_id, std::vector<uint8_t>(body), true);
793 send_result = send_frame(df);
794 if (send_result.is_err())
795 {
796 close_stream(stream.stream_id);
797 const auto& err = send_result.error();
798 if (span)
799 {
800 span->set_error(err.message);
801 }
802 return error<http2_response>(err.code, err.message,
803 "http2_client::send_request",
804 get_error_details(err));
805 }
807 }
808 else
809 {
811 }
812
813 // Wait for response with timeout
814 auto future = stream.promise.get_future();
815 auto status = future.wait_for(timeout_);
816
817 if (status == std::future_status::timeout)
818 {
819 close_stream(stream.stream_id);
820 if (span)
821 {
822 span->set_error("Request timeout");
823 }
825 "Request timeout",
826 "http2_client::send_request");
827 }
828
829 auto response = future.get();
830 if (span)
831 {
832 span->set_attribute("http.status_code", static_cast<int64_t>(response.status_code))
833 .set_attribute("http.response.body_size", static_cast<int64_t>(response.body.size()));
834 if (response.status_code >= 400)
835 {
836 span->set_status(tracing::span_status::error, "HTTP " + std::to_string(response.status_code));
837 }
838 else
839 {
840 span->set_status(tracing::span_status::ok);
841 }
842 }
843 return ok(std::move(response));
844 }
845
846 auto http2_client::build_headers(const std::string& method,
847 const std::string& path,
848 const std::vector<http_header>& additional)
849 -> std::vector<http_header>
850 {
851 std::vector<http_header> headers;
852
853 // Pseudo-headers (must come first)
854 headers.emplace_back(":method", method);
855 headers.emplace_back(":scheme", "https");
856 headers.emplace_back(":authority", host_);
857 headers.emplace_back(":path", path.empty() ? "/" : path);
858
859 // Standard headers
860 headers.emplace_back("user-agent", "network_system/http2_client");
861
862 // Additional headers
863 for (const auto& h : additional)
864 {
865 // Skip pseudo-headers in additional
866 if (!h.name.empty() && h.name[0] != ':')
867 {
868 headers.push_back(h);
869 }
870 }
871
872 return headers;
873 }
874
876 {
877 uint32_t stream_id = f.header().stream_id;
878 auto* stream = get_stream(stream_id);
879
880 if (!stream)
881 {
883 "Unknown stream ID",
884 "http2_client::handle_headers_frame");
885 }
886
887 // Decode headers
888 auto decode_result = decoder_.decode(f.header_block());
889 if (decode_result.is_err())
890 {
891 const auto& err = decode_result.error();
892 return error_void(err.code, err.message,
893 "http2_client::handle_headers_frame",
894 get_error_details(err));
895 }
896
897 auto& decoded_headers = decode_result.value();
898
899 // Append to stream headers
900 stream->response_headers.insert(stream->response_headers.end(),
901 decoded_headers.begin(),
902 decoded_headers.end());
903
904 if (f.is_end_headers())
905 {
906 stream->headers_complete = true;
907
908 // Call streaming callback if set
909 if (stream->is_streaming && stream->on_headers)
910 {
911 stream->on_headers(stream->response_headers);
912 }
913 }
914
915 if (f.is_end_stream())
916 {
917 stream->body_complete = true;
918 stream->state = stream_state::closed;
919
920 // Extract status code
921 int status_code = 0;
922 for (const auto& h : stream->response_headers)
923 {
924 if (h.name == ":status")
925 {
926 try
927 {
928 status_code = std::stoi(h.value);
929 }
930 catch (...)
931 {
932 status_code = 0;
933 }
934 break;
935 }
936 }
937
938 // Handle streaming vs non-streaming
939 if (stream->is_streaming)
940 {
941 if (stream->on_complete)
942 {
943 stream->on_complete(status_code);
944 }
945 }
946 else
947 {
948 // Build response for non-streaming
949 http2_response response;
950 response.headers = stream->response_headers;
951 response.body = stream->response_body;
952 response.status_code = status_code;
953 stream->promise.set_value(std::move(response));
954 }
955 }
956
957 return ok();
958 }
959
961 {
962 uint32_t stream_id = f.header().stream_id;
963 auto* stream = get_stream(stream_id);
964
965 if (!stream)
966 {
968 "Unknown stream ID",
969 "http2_client::handle_data_frame");
970 }
971
972 // Get data
973 auto data = f.data();
974
975 // For streaming, call callback immediately; otherwise buffer
976 if (stream->is_streaming && stream->on_data)
977 {
978 stream->on_data(std::vector<uint8_t>(data.begin(), data.end()));
979 }
980 else
981 {
982 stream->response_body.insert(stream->response_body.end(),
983 data.begin(), data.end());
984 }
985
986 // Update flow control window
987 stream->window_size -= static_cast<int32_t>(data.size());
988 connection_window_size_ -= static_cast<int32_t>(data.size());
989
990 // Send WINDOW_UPDATE if needed
991 if (stream->window_size < static_cast<int32_t>(DEFAULT_WINDOW_SIZE / 2))
992 {
993 int32_t increment = DEFAULT_WINDOW_SIZE - stream->window_size;
994 window_update_frame wuf(stream_id, increment);
995 send_frame(wuf);
996 stream->window_size += increment;
997 }
998
999 if (connection_window_size_ < static_cast<int32_t>(DEFAULT_WINDOW_SIZE / 2))
1000 {
1001 int32_t increment = DEFAULT_WINDOW_SIZE - connection_window_size_;
1002 window_update_frame wuf(0, increment); // Stream 0 = connection level
1003 send_frame(wuf);
1004 connection_window_size_ += increment;
1005 }
1006
1007 if (f.is_end_stream())
1008 {
1009 stream->body_complete = true;
1010 stream->state = stream_state::closed;
1011
1012 // Extract status code
1013 int status_code = 0;
1014 for (const auto& h : stream->response_headers)
1015 {
1016 if (h.name == ":status")
1017 {
1018 try
1019 {
1020 status_code = std::stoi(h.value);
1021 }
1022 catch (...)
1023 {
1024 status_code = 0;
1025 }
1026 break;
1027 }
1028 }
1029
1030 // Handle streaming vs non-streaming
1031 if (stream->is_streaming)
1032 {
1033 if (stream->on_complete)
1034 {
1035 stream->on_complete(status_code);
1036 }
1037 }
1038 else
1039 {
1040 http2_response response;
1041 response.headers = stream->response_headers;
1042 response.body = stream->response_body;
1043 response.status_code = status_code;
1044 stream->promise.set_value(std::move(response));
1045 }
1046 }
1047
1048 return ok();
1049 }
1050
1052 {
1053 uint32_t stream_id = f.header().stream_id;
1054 auto* stream = get_stream(stream_id);
1055
1056 if (stream)
1057 {
1058 stream->state = stream_state::closed;
1059
1060 // Set error response
1061 http2_response response;
1062 response.status_code = 0; // Indicate error
1063 stream->promise.set_value(std::move(response));
1064 }
1065
1066 return ok();
1067 }
1068
1070 {
1071 goaway_received_ = true;
1072
1073 // Close all streams with ID > last_stream_id
1074 uint32_t last_stream = f.last_stream_id();
1075 {
1076 std::lock_guard<std::mutex> lock(streams_mutex_);
1077 for (auto& [id, stream] : streams_)
1078 {
1079 if (id > last_stream && stream.state != stream_state::closed)
1080 {
1081 stream.state = stream_state::closed;
1082 http2_response response;
1083 response.status_code = 0;
1084 stream.promise.set_value(std::move(response));
1085 }
1086 }
1087 }
1088
1089 return ok();
1090 }
1091
1093 {
1094 uint32_t stream_id = f.header().stream_id;
1095 int32_t increment = static_cast<int32_t>(f.window_size_increment());
1096
1097 if (stream_id == 0)
1098 {
1099 // Connection-level window update
1100 connection_window_size_ += increment;
1101 }
1102 else
1103 {
1104 auto* stream = get_stream(stream_id);
1105 if (stream)
1106 {
1107 stream->window_size += increment;
1108 }
1109 }
1110
1111 return ok();
1112 }
1113
1115 {
1116 if (!f.is_ack())
1117 {
1118 // Send PING ACK
1119 ping_frame ack(f.opaque_data(), true);
1120 return send_frame(ack);
1121 }
1122 return ok();
1123 }
1124
1126 {
1127 while (is_running_ && is_connected_)
1128 {
1129 try
1130 {
1131 auto frame_result = read_frame();
1132 if (frame_result.is_err())
1133 {
1134 if (is_running_)
1135 {
1136 is_connected_ = false;
1137 }
1138 break;
1139 }
1140
1141 process_frame(std::move(frame_result.value()));
1142 }
1143 catch (...)
1144 {
1145 if (is_running_)
1146 {
1147 is_connected_ = false;
1148 }
1149 break;
1150 }
1151 }
1152 }
1153
1155 {
1156 is_running_ = false;
1157
1158 if (work_guard_)
1159 {
1160 work_guard_->reset();
1161 }
1162
1163 if (socket_ && socket_->lowest_layer().is_open())
1164 {
1165 std::error_code ec;
1166 socket_->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
1167 socket_->lowest_layer().close(ec);
1168 }
1169
1170 if (io_future_.valid())
1171 {
1172 io_future_.wait_for(std::chrono::seconds(5));
1173 }
1174
1175 if (io_context_)
1176 {
1177 io_context_->stop();
1178 }
1179 }
1180
1181} // namespace kcenon::network::protocols::http2
DATA frame (RFC 7540 Section 6.1)
Definition frame.h:139
Base class for HTTP/2 frames.
Definition frame.h:82
static auto parse(std::span< const uint8_t > data) -> Result< std::unique_ptr< frame > >
Parse frame from raw bytes.
Definition frame.cpp:94
GOAWAY frame (RFC 7540 Section 6.8)
Definition frame.h:387
HEADERS frame (RFC 7540 Section 6.2)
Definition frame.h:189
~http2_client()
Destructor - closes connection gracefully.
auto cancel_stream(uint32_t stream_id) -> VoidResult
Cancel a stream.
auto is_connected() const -> bool
Check if connected.
auto write_stream(uint32_t stream_id, const std::vector< uint8_t > &data, bool end_stream=false) -> VoidResult
Write data to an open stream.
auto put(const std::string &path, const std::string &body, const std::vector< http_header > &headers={}) -> Result< http2_response >
Perform HTTP/2 PUT request.
auto set_settings(const http2_settings &settings) -> void
Update local settings.
auto get_settings() const -> http2_settings
Get local settings.
auto read_frame() -> Result< std::unique_ptr< frame > >
auto handle_window_update_frame(const window_update_frame &f) -> VoidResult
http2_client(std::string_view client_id)
Construct HTTP/2 client.
auto send_request(const std::string &method, const std::string &path, const std::vector< http_header > &headers, const std::vector< uint8_t > &body) -> Result< http2_response >
auto send_frame(const frame &f) -> VoidResult
auto handle_rst_stream_frame(const rst_stream_frame &f) -> VoidResult
auto handle_headers_frame(const headers_frame &f) -> VoidResult
auto get_stream(uint32_t stream_id) -> http2_stream *
auto post(const std::string &path, const std::string &body, const std::vector< http_header > &headers={}) -> Result< http2_response >
Perform HTTP/2 POST request.
auto close_stream_writer(uint32_t stream_id) -> VoidResult
Close the write side of a stream.
auto handle_data_frame(const data_frame &f) -> VoidResult
auto handle_settings_frame(const settings_frame &frame) -> VoidResult
auto handle_ping_frame(const ping_frame &f) -> VoidResult
auto close_stream(uint32_t stream_id) -> void
auto handle_goaway_frame(const goaway_frame &f) -> VoidResult
auto process_frame(std::unique_ptr< frame > f) -> VoidResult
auto disconnect() -> VoidResult
Disconnect from server.
auto get(const std::string &path, const std::vector< http_header > &headers={}) -> Result< http2_response >
Perform HTTP/2 GET request.
auto del(const std::string &path, const std::vector< http_header > &headers={}) -> Result< http2_response >
Perform HTTP/2 DELETE request.
auto get_timeout() const -> std::chrono::milliseconds
Get current timeout.
auto connect(const std::string &host, unsigned short port=443) -> VoidResult
Connect to HTTP/2 server.
auto build_headers(const std::string &method, const std::string &path, const std::vector< http_header > &additional) -> std::vector< http_header >
auto start_stream(const std::string &path, const std::vector< http_header > &headers, std::function< void(std::vector< uint8_t >)> on_data, std::function< void(std::vector< http_header >)> on_headers, std::function< void(int)> on_complete) -> Result< uint32_t >
Start a streaming POST request.
auto set_timeout(std::chrono::milliseconds timeout) -> void
Set request timeout.
PING frame (RFC 7540 Section 6.7)
Definition frame.h:344
RST_STREAM frame (RFC 7540 Section 6.4)
Definition frame.h:307
SETTINGS frame (RFC 7540 Section 6.5)
Definition frame.h:265
WINDOW_UPDATE frame (RFC 7540 Section 6.9)
Definition frame.h:438
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
struct ssl_ctx_st SSL_CTX
Definition crypto.h:20
setting_identifier
SETTINGS frame parameter identifiers (RFC 7540 Section 6.5.2)
Definition frame.h:248
@ max_header_list_size
SETTINGS_MAX_HEADER_LIST_SIZE.
@ max_concurrent_streams
SETTINGS_MAX_CONCURRENT_STREAMS.
@ initial_window_size
SETTINGS_INITIAL_WINDOW_SIZE.
@ half_closed_local
Local end closed, remote can send.
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
@ ok
Operation completed successfully.
std::string get_error_details(const simple_error &err)
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()
RAII span implementation for distributed tracing.
static auto parse(std::span< const uint8_t > data) -> Result< frame_header >
Parse frame header from raw bytes.
Definition frame.cpp:52
std::vector< uint8_t > body
Response body.
auto get_header(const std::string &name) const -> std::optional< std::string >
Get header value by name.
auto get_body_string() const -> std::string
Get body as string.
std::vector< http_header > headers
Response headers.
std::vector< uint8_t > request_body
Request body.
std::promise< http2_response > promise
Response promise.
std::vector< http_header > request_headers
Request headers.
std::function< void(std::vector< uint8_t >)> on_data
Callback for streaming data.
std::function< void(std::vector< http_header >)> on_headers
Callback for headers.
std::function< void(int)> on_complete
Callback when stream ends (status code)
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.