16#ifdef PACS_WITH_DATABASE_SYSTEM
28#include <condition_variable>
32#include <shared_mutex>
35#include <unordered_set>
48std::string generate_uuid() {
49 static thread_local std::random_device rd;
50 static thread_local std::mt19937_64 gen(rd());
51 static thread_local std::uniform_int_distribution<uint64_t> dis;
53 uint64_t ab = dis(gen);
54 uint64_t cd = dis(gen);
57 ab = (ab & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL;
58 cd = (cd & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL;
60 std::ostringstream oss;
61 oss << std::hex << std::setfill(
'0');
62 oss << std::setw(8) << (ab >> 32);
64 oss << std::setw(4) << ((ab >> 16) & 0xFFFF);
66 oss << std::setw(4) << (ab & 0xFFFF);
68 oss << std::setw(4) << (cd >> 48);
70 oss << std::setw(12) << (cd & 0xFFFFFFFFFFFFULL);
78bool matches_filter(
const std::string& value,
const std::string& filter) {
79 if (filter.empty())
return true;
80 if (value.empty())
return false;
83 while (start < filter.size()) {
84 auto end = filter.find(
',', start);
85 if (end == std::string::npos) {
89 auto pattern = filter.substr(start, end - start);
91 while (!pattern.empty() && pattern.front() ==
' ') pattern.erase(0, 1);
92 while (!pattern.empty() && pattern.back() ==
' ') pattern.pop_back();
94 if (pattern == value) {
107std::string get_dataset_value(
const core::dicom_dataset& ds, core::dicom_tag tag) {
108 return ds.get_string(tag);
112constexpr core::dicom_tag body_part_examined_tag{0x0018, 0x0015};
161 #ifdef PACS_WITH_DATABASE_SYSTEM
164 if (result.is_err()) {
183 #ifdef PACS_WITH_DATABASE_SYSTEM
218 #ifdef PACS_WITH_DATABASE_SYSTEM
221 return result.is_ok() && result.value();
235 std::vector<prefetch_rule> matches;
239 if (!rule.enabled)
continue;
240 if (rule.trigger != trigger)
continue;
242 if (!matches_filter(
modality, rule.modality_filter))
continue;
243 if (!matches_filter(
body_part, rule.body_part_filter))
continue;
244 if (!matches_filter(
station_ae, rule.station_ae_filter))
continue;
246 matches.push_back(rule);
253 const std::string& patient_id,
254 const std::string& study_uid,
255 const std::string& rule_id,
256 const std::string& source_node_id,
257 const std::string& job_id,
258 const std::string& status) {
259 #ifdef PACS_WITH_DATABASE_SYSTEM
274 #ifdef PACS_WITH_DATABASE_SYSTEM
278 [[maybe_unused]]
auto result =
283 [[maybe_unused]]
auto result =
290 #ifdef PACS_WITH_DATABASE_SYSTEM
296 #ifdef PACS_WITH_DATABASE_SYSTEM
298 [[maybe_unused]]
auto result1 =
301 [[maybe_unused]]
auto result2 =
305 [[maybe_unused]]
auto result1 =
308 [[maybe_unused]]
auto result2 =
313 [[maybe_unused]]
auto result1 =
316 [[maybe_unused]]
auto result2 =
329 scheduler_cv.wait_for(lock, std::chrono::minutes(1), [
this] {
343 if (!rule.enabled)
continue;
345 if (rule.schedule_cron.empty())
continue;
361 return !worklist_monitor_running.load();
396 std::shared_ptr<remote_node_manager> node_manager,
398 std::shared_ptr<services::worklist_scu> worklist_scu,
399 std::shared_ptr<di::ILogger> logger)
402 std::move(repositories),
403 std::move(node_manager),
405 std::move(worklist_scu),
406 std::move(logger)) {}
411 std::shared_ptr<remote_node_manager> node_manager,
413 std::shared_ptr<services::worklist_scu> worklist_scu,
414 std::shared_ptr<di::ILogger> logger)
415 : impl_(std::make_unique<
impl>()) {
421 impl_->
logger = logger ? std::move(logger) : std::make_shared<di::NullLogger>();
427 std::shared_ptr<storage::prefetch_repository> repo,
428 std::shared_ptr<remote_node_manager> node_manager,
430 std::shared_ptr<services::worklist_scu> worklist_scu,
431 std::shared_ptr<di::ILogger> logger)
435 std::move(node_manager),
437 std::move(worklist_scu),
438 std::move(logger)) {}
442 std::shared_ptr<storage::prefetch_repository> repo,
443 std::shared_ptr<remote_node_manager> node_manager,
445 std::shared_ptr<services::worklist_scu> worklist_scu,
446 std::shared_ptr<di::ILogger> logger)
447 : impl_(std::make_unique<
impl>()) {
453 impl_->
logger = logger ? std::move(logger) : std::make_shared<di::NullLogger>();
475 if (new_rule.
rule_id.empty()) {
476 new_rule.
rule_id = generate_uuid();
480 #ifdef PACS_WITH_DATABASE_SYSTEM
483 if (result.is_err()) {
484 return result.error();
488 if (result.is_err()) {
495 if (result.is_err()) {
508 return kcenon::common::ok();
517 #ifdef PACS_WITH_DATABASE_SYSTEM
520 if (result.is_err()) {
521 return result.error();
525 if (result.is_err()) {
532 if (result.is_err()) {
542 if (cached_rule.rule_id == rule.
rule_id) {
550 return kcenon::common::ok();
555 #ifdef PACS_WITH_DATABASE_SYSTEM
558 if (result.is_err()) {
563 if (result.is_err()) {
570 if (result.is_err()) {
581 [&rule_id](
const prefetch_rule& r) { return r.rule_id == rule_id; }),
585 impl_->
logger->info_fmt(
"Removed prefetch rule: {}", std::string(rule_id));
586 return kcenon::common::ok();
592 if (rule.rule_id == rule_id) {
609 const std::vector<core::dicom_dataset>& worklist_items) {
610 for (
const auto& item : worklist_items) {
614 auto body_part = get_dataset_value(item, body_part_examined_tag);
617 if (patient_id.empty()) {
625 for (
const auto& rule : rules) {
632 impl_->
logger->debug_fmt(
"Worklist prefetch for patient {}: {} studies prefetched",
633 patient_id, result.studies_prefetched);
639 const std::vector<core::dicom_dataset>& worklist_items) {
640 return std::async(std::launch::async, [
this, worklist_items]() {
650 std::string_view patient_id,
651 std::string_view current_modality,
652 std::optional<std::string_view>
body_part) {
653 auto start_time = std::chrono::steady_clock::now();
660 std::string(current_modality),
665 impl_->
logger->debug_fmt(
"No prior study rules match for patient {} modality {}",
666 std::string(patient_id), std::string(current_modality));
671 const auto& rule = rules.front();
674 for (
const auto& source_node_id : rule.source_node_ids) {
677 impl_->
logger->warn_fmt(
"Source node {} not found", source_node_id);
686 impl_->
logger->info_fmt(
"Prefetching priors for patient {} from node {}",
687 std::string(patient_id), source_node_id);
690 auto end_time = std::chrono::steady_clock::now();
691 result.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
692 end_time - start_time);
698 std::string_view patient_id,
699 std::string_view current_modality,
700 std::optional<std::string_view>
body_part) {
701 std::string pid(patient_id);
702 std::string mod(current_modality);
703 std::optional<std::string> bp =
body_part ?
704 std::optional<std::string>(std::string(*
body_part)) : std::nullopt;
706 return std::async(std::launch::async, [
this, pid, mod, bp]() {
707 return prefetch_priors(pid, mod, bp ? std::optional<std::string_view>(*bp) : std::nullopt);
716 std::string_view source_node_id,
717 std::string_view study_uid) {
721 impl_->
logger->debug_fmt(
"Study {} already pending", std::string(study_uid));
725 impl_->
logger->debug_fmt(
"Study {} already local", std::string(study_uid));
741 std::string(study_uid),
743 std::string(source_node_id),
747 impl_->
logger->info_fmt(
"Created prefetch job {} for study {}", job_id, std::string(study_uid));
755 std::string_view source_node_id,
756 std::string_view patient_id,
757 std::chrono::hours lookback) {
764 impl_->
logger->info_fmt(
"Created prefetch job {} for patient {}",
765 job_id, std::string(patient_id));
828 impl_->
logger->info_fmt(
"Started worklist monitor for node {}", std::string(worklist_node_id));
864 #ifdef PACS_WITH_DATABASE_SYSTEM
868 return result.is_ok() ? result.value() : 0;
878 #ifdef PACS_WITH_DATABASE_SYSTEM
882 return result.is_ok() ? result.value() : 0;
892 std::string_view rule_id)
const {
897 stats.triggered_count = rule->triggered_count;
898 stats.studies_prefetched = rule->studies_prefetched;
void stop_worklist_monitor()
Stop the worklist monitor.
auto config() const noexcept -> const prefetch_manager_config &
Get current configuration.
auto prefetch_study(std::string_view source_node_id, std::string_view study_uid) -> std::string
Prefetch a specific study.
auto prefetch_patient(std::string_view source_node_id, std::string_view patient_id, std::chrono::hours lookback=std::chrono::hours{8760}) -> std::string
Prefetch all studies for a patient.
auto is_scheduler_running() const noexcept -> bool
Check if scheduler is running.
auto get_rule(std::string_view rule_id) const -> std::optional< prefetch_rule >
Get a rule by ID.
auto pending_prefetches() const -> size_t
Get number of pending prefetch operations.
auto completed_today() const -> size_t
Get number of prefetches completed today.
void process_worklist(const std::vector< core::dicom_dataset > &worklist_items)
Process worklist items and trigger prefetch.
prefetch_manager(prefetch_repositories repositories, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< job_manager > job_manager, std::shared_ptr< services::worklist_scu > worklist_scu=nullptr, std::shared_ptr< di::ILogger > logger=nullptr)
Construct a prefetch manager from split repositories.
auto failed_today() const -> size_t
Get number of prefetches failed today.
auto remove_rule(std::string_view rule_id) -> kcenon::pacs::VoidResult
Remove a prefetch rule.
void set_config(prefetch_manager_config new_config)
Update configuration.
auto prefetch_priors_async(std::string_view patient_id, std::string_view current_modality, std::optional< std::string_view > body_part=std::nullopt) -> std::future< prefetch_result >
Prefetch prior studies asynchronously.
void start_worklist_monitor(std::string_view worklist_node_id)
Start the worklist monitor.
auto list_rules() const -> std::vector< prefetch_rule >
List all prefetch rules.
auto prefetch_priors(std::string_view patient_id, std::string_view current_modality, std::optional< std::string_view > body_part=std::nullopt) -> prefetch_result
Prefetch prior studies for a patient.
auto is_worklist_monitor_running() const noexcept -> bool
Check if worklist monitor is running.
~prefetch_manager()
Destructor - stops scheduler and monitor if running.
auto get_rule_statistics(std::string_view rule_id) const -> prefetch_rule_statistics
Get statistics for a specific rule.
auto add_rule(const prefetch_rule &rule) -> kcenon::pacs::VoidResult
Add a new prefetch rule.
void stop_scheduler()
Stop the scheduler.
auto update_rule(const prefetch_rule &rule) -> kcenon::pacs::VoidResult
Update an existing prefetch rule.
auto process_worklist_async(const std::vector< core::dicom_dataset > &worklist_items) -> std::future< void >
Process worklist items asynchronously.
std::unique_ptr< impl > impl_
void start_scheduler()
Start the scheduler for cron-based rules.
DICOM Dataset - ordered collection of Data Elements.
Compile-time constants for commonly used DICOM tags.
Job manager for asynchronous DICOM operations.
@ body_part
(0018,0015) Body Part Examined
@ modality
(0008,0060) Modality - CT, MR, US, etc.
@ station_ae
(0008,1010) Station Name or calling AE
@ low
Background operations.
prefetch_trigger
Trigger type for prefetch operations.
@ worklist_match
Triggered by worklist entry.
@ scheduled_exam
Based on scheduled procedure.
@ prior_studies
Fetch prior studies for patient.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Repository for prefetch history records using base_repository pattern.
Prefetch manager for proactive DICOM data loading.
Repository for prefetch rule and history persistence.
Repository for prefetch rules using base_repository pattern.
Remote PACS node manager for client operations.
History record for a single prefetch operation.
std::string patient_id
Patient ID.
std::string source_node_id
Source node ID.
std::chrono::system_clock::time_point prefetched_at
Timestamp.
std::string job_id
Associated job ID.
std::string study_uid
Study Instance UID.
std::string status
Status (pending, completed, failed)
std::string rule_id
Rule that triggered this (if any)
bool is_study_local(std::string_view study_uid) const
std::condition_variable worklist_cv
std::shared_ptr< services::worklist_scu > worklist_scu
std::mutex worklist_mutex
void load_rules_from_repo()
bool is_study_pending(const std::string &study_uid)
std::thread scheduler_thread
std::shared_mutex rules_mutex
prefetch_manager_config config
std::atomic< bool > scheduler_running
void check_scheduled_rules()
std::vector< prefetch_rule > get_matching_rules(prefetch_trigger trigger, const std::string &modality, const std::string &body_part, const std::string &station_ae)
std::unordered_set< std::string > pending_study_uids
void worklist_monitor_loop()
void mark_study_pending(const std::string &study_uid)
prefetch_repositories repositories
void record_prefetch_history(const std::string &patient_id, const std::string &study_uid, const std::string &rule_id, const std::string &source_node_id, const std::string &job_id, const std::string &status)
std::shared_ptr< di::ILogger > logger
std::shared_ptr< remote_node_manager > node_manager
std::shared_ptr< storage::prefetch_repository > compatibility_repo
std::thread worklist_monitor_thread
void save_rule_to_repo(const prefetch_rule &rule)
std::string worklist_node_id
std::atomic< size_t > pending_count
void mark_study_complete(const std::string &study_uid)
std::atomic< bool > worklist_monitor_running
void query_and_process_worklist()
std::mutex scheduler_mutex
std::vector< prefetch_rule > rules_cache
void increment_rule_stats(const std::string &rule_id, size_t studies)
std::shared_ptr< job_manager > job_mgr
std::condition_variable scheduler_cv
Configuration for the prefetch manager.
bool deduplicate_requests
Deduplicate pending requests.
std::chrono::seconds worklist_check_interval
Worklist polling interval.
std::shared_ptr< storage::prefetch_history_repository > history
std::shared_ptr< storage::prefetch_rule_repository > rules
Result of a prefetch operation.
std::string patient_id
Patient ID.
std::chrono::milliseconds elapsed
Operation duration.
Statistics for a prefetch rule.
Rule defining when and how to prefetch DICOM data.
std::string name
Human-readable name.
std::string rule_id
Unique rule identifier (UUID)
DICOM Modality Worklist SCU service (MWL C-FIND sender)