20#include <kcenon/common/interfaces/executor_interface.h>
45 std::shared_ptr<kcenon::thread::thread_pool> thread_pool,
48 , thread_pool_(std::move(thread_pool))
57 std::shared_ptr<kcenon::common::interfaces::IExecutor> executor,
60 , executor_(std::move(executor))
92 "Auto prefetch service started interval_seconds={} max_concurrent={}",
98 stop(wait_for_completion);
133 const std::string& patient_id,
138 request.
request_time = std::chrono::system_clock::now();
141 auto saved_lookback = config_.criteria.lookback_period;
142 config_.criteria.lookback_period = lookback;
144 auto result = process_request(request);
146 config_.criteria.lookback_period = saved_lookback;
152 const std::vector<storage::worklist_item>& worklist_items) {
157 std::lock_guard<std::mutex> lock(
mutex_);
163 return execute_cycle();
171 const std::vector<storage::worklist_item>& worklist_items) {
173 for (
const auto& item : worklist_items) {
174 if (item.patient_id.empty()) {
183 request.
request_time = std::chrono::system_clock::now();
192 "Queued prefetch requests from worklist worklist_items={} queue_size={}",
193 worklist_items.size(),
203 std::lock_guard<std::mutex> lock(
mutex_);
208 std::lock_guard<std::mutex> lock(
mutex_);
213 -> std::optional<std::chrono::seconds> {
218 std::lock_guard<std::mutex> lock(
mutex_);
219 auto now = std::chrono::steady_clock::now();
221 return std::chrono::seconds{0};
224 return std::chrono::duration_cast<std::chrono::seconds>(
242 std::chrono::seconds interval) {
243 std::lock_guard<std::mutex> lock(
mutex_);
248 -> std::chrono::seconds {
249 std::lock_guard<std::mutex> lock(
mutex_);
255 std::lock_guard<std::mutex> lock(
mutex_);
261 std::lock_guard<std::mutex> lock(
mutex_);
267 std::lock_guard<std::mutex> lock(
mutex_);
273 std::lock_guard<std::mutex> lock(
mutex_);
285 std::unique_lock<std::mutex> lock(
mutex_);
289 cv_.wait_until(lock, wait_until, [
this]() {
300 auto now = std::chrono::steady_clock::now();
304 if (should_run_cycle) {
336 cycle_result.
timestamp = std::chrono::system_clock::now();
337 auto cycle_start = std::chrono::steady_clock::now();
340 while (
auto request = dequeue_request()) {
341 if (stop_requested_.load()) {
345 auto result = process_request(*request);
346 cycle_result += result;
349 cycle_result.
duration = std::chrono::duration_cast<std::chrono::milliseconds>(
350 std::chrono::steady_clock::now() - cycle_start);
353 "Prefetch cycle completed patients={} studies_prefetched={} studies_failed={} duration_ms={}",
361 "prefetch_cycle_duration_ms",
362 static_cast<double>(cycle_result.
duration.count()));
364 "prefetch_studies_total",
367 "prefetch_failures_total",
378 result.
timestamp = std::chrono::system_clock::now();
379 auto start_time = std::chrono::steady_clock::now();
382 "Processing prefetch request patient_id={} scheduled_modality={}",
384 request.scheduled_modality);
387 for (
const auto&
pacs : config_.remote_pacs) {
388 if (!
pacs.is_valid()) {
393 auto prior_studies = query_prior_studies(
396 config_.criteria.lookback_period);
399 auto filtered_studies = filter_studies(prior_studies, request);
402 for (
const auto& study : filtered_studies) {
404 if (study_exists_locally(study.study_instance_uid)) {
410 if (study.study_instance_uid == request.scheduled_study_uid) {
415 bool success = prefetch_study(
pacs, study);
422 if (config_.on_prefetch_complete) {
423 config_.on_prefetch_complete(
424 request.patient_id, study,
true,
"");
429 if (config_.on_prefetch_error) {
430 config_.on_prefetch_error(
432 study.study_instance_uid,
433 "Failed to prefetch study");
438 if (config_.rate_limit_per_minute > 0) {
439 auto elapsed = std::chrono::steady_clock::now() - start_time;
440 auto elapsed_minutes =
441 std::chrono::duration_cast<std::chrono::minutes>(elapsed)
443 if (elapsed_minutes < 1) {
444 auto prefetched_this_minute =
446 if (prefetched_this_minute >= config_.rate_limit_per_minute) {
448 std::this_thread::sleep_for(
449 std::chrono::seconds(60) - elapsed);
456 result.
duration = std::chrono::duration_cast<std::chrono::milliseconds>(
457 std::chrono::steady_clock::now() - start_time);
464 const std::string& patient_id,
465 std::chrono::days lookback) -> std::vector<prior_study_info> {
467 std::vector<prior_study_info> results;
470 auto now = std::chrono::system_clock::now();
471 auto from_time = now - lookback;
474 auto format_date = [](std::chrono::system_clock::time_point tp) {
475 auto time_t = std::chrono::system_clock::to_time_t(tp);
476 std::tm tm = *std::localtime(&time_t);
478 std::strftime(buffer,
sizeof(buffer),
"%Y%m%d", &tm);
479 return std::string(buffer);
482 std::string from_date = format_date(from_time);
483 std::string to_date = format_date(now);
486 "Querying prior studies remote_pacs={} patient_id={} from_date={} to_date={}",
487 pacs_config.ae_title,
500 {
"1.2.840.10008.1.2.1",
"1.2.840.10008.1.2"}
508 std::chrono::duration_cast<network::association::duration>(
509 pacs_config.connection_timeout));
511 if (connect_result.is_err()) {
513 "Failed to connect to remote PACS for C-FIND remote_pacs={} error={}",
514 pacs_config.ae_title,
515 connect_result.error().message);
519 auto assoc = std::move(connect_result.value());
530 query_config.
timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
531 pacs_config.association_timeout);
536 if (find_result.is_ok() && find_result.value().is_success()) {
537 for (
const auto& dataset : find_result.value().matches) {
539 info.study_instance_uid =
541 info.patient_id = patient_id;
546 info.study_description =
550 auto modalities_str =
552 if (!modalities_str.empty()) {
553 std::istringstream ss(modalities_str);
555 while (std::getline(ss, mod,
'\\')) {
557 info.modalities.insert(mod);
563 auto num_series_str = dataset.get_string(
565 if (!num_series_str.empty()) {
567 info.number_of_series = std::stoull(num_series_str);
571 auto num_instances_str = dataset.get_string(
573 if (!num_instances_str.empty()) {
575 info.number_of_instances = std::stoull(num_instances_str);
579 results.push_back(std::move(info));
583 "C-FIND query returned matches={} remote_pacs={} patient_id={}",
585 pacs_config.ae_title,
587 }
else if (find_result.is_err()) {
589 "C-FIND query failed remote_pacs={} error={}",
590 pacs_config.ae_title,
591 find_result.error().message);
595 (void)assoc.release();
596 }
catch (
const std::exception& e) {
598 "Exception querying prior studies remote_pacs={} error={}",
599 pacs_config.ae_title,
607 const std::vector<prior_study_info>& studies,
610 std::vector<prior_study_info> filtered;
612 for (
const auto& study : studies) {
614 bool modality_match =
true;
617 if (!config_.criteria.include_modalities.empty()) {
618 modality_match =
false;
619 for (
const auto& mod : study.modalities) {
620 if (config_.criteria.include_modalities.count(mod) > 0) {
621 modality_match =
true;
628 if (modality_match && !config_.criteria.exclude_modalities.empty()) {
629 for (
const auto& mod : study.modalities) {
630 if (config_.criteria.exclude_modalities.count(mod) > 0) {
631 modality_match =
false;
637 if (!modality_match) {
642 if (!config_.criteria.include_body_parts.empty()) {
643 if (config_.criteria.include_body_parts.count(
644 study.body_part_examined) == 0) {
649 filtered.push_back(study);
653 if (config_.criteria.prefer_same_modality ||
654 config_.criteria.prefer_same_body_part) {
655 std::ranges::sort(filtered, [&](
const auto& a,
const auto& b) {
659 if (config_.criteria.prefer_same_modality) {
660 if (a.modalities.count(request.scheduled_modality) > 0) {
663 if (b.modalities.count(request.scheduled_modality) > 0) {
668 if (config_.criteria.prefer_same_body_part) {
669 if (a.body_part_examined == request.scheduled_body_part) {
672 if (b.body_part_examined == request.scheduled_body_part) {
678 if (score_a != score_b) {
679 return score_a > score_b;
681 return a.study_date > b.study_date;
686 if (config_.criteria.max_studies_per_patient > 0 &&
687 filtered.size() > config_.criteria.max_studies_per_patient) {
688 filtered.resize(config_.criteria.max_studies_per_patient);
697 auto study_record = database_.find_study(study_uid);
698 return study_record.has_value();
706 "Prefetching study study_uid={} patient_id={} remote_pacs={}",
707 study.study_instance_uid,
709 pacs_config.ae_title);
719 {
"1.2.840.10008.1.2.1",
"1.2.840.10008.1.2"}
727 std::chrono::duration_cast<network::association::duration>(
728 pacs_config.connection_timeout));
730 if (connect_result.is_err()) {
732 "Failed to connect to remote PACS for C-MOVE remote_pacs={} error={}",
733 pacs_config.ae_title,
734 connect_result.error().message);
738 auto assoc = std::move(connect_result.value());
746 retrieve_config.
timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
747 pacs_config.association_timeout);
751 assoc, study.study_instance_uid);
754 (void)assoc.release();
756 if (move_result.is_ok() && move_result.value().is_success()) {
758 "Successfully prefetched study study_uid={} completed={} remote_pacs={}",
759 study.study_instance_uid,
760 move_result.value().completed,
761 pacs_config.ae_title);
765 if (move_result.is_err()) {
767 "C-MOVE failed study_uid={} remote_pacs={} error={}",
768 study.study_instance_uid,
769 pacs_config.ae_title,
770 move_result.error().message);
771 }
else if (move_result.value().has_failures()) {
773 "C-MOVE completed with failures study_uid={} completed={} failed={} remote_pacs={}",
774 study.study_instance_uid,
775 move_result.value().completed,
776 move_result.value().failed,
777 pacs_config.ae_title);
781 }
catch (
const std::exception& e) {
783 "Exception during prefetch study_uid={} remote_pacs={} error={}",
784 study.study_instance_uid,
785 pacs_config.ae_title,
808 -> std::optional<prefetch_request> {
809 std::lock_guard<std::mutex> lock(queue_mutex_);
811 if (request_queue_.empty()) {
815 auto request = std::move(request_queue_.front());
816 request_queue_.pop();
817 queued_patients_.erase(request.patient_id);
DICOM Association management per PS3.8.
Automatic prefetch service for prior studies.
static void debug(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log a debug-level message.
static void info(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log an info-level message.
static void error(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log an error-level message.
static void record_histogram(std::string_view name, double value)
Record a histogram sample.
static void increment_counter(std::string_view name, std::int64_t value=1)
Increment a counter metric.
static Result< association > connect(const std::string &host, uint16_t port, const association_config &config, duration timeout=default_timeout)
Initiate an SCU association to a remote SCP.
network::Result< query_result > find_studies(network::association &assoc, const study_query_keys &keys)
Query for studies.
network::Result< retrieve_result > retrieve_study(network::association &assoc, std::string_view study_uid, retrieve_progress_callback progress=nullptr)
Retrieve a study by Study Instance UID.
void enable()
Enable the prefetch service.
std::set< std::string > queued_patients_
Set of patient IDs currently in queue (for deduplication)
void run_loop()
Background thread main loop.
auto get_last_result() const -> std::optional< prefetch_result >
Get the result of the last prefetch cycle.
auto get_cumulative_stats() const -> prefetch_result
Get cumulative statistics since service started.
void on_worklist_query(const std::vector< storage::worklist_item > &worklist_items)
Handle worklist query event.
std::optional< prefetch_result > last_result_
Last prefetch result.
std::condition_variable cv_
Condition variable for sleep/wake.
auto cycles_completed() const noexcept -> std::size_t
Get the number of cycles completed.
void set_prefetch_criteria(const prefetch_criteria &criteria)
Update the prefetch criteria.
void disable(bool wait_for_completion=true)
Disable/stop the prefetch service.
auto is_running() const noexcept -> bool
Check if the service is running (alias for is_enabled)
std::atomic< bool > enabled_
Flag indicating service is enabled.
std::queue< prefetch_request > request_queue_
Queue of pending prefetch requests.
auto_prefetch_service(storage::index_database &database, const prefetch_service_config &config={})
Construct auto prefetch service.
std::atomic< bool > cycle_in_progress_
Flag indicating a cycle is in progress.
void set_error_callback(prefetch_service_config::error_callback callback)
Set the error callback.
void trigger_cycle()
Trigger next cycle immediately.
void update_stats(const prefetch_result &result)
Update cumulative statistics.
auto execute_cycle() -> prefetch_result
Execute a single prefetch cycle.
std::thread worker_thread_
Background worker thread.
prefetch_result cumulative_stats_
Cumulative statistics.
auto is_enabled() const noexcept -> bool
Check if the service is enabled/running.
std::chrono::steady_clock::time_point next_cycle_time_
Time of next scheduled cycle.
auto prefetch_priors(const std::string &patient_id, std::chrono::days lookback=std::chrono::days{365}) -> prefetch_result
Manually prefetch prior studies for a patient.
std::atomic< std::size_t > cycles_count_
Number of completed cycles.
prefetch_service_config config_
Service configuration.
auto query_prior_studies(const remote_pacs_config &pacs_config, const std::string &patient_id, std::chrono::days lookback) -> std::vector< prior_study_info >
Query remote PACS for prior studies.
~auto_prefetch_service()
Destructor - ensures graceful shutdown.
auto process_request(const prefetch_request &request) -> prefetch_result
Process a single prefetch request.
auto filter_studies(const std::vector< prior_study_info > &studies, const prefetch_request &request) -> std::vector< prior_study_info >
Filter prior studies based on criteria.
std::mutex queue_mutex_
Mutex for request queue.
auto time_until_next_cycle() const -> std::optional< std::chrono::seconds >
Get the time until the next scheduled prefetch cycle.
std::atomic< bool > stop_requested_
Flag to signal shutdown.
auto run_prefetch_cycle() -> prefetch_result
Run a prefetch cycle manually.
auto get_prefetch_interval() const noexcept -> std::chrono::seconds
Get the current prefetch interval.
void set_prefetch_interval(std::chrono::seconds interval)
Update the prefetch interval.
void stop(bool wait_for_completion=true)
Stop the prefetch service (alias for disable)
void trigger_for_worklist(const std::vector< storage::worklist_item > &worklist_items)
Trigger prefetch for worklist items.
std::mutex mutex_
Mutex for thread synchronization.
void start()
Start the prefetch service (alias for enable)
void queue_request(const prefetch_request &request)
Add request to queue (deduplicated)
auto study_exists_locally(const std::string &study_uid) -> bool
Check if study already exists locally.
auto pending_requests() const noexcept -> std::size_t
Get the number of pending prefetch requests.
auto dequeue_request() -> std::optional< prefetch_request >
Get next request from queue.
void set_cycle_complete_callback(prefetch_service_config::cycle_complete_callback callback)
Set the cycle complete callback.
auto prefetch_study(const remote_pacs_config &pacs_config, const prior_study_info &study) -> bool
Prefetch a single study via C-MOVE.
auto get_prefetch_criteria() const noexcept -> const prefetch_criteria &
Get the current prefetch criteria.
Compile-time constants for commonly used DICOM tags.
Adapter for integrating common_system's IExecutor interface.
PACS index database for metadata storage and retrieval.
Adapter for DICOM audit logging using logger_system.
Adapter for PACS performance metrics and distributed tracing.
constexpr std::string_view study_root_find_sop_class_uid
Study Root Query/Retrieve Information Model - FIND.
@ c_move
Request SCP to send to third party (requires move destination)
constexpr std::string_view study_root_move_sop_class_uid
Study Root Query/Retrieve Information Model - MOVE.
@ study
Study level - query study information.
@ study_root
Study Root Query/Retrieve Information Model.
DICOM Query SCU service (C-FIND sender)
DICOM Retrieve SCU service (C-MOVE/C-GET sender)
Configuration for SCU association request.
std::string called_ae_title
Remote AE Title (16 chars max)
std::string calling_ae_title
Our AE Title (16 chars max)
std::vector< proposed_presentation_context > proposed_contexts
Configuration for Query SCU service.
std::chrono::milliseconds timeout
Timeout for receiving query responses (milliseconds)
query_model model
Query information model (Patient Root or Study Root)
query_level level
Query level (Patient, Study, Series, or Image)
Configuration for Retrieve SCU service.
std::chrono::milliseconds timeout
Timeout for receiving responses (milliseconds)
query_level level
Query level (Study, Series, or Image)
query_model model
Query information model (Patient Root or Study Root)
retrieve_mode mode
Retrieve mode (C-MOVE or C-GET)
std::string move_destination
Move destination AE title (required for C-MOVE mode)
Query keys for STUDY level queries.
std::string patient_id
Patient ID (0010,0020) - for filtering.
std::string study_date
Study Date (0008,0020) - YYYYMMDD or range.
Prefetch selection criteria.
Prefetch request for a single patient.
std::string scheduled_study_uid
Study Instance UID of scheduled study (to avoid prefetching)
std::string patient_name
Patient Name.
std::string scheduled_modality
Scheduled modality (for preference matching)
std::string patient_id
Patient ID.
std::chrono::system_clock::time_point request_time
Request timestamp.
Prefetch result statistics.
std::size_t studies_prefetched
Number of studies prefetched successfully.
std::size_t studies_already_present
Number of studies already present (skipped)
std::chrono::system_clock::time_point timestamp
Time when this result was recorded.
std::size_t series_prefetched
Number of series prefetched successfully.
std::size_t instances_prefetched
Number of instances (images) prefetched.
std::chrono::milliseconds duration
Duration of the prefetch operation.
std::size_t studies_failed
Number of studies that failed to prefetch.
std::size_t patients_processed
Number of patients processed.
Configuration for the auto prefetch service.
prefetch_criteria criteria
Selection criteria for prior studies.
bool auto_start
Whether to start automatically on construction.
std::function< void(const prefetch_result &result)> cycle_complete_callback
Callback for prefetch cycle completion.
error_callback on_prefetch_error
std::size_t max_concurrent_prefetches
Maximum concurrent prefetch operations.
std::function< void(const std::string &patient_id, const std::string &study_uid, const std::string &error)> error_callback
Callback for prefetch errors.
cycle_complete_callback on_cycle_complete
bool enabled
Enable/disable the prefetch service.
std::chrono::seconds prefetch_interval
Interval between prefetch cycles (default: 5 minutes)
Remote PACS connection configuration.