Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
grpc_transport.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
#include "../core/error_codes.h"

#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <span>
#include <string>
#include <unordered_map>
#include <vector>

#ifdef MONITORING_HAS_GRPC
// Kept at global scope so gRPC (and the std headers it pulls in) are never
// declared inside namespace kcenon::monitoring.
#include <grpcpp/grpcpp.h>
#include <grpcpp/generic/generic_stub.h>
#endif
29
30namespace kcenon { namespace monitoring {
31
37 std::string service;
38 std::string method;
39 std::vector<uint8_t> body;
40 std::chrono::milliseconds timeout{30000};
41 std::unordered_map<std::string, std::string> metadata;
42};
43
50 std::string status_message;
51 std::vector<uint8_t> body;
52 std::chrono::milliseconds elapsed{0};
53};
54
60 std::size_t requests_sent{0};
61 std::size_t bytes_sent{0};
62 std::size_t send_failures{0};
63};
64
73public:
/// Virtual destructor so implementations can be destroyed through a base
/// pointer, e.g. the std::unique_ptr<grpc_transport> returned by
/// create_default_grpc_transport().
virtual ~grpc_transport() = default;

/**
 * @brief Connect to a gRPC server.
 * @param host Server host name or address.
 * @param port Server port.
 * @return Success, or an error describing why the connection failed.
 */
virtual common::VoidResult connect(const std::string& host, uint16_t port) = 0;

/**
 * @brief Send a gRPC request.
 * @param request Service/method, raw body bytes, timeout and metadata.
 * @return The server response, or an error.
 */
virtual common::Result<grpc_response> send(const grpc_request& request) = 0;

/**
 * @brief Check if connected to the server.
 * @return true when the transport considers itself connected.
 */
virtual bool is_connected() const = 0;

/**
 * @brief Disconnect from the server.
 */
virtual void disconnect() = 0;

/**
 * @brief Check if transport is available.
 * NOTE(review): both implementations in this header return true
 * unconditionally; confirm the intended meaning for other implementations.
 */
virtual bool is_available() const = 0;

/**
 * @brief Get transport name (the implementations in this header return
 *        "stub" and "grpc").
 */
virtual std::string name() const = 0;

/**
 * @brief Get transport statistics (requests sent, bytes sent, send failures).
 */
virtual grpc_statistics get_statistics() const = 0;

/**
 * @brief Reset statistics.
 */
virtual void reset_statistics() = 0;
124};
125
134private:
135 std::string host_;
136 uint16_t port_{0};
137 bool connected_{false};
140 mutable std::atomic<std::size_t> requests_sent_{0};
141 mutable std::atomic<std::size_t> bytes_sent_{0};
142 mutable std::atomic<std::size_t> send_failures_{0};
143
144public:
146
153
157 void set_response_handler(std::function<grpc_response(const grpc_request&)> handler) {
158 response_handler_ = std::move(handler);
159 }
160
161 common::VoidResult connect(const std::string& host, uint16_t port) override {
162 if (!simulate_success_) {
163 return common::VoidResult::err(error_info(
165 "Simulated connection failure",
166 "stub_grpc_transport"
167 ).to_common_error());
168 }
169 host_ = host;
170 port_ = port;
171 connected_ = true;
172 return common::ok();
173 }
174
175 common::Result<grpc_response> send(const grpc_request& request) override {
176 if (!connected_) {
177 send_failures_.fetch_add(1, std::memory_order_relaxed);
178 return common::Result<grpc_response>::err(error_info(monitoring_error_code::network_error, "Not connected").to_common_error());
179 }
180
181 if (response_handler_) {
182 auto response = response_handler_(request);
183 requests_sent_.fetch_add(1, std::memory_order_relaxed);
184 bytes_sent_.fetch_add(request.body.size(), std::memory_order_relaxed);
185 return common::ok(response);
186 }
187
188 if (!simulate_success_) {
189 send_failures_.fetch_add(1, std::memory_order_relaxed);
190 return common::Result<grpc_response>::err(error_info(monitoring_error_code::network_error, "Simulated send failure").to_common_error());
191 }
192
193 grpc_response response;
194 response.status_code = 0; // OK in gRPC
195 response.status_message = "OK";
196 response.elapsed = std::chrono::milliseconds(10);
197
198 requests_sent_.fetch_add(1, std::memory_order_relaxed);
199 bytes_sent_.fetch_add(request.body.size(), std::memory_order_relaxed);
200
201 return common::ok(response);
202 }
203
204 bool is_connected() const override {
205 return connected_;
206 }
207
208 void disconnect() override {
209 connected_ = false;
210 host_.clear();
211 port_ = 0;
212 }
213
214 bool is_available() const override {
215 return true;
216 }
217
218 std::string name() const override {
219 return "stub";
220 }
221
223 return {
224 requests_sent_.load(std::memory_order_relaxed),
225 bytes_sent_.load(std::memory_order_relaxed),
226 send_failures_.load(std::memory_order_relaxed)
227 };
228 }
229
230 void reset_statistics() override {
231 requests_sent_.store(0, std::memory_order_relaxed);
232 bytes_sent_.store(0, std::memory_order_relaxed);
233 send_failures_.store(0, std::memory_order_relaxed);
234 }
235
236 // Test helpers
237 std::string get_host() const { return host_; }
238 uint16_t get_port() const { return port_; }
239};
240
241#ifdef MONITORING_HAS_GRPC
242
243#include <grpcpp/grpcpp.h>
244#include <mutex>
245#include <unordered_map>
246
/**
 * @brief Settings used to build a gRPC channel.
 *
 * A plain aggregate; a value-initialized config describes an insecure channel
 * with the default timings below. Field order is part of the aggregate
 * interface and must not change.
 */
struct grpc_channel_config {
    /// Channel target, e.g. "host:port".
    std::string target;

    // ---- transport security -------------------------------------------
    /// Build the channel with SSL credentials (true) or insecure ones (false).
    bool use_tls{false};
    /// PEM root certificates; empty means "not supplied".
    std::string root_certificates;
    /// PEM client private key; empty means "not supplied".
    std::string private_key;
    /// PEM client certificate chain; empty means "not supplied".
    std::string certificate_chain;

    // ---- timing -------------------------------------------------------
    std::chrono::milliseconds connect_timeout{5000};
    std::chrono::milliseconds keepalive_time{10000};

    /// Retry preference for channels built from this config.
    bool enable_retry{true};
};
261
269class grpc_channel_manager {
270private:
271 std::unordered_map<std::string, std::shared_ptr<grpc::Channel>> channels_;
272 std::mutex mutex_;
273 grpc_channel_config default_config_;
274
275public:
276 explicit grpc_channel_manager(const grpc_channel_config& config = {})
277 : default_config_(config) {}
278
284 std::shared_ptr<grpc::Channel> get_channel(const std::string& target) {
285 std::lock_guard<std::mutex> lock(mutex_);
286
287 auto it = channels_.find(target);
288 if (it != channels_.end()) {
289 // Check if channel is still usable
290 auto state = it->second->GetState(false);
291 if (state != GRPC_CHANNEL_SHUTDOWN) {
292 return it->second;
293 }
294 // Remove dead channel
295 channels_.erase(it);
296 }
297
298 // Create new channel
299 auto channel = create_channel(target);
300 channels_[target] = channel;
301 return channel;
302 }
303
310 std::shared_ptr<grpc::Channel> get_channel(
311 const std::string& target,
312 const grpc_channel_config& config) {
313
314 std::lock_guard<std::mutex> lock(mutex_);
315
316 // For custom config, create a unique key
317 std::string key = target + (config.use_tls ? "_tls" : "_insecure");
318
319 auto it = channels_.find(key);
320 if (it != channels_.end()) {
321 auto state = it->second->GetState(false);
322 if (state != GRPC_CHANNEL_SHUTDOWN) {
323 return it->second;
324 }
325 channels_.erase(it);
326 }
327
328 auto channel = create_channel(target, config);
329 channels_[key] = channel;
330 return channel;
331 }
332
336 void shutdown() {
337 std::lock_guard<std::mutex> lock(mutex_);
338 channels_.clear();
339 }
340
345 std::size_t channel_count() const {
346 std::lock_guard<std::mutex> lock(const_cast<std::mutex&>(mutex_));
347 return channels_.size();
348 }
349
350private:
351 std::shared_ptr<grpc::Channel> create_channel(const std::string& target) {
352 return create_channel(target, default_config_);
353 }
354
355 std::shared_ptr<grpc::Channel> create_channel(
356 const std::string& target,
357 const grpc_channel_config& config) {
358
359 grpc::ChannelArguments args;
360
361 // Set keepalive parameters
362 args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS,
363 static_cast<int>(config.keepalive_time.count()));
364 args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);
365 args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
366
367 // Set connection timeout
368 args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
369 static_cast<int>(config.connect_timeout.count()));
370
371 if (config.use_tls) {
372 grpc::SslCredentialsOptions ssl_opts;
373 if (!config.root_certificates.empty()) {
374 ssl_opts.pem_root_certs = config.root_certificates;
375 }
376 if (!config.private_key.empty()) {
377 ssl_opts.pem_private_key = config.private_key;
378 }
379 if (!config.certificate_chain.empty()) {
380 ssl_opts.pem_cert_chain = config.certificate_chain;
381 }
382 auto creds = grpc::SslCredentials(ssl_opts);
383 return grpc::CreateCustomChannel(target, creds, args);
384 } else {
385 return grpc::CreateCustomChannel(
386 target, grpc::InsecureChannelCredentials(), args);
387 }
388 }
389};
390
400class network_grpc_transport : public grpc_transport {
401private:
402 std::shared_ptr<grpc::Channel> channel_;
403 std::unique_ptr<grpc::GenericStub> stub_;
404 std::string host_;
405 uint16_t port_{0};
406 grpc_channel_config config_;
407 mutable std::atomic<std::size_t> requests_sent_{0};
408 mutable std::atomic<std::size_t> bytes_sent_{0};
409 mutable std::atomic<std::size_t> send_failures_{0};
410 std::mutex connect_mutex_;
411
412public:
413 explicit network_grpc_transport(const grpc_channel_config& config = {})
414 : config_(config) {}
415
416 common::VoidResult connect(const std::string& host, uint16_t port) override {
417 std::lock_guard<std::mutex> lock(connect_mutex_);
418
419 host_ = host;
420 port_ = port;
421 std::string target = host + ":" + std::to_string(port);
422 config_.target = target;
423
424 try {
425 grpc_channel_manager manager(config_);
426 channel_ = manager.get_channel(target, config_);
427 stub_ = std::make_unique<grpc::GenericStub>(channel_);
428
429 // Wait for channel to be ready (with timeout)
430 auto deadline = std::chrono::system_clock::now() + config_.connect_timeout;
431 if (!channel_->WaitForConnected(deadline)) {
432 return common::VoidResult::err(error_info(
433 monitoring_error_code::network_error,
434 "Connection timeout to " + target,
435 "network_grpc_transport"
436 ).to_common_error());
437 }
438
439 return common::ok();
440 } catch (const std::exception& e) {
441 return common::VoidResult::err(error_info(
442 monitoring_error_code::network_error,
443 "Failed to connect: " + std::string(e.what()),
444 "network_grpc_transport"
445 ).to_common_error());
446 }
447 }
448
449 common::Result<grpc_response> send(const grpc_request& request) override {
450 if (!is_connected()) {
451 send_failures_.fetch_add(1, std::memory_order_relaxed);
452 return common::Result<grpc_response>::err(error_info(monitoring_error_code::network_error, "Not connected to server").to_common_error());
453 }
454
455 try {
456 grpc::ClientContext context;
457
458 // Set timeout
459 auto deadline = std::chrono::system_clock::now() + request.timeout;
460 context.set_deadline(deadline);
461
462 // Set metadata
463 for (const auto& [key, value] : request.metadata) {
464 context.AddMetadata(key, value);
465 }
466
467 // Prepare the method path
468 std::string method_path = "/" + request.service + "/" + request.method;
469
470 // Create request slice
471 grpc::Slice request_slice(request.body.data(), request.body.size());
472 grpc::ByteBuffer request_buffer(&request_slice, 1);
473
474 // Prepare response buffer
475 grpc::ByteBuffer response_buffer;
476
477 // Make the call
478 auto start_time = std::chrono::steady_clock::now();
479 grpc::Status status = stub_->UnaryCall(
480 &context, method_path, request_buffer, &response_buffer);
481 auto end_time = std::chrono::steady_clock::now();
482
483 grpc_response response;
484 response.status_code = static_cast<int>(status.error_code());
485 response.status_message = status.error_message();
486 response.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
487 end_time - start_time);
488
489 // Extract response body
490 if (response_buffer.Length() > 0) {
491 std::vector<grpc::Slice> slices;
492 response_buffer.Dump(&slices);
493 for (const auto& slice : slices) {
494 const uint8_t* data = slice.begin();
495 response.body.insert(response.body.end(), data, data + slice.size());
496 }
497 }
498
499 if (status.ok()) {
500 requests_sent_.fetch_add(1, std::memory_order_relaxed);
501 bytes_sent_.fetch_add(request.body.size(), std::memory_order_relaxed);
502 return common::ok(response);
503 } else {
504 send_failures_.fetch_add(1, std::memory_order_relaxed);
505 return common::make_error<grpc_response>(
506 static_cast<int>(monitoring_error_code::network_error),
507 "gRPC call failed: " + status.error_message());
508 }
509 } catch (const std::exception& e) {
510 send_failures_.fetch_add(1, std::memory_order_relaxed);
511 return common::make_error<grpc_response>(
512 static_cast<int>(monitoring_error_code::network_error),
513 "gRPC exception: " + std::string(e.what()));
514 }
515 }
516
517 bool is_connected() const override {
518 if (!channel_) return false;
519 auto state = channel_->GetState(false);
520 return state == GRPC_CHANNEL_READY ||
521 state == GRPC_CHANNEL_IDLE ||
522 state == GRPC_CHANNEL_CONNECTING;
523 }
524
525 void disconnect() override {
526 std::lock_guard<std::mutex> lock(connect_mutex_);
527 stub_.reset();
528 channel_.reset();
529 host_.clear();
530 port_ = 0;
531 }
532
533 bool is_available() const override {
534 return true; // gRPC library is available when this class is compiled
535 }
536
537 std::string name() const override {
538 return "grpc";
539 }
540
541 grpc_statistics get_statistics() const override {
542 return {
543 requests_sent_.load(std::memory_order_relaxed),
544 bytes_sent_.load(std::memory_order_relaxed),
545 send_failures_.load(std::memory_order_relaxed)
546 };
547 }
548
549 void reset_statistics() override {
550 requests_sent_.store(0, std::memory_order_relaxed);
551 bytes_sent_.store(0, std::memory_order_relaxed);
552 send_failures_.store(0, std::memory_order_relaxed);
553 }
554
555 // Accessors for testing
556 std::string get_host() const { return host_; }
557 uint16_t get_port() const { return port_; }
558 grpc_connectivity_state get_channel_state() const {
559 return channel_ ? channel_->GetState(false) : GRPC_CHANNEL_SHUTDOWN;
560 }
561};
562
570inline bool grpc_health_check(
571 const std::string& target,
572 const grpc_channel_config& config = {},
573 std::chrono::milliseconds timeout = std::chrono::seconds(5)) {
574
575 grpc_channel_manager manager(config);
576 auto channel = manager.get_channel(target, config);
577
578 auto deadline = std::chrono::system_clock::now() + timeout;
579 return channel->WaitForConnected(deadline);
580}
581
582#endif // MONITORING_HAS_GRPC
583
590inline std::unique_ptr<grpc_transport> create_default_grpc_transport() {
591#ifdef MONITORING_HAS_GRPC
592 return std::make_unique<network_grpc_transport>();
593#else
594 return std::make_unique<stub_grpc_transport>();
595#endif
596}
597
601inline std::unique_ptr<stub_grpc_transport> create_stub_grpc_transport() {
602 return std::make_unique<stub_grpc_transport>();
603}
604
605} } // namespace kcenon::monitoring
Abstract gRPC transport interface.
virtual bool is_connected() const =0
Check if connected to the server.
virtual void disconnect()=0
Disconnect from the server.
virtual void reset_statistics()=0
Reset statistics.
virtual bool is_available() const =0
Check if transport is available.
virtual common::VoidResult connect(const std::string &host, uint16_t port)=0
Connect to a gRPC server.
virtual grpc_statistics get_statistics() const =0
Get transport statistics.
virtual common::Result< grpc_response > send(const grpc_request &request)=0
Send a gRPC request.
virtual std::string name() const =0
Get transport name.
Stub gRPC transport for testing.
std::atomic< std::size_t > send_failures_
std::string name() const override
Get transport name.
std::function< grpc_response(const grpc_request &)> response_handler_
void set_simulate_success(bool success)
Set whether to simulate success or failure.
std::atomic< std::size_t > bytes_sent_
common::VoidResult connect(const std::string &host, uint16_t port) override
Connect to a gRPC server.
grpc_statistics get_statistics() const override
Get transport statistics.
common::Result< grpc_response > send(const grpc_request &request) override
Send a gRPC request.
bool is_available() const override
Check if transport is available.
void disconnect() override
Disconnect from the server.
void reset_statistics() override
Reset statistics.
void set_response_handler(std::function< grpc_response(const grpc_request &)> handler)
Set custom response handler for testing.
bool is_connected() const override
Check if connected to the server.
std::atomic< std::size_t > requests_sent_
Monitoring-system-specific error codes.
std::unique_ptr< grpc_transport > create_default_grpc_transport()
Create default gRPC transport.
std::unique_ptr< stub_grpc_transport > create_stub_grpc_transport()
Create stub gRPC transport for testing.
Result pattern type definitions for the monitoring system.
Extended error information with context.
gRPC request configuration
std::chrono::milliseconds timeout
std::unordered_map< std::string, std::string > metadata
std::vector< uint8_t > body
std::chrono::milliseconds elapsed
Statistics for gRPC transport operations.