12#include <condition_variable>
15#include <unordered_map>
17#if NETWORK_GRPC_OFFICIAL
19#include <grpcpp/grpcpp.h>
20#include <grpcpp/ext/proto_server_reflection_plugin.h>
21#include <grpcpp/health_check_service_interface.h>
27#if NETWORK_GRPC_OFFICIAL
34class generic_service_impl :
public ::grpc::Service
37 using unary_method_handler = std::function<
38 ::grpc::Status(::grpc::ServerContext*, const ::grpc::ByteBuffer*, ::grpc::ByteBuffer*)>;
40 auto add_method(
const std::string& method_name, unary_method_handler handler) ->
void
43 auto* method = AddMethod(new ::grpc::internal::RpcServiceMethod(
45 ::grpc::internal::RpcMethod::NORMAL_RPC,
48 handlers_[method_name] = std::move(handler);
52 std::unordered_map<std::string, unary_method_handler> handlers_;
56class official_server_context :
public server_context
59 explicit official_server_context(::grpc::ServerContext* ctx)
63 for (
const auto& md : ctx_->client_metadata())
65 metadata_.emplace_back(
66 std::string(md.first.data(), md.first.size()),
67 std::string(md.second.data(), md.second.size()));
71 auto client_metadata() const -> const grpc_metadata&
override
76 auto add_trailing_metadata(
const std::string& key,
77 const std::string& value) ->
void override
79 ctx_->AddTrailingMetadata(key, value);
82 auto set_trailing_metadata(grpc_metadata metadata) ->
void override
84 for (
const auto& [key, value] : metadata)
86 ctx_->AddTrailingMetadata(key, value);
90 auto is_cancelled() const ->
bool override
92 return ctx_->IsCancelled();
96 -> std::optional<std::chrono::system_clock::time_point>
override
98 auto deadline = ctx_->deadline();
99 if (deadline == std::chrono::system_clock::time_point::max())
106 auto peer() const -> std::
string override
111 auto auth_context() const -> std::
string override
113 auto auth = ctx_->auth_context();
116 auto peer_identity = auth->GetPeerIdentity();
117 if (!peer_identity.empty())
119 return std::string(peer_identity[0].
data(), peer_identity[0].size());
126 ::grpc::ServerContext* ctx_;
131class grpc_server::impl
134 explicit impl(grpc_server_config config)
153 span->set_attribute(
"rpc.system",
"grpc")
154 .set_attribute(
"net.host.port",
static_cast<int64_t
>(
port))
155 .set_attribute(
"rpc.grpc.use_tls",
false);
158 std::lock_guard<std::mutex> lock(
mutex_);
164 span->set_error(
"Server is already running");
168 "Server is already running",
176 span->set_error(
"Invalid port number");
180 "Invalid port number",
184 ::grpc::ServerBuilder builder;
187 std::string server_address =
"0.0.0.0:" + std::to_string(
port);
188 builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials(), &bound_port_);
196 builder.AddChannelArgument(GRPC_ARG_MAX_CONCURRENT_STREAMS,
201 ::grpc::reflection::InitProtoReflectionServerBuilderPlugin();
202 ::grpc::EnableDefaultHealthCheckService(
true);
205 server_ = builder.BuildAndStart();
211 span->set_error(
"Failed to start gRPC server");
215 "Failed to start gRPC server",
219 port_ =
static_cast<uint16_t
>(bound_port_);
223 server_thread_ = std::thread([
this]() {
229 span->set_attribute(
"net.host.port.bound",
static_cast<int64_t
>(bound_port_));
236 const std::string& cert_path,
237 const std::string& key_path,
240 std::lock_guard<std::mutex> lock(
mutex_);
246 "Server is already running",
250 if (cert_path.empty() || key_path.empty())
254 "Certificate and key paths are required for TLS",
259 std::string cert_contents, key_contents, ca_contents;
262 std::ifstream cert_file(cert_path);
267 "Failed to read certificate file",
271 cert_contents = std::string(
272 std::istreambuf_iterator<char>(cert_file),
273 std::istreambuf_iterator<char>());
277 std::ifstream key_file(key_path);
282 "Failed to read key file",
286 key_contents = std::string(
287 std::istreambuf_iterator<char>(key_file),
288 std::istreambuf_iterator<char>());
291 if (!ca_path.empty())
293 std::ifstream ca_file(ca_path);
296 ca_contents = std::string(
297 std::istreambuf_iterator<char>(ca_file),
298 std::istreambuf_iterator<char>());
303 ::grpc::SslServerCredentialsOptions ssl_opts;
304 ssl_opts.pem_key_cert_pairs.push_back({key_contents, cert_contents});
306 if (!ca_contents.empty())
308 ssl_opts.pem_root_certs = ca_contents;
309 ssl_opts.client_certificate_request =
310 GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY;
313 auto creds = ::grpc::SslServerCredentials(ssl_opts);
315 ::grpc::ServerBuilder builder;
316 std::string server_address =
"0.0.0.0:" + std::to_string(
port);
317 builder.AddListeningPort(server_address, creds, &bound_port_);
324 ::grpc::reflection::InitProtoReflectionServerBuilderPlugin();
325 ::grpc::EnableDefaultHealthCheckService(
true);
327 server_ = builder.BuildAndStart();
333 "Failed to start gRPC TLS server",
337 port_ =
static_cast<uint16_t
>(bound_port_);
340 server_thread_ = std::thread([
this]() {
349 std::lock_guard<std::mutex> lock(
mutex_);
361 if (server_thread_.joinable())
363 server_thread_.join();
373 std::unique_lock<std::mutex> lock(
mutex_);
382 auto port() const -> uint16_t
389 if (service ==
nullptr)
393 "Service cannot be null",
397 std::lock_guard<std::mutex> lock(
mutex_);
406 if (full_method_name.empty() || full_method_name[0] !=
'/')
410 "Invalid method name format",
412 "Method name must start with '/'");
419 "Handler cannot be null",
423 std::lock_guard<std::mutex> lock(
mutex_);
429 "Method already registered",
436 mh.unary = std::move(handler);
437 methods_[full_method_name] = std::move(mh);
445 if (full_method_name.empty() || full_method_name[0] !=
'/')
449 "Invalid method name format",
457 "Handler cannot be null",
461 std::lock_guard<std::mutex> lock(
mutex_);
467 "Method already registered",
473 mh.server_streaming = std::move(handler);
474 methods_[full_method_name] = std::move(mh);
482 if (full_method_name.empty() || full_method_name[0] !=
'/')
486 "Invalid method name format",
494 "Handler cannot be null",
498 std::lock_guard<std::mutex> lock(
mutex_);
504 "Method already registered",
510 mh.client_streaming = std::move(handler);
511 methods_[full_method_name] = std::move(mh);
519 if (full_method_name.empty() || full_method_name[0] !=
'/')
523 "Invalid method name format",
531 "Handler cannot be null",
535 std::lock_guard<std::mutex> lock(
mutex_);
541 "Method already registered",
547 mh.bidi_streaming = std::move(handler);
548 methods_[full_method_name] = std::move(mh);
563 struct method_handler
577 std::unique_ptr<::grpc::Server> server_;
578 std::thread server_thread_;
581 std::unordered_map<std::string, method_handler>
methods_;
583 mutable std::mutex
mutex_;
635 span->set_attribute(
"rpc.system",
"grpc")
636 .set_attribute(
"net.host.port",
static_cast<int64_t
>(
port))
637 .set_attribute(
"rpc.grpc.use_tls",
false);
640 std::lock_guard<std::mutex> lock(
mutex_);
646 span->set_error(
"Server is already running");
650 "Server is already running",
658 span->set_error(
"Invalid port number");
662 "Invalid port number",
678 const std::string& cert_path,
679 const std::string& key_path,
682 std::lock_guard<std::mutex> lock(
mutex_);
688 "Server is already running",
692 if (cert_path.empty() || key_path.empty())
696 "Certificate and key paths are required for TLS",
713 std::lock_guard<std::mutex> lock(
mutex_);
729 std::unique_lock<std::mutex> lock(
mutex_);
745 if (service ==
nullptr)
749 "Service cannot be null",
753 std::lock_guard<std::mutex> lock(
mutex_);
762 if (full_method_name.empty() || full_method_name[0] !=
'/')
766 "Invalid method name format",
768 "Method name must start with '/'");
775 "Handler cannot be null",
779 std::lock_guard<std::mutex> lock(
mutex_);
785 "Method already registered",
792 mh.unary = std::move(handler);
793 methods_[full_method_name] = std::move(mh);
801 if (full_method_name.empty() || full_method_name[0] !=
'/')
805 "Invalid method name format",
813 "Handler cannot be null",
817 std::lock_guard<std::mutex> lock(
mutex_);
823 "Method already registered",
829 mh.server_streaming = std::move(handler);
830 methods_[full_method_name] = std::move(mh);
838 if (full_method_name.empty() || full_method_name[0] !=
'/')
842 "Invalid method name format",
850 "Handler cannot be null",
854 std::lock_guard<std::mutex> lock(
mutex_);
860 "Method already registered",
866 mh.client_streaming = std::move(handler);
867 methods_[full_method_name] = std::move(mh);
875 if (full_method_name.empty() || full_method_name[0] !=
'/')
879 "Invalid method name format",
887 "Handler cannot be null",
891 std::lock_guard<std::mutex> lock(
mutex_);
897 "Method already registered",
903 mh.bidi_streaming = std::move(handler);
904 methods_[full_method_name] = std::move(mh);
919 std::unordered_map<std::string, method_handler>
methods_;
941 return impl_->start(port);
945 const std::string& cert_path,
946 const std::string& key_path,
949 return impl_->start_tls(port, cert_path, key_path, ca_path);
974 return impl_->register_service(service);
980 return impl_->register_unary_method(full_method_name, std::move(handler));
986 return impl_->register_server_streaming_method(full_method_name, std::move(handler));
992 return impl_->register_client_streaming_method(full_method_name, std::move(handler));
998 return impl_->register_bidi_streaming_method(full_method_name, std::move(handler));
impl(grpc_server_config config)
auto register_bidi_streaming_method(const std::string &full_method_name, bidi_streaming_handler handler) -> VoidResult
auto register_unary_method(const std::string &full_method_name, unary_handler handler) -> VoidResult
std::unordered_map< std::string, method_handler > methods_
auto start(uint16_t port) -> VoidResult
std::condition_variable stop_cv_
auto port() const -> uint16_t
auto start_tls(uint16_t port, const std::string &cert_path, const std::string &key_path, const std::string &ca_path) -> VoidResult
auto register_client_streaming_method(const std::string &full_method_name, client_streaming_handler handler) -> VoidResult
auto register_service(grpc_service *service) -> VoidResult
std::vector< grpc_service * > services_
grpc_server_config config_
auto register_server_streaming_method(const std::string &full_method_name, server_streaming_handler handler) -> VoidResult
std::atomic< bool > running_
auto is_running() const -> bool
gRPC server for handling RPC requests
~grpc_server()
Destructor.
grpc_server(const grpc_server_config &config={})
Construct gRPC server.
auto stop() -> void
Stop the server.
auto register_bidi_streaming_method(const std::string &full_method_name, bidi_streaming_handler handler) -> VoidResult
Register a bidirectional streaming RPC method handler.
auto wait() -> void
Wait for server to finish (blocks)
auto register_unary_method(const std::string &full_method_name, unary_handler handler) -> VoidResult
Register a unary RPC method handler.
auto is_running() const -> bool
Check if server is running.
auto register_service(grpc_service *service) -> VoidResult
Register a service.
std::unique_ptr< impl > impl_
auto port() const -> uint16_t
Get the port the server is listening on.
auto start_tls(uint16_t port, const std::string &cert_path, const std::string &key_path, const std::string &ca_path="") -> VoidResult
Start the server with TLS.
auto register_client_streaming_method(const std::string &full_method_name, client_streaming_handler handler) -> VoidResult
Register a client streaming RPC method handler.
auto register_server_streaming_method(const std::string &full_method_name, server_streaming_handler handler) -> VoidResult
Register a server streaming RPC method handler.
Base class for gRPC service implementations.
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
@ peer
Verify peer certificate.
constexpr int invalid_argument
constexpr int internal_error
constexpr int already_exists
constexpr int server_already_running
gRPC protocol implementation
method_type
Type of RPC method.
@ server_streaming
Server streaming (single request, multiple responses)
@ client_streaming
Client streaming (multiple requests, single response)
@ bidi_streaming
Bidirectional streaming (multiple requests and responses)
@ unary
Unary RPC (single request, single response)
std::function< std::pair< grpc_status, std::vector< uint8_t > >( server_context &ctx, const std::vector< uint8_t > &request)> unary_handler
Handler function type for unary RPC.
std::vector< std::pair< std::string, std::string > > grpc_metadata
Metadata key-value pair for gRPC requests/responses.
std::function< grpc_status( server_context &ctx, const std::vector< uint8_t > &request, server_writer &writer)> server_streaming_handler
Handler function type for server streaming RPC.
@ ok
Not an error; returned on success.
std::function< grpc_status( server_context &ctx, server_reader_writer &stream)> bidi_streaming_handler
Handler function type for bidirectional streaming RPC.
std::function< std::pair< grpc_status, std::vector< uint8_t > >( server_context &ctx, server_reader &reader)> client_streaming_handler
Handler function type for client streaming RPC.
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
Result< std::monostate > VoidResult
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
gRPC server configuration and service hosting.
RAII span implementation for distributed tracing.
Configuration for gRPC server.
size_t max_concurrent_streams
Maximum number of concurrent streams per connection.
size_t max_message_size
Maximum message size in bytes.
client_streaming_handler client_streaming
bidi_streaming_handler bidi_streaming
server_streaming_handler server_streaming
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.