PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
prefetch_manager.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
16#ifdef PACS_WITH_DATABASE_SYSTEM
19#endif
24
25#include <algorithm>
26#include <atomic>
27#include <chrono>
28#include <condition_variable>
29#include <iomanip>
30#include <mutex>
31#include <random>
32#include <shared_mutex>
33#include <sstream>
34#include <thread>
35#include <unordered_set>
36
37namespace kcenon::pacs::client {
38
39// =============================================================================
40// UUID Generation
41// =============================================================================
42
43namespace {
44
48std::string generate_uuid() {
49 static thread_local std::random_device rd;
50 static thread_local std::mt19937_64 gen(rd());
51 static thread_local std::uniform_int_distribution<uint64_t> dis;
52
53 uint64_t ab = dis(gen);
54 uint64_t cd = dis(gen);
55
56 // Set version (4) and variant (8, 9, A, or B)
57 ab = (ab & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL;
58 cd = (cd & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL;
59
60 std::ostringstream oss;
61 oss << std::hex << std::setfill('0');
62 oss << std::setw(8) << (ab >> 32);
63 oss << '-';
64 oss << std::setw(4) << ((ab >> 16) & 0xFFFF);
65 oss << '-';
66 oss << std::setw(4) << (ab & 0xFFFF);
67 oss << '-';
68 oss << std::setw(4) << (cd >> 48);
69 oss << '-';
70 oss << std::setw(12) << (cd & 0xFFFFFFFFFFFFULL);
71
72 return oss.str();
73}
74
78bool matches_filter(const std::string& value, const std::string& filter) {
79 if (filter.empty()) return true;
80 if (value.empty()) return false;
81
82 size_t start = 0;
83 while (start < filter.size()) {
84 auto end = filter.find(',', start);
85 if (end == std::string::npos) {
86 end = filter.size();
87 }
88
89 auto pattern = filter.substr(start, end - start);
90 // Trim whitespace
91 while (!pattern.empty() && pattern.front() == ' ') pattern.erase(0, 1);
92 while (!pattern.empty() && pattern.back() == ' ') pattern.pop_back();
93
94 if (pattern == value) {
95 return true;
96 }
97
98 start = end + 1;
99 }
100
101 return false;
102}
103
107std::string get_dataset_value(const core::dicom_dataset& ds, core::dicom_tag tag) {
108 return ds.get_string(tag);
109}
110
111// Local tag definitions for tags not in constants file
112constexpr core::dicom_tag body_part_examined_tag{0x0018, 0x0015};
113
114} // namespace
115
116// =============================================================================
117// Implementation Structure
118// =============================================================================
119
121 // Configuration
123
124 // Dependencies
126 std::shared_ptr<storage::prefetch_repository> compatibility_repo;
127 std::shared_ptr<remote_node_manager> node_manager;
128 std::shared_ptr<job_manager> job_mgr;
129 std::shared_ptr<services::worklist_scu> worklist_scu;
130 std::shared_ptr<di::ILogger> logger;
131
132 // Rules cache
133 std::vector<prefetch_rule> rules_cache;
134 mutable std::shared_mutex rules_mutex;
135
136 // Scheduler thread
137 std::thread scheduler_thread;
138 std::atomic<bool> scheduler_running{false};
139 std::condition_variable scheduler_cv;
140 std::mutex scheduler_mutex;
141
142 // Worklist monitor thread
144 std::atomic<bool> worklist_monitor_running{false};
145 std::string worklist_node_id;
146 std::condition_variable worklist_cv;
147 std::mutex worklist_mutex;
148
149 // Deduplication
150 std::unordered_set<std::string> pending_study_uids;
151 mutable std::mutex pending_mutex;
152
153 // Statistics
154 std::atomic<size_t> pending_count{0};
155
156 // =========================================================================
157 // Helper Methods
158 // =========================================================================
159
161 #ifdef PACS_WITH_DATABASE_SYSTEM
162 if (repositories.rules) {
163 auto result = repositories.rules->find_enabled();
164 if (result.is_err()) {
165 return;
166 }
167
168 std::unique_lock lock(rules_mutex);
169 rules_cache = std::move(result.value());
170 return;
171 }
172 #endif
173
174 if (!compatibility_repo) {
175 return;
176 }
177
178 std::unique_lock lock(rules_mutex);
179 rules_cache = compatibility_repo->find_enabled_rules();
180 }
181
183 #ifdef PACS_WITH_DATABASE_SYSTEM
184 if (repositories.rules) {
185 [[maybe_unused]] auto result = repositories.rules->save(rule);
186 } else if (compatibility_repo) {
187 [[maybe_unused]] auto result = compatibility_repo->save_rule(rule);
188 }
189 #else
190 if (compatibility_repo) {
191 [[maybe_unused]] auto result = compatibility_repo->save_rule(rule);
192 }
193 #endif
194 }
195
196 bool is_study_pending(const std::string& study_uid) {
197 std::lock_guard lock(pending_mutex);
198 return pending_study_uids.count(study_uid) > 0;
199 }
200
201 void mark_study_pending(const std::string& study_uid) {
202 std::lock_guard lock(pending_mutex);
203 pending_study_uids.insert(study_uid);
204 pending_count.fetch_add(1);
205 }
206
207 void mark_study_complete(const std::string& study_uid) {
208 std::lock_guard lock(pending_mutex);
209 pending_study_uids.erase(study_uid);
210 auto count = pending_count.load();
211 if (count > 0) {
212 pending_count.fetch_sub(1);
213 }
214 }
215
216 bool is_study_local(std::string_view study_uid) const {
217 // Check history for completed prefetch
218 #ifdef PACS_WITH_DATABASE_SYSTEM
219 if (repositories.history) {
220 auto result = repositories.history->is_study_prefetched(study_uid);
221 return result.is_ok() && result.value();
222 }
223 #endif
224 if (compatibility_repo) {
225 return compatibility_repo->is_study_prefetched(study_uid);
226 }
227 return false;
228 }
229
230 std::vector<prefetch_rule> get_matching_rules(
231 prefetch_trigger trigger,
232 const std::string& modality,
233 const std::string& body_part,
234 const std::string& station_ae) {
235 std::vector<prefetch_rule> matches;
236
237 std::shared_lock lock(rules_mutex);
238 for (const auto& rule : rules_cache) {
239 if (!rule.enabled) continue;
240 if (rule.trigger != trigger) continue;
241
242 if (!matches_filter(modality, rule.modality_filter)) continue;
243 if (!matches_filter(body_part, rule.body_part_filter)) continue;
244 if (!matches_filter(station_ae, rule.station_ae_filter)) continue;
245
246 matches.push_back(rule);
247 }
248
249 return matches;
250 }
251
253 const std::string& patient_id,
254 const std::string& study_uid,
255 const std::string& rule_id,
256 const std::string& source_node_id,
257 const std::string& job_id,
258 const std::string& status) {
259 #ifdef PACS_WITH_DATABASE_SYSTEM
260 if (!repositories.history && !compatibility_repo) return;
261 #else
262 if (!compatibility_repo) return;
263 #endif
264
265 prefetch_history history;
266 history.patient_id = patient_id;
267 history.study_uid = study_uid;
268 history.rule_id = rule_id;
269 history.source_node_id = source_node_id;
270 history.job_id = job_id;
271 history.status = status;
272 history.prefetched_at = std::chrono::system_clock::now();
273
274 #ifdef PACS_WITH_DATABASE_SYSTEM
275 if (repositories.history) {
276 [[maybe_unused]] auto result = repositories.history->save(history);
277 } else if (compatibility_repo) {
278 [[maybe_unused]] auto result =
279 compatibility_repo->save_history(history);
280 }
281 #else
282 if (compatibility_repo) {
283 [[maybe_unused]] auto result =
284 compatibility_repo->save_history(history);
285 }
286 #endif
287 }
288
289 void increment_rule_stats(const std::string& rule_id, size_t studies) {
290 #ifdef PACS_WITH_DATABASE_SYSTEM
291 if (!repositories.rules && !compatibility_repo) return;
292 #else
293 if (!compatibility_repo) return;
294 #endif
295
296 #ifdef PACS_WITH_DATABASE_SYSTEM
297 if (repositories.rules) {
298 [[maybe_unused]] auto result1 =
299 repositories.rules->increment_triggered(rule_id);
300 if (studies > 0) {
301 [[maybe_unused]] auto result2 =
302 repositories.rules->increment_studies_prefetched(rule_id, studies);
303 }
304 } else if (compatibility_repo) {
305 [[maybe_unused]] auto result1 =
306 compatibility_repo->increment_triggered(rule_id);
307 if (studies > 0) {
308 [[maybe_unused]] auto result2 =
309 compatibility_repo->increment_studies_prefetched(rule_id, studies);
310 }
311 }
312 #else
313 [[maybe_unused]] auto result1 =
314 compatibility_repo->increment_triggered(rule_id);
315 if (studies > 0) {
316 [[maybe_unused]] auto result2 =
317 compatibility_repo->increment_studies_prefetched(rule_id, studies);
318 }
319 #endif
320 }
321
322 // =========================================================================
323 // Scheduler Loop
324 // =========================================================================
325
327 while (scheduler_running.load()) {
328 std::unique_lock lock(scheduler_mutex);
329 scheduler_cv.wait_for(lock, std::chrono::minutes(1), [this] {
330 return !scheduler_running.load();
331 });
332
333 if (!scheduler_running.load()) break;
334
335 // Check scheduled rules
337 }
338 }
339
341 std::shared_lock lock(rules_mutex);
342 for (const auto& rule : rules_cache) {
343 if (!rule.enabled) continue;
344 if (rule.trigger != prefetch_trigger::scheduled_exam) continue;
345 if (rule.schedule_cron.empty()) continue;
346
347 // Simple cron check - in production would use proper cron parsing
348 // For now, just check if rule should run
349 // This is a placeholder for actual cron implementation
350 }
351 }
352
353 // =========================================================================
354 // Worklist Monitor Loop
355 // =========================================================================
356
358 while (worklist_monitor_running.load()) {
359 std::unique_lock lock(worklist_mutex);
360 worklist_cv.wait_for(lock, config.worklist_check_interval, [this] {
361 return !worklist_monitor_running.load();
362 });
363
364 if (!worklist_monitor_running.load()) break;
365
366 // Query worklist and process items
368 }
369 }
370
372 if (!worklist_scu || !node_manager || worklist_node_id.empty()) {
373 return;
374 }
375
376 // Get node configuration
377 auto node = node_manager->get_node(worklist_node_id);
378 if (!node) {
379 logger->warn_fmt("Worklist node {} not found", worklist_node_id);
380 return;
381 }
382
383 // Query today's worklist
384 // Note: This would establish association and query in production
385 // For now, this is a placeholder for the actual worklist query
386 logger->debug_fmt("Checking worklist from node {}", worklist_node_id);
387 }
388};
389
390// =============================================================================
391// Construction / Destruction
392// =============================================================================
393
395 prefetch_repositories repositories,
396 std::shared_ptr<remote_node_manager> node_manager,
397 std::shared_ptr<job_manager> job_manager,
398 std::shared_ptr<services::worklist_scu> worklist_scu,
399 std::shared_ptr<di::ILogger> logger)
402 std::move(repositories),
403 std::move(node_manager),
404 std::move(job_manager),
405 std::move(worklist_scu),
406 std::move(logger)) {}
407
409 const prefetch_manager_config& config,
410 prefetch_repositories repositories,
411 std::shared_ptr<remote_node_manager> node_manager,
412 std::shared_ptr<job_manager> job_manager,
413 std::shared_ptr<services::worklist_scu> worklist_scu,
414 std::shared_ptr<di::ILogger> logger)
415 : impl_(std::make_unique<impl>()) {
417 impl_->repositories = std::move(repositories);
418 impl_->node_manager = std::move(node_manager);
419 impl_->job_mgr = std::move(job_manager);
420 impl_->worklist_scu = std::move(worklist_scu);
421 impl_->logger = logger ? std::move(logger) : std::make_shared<di::NullLogger>();
422
424}
425
427 std::shared_ptr<storage::prefetch_repository> repo,
428 std::shared_ptr<remote_node_manager> node_manager,
429 std::shared_ptr<job_manager> job_manager,
430 std::shared_ptr<services::worklist_scu> worklist_scu,
431 std::shared_ptr<di::ILogger> logger)
434 std::move(repo),
435 std::move(node_manager),
436 std::move(job_manager),
437 std::move(worklist_scu),
438 std::move(logger)) {}
439
441 const prefetch_manager_config& config,
442 std::shared_ptr<storage::prefetch_repository> repo,
443 std::shared_ptr<remote_node_manager> node_manager,
444 std::shared_ptr<job_manager> job_manager,
445 std::shared_ptr<services::worklist_scu> worklist_scu,
446 std::shared_ptr<di::ILogger> logger)
447 : impl_(std::make_unique<impl>()) {
449 impl_->compatibility_repo = std::move(repo);
450 impl_->node_manager = std::move(node_manager);
451 impl_->job_mgr = std::move(job_manager);
452 impl_->worklist_scu = std::move(worklist_scu);
453 impl_->logger = logger ? std::move(logger) : std::make_shared<di::NullLogger>();
454
455 // Initialize tables if repository exists
457 [[maybe_unused]] auto result = impl_->compatibility_repo->initialize_tables();
458 }
459
460 // Load rules from repository
462}
463
468
469// =============================================================================
470// Rule Management
471// =============================================================================
472
473kcenon::pacs::VoidResult prefetch_manager::add_rule(const prefetch_rule& rule) {
474 prefetch_rule new_rule = rule;
475 if (new_rule.rule_id.empty()) {
476 new_rule.rule_id = generate_uuid();
477 }
478
479 // Save to repository
480 #ifdef PACS_WITH_DATABASE_SYSTEM
481 if (impl_->repositories.rules) {
482 auto result = impl_->repositories.rules->save(new_rule);
483 if (result.is_err()) {
484 return result.error();
485 }
486 } else if (impl_->compatibility_repo) {
487 auto result = impl_->compatibility_repo->save_rule(new_rule);
488 if (result.is_err()) {
489 return result;
490 }
491 }
492 #else
494 auto result = impl_->compatibility_repo->save_rule(new_rule);
495 if (result.is_err()) {
496 return result;
497 }
498 }
499 #endif
500
501 // Update cache
502 {
503 std::unique_lock lock(impl_->rules_mutex);
504 impl_->rules_cache.push_back(new_rule);
505 }
506
507 impl_->logger->info_fmt("Added prefetch rule: {} ({})", new_rule.name, new_rule.rule_id);
508 return kcenon::common::ok();
509}
510
511kcenon::pacs::VoidResult prefetch_manager::update_rule(const prefetch_rule& rule) {
512 if (rule.rule_id.empty()) {
513 return kcenon::pacs::pacs_void_error(-1, "Rule ID is required for update");
514 }
515
516 // Save to repository
517 #ifdef PACS_WITH_DATABASE_SYSTEM
518 if (impl_->repositories.rules) {
519 auto result = impl_->repositories.rules->save(rule);
520 if (result.is_err()) {
521 return result.error();
522 }
523 } else if (impl_->compatibility_repo) {
524 auto result = impl_->compatibility_repo->save_rule(rule);
525 if (result.is_err()) {
526 return result;
527 }
528 }
529 #else
531 auto result = impl_->compatibility_repo->save_rule(rule);
532 if (result.is_err()) {
533 return result;
534 }
535 }
536 #endif
537
538 // Update cache
539 {
540 std::unique_lock lock(impl_->rules_mutex);
541 for (auto& cached_rule : impl_->rules_cache) {
542 if (cached_rule.rule_id == rule.rule_id) {
543 cached_rule = rule;
544 break;
545 }
546 }
547 }
548
549 impl_->logger->info_fmt("Updated prefetch rule: {}", rule.rule_id);
550 return kcenon::common::ok();
551}
552
553kcenon::pacs::VoidResult prefetch_manager::remove_rule(std::string_view rule_id) {
554 // Remove from repository
555 #ifdef PACS_WITH_DATABASE_SYSTEM
556 if (impl_->repositories.rules) {
557 auto result = impl_->repositories.rules->remove(std::string(rule_id));
558 if (result.is_err()) {
559 return result;
560 }
561 } else if (impl_->compatibility_repo) {
562 auto result = impl_->compatibility_repo->remove_rule(rule_id);
563 if (result.is_err()) {
564 return result;
565 }
566 }
567 #else
569 auto result = impl_->compatibility_repo->remove_rule(rule_id);
570 if (result.is_err()) {
571 return result;
572 }
573 }
574 #endif
575
576 // Remove from cache
577 {
578 std::unique_lock lock(impl_->rules_mutex);
579 impl_->rules_cache.erase(
580 std::remove_if(impl_->rules_cache.begin(), impl_->rules_cache.end(),
581 [&rule_id](const prefetch_rule& r) { return r.rule_id == rule_id; }),
582 impl_->rules_cache.end());
583 }
584
585 impl_->logger->info_fmt("Removed prefetch rule: {}", std::string(rule_id));
586 return kcenon::common::ok();
587}
588
589std::optional<prefetch_rule> prefetch_manager::get_rule(std::string_view rule_id) const {
590 std::shared_lock lock(impl_->rules_mutex);
591 for (const auto& rule : impl_->rules_cache) {
592 if (rule.rule_id == rule_id) {
593 return rule;
594 }
595 }
596 return std::nullopt;
597}
598
599std::vector<prefetch_rule> prefetch_manager::list_rules() const {
600 std::shared_lock lock(impl_->rules_mutex);
601 return impl_->rules_cache;
602}
603
604// =============================================================================
605// Worklist-Driven Prefetch
606// =============================================================================
607
609 const std::vector<core::dicom_dataset>& worklist_items) {
610 for (const auto& item : worklist_items) {
611 // Extract relevant fields from worklist item
612 auto patient_id = get_dataset_value(item, core::tags::patient_id);
613 auto modality_val = get_dataset_value(item, core::tags::modality);
614 auto body_part = get_dataset_value(item, body_part_examined_tag);
615 auto station_ae = get_dataset_value(item, core::tags::scheduled_station_ae_title);
616
617 if (patient_id.empty()) {
618 continue;
619 }
620
621 // Find matching rules
622 auto rules = impl_->get_matching_rules(
624
625 for (const auto& rule : rules) {
626 // Trigger prefetch for this patient
627 auto result = prefetch_priors(patient_id, modality_val, body_part);
628
629 // Update rule statistics
630 impl_->increment_rule_stats(rule.rule_id, result.studies_prefetched);
631
632 impl_->logger->debug_fmt("Worklist prefetch for patient {}: {} studies prefetched",
633 patient_id, result.studies_prefetched);
634 }
635 }
636}
637
639 const std::vector<core::dicom_dataset>& worklist_items) {
640 return std::async(std::launch::async, [this, worklist_items]() {
641 process_worklist(worklist_items);
642 });
643}
644
645// =============================================================================
646// Prior Study Prefetch
647// =============================================================================
648
650 std::string_view patient_id,
651 std::string_view current_modality,
652 std::optional<std::string_view> body_part) {
653 auto start_time = std::chrono::steady_clock::now();
654 prefetch_result result;
655 result.patient_id = std::string(patient_id);
656
657 // Find matching prior study rules
658 auto rules = impl_->get_matching_rules(
660 std::string(current_modality),
661 body_part ? std::string(*body_part) : "",
662 "");
663
664 if (rules.empty()) {
665 impl_->logger->debug_fmt("No prior study rules match for patient {} modality {}",
666 std::string(patient_id), std::string(current_modality));
667 return result;
668 }
669
670 // Use the first matching rule (or could combine multiple)
671 const auto& rule = rules.front();
672
673 // Query prior studies from each source node
674 for (const auto& source_node_id : rule.source_node_ids) {
675 auto node = impl_->node_manager->get_node(source_node_id);
676 if (!node) {
677 impl_->logger->warn_fmt("Source node {} not found", source_node_id);
678 continue;
679 }
680
681 // Query for prior studies
682 // In production, this would use query_scu to search for studies
683 // For now, create prefetch jobs based on rule configuration
684
685 // Record that we attempted prefetch
686 impl_->logger->info_fmt("Prefetching priors for patient {} from node {}",
687 std::string(patient_id), source_node_id);
688 }
689
690 auto end_time = std::chrono::steady_clock::now();
691 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
692 end_time - start_time);
693
694 return result;
695}
696
697std::future<prefetch_result> prefetch_manager::prefetch_priors_async(
698 std::string_view patient_id,
699 std::string_view current_modality,
700 std::optional<std::string_view> body_part) {
701 std::string pid(patient_id);
702 std::string mod(current_modality);
703 std::optional<std::string> bp = body_part ?
704 std::optional<std::string>(std::string(*body_part)) : std::nullopt;
705
706 return std::async(std::launch::async, [this, pid, mod, bp]() {
707 return prefetch_priors(pid, mod, bp ? std::optional<std::string_view>(*bp) : std::nullopt);
708 });
709}
710
711// =============================================================================
712// Manual Prefetch
713// =============================================================================
714
716 std::string_view source_node_id,
717 std::string_view study_uid) {
718 // Check deduplication
720 if (impl_->is_study_pending(std::string(study_uid))) {
721 impl_->logger->debug_fmt("Study {} already pending", std::string(study_uid));
722 return "";
723 }
724 if (impl_->is_study_local(study_uid)) {
725 impl_->logger->debug_fmt("Study {} already local", std::string(study_uid));
726 return "";
727 }
728 }
729
730 // Mark as pending
731 impl_->mark_study_pending(std::string(study_uid));
732
733 // Create retrieve job
734 if (impl_->job_mgr) {
735 auto job_id = impl_->job_mgr->create_retrieve_job(
736 source_node_id, study_uid, std::nullopt, job_priority::low);
737
738 // Record history
740 "", // patient_id unknown
741 std::string(study_uid),
742 "", // no rule
743 std::string(source_node_id),
744 job_id,
745 "pending");
746
747 impl_->logger->info_fmt("Created prefetch job {} for study {}", job_id, std::string(study_uid));
748 return job_id;
749 }
750
751 return "";
752}
753
755 std::string_view source_node_id,
756 std::string_view patient_id,
757 std::chrono::hours lookback) {
758 (void)lookback; // Reserved for future time-based filtering
759 // Create prefetch job for the patient
760 if (impl_->job_mgr) {
761 auto job_id = impl_->job_mgr->create_prefetch_job(
762 source_node_id, patient_id, job_priority::low);
763
764 impl_->logger->info_fmt("Created prefetch job {} for patient {}",
765 job_id, std::string(patient_id));
766 return job_id;
767 }
768
769 return "";
770}
771
772// =============================================================================
773// Scheduler Control
774// =============================================================================
775
777 if (impl_->scheduler_running.load()) {
778 return;
779 }
780
781 impl_->scheduler_running.store(true);
782 impl_->scheduler_thread = std::thread([this]() {
784 });
785
786 impl_->logger->info("Started prefetch scheduler");
787}
788
790 if (!impl_->scheduler_running.load()) {
791 return;
792 }
793
794 impl_->scheduler_running.store(false);
795
796 // Notify with lock held to ensure thread sees the flag change
797 {
798 std::lock_guard<std::mutex> lock(impl_->scheduler_mutex);
799 impl_->scheduler_cv.notify_all();
800 }
801
802 if (impl_->scheduler_thread.joinable()) {
803 impl_->scheduler_thread.join();
804 }
805
806 impl_->logger->info("Stopped prefetch scheduler");
807}
808
810 return impl_->scheduler_running.load();
811}
812
813// =============================================================================
814// Worklist Monitor Control
815// =============================================================================
816
817void prefetch_manager::start_worklist_monitor(std::string_view worklist_node_id) {
818 if (impl_->worklist_monitor_running.load()) {
819 return;
820 }
821
822 impl_->worklist_node_id = std::string(worklist_node_id);
823 impl_->worklist_monitor_running.store(true);
824 impl_->worklist_monitor_thread = std::thread([this]() {
826 });
827
828 impl_->logger->info_fmt("Started worklist monitor for node {}", std::string(worklist_node_id));
829}
830
832 if (!impl_->worklist_monitor_running.load()) {
833 return;
834 }
835
836 impl_->worklist_monitor_running.store(false);
837
838 // Notify with lock held to ensure thread sees the flag change
839 {
840 std::lock_guard<std::mutex> lock(impl_->worklist_mutex);
841 impl_->worklist_cv.notify_all();
842 }
843
844 if (impl_->worklist_monitor_thread.joinable()) {
846 }
847
848 impl_->logger->info("Stopped worklist monitor");
849}
850
852 return impl_->worklist_monitor_running.load();
853}
854
855// =============================================================================
856// Status and Statistics
857// =============================================================================
858
860 return impl_->pending_count.load();
861}
862
864 #ifdef PACS_WITH_DATABASE_SYSTEM
866 auto result =
867 impl_->repositories.history->count_by_status_on_current_date("completed");
868 return result.is_ok() ? result.value() : 0;
869 }
870 #endif
872 return impl_->compatibility_repo->count_completed_today();
873 }
874 return 0;
875}
876
878 #ifdef PACS_WITH_DATABASE_SYSTEM
880 auto result =
881 impl_->repositories.history->count_by_status_on_current_date("failed");
882 return result.is_ok() ? result.value() : 0;
883 }
884 #endif
886 return impl_->compatibility_repo->count_failed_today();
887 }
888 return 0;
889}
890
892 std::string_view rule_id) const {
894
895 auto rule = get_rule(rule_id);
896 if (rule) {
897 stats.triggered_count = rule->triggered_count;
898 stats.studies_prefetched = rule->studies_prefetched;
899 }
900
901 return stats;
902}
903
904// =============================================================================
905// Configuration
906// =============================================================================
907
909 return impl_->config;
910}
911
913 impl_->config = new_config;
914}
915
916} // namespace kcenon::pacs::client
void stop_worklist_monitor()
Stop the worklist monitor.
auto config() const noexcept -> const prefetch_manager_config &
Get current configuration.
auto prefetch_study(std::string_view source_node_id, std::string_view study_uid) -> std::string
Prefetch a specific study.
auto prefetch_patient(std::string_view source_node_id, std::string_view patient_id, std::chrono::hours lookback=std::chrono::hours{8760}) -> std::string
Prefetch all studies for a patient.
auto is_scheduler_running() const noexcept -> bool
Check if scheduler is running.
auto get_rule(std::string_view rule_id) const -> std::optional< prefetch_rule >
Get a rule by ID.
auto pending_prefetches() const -> size_t
Get number of pending prefetch operations.
auto completed_today() const -> size_t
Get number of prefetches completed today.
void process_worklist(const std::vector< core::dicom_dataset > &worklist_items)
Process worklist items and trigger prefetch.
prefetch_manager(prefetch_repositories repositories, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< job_manager > job_manager, std::shared_ptr< services::worklist_scu > worklist_scu=nullptr, std::shared_ptr< di::ILogger > logger=nullptr)
Construct a prefetch manager from split repositories.
auto failed_today() const -> size_t
Get number of prefetches failed today.
auto remove_rule(std::string_view rule_id) -> kcenon::pacs::VoidResult
Remove a prefetch rule.
void set_config(prefetch_manager_config new_config)
Update configuration.
auto prefetch_priors_async(std::string_view patient_id, std::string_view current_modality, std::optional< std::string_view > body_part=std::nullopt) -> std::future< prefetch_result >
Prefetch prior studies asynchronously.
void start_worklist_monitor(std::string_view worklist_node_id)
Start the worklist monitor.
auto list_rules() const -> std::vector< prefetch_rule >
List all prefetch rules.
auto prefetch_priors(std::string_view patient_id, std::string_view current_modality, std::optional< std::string_view > body_part=std::nullopt) -> prefetch_result
Prefetch prior studies for a patient.
auto is_worklist_monitor_running() const noexcept -> bool
Check if worklist monitor is running.
~prefetch_manager()
Destructor - stops scheduler and monitor if running.
auto get_rule_statistics(std::string_view rule_id) const -> prefetch_rule_statistics
Get statistics for a specific rule.
auto add_rule(const prefetch_rule &rule) -> kcenon::pacs::VoidResult
Add a new prefetch rule.
auto update_rule(const prefetch_rule &rule) -> kcenon::pacs::VoidResult
Update an existing prefetch rule.
auto process_worklist_async(const std::vector< core::dicom_dataset > &worklist_items) -> std::future< void >
Process worklist items asynchronously.
void start_scheduler()
Start the scheduler for cron-based rules.
DICOM Dataset - ordered collection of Data Elements.
Compile-time constants for commonly used DICOM tags.
Job manager for asynchronous DICOM operations.
@ body_part
(0018,0015) Body Part Examined
@ modality
(0008,0060) Modality - CT, MR, US, etc.
@ station_ae
(0008,1010) Station Name or calling AE
@ low
Background operations.
prefetch_trigger
Trigger type for prefetch operations.
@ worklist_match
Triggered by worklist entry.
@ scheduled_exam
Based on scheduled procedure.
@ prior_studies
Fetch prior studies for patient.
constexpr dicom_tag patient_id
Patient ID.
constexpr dicom_tag modality
Modality.
constexpr dicom_tag scheduled_station_ae_title
Scheduled Station AE Title.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Definition result.h:249
Repository for prefetch history records using base_repository pattern.
Prefetch manager for proactive DICOM data loading.
Repository for prefetch rule and history persistence.
Repository for prefetch rules using base_repository pattern.
Remote PACS node manager for client operations.
History record for a single prefetch operation.
std::string source_node_id
Source node ID.
std::chrono::system_clock::time_point prefetched_at
Timestamp.
std::string job_id
Associated job ID.
std::string study_uid
Study Instance UID.
std::string status
Status (pending, completed, failed)
std::string rule_id
Rule that triggered this (if any)
bool is_study_local(std::string_view study_uid) const
std::shared_ptr< services::worklist_scu > worklist_scu
bool is_study_pending(const std::string &study_uid)
std::vector< prefetch_rule > get_matching_rules(prefetch_trigger trigger, const std::string &modality, const std::string &body_part, const std::string &station_ae)
std::unordered_set< std::string > pending_study_uids
void mark_study_pending(const std::string &study_uid)
void record_prefetch_history(const std::string &patient_id, const std::string &study_uid, const std::string &rule_id, const std::string &source_node_id, const std::string &job_id, const std::string &status)
std::shared_ptr< remote_node_manager > node_manager
std::shared_ptr< storage::prefetch_repository > compatibility_repo
void save_rule_to_repo(const prefetch_rule &rule)
void mark_study_complete(const std::string &study_uid)
void increment_rule_stats(const std::string &rule_id, size_t studies)
Configuration for the prefetch manager.
bool deduplicate_requests
Deduplicate pending requests.
std::chrono::seconds worklist_check_interval
Worklist polling interval.
std::shared_ptr< storage::prefetch_history_repository > history
std::shared_ptr< storage::prefetch_rule_repository > rules
Result of a prefetch operation.
std::chrono::milliseconds elapsed
Operation duration.
Rule defining when and how to prefetch DICOM data.
std::string name
Human-readable name.
std::string rule_id
Unique rule identifier (UUID)
DICOM Modality Worklist SCU service (MWL C-FIND sender)