Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
client.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
8
12
13#include <atomic>
14#include <charconv>
15#include <condition_variable>
16#include <mutex>
17#include <thread>
18
19#if NETWORK_GRPC_OFFICIAL
20#include <grpcpp/grpcpp.h>
21#include <grpcpp/generic/generic_stub.h>
22#include <grpcpp/support/byte_buffer.h>
23#else
26#endif
27
29{
30
31#if NETWORK_GRPC_OFFICIAL
32
33// ============================================================================
34// Official gRPC Library Client Implementation
35// ============================================================================
36
37namespace detail
38{
39
40auto vector_to_byte_buffer(const std::vector<uint8_t>& data) -> ::grpc::ByteBuffer
41{
42 ::grpc::Slice slice(data.data(), data.size());
43 return ::grpc::ByteBuffer(&slice, 1);
44}
45
46auto byte_buffer_to_vector(const ::grpc::ByteBuffer& buffer) -> std::vector<uint8_t>
47{
48 std::vector<::grpc::Slice> slices;
49 buffer.Dump(&slices);
50
51 std::vector<uint8_t> result;
52 result.reserve(buffer.Length());
53
54 for (const auto& slice : slices)
55 {
56 const auto* begin = reinterpret_cast<const uint8_t*>(slice.begin());
57 result.insert(result.end(), begin, begin + slice.size());
58 }
59
60 return result;
61}
62
63} // namespace detail
64
65// Server stream reader implementation for official gRPC
66class official_server_stream_reader : public grpc_client::server_stream_reader
67{
68public:
69 official_server_stream_reader(
70 std::unique_ptr<::grpc::ClientContext> ctx,
71 std::unique_ptr<::grpc::GenericClientAsyncReader> reader,
72 ::grpc::CompletionQueue* cq)
73 : ctx_(std::move(ctx))
74 , reader_(std::move(reader))
75 , cq_(cq)
76 , has_more_(true)
77 {
78 }
79
80 auto read() -> Result<grpc_message> override
81 {
82 if (!has_more_)
83 {
84 return error<grpc_message>(
85 static_cast<int>(status_code::ok),
86 "End of stream",
87 "grpc::client::server_stream_reader");
88 }
89
90 ::grpc::ByteBuffer buffer;
91 void* tag = nullptr;
92 bool ok = false;
93
94 reader_->Read(&buffer, &tag);
95
96 if (!cq_->Next(&tag, &ok) || !ok)
97 {
98 has_more_ = false;
99 return error<grpc_message>(
100 static_cast<int>(status_code::ok),
101 "End of stream",
102 "grpc::client::server_stream_reader");
103 }
104
105 auto data = detail::byte_buffer_to_vector(buffer);
106 return kcenon::network::protocols::grpc::ok(grpc_message{std::move(data)});
107 }
108
109 auto has_more() const -> bool override
110 {
111 return has_more_;
112 }
113
114 auto finish() -> grpc_status override
115 {
116 ::grpc::Status status;
117 void* tag = nullptr;
118 bool ok = false;
119
120 reader_->Finish(&status, &tag);
121 cq_->Next(&tag, &ok);
122
123 return from_grpc_status(status);
124 }
125
126private:
127 std::unique_ptr<::grpc::ClientContext> ctx_;
128 std::unique_ptr<::grpc::GenericClientAsyncReader> reader_;
129 ::grpc::CompletionQueue* cq_;
130 bool has_more_;
131};
132
133// Client stream writer implementation for official gRPC
134class official_client_stream_writer : public grpc_client::client_stream_writer
135{
136public:
137 official_client_stream_writer(
138 std::unique_ptr<::grpc::ClientContext> ctx,
139 std::unique_ptr<::grpc::GenericClientAsyncWriter> writer,
140 ::grpc::CompletionQueue* cq)
141 : ctx_(std::move(ctx))
142 , writer_(std::move(writer))
143 , cq_(cq)
144 , writes_done_(false)
145 {
146 }
147
148 auto write(const std::vector<uint8_t>& message) -> VoidResult override
149 {
150 if (writes_done_)
151 {
152 return error_void(
153 error_codes::common_errors::internal_error,
154 "Stream writes already done",
155 "grpc::client::client_stream_writer");
156 }
157
158 ::grpc::ByteBuffer buffer = detail::vector_to_byte_buffer(message);
159 void* tag = nullptr;
160 bool ok = false;
161
162 writer_->Write(buffer, &tag);
163
164 if (!cq_->Next(&tag, &ok) || !ok)
165 {
166 return error_void(
167 error_codes::network_system::connection_failed,
168 "Failed to write message",
169 "grpc::client::client_stream_writer");
170 }
171
173 }
174
175 auto writes_done() -> VoidResult override
176 {
177 if (writes_done_)
178 {
180 }
181
182 void* tag = nullptr;
183 bool ok = false;
184
185 writer_->WritesDone(&tag);
186 cq_->Next(&tag, &ok);
187
188 writes_done_ = true;
190 }
191
192 auto finish() -> Result<grpc_message> override
193 {
194 if (!writes_done_)
195 {
196 writes_done();
197 }
198
199 ::grpc::ByteBuffer response;
200 ::grpc::Status status;
201 void* tag = nullptr;
202 bool ok = false;
203
204 writer_->Finish(&status, &tag);
205 cq_->Next(&tag, &ok);
206
207 if (!status.ok())
208 {
209 auto grpc_st = from_grpc_status(status);
210 return error<grpc_message>(
211 static_cast<int>(grpc_st.code),
212 grpc_st.message,
213 "grpc::client::client_stream_writer");
214 }
215
216 // Note: Response is not available from writer, need to handle differently
217 return kcenon::network::protocols::grpc::ok(grpc_message{});
218 }
219
220private:
221 std::unique_ptr<::grpc::ClientContext> ctx_;
222 std::unique_ptr<::grpc::GenericClientAsyncWriter> writer_;
223 ::grpc::CompletionQueue* cq_;
224 bool writes_done_;
225};
226
227// Bidirectional stream implementation for official gRPC
228class official_bidi_stream : public grpc_client::bidi_stream
229{
230public:
231 official_bidi_stream(
232 std::unique_ptr<::grpc::ClientContext> ctx,
233 std::unique_ptr<::grpc::GenericClientAsyncReaderWriter> stream,
234 ::grpc::CompletionQueue* cq)
235 : ctx_(std::move(ctx))
236 , stream_(std::move(stream))
237 , cq_(cq)
238 , writes_done_(false)
239 {
240 }
241
242 auto write(const std::vector<uint8_t>& message) -> VoidResult override
243 {
244 if (writes_done_)
245 {
246 return error_void(
247 error_codes::common_errors::internal_error,
248 "Stream writes already done",
249 "grpc::client::bidi_stream");
250 }
251
252 ::grpc::ByteBuffer buffer = detail::vector_to_byte_buffer(message);
253 void* tag = nullptr;
254 bool ok = false;
255
256 stream_->Write(buffer, &tag);
257
258 if (!cq_->Next(&tag, &ok) || !ok)
259 {
260 return error_void(
261 error_codes::network_system::connection_failed,
262 "Failed to write message",
263 "grpc::client::bidi_stream");
264 }
265
267 }
268
269 auto read() -> Result<grpc_message> override
270 {
271 ::grpc::ByteBuffer buffer;
272 void* tag = nullptr;
273 bool ok = false;
274
275 stream_->Read(&buffer, &tag);
276
277 if (!cq_->Next(&tag, &ok) || !ok)
278 {
279 return error<grpc_message>(
280 static_cast<int>(status_code::ok),
281 "End of stream",
282 "grpc::client::bidi_stream");
283 }
284
285 auto data = detail::byte_buffer_to_vector(buffer);
286 return kcenon::network::protocols::grpc::ok(grpc_message{std::move(data)});
287 }
288
289 auto writes_done() -> VoidResult override
290 {
291 if (writes_done_)
292 {
294 }
295
296 void* tag = nullptr;
297 bool ok = false;
298
299 stream_->WritesDone(&tag);
300 cq_->Next(&tag, &ok);
301
302 writes_done_ = true;
304 }
305
306 auto finish() -> grpc_status override
307 {
308 if (!writes_done_)
309 {
310 writes_done();
311 }
312
313 ::grpc::Status status;
314 void* tag = nullptr;
315 bool ok = false;
316
317 stream_->Finish(&status, &tag);
318 cq_->Next(&tag, &ok);
319
320 return from_grpc_status(status);
321 }
322
323private:
324 std::unique_ptr<::grpc::ClientContext> ctx_;
325 std::unique_ptr<::grpc::GenericClientAsyncReaderWriter> stream_;
326 ::grpc::CompletionQueue* cq_;
327 bool writes_done_;
328};
329
330// Implementation class using official gRPC
331class grpc_client::impl
332{
333public:
334 explicit impl(std::string target, grpc_channel_config config)
335 : target_(std::move(target))
336 , config_(std::move(config))
337 , connected_(false)
338 {
339 }
340
// Destructor: releases the stub and channel by delegating to disconnect().
341 ~impl()
342 {
343 disconnect();
344 }
345
// Creates the gRPC channel for target_, waits for it to become ready and
// builds the generic stub. The whole operation runs under mutex_, including
// the wait for readiness (bounded by config_.default_timeout).
// When tracing is enabled a "grpc.client.connect" span records the target,
// transport, TLS flag and any failure.
346 auto connect() -> VoidResult
347 {
// The span is only created when tracing is enabled; otherwise it stays empty.
348 auto span = tracing::is_tracing_enabled()
349 ? std::make_optional(tracing::trace_context::create_span("grpc.client.connect"))
350 : std::nullopt;
351 if (span)
352 {
353 span->set_attribute("rpc.system", "grpc")
354 .set_attribute("rpc.grpc.target", target_)
355 .set_attribute("net.transport", "tcp")
356 .set_attribute("rpc.grpc.use_tls", config_.use_tls);
357 }
358
359 std::lock_guard<std::mutex> lock(mutex_);
360
// Already-connected path: only the span attribute is visible in this listing.
// NOTE(review): the listing numbering skips a line before the closing brace;
// presumably an early return belongs here - confirm against the original file.
361 if (connected_.load())
362 {
363 if (span)
364 {
365 span->set_attribute("grpc.client.already_connected", true);
366 }
368 }
369
370 // Create channel credentials
371 channel_credentials_config creds_config;
372 creds_config.insecure = !config_.use_tls;
373
// Certificate material is only copied over when TLS is requested.
374 if (config_.use_tls)
375 {
376 creds_config.root_certificates = config_.root_certificates;
377 creds_config.client_certificate = config_.client_certificate;
378 creds_config.client_key = config_.client_key;
379 }
380
381 channel_ = create_channel(target_, creds_config);
382
383 if (!channel_)
384 {
385 if (span)
386 {
387 span->set_error("Failed to create gRPC channel");
388 }
// NOTE(review): the first argument of error_void (the error code) is not
// visible in this listing - confirm against the original file.
389 return error_void(
391 "Failed to create gRPC channel",
392 "grpc::client");
393 }
394
395 // Wait for channel to be ready
396 if (!wait_for_channel_ready(channel_, config_.default_timeout))
397 {
398 if (span)
399 {
400 span->set_error("Failed to connect to gRPC server");
401 }
// NOTE(review): the error-code argument is not visible here either.
// channel_ stays assigned on this failure path while connected_ remains false.
402 return error_void(
404 "Failed to connect to gRPC server",
405 "grpc::client",
406 target_);
407 }
408
409 stub_ = std::make_unique<::grpc::GenericStub>(channel_);
410 connected_.store(true);
411
// NOTE(review): no return statement for the success path is visible in this
// listing (the numbering skips a line) - confirm against the original file.
413 }
414
415 auto disconnect() -> void
416 {
417 std::lock_guard<std::mutex> lock(mutex_);
418
419 stub_.reset();
420 channel_.reset();
421 connected_.store(false);
422 }
423
424 auto is_connected() const -> bool
425 {
426 if (!connected_.load() || !channel_)
427 {
428 return false;
429 }
430
431 auto state = channel_->GetState(false);
432 return state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE;
433 }
434
435 auto wait_for_connected(std::chrono::milliseconds timeout) -> bool
436 {
437 if (!channel_)
438 {
439 return false;
440 }
441
442 return wait_for_channel_ready(channel_, timeout);
443 }
444
// Returns a reference to the target address string given at construction.
445 auto target() const -> const std::string&
446 {
447 return target_;
448 }
449
// Performs a blocking unary call of `method` with the raw `request` bytes and
// returns the raw response bytes wrapped in a grpc_message.
// The call deadline comes from options.deadline when set, otherwise from
// config_.default_timeout; options.metadata is attached to the call context.
// When tracing is enabled a "grpc.client.call" span records the method,
// target, request/response sizes and the resulting status code.
450 auto call_raw(const std::string& method,
451 const std::vector<uint8_t>& request,
452 const call_options& options) -> Result<grpc_message>
453 {
454 auto span = tracing::is_tracing_enabled()
455 ? std::make_optional(tracing::trace_context::create_span("grpc.client.call"))
456 : std::nullopt;
457 if (span)
458 {
459 span->set_attribute("rpc.system", "grpc")
460 .set_attribute("rpc.method", method)
461 .set_attribute("rpc.grpc.target", target_)
462 .set_attribute("rpc.request.size", static_cast<int64_t>(request.size()));
463 }
464
465 if (!is_connected())
466 {
467 if (span)
468 {
469 span->set_error("Not connected to server");
470 }
// NOTE(review): the first argument of error<grpc_message> (the error code) is
// not visible in this listing - confirm against the original file.
471 return error<grpc_message>(
473 "Not connected to server",
474 "grpc::client");
475 }
476
477 ::grpc::ClientContext ctx;
478
479 // Set deadline if provided
480 if (options.deadline.has_value())
481 {
482 set_deadline(&ctx, options.deadline.value());
483 }
484 else
485 {
486 // Use default timeout
487 set_timeout(&ctx, config_.default_timeout);
488 }
489
490 // Add metadata
491 for (const auto& [key, value] : options.metadata)
492 {
493 ctx.AddMetadata(key, value);
494 }
495
496 // Set wait for ready
497 if (options.wait_for_ready)
498 {
499 ctx.set_wait_for_ready(true);
500 }
501
502 // Prepare request
503 ::grpc::ByteBuffer request_buffer = detail::vector_to_byte_buffer(request);
504 ::grpc::ByteBuffer response_buffer;
505
506 // Make unary call
// NOTE(review): stub_ is dereferenced without holding mutex_, while
// disconnect() resets it under that mutex; a concurrent disconnect() between
// the is_connected() check above and this call is not guarded against here.
507 ::grpc::Status status = stub_->UnaryCall(&ctx, method, request_buffer, &response_buffer);
508
509 if (!status.ok())
510 {
// Failure: the converted status code and message become the error result.
511 auto grpc_st = from_grpc_status(status);
512 if (span)
513 {
514 span->set_error(grpc_st.message)
515 .set_attribute("rpc.grpc.status_code", static_cast<int64_t>(grpc_st.code));
516 }
517 return error<grpc_message>(
518 static_cast<int>(grpc_st.code),
519 grpc_st.message,
520 "grpc::client");
521 }
522
523 auto response_data = detail::byte_buffer_to_vector(response_buffer);
524 if (span)
525 {
526 span->set_attribute("rpc.response.size", static_cast<int64_t>(response_data.size()))
527 .set_attribute("rpc.grpc.status_code", static_cast<int64_t>(0));
528 }
529 return kcenon::network::protocols::grpc::ok(grpc_message{std::move(response_data)});
530 }
531
532 auto call_raw_async(const std::string& method,
533 const std::vector<uint8_t>& request,
534 std::function<void(Result<grpc_message>)> callback,
535 const call_options& options) -> void
536 {
537 // Create async call in separate thread
538 std::thread([this, method, request, callback, options]() {
539 auto result = call_raw(method, request, options);
540 if (callback)
541 {
542 callback(std::move(result));
543 }
544 }).detach();
545 }
546
// Server-streaming entry point of the official-gRPC wrapper.
// As visible here it prepares a ClientContext (deadline/timeout, metadata)
// and then reports status_code::unimplemented; no stream is started and the
// prepared context is not used afterwards. `method` and `request` are not
// referenced in the visible body.
547 auto server_stream_raw(const std::string& method,
548 const std::vector<uint8_t>& request,
549 const call_options& options)
550 -> Result<std::unique_ptr<grpc_client::server_stream_reader>>
551 {
552 if (!is_connected())
553 {
// NOTE(review): the head of this return expression (callee and error code) is
// not visible in this listing - confirm against the original file.
556 "Not connected to server",
557 "grpc::client");
558 }
559
560 auto ctx = std::make_unique<::grpc::ClientContext>();
561
562 // Set deadline if provided
563 if (options.deadline.has_value())
564 {
565 set_deadline(ctx.get(), options.deadline.value());
566 }
567 else
568 {
569 set_timeout(ctx.get(), config_.default_timeout);
570 }
571
572 // Add metadata
573 for (const auto& [key, value] : options.metadata)
574 {
575 ctx->AddMetadata(key, value);
576 }
577
578 // Note: GenericStub doesn't have PrepareUnaryCall with streaming
579 // For full streaming support, we would need to use async API
580 // This is a simplified synchronous implementation
581
// NOTE(review): the head of this return expression is not visible in this
// listing; the visible arguments describe an "unimplemented" error.
583 static_cast<int>(status_code::unimplemented),
584 "Server streaming not fully implemented for official gRPC wrapper",
585 "grpc::client");
586 }
587
// Client-streaming entry point of the official-gRPC wrapper.
// As visible here it only checks the connection and then reports
// status_code::unimplemented; `method` and `options` are not referenced.
588 auto client_stream_raw(const std::string& method,
589 const call_options& options)
590 -> Result<std::unique_ptr<grpc_client::client_stream_writer>>
591 {
592 if (!is_connected())
593 {
// NOTE(review): the head of this return expression (callee and error code) is
// not visible in this listing - confirm against the original file.
596 "Not connected to server",
597 "grpc::client");
598 }
599
// NOTE(review): the head of this return expression is not visible either; the
// visible arguments describe an "unimplemented" error.
601 static_cast<int>(status_code::unimplemented),
602 "Client streaming not fully implemented for official gRPC wrapper",
603 "grpc::client");
604 }
605
// Bidirectional-streaming entry point of the official-gRPC wrapper.
// As visible here it only checks the connection and then reports
// status_code::unimplemented; `method` and `options` are not referenced.
606 auto bidi_stream_raw(const std::string& method,
607 const call_options& options)
608 -> Result<std::unique_ptr<grpc_client::bidi_stream>>
609 {
610 if (!is_connected())
611 {
// NOTE(review): the head of this return expression (callee and error code) is
// not visible in this listing - confirm against the original file.
614 "Not connected to server",
615 "grpc::client");
616 }
617
// NOTE(review): the head of this return expression is not visible either; the
// visible arguments describe an "unimplemented" error.
619 static_cast<int>(status_code::unimplemented),
620 "Bidirectional streaming not fully implemented for official gRPC wrapper",
621 "grpc::client");
622 }
623
624private:
625 std::string target_;
626 grpc_channel_config config_;
627 std::shared_ptr<::grpc::Channel> channel_;
628 std::unique_ptr<::grpc::GenericStub> stub_;
629 std::atomic<bool> connected_;
630 mutable std::mutex mutex_;
631};
632
633#else // !NETWORK_GRPC_OFFICIAL
634
635// ============================================================================
636// Prototype Implementation (HTTP/2 Transport)
637// ============================================================================
638
639// Server stream reader implementation
641{
642public:
643 server_stream_reader_impl(std::shared_ptr<http2::http2_client> http2_client,
644 uint32_t stream_id)
645 : http2_client_(std::move(http2_client))
646 , stream_id_(stream_id)
647 , has_more_(true)
648 {
649 }
650
651 auto read() -> Result<grpc_message> override
652 {
653 std::unique_lock<std::mutex> lock(mutex_);
654
655 // Wait for data or end of stream
656 cv_.wait(lock, [this] { return !buffer_.empty() || !has_more_; });
657
658 if (buffer_.empty())
659 {
660 if (!has_more_)
661 {
662 return error<grpc_message>(
663 static_cast<int>(status_code::ok),
664 "End of stream",
665 "server_stream_reader::read");
666 }
667 }
668
669 // Parse gRPC message from buffer
670 auto parse_result = grpc_message::parse(buffer_);
671 if (parse_result.is_err())
672 {
673 return parse_result;
674 }
675
676 // Remove parsed data from buffer
677 auto& msg = parse_result.value();
678 size_t msg_size = 5 + msg.data.size(); // 1 byte compression + 4 bytes length + data
679 if (buffer_.size() >= msg_size)
680 {
681 buffer_.erase(buffer_.begin(), buffer_.begin() + static_cast<ptrdiff_t>(msg_size));
682 }
683
684 return ok(std::move(msg));
685 }
686
687 auto has_more() const -> bool override
688 {
689 std::lock_guard<std::mutex> lock(mutex_);
690 return has_more_ || !buffer_.empty();
691 }
692
693 auto finish() -> grpc_status override
694 {
695 std::unique_lock<std::mutex> lock(mutex_);
696 cv_.wait(lock, [this] { return !has_more_; });
697 return final_status_;
698 }
699
700 void on_data(const std::vector<uint8_t>& data)
701 {
702 std::lock_guard<std::mutex> lock(mutex_);
703 buffer_.insert(buffer_.end(), data.begin(), data.end());
704 cv_.notify_one();
705 }
706
707 void on_headers(const std::vector<http2::http_header>& headers)
708 {
709 std::lock_guard<std::mutex> lock(mutex_);
710 for (const auto& h : headers)
711 {
712 if (h.name == "grpc-status")
713 {
714 try { final_status_.code = static_cast<status_code>(std::stoi(h.value)); }
715 catch (...) {}
716 }
717 else if (h.name == "grpc-message")
718 {
719 final_status_.message = h.value;
720 }
721 }
722 }
723
725 {
726 std::lock_guard<std::mutex> lock(mutex_);
727 has_more_ = false;
728 if (status_code != 200)
729 {
731 final_status_.message = "HTTP error: " + std::to_string(status_code);
732 }
733 cv_.notify_all();
734 }
735
736private:
737 std::shared_ptr<http2::http2_client> http2_client_;
738 [[maybe_unused]] uint32_t stream_id_; // Reserved for future stream operations
739 mutable std::mutex mutex_;
740 std::condition_variable cv_;
741 std::vector<uint8_t> buffer_;
744};
745
746// Client stream writer implementation
748{
749public:
750 client_stream_writer_impl(std::shared_ptr<http2::http2_client> http2_client,
751 uint32_t stream_id)
752 : http2_client_(std::move(http2_client))
753 , stream_id_(stream_id)
754 , writes_done_(false)
755 {
756 }
757
// Frames `message` as a gRPC message and writes it to the HTTP/2 stream
// without ending the stream. Fails once writes_done() has been called;
// otherwise returns whatever write_stream() returns.
758 auto write(const std::vector<uint8_t>& message) -> VoidResult override
759 {
760 if (writes_done_)
761 {
// NOTE(review): the first argument of error_void (the error code) is not
// visible in this listing - confirm against the original file.
762 return error_void(
764 "Stream writes already done",
765 "client_stream_writer::write");
766 }
767
768 // Serialize with gRPC framing
// The payload is copied into a grpc_message before serialisation.
769 grpc_message msg{std::vector<uint8_t>(message)};
770 auto serialized = msg.serialize();
771
// The trailing `false` is passed through to write_stream().
// NOTE(review): presumably an "end of stream" flag - confirm against http2_client.
772 return http2_client_->write_stream(stream_id_, serialized, false);
773 }
774
775 auto writes_done() -> VoidResult override
776 {
777 if (writes_done_)
778 {
779 return ok();
780 }
781
782 writes_done_ = true;
783 return http2_client_->close_stream_writer(stream_id_);
784 }
785
// Closes the write side if that has not happened yet, then blocks until the
// response has been marked ready and returns either the response message or
// an error built from final_status_.
786 auto finish() -> Result<grpc_message> override
787 {
788 // Close write side if not already done
789 if (!writes_done_)
790 {
791 auto wd_result = writes_done();
792 if (wd_result.is_err())
793 {
// A failed half-close is re-reported with this function as the error source.
794 const auto& err = wd_result.error();
795 return error<grpc_message>(err.code, err.message, "client_stream_writer::finish");
796 }
797 }
798
799 // Wait for response
// There is no timeout on this wait; it ends only when response_ready_ is set.
800 std::unique_lock<std::mutex> lock(mutex_);
801 cv_.wait(lock, [this] { return response_ready_; });
802
// NOTE(review): the condition guarding this success branch is not visible in
// this listing - confirm against the original file.
804 {
805 return ok(std::move(response_));
806 }
807
// NOTE(review): the message argument between the code and the source string is
// not visible in this listing - confirm against the original file.
808 return error<grpc_message>(
809 static_cast<int>(final_status_.code),
811 "client_stream_writer::finish");
812 }
813
814 void on_data(const std::vector<uint8_t>& data)
815 {
816 std::lock_guard<std::mutex> lock(mutex_);
817 response_buffer_.insert(response_buffer_.end(), data.begin(), data.end());
818 }
819
820 void on_headers(const std::vector<http2::http_header>& headers)
821 {
822 std::lock_guard<std::mutex> lock(mutex_);
823 for (const auto& h : headers)
824 {
825 if (h.name == "grpc-status")
826 {
827 try { final_status_.code = static_cast<status_code>(std::stoi(h.value)); }
828 catch (...) {}
829 }
830 else if (h.name == "grpc-message")
831 {
832 final_status_.message = h.value;
833 }
834 }
835 }
836
838 {
839 std::lock_guard<std::mutex> lock(mutex_);
840
841 if (status_code != 200)
842 {
844 final_status_.message = "HTTP error: " + std::to_string(status_code);
845 }
846
847 // Parse response
848 if (!response_buffer_.empty())
849 {
850 auto parse_result = grpc_message::parse(response_buffer_);
851 if (parse_result.is_ok())
852 {
853 response_ = std::move(parse_result.value());
854 }
855 }
856
857 response_ready_ = true;
858 cv_.notify_all();
859 }
860
861private:
862 std::shared_ptr<http2::http2_client> http2_client_;
863 uint32_t stream_id_;
865 mutable std::mutex mutex_;
866 std::condition_variable cv_;
867 std::vector<uint8_t> response_buffer_;
870 bool response_ready_ = false;
871};
872
873// Bidirectional stream implementation
875{
876public:
877 bidi_stream_impl(std::shared_ptr<http2::http2_client> http2_client,
878 uint32_t stream_id)
879 : http2_client_(std::move(http2_client))
880 , stream_id_(stream_id)
881 , writes_done_(false)
882 , stream_ended_(false)
883 {
884 }
885
// Frames `message` as a gRPC message and writes it to the HTTP/2 stream
// without ending the stream. Fails once writes_done() has been called;
// otherwise returns whatever write_stream() returns.
886 auto write(const std::vector<uint8_t>& message) -> VoidResult override
887 {
888 if (writes_done_)
889 {
// NOTE(review): the first argument of error_void (the error code) is not
// visible in this listing - confirm against the original file.
890 return error_void(
892 "Stream writes already done",
893 "bidi_stream::write");
894 }
895
// The payload is copied into a grpc_message and serialised with its framing.
896 grpc_message msg{std::vector<uint8_t>(message)};
897 auto serialized = msg.serialize();
898
// The trailing `false` is passed through to write_stream().
// NOTE(review): presumably an "end of stream" flag - confirm against http2_client.
899 return http2_client_->write_stream(stream_id_, serialized, false);
900 }
901
902 auto read() -> Result<grpc_message> override
903 {
904 std::unique_lock<std::mutex> lock(mutex_);
905
906 cv_.wait(lock, [this] { return !buffer_.empty() || stream_ended_; });
907
908 if (buffer_.empty())
909 {
910 if (stream_ended_)
911 {
912 return error<grpc_message>(
913 static_cast<int>(status_code::ok),
914 "End of stream",
915 "bidi_stream::read");
916 }
917 }
918
919 auto parse_result = grpc_message::parse(buffer_);
920 if (parse_result.is_err())
921 {
922 return parse_result;
923 }
924
925 auto& msg = parse_result.value();
926 size_t msg_size = 5 + msg.data.size();
927 if (buffer_.size() >= msg_size)
928 {
929 buffer_.erase(buffer_.begin(), buffer_.begin() + static_cast<ptrdiff_t>(msg_size));
930 }
931
932 return ok(std::move(msg));
933 }
934
935 auto writes_done() -> VoidResult override
936 {
937 if (writes_done_)
938 {
939 return ok();
940 }
941
942 writes_done_ = true;
943 return http2_client_->close_stream_writer(stream_id_);
944 }
945
946 auto finish() -> grpc_status override
947 {
948 if (!writes_done_)
949 {
950 writes_done();
951 }
952
953 std::unique_lock<std::mutex> lock(mutex_);
954 cv_.wait(lock, [this] { return stream_ended_; });
955 return final_status_;
956 }
957
958 void on_data(const std::vector<uint8_t>& data)
959 {
960 std::lock_guard<std::mutex> lock(mutex_);
961 buffer_.insert(buffer_.end(), data.begin(), data.end());
962 cv_.notify_one();
963 }
964
965 void on_headers(const std::vector<http2::http_header>& headers)
966 {
967 std::lock_guard<std::mutex> lock(mutex_);
968 for (const auto& h : headers)
969 {
970 if (h.name == "grpc-status")
971 {
972 try { final_status_.code = static_cast<status_code>(std::stoi(h.value)); }
973 catch (...) {}
974 }
975 else if (h.name == "grpc-message")
976 {
977 final_status_.message = h.value;
978 }
979 }
980 }
981
983 {
984 std::lock_guard<std::mutex> lock(mutex_);
985 stream_ended_ = true;
986 if (status_code != 200)
987 {
989 final_status_.message = "HTTP error: " + std::to_string(status_code);
990 }
991 cv_.notify_all();
992 }
993
994private:
995 std::shared_ptr<http2::http2_client> http2_client_;
996 uint32_t stream_id_;
999 mutable std::mutex mutex_;
1000 std::condition_variable cv_;
1001 std::vector<uint8_t> buffer_;
1003};
1004
1005// Implementation class using HTTP/2 transport
1007{
1008public:
1009 explicit impl(std::string target, grpc_channel_config config)
1010 : target_(std::move(target))
1011 , config_(std::move(config))
1012 , connected_(false)
1013 {
1014 }
1015
1017 {
1018 disconnect();
1019 }
1020
1022 {
1023 auto span = tracing::is_tracing_enabled()
1024 ? std::make_optional(tracing::trace_context::create_span("grpc.client.connect"))
1025 : std::nullopt;
1026 if (span)
1027 {
1028 span->set_attribute("rpc.system", "grpc")
1029 .set_attribute("rpc.grpc.target", target_)
1030 .set_attribute("net.transport", "tcp");
1031 }
1032
1033 std::lock_guard<std::mutex> lock(mutex_);
1034
1035 if (connected_.load())
1036 {
1037 if (span)
1038 {
1039 span->set_attribute("grpc.client.already_connected", true);
1040 }
1041 return ok();
1042 }
1043
1044 // Parse target address
1045 auto colon_pos = target_.find(':');
1046 if (colon_pos == std::string::npos)
1047 {
1048 if (span)
1049 {
1050 span->set_error("Invalid target address format");
1051 }
1052 return error_void(
1054 "Invalid target address format",
1055 "grpc::client",
1056 "Expected format: host:port");
1057 }
1058
1059 host_ = target_.substr(0, colon_pos);
1060 auto port_str = target_.substr(colon_pos + 1);
1061
1062 unsigned short port = 0;
1063 auto [ptr, ec] = std::from_chars(
1064 port_str.data(),
1065 port_str.data() + port_str.size(),
1066 port);
1067
1068 if (ec != std::errc())
1069 {
1070 if (span)
1071 {
1072 span->set_error("Invalid port number");
1073 }
1074 return error_void(
1076 "Invalid port number",
1077 "grpc::client");
1078 }
1079
1080 if (span)
1081 {
1082 span->set_attribute("net.peer.name", host_)
1083 .set_attribute("net.peer.port", static_cast<int64_t>(port));
1084 }
1085
1086 // Create HTTP/2 client
1087 http2_client_ = std::make_shared<http2::http2_client>("grpc-client");
1089
1090 // Connect using HTTP/2
1091 auto result = http2_client_->connect(host_, port);
1092 if (result.is_err())
1093 {
1094 const auto& err = result.error();
1095 if (span)
1096 {
1097 span->set_error(err.message);
1098 }
1099 return error_void(err.code, err.message, "grpc::client",
1100 get_error_details(err));
1101 }
1102
1103 connected_.store(true);
1104 return ok();
1105 }
1106
1107 auto disconnect() -> void
1108 {
1109 std::lock_guard<std::mutex> lock(mutex_);
1110
1111 if (http2_client_ && connected_.load())
1112 {
1113 http2_client_->disconnect();
1114 }
1115
1116 connected_.store(false);
1117 http2_client_.reset();
1118 }
1119
1120 auto is_connected() const -> bool
1121 {
1122 return connected_.load() && http2_client_ && http2_client_->is_connected();
1123 }
1124
1125 auto wait_for_connected(std::chrono::milliseconds timeout) -> bool
1126 {
1127 auto deadline = std::chrono::steady_clock::now() + timeout;
1128
1129 while (std::chrono::steady_clock::now() < deadline)
1130 {
1131 if (is_connected())
1132 {
1133 return true;
1134 }
1135 std::this_thread::sleep_for(std::chrono::milliseconds(10));
1136 }
1137
1138 return is_connected();
1139 }
1140
// Returns a reference to the target address string given at construction.
1141 auto target() const -> const std::string&
1142 {
1143 return target_;
1144 }
1145
// Performs a blocking unary gRPC call over the HTTP/2 client.
// Steps visible below: validate connection, method format and deadline;
// build the gRPC request headers; frame the request; POST it; check the HTTP
// status; read grpc-status / grpc-message from the response headers; parse
// the response body into a grpc_message.
// When tracing is enabled a "grpc.client.call" span records the method,
// target, sizes and any error.
1146 auto call_raw(const std::string& method,
1147 const std::vector<uint8_t>& request,
1148 const call_options& options) -> Result<grpc_message>
1149 {
1150 auto span = tracing::is_tracing_enabled()
1151 ? std::make_optional(tracing::trace_context::create_span("grpc.client.call"))
1152 : std::nullopt;
1153 if (span)
1154 {
1155 span->set_attribute("rpc.system", "grpc")
1156 .set_attribute("rpc.method", method)
1157 .set_attribute("rpc.grpc.target", target_)
1158 .set_attribute("rpc.request.size", static_cast<int64_t>(request.size()));
1159 }
1160
1161 if (!is_connected())
1162 {
1163 if (span)
1164 {
1165 span->set_error("Not connected to server");
1166 }
// NOTE(review): the first argument of error<grpc_message> (the error code) is
// not visible in this listing - confirm against the original file.
1167 return error<grpc_message>(
1169 "Not connected to server",
1170 "grpc::client");
1171 }
1172
1173 // Validate method format
1174 if (method.empty() || method[0] != '/')
1175 {
1176 if (span)
1177 {
1178 span->set_error("Invalid method format");
1179 }
// NOTE(review): the error-code argument is not visible here either.
1180 return error<grpc_message>(
1182 "Invalid method format",
1183 "grpc::client",
1184 "Method must start with '/'");
1185 }
1186
1187 // Check deadline
// A deadline that has already passed fails the call before anything is sent.
1188 if (options.deadline.has_value())
1189 {
1190 if (std::chrono::system_clock::now() > options.deadline.value())
1191 {
1192 if (span)
1193 {
1194 span->set_error("Deadline exceeded before call started")
1195 .set_attribute("rpc.grpc.status_code", static_cast<int64_t>(status_code::deadline_exceeded));
1196 }
1197 return error<grpc_message>(
1198 static_cast<int>(status_code::deadline_exceeded),
1199 "Deadline exceeded before call started",
1200 "grpc::client");
1201 }
1202 }
1203
1204 // Build gRPC headers
1205 std::vector<http2::http_header> headers;
1206 headers.emplace_back(header_names::content_type, grpc_content_type);
1207 headers.emplace_back(header_names::te, "trailers");
// NOTE(review): the end of this expression (the encodings listed after
// identity) is not visible in this listing - confirm against the original file.
1208 headers.emplace_back(header_names::grpc_accept_encoding,
1209 std::string(compression::identity) + "," +
1211
1212 // Add timeout header if deadline is set
// The remaining time is recomputed here, in milliseconds; no timeout header is
// added when nothing remains.
1213 if (options.deadline.has_value())
1214 {
1215 auto now = std::chrono::system_clock::now();
1216 auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
1217 options.deadline.value() - now);
1218 if (remaining.count() > 0)
1219 {
1220 headers.emplace_back(header_names::grpc_timeout,
1221 format_timeout(static_cast<uint64_t>(remaining.count())));
1222 }
1223 }
1224
1225 // Add custom metadata
1226 for (const auto& [key, value] : options.metadata)
1227 {
1228 headers.emplace_back(key, value);
1229 }
1230
1231 // Serialize request with gRPC framing
1232 grpc_message request_msg{std::vector<uint8_t>{request.begin(), request.end()}};
1233 auto serialized_request = request_msg.serialize();
1234
1235 // Send HTTP/2 POST request
// NOTE(review): http2_client_ is used here without holding mutex_, while
// disconnect() resets it under that mutex; a concurrent disconnect() after the
// is_connected() check above is not guarded against in this function.
1236 auto response_result = http2_client_->post(method, serialized_request, headers);
1237 if (response_result.is_err())
1238 {
1239 const auto& err = response_result.error();
1240 if (span)
1241 {
1242 span->set_error(err.message);
1243 }
1244 return error<grpc_message>(err.code, err.message, "grpc::client",
1245 get_error_details(err));
1246 }
1247
1248 const auto& response = response_result.value();
1249
1250 // Check HTTP status - gRPC always uses 200 OK for successful transport
// Any other HTTP status is reported as status_code::unavailable.
1251 if (response.status_code != 200)
1252 {
1253 if (span)
1254 {
1255 span->set_error("HTTP error: " + std::to_string(response.status_code))
1256 .set_attribute("http.status_code", static_cast<int64_t>(response.status_code));
1257 }
1258 return error<grpc_message>(
1259 static_cast<int>(status_code::unavailable),
1260 "HTTP error: " + std::to_string(response.status_code),
1261 "grpc::client");
1262 }
1263
1264 // Extract gRPC status from trailers/headers
// NOTE(review): the declaration of `grpc_status` (and its initial value) is not
// visible in this listing - confirm against the original file.
1266 std::string grpc_message_str;
1267
1268 for (const auto& header : response.headers)
1269 {
1270 if (header.name == trailer_names::grpc_status)
1271 {
// The status value is parsed as a decimal integer; a value that fails to parse
// leaves grpc_status unchanged.
1272 int status_int = 0;
1273 auto [ptr, ec] = std::from_chars(
1274 header.value.data(),
1275 header.value.data() + header.value.size(),
1276 status_int);
1277 if (ec == std::errc())
1278 {
1279 grpc_status = static_cast<status_code>(status_int);
1280 }
1281 }
1282 else if (header.name == trailer_names::grpc_message)
1283 {
1284 grpc_message_str = header.value;
1285 }
1286 }
1287
1288 // Check gRPC status
// NOTE(review): the condition guarding this error branch is not visible in this
// listing - confirm against the original file.
// The error text is the server's grpc-message when present, otherwise the
// textual name of the status code.
1290 {
1291 if (span)
1292 {
1293 span->set_error(grpc_message_str.empty() ?
1294 std::string(status_code_to_string(grpc_status)) : grpc_message_str)
1295 .set_attribute("rpc.grpc.status_code", static_cast<int64_t>(grpc_status));
1296 }
1297 return error<grpc_message>(
1298 static_cast<int>(grpc_status),
1299 grpc_message_str.empty() ?
1300 std::string(status_code_to_string(grpc_status)) : grpc_message_str,
1301 "grpc::client");
1302 }
1303
1304 // Parse response message
1305 if (response.body.empty())
1306 {
1307 // Empty response is valid for some RPCs
1308 if (span)
1309 {
1310 span->set_attribute("rpc.response.size", static_cast<int64_t>(0))
1311 .set_attribute("rpc.grpc.status_code", static_cast<int64_t>(0));
1312 }
1313 return ok(grpc_message{});
1314 }
1315
1316 auto parse_result = grpc_message::parse(response.body);
1317 if (parse_result.is_err())
1318 {
1319 const auto& err = parse_result.error();
1320 if (span)
1321 {
1322 span->set_error(err.message);
1323 }
1324 return error<grpc_message>(err.code, err.message, "grpc::client",
1325 get_error_details(err));
1326 }
1327
1328 if (span)
1329 {
1330 span->set_attribute("rpc.response.size", static_cast<int64_t>(parse_result.value().data.size()))
1331 .set_attribute("rpc.grpc.status_code", static_cast<int64_t>(0));
1332 }
1333 return ok(std::move(parse_result.value()));
1334 }
1335
// Asynchronous variant of call_raw: the lambda below runs
// call_raw(method, request, options) and, when `callback` is non-empty,
// passes it the result. `method`, `request`, `callback` and `options` are
// captured by copy, so the caller's arguments need not outlive this call.
// NOTE(review): listing line 1342 (the statement that receives the lambda) is
// absent from this listing; per the inline comment the lambda is handed to a
// thread pool -- confirm against the real file.
// NOTE(review): `this` is captured as a raw pointer, so this object must stay
// alive until the task has run; nothing visible here enforces that -- confirm.
1336 auto call_raw_async(const std::string& method,
1337 const std::vector<uint8_t>& request,
1338 std::function<void(Result<grpc_message>)> callback,
1339 const call_options& options) -> void
1340 {
1341 // Execute asynchronously using thread pool
1343 [this, method, request, callback, options]() {
1344 auto result = call_raw(method, request, options);
1345 if (callback)
1346 {
1347 callback(std::move(result));
1348 }
1349 });
1350 }
1351
// Starts a server-streaming call on `method`: rejects the call when
// is_connected() is false or when `method` is empty / does not begin with
// '/', builds the request headers, opens an HTTP/2 stream whose data, header
// and completion callbacks forward to a shared reader object, writes the
// framed `request`, and on success returns ok(...) holding a
// unique_ptr<grpc_client::server_stream_reader> that wraps that reader.
// Errors from start_stream / write_stream are re-reported with the source
// string "grpc::client".
// NOTE(review): listing lines 1355, 1359-1360, 1368-1369, 1381, 1416 and 1428
// are absent from this listing (the trailing return type, the opening of each
// error return, and the tail of the grpc-accept-encoding value) -- read the
// real file before changing this function.
1352 auto server_stream_raw(const std::string& method,
1353 const std::vector<uint8_t>& request,
1354 const call_options& options)
1356 {
1357 if (!is_connected())
1358 {
1361 "Not connected to server",
1362 "grpc::client");
1363 }
1364
1365 // Validate method format
1366 if (method.empty() || method[0] != '/')
1367 {
1370 "Invalid method format",
1371 "grpc::client",
1372 "Method must start with '/'");
1373 }
1374
1375 // Build gRPC headers
1376 std::vector<http2::http_header> headers;
1377 headers.emplace_back("content-type", std::string(grpc_content_type));
1378 headers.emplace_back("te", "trailers");
1379 headers.emplace_back("grpc-accept-encoding",
1380 std::string(compression::identity) + "," +
1382
1383 // Add timeout header if deadline is set
 // NOTE(review): when the deadline has already passed (remaining <= 0) no
 // grpc-timeout header is added and the call still proceeds -- confirm this
 // is intended rather than failing early.
1384 if (options.deadline.has_value())
1385 {
1386 auto now = std::chrono::system_clock::now();
1387 auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
1388 options.deadline.value() - now);
1389 if (remaining.count() > 0)
1390 {
1391 headers.emplace_back("grpc-timeout",
1392 format_timeout(static_cast<uint64_t>(remaining.count())));
1393 }
1394 }
1395
1396 // Add custom metadata
1397 for (const auto& [key, value] : options.metadata)
1398 {
1399 headers.emplace_back(key, value);
1400 }
1401
1402 // Create the reader as shared_ptr for callback capture
 // NOTE(review): the reader is constructed with stream id 0 and the visible
 // code never gives it stream_result.value() -- confirm the reader does not
 // need the real stream id.
1403 auto reader = std::make_shared<server_stream_reader_impl>(http2_client_, 0);
1404
1405 // Start streaming request
 // Each callback captures its own shared_ptr copy of the reader.
1406 auto stream_result = http2_client_->start_stream(
1407 method,
1408 headers,
1409 [reader](std::vector<uint8_t> data) { reader->on_data(data); },
1410 [reader](std::vector<http2::http_header> hdrs) { reader->on_headers(hdrs); },
1411 [reader](int status) { reader->on_complete(status); });
1412
1413 if (stream_result.is_err())
1414 {
1415 const auto& err = stream_result.error();
1417 err.code, err.message, "grpc::client", get_error_details(err));
1418 }
1419
1420 // Send the request with gRPC framing
 // The request bytes are copied into a grpc_message and serialized before
 // being written; the final `true` argument presumably marks end-of-stream
 // for the request side -- TODO confirm against write_stream's declaration.
1421 grpc_message request_msg{std::vector<uint8_t>(request)};
1422 auto serialized = request_msg.serialize();
1423 auto write_result = http2_client_->write_stream(stream_result.value(), serialized, true);
1424
 // NOTE(review): no call that closes or cancels the already-started stream
 // is visible on this failure path -- confirm whether cleanup is needed.
1425 if (write_result.is_err())
1426 {
1427 const auto& err = write_result.error();
1429 err.code, err.message, "grpc::client", get_error_details(err));
1430 }
1431
1432 // Return the reader behind a unique_ptr to a small forwarding wrapper.
1433 // The wrapper stores its own shared_ptr copy of the reader (no custom deleter is involved).
1434 struct shared_holder : public grpc_client::server_stream_reader {
1435 std::shared_ptr<server_stream_reader_impl> impl;
1436 explicit shared_holder(std::shared_ptr<server_stream_reader_impl> p) : impl(std::move(p)) {}
1437 auto read() -> Result<grpc_message> override { return impl->read(); }
1438 auto has_more() const -> bool override { return impl->has_more(); }
1439 auto finish() -> grpc_status override { return impl->finish(); }
1440 };
1441
1442 return ok(std::unique_ptr<grpc_client::server_stream_reader>(
1443 new shared_holder(reader)));
1444 }
1445
// Starts a client-streaming call on `method`: rejects the call when
// is_connected() is false or when `method` is empty / does not begin with
// '/', builds the request headers, opens an HTTP/2 stream, and on success
// returns ok(...) holding a unique_ptr<grpc_client::client_stream_writer>
// that forwards write / writes_done / finish to a writer object created with
// the stream id returned by start_stream. No message is written here.
// NOTE(review): listing lines 1448, 1452-1453, 1461-1462, 1474 and 1510 are
// absent from this listing (the trailing return type, the opening of each
// error return, and the tail of the grpc-accept-encoding value).
1446 auto client_stream_raw(const std::string& method,
1447 const call_options& options)
1449 {
1450 if (!is_connected())
1451 {
1454 "Not connected to server",
1455 "grpc::client");
1456 }
1457
1458 // Validate method format
1459 if (method.empty() || method[0] != '/')
1460 {
1463 "Invalid method format",
1464 "grpc::client",
1465 "Method must start with '/'");
1466 }
1467
1468 // Build gRPC headers
1469 std::vector<http2::http_header> headers;
1470 headers.emplace_back("content-type", std::string(grpc_content_type));
1471 headers.emplace_back("te", "trailers");
1472 headers.emplace_back("grpc-accept-encoding",
1473 std::string(compression::identity) + "," +
1475
1476 // Add timeout header if deadline is set
 // NOTE(review): when the deadline has already passed (remaining <= 0) no
 // grpc-timeout header is added and the call still proceeds -- confirm.
1477 if (options.deadline.has_value())
1478 {
1479 auto now = std::chrono::system_clock::now();
1480 auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
1481 options.deadline.value() - now);
1482 if (remaining.count() > 0)
1483 {
1484 headers.emplace_back("grpc-timeout",
1485 format_timeout(static_cast<uint64_t>(remaining.count())));
1486 }
1487 }
1488
1489 // Add custom metadata
1490 for (const auto& [key, value] : options.metadata)
1491 {
1492 headers.emplace_back(key, value);
1493 }
1494
1495 // Create the writer as shared_ptr for callback capture
 // NOTE(review): stream_id_holder is not used anywhere in the visible code.
1496 auto stream_id_holder = std::make_shared<uint32_t>(0);
1497 auto writer = std::make_shared<client_stream_writer_impl>(http2_client_, 0);
1498
1499 // Start streaming request
 // The callbacks are bound to `writer` (constructed with stream id 0).
1500 auto stream_result = http2_client_->start_stream(
1501 method,
1502 headers,
1503 [writer](std::vector<uint8_t> data) { writer->on_data(data); },
1504 [writer](std::vector<http2::http_header> hdrs) { writer->on_headers(hdrs); },
1505 [writer](int status) { writer->on_complete(status); });
1506
1507 if (stream_result.is_err())
1508 {
1509 const auto& err = stream_result.error();
1511 err.code, err.message, "grpc::client", get_error_details(err));
1512 }
1513
1514 // Update the writer with actual stream ID
 // NOTE(review): this builds a second, separate writer object. The stream
 // callbacks registered above still target `writer`, so in the visible code
 // nothing delivers response data, headers or completion to `actual_writer`,
 // which is the object handed back to the caller -- confirm how finish() on
 // the returned writer is expected to observe the response.
1515 auto actual_writer = std::make_shared<client_stream_writer_impl>(http2_client_, stream_result.value());
1516
1517 // Re-register callbacks with actual writer
1518 // Note: This is a simplified implementation - in production, you'd want
1519 // to properly update the stream callbacks
1520
 // Forwarding wrapper that owns a shared_ptr copy of the writer.
1521 struct shared_writer_holder : public grpc_client::client_stream_writer {
1522 std::shared_ptr<client_stream_writer_impl> impl;
1523 explicit shared_writer_holder(std::shared_ptr<client_stream_writer_impl> p) : impl(std::move(p)) {}
1524 auto write(const std::vector<uint8_t>& message) -> VoidResult override { return impl->write(message); }
1525 auto writes_done() -> VoidResult override { return impl->writes_done(); }
1526 auto finish() -> Result<grpc_message> override { return impl->finish(); }
1527 };
1528
1529 return ok(std::unique_ptr<grpc_client::client_stream_writer>(
1530 new shared_writer_holder(actual_writer)));
1531 }
1532
// Starts a bidirectional-streaming call on `method`: rejects the call when
// is_connected() is false or when `method` is empty / does not begin with
// '/', builds the request headers, opens an HTTP/2 stream, and on success
// returns ok(...) holding a unique_ptr<grpc_client::bidi_stream> that forwards
// write / read / writes_done / finish to a bidi object created with the
// stream id returned by start_stream. No message is written here.
// NOTE(review): listing lines 1535, 1539-1540, 1548-1549, 1561 and 1596 are
// absent from this listing (the trailing return type, the opening of each
// error return, and the tail of the grpc-accept-encoding value).
1533 auto bidi_stream_raw(const std::string& method,
1534 const call_options& options)
1536 {
1537 if (!is_connected())
1538 {
1541 "Not connected to server",
1542 "grpc::client");
1543 }
1544
1545 // Validate method format
1546 if (method.empty() || method[0] != '/')
1547 {
1550 "Invalid method format",
1551 "grpc::client",
1552 "Method must start with '/'");
1553 }
1554
1555 // Build gRPC headers
1556 std::vector<http2::http_header> headers;
1557 headers.emplace_back("content-type", std::string(grpc_content_type));
1558 headers.emplace_back("te", "trailers");
1559 headers.emplace_back("grpc-accept-encoding",
1560 std::string(compression::identity) + "," +
1562
1563 // Add timeout header if deadline is set
 // NOTE(review): when the deadline has already passed (remaining <= 0) no
 // grpc-timeout header is added and the call still proceeds -- confirm.
1564 if (options.deadline.has_value())
1565 {
1566 auto now = std::chrono::system_clock::now();
1567 auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
1568 options.deadline.value() - now);
1569 if (remaining.count() > 0)
1570 {
1571 headers.emplace_back("grpc-timeout",
1572 format_timeout(static_cast<uint64_t>(remaining.count())));
1573 }
1574 }
1575
1576 // Add custom metadata
1577 for (const auto& [key, value] : options.metadata)
1578 {
1579 headers.emplace_back(key, value);
1580 }
1581
1582 // Create the bidi stream as shared_ptr for callback capture
1583 auto bidi = std::make_shared<bidi_stream_impl>(http2_client_, 0);
1584
1585 // Start streaming request
 // The callbacks are bound to `bidi` (constructed with stream id 0).
1586 auto stream_result = http2_client_->start_stream(
1587 method,
1588 headers,
1589 [bidi](std::vector<uint8_t> data) { bidi->on_data(data); },
1590 [bidi](std::vector<http2::http_header> hdrs) { bidi->on_headers(hdrs); },
1591 [bidi](int status) { bidi->on_complete(status); });
1592
1593 if (stream_result.is_err())
1594 {
1595 const auto& err = stream_result.error();
1597 err.code, err.message, "grpc::client", get_error_details(err));
1598 }
1599
1600 // Create actual bidi stream with proper stream ID
 // NOTE(review): this is a second, separate object. The stream callbacks
 // registered above still target `bidi`, so in the visible code nothing
 // delivers incoming data, headers or completion to `actual_bidi`, which is
 // the object handed back to the caller -- confirm how read()/finish() on the
 // returned stream are expected to observe server messages.
1601 auto actual_bidi = std::make_shared<bidi_stream_impl>(http2_client_, stream_result.value());
1602
 // Forwarding wrapper that owns a shared_ptr copy of the bidi object.
1603 struct shared_bidi_holder : public grpc_client::bidi_stream {
1604 std::shared_ptr<bidi_stream_impl> impl;
1605 explicit shared_bidi_holder(std::shared_ptr<bidi_stream_impl> p) : impl(std::move(p)) {}
1606 auto write(const std::vector<uint8_t>& message) -> VoidResult override { return impl->write(message); }
1607 auto read() -> Result<grpc_message> override { return impl->read(); }
1608 auto writes_done() -> VoidResult override { return impl->writes_done(); }
1609 auto finish() -> grpc_status override { return impl->finish(); }
1610 };
1611
1612 return ok(std::unique_ptr<grpc_client::bidi_stream>(
1613 new shared_bidi_holder(actual_bidi)));
1614 }
1615
1616private:
// Private state of the implementation class.
// NOTE(review): listing line 1619 (one member declaration) is absent from
// this listing.
// Presumably the target address as passed to the constructor -- confirm.
1617 std::string target_;
// Presumably the host portion derived from the target -- confirm.
1618 std::string host_;
// HTTP/2 client used by the streaming calls for start_stream / write_stream
// and shared with the reader / writer / bidi objects those calls create.
1620 std::shared_ptr<http2::http2_client> http2_client_;
// Atomic flag; presumably the value reported by is_connected() -- confirm.
1621 std::atomic<bool> connected_;
// Declared mutable so const member functions can lock it; the code that
// locks it is outside this excerpt.
1622 mutable std::mutex mutex_;
1623};
1624
1625#endif // !NETWORK_GRPC_OFFICIAL
1626
1627// grpc_client implementation
1628
// Constructs the client by creating its private implementation object from
// `target` and `config`. Nothing else happens in this constructor itself;
// connect() is a separate call.
1629grpc_client::grpc_client(const std::string& target,
1630 const grpc_channel_config& config)
1631 : impl_(std::make_unique<impl>(target, config))
1632{
1633}
1634
// Destructor and move operations are defaulted here, in the source file,
// rather than in the header; presumably because `impl` is only a complete
// type in this translation unit -- confirm against the header.
1635grpc_client::~grpc_client() = default;
1636
// NOTE(review): the forwarding member functions below dereference impl_
// without a check; after a move the source object presumably holds an empty
// impl_ -- confirm callers never use a moved-from client.
1637grpc_client::grpc_client(grpc_client&&) noexcept = default;
1638grpc_client& grpc_client::operator=(grpc_client&&) noexcept = default;
1639
// Forwards to the implementation object's connect() and returns its
// VoidResult unchanged.
1640auto grpc_client::connect() -> VoidResult
1641{
1642 return impl_->connect();
1643}
1644
// Body of grpc_client::disconnect: forwards to the implementation object's
// disconnect() and returns nothing.
// NOTE(review): the signature line (listing line 1645) is absent from this
// listing.
1646{
1647 impl_->disconnect();
1648}
1649
// Returns whatever the implementation object's is_connected() reports.
1650auto grpc_client::is_connected() const -> bool
1651{
1652 return impl_->is_connected();
1653}
1654
// Forwards `timeout` to the implementation object's wait_for_connected() and
// returns its bool result; presumably true means the connection became ready
// within the timeout -- confirm against the implementation.
1655auto grpc_client::wait_for_connected(std::chrono::milliseconds timeout) -> bool
1656{
1657 return impl_->wait_for_connected(timeout);
1658}
1659
// Returns the reference handed back by the implementation object's target().
// The reference points into state owned through impl_, so it should not be
// used after this client is destroyed or moved from.
1660auto grpc_client::target() const -> const std::string&
1661{
1662 return impl_->target();
1663}
1664
// Unary call entry point: forwards `method`, `request` and `options` to the
// implementation object's call_raw() and returns its Result<grpc_message>
// unchanged.
1665auto grpc_client::call_raw(const std::string& method,
1666 const std::vector<uint8_t>& request,
1667 const call_options& options) -> Result<grpc_message>
1668{
1669 return impl_->call_raw(method, request, options);
1670}
1671
// Asynchronous unary call entry point: forwards all arguments to the
// implementation object's call_raw_async(). `callback` is taken by value and
// moved into the forwarded call.
1672auto grpc_client::call_raw_async(const std::string& method,
1673 const std::vector<uint8_t>& request,
1674 std::function<void(Result<grpc_message>)> callback,
1675 const call_options& options) -> void
1676{
1677 impl_->call_raw_async(method, request, std::move(callback), options);
1678}
1679
// Server-streaming entry point: forwards `method`, `request` and `options` to
// the implementation object's server_stream_raw() and returns its result
// unchanged.
// NOTE(review): the trailing return type (listing line 1683) is absent from
// this listing.
1680auto grpc_client::server_stream_raw(const std::string& method,
1681 const std::vector<uint8_t>& request,
1682 const call_options& options)
1684{
1685 return impl_->server_stream_raw(method, request, options);
1686}
1687
// Client-streaming entry point: forwards `method` and `options` to the
// implementation object's client_stream_raw() and returns its result
// unchanged.
// NOTE(review): the trailing return type (listing line 1690) is absent from
// this listing.
1688auto grpc_client::client_stream_raw(const std::string& method,
1689 const call_options& options)
1691{
1692 return impl_->client_stream_raw(method, options);
1693}
1694
// Bidirectional-streaming entry point: forwards `method` and `options` to the
// implementation object's bidi_stream_raw() and returns its result unchanged.
// NOTE(review): the trailing return type (listing line 1697) is absent from
// this listing.
1695auto grpc_client::bidi_stream_raw(const std::string& method,
1696 const call_options& options)
1698{
1699 return impl_->bidi_stream_raw(method, options);
1700}
1701
1702} // namespace kcenon::network::protocols::grpc
static thread_integration_manager & instance()
Get the singleton instance.
std::future< void > submit_task(std::function< void()> task)
Submit a task to the thread pool.
auto write(const std::vector< uint8_t > &message) -> VoidResult override
Write message to stream.
Definition client.cpp:886
void on_headers(const std::vector< http2::http_header > &headers)
Definition client.cpp:965
auto read() -> Result< grpc_message > override
Read next message from stream.
Definition client.cpp:902
void on_data(const std::vector< uint8_t > &data)
Definition client.cpp:958
auto finish() -> grpc_status override
Finish the call and get final status.
Definition client.cpp:946
std::shared_ptr< http2::http2_client > http2_client_
Definition client.cpp:995
auto writes_done() -> VoidResult override
Signal that writing is done.
Definition client.cpp:935
bidi_stream_impl(std::shared_ptr< http2::http2_client > http2_client, uint32_t stream_id)
Definition client.cpp:877
auto finish() -> Result< grpc_message > override
Finish the call and get response.
Definition client.cpp:786
void on_headers(const std::vector< http2::http_header > &headers)
Definition client.cpp:820
std::shared_ptr< http2::http2_client > http2_client_
Definition client.cpp:862
void on_data(const std::vector< uint8_t > &data)
Definition client.cpp:814
auto write(const std::vector< uint8_t > &message) -> VoidResult override
Write message to stream.
Definition client.cpp:758
auto writes_done() -> VoidResult override
Signal that writing is done.
Definition client.cpp:775
client_stream_writer_impl(std::shared_ptr< http2::http2_client > http2_client, uint32_t stream_id)
Definition client.cpp:750
auto call_raw(const std::string &method, const std::vector< uint8_t > &request, const call_options &options) -> Result< grpc_message >
Definition client.cpp:1146
auto bidi_stream_raw(const std::string &method, const call_options &options) -> Result< std::unique_ptr< grpc_client::bidi_stream > >
Definition client.cpp:1533
std::shared_ptr< http2::http2_client > http2_client_
Definition client.cpp:1620
impl(std::string target, grpc_channel_config config)
Definition client.cpp:1009
auto client_stream_raw(const std::string &method, const call_options &options) -> Result< std::unique_ptr< grpc_client::client_stream_writer > >
Definition client.cpp:1446
auto wait_for_connected(std::chrono::milliseconds timeout) -> bool
Definition client.cpp:1125
auto server_stream_raw(const std::string &method, const std::vector< uint8_t > &request, const call_options &options) -> Result< std::unique_ptr< grpc_client::server_stream_reader > >
Definition client.cpp:1352
auto target() const -> const std::string &
Definition client.cpp:1141
auto call_raw_async(const std::string &method, const std::vector< uint8_t > &request, std::function< void(Result< grpc_message >)> callback, const call_options &options) -> void
Definition client.cpp:1336
gRPC client for making RPC calls
Definition client.h:115
auto is_connected() const -> bool
Check if connected.
Definition client.cpp:1650
auto client_stream_raw(const std::string &method, const call_options &options={}) -> Result< std::unique_ptr< client_stream_writer > >
Start a client streaming RPC call.
Definition client.cpp:1688
auto target() const -> const std::string &
Get the target address.
Definition client.cpp:1660
auto call_raw_async(const std::string &method, const std::vector< uint8_t > &request, std::function< void(Result< grpc_message >)> callback, const call_options &options={}) -> void
Make an async unary RPC call.
Definition client.cpp:1672
auto wait_for_connected(std::chrono::milliseconds timeout) -> bool
Wait for connection to be ready.
Definition client.cpp:1655
auto disconnect() -> void
Disconnect from the server.
Definition client.cpp:1645
auto call_raw(const std::string &method, const std::vector< uint8_t > &request, const call_options &options={}) -> Result< grpc_message >
Make a unary RPC call.
Definition client.cpp:1665
auto server_stream_raw(const std::string &method, const std::vector< uint8_t > &request, const call_options &options={}) -> Result< std::unique_ptr< server_stream_reader > >
Start a server streaming RPC call.
Definition client.cpp:1680
auto bidi_stream_raw(const std::string &method, const call_options &options={}) -> Result< std::unique_ptr< bidi_stream > >
Start a bidirectional streaming RPC call.
Definition client.cpp:1695
grpc_client(const std::string &target, const grpc_channel_config &config={})
Construct gRPC client.
Definition client.cpp:1629
auto has_more() const -> bool override
Check if stream has more messages.
Definition client.cpp:687
auto finish() -> grpc_status override
Get final status after stream ends.
Definition client.cpp:693
auto read() -> Result< grpc_message > override
Read next message from stream.
Definition client.cpp:651
std::shared_ptr< http2::http2_client > http2_client_
Definition client.cpp:737
void on_headers(const std::vector< http2::http_header > &headers)
Definition client.cpp:707
server_stream_reader_impl(std::shared_ptr< http2::http2_client > http2_client, uint32_t stream_id)
Definition client.cpp:643
void on_data(const std::vector< uint8_t > &data)
Definition client.cpp:700
static auto create_span(std::string_view name) -> span
Create a new root span with a new trace context.
gRPC client channel configuration and connection.
tracing_config config
Definition exporters.cpp:29
Official gRPC library wrapper interfaces.
gRPC message framing and serialization.
constexpr const char * grpc_accept_encoding
Definition frame.h:134
gRPC protocol implementation
Definition client.h:34
auto format_timeout(uint64_t timeout_ms) -> std::string
Format timeout as gRPC timeout string.
Definition frame.cpp:131
constexpr const char * grpc_content_type
gRPC content-type header value
Definition frame.h:109
status_code
gRPC status codes (as defined in grpc/status.h)
Definition status.h:36
@ deadline_exceeded
Deadline expired before operation completed.
@ unimplemented
Operation not implemented.
@ ok
Not an error; returned on success.
constexpr auto status_code_to_string(status_code code) -> std::string_view
Convert status code to string.
Definition status.h:61
auto is_tracing_enabled() -> bool
Check if tracing is enabled.
std::string get_error_details(const simple_error &err)
Result< std::monostate > VoidResult
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
RAII span implementation for distributed tracing.
Options for individual RPC calls.
Definition client.h:79
std::string root_certificates
Root certificates for TLS (PEM format)
Definition client.h:53
std::optional< std::string > client_key
Client private key for mutual TLS (PEM format)
Definition client.h:59
std::optional< std::string > client_certificate
Client certificate for mutual TLS (PEM format)
Definition client.h:56
std::chrono::milliseconds default_timeout
Default timeout for RPC calls.
Definition client.h:47
gRPC message with compression flag and payload
Definition frame.h:50
static auto parse(std::span< const uint8_t > input) -> Result< grpc_message >
Parse gRPC message from raw bytes.
Definition frame.cpp:14
auto serialize() const -> std::vector< uint8_t >
Serialize message to bytes with length prefix.
Definition frame.cpp:67
std::vector< uint8_t > data
Message payload.
Definition frame.h:52
gRPC status with code, message, and optional details
Definition status.h:95
Thread system integration interface for network_system.
Distributed tracing context for OpenTelemetry-compatible tracing.
Configuration structures for OpenTelemetry tracing.