30namespace kcenon {
namespace monitoring {
41 std::unordered_map<std::string, std::string>
metadata;
// Connect to a gRPC server identified by host and port.
// Pure virtual: every concrete transport supplies its own implementation and
// reports the outcome through common::VoidResult.
82 virtual common::VoidResult
connect(
const std::string&
host, uint16_t port) = 0;
// Get the transport name.
// Pure virtual: each concrete transport returns its own identifying string.
112 virtual std::string
name()
const = 0;
// connect() override; the "stub_grpc_transport" context string below indicates
// this is the stub transport's version.
// Only part of the body is visible in this listing: the condition that selects
// the failure path and the success path are on lines not shown here.
161 common::VoidResult
connect(
const std::string&
host, uint16_t port)
override {
// Tail of an error construction: message text, then the reporting component,
// converted with to_common_error() before being handed to the result.
165 "Simulated connection failure",
166 "stub_grpc_transport"
167 ).to_common_error());
185 return common::ok(response);
196 response.
elapsed = std::chrono::milliseconds(10);
201 return common::ok(response);
218 std::string
name()
const override {
241#ifdef MONITORING_HAS_GRPC
243#include <grpcpp/grpcpp.h>
245#include <unordered_map>
// Settings consumed when a gRPC channel is created (see create_channel()) and
// when network_grpc_transport::connect() waits for the connection.
// NOTE(review): network_grpc_transport::connect() assigns config_.target, but no
// `target` member is among the lines visible here - confirm it is declared on
// the line missing from this listing.
251struct grpc_channel_config {
// When true, create_channel() builds grpc::SslCredentials instead of
// grpc::InsecureChannelCredentials; also selects the "_tls"/"_insecure" suffix
// of the channel cache key. Defaults to false.
253 bool use_tls =
false;
// PEM strings copied into grpc::SslCredentialsOptions (pem_root_certs,
// pem_private_key, pem_cert_chain respectively) only when non-empty.
254 std::string root_certificates;
255 std::string private_key;
256 std::string certificate_chain;
// Defaults to 5000 ms. Used as the WaitForConnected deadline offset in
// network_grpc_transport::connect() and passed as
// GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS in create_channel().
257 std::chrono::milliseconds connect_timeout{5000};
// Defaults to 10000 ms. Passed as GRPC_ARG_KEEPALIVE_TIME_MS in create_channel().
258 std::chrono::milliseconds keepalive_time{10000};
// Defaults to true.
// NOTE(review): no visible line reads this flag - confirm where (or whether)
// it is applied to the channel arguments.
259 bool enable_retry =
true;
// Keeps a string-keyed map of gRPC channels and consults it before creating a
// new channel. The member functions visible in this listing lock mutex_ before
// touching the map (the mutex_ declaration itself is on a line not shown here).
269class grpc_channel_manager {
// Channel cache. Keys are the plain target in get_channel(target) and the
// target plus "_tls"/"_insecure" in get_channel(target, config).
271 std::unordered_map<std::string, std::shared_ptr<grpc::Channel>> channels_;
// Configuration applied by the single-argument create_channel().
273 grpc_channel_config default_config_;
// Construct a manager; `config` becomes the default configuration and is
// value-initialized when omitted.
276 explicit grpc_channel_manager(
const grpc_channel_config& config = {})
277 : default_config_(config) {}
// Get a channel for `target`, built with the manager's default configuration.
// Holds mutex_ for the whole call. The cache is keyed by the plain target
// string here, whereas the (target, config) overload appends a
// "_tls"/"_insecure" suffix, so the two overloads never share a cache entry.
284 std::shared_ptr<grpc::Channel> get_channel(
const std::string& target) {
285 std::lock_guard<std::mutex> lock(mutex_);
287 auto it = channels_.find(target);
288 if (it != channels_.end()) {
// Query the cached channel's connectivity state; in the gRPC API the bool
// argument is try_to_connect, so false only observes the state.
290 auto state = it->second->GetState(
false);
// What happens for a cached channel that is not shut down is on lines not
// shown in this listing (presumably it is returned - confirm).
291 if (state != GRPC_CHANNEL_SHUTDOWN) {
// No usable cached channel: create one and store it (overwriting any entry
// already present under this key).
299 auto channel = create_channel(target);
300 channels_[target] = channel;
// Get a channel for `target` built with an explicit configuration.
// Holds mutex_ for the whole call.
310 std::shared_ptr<grpc::Channel> get_channel(
311 const std::string& target,
312 const grpc_channel_config& config) {
314 std::lock_guard<std::mutex> lock(mutex_);
// Cache key = target plus a suffix chosen by use_tls only.
// NOTE(review): certificates, keepalive and timeout settings are not part of
// the key, so two configs differing only in those fields map to the same
// cache entry - confirm this is intended.
317 std::string key = target + (config.use_tls ?
"_tls" :
"_insecure");
319 auto it = channels_.find(key);
320 if (it != channels_.end()) {
// Observe the cached channel's state without triggering a connection attempt
// (the bool is gRPC's try_to_connect argument).
321 auto state = it->second->GetState(
false);
// Handling of a cached, non-shutdown channel is on lines not shown here.
322 if (state != GRPC_CHANNEL_SHUTDOWN) {
// Create a channel with the caller's config and store it under the key.
328 auto channel = create_channel(target, config);
329 channels_[key] = channel;
337 std::lock_guard<std::mutex> lock(mutex_);
// Return the number of entries currently held in channels_.
// The lock is taken so the size is read consistently with the other members
// that modify the map under mutex_.
345 std::size_t channel_count()
const {
// const_cast removes the const that this const member function applies to
// mutex_, so that it can be locked.
// NOTE(review): declaring mutex_ as `mutable` would make the cast unnecessary;
// the declaration is not visible in this listing.
346 std::lock_guard<std::mutex> lock(
const_cast<std::mutex&
>(mutex_));
347 return channels_.size();
// Create a channel for `target` using default_config_; forwards to the
// two-argument overload.
351 std::shared_ptr<grpc::Channel> create_channel(
const std::string& target) {
352 return create_channel(target, default_config_);
// Build a new gRPC channel to `target` from `config`.
// Sets keepalive and reconnect-backoff channel arguments, then creates either
// a TLS or an insecure channel depending on config.use_tls.
// Does not touch channels_; callers store the result themselves.
355 std::shared_ptr<grpc::Channel> create_channel(
356 const std::string& target,
357 const grpc_channel_config& config) {
359 grpc::ChannelArguments args;
// Keepalive: the interval comes from the config; the keepalive timeout is
// fixed at 5000 ms and keepalive is permitted without active calls (1).
362 args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS,
363 static_cast<int>(config.keepalive_time.count()));
364 args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);
365 args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
// connect_timeout is supplied as the initial reconnect backoff.
// NOTE(review): a backoff interval and a connection timeout are different
// quantities - confirm this mapping is intended.
368 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
369 static_cast<int>(config.connect_timeout.count()));
// TLS path: copy each PEM string into the SSL options only when it is
// non-empty, leaving the option at its default otherwise.
371 if (config.use_tls) {
372 grpc::SslCredentialsOptions ssl_opts;
373 if (!config.root_certificates.empty()) {
374 ssl_opts.pem_root_certs = config.root_certificates;
376 if (!config.private_key.empty()) {
377 ssl_opts.pem_private_key = config.private_key;
379 if (!config.certificate_chain.empty()) {
380 ssl_opts.pem_cert_chain = config.certificate_chain;
382 auto creds = grpc::SslCredentials(ssl_opts);
383 return grpc::CreateCustomChannel(target, creds, args);
// Non-TLS path: same channel arguments, insecure credentials.
385 return grpc::CreateCustomChannel(
386 target, grpc::InsecureChannelCredentials(), args);
// grpc_transport implementation that talks to a server through a
// grpc::Channel and a grpc::GenericStub.
400class network_grpc_transport :
public grpc_transport {
// Channel assigned in connect(); a null channel_ means "not connected" to
// is_connected() and get_channel_state().
402 std::shared_ptr<grpc::Channel> channel_;
// Generic (untyped) stub created over channel_ in connect(); used by send().
403 std::unique_ptr<grpc::GenericStub> stub_;
// Channel configuration; connect() reads connect_timeout from it.
406 grpc_channel_config config_;
// Counters updated in send() and read in get_statistics(); all accesses in the
// visible code use std::memory_order_relaxed.
407 mutable std::atomic<std::size_t> requests_sent_{0};
408 mutable std::atomic<std::size_t> bytes_sent_{0};
409 mutable std::atomic<std::size_t> send_failures_{0};
// Locked by connect() and disconnect().
410 std::mutex connect_mutex_;
413 explicit network_grpc_transport(
const grpc_channel_config& config = {})
// Connect to host:port and wait up to config_.connect_timeout for the channel
// to become connected.
// Returns a network_error result on timeout or when a std::exception is
// caught; the success return and the opening `try` are on lines not shown in
// this listing.
416 common::VoidResult connect(
const std::string& host, uint16_t port)
override {
// Serializes connect() against disconnect(), which locks the same mutex.
417 std::lock_guard<std::mutex> lock(connect_mutex_);
// gRPC target string in "host:port" form.
421 std::string target =
host +
":" + std::to_string(port);
422 config_.target = target;
// The manager is a function-local object, so its channel cache exists only for
// this call; the channel stays alive through the shared_ptr kept in channel_.
425 grpc_channel_manager manager(config_);
426 channel_ = manager.get_channel(target, config_);
427 stub_ = std::make_unique<grpc::GenericStub>(channel_);
// Block until the channel is connected or the deadline passes.
430 auto deadline = std::chrono::system_clock::now() + config_.connect_timeout;
// NOTE(review): on timeout this returns without clearing channel_ or stub_;
// is_connected() accepts IDLE and CONNECTING, so it may still report true
// after this error - confirm that is acceptable.
431 if (!channel_->WaitForConnected(deadline)) {
432 return common::VoidResult::err(error_info(
433 monitoring_error_code::network_error,
434 "Connection timeout to " + target,
435 "network_grpc_transport"
436 ).to_common_error());
// Any std::exception raised inside the try block is converted into a
// network_error result rather than propagated.
440 }
catch (
const std::exception& e) {
441 return common::VoidResult::err(error_info(
442 monitoring_error_code::network_error,
443 "Failed to connect: " + std::string(e.what()),
444 "network_grpc_transport"
445 ).to_common_error());
// Send one unary request through the generic stub and return the response.
// Counts a failure and returns a network_error when not connected, when the
// failure branch below is taken, or when a std::exception is caught.
// The opening `try` and the condition separating the success and failure
// returns are on lines not shown in this listing.
// NOTE(review): the visible lines take no lock before using stub_, while
// connect() reassigns stub_ under connect_mutex_ - confirm callers do not run
// send() concurrently with connect()/disconnect().
449 common::Result<grpc_response> send(
const grpc_request& request)
override {
// Fail fast when there is no usable channel.
450 if (!is_connected()) {
451 send_failures_.fetch_add(1, std::memory_order_relaxed);
452 return common::Result<grpc_response>::err(error_info(monitoring_error_code::network_error,
"Not connected to server").to_common_error());
456 grpc::ClientContext context;
// Per-call deadline derived from the request's timeout.
459 auto deadline = std::chrono::system_clock::now() + request.timeout;
460 context.set_deadline(deadline);
// Forward every request metadata entry to the call context.
463 for (
const auto& [key, value] : request.metadata) {
464 context.AddMetadata(key, value);
// Fully qualified method path: "/<service>/<method>".
468 std::string method_path =
"/" + request.service +
"/" + request.method;
// Wrap the raw request bytes in a single-slice ByteBuffer.
471 grpc::Slice request_slice(request.body.data(), request.body.size());
472 grpc::ByteBuffer request_buffer(&request_slice, 1);
475 grpc::ByteBuffer response_buffer;
// Time the call with a monotonic clock so elapsed is unaffected by wall-clock
// adjustments.
478 auto start_time = std::chrono::steady_clock::now();
// NOTE(review): the public grpc::GenericStub API documents UnaryCall as a
// callback-based call (StubOptions plus a completion callback) rather than a
// blocking call returning grpc::Status - confirm this call form compiles
// against the gRPC version in use.
479 grpc::Status status = stub_->UnaryCall(
480 &context, method_path, request_buffer, &response_buffer);
481 auto end_time = std::chrono::steady_clock::now();
// Populate the response from the call status and the measured duration.
483 grpc_response response;
484 response.status_code =
static_cast<int>(status.error_code());
485 response.status_message = status.error_message();
486 response.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
487 end_time - start_time);
// Flatten the response ByteBuffer: append each slice's bytes to response.body.
// The value returned by Dump() is not inspected here.
490 if (response_buffer.Length() > 0) {
491 std::vector<grpc::Slice> slices;
492 response_buffer.Dump(&slices);
493 for (
const auto& slice : slices) {
494 const uint8_t* data = slice.begin();
495 response.body.insert(response.body.end(), data, data + slice.size());
// Path that returns the response: count the request and its body size.
500 requests_sent_.fetch_add(1, std::memory_order_relaxed);
501 bytes_sent_.fetch_add(request.body.size(), std::memory_order_relaxed);
502 return common::ok(response);
// Path that reports a failed call: count it and surface the status message.
504 send_failures_.fetch_add(1, std::memory_order_relaxed);
505 return common::make_error<grpc_response>(
506 static_cast<int>(monitoring_error_code::network_error),
507 "gRPC call failed: " + status.error_message());
// A caught std::exception is counted as a failure and converted to an error
// result instead of propagating to the caller.
509 }
catch (
const std::exception& e) {
510 send_failures_.fetch_add(1, std::memory_order_relaxed);
511 return common::make_error<grpc_response>(
512 static_cast<int>(monitoring_error_code::network_error),
513 "gRPC exception: " + std::string(e.what()));
// Report whether the transport has a usable channel.
// Returns false when channel_ is null; otherwise true when the channel state
// is READY, IDLE, or CONNECTING.
// NOTE(review): IDLE and CONNECTING are treated as connected, so send() may
// proceed before the channel is READY - confirm this is intended.
517 bool is_connected()
const override {
518 if (!channel_)
return false;
// Observe the state only; the bool is gRPC's try_to_connect argument.
519 auto state = channel_->GetState(
false);
520 return state == GRPC_CHANNEL_READY ||
521 state == GRPC_CHANNEL_IDLE ||
522 state == GRPC_CHANNEL_CONNECTING;
525 void disconnect()
override {
526 std::lock_guard<std::mutex> lock(connect_mutex_);
533 bool is_available()
const override {
537 std::string name()
const override {
// Return a snapshot of the transport counters.
// Each counter is read with its own relaxed load, so the three values are not
// captured as one atomic group. The expression that wraps these values is on
// lines not shown in this listing.
541 grpc_statistics get_statistics()
const override {
543 requests_sent_.load(std::memory_order_relaxed),
544 bytes_sent_.load(std::memory_order_relaxed),
545 send_failures_.load(std::memory_order_relaxed)
// Reset all three transport counters to zero.
// Each counter is cleared by a separate relaxed store, so a concurrent reader
// may observe a mix of cleared and not-yet-cleared values.
549 void reset_statistics()
override {
550 requests_sent_.store(0, std::memory_order_relaxed);
551 bytes_sent_.store(0, std::memory_order_relaxed);
552 send_failures_.store(0, std::memory_order_relaxed);
// Return a copy of the stored host_ value (its declaration and the place it is
// assigned are on lines not shown in this listing).
556 std::string get_host()
const {
return host_; }
// Return the stored port_ value (its declaration and the place it is assigned
// are on lines not shown in this listing).
557 uint16_t get_port()
const {
return port_; }
// Return the channel's current connectivity state without triggering a
// connection attempt; reports GRPC_CHANNEL_SHUTDOWN when channel_ is null.
558 grpc_connectivity_state get_channel_state()
const {
559 return channel_ ? channel_->GetState(
false) : GRPC_CHANNEL_SHUTDOWN;
// Check whether a channel to `target` becomes connected within `timeout`.
//   target  - gRPC target string passed to the channel manager.
//   config  - channel configuration; value-initialized when omitted.
//   timeout - maximum time to wait; defaults to 5 seconds.
// Returns the result of WaitForConnected on the created channel.
570inline bool grpc_health_check(
571 const std::string& target,
572 const grpc_channel_config& config = {},
573 std::chrono::milliseconds timeout = std::chrono::seconds(5)) {
// A temporary manager is used only to build the channel; nothing is cached
// beyond this call.
575 grpc_channel_manager manager(config);
576 auto channel = manager.get_channel(target, config);
578 auto deadline = std::chrono::system_clock::now() + timeout;
579 return channel->WaitForConnected(deadline);
591#ifdef MONITORING_HAS_GRPC
592 return std::make_unique<network_grpc_transport>();
594 return std::make_unique<stub_grpc_transport>();
602 return std::make_unique<stub_grpc_transport>();
Abstract gRPC transport interface.
virtual bool is_connected() const =0
Check if connected to the server.
virtual ~grpc_transport()=default
virtual void disconnect()=0
Disconnect from the server.
virtual void reset_statistics()=0
Reset statistics.
virtual bool is_available() const =0
Check if transport is available.
virtual common::VoidResult connect(const std::string &host, uint16_t port)=0
Connect to a gRPC server.
virtual grpc_statistics get_statistics() const =0
Get transport statistics.
virtual common::Result< grpc_response > send(const grpc_request &request)=0
Send a gRPC request.
virtual std::string name() const =0
Get transport name.
Stub gRPC transport for testing.
std::atomic< std::size_t > send_failures_
std::string name() const override
Get transport name.
std::function< grpc_response(const grpc_request &)> response_handler_
void set_simulate_success(bool success)
Set whether to simulate success or failure.
std::atomic< std::size_t > bytes_sent_
common::VoidResult connect(const std::string &host, uint16_t port) override
Connect to a gRPC server.
grpc_statistics get_statistics() const override
Get transport statistics.
common::Result< grpc_response > send(const grpc_request &request) override
Send a gRPC request.
bool is_available() const override
Check if transport is available.
void disconnect() override
Disconnect from the server.
void reset_statistics() override
Reset statistics.
void set_response_handler(std::function< grpc_response(const grpc_request &)> handler)
Set custom response handler for testing.
bool is_connected() const override
Check if connected to the server.
std::string get_host() const
stub_grpc_transport()=default
std::atomic< std::size_t > requests_sent_
uint16_t get_port() const
Monitoring system specific error codes.
std::unique_ptr< grpc_transport > create_default_grpc_transport()
Create default gRPC transport.
std::unique_ptr< stub_grpc_transport > create_stub_grpc_transport()
Create stub gRPC transport for testing.
Result pattern type definitions for monitoring system.
Extended error information with context.
gRPC request configuration
std::chrono::milliseconds timeout
std::unordered_map< std::string, std::string > metadata
std::vector< uint8_t > body
std::vector< uint8_t > body
std::string status_message
std::chrono::milliseconds elapsed
Statistics for gRPC transport operations.
std::size_t send_failures
std::size_t requests_sent