PACS System 0.1.0
PACS DICOM system library
ai_service_connector.cpp
// BSD 3-Clause License
// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

// Platform-specific socket headers for HTTP client
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#endif

#include <atomic>
#include <cctype>   // std::isspace
#include <chrono>
#include <condition_variable>
#include <cstdint>  // uint32_t
#include <cstdio>   // std::snprintf
#include <iomanip>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_map>
#include <vector>
namespace kcenon::pacs::ai {

// =============================================================================
// Metric Names
// =============================================================================

namespace metrics {
constexpr const char* inference_requests_total = "pacs_ai_inference_requests_total";
constexpr const char* inference_requests_success = "pacs_ai_inference_requests_success";
constexpr const char* inference_requests_failed = "pacs_ai_inference_requests_failed";
constexpr const char* inference_duration = "pacs_ai_inference_duration_seconds";
constexpr const char* active_jobs = "pacs_ai_active_jobs";
} // namespace metrics

// =============================================================================
// JSON Utilities
// =============================================================================

namespace json_util {

/**
 * @brief Escape special characters in JSON string.
 */
[[nodiscard]] inline std::string escape_string(std::string_view str) {
    std::string result;
    result.reserve(str.size() + 10);

    for (char c : str) {
        switch (c) {
        case '"':
            result += "\\\"";
            break;
        case '\\':
            result += "\\\\";
            break;
        case '\b':
            result += "\\b";
            break;
        case '\f':
            result += "\\f";
            break;
        case '\n':
            result += "\\n";
            break;
        case '\r':
            result += "\\r";
            break;
        case '\t':
            result += "\\t";
            break;
        default:
            if (static_cast<unsigned char>(c) < 0x20) {
                char buf[8];
                std::snprintf(buf, sizeof(buf), "\\u%04x",
                              static_cast<unsigned char>(c));
                result += buf;
            } else {
                result += c;
            }
            break;
        }
    }

    return result;
}
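
// Example: control characters and quotes become JSON escape sequences,
// e.g. escape_string("a\tb") == "a\\tb" and
// escape_string("say \"hi\"") == "say \\\"hi\\\"".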

/**
 * @brief Convert time_point to ISO 8601 string.
 */
[[nodiscard]] inline std::string to_iso8601(
    std::chrono::system_clock::time_point tp) {
    const auto time_t_val = std::chrono::system_clock::to_time_t(tp);
    std::tm tm_val{};

#if defined(_MSC_VER)
    gmtime_s(&tm_val, &time_t_val);
#else
    gmtime_r(&time_t_val, &tm_val);
#endif

    std::ostringstream oss;
    oss << std::put_time(&tm_val, "%Y-%m-%dT%H:%M:%SZ");
    return oss.str();
}

/**
 * @brief Parse ISO 8601 string to time_point.
 */
[[nodiscard]] inline std::optional<std::chrono::system_clock::time_point>
from_iso8601(const std::string& str) {
    std::tm tm_val{};
    std::istringstream iss(str);
    iss >> std::get_time(&tm_val, "%Y-%m-%dT%H:%M:%SZ");

    if (iss.fail()) {
        return std::nullopt;
    }

#if defined(_MSC_VER)
    auto time_t_val = _mkgmtime(&tm_val);
#else
    auto time_t_val = timegm(&tm_val);
#endif

    return std::chrono::system_clock::from_time_t(time_t_val);
}
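
// Example: timestamps round-trip at second precision (UTC):
//   auto tp = from_iso8601("2025-01-02T03:04:05Z");
//   to_iso8601(*tp) == "2025-01-02T03:04:05Z"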

/**
 * @brief Simple JSON value extractor (for basic parsing).
 */
[[nodiscard]] inline std::optional<std::string> extract_string(
    const std::string& json,
    const std::string& key) {
    std::string search_key = "\"" + key + "\"";
    auto pos = json.find(search_key);
    if (pos == std::string::npos) {
        return std::nullopt;
    }

    pos = json.find(':', pos);
    if (pos == std::string::npos) {
        return std::nullopt;
    }

    pos = json.find('"', pos);
    if (pos == std::string::npos) {
        return std::nullopt;
    }

    auto start = pos + 1;
    auto end = json.find('"', start);
    if (end == std::string::npos) {
        return std::nullopt;
    }

    return json.substr(start, end - start);
}
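
// Example: extract_string("{\"status\":\"running\"}", "status") yields
// "running". This is a naive scanner: it takes the first occurrence of the
// quoted key anywhere in the text, ignores nesting, and does not unescape
// the extracted value.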

/**
 * @brief Simple JSON integer extractor.
 */
[[nodiscard]] inline std::optional<int> extract_int(
    const std::string& json,
    const std::string& key) {
    std::string search_key = "\"" + key + "\"";
    auto pos = json.find(search_key);
    if (pos == std::string::npos) {
        return std::nullopt;
    }

    pos = json.find(':', pos);
    if (pos == std::string::npos) {
        return std::nullopt;
    }

    // Skip whitespace (cast to unsigned char: std::isspace on a negative
    // char value is undefined behavior)
    pos++;
    while (pos < json.size() &&
           std::isspace(static_cast<unsigned char>(json[pos]))) {
        pos++;
    }

    if (pos >= json.size()) {
        return std::nullopt;
    }

    try {
        return std::stoi(json.substr(pos));
    } catch (...) {
        return std::nullopt;
    }
}

/**
 * @brief Build inference request JSON.
 */
[[nodiscard]] inline std::string build_request_json(const inference_request& request) {
    std::ostringstream oss;
    oss << "{"
        << R"("study_instance_uid":")" << escape_string(request.study_instance_uid) << "\"";

    if (request.series_instance_uid) {
        oss << R"(,"series_instance_uid":")"
            << escape_string(*request.series_instance_uid) << "\"";
    }

    oss << R"(,"model_id":")" << escape_string(request.model_id) << "\""
        << R"(,"priority":)" << request.priority;

    if (request.callback_url) {
        oss << R"(,"callback_url":")" << escape_string(*request.callback_url) << "\"";
    }

    if (!request.parameters.empty()) {
        oss << R"(,"parameters":{)";
        bool first = true;
        for (const auto& [key, value] : request.parameters) {
            if (!first) oss << ",";
            oss << "\"" << escape_string(key) << "\":\"" << escape_string(value) << "\"";
            first = false;
        }
        oss << "}";
    }

    if (!request.metadata.empty()) {
        oss << R"(,"metadata":{)";
        bool first = true;
        for (const auto& [key, value] : request.metadata) {
            if (!first) oss << ",";
            oss << "\"" << escape_string(key) << "\":\"" << escape_string(value) << "\"";
            first = false;
        }
        oss << "}";
    }

    oss << "}";
    return oss.str();
}
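
// Example output for a request carrying only the required fields
// (illustrative values: study "1.2.840.1", model "chest-ct", priority 0):
//   {"study_instance_uid":"1.2.840.1","model_id":"chest-ct","priority":0}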

/**
 * @brief Parse inference status from JSON response.
 */
[[nodiscard]] inline std::optional<inference_status> parse_status_json(
    const std::string& json) {
    inference_status status;

    auto job_id = extract_string(json, "job_id");
    if (!job_id) {
        return std::nullopt;
    }
    status.job_id = *job_id;

    auto status_str = extract_string(json, "status");
    if (status_str) {
        if (*status_str == "pending") {
            status.status = inference_status_code::pending;
        } else if (*status_str == "running") {
            status.status = inference_status_code::running;
        } else if (*status_str == "completed") {
            status.status = inference_status_code::completed;
        } else if (*status_str == "failed") {
            status.status = inference_status_code::failed;
        } else if (*status_str == "cancelled") {
            status.status = inference_status_code::cancelled;
        } else if (*status_str == "timeout") {
            status.status = inference_status_code::timeout;
        }
    }

    auto progress = extract_int(json, "progress");
    if (progress) {
        status.progress = *progress;
    }

    auto message = extract_string(json, "message");
    if (message) {
        status.message = *message;
    }

    auto error = extract_string(json, "error_message");
    if (error) {
        status.error_message = *error;
    }

    auto created_at = extract_string(json, "created_at");
    if (created_at) {
        auto tp = from_iso8601(*created_at);
        if (tp) {
            status.created_at = *tp;
        }
    }

    auto started_at = extract_string(json, "started_at");
    if (started_at) {
        auto tp = from_iso8601(*started_at);
        if (tp) {
            status.started_at = *tp;
        }
    }

    auto completed_at = extract_string(json, "completed_at");
    if (completed_at) {
        auto tp = from_iso8601(*completed_at);
        if (tp) {
            status.completed_at = *tp;
        }
    }

    return status;
}

/**
 * @brief Parse model info from JSON.
 */
[[nodiscard]] inline std::optional<model_info> parse_model_json(
    const std::string& json) {
    model_info info;

    auto model_id = extract_string(json, "model_id");
    if (!model_id) {
        return std::nullopt;
    }
    info.model_id = *model_id;

    auto name = extract_string(json, "name");
    if (name) {
        info.name = *name;
    }

    auto description = extract_string(json, "description");
    if (description) {
        info.description = *description;
    }

    auto version = extract_string(json, "version");
    if (version) {
        info.version = *version;
    }

    return info;
}

/**
 * @brief Encode data to Base64.
 */
[[nodiscard]] inline std::string base64_encode(std::string_view input) {
    static constexpr char table[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::string result;
    result.reserve(((input.size() + 2) / 3) * 4);

    auto ptr = reinterpret_cast<const unsigned char*>(input.data());
    auto len = input.size();

    for (std::size_t i = 0; i < len; i += 3) {
        uint32_t octet_a = ptr[i];
        uint32_t octet_b = (i + 1 < len) ? ptr[i + 1] : 0;
        uint32_t octet_c = (i + 2 < len) ? ptr[i + 2] : 0;

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result += table[(triple >> 18) & 0x3F];
        result += table[(triple >> 12) & 0x3F];
        result += (i + 1 < len) ? table[(triple >> 6) & 0x3F] : '=';
        result += (i + 2 < len) ? table[triple & 0x3F] : '=';
    }

    return result;
}
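
// Example: base64_encode("user:pass") == "dXNlcjpwYXNz", as used for the
// HTTP Basic authentication header below.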

/**
 * @brief Extract JSON objects from a JSON array string.
 */
[[nodiscard]] inline std::vector<std::string> extract_json_array_objects(
    const std::string& json) {
    std::vector<std::string> objects;

    // Find array start
    auto arr_start = json.find('[');
    if (arr_start == std::string::npos) {
        return objects;
    }

    int depth = 0;
    std::size_t obj_start = std::string::npos;

    for (std::size_t i = arr_start + 1; i < json.size(); ++i) {
        char c = json[i];

        if (c == '"') {
            // Skip string content
            ++i;
            while (i < json.size() && json[i] != '"') {
                if (json[i] == '\\') ++i; // Skip escaped character
                ++i;
            }
            continue;
        }

        if (c == '{') {
            if (depth == 0) {
                obj_start = i;
            }
            ++depth;
        } else if (c == '}') {
            --depth;
            if (depth == 0 && obj_start != std::string::npos) {
                objects.push_back(json.substr(obj_start, i - obj_start + 1));
                obj_start = std::string::npos;
            }
        } else if (c == ']' && depth == 0) {
            break;
        }
    }

    return objects;
}
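
// Example: extract_json_array_objects(R"([{"id":"a"},{"id":"b"}])") returns
// the two element strings {"id":"a"} and {"id":"b"}; brace depth is tracked
// so nested objects stay attached to their top-level element.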

} // namespace json_util

// =============================================================================
// HTTP Client Abstraction
// =============================================================================

/**
 * @brief Simple HTTP response structure.
 */
struct http_response {
    int status_code{0};
    std::string body;
    std::map<std::string, std::string> headers;

    [[nodiscard]] bool is_success() const noexcept {
        return status_code >= 200 && status_code < 300;
    }
};

/**
 * @brief RAII wrapper for platform socket handle.
 */
class socket_handle {
public:
#ifdef _WIN32
    using native_type = SOCKET;
    static constexpr native_type invalid_value = INVALID_SOCKET;
#else
    using native_type = int;
    static constexpr native_type invalid_value = -1;
#endif

    socket_handle() = default;
    explicit socket_handle(native_type fd) : fd_(fd) {}
    ~socket_handle() { close(); }

    socket_handle(const socket_handle&) = delete;
    socket_handle& operator=(const socket_handle&) = delete;

    socket_handle(socket_handle&& other) noexcept : fd_(other.fd_) {
        other.fd_ = invalid_value;
    }

    socket_handle& operator=(socket_handle&& other) noexcept {
        if (this != &other) {
            close();
            fd_ = other.fd_;
            other.fd_ = invalid_value;
        }
        return *this;
    }

    [[nodiscard]] bool valid() const noexcept { return fd_ != invalid_value; }
    [[nodiscard]] native_type get() const noexcept { return fd_; }

    void close() noexcept {
        if (fd_ != invalid_value) {
#ifdef _WIN32
            closesocket(fd_);
#else
            ::close(fd_);
#endif
            fd_ = invalid_value;
        }
    }

private:
    native_type fd_{invalid_value};
};

/**
 * @brief Parse URL into host, port, and path components.
 */
struct parsed_url {
    std::string host;
    std::string port;
    std::string path;

    static std::optional<parsed_url> parse(const std::string& url) {
        parsed_url result;

        // Find scheme end
        auto scheme_end = url.find("://");
        if (scheme_end == std::string::npos) return std::nullopt;

        auto host_start = scheme_end + 3;
        auto path_start = url.find('/', host_start);

        std::string host_port;
        if (path_start == std::string::npos) {
            host_port = url.substr(host_start);
            result.path = "/";
        } else {
            host_port = url.substr(host_start, path_start - host_start);
            result.path = url.substr(path_start);
        }

        auto colon = host_port.find(':');
        if (colon != std::string::npos) {
            result.host = host_port.substr(0, colon);
            result.port = host_port.substr(colon + 1);
        } else {
            result.host = host_port;
            result.port = "80";
        }

        return result;
    }
};
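
// Example: parsed_url::parse("http://ai.example.org:8080/v1/models") yields
// host "ai.example.org", port "8080", path "/v1/models"; with no explicit
// port the default is "80". The scheme is otherwise ignored, so an https URL
// parses but is still fetched over a plain TCP socket (no TLS).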

/**
 * @brief HTTP client for AI service communication.
 */
class http_client {
public:
    explicit http_client(const ai_service_config& config)
        : config_(config) {}

    [[nodiscard]] auto post(const std::string& path,
                            const std::string& body,
                            const std::string& content_type = "application/json")
        -> Result<http_response> {
        kcenon::pacs::integration::logger_adapter::debug(
            "AI service POST request: {} (body size: {} bytes)",
            config_.base_url + path, body.size());

        auto headers = build_headers();
        headers["Content-Type"] = content_type;
        return send_request("POST", path, headers, body);
    }

    [[nodiscard]] auto get(const std::string& path)
        -> Result<http_response> {
        kcenon::pacs::integration::logger_adapter::debug(
            "AI service GET request: {}", config_.base_url + path);

        auto headers = build_headers();
        return send_request("GET", path, headers, "");
    }

    [[nodiscard]] auto del(const std::string& path)
        -> Result<http_response> {
        kcenon::pacs::integration::logger_adapter::debug(
            "AI service DELETE request: {}", config_.base_url + path);

        auto headers = build_headers();
        return send_request("DELETE", path, headers, "");
    }

    [[nodiscard]] auto check_connectivity() -> bool {
        auto result = get("/health");
        return result.is_ok() && result.value().is_success();
    }

    [[nodiscard]] auto measure_latency() -> std::optional<std::chrono::milliseconds> {
        auto start = std::chrono::steady_clock::now();
        auto result = get("/health");
        auto end = std::chrono::steady_clock::now();

        if (!result.is_ok()) {
            return std::nullopt;
        }

        return std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    }

private:
    ai_service_config config_;

    [[nodiscard]] auto send_request(
        const std::string& method,
        const std::string& path,
        const std::map<std::string, std::string>& headers,
        const std::string& body) -> Result<http_response> {

        std::string full_url = config_.base_url + path;
        auto url = parsed_url::parse(full_url);
        if (!url) {
            return error_info(-1, "Invalid URL: " + full_url, "ai_service_connector");
        }

        // Resolve hostname (on Windows this assumes WSAStartup has already
        // been performed by the host application)
        struct addrinfo hints{};
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = IPPROTO_TCP;

        struct addrinfo* addr_result = nullptr;
        int gai_err = getaddrinfo(url->host.c_str(), url->port.c_str(),
                                  &hints, &addr_result);
        if (gai_err != 0) {
            return error_info(-1,
                              "DNS resolution failed for " + url->host + ": " +
                                  gai_strerror(gai_err),
                              "ai_service_connector");
        }

        // RAII cleanup for addrinfo
        auto addr_cleanup = [](struct addrinfo* p) { freeaddrinfo(p); };
        std::unique_ptr<struct addrinfo, decltype(addr_cleanup)>
            addr_guard(addr_result, addr_cleanup);

        // Create and connect socket
        socket_handle sock(
            ::socket(addr_result->ai_family, addr_result->ai_socktype,
                     addr_result->ai_protocol));
        if (!sock.valid()) {
            return error_info(-1, "Failed to create socket", "ai_service_connector");
        }

        // Set socket timeout
        auto timeout_sec = std::chrono::duration_cast<std::chrono::seconds>(
            config_.connection_timeout).count();
        if (timeout_sec <= 0) timeout_sec = 30;

#ifdef _WIN32
        DWORD tv = static_cast<DWORD>(timeout_sec * 1000);
        setsockopt(sock.get(), SOL_SOCKET, SO_RCVTIMEO,
                   reinterpret_cast<const char*>(&tv), sizeof(tv));
        setsockopt(sock.get(), SOL_SOCKET, SO_SNDTIMEO,
                   reinterpret_cast<const char*>(&tv), sizeof(tv));
#else
        struct timeval tv{};
        tv.tv_sec = timeout_sec;
        setsockopt(sock.get(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
        setsockopt(sock.get(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
#endif

        // Connect
        if (::connect(sock.get(), addr_result->ai_addr,
                      static_cast<int>(addr_result->ai_addrlen)) != 0) {
            return error_info(-1,
                              "Connection failed to " + url->host + ":" + url->port,
                              "ai_service_connector");
        }

        // Build HTTP request
        std::ostringstream request_stream;
        request_stream << method << " " << url->path << " HTTP/1.1\r\n";
        request_stream << "Host: " << url->host << "\r\n";
        request_stream << "Connection: close\r\n";

        for (const auto& [name, value] : headers) {
            request_stream << name << ": " << value << "\r\n";
        }

        if (!body.empty()) {
            request_stream << "Content-Length: " << body.size() << "\r\n";
        }
        request_stream << "\r\n";
        request_stream << body;

        std::string request_data = request_stream.str();

        // Send request
        auto total_sent = std::size_t{0};
        while (total_sent < request_data.size()) {
            auto sent = ::send(sock.get(),
                               request_data.data() + total_sent,
                               static_cast<int>(request_data.size() - total_sent),
                               0);
            if (sent <= 0) {
                return error_info(-1, "Failed to send HTTP request",
                                  "ai_service_connector");
            }
            total_sent += static_cast<std::size_t>(sent);
        }

        // Receive response until the server closes the connection
        // ("Connection: close" was requested above)
        std::string response_data;
        char buffer[4096];
        while (true) {
            auto received = ::recv(sock.get(), buffer, sizeof(buffer), 0);
            if (received < 0) {
                return error_info(-1, "Failed to receive HTTP response",
                                  "ai_service_connector");
            }
            if (received == 0) break; // Connection closed
            response_data.append(buffer, static_cast<std::size_t>(received));
        }

        // Parse HTTP response
        return parse_http_response(response_data);
    }

    [[nodiscard]] static auto parse_http_response(const std::string& raw)
        -> Result<http_response> {
        http_response response;

        auto header_end = raw.find("\r\n\r\n");
        if (header_end == std::string::npos) {
            return error_info(-1, "Malformed HTTP response", "ai_service_connector");
        }

        // Parse status line
        auto first_line_end = raw.find("\r\n");
        std::string status_line = raw.substr(0, first_line_end);

        // Parse "HTTP/1.x STATUS_CODE REASON"
        auto first_space = status_line.find(' ');
        if (first_space == std::string::npos) {
            return error_info(-1, "Invalid HTTP status line", "ai_service_connector");
        }
        auto code_start = first_space + 1;
        try {
            response.status_code = std::stoi(status_line.substr(code_start));
        } catch (...) {
            return error_info(-1, "Invalid HTTP status code", "ai_service_connector");
        }

        // Parse headers
        auto headers_str = raw.substr(first_line_end + 2,
                                      header_end - first_line_end - 2);
        std::istringstream header_stream(headers_str);
        std::string line;
        while (std::getline(header_stream, line)) {
            if (!line.empty() && line.back() == '\r') line.pop_back();
            auto colon = line.find(':');
            if (colon != std::string::npos) {
                auto name = line.substr(0, colon);
                auto value = line.substr(colon + 1);
                // Trim leading whitespace from value
                auto val_start = value.find_first_not_of(' ');
                if (val_start != std::string::npos) {
                    value = value.substr(val_start);
                }
                response.headers[name] = value;
            }
        }

        // Extract body as-is; chunked transfer-encoding is not decoded here
        response.body = raw.substr(header_end + 4);

        return response;
    }
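
    // Example: a raw reply of
    //   "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{\"ok\":true}"
    // parses to status_code 200, a single header entry, and body {"ok":true}.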

    [[nodiscard]] std::map<std::string, std::string> build_headers() const {
        std::map<std::string, std::string> headers;
        headers["Content-Type"] = "application/json";

        switch (config_.auth_type) {
        case authentication_type::api_key:
            headers["X-API-Key"] = config_.api_key;
            break;
        case authentication_type::bearer_token:
            headers["Authorization"] = "Bearer " + config_.bearer_token;
            break;
        case authentication_type::basic: {
            std::string credentials = config_.username + ":" + config_.password;
            headers["Authorization"] = "Basic " + json_util::base64_encode(credentials);
            break;
        }
        case authentication_type::none:
        default:
            break;
        }

        return headers;
    }
};
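
// Usage sketch (illustrative values): construct the client from a populated
// ai_service_config, then issue requests against service endpoints:
//   ai_service_config cfg;
//   cfg.base_url = "http://ai.example.org:8080";
//   http_client client(cfg);
//   auto reply = client.get("/health");
//   bool healthy = reply.is_ok() && reply.value().is_success();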

// =============================================================================
// Job Tracker
// =============================================================================

/**
 * @brief Tracks active inference jobs.
 */
class job_tracker {
public:
    void add_job(const std::string& job_id, const inference_request& request) {
        std::lock_guard lock(mutex_);

        inference_status status;
        status.job_id = job_id;
        status.status = inference_status_code::pending;
        status.progress = 0;
        status.message = "Job submitted";
        status.created_at = std::chrono::system_clock::now();

        jobs_[job_id] = status;
        request_map_[job_id] = request;

        kcenon::pacs::integration::metrics_adapter::set_gauge(
            metrics::active_jobs, static_cast<double>(jobs_.size()));
    }

    void update_status(const std::string& job_id, const inference_status& status) {
        std::lock_guard lock(mutex_);

        auto it = jobs_.find(job_id);
        if (it != jobs_.end()) {
            it->second = status;

            // If job completed or failed, record metrics
            if (status.status == inference_status_code::completed ||
                status.status == inference_status_code::failed) {

                // Note: request_map_ entry may be used for future logging
                (void)request_map_[job_id];
                auto duration = status.completed_at.value_or(std::chrono::system_clock::now())
                                - status.created_at;

                kcenon::pacs::integration::metrics_adapter::record_timing(
                    metrics::inference_duration,
                    std::chrono::duration_cast<std::chrono::nanoseconds>(duration));

                if (status.status == inference_status_code::completed) {
                    kcenon::pacs::integration::metrics_adapter::increment_counter(
                        metrics::inference_requests_success);
                } else {
                    kcenon::pacs::integration::metrics_adapter::increment_counter(
                        metrics::inference_requests_failed);
                }
            }
        }
    }

    void remove_job(const std::string& job_id) {
        std::lock_guard lock(mutex_);
        jobs_.erase(job_id);
        request_map_.erase(job_id);

        kcenon::pacs::integration::metrics_adapter::set_gauge(
            metrics::active_jobs, static_cast<double>(jobs_.size()));
    }

    [[nodiscard]] std::optional<inference_status> get_status(const std::string& job_id) const {
        std::shared_lock lock(mutex_);
        auto it = jobs_.find(job_id);
        if (it != jobs_.end()) {
            return it->second;
        }
        return std::nullopt;
    }

    [[nodiscard]] std::vector<inference_status> get_active_jobs() const {
        std::shared_lock lock(mutex_);
        std::vector<inference_status> result;
        for (const auto& [id, status] : jobs_) {
            if (status.status == inference_status_code::pending ||
                status.status == inference_status_code::running) {
                result.push_back(status);
            }
        }
        return result;
    }

private:
    mutable std::shared_mutex mutex_;
    std::unordered_map<std::string, inference_status> jobs_;
    std::unordered_map<std::string, inference_request> request_map_;
};

// =============================================================================
// AI Service Connector Implementation
// =============================================================================

class ai_service_connector::impl {
public:
    impl() = default;

    ~impl() {
        shutdown();
    }

    auto initialize(const ai_service_config& config) -> Result<std::monostate> {
        std::lock_guard lock(mutex_);

        if (initialized_) {
            return std::monostate{};
        }

        config_ = config;

        // Validate configuration
        if (config.base_url.empty()) {
            return error_info(-1, "Base URL is required", "ai_service_connector");
        }

        // Initialize HTTP client
        http_client_ = std::make_unique<http_client>(config);

        // Test connectivity
        if (!http_client_->check_connectivity()) {
            kcenon::pacs::integration::logger_adapter::warn(
                "AI service at {} is not responding, continuing anyway",
                config.base_url);
        }

        // Initialize job tracker
        job_tracker_ = std::make_unique<job_tracker>();

        // Initialize metrics
        kcenon::pacs::integration::metrics_adapter::set_gauge(metrics::active_jobs, 0.0);

938 "AI service connector initialized: url={}, auth={}",
939 config.base_url, to_string(config.auth_type));
940
941 initialized_ = true;
942 return std::monostate{};
943 }

    void shutdown() {
        std::lock_guard lock(mutex_);

        if (!initialized_) {
            return;
        }

        kcenon::pacs::integration::logger_adapter::info("AI service connector shutting down");

        http_client_.reset();
        job_tracker_.reset();
        initialized_ = false;
    }

    [[nodiscard]] bool is_initialized() const noexcept {
        return initialized_.load();
    }

    auto request_inference(const inference_request& request) -> Result<std::string> {
        if (!initialized_) {
            return error_info(-1, "AI service connector not initialized", "ai_service_connector");
        }

        // Validate request
        if (request.study_instance_uid.empty()) {
            return error_info(-1, "Study Instance UID is required", "ai_service_connector");
        }
        if (request.model_id.empty()) {
            return error_info(-1, "Model ID is required", "ai_service_connector");
        }

        // Build request JSON
        std::string body = json_util::build_request_json(request);

        // Send request
        auto response = http_client_->post("/inference", body);
        if (response.is_err()) {
            kcenon::pacs::integration::logger_adapter::error(
                "Failed to submit inference request: {}",
                response.error().message);
            return response.error();
        }

        if (!response.value().is_success()) {
            return error_info(
                response.value().status_code,
                "AI service returned error: " + response.value().body,
                "ai_service_connector");
        }

        // Parse response to get job ID
        auto job_id = json_util::extract_string(response.value().body, "job_id");
        if (!job_id) {
            return error_info(-1, "Failed to parse job ID from response", "ai_service_connector");
        }

        // Track the job
        job_tracker_->add_job(*job_id, request);

        kcenon::pacs::integration::logger_adapter::info(
            "Inference request submitted: job_id={}, study={}, model={}",
            *job_id, request.study_instance_uid, request.model_id);

        kcenon::pacs::integration::metrics_adapter::increment_counter(
            metrics::inference_requests_total);

        return *job_id;
    }

    auto check_status(const std::string& job_id) -> Result<inference_status> {
        if (!initialized_) {
            return error_info(-1, "AI service connector not initialized", "ai_service_connector");
        }

        // Check local cache first
        auto cached = job_tracker_->get_status(job_id);
        if (cached && (cached->status == inference_status_code::completed ||
                       cached->status == inference_status_code::failed ||
                       cached->status == inference_status_code::cancelled)) {
            return *cached;
        }

        // Query remote service
        auto response = http_client_->get("/inference/" + job_id);
        if (response.is_err()) {
            return response.error();
        }

        if (!response.value().is_success()) {
            if (response.value().status_code == 404) {
                return error_info(404, "Job not found: " + job_id, "ai_service_connector");
            }
            return error_info(
                response.value().status_code,
                "Failed to get job status",
                "ai_service_connector");
        }

        // Parse status
        auto status = json_util::parse_status_json(response.value().body);
        if (!status) {
            return error_info(-1, "Failed to parse status response", "ai_service_connector");
        }

        // Update local cache
        job_tracker_->update_status(job_id, *status);

        return *status;
    }

    auto cancel(const std::string& job_id) -> Result<std::monostate> {
        if (!initialized_) {
            return error_info(-1, "AI service connector not initialized", "ai_service_connector");
        }

        auto response = http_client_->del("/inference/" + job_id);
        if (response.is_err()) {
            return response.error();
        }

        if (!response.value().is_success() && response.value().status_code != 404) {
            return error_info(
                response.value().status_code,
                "Failed to cancel job",
                "ai_service_connector");
        }

        // Update local status
        inference_status status;
        status.job_id = job_id;
        status.status = inference_status_code::cancelled;
        status.message = "Job cancelled by user";
        status.completed_at = std::chrono::system_clock::now();
        job_tracker_->update_status(job_id, status);

        kcenon::pacs::integration::logger_adapter::info("Inference job cancelled: {}", job_id);

        return std::monostate{};
    }

    auto wait_for_completion(const std::string& job_id,
                             std::chrono::milliseconds timeout,
                             status_callback callback) -> Result<inference_status> {
        if (!initialized_) {
            return error_info(-1, "AI service connector not initialized", "ai_service_connector");
        }

        auto start_time = std::chrono::steady_clock::now();
        auto deadline = start_time + timeout;

        while (std::chrono::steady_clock::now() < deadline) {
            auto result = check_status(job_id);
            if (result.is_err()) {
                return result;
            }

            auto& status = result.value();

            // Call callback if provided
            if (callback) {
                callback(status);
            }

            // Check if job is complete
            if (status.status == inference_status_code::completed ||
                status.status == inference_status_code::failed ||
                status.status == inference_status_code::cancelled ||
                status.status == inference_status_code::timeout) {
                return status;
            }

            // Wait before polling again
            std::this_thread::sleep_for(config_.polling_interval);
        }

        // Timeout
        inference_status timeout_status;
        timeout_status.job_id = job_id;
        timeout_status.status = inference_status_code::timeout;
        timeout_status.message = "Timed out waiting for job completion";
        return timeout_status;
    }

    auto list_active_jobs() -> Result<std::vector<inference_status>> {
        if (!initialized_) {
            return error_info(-1, "AI service connector not initialized", "ai_service_connector");
        }

        return job_tracker_->get_active_jobs();
    }

    auto list_models() -> Result<std::vector<model_info>> {
        if (!initialized_) {
            return error_info(-1, "AI service connector not initialized", "ai_service_connector");
        }

        auto response = http_client_->get("/models");
        if (response.is_err()) {
            return response.error();
        }

        if (!response.value().is_success()) {
            return error_info(
                response.value().status_code,
                "Failed to list models",
                "ai_service_connector");
        }

        // Parse models array from JSON response
        std::vector<model_info> models;
        auto model_objects = json_util::extract_json_array_objects(
            response.value().body);

        for (const auto& obj : model_objects) {
            auto info = json_util::parse_model_json(obj);
            if (info) {
                models.push_back(std::move(*info));
            }
        }

        return models;
    }

    auto get_model_info(const std::string& model_id) -> Result<model_info> {
        if (!initialized_) {
            return error_info(-1, "AI service connector not initialized", "ai_service_connector");
        }

        auto response = http_client_->get("/models/" + model_id);
        if (response.is_err()) {
            return response.error();
        }

        if (!response.value().is_success()) {
            return error_info(
                response.value().status_code,
                "Failed to get model info",
                "ai_service_connector");
        }

        auto info = json_util::parse_model_json(response.value().body);
        if (!info) {
            return error_info(-1, "Failed to parse model info", "ai_service_connector");
        }

        return *info;
    }

    [[nodiscard]] bool check_health() {
        if (!initialized_ || !http_client_) {
            return false;
        }
        return http_client_->check_connectivity();
    }

    [[nodiscard]] auto get_latency() -> std::optional<std::chrono::milliseconds> {
        if (!initialized_ || !http_client_) {
            return std::nullopt;
        }
        return http_client_->measure_latency();
    }

    [[nodiscard]] const ai_service_config& get_config() const {
        return config_;
    }

    auto update_credentials(authentication_type auth_type,
                            const std::string& credentials) -> Result<std::monostate> {
        std::lock_guard lock(mutex_);

        config_.auth_type = auth_type;

        switch (auth_type) {
        case authentication_type::api_key:
            config_.api_key = credentials;
            break;
        case authentication_type::bearer_token:
            config_.bearer_token = credentials;
            break;
        case authentication_type::basic: {
            auto colon_pos = credentials.find(':');
            if (colon_pos != std::string::npos) {
                config_.username = credentials.substr(0, colon_pos);
                config_.password = credentials.substr(colon_pos + 1);
            } else {
                return error_info(-1, "Basic auth credentials must be in format 'username:password'",
                                  "ai_service_connector");
            }
            break;
        }
        case authentication_type::none:
            break;
        }

        // Recreate HTTP client with new credentials
        http_client_ = std::make_unique<http_client>(config_);

        kcenon::pacs::integration::logger_adapter::info(
            "AI service credentials updated: auth={}",
            to_string(auth_type));

        return std::monostate{};
    }

private:
    mutable std::mutex mutex_;
    std::atomic<bool> initialized_{false};
    ai_service_config config_;
    std::unique_ptr<http_client> http_client_;
    std::unique_ptr<job_tracker> job_tracker_;
};

// =============================================================================
// Static Member Initialization
// =============================================================================

std::unique_ptr<ai_service_connector::impl> ai_service_connector::pimpl_ =
    std::make_unique<ai_service_connector::impl>();

// =============================================================================
// Public API Implementation
// =============================================================================

auto ai_service_connector::initialize(const ai_service_config& config)
    -> Result<std::monostate> {
    return pimpl_->initialize(config);
}

void ai_service_connector::shutdown() {
    pimpl_->shutdown();
}

auto ai_service_connector::is_initialized() noexcept -> bool {
    return pimpl_->is_initialized();
}

auto ai_service_connector::request_inference(const inference_request& request)
    -> Result<std::string> {
    return pimpl_->request_inference(request);
}

auto ai_service_connector::check_status(const std::string& job_id)
    -> Result<inference_status> {
    return pimpl_->check_status(job_id);
}

auto ai_service_connector::cancel(const std::string& job_id)
    -> Result<std::monostate> {
    return pimpl_->cancel(job_id);
}

auto ai_service_connector::wait_for_completion(
    const std::string& job_id,
    std::chrono::milliseconds timeout,
    status_callback callback)
    -> Result<inference_status> {
    return pimpl_->wait_for_completion(job_id, timeout, std::move(callback));
}

auto ai_service_connector::list_active_jobs()
    -> Result<std::vector<inference_status>> {
    return pimpl_->list_active_jobs();
}

auto ai_service_connector::list_models() -> Result<std::vector<model_info>> {
    return pimpl_->list_models();
}

auto ai_service_connector::get_model_info(const std::string& model_id)
    -> Result<model_info> {
    return pimpl_->get_model_info(model_id);
}

auto ai_service_connector::check_health() -> bool {
    return pimpl_->check_health();
}

auto ai_service_connector::get_latency()
    -> std::optional<std::chrono::milliseconds> {
    return pimpl_->get_latency();
}

auto ai_service_connector::get_config() -> const ai_service_config& {
    return pimpl_->get_config();
}

auto ai_service_connector::update_credentials(
    authentication_type auth_type,
    const std::string& credentials)
    -> Result<std::monostate> {
    return pimpl_->update_credentials(auth_type, credentials);
}

} // namespace kcenon::pacs::ai
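
// Usage sketch (illustrative values):
//   using namespace kcenon::pacs::ai;
//   ai_service_config cfg;
//   cfg.base_url = "http://ai.example.org:8080";
//   if (ai_service_connector::initialize(cfg).is_ok()) {
//       inference_request req;
//       req.study_instance_uid = "1.2.840.1";
//       req.model_id = "chest-ct";
//       auto job = ai_service_connector::request_inference(req);
//       if (job.is_ok()) {
//           auto result = ai_service_connector::wait_for_completion(job.value());
//       }
//       ai_service_connector::shutdown();
//   }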