15#include <condition_variable>
19#if NETWORK_GRPC_OFFICIAL
20#include <grpcpp/grpcpp.h>
21#include <grpcpp/generic/generic_stub.h>
22#include <grpcpp/support/byte_buffer.h>
31#if NETWORK_GRPC_OFFICIAL
40auto vector_to_byte_buffer(
const std::vector<uint8_t>& data) -> ::grpc::ByteBuffer
42 ::grpc::Slice slice(
data.data(),
data.size());
43 return ::grpc::ByteBuffer(&slice, 1);
46auto byte_buffer_to_vector(const ::grpc::ByteBuffer& buffer) -> std::vector<uint8_t>
48 std::vector<::grpc::Slice> slices;
51 std::vector<uint8_t> result;
52 result.reserve(buffer.Length());
54 for (
const auto& slice : slices)
56 const auto* begin =
reinterpret_cast<const uint8_t*
>(slice.begin());
57 result.insert(result.end(), begin, begin + slice.size());
66class official_server_stream_reader :
public grpc_client::server_stream_reader
69 official_server_stream_reader(
70 std::unique_ptr<::grpc::ClientContext> ctx,
71 std::unique_ptr<::grpc::GenericClientAsyncReader> reader,
72 ::grpc::CompletionQueue* cq)
73 : ctx_(std::move(ctx))
74 , reader_(std::move(reader))
80 auto read() -> Result<grpc_message>
override
84 return error<grpc_message>(
85 static_cast<int>(status_code::ok),
87 "grpc::client::server_stream_reader");
90 ::grpc::ByteBuffer buffer;
94 reader_->Read(&buffer, &tag);
96 if (!cq_->Next(&tag, &ok) || !ok)
99 return error<grpc_message>(
100 static_cast<int>(status_code::ok),
102 "grpc::client::server_stream_reader");
105 auto data = detail::byte_buffer_to_vector(buffer);
109 auto has_more() const ->
bool override
116 ::grpc::Status status;
120 reader_->Finish(&status, &tag);
121 cq_->Next(&tag, &ok);
123 return from_grpc_status(status);
127 std::unique_ptr<::grpc::ClientContext> ctx_;
128 std::unique_ptr<::grpc::GenericClientAsyncReader> reader_;
129 ::grpc::CompletionQueue* cq_;
134class official_client_stream_writer :
public grpc_client::client_stream_writer
137 official_client_stream_writer(
138 std::unique_ptr<::grpc::ClientContext> ctx,
139 std::unique_ptr<::grpc::GenericClientAsyncWriter> writer,
140 ::grpc::CompletionQueue* cq)
141 : ctx_(std::move(ctx))
142 , writer_(std::move(writer))
144 , writes_done_(false)
148 auto write(
const std::vector<uint8_t>& message) ->
VoidResult override
153 error_codes::common_errors::internal_error,
154 "Stream writes already done",
155 "grpc::client::client_stream_writer");
158 ::grpc::ByteBuffer buffer = detail::vector_to_byte_buffer(message);
162 writer_->Write(buffer, &tag);
164 if (!cq_->Next(&tag, &ok) || !ok)
167 error_codes::network_system::connection_failed,
168 "Failed to write message",
169 "grpc::client::client_stream_writer");
185 writer_->WritesDone(&tag);
186 cq_->Next(&tag, &ok);
192 auto finish() -> Result<grpc_message>
override
199 ::grpc::ByteBuffer response;
200 ::grpc::Status status;
204 writer_->Finish(&status, &tag);
205 cq_->Next(&tag, &ok);
209 auto grpc_st = from_grpc_status(status);
210 return error<grpc_message>(
211 static_cast<int>(grpc_st.code),
213 "grpc::client::client_stream_writer");
221 std::unique_ptr<::grpc::ClientContext> ctx_;
222 std::unique_ptr<::grpc::GenericClientAsyncWriter> writer_;
223 ::grpc::CompletionQueue* cq_;
228class official_bidi_stream :
public grpc_client::bidi_stream
231 official_bidi_stream(
232 std::unique_ptr<::grpc::ClientContext> ctx,
233 std::unique_ptr<::grpc::GenericClientAsyncReaderWriter> stream,
234 ::grpc::CompletionQueue* cq)
235 : ctx_(std::move(ctx))
236 , stream_(std::move(stream))
238 , writes_done_(false)
242 auto write(
const std::vector<uint8_t>& message) ->
VoidResult override
247 error_codes::common_errors::internal_error,
248 "Stream writes already done",
249 "grpc::client::bidi_stream");
252 ::grpc::ByteBuffer buffer = detail::vector_to_byte_buffer(message);
256 stream_->Write(buffer, &tag);
258 if (!cq_->Next(&tag, &ok) || !ok)
261 error_codes::network_system::connection_failed,
262 "Failed to write message",
263 "grpc::client::bidi_stream");
269 auto read() -> Result<grpc_message>
override
271 ::grpc::ByteBuffer buffer;
275 stream_->Read(&buffer, &tag);
277 if (!cq_->Next(&tag, &ok) || !ok)
279 return error<grpc_message>(
280 static_cast<int>(status_code::ok),
282 "grpc::client::bidi_stream");
285 auto data = detail::byte_buffer_to_vector(buffer);
299 stream_->WritesDone(&tag);
300 cq_->Next(&tag, &ok);
313 ::grpc::Status status;
317 stream_->Finish(&status, &tag);
318 cq_->Next(&tag, &ok);
320 return from_grpc_status(status);
324 std::unique_ptr<::grpc::ClientContext> ctx_;
325 std::unique_ptr<::grpc::GenericClientAsyncReaderWriter> stream_;
326 ::grpc::CompletionQueue* cq_;
331class grpc_client::impl
334 explicit impl(std::string
target, grpc_channel_config config)
353 span->set_attribute(
"rpc.system",
"grpc")
354 .set_attribute(
"rpc.grpc.target",
target_)
355 .set_attribute(
"net.transport",
"tcp")
359 std::lock_guard<std::mutex> lock(
mutex_);
365 span->set_attribute(
"grpc.client.already_connected",
true);
371 channel_credentials_config creds_config;
381 channel_ = create_channel(
target_, creds_config);
387 span->set_error(
"Failed to create gRPC channel");
391 "Failed to create gRPC channel",
400 span->set_error(
"Failed to connect to gRPC server");
404 "Failed to connect to gRPC server",
409 stub_ = std::make_unique<::grpc::GenericStub>(channel_);
417 std::lock_guard<std::mutex> lock(
mutex_);
431 auto state = channel_->GetState(
false);
432 return state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE;
442 return wait_for_channel_ready(channel_, timeout);
445 auto target() const -> const std::
string&
450 auto call_raw(
const std::string& method,
451 const std::vector<uint8_t>& request,
452 const call_options& options) -> Result<grpc_message>
459 span->set_attribute(
"rpc.system",
"grpc")
460 .set_attribute(
"rpc.method", method)
461 .set_attribute(
"rpc.grpc.target",
target_)
462 .set_attribute(
"rpc.request.size",
static_cast<int64_t
>(request.size()));
469 span->set_error(
"Not connected to server");
473 "Not connected to server",
477 ::grpc::ClientContext ctx;
480 if (options.deadline.has_value())
482 set_deadline(&ctx, options.deadline.value());
491 for (
const auto& [key, value] : options.metadata)
493 ctx.AddMetadata(key, value);
497 if (options.wait_for_ready)
499 ctx.set_wait_for_ready(
true);
503 ::grpc::ByteBuffer request_buffer = detail::vector_to_byte_buffer(request);
504 ::grpc::ByteBuffer response_buffer;
507 ::grpc::Status status = stub_->UnaryCall(&ctx, method, request_buffer, &response_buffer);
511 auto grpc_st = from_grpc_status(status);
514 span->set_error(grpc_st.message)
515 .set_attribute(
"rpc.grpc.status_code",
static_cast<int64_t
>(grpc_st.code));
518 static_cast<int>(grpc_st.code),
523 auto response_data = detail::byte_buffer_to_vector(response_buffer);
526 span->set_attribute(
"rpc.response.size",
static_cast<int64_t
>(response_data.size()))
527 .set_attribute(
"rpc.grpc.status_code",
static_cast<int64_t
>(0));
533 const std::vector<uint8_t>& request,
534 std::function<
void(Result<grpc_message>)> callback,
535 const call_options& options) ->
void
538 std::thread([
this, method, request, callback, options]() {
539 auto result =
call_raw(method, request, options);
542 callback(std::move(result));
548 const std::vector<uint8_t>& request,
549 const call_options& options)
550 -> Result<std::unique_ptr<grpc_client::server_stream_reader>>
556 "Not connected to server",
560 auto ctx = std::make_unique<::grpc::ClientContext>();
563 if (options.deadline.has_value())
565 set_deadline(ctx.get(), options.deadline.value());
573 for (
const auto& [key, value] : options.metadata)
575 ctx->AddMetadata(key, value);
584 "Server streaming not fully implemented for official gRPC wrapper",
589 const call_options& options)
590 -> Result<std::unique_ptr<grpc_client::client_stream_writer>>
596 "Not connected to server",
602 "Client streaming not fully implemented for official gRPC wrapper",
607 const call_options& options)
608 -> Result<std::unique_ptr<grpc_client::bidi_stream>>
614 "Not connected to server",
620 "Bidirectional streaming not fully implemented for official gRPC wrapper",
627 std::shared_ptr<::grpc::Channel> channel_;
628 std::unique_ptr<::grpc::GenericStub> stub_;
630 mutable std::mutex
mutex_;
653 std::unique_lock<std::mutex> lock(
mutex_);
665 "server_stream_reader::read");
671 if (parse_result.is_err())
677 auto& msg = parse_result.value();
678 size_t msg_size = 5 + msg.data.size();
679 if (
buffer_.size() >= msg_size)
684 return ok(std::move(msg));
689 std::lock_guard<std::mutex> lock(
mutex_);
695 std::unique_lock<std::mutex> lock(
mutex_);
700 void on_data(
const std::vector<uint8_t>& data)
702 std::lock_guard<std::mutex> lock(
mutex_);
707 void on_headers(
const std::vector<http2::http_header>& headers)
709 std::lock_guard<std::mutex> lock(
mutex_);
710 for (
const auto& h : headers)
712 if (h.name ==
"grpc-status")
717 else if (h.name ==
"grpc-message")
726 std::lock_guard<std::mutex> lock(
mutex_);
740 std::condition_variable
cv_;
764 "Stream writes already done",
765 "client_stream_writer::write");
792 if (wd_result.is_err())
794 const auto& err = wd_result.error();
800 std::unique_lock<std::mutex> lock(
mutex_);
811 "client_stream_writer::finish");
814 void on_data(
const std::vector<uint8_t>& data)
816 std::lock_guard<std::mutex> lock(
mutex_);
820 void on_headers(
const std::vector<http2::http_header>& headers)
822 std::lock_guard<std::mutex> lock(
mutex_);
823 for (
const auto& h : headers)
825 if (h.name ==
"grpc-status")
830 else if (h.name ==
"grpc-message")
839 std::lock_guard<std::mutex> lock(
mutex_);
851 if (parse_result.is_ok())
853 response_ = std::move(parse_result.value());
866 std::condition_variable
cv_;
892 "Stream writes already done",
893 "bidi_stream::write");
904 std::unique_lock<std::mutex> lock(
mutex_);
915 "bidi_stream::read");
920 if (parse_result.is_err())
925 auto& msg = parse_result.value();
926 size_t msg_size = 5 + msg.data.size();
927 if (
buffer_.size() >= msg_size)
932 return ok(std::move(msg));
953 std::unique_lock<std::mutex> lock(
mutex_);
958 void on_data(
const std::vector<uint8_t>& data)
960 std::lock_guard<std::mutex> lock(
mutex_);
965 void on_headers(
const std::vector<http2::http_header>& headers)
967 std::lock_guard<std::mutex> lock(
mutex_);
968 for (
const auto& h : headers)
970 if (h.name ==
"grpc-status")
975 else if (h.name ==
"grpc-message")
984 std::lock_guard<std::mutex> lock(
mutex_);
1028 span->set_attribute(
"rpc.system",
"grpc")
1029 .set_attribute(
"rpc.grpc.target",
target_)
1030 .set_attribute(
"net.transport",
"tcp");
1033 std::lock_guard<std::mutex> lock(
mutex_);
1039 span->set_attribute(
"grpc.client.already_connected",
true);
1045 auto colon_pos =
target_.find(
':');
1046 if (colon_pos == std::string::npos)
1050 span->set_error(
"Invalid target address format");
1054 "Invalid target address format",
1056 "Expected format: host:port");
1060 auto port_str =
target_.substr(colon_pos + 1);
1062 unsigned short port = 0;
1063 auto [ptr, ec] = std::from_chars(
1065 port_str.data() + port_str.size(),
1068 if (ec != std::errc())
1072 span->set_error(
"Invalid port number");
1076 "Invalid port number",
1082 span->set_attribute(
"net.peer.name",
host_)
1083 .set_attribute(
"net.peer.port",
static_cast<int64_t
>(port));
1087 http2_client_ = std::make_shared<http2::http2_client>(
"grpc-client");
1092 if (result.is_err())
1094 const auto& err = result.error();
1097 span->set_error(err.message);
1099 return error_void(err.code, err.message,
"grpc::client",
1109 std::lock_guard<std::mutex> lock(
mutex_);
1127 auto deadline = std::chrono::steady_clock::now() + timeout;
1129 while (std::chrono::steady_clock::now() < deadline)
1135 std::this_thread::sleep_for(std::chrono::milliseconds(10));
1147 const std::vector<uint8_t>& request,
1155 span->set_attribute(
"rpc.system",
"grpc")
1156 .set_attribute(
"rpc.method", method)
1157 .set_attribute(
"rpc.grpc.target",
target_)
1158 .set_attribute(
"rpc.request.size",
static_cast<int64_t
>(request.size()));
1165 span->set_error(
"Not connected to server");
1169 "Not connected to server",
1174 if (method.empty() || method[0] !=
'/')
1178 span->set_error(
"Invalid method format");
1182 "Invalid method format",
1184 "Method must start with '/'");
1188 if (options.deadline.has_value())
1190 if (std::chrono::system_clock::now() > options.deadline.value())
1194 span->set_error(
"Deadline exceeded before call started")
1199 "Deadline exceeded before call started",
1205 std::vector<http2::http_header> headers;
1213 if (options.deadline.has_value())
1215 auto now = std::chrono::system_clock::now();
1216 auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
1217 options.deadline.value() - now);
1218 if (remaining.count() > 0)
1226 for (
const auto& [key, value] : options.metadata)
1228 headers.emplace_back(key, value);
1232 grpc_message request_msg{std::vector<uint8_t>{request.begin(), request.end()}};
1233 auto serialized_request = request_msg.
serialize();
1236 auto response_result =
http2_client_->post(method, serialized_request, headers);
1237 if (response_result.is_err())
1239 const auto& err = response_result.error();
1242 span->set_error(err.message);
1248 const auto& response = response_result.value();
1251 if (response.status_code != 200)
1255 span->set_error(
"HTTP error: " + std::to_string(response.status_code))
1256 .set_attribute(
"http.status_code",
static_cast<int64_t
>(response.status_code));
1260 "HTTP error: " + std::to_string(response.status_code),
1266 std::string grpc_message_str;
1268 for (
const auto& header : response.headers)
1273 auto [ptr, ec] = std::from_chars(
1274 header.value.data(),
1275 header.value.data() + header.value.size(),
1277 if (ec == std::errc())
1284 grpc_message_str = header.value;
1293 span->set_error(grpc_message_str.empty() ?
1295 .set_attribute(
"rpc.grpc.status_code",
static_cast<int64_t
>(
grpc_status));
1299 grpc_message_str.empty() ?
1305 if (response.body.empty())
1310 span->set_attribute(
"rpc.response.size",
static_cast<int64_t
>(0))
1311 .set_attribute(
"rpc.grpc.status_code",
static_cast<int64_t
>(0));
1317 if (parse_result.is_err())
1319 const auto& err = parse_result.error();
1322 span->set_error(err.message);
1330 span->set_attribute(
"rpc.response.size",
static_cast<int64_t
>(parse_result.value().data.size()))
1331 .set_attribute(
"rpc.grpc.status_code",
static_cast<int64_t
>(0));
1333 return ok(std::move(parse_result.value()));
1337 const std::vector<uint8_t>& request,
1343 [
this, method, request, callback, options]() {
1344 auto result =
call_raw(method, request, options);
1347 callback(std::move(result));
1353 const std::vector<uint8_t>& request,
1361 "Not connected to server",
1366 if (method.empty() || method[0] !=
'/')
1370 "Invalid method format",
1372 "Method must start with '/'");
1376 std::vector<http2::http_header> headers;
1378 headers.emplace_back(
"te",
"trailers");
1379 headers.emplace_back(
"grpc-accept-encoding",
1384 if (options.deadline.has_value())
1386 auto now = std::chrono::system_clock::now();
1387 auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
1388 options.deadline.value() - now);
1389 if (remaining.count() > 0)
1391 headers.emplace_back(
"grpc-timeout",
1397 for (
const auto& [key, value] : options.metadata)
1399 headers.emplace_back(key, value);
1403 auto reader = std::make_shared<server_stream_reader_impl>(
http2_client_, 0);
1409 [reader](std::vector<uint8_t> data) { reader->on_data(data); },
1410 [reader](std::vector<http2::http_header> hdrs) { reader->on_headers(hdrs); },
1411 [reader](
int status) { reader->on_complete(status); });
1413 if (stream_result.is_err())
1415 const auto& err = stream_result.error();
1421 grpc_message request_msg{std::vector<uint8_t>(request)};
1422 auto serialized = request_msg.
serialize();
1423 auto write_result =
http2_client_->write_stream(stream_result.value(), serialized,
true);
1425 if (write_result.is_err())
1427 const auto& err = write_result.error();
1435 std::shared_ptr<server_stream_reader_impl>
impl;
1436 explicit shared_holder(std::shared_ptr<server_stream_reader_impl> p) :
impl(std::move(p)) {}
1438 auto has_more()
const ->
bool override {
return impl->has_more(); }
1442 return ok(std::unique_ptr<grpc_client::server_stream_reader>(
1443 new shared_holder(reader)));
1454 "Not connected to server",
1459 if (method.empty() || method[0] !=
'/')
1463 "Invalid method format",
1465 "Method must start with '/'");
1469 std::vector<http2::http_header> headers;
1471 headers.emplace_back(
"te",
"trailers");
1472 headers.emplace_back(
"grpc-accept-encoding",
1477 if (options.deadline.has_value())
1479 auto now = std::chrono::system_clock::now();
1480 auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
1481 options.deadline.value() - now);
1482 if (remaining.count() > 0)
1484 headers.emplace_back(
"grpc-timeout",
1490 for (
const auto& [key, value] : options.metadata)
1492 headers.emplace_back(key, value);
1496 auto stream_id_holder = std::make_shared<uint32_t>(0);
1497 auto writer = std::make_shared<client_stream_writer_impl>(
http2_client_, 0);
1503 [writer](std::vector<uint8_t> data) { writer->on_data(data); },
1504 [writer](std::vector<http2::http_header> hdrs) { writer->on_headers(hdrs); },
1505 [writer](
int status) { writer->on_complete(status); });
1507 if (stream_result.is_err())
1509 const auto& err = stream_result.error();
1515 auto actual_writer = std::make_shared<client_stream_writer_impl>(
http2_client_, stream_result.value());
1522 std::shared_ptr<client_stream_writer_impl>
impl;
1523 explicit shared_writer_holder(std::shared_ptr<client_stream_writer_impl> p) :
impl(std::move(p)) {}
1525 auto writes_done() ->
VoidResult override {
return impl->writes_done(); }
1529 return ok(std::unique_ptr<grpc_client::client_stream_writer>(
1530 new shared_writer_holder(actual_writer)));
1541 "Not connected to server",
1546 if (method.empty() || method[0] !=
'/')
1550 "Invalid method format",
1552 "Method must start with '/'");
1556 std::vector<http2::http_header> headers;
1558 headers.emplace_back(
"te",
"trailers");
1559 headers.emplace_back(
"grpc-accept-encoding",
1564 if (options.deadline.has_value())
1566 auto now = std::chrono::system_clock::now();
1567 auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
1568 options.deadline.value() - now);
1569 if (remaining.count() > 0)
1571 headers.emplace_back(
"grpc-timeout",
1577 for (
const auto& [key, value] : options.metadata)
1579 headers.emplace_back(key, value);
1583 auto bidi = std::make_shared<bidi_stream_impl>(
http2_client_, 0);
1589 [bidi](std::vector<uint8_t> data) { bidi->on_data(data); },
1590 [bidi](std::vector<http2::http_header> hdrs) { bidi->on_headers(hdrs); },
1591 [bidi](
int status) { bidi->on_complete(status); });
1593 if (stream_result.is_err())
1595 const auto& err = stream_result.error();
1601 auto actual_bidi = std::make_shared<bidi_stream_impl>(
http2_client_, stream_result.value());
1604 std::shared_ptr<bidi_stream_impl>
impl;
1605 explicit shared_bidi_holder(std::shared_ptr<bidi_stream_impl> p) :
impl(std::move(p)) {}
1608 auto writes_done() ->
VoidResult override {
return impl->writes_done(); }
1612 return ok(std::unique_ptr<grpc_client::bidi_stream>(
1613 new shared_bidi_holder(actual_bidi)));
1631 : impl_(std::make_unique<
impl>(target,
config))
1642 return impl_->connect();
1647 impl_->disconnect();
1657 return impl_->wait_for_connected(timeout);
1666 const std::vector<uint8_t>& request,
1669 return impl_->call_raw(method, request, options);
1673 const std::vector<uint8_t>& request,
1677 impl_->call_raw_async(method, request, std::move(callback), options);
1681 const std::vector<uint8_t>& request,
1685 return impl_->server_stream_raw(method, request, options);
1692 return impl_->client_stream_raw(method, options);
1699 return impl_->bidi_stream_raw(method, options);
static thread_integration_manager & instance()
Get the singleton instance.
std::future< void > submit_task(std::function< void()> task)
Submit a task to the thread pool.
auto write(const std::vector< uint8_t > &message) -> VoidResult override
Write message to stream.
std::condition_variable cv_
void on_complete(int status_code)
void on_headers(const std::vector< http2::http_header > &headers)
auto read() -> Result< grpc_message > override
Read next message from stream.
void on_data(const std::vector< uint8_t > &data)
auto finish() -> grpc_status override
Finish the call and get final status.
std::shared_ptr< http2::http2_client > http2_client_
auto writes_done() -> VoidResult override
Signal that writing is done.
bidi_stream_impl(std::shared_ptr< http2::http2_client > http2_client, uint32_t stream_id)
std::vector< uint8_t > buffer_
grpc_status final_status_
std::condition_variable cv_
auto finish() -> Result< grpc_message > override
Finish the call and get response.
void on_headers(const std::vector< http2::http_header > &headers)
void on_complete(int status_code)
std::shared_ptr< http2::http2_client > http2_client_
void on_data(const std::vector< uint8_t > &data)
auto write(const std::vector< uint8_t > &message) -> VoidResult override
Write message to stream.
auto writes_done() -> VoidResult override
Signal that writing is done.
std::vector< uint8_t > response_buffer_
client_stream_writer_impl(std::shared_ptr< http2::http2_client > http2_client, uint32_t stream_id)
grpc_status final_status_
Bidirectional streaming RPC handle.
Writer for client streaming RPC.
auto is_connected() const -> bool
auto disconnect() -> void
auto call_raw(const std::string &method, const std::vector< uint8_t > &request, const call_options &options) -> Result< grpc_message >
auto connect() -> VoidResult
std::atomic< bool > connected_
auto bidi_stream_raw(const std::string &method, const call_options &options) -> Result< std::unique_ptr< grpc_client::bidi_stream > >
std::shared_ptr< http2::http2_client > http2_client_
impl(std::string target, grpc_channel_config config)
auto client_stream_raw(const std::string &method, const call_options &options) -> Result< std::unique_ptr< grpc_client::client_stream_writer > >
auto wait_for_connected(std::chrono::milliseconds timeout) -> bool
auto server_stream_raw(const std::string &method, const std::vector< uint8_t > &request, const call_options &options) -> Result< std::unique_ptr< grpc_client::server_stream_reader > >
auto target() const -> const std::string &
auto call_raw_async(const std::string &method, const std::vector< uint8_t > &request, std::function< void(Result< grpc_message >)> callback, const call_options &options) -> void
grpc_channel_config config_
Reader for server streaming RPC.
gRPC client for making RPC calls
auto is_connected() const -> bool
Check if connected.
auto client_stream_raw(const std::string &method, const call_options &options={}) -> Result< std::unique_ptr< client_stream_writer > >
Start a client streaming RPC call.
auto target() const -> const std::string &
Get the target address.
auto call_raw_async(const std::string &method, const std::vector< uint8_t > &request, std::function< void(Result< grpc_message >)> callback, const call_options &options={}) -> void
Make an async unary RPC call.
~grpc_client()
Destructor.
auto wait_for_connected(std::chrono::milliseconds timeout) -> bool
Wait for connection to be ready.
std::unique_ptr< impl > impl_
auto disconnect() -> void
Disconnect from the server.
auto call_raw(const std::string &method, const std::vector< uint8_t > &request, const call_options &options={}) -> Result< grpc_message >
Make a unary RPC call.
auto server_stream_raw(const std::string &method, const std::vector< uint8_t > &request, const call_options &options={}) -> Result< std::unique_ptr< server_stream_reader > >
Start a server streaming RPC call.
auto bidi_stream_raw(const std::string &method, const call_options &options={}) -> Result< std::unique_ptr< bidi_stream > >
Start a bidirectional streaming RPC call.
grpc_client(const std::string &target, const grpc_channel_config &config={})
Construct gRPC client.
auto has_more() const -> bool override
Check if stream has more messages.
auto finish() -> grpc_status override
Get final status after stream ends.
auto read() -> Result< grpc_message > override
Read next message from stream.
grpc_status final_status_
void on_complete(int status_code)
std::shared_ptr< http2::http2_client > http2_client_
void on_headers(const std::vector< http2::http_header > &headers)
server_stream_reader_impl(std::shared_ptr< http2::http2_client > http2_client, uint32_t stream_id)
std::vector< uint8_t > buffer_
std::condition_variable cv_
void on_data(const std::vector< uint8_t > &data)
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
gRPC client channel configuration and connection.
Official gRPC library wrapper interfaces.
gRPC message framing and serialization.
constexpr int invalid_argument
constexpr int internal_error
constexpr int connection_failed
constexpr const char * identity
constexpr const char * deflate
constexpr const char * gzip
constexpr const char * grpc_status
constexpr const char * grpc_message
gRPC protocol implementation
auto format_timeout(uint64_t timeout_ms) -> std::string
Format timeout as gRPC timeout string.
constexpr const char * grpc_content_type
gRPC content-type header value
status_code
gRPC status codes (as defined in grpc/status.h)
@ deadline_exceeded
Deadline expired before operation completed.
@ unimplemented
Operation not implemented.
@ ok
Not an error; returned on success.
@ unavailable
Service unavailable.
constexpr auto status_code_to_string(status_code code) -> std::string_view
Convert status code to string.
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
std::string get_error_details(const simple_error &err)
Result< std::monostate > VoidResult
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
RAII span implementation for distributed tracing.
Options for individual RPC calls.
Configuration for gRPC channel.
std::string root_certificates
Root certificates for TLS (PEM format)
std::optional< std::string > client_key
Client private key for mutual TLS (PEM format)
bool use_tls
Whether to use TLS.
std::optional< std::string > client_certificate
Client certificate for mutual TLS (PEM format)
std::chrono::milliseconds default_timeout
Default timeout for RPC calls.
gRPC message with compression flag and payload
static auto parse(std::span< const uint8_t > input) -> Result< grpc_message >
Parse gRPC message from raw bytes.
auto serialize() const -> std::vector< uint8_t >
Serialize message to bytes with length prefix.
std::vector< uint8_t > data
Message payload.
gRPC status with code, message, and optional details
Thread system integration interface for network_system.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.