19 std::string lower_name = name;
20 std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(),
21 [](
unsigned char c) { return std::tolower(c); });
23 for (
const auto& header :
headers)
25 std::string lower_header_name = header.name;
26 std::transform(lower_header_name.begin(), lower_header_name.end(),
27 lower_header_name.begin(),
28 [](
unsigned char c) { return std::tolower(c); });
30 if (lower_header_name == lower_name)
40 return std::string(
body.begin(),
body.end());
45 : client_id_(client_id)
74 span->set_attribute(
"net.peer.name", host)
75 .set_attribute(
"net.peer.port",
static_cast<int64_t
>(port))
76 .set_attribute(
"net.transport",
"tcp")
77 .set_attribute(
"http.flavor",
"2.0")
78 .set_attribute(
"client.id", client_id_);
85 span->set_error(
"Already connected");
88 "Already connected",
"http2_client::connect");
95 span->set_error(
"Host cannot be empty");
98 "Host cannot be empty",
"http2_client::connect");
107 io_context_ = std::make_unique<asio::io_context>();
110 ssl_context_ = std::make_unique<asio::ssl::context>(asio::ssl::context::tlsv13_client);
113 SSL_CTX* native_ctx = ssl_context_->native_handle();
114 static const unsigned char alpn_protos[] = { 2,
'h',
'2' };
115 SSL_CTX_set_alpn_protos(native_ctx, alpn_protos,
sizeof(alpn_protos));
118 ssl_context_->set_default_verify_paths();
119 ssl_context_->set_verify_mode(asio::ssl::verify_peer);
122 asio::ip::tcp::resolver resolver(*io_context_);
123 auto endpoints = resolver.resolve(host, std::to_string(port));
125 socket_ = std::make_unique<asio::ssl::stream<asio::ip::tcp::socket>>(
126 *io_context_, *ssl_context_);
129 SSL_set_tlsext_host_name(socket_->native_handle(), host.c_str());
132 asio::connect(socket_->lowest_layer(), endpoints);
135 socket_->handshake(asio::ssl::stream_base::client);
138 const unsigned char* alpn_result =
nullptr;
139 unsigned int alpn_len = 0;
140 SSL_get0_alpn_selected(socket_->native_handle(), &alpn_result, &alpn_len);
142 if (alpn_len != 2 || alpn_result ==
nullptr ||
143 std::string(
reinterpret_cast<const char*
>(alpn_result), alpn_len) !=
"h2")
145 socket_->lowest_layer().close();
148 span->set_error(
"Server does not support HTTP/2 via ALPN");
151 "Server does not support HTTP/2 via ALPN",
152 "http2_client::connect");
155 is_connected_ =
true;
159 auto preface_result = send_connection_preface();
160 if (preface_result.is_err())
162 is_connected_ =
false;
166 span->set_error(preface_result.error().message);
168 return preface_result;
172 auto settings_result = send_settings();
173 if (settings_result.is_err())
175 is_connected_ =
false;
179 span->set_error(settings_result.error().message);
181 return settings_result;
185 work_guard_ = std::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(
186 asio::make_work_guard(*io_context_));
188 io_future_ = std::async(std::launch::async, [
this]() { run_io(); });
196 catch (
const std::exception& e)
198 is_connected_ =
false;
202 span->set_error(std::string(
"Connection failed: ") + e.what());
205 std::string(
"Connection failed: ") + e.what(),
206 "http2_client::connect");
220 if (socket_ && socket_->lowest_layer().is_open())
234 is_connected_ =
false;
245 const std::vector<http_header>&
headers)
248 return send_request(
"GET", path,
headers, {});
252 const std::string& body,
253 const std::vector<http_header>&
headers)
256 std::vector<uint8_t> body_bytes(body.begin(), body.end());
257 return send_request(
"POST", path,
headers, body_bytes);
261 const std::vector<uint8_t>& body,
262 const std::vector<http_header>&
headers)
265 return send_request(
"POST", path,
headers, body);
269 const std::string& body,
270 const std::vector<http_header>&
headers)
273 std::vector<uint8_t> body_bytes(body.begin(), body.end());
274 return send_request(
"PUT", path,
headers, body_bytes);
278 const std::vector<http_header>&
headers)
281 return send_request(
"DELETE", path,
headers, {});
302 encoder_.set_max_table_size(
settings.header_table_size);
303 decoder_.set_max_table_size(
settings.header_table_size);
307 const std::vector<http_header>&
headers,
308 std::function<
void(std::vector<uint8_t>)> on_data,
309 std::function<
void(std::vector<http_header>)> on_headers,
310 std::function<
void(
int)> on_complete)
317 "http2_client::start_stream");
324 stream.
on_data = std::move(on_data);
329 auto request_headers = build_headers(
"POST", path,
headers);
333 auto encoded_headers = encoder_.encode(request_headers);
338 auto send_result = send_frame(hf);
339 if (send_result.is_err())
342 const auto& err = send_result.error();
344 "http2_client::start_stream",
349 return ok(std::move(result_id));
353 const std::vector<uint8_t>&
data,
360 "http2_client::write_stream");
363 auto* stream = get_stream(stream_id);
368 "http2_client::write_stream");
375 "Stream not writable",
376 "http2_client::write_stream");
381 auto send_result = send_frame(df);
382 if (send_result.is_err())
401 "http2_client::close_stream_writer");
404 auto* stream = get_stream(stream_id);
409 "http2_client::close_stream_writer");
420 auto send_result = send_frame(df);
421 if (send_result.is_err())
436 "http2_client::cancel_stream");
439 auto* stream = get_stream(stream_id);
444 "http2_client::cancel_stream");
454 auto send_result = send_frame(rsf);
455 if (send_result.is_err())
470 asio::write(*socket_,
471 asio::buffer(CONNECTION_PREFACE.data(), CONNECTION_PREFACE.size()));
474 catch (
const std::exception& e)
477 std::string(
"Failed to send connection preface: ") + e.what(),
478 "http2_client::send_connection_preface");
484 std::vector<setting_parameter> params = {
486 local_settings_.header_table_size},
488 local_settings_.enable_push ? 1u : 0u},
490 local_settings_.max_concurrent_streams},
492 local_settings_.initial_window_size},
494 local_settings_.max_frame_size},
496 local_settings_.max_header_list_size}
512 for (
const auto& param :
frame.settings())
517 remote_settings_.header_table_size = param.value;
518 encoder_.set_max_table_size(param.value);
521 remote_settings_.enable_push = (param.value != 0);
524 remote_settings_.max_concurrent_streams = param.value;
527 remote_settings_.initial_window_size = param.value;
530 remote_settings_.max_frame_size = param.value;
533 remote_settings_.max_header_list_size = param.value;
538 return send_settings_ack();
544 return send_frame(ack);
549 if (!socket_ || !socket_->lowest_layer().is_open())
552 "Connection closed",
"http2_client::send_frame");
557 auto data = f.serialize();
558 asio::write(*socket_, asio::buffer(
data));
561 catch (
const std::exception& e)
564 std::string(
"Failed to send frame: ") + e.what(),
565 "http2_client::send_frame");
571 if (!socket_ || !socket_->lowest_layer().is_open())
575 "http2_client::read_frame");
581 std::vector<uint8_t> header_buf(FRAME_HEADER_SIZE);
582 asio::read(*socket_, asio::buffer(header_buf));
585 if (header_result.is_err())
587 const auto& err = header_result.error();
589 "http2_client::read_frame",
593 const auto& header = header_result.value();
596 std::vector<uint8_t> payload(header.length);
597 if (header.length > 0)
599 asio::read(*socket_, asio::buffer(payload));
603 std::vector<uint8_t> frame_data;
604 frame_data.reserve(FRAME_HEADER_SIZE + payload.size());
605 frame_data.insert(frame_data.end(), header_buf.begin(), header_buf.end());
606 frame_data.insert(frame_data.end(), payload.begin(), payload.end());
610 catch (
const std::exception& e)
613 std::string(
"Failed to read frame: ") + e.what(),
614 "http2_client::read_frame");
623 "Null frame",
"http2_client::process_frame");
626 switch (f->header().type)
631 return handle_settings_frame(*sf);
638 return handle_headers_frame(*hf);
643 if (
auto* df =
dynamic_cast<data_frame*
>(f.get()))
645 return handle_data_frame(*df);
652 return handle_rst_stream_frame(*rf);
659 return handle_goaway_frame(*gf);
666 return handle_window_update_frame(*wf);
671 if (
auto* pf =
dynamic_cast<ping_frame*
>(f.get()))
673 return handle_ping_frame(*pf);
687 uint32_t
id = next_stream_id_.fetch_add(2);
693 std::lock_guard<std::mutex> lock(streams_mutex_);
694 auto it = streams_.find(stream_id);
695 if (it != streams_.end())
704 std::lock_guard<std::mutex> lock(streams_mutex_);
705 uint32_t stream_id = allocate_stream_id();
706 auto& stream = streams_[stream_id];
707 stream.stream_id = stream_id;
709 stream.window_size = remote_settings_.initial_window_size;
715 std::lock_guard<std::mutex> lock(streams_mutex_);
716 auto it = streams_.find(stream_id);
717 if (it != streams_.end())
724 const std::string& path,
725 const std::vector<http_header>&
headers,
726 const std::vector<uint8_t>& body)
735 span->set_attribute(
"http.method", method)
736 .set_attribute(
"http.url",
"https://" + host_ +
":" + std::to_string(port_) + path)
737 .set_attribute(
"http.flavor",
"2.0")
738 .set_attribute(
"http.request.body_size",
static_cast<int64_t
>(body.size()))
739 .set_attribute(
"net.peer.name", host_)
740 .set_attribute(
"net.peer.port",
static_cast<int64_t
>(port_));
747 span->set_error(
"Not connected");
751 "http2_client::send_request");
760 span->set_attribute(
"http2.stream_id",
static_cast<int64_t
>(stream.
stream_id));
764 auto request_headers = build_headers(method, path,
headers);
769 auto encoded_headers = encoder_.encode(request_headers);
772 bool end_stream = body.empty();
775 auto send_result = send_frame(hf);
776 if (send_result.is_err())
779 const auto& err = send_result.error();
782 span->set_error(err.message);
785 "http2_client::send_request",
793 send_result = send_frame(df);
794 if (send_result.is_err())
797 const auto& err = send_result.error();
800 span->set_error(err.message);
803 "http2_client::send_request",
814 auto future = stream.
promise.get_future();
815 auto status = future.wait_for(timeout_);
817 if (status == std::future_status::timeout)
822 span->set_error(
"Request timeout");
826 "http2_client::send_request");
829 auto response = future.get();
832 span->set_attribute(
"http.status_code",
static_cast<int64_t
>(response.status_code))
833 .set_attribute(
"http.response.body_size",
static_cast<int64_t
>(response.body.size()));
834 if (response.status_code >= 400)
843 return ok(std::move(response));
847 const std::string& path,
848 const std::vector<http_header>& additional)
849 -> std::vector<http_header>
851 std::vector<http_header>
headers;
854 headers.emplace_back(
":method", method);
855 headers.emplace_back(
":scheme",
"https");
856 headers.emplace_back(
":authority", host_);
857 headers.emplace_back(
":path", path.empty() ?
"/" : path);
860 headers.emplace_back(
"user-agent",
"network_system/http2_client");
863 for (
const auto& h : additional)
866 if (!h.name.empty() && h.name[0] !=
':')
877 uint32_t stream_id = f.header().stream_id;
878 auto* stream = get_stream(stream_id);
884 "http2_client::handle_headers_frame");
888 auto decode_result = decoder_.decode(f.header_block());
889 if (decode_result.is_err())
891 const auto& err = decode_result.error();
893 "http2_client::handle_headers_frame",
897 auto& decoded_headers = decode_result.value();
900 stream->response_headers.insert(stream->response_headers.end(),
901 decoded_headers.begin(),
902 decoded_headers.end());
904 if (f.is_end_headers())
906 stream->headers_complete =
true;
909 if (stream->is_streaming && stream->on_headers)
911 stream->on_headers(stream->response_headers);
915 if (f.is_end_stream())
917 stream->body_complete =
true;
922 for (
const auto& h : stream->response_headers)
924 if (h.name ==
":status")
928 status_code = std::stoi(h.value);
939 if (stream->is_streaming)
941 if (stream->on_complete)
943 stream->on_complete(status_code);
950 response.
headers = stream->response_headers;
951 response.
body = stream->response_body;
953 stream->promise.set_value(std::move(response));
962 uint32_t stream_id = f.header().stream_id;
963 auto* stream = get_stream(stream_id);
969 "http2_client::handle_data_frame");
973 auto data = f.data();
976 if (stream->is_streaming && stream->on_data)
978 stream->on_data(std::vector<uint8_t>(
data.begin(),
data.end()));
982 stream->response_body.insert(stream->response_body.end(),
987 stream->window_size -=
static_cast<int32_t
>(
data.size());
988 connection_window_size_ -=
static_cast<int32_t
>(
data.size());
991 if (stream->window_size <
static_cast<int32_t
>(DEFAULT_WINDOW_SIZE / 2))
993 int32_t increment = DEFAULT_WINDOW_SIZE - stream->window_size;
996 stream->window_size += increment;
999 if (connection_window_size_ <
static_cast<int32_t
>(DEFAULT_WINDOW_SIZE / 2))
1001 int32_t increment = DEFAULT_WINDOW_SIZE - connection_window_size_;
1004 connection_window_size_ += increment;
1007 if (f.is_end_stream())
1009 stream->body_complete =
true;
1013 int status_code = 0;
1014 for (
const auto& h : stream->response_headers)
1016 if (h.name ==
":status")
1020 status_code = std::stoi(h.value);
1031 if (stream->is_streaming)
1033 if (stream->on_complete)
1035 stream->on_complete(status_code);
1041 response.
headers = stream->response_headers;
1042 response.
body = stream->response_body;
1044 stream->promise.set_value(std::move(response));
1053 uint32_t stream_id = f.header().stream_id;
1054 auto* stream = get_stream(stream_id);
1063 stream->promise.set_value(std::move(response));
1071 goaway_received_ =
true;
1074 uint32_t last_stream = f.last_stream_id();
1076 std::lock_guard<std::mutex> lock(streams_mutex_);
1077 for (
auto& [
id, stream] : streams_)
1084 stream.promise.set_value(std::move(response));
1094 uint32_t stream_id = f.header().stream_id;
1095 int32_t increment =
static_cast<int32_t
>(f.window_size_increment());
1100 connection_window_size_ += increment;
1104 auto* stream = get_stream(stream_id);
1107 stream->window_size += increment;
1120 return send_frame(ack);
1127 while (is_running_ && is_connected_)
1131 auto frame_result = read_frame();
1132 if (frame_result.is_err())
1136 is_connected_ =
false;
1141 process_frame(std::move(frame_result.value()));
1147 is_connected_ =
false;
1156 is_running_ =
false;
1160 work_guard_->reset();
1163 if (socket_ && socket_->lowest_layer().is_open())
1166 socket_->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
1167 socket_->lowest_layer().close(ec);
1170 if (io_future_.valid())
1172 io_future_.wait_for(std::chrono::seconds(5));
1177 io_context_->stop();
DATA frame (RFC 7540 Section 6.1)
Base class for HTTP/2 frames.
static auto parse(std::span< const uint8_t > data) -> Result< std::unique_ptr< frame > >
Parse frame from raw bytes.
GOAWAY frame (RFC 7540 Section 6.8)
~http2_client()
Destructor - closes connection gracefully.
auto send_settings_ack() -> VoidResult
auto cancel_stream(uint32_t stream_id) -> VoidResult
Cancel a stream.
auto is_connected() const -> bool
Check if connected.
auto write_stream(uint32_t stream_id, const std::vector< uint8_t > &data, bool end_stream=false) -> VoidResult
Write data to an open stream.
auto create_stream() -> http2_stream &
auto put(const std::string &path, const std::string &body, const std::vector< http_header > &headers={}) -> Result< http2_response >
Perform HTTP/2 PUT request.
auto set_settings(const http2_settings &settings) -> void
Update local settings.
auto get_settings() const -> http2_settings
Get local settings.
std::atomic< bool > goaway_received_
auto read_frame() -> Result< std::unique_ptr< frame > >
auto handle_window_update_frame(const window_update_frame &f) -> VoidResult
http2_client(std::string_view client_id)
Construct HTTP/2 client.
auto send_request(const std::string &method, const std::string &path, const std::vector< http_header > &headers, const std::vector< uint8_t > &body) -> Result< http2_response >
auto send_frame(const frame &f) -> VoidResult
auto handle_rst_stream_frame(const rst_stream_frame &f) -> VoidResult
std::chrono::milliseconds timeout_
auto handle_headers_frame(const headers_frame &f) -> VoidResult
std::atomic< bool > is_connected_
auto get_stream(uint32_t stream_id) -> http2_stream *
auto post(const std::string &path, const std::string &body, const std::vector< http_header > &headers={}) -> Result< http2_response >
Perform HTTP/2 POST request.
auto close_stream_writer(uint32_t stream_id) -> VoidResult
Close the write side of a stream.
auto handle_data_frame(const data_frame &f) -> VoidResult
auto handle_settings_frame(const settings_frame &frame) -> VoidResult
auto handle_ping_frame(const ping_frame &f) -> VoidResult
auto close_stream(uint32_t stream_id) -> void
auto handle_goaway_frame(const goaway_frame &f) -> VoidResult
http2_settings local_settings_
auto process_frame(std::unique_ptr< frame > f) -> VoidResult
auto send_settings() -> VoidResult
auto disconnect() -> VoidResult
Disconnect from server.
auto get(const std::string &path, const std::vector< http_header > &headers={}) -> Result< http2_response >
Perform HTTP/2 GET request.
auto del(const std::string &path, const std::vector< http_header > &headers={}) -> Result< http2_response >
Perform HTTP/2 DELETE request.
auto get_timeout() const -> std::chrono::milliseconds
Get current timeout.
auto connect(const std::string &host, unsigned short port=443) -> VoidResult
Connect to HTTP/2 server.
auto build_headers(const std::string &method, const std::string &path, const std::vector< http_header > &additional) -> std::vector< http_header >
auto start_stream(const std::string &path, const std::vector< http_header > &headers, std::function< void(std::vector< uint8_t >)> on_data, std::function< void(std::vector< http_header >)> on_headers, std::function< void(int)> on_complete) -> Result< uint32_t >
Start a streaming POST request.
auto set_timeout(std::chrono::milliseconds timeout) -> void
Set request timeout.
auto allocate_stream_id() -> uint32_t
auto send_connection_preface() -> VoidResult
PING frame (RFC 7540 Section 6.7)
RST_STREAM frame (RFC 7540 Section 6.4)
SETTINGS frame (RFC 7540 Section 6.5)
WINDOW_UPDATE frame (RFC 7540 Section 6.9)
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
struct ssl_ctx_st SSL_CTX
constexpr int invalid_argument
constexpr int internal_error
constexpr int already_exists
constexpr int receive_failed
constexpr int send_failed
constexpr int connection_closed
constexpr int connection_failed
@ cancel
Stream cancelled.
@ no_error
Graceful shutdown.
setting_identifier
SETTINGS frame parameter identifiers (RFC 7540 Section 6.5.2)
@ max_frame_size
SETTINGS_MAX_FRAME_SIZE.
@ max_header_list_size
SETTINGS_MAX_HEADER_LIST_SIZE.
@ max_concurrent_streams
SETTINGS_MAX_CONCURRENT_STREAMS.
@ enable_push
SETTINGS_ENABLE_PUSH.
@ header_table_size
SETTINGS_HEADER_TABLE_SIZE.
@ initial_window_size
SETTINGS_INITIAL_WINDOW_SIZE.
@ closed
Stream fully closed.
@ open
Stream open and active.
@ half_closed_local
Local end closed, remote can send.
@ idle
Stream not yet opened.
@ settings
SETTINGS frame.
@ rst_stream
RST_STREAM frame.
@ window_update
WINDOW_UPDATE frame.
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
@ ok
Operation completed successfully.
std::string get_error_details(const simple_error &err)
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
RAII span implementation for distributed tracing.
std::vector< uint8_t > body
Response body.
auto get_header(const std::string &name) const -> std::optional< std::string >
Get header value by name.
auto get_body_string() const -> std::string
Get body as string.
std::vector< http_header > headers
Response headers.
int status_code
HTTP status code.
HTTP/2 connection settings.
HTTP/2 stream state and data.
std::vector< uint8_t > request_body
Request body.
std::promise< http2_response > promise
Response promise.
std::vector< http_header > request_headers
Request headers.
std::function< void(std::vector< uint8_t >)> on_data
Callback for streaming data.
std::function< void(std::vector< http_header >)> on_headers
Callback for headers.
std::function< void(int)> on_complete
Callback when stream ends (status code)
stream_state state
Current state.
uint32_t stream_id
Stream identifier.
bool is_streaming
Streaming support.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.