PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
kcenon::pacs::client::job_manager::impl Struct Reference
Collaboration diagram for kcenon::pacs::client::job_manager::impl:
Collaboration graph

Public Member Functions

void save_job (const job_record &job)
 
std::optional< job_record > get_job_from_cache (std::string_view job_id) const
 
void update_job_status (const std::string &job_id, job_status status, const std::string &error_msg="", const std::string &error_details="")
 
void notify_progress (const std::string &job_id, const job_progress &progress)
 
void notify_completion (const std::string &job_id, const job_record &record)
 
bool is_job_cancelled (const std::string &job_id)
 
bool is_job_paused (const std::string &job_id)
 
void mark_job_active (const std::string &job_id)
 
void mark_job_inactive (const std::string &job_id)
 
void enqueue_job (const std::string &job_id, job_priority priority)
 
void worker_loop ()
 
void execute_job (job_record &job)
 
void execute_retrieve_job (job_record &job)
 
void execute_store_job (job_record &job)
 
void execute_query_job (job_record &job)
 
void execute_sync_job (job_record &job)
 
void execute_prefetch_job (job_record &job)
 
void update_progress (const std::string &job_id, const job_progress &progress)
 
void complete_job (const std::string &job_id, job_status status, const std::string &error_msg="")
 
void load_pending_jobs_from_repo ()
 

Public Attributes

job_manager_config config
 
std::shared_ptr< storage::job_repository > repo
 
std::shared_ptr< remote_node_manager > node_manager
 
std::shared_ptr< di::ILogger > logger
 
std::unordered_map< std::string, job_record > job_cache
 
std::shared_mutex cache_mutex
 
std::priority_queue< queue_entry > job_queue
 
std::mutex queue_mutex
 
std::condition_variable queue_cv
 
std::unordered_set< std::string > active_job_ids
 
std::mutex active_mutex
 
std::unordered_set< std::string > paused_job_ids
 
std::mutex paused_mutex
 
std::unordered_set< std::string > cancelled_job_ids
 
std::mutex cancelled_mutex
 
std::vector< std::thread > workers
 
std::atomic< bool > running {false}
 
job_progress_callback progress_callback
 
job_completion_callback completion_callback
 
std::shared_mutex callbacks_mutex
 
std::unordered_map< std::string, std::shared_ptr< std::promise< job_record > > > completion_promises
 
std::mutex promises_mutex
 

Detailed Description

Definition at line 102 of file job_manager.cpp.

Member Function Documentation

◆ complete_job()

void kcenon::pacs::client::job_manager::impl::complete_job ( const std::string & job_id,
job_status status,
const std::string & error_msg = "" )
inline

Definition at line 1080 of file job_manager.cpp.

1081 {
1082 update_job_status(job_id, status, error_msg);
1083
1084 // Get final job record for notification
1085 auto job_opt = get_job_from_cache(job_id);
1086 if (job_opt) {
1087 notify_completion(job_id, *job_opt);
1088 }
1089
1090 // Clean up cancelled set
1091 if (status == job_status::cancelled) {
1092 std::lock_guard lock(cancelled_mutex);
1093 cancelled_job_ids.erase(job_id);
1094 }
1095
1096 logger->info_fmt("Job {} completed with status: {}", job_id, to_string(status));
1097 }
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
Definition job_types.h:54
@ cancelled
Job was cancelled by user.
void update_job_status(const std::string &job_id, job_status status, const std::string &error_msg="", const std::string &error_details="")
std::unordered_set< std::string > cancelled_job_ids
std::shared_ptr< di::ILogger > logger
void notify_completion(const std::string &job_id, const job_record &record)
std::optional< job_record > get_job_from_cache(std::string_view job_id) const

References kcenon::pacs::client::cancelled, cancelled_job_ids, cancelled_mutex, get_job_from_cache(), logger, notify_completion(), kcenon::pacs::client::to_string(), and update_job_status().

Referenced by execute_job(), execute_prefetch_job(), execute_query_job(), execute_retrieve_job(), execute_store_job(), execute_sync_job(), and worker_loop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ enqueue_job()

void kcenon::pacs::client::job_manager::impl::enqueue_job ( const std::string & job_id,
job_priority priority )
inline

Definition at line 257 of file job_manager.cpp.

257 {
258 {
259 std::lock_guard lock(queue_mutex);
260 job_queue.push({job_id, priority, std::chrono::system_clock::now()});
261 }
262 queue_cv.notify_one();
263 }
constexpr dicom_tag priority
Priority.
std::priority_queue< queue_entry > job_queue

References job_queue, queue_cv, and queue_mutex.

Referenced by load_pending_jobs_from_repo(), kcenon::pacs::client::job_manager::start_job(), and worker_loop().

Here is the caller graph for this function:

◆ execute_job()

void kcenon::pacs::client::job_manager::impl::execute_job ( job_record & job)
inline

Definition at line 335 of file job_manager.cpp.

335 {
336 switch (job.type) {
339 break;
340 case job_type::store:
342 break;
343 case job_type::query:
345 break;
346 case job_type::sync:
347 execute_sync_job(job);
348 break;
351 break;
352 default:
353 complete_job(job.job_id, job_status::failed, "Unsupported job type");
354 break;
355 }
356 }
@ prefetch
Prefetch prior studies.
@ store
C-STORE operation.
@ retrieve
C-MOVE/C-GET operation.
@ failed
Job failed with error.
void complete_job(const std::string &job_id, job_status status, const std::string &error_msg="")

References complete_job(), execute_prefetch_job(), execute_query_job(), execute_retrieve_job(), execute_store_job(), execute_sync_job(), kcenon::pacs::client::failed, kcenon::pacs::client::prefetch, kcenon::pacs::client::query, kcenon::pacs::client::retrieve, kcenon::pacs::client::store, and kcenon::pacs::client::sync.

Referenced by worker_loop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ execute_prefetch_job()

void kcenon::pacs::client::job_manager::impl::execute_prefetch_job ( job_record & job)
inline

Definition at line 931 of file job_manager.cpp.

931 {
932 logger->info_fmt("Executing prefetch job {} for patient {} from node {}",
933 job.job_id, job.patient_id.value_or("unknown"),
934 job.source_node_id);
935
936 // Prefetch retrieves prior studies for a patient before they arrive
937
938 // Validate patient ID
939 if (!job.patient_id.has_value()) {
941 "No patient ID specified for prefetch job");
942 return;
943 }
944
945 // Get the source node
946 auto node_opt = node_manager->get_node(job.source_node_id);
947 if (!node_opt) {
949 "Source node not found: " + job.source_node_id);
950 return;
951 }
952
953 const auto& node = *node_opt;
954 if (!node.is_online()) {
956 "Source node is offline: " + job.source_node_id);
957 return;
958 }
959
960 if (!node.supports_find || !node.supports_query_retrieve()) {
962 "Node does not support query/retrieve for prefetch");
963 return;
964 }
965
966 // Check for cancellation
967 if (is_job_cancelled(job.job_id)) {
969 return;
970 }
971
972 // Wait if paused
973 while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
974 std::this_thread::sleep_for(std::chrono::milliseconds(100));
975 }
976 if (is_job_cancelled(job.job_id)) {
978 return;
979 }
980
981 // Initialize progress
982 job_progress progress;
983 progress.total_items = 2; // Query + Retrieve phases
984 progress.current_item_description = "Querying prior studies";
985 update_progress(job.job_id, progress);
986
987 // Acquire association for query
988 std::vector<std::string> query_sop_classes = {
990 };
991
992 auto assoc_result = node_manager->acquire_association(
993 job.source_node_id, query_sop_classes);
994 if (assoc_result.is_err()) {
996 "Failed to acquire association: " + assoc_result.error().message);
997 return;
998 }
999
1000 auto assoc = std::move(assoc_result.value());
1001
1002 // Configure query for patient's studies
1003 services::query_scu_config query_config;
1004 query_config.model = services::query_model::study_root;
1005 query_config.level = services::query_level::study;
1006
1007 services::query_scu query_scu(query_config, logger);
1008
1009 // Query for all studies of this patient
1010 core::dicom_dataset query_keys;
1011 query_keys.set_string(core::tags::query_retrieve_level, encoding::vr_type::CS, "STUDY");
1012 query_keys.set_string(core::tags::patient_id, encoding::vr_type::LO, *job.patient_id);
1013 query_keys.set_string(core::tags::study_instance_uid, encoding::vr_type::UI, "");
1014 query_keys.set_string(core::tags::study_date, encoding::vr_type::DA, "");
1015 query_keys.set_string(core::tags::modality, encoding::vr_type::CS, "");
1016
1017 // Execute query
1018 auto query_result_res = query_scu.find(*assoc, query_keys);
1019
1020 // Release query association
1021 node_manager->release_association(job.source_node_id, std::move(assoc));
1022
1023 if (query_result_res.is_err()) {
1025 "Prefetch query failed: " + query_result_res.error().message);
1026 return;
1027 }
1028
1029 const auto& query_result = query_result_res.value();
1030
1031 progress.completed_items = 1;
1032 progress.current_item_description =
1033 std::to_string(query_result.matches.size()) + " prior studies found";
1034 update_progress(job.job_id, progress);
1035
1036 if (is_job_cancelled(job.job_id)) {
1038 return;
1039 }
1040
1041 // Complete prefetch - in full implementation would create
1042 // retrieve jobs for each prior study
1043 progress.completed_items = 2;
1044 progress.percent_complete = 100.0f;
1045 progress.current_item_description =
1046 "Prefetch complete: " + std::to_string(query_result.matches.size()) + " studies identified";
1047 update_progress(job.job_id, progress);
1048
1049 // Store result in metadata
1050 {
1051 std::unique_lock lock(cache_mutex);
1052 auto it = job_cache.find(job.job_id);
1053 if (it != job_cache.end()) {
1054 it->second.metadata["prior_studies"] = std::to_string(query_result.matches.size());
1055 }
1056 }
1057
1059 }
@ completed
Job completed successfully.
constexpr dicom_tag patient_id
Patient ID.
constexpr dicom_tag query_retrieve_level
Query/Retrieve Level.
constexpr dicom_tag modality
Modality.
constexpr dicom_tag study_instance_uid
Study Instance UID.
constexpr dicom_tag study_date
Study Date.
@ DA
Date (8 chars, format: YYYYMMDD)
@ LO
Long String (64 chars max)
@ UI
Unique Identifier (64 chars max)
@ CS
Code String (16 chars max, uppercase + digits + space + underscore)
constexpr std::string_view study_root_find_sop_class_uid
Study Root Query/Retrieve Information Model - FIND.
Definition query_scp.h:42
@ study
Study level - query study information.
@ study_root
Study Root Query/Retrieve Information Model.
bool is_job_paused(const std::string &job_id)
void update_progress(const std::string &job_id, const job_progress &progress)
bool is_job_cancelled(const std::string &job_id)
std::unordered_map< std::string, job_record > job_cache
std::shared_ptr< remote_node_manager > node_manager

References cache_mutex, kcenon::pacs::client::cancelled, complete_job(), kcenon::pacs::client::completed, kcenon::pacs::client::job_progress::completed_items, kcenon::pacs::encoding::CS, kcenon::pacs::client::job_progress::current_item_description, kcenon::pacs::encoding::DA, kcenon::pacs::client::failed, is_job_cancelled(), is_job_paused(), job_cache, kcenon::pacs::services::query_scu_config::level, kcenon::pacs::encoding::LO, logger, kcenon::pacs::core::tags::modality, kcenon::pacs::services::query_scu_config::model, node_manager, kcenon::pacs::core::tags::patient_id, kcenon::pacs::client::job_progress::percent_complete, kcenon::pacs::core::tags::query_retrieve_level, kcenon::pacs::core::dicom_dataset::set_string(), kcenon::pacs::services::study, kcenon::pacs::core::tags::study_date, kcenon::pacs::core::tags::study_instance_uid, kcenon::pacs::services::study_root, kcenon::pacs::services::study_root_find_sop_class_uid, kcenon::pacs::client::job_progress::total_items, kcenon::pacs::encoding::UI, and update_progress().

Referenced by execute_job().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ execute_query_job()

void kcenon::pacs::client::job_manager::impl::execute_query_job ( job_record & job)
inline

Definition at line 647 of file job_manager.cpp.

647 {
648 logger->info_fmt("Executing query job {} on node {}",
649 job.job_id, job.source_node_id);
650
651 // Get the node information
652 auto node_opt = node_manager->get_node(job.source_node_id);
653 if (!node_opt) {
655 "Node not found: " + job.source_node_id);
656 return;
657 }
658
659 const auto& node = *node_opt;
660 if (!node.is_online()) {
662 "Node is offline: " + job.source_node_id);
663 return;
664 }
665
666 if (!node.supports_find) {
668 "Node does not support C-FIND");
669 return;
670 }
671
672 // Check for cancellation
673 if (is_job_cancelled(job.job_id)) {
675 return;
676 }
677
678 // Wait if paused
679 while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
680 std::this_thread::sleep_for(std::chrono::milliseconds(100));
681 }
682 if (is_job_cancelled(job.job_id)) {
684 return;
685 }
686
687 // Get query level from metadata
688 std::string query_level = "STUDY"; // Default
689 auto level_it = job.metadata.find("query_level");
690 if (level_it != job.metadata.end()) {
691 query_level = level_it->second;
692 }
693
694 // Determine SOP class based on model (default to study root)
695 std::vector<std::string> sop_classes = {
697 };
698
699 // Acquire association
700 auto assoc_result = node_manager->acquire_association(
701 job.source_node_id, sop_classes);
702 if (assoc_result.is_err()) {
704 "Failed to acquire association: " + assoc_result.error().message);
705 return;
706 }
707
708 auto assoc = std::move(assoc_result.value());
709
710 // Configure query SCU
711 services::query_scu_config query_config;
712 query_config.model = services::query_model::study_root;
713
714 // Set level
715 if (query_level == "PATIENT") {
716 query_config.level = services::query_level::patient;
717 } else if (query_level == "STUDY") {
718 query_config.level = services::query_level::study;
719 } else if (query_level == "SERIES") {
720 query_config.level = services::query_level::series;
721 } else if (query_level == "IMAGE") {
722 query_config.level = services::query_level::image;
723 }
724
725 services::query_scu scu(query_config, logger);
726
727 // Build query keys from metadata
728 core::dicom_dataset query_keys;
729 query_keys.set_string(core::tags::query_retrieve_level, encoding::vr_type::CS, query_level);
730
731 // Copy query keys from job metadata
732 for (const auto& [key, value] : job.metadata) {
733 if (key.starts_with("query_") && key != "query_level") {
734 std::string tag_name = key.substr(6); // Remove "query_" prefix
735 // Map common tag names to DICOM tags
736 if (tag_name == "PatientID") {
737 query_keys.set_string(core::tags::patient_id, encoding::vr_type::LO, value);
738 } else if (tag_name == "PatientName") {
739 query_keys.set_string(core::tags::patient_name, encoding::vr_type::PN, value);
740 } else if (tag_name == "StudyDate") {
741 query_keys.set_string(core::tags::study_date, encoding::vr_type::DA, value);
742 } else if (tag_name == "StudyInstanceUID") {
743 query_keys.set_string(core::tags::study_instance_uid, encoding::vr_type::UI, value);
744 } else if (tag_name == "AccessionNumber") {
745 query_keys.set_string(core::tags::accession_number, encoding::vr_type::SH, value);
746 } else if (tag_name == "Modality") {
747 query_keys.set_string(core::tags::modality, encoding::vr_type::CS, value);
748 } else if (tag_name == "SeriesInstanceUID") {
749 query_keys.set_string(core::tags::series_instance_uid, encoding::vr_type::UI, value);
750 }
751 }
752 }
753
754 // Initialize progress
755 job_progress progress;
756 progress.total_items = 1; // Query is a single operation
757 update_progress(job.job_id, progress);
758
759 // Execute query
760 auto result = scu.find(*assoc, query_keys);
761
762 // Release association
763 node_manager->release_association(job.source_node_id, std::move(assoc));
764
765 // Handle result
766 if (result.is_err()) {
768 "Query failed: " + result.error().message);
769 return;
770 }
771
772 const auto& query_result = result.value();
773
774 if (is_job_cancelled(job.job_id)) {
776 } else if (query_result.is_success()) {
777 progress.completed_items = 1;
778 progress.percent_complete = 100.0f;
779 progress.current_item_description = std::to_string(query_result.matches.size())
780 + " matches found";
781 update_progress(job.job_id, progress);
782
783 // Store result count in metadata
784 {
785 std::unique_lock lock(cache_mutex);
786 auto it = job_cache.find(job.job_id);
787 if (it != job_cache.end()) {
788 it->second.metadata["result_count"] = std::to_string(query_result.matches.size());
789 }
790 }
791
793 } else if (query_result.is_cancelled()) {
795 } else {
796 std::ostringstream oss;
797 oss << "Query completed with status: 0x" << std::hex << query_result.status;
798 complete_job(job.job_id, job_status::failed, oss.str());
799 }
800 }
constexpr dicom_tag accession_number
Accession Number.
constexpr dicom_tag patient_name
Patient's Name.
constexpr dicom_tag series_instance_uid
Series Instance UID.
@ PN
Person Name (64 chars max per component group)
@ SH
Short String (16 chars max)
@ image
Image (Instance) level - query instance information.
@ patient
Patient level - query patient demographics.
@ series
Series level - query series information.

References kcenon::pacs::core::tags::accession_number, cache_mutex, kcenon::pacs::client::cancelled, complete_job(), kcenon::pacs::client::completed, kcenon::pacs::client::job_progress::completed_items, kcenon::pacs::encoding::CS, kcenon::pacs::client::job_progress::current_item_description, kcenon::pacs::encoding::DA, kcenon::pacs::client::failed, kcenon::pacs::services::query_scu::find(), kcenon::pacs::services::image, is_job_cancelled(), is_job_paused(), job_cache, kcenon::pacs::services::query_scu_config::level, kcenon::pacs::encoding::LO, logger, kcenon::pacs::core::tags::modality, kcenon::pacs::services::query_scu_config::model, node_manager, kcenon::pacs::services::patient, kcenon::pacs::core::tags::patient_id, kcenon::pacs::core::tags::patient_name, kcenon::pacs::client::job_progress::percent_complete, kcenon::pacs::encoding::PN, kcenon::pacs::core::tags::query_retrieve_level, kcenon::pacs::services::series, kcenon::pacs::core::tags::series_instance_uid, kcenon::pacs::core::dicom_dataset::set_string(), kcenon::pacs::encoding::SH, kcenon::pacs::services::study, kcenon::pacs::core::tags::study_date, kcenon::pacs::core::tags::study_instance_uid, kcenon::pacs::services::study_root, kcenon::pacs::services::study_root_find_sop_class_uid, kcenon::pacs::client::job_progress::total_items, kcenon::pacs::encoding::UI, and update_progress().

Referenced by execute_job().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ execute_retrieve_job()

void kcenon::pacs::client::job_manager::impl::execute_retrieve_job ( job_record & job)
inline

Definition at line 358 of file job_manager.cpp.

358 {
359 logger->info_fmt("Executing retrieve job {} from node {}",
360 job.job_id, job.source_node_id);
361
362 // Get the source node information
363 auto node_opt = node_manager->get_node(job.source_node_id);
364 if (!node_opt) {
366 "Source node not found: " + job.source_node_id);
367 return;
368 }
369
370 const auto& node = *node_opt;
371 if (!node.is_online()) {
373 "Source node is offline: " + job.source_node_id);
374 return;
375 }
376
377 // Check for cancellation
378 if (is_job_cancelled(job.job_id)) {
380 return;
381 }
382
383 // Wait if paused
384 while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
385 std::this_thread::sleep_for(std::chrono::milliseconds(100));
386 }
387 if (is_job_cancelled(job.job_id)) {
389 return;
390 }
391
392 // Determine SOP classes needed based on retrieve type
393 std::vector<std::string> sop_classes;
394 if (node.supports_move) {
395 sop_classes.push_back(std::string(services::study_root_move_sop_class_uid));
396 } else if (node.supports_get) {
397 sop_classes.push_back(std::string(services::study_root_get_sop_class_uid));
398 } else {
400 "Node does not support C-MOVE or C-GET");
401 return;
402 }
403
404 // Acquire association
405 auto assoc_result = node_manager->acquire_association(
406 job.source_node_id, sop_classes);
407 if (assoc_result.is_err()) {
409 "Failed to acquire association: " + assoc_result.error().message);
410 return;
411 }
412
413 auto assoc = std::move(assoc_result.value());
414
415 // Configure retrieve SCU
416 services::retrieve_scu_config retrieve_config;
417 retrieve_config.mode = node.supports_move
420 retrieve_config.model = services::query_model::study_root;
421
422 // Set level based on job scope
423 if (job.sop_instance_uid.has_value()) {
424 retrieve_config.level = services::query_level::image;
425 } else if (job.series_uid.has_value()) {
426 retrieve_config.level = services::query_level::series;
427 } else {
428 retrieve_config.level = services::query_level::study;
429 }
430
431 // Set move destination if using C-MOVE
432 if (retrieve_config.mode == services::retrieve_mode::c_move) {
433 retrieve_config.move_destination = config.local_ae_title;
434 }
435
436 services::retrieve_scu scu(retrieve_config, logger);
437
438 // Track progress
439 job_progress progress;
440 progress.total_items = 0; // Will be updated from pending responses
441
442 // Progress callback
443 auto progress_callback = [this, &job, &progress](
444 const services::retrieve_progress& rp) {
445 // Check for pause/cancel
446 if (is_job_cancelled(job.job_id)) {
447 return;
448 }
449
450 // Wait if paused
451 while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
452 std::this_thread::sleep_for(std::chrono::milliseconds(100));
453 }
454
455 progress.total_items = rp.total();
456 progress.completed_items = rp.completed;
457 progress.failed_items = rp.failed;
458 progress.skipped_items = rp.warning;
459 progress.percent_complete = rp.percent();
460 progress.elapsed = rp.elapsed();
461 update_progress(job.job_id, progress);
462 };
463
464 // Validate study UID is present
465 if (!job.study_uid.has_value()) {
467 "No study UID specified for retrieve job");
468 node_manager->release_association(job.source_node_id, std::move(assoc));
469 return;
470 }
471
472 // Execute retrieve based on level
473 auto result = job.series_uid.has_value()
474 ? scu.retrieve_series(*assoc, *job.series_uid, progress_callback)
475 : scu.retrieve_study(*assoc, *job.study_uid, progress_callback);
476
477 // Release association back to pool
478 node_manager->release_association(job.source_node_id, std::move(assoc));
479
480 // Handle result
481 if (result.is_err()) {
483 "Retrieve failed: " + result.error().message);
484 return;
485 }
486
487 const auto& retrieve_result = result.value();
488
489 if (is_job_cancelled(job.job_id)) {
491 } else if (retrieve_result.is_success()) {
492 // Update final progress
493 progress.completed_items = retrieve_result.completed;
494 progress.failed_items = retrieve_result.failed;
495 progress.skipped_items = retrieve_result.warning;
496 progress.percent_complete = 100.0f;
497 update_progress(job.job_id, progress);
498
500 } else if (retrieve_result.has_failures()) {
501 std::ostringstream oss;
502 oss << "Retrieve completed with failures: " << retrieve_result.failed
503 << " of " << (retrieve_result.completed + retrieve_result.failed)
504 << " sub-operations failed";
505 complete_job(job.job_id, job_status::failed, oss.str());
506 } else {
508 }
509 }
@ c_get
Receive directly from SCP on same association.
@ c_move
Request SCP to send to third party (requires move destination)
constexpr std::string_view study_root_move_sop_class_uid
Study Root Query/Retrieve Information Model - MOVE.
constexpr std::string_view study_root_get_sop_class_uid
Study Root Query/Retrieve Information Model - GET.
std::string local_ae_title
Local AE title for operations.
Definition job_types.h:413

References kcenon::pacs::services::c_get, kcenon::pacs::services::c_move, kcenon::pacs::client::cancelled, complete_job(), kcenon::pacs::client::completed, kcenon::pacs::client::job_progress::completed_items, config, kcenon::pacs::client::job_progress::elapsed, kcenon::pacs::client::failed, kcenon::pacs::client::job_progress::failed_items, kcenon::pacs::services::image, is_job_cancelled(), is_job_paused(), kcenon::pacs::services::retrieve_scu_config::level, kcenon::pacs::client::job_manager_config::local_ae_title, logger, kcenon::pacs::services::retrieve_scu_config::mode, kcenon::pacs::services::retrieve_scu_config::model, kcenon::pacs::services::retrieve_scu_config::move_destination, node_manager, kcenon::pacs::client::job_progress::percent_complete, progress_callback, kcenon::pacs::services::retrieve_scu::retrieve_series(), kcenon::pacs::services::retrieve_scu::retrieve_study(), kcenon::pacs::services::series, kcenon::pacs::client::job_progress::skipped_items, kcenon::pacs::services::study, kcenon::pacs::services::study_root, kcenon::pacs::services::study_root_get_sop_class_uid, kcenon::pacs::services::study_root_move_sop_class_uid, kcenon::pacs::client::job_progress::total_items, and update_progress().

Referenced by execute_job().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ execute_store_job()

void kcenon::pacs::client::job_manager::impl::execute_store_job ( job_record & job)
inline

Definition at line 511 of file job_manager.cpp.

511 {
512 logger->info_fmt("Executing store job {} to node {} ({} instances)",
513 job.job_id, job.destination_node_id, job.instance_uids.size());
514
515 // Validate input
516 if (job.instance_uids.empty()) {
518 "No instance UIDs specified for store job");
519 return;
520 }
521
522 // Get destination node information
523 auto node_opt = node_manager->get_node(job.destination_node_id);
524 if (!node_opt) {
526 "Destination node not found: " + job.destination_node_id);
527 return;
528 }
529
530 const auto& node = *node_opt;
531 if (!node.is_online()) {
533 "Destination node is offline: " + job.destination_node_id);
534 return;
535 }
536
537 if (!node.supports_store) {
539 "Destination node does not support C-STORE");
540 return;
541 }
542
543 // Check for cancellation
544 if (is_job_cancelled(job.job_id)) {
546 return;
547 }
548
549 // Initialize progress tracking
550 job_progress progress;
551 progress.total_items = job.instance_uids.size();
552
553 // We need to determine the SOP classes from the instances to be stored
554 // For now, use a placeholder set of common storage SOP classes
555 std::vector<std::string> sop_classes = {
556 "1.2.840.10008.5.1.4.1.1.2", // CT Image Storage
557 "1.2.840.10008.5.1.4.1.1.4", // MR Image Storage
558 "1.2.840.10008.5.1.4.1.1.7", // Secondary Capture
559 "1.2.840.10008.5.1.4.1.1.1.2", // Digital X-Ray
560 };
561
562 // Acquire association
563 auto assoc_result = node_manager->acquire_association(
564 job.destination_node_id, sop_classes);
565 if (assoc_result.is_err()) {
567 "Failed to acquire association: " + assoc_result.error().message);
568 return;
569 }
570
571 auto assoc = std::move(assoc_result.value());
572
573 // Create storage SCU
574 services::storage_scu_config store_config;
575 store_config.continue_on_error = true;
576 services::storage_scu scu(store_config, logger);
577
578 // Process each instance
579 size_t completed = 0;
580 size_t failed = 0;
581 std::string last_error;
582
583 for (size_t i = 0; i < job.instance_uids.size(); ++i) {
584 // Check for cancellation
585 if (is_job_cancelled(job.job_id)) {
586 break;
587 }
588
589 // Wait if paused
590 while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
591 std::this_thread::sleep_for(std::chrono::milliseconds(100));
592 }
593 if (is_job_cancelled(job.job_id)) {
594 break;
595 }
596
597 const auto& sop_instance_uid = job.instance_uids[i];
598
599 // Update progress - current item
600 progress.current_item = sop_instance_uid;
601 progress.current_item_description = "Storing instance " + std::to_string(i + 1);
602 update_progress(job.job_id, progress);
603
604 // Load the DICOM file for this instance
605 // In a real implementation, this would query the local storage
606 // to get the file path for the SOP Instance UID
607 // For now, we simulate the store operation
608 logger->debug_fmt("Storing instance {} ({}/{})",
609 sop_instance_uid, i + 1, job.instance_uids.size());
610
611 // Simulated store result - in real implementation, would call:
612 // auto result = scu.store(*assoc, dataset);
613 // For now, mark as successful
614 ++completed;
615 progress.completed_items = completed;
616 progress.failed_items = failed;
617 progress.calculate_percent();
618 update_progress(job.job_id, progress);
619 }
620
621 // Release association back to pool
622 node_manager->release_association(job.destination_node_id, std::move(assoc));
623
624 // Determine final status
625 if (is_job_cancelled(job.job_id)) {
627 } else if (failed == 0) {
628 progress.percent_complete = 100.0f;
629 update_progress(job.job_id, progress);
631 } else if (completed > 0) {
632 // Partial success
633 std::ostringstream oss;
634 oss << "Store completed with failures: " << failed << " of "
635 << job.instance_uids.size() << " instances failed";
636 if (!last_error.empty()) {
637 oss << ". Last error: " << last_error;
638 }
639 complete_job(job.job_id, job_status::failed, oss.str());
640 } else {
641 // Complete failure
643 "All store operations failed: " + last_error);
644 }
645 }
constexpr dicom_tag sop_instance_uid
SOP Instance UID.

References kcenon::pacs::client::job_progress::calculate_percent(), kcenon::pacs::client::cancelled, complete_job(), kcenon::pacs::client::completed, kcenon::pacs::client::job_progress::completed_items, kcenon::pacs::services::storage_scu_config::continue_on_error, kcenon::pacs::client::job_progress::current_item, kcenon::pacs::client::job_progress::current_item_description, kcenon::pacs::client::failed, kcenon::pacs::client::job_progress::failed_items, is_job_cancelled(), is_job_paused(), logger, node_manager, kcenon::pacs::client::job_progress::percent_complete, kcenon::pacs::client::job_progress::total_items, and update_progress().

Referenced by execute_job().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ execute_sync_job()

void kcenon::pacs::client::job_manager::impl::execute_sync_job ( job_record & job)
inline

Definition at line 802 of file job_manager.cpp.

802 {
803 logger->info_fmt("Executing sync job {} from node {}",
804 job.job_id, job.source_node_id);
805
806 // Synchronization job queries remote node for studies matching criteria
807 // and retrieves any that are not in local storage
808
809 // Get the source node
810 auto node_opt = node_manager->get_node(job.source_node_id);
 811 if (!node_opt) {
 812 complete_job(job.job_id, job_status::failed,
 813 "Source node not found: " + job.source_node_id);
814 return;
815 }
816
817 const auto& node = *node_opt;
 818 if (!node.is_online()) {
 819 complete_job(job.job_id, job_status::failed,
 820 "Source node is offline: " + job.source_node_id);
821 return;
822 }
823
 824 if (!node.supports_find) {
 825 complete_job(job.job_id, job_status::failed,
 826 "Node does not support C-FIND for sync");
827 return;
828 }
829
830 // Check for cancellation
 831 if (is_job_cancelled(job.job_id)) {
 832 complete_job(job.job_id, job_status::cancelled);
 833 return;
834 }
835
836 // Wait if paused
837 while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
838 std::this_thread::sleep_for(std::chrono::milliseconds(100));
839 }
 840 if (is_job_cancelled(job.job_id)) {
 841 complete_job(job.job_id, job_status::cancelled);
 842 return;
843 }
844
845 // Initialize progress
846 job_progress progress;
847 progress.total_items = 2; // Query + Retrieve phases
848 progress.current_item_description = "Querying remote node for studies";
849 update_progress(job.job_id, progress);
850
851 // Acquire association for query
 852 std::vector<std::string> query_sop_classes = {
 853 std::string(services::study_root_find_sop_class_uid)
 854 };
855
856 auto assoc_result = node_manager->acquire_association(
857 job.source_node_id, query_sop_classes);
 858 if (assoc_result.is_err()) {
 859 complete_job(job.job_id, job_status::failed,
 860 "Failed to acquire association: " + assoc_result.error().message);
861 return;
862 }
863
864 auto assoc = std::move(assoc_result.value());
865
866 // Configure query for studies
867 services::query_scu_config query_config;
868 query_config.model = services::query_model::study_root;
869 query_config.level = services::query_level::study;
870
871 services::query_scu query_scu(query_config, logger);
872
873 // Build query keys
874 core::dicom_dataset query_keys;
875 query_keys.set_string(core::tags::query_retrieve_level, encoding::vr_type::CS, "STUDY");
876
877 // Filter by patient if specified
878 if (job.patient_id.has_value()) {
879 query_keys.set_string(core::tags::patient_id, encoding::vr_type::LO, *job.patient_id);
880 }
881
882 // Request return attributes
883 query_keys.set_string(core::tags::study_instance_uid, encoding::vr_type::UI, "");
884 query_keys.set_string(core::tags::study_date, encoding::vr_type::DA, "");
885 query_keys.set_string(core::tags::patient_name, encoding::vr_type::PN, "");
886
887 // Execute query
888 auto query_result_res = query_scu.find(*assoc, query_keys);
889
890 // Release query association
891 node_manager->release_association(job.source_node_id, std::move(assoc));
892
 893 if (query_result_res.is_err()) {
 894 complete_job(job.job_id, job_status::failed,
 895 "Sync query failed: " + query_result_res.error().message);
896 return;
897 }
898
899 const auto& query_result = query_result_res.value();
900
901 progress.completed_items = 1;
902 progress.current_item_description =
903 std::to_string(query_result.matches.size()) + " studies found";
904 update_progress(job.job_id, progress);
905
 906 if (is_job_cancelled(job.job_id)) {
 907 complete_job(job.job_id, job_status::cancelled);
 908 return;
909 }
910
911 // Sync complete - in full implementation, would compare with local
912 // storage and create retrieve jobs for missing studies
913 progress.completed_items = 2;
914 progress.percent_complete = 100.0f;
915 progress.current_item_description =
916 "Sync complete: " + std::to_string(query_result.matches.size()) + " studies checked";
917 update_progress(job.job_id, progress);
918
919 // Store result in metadata
920 {
921 std::unique_lock lock(cache_mutex);
922 auto it = job_cache.find(job.job_id);
923 if (it != job_cache.end()) {
924 it->second.metadata["studies_found"] = std::to_string(query_result.matches.size());
925 }
926 }
 927
 928 complete_job(job.job_id, job_status::completed);
 929 }

References cache_mutex, kcenon::pacs::client::cancelled, complete_job(), kcenon::pacs::client::completed, kcenon::pacs::client::job_progress::completed_items, kcenon::pacs::encoding::CS, kcenon::pacs::client::job_progress::current_item_description, kcenon::pacs::encoding::DA, kcenon::pacs::client::failed, is_job_cancelled(), is_job_paused(), job_cache, kcenon::pacs::services::query_scu_config::level, kcenon::pacs::encoding::LO, logger, kcenon::pacs::services::query_scu_config::model, node_manager, kcenon::pacs::core::tags::patient_id, kcenon::pacs::core::tags::patient_name, kcenon::pacs::client::job_progress::percent_complete, kcenon::pacs::encoding::PN, kcenon::pacs::core::tags::query_retrieve_level, kcenon::pacs::core::dicom_dataset::set_string(), kcenon::pacs::services::study, kcenon::pacs::core::tags::study_date, kcenon::pacs::core::tags::study_instance_uid, kcenon::pacs::services::study_root, kcenon::pacs::services::study_root_find_sop_class_uid, kcenon::pacs::client::job_progress::total_items, kcenon::pacs::encoding::UI, and update_progress().

Referenced by execute_job().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ get_job_from_cache()

std::optional< job_record > kcenon::pacs::client::job_manager::impl::get_job_from_cache ( std::string_view job_id) const
inline

◆ is_job_cancelled()

bool kcenon::pacs::client::job_manager::impl::is_job_cancelled ( const std::string & job_id)
inline

Definition at line 237 of file job_manager.cpp.

237 {
238 std::lock_guard lock(cancelled_mutex);
239 return cancelled_job_ids.count(job_id) > 0;
240 }

References cancelled_job_ids, and cancelled_mutex.

Referenced by execute_prefetch_job(), execute_query_job(), execute_retrieve_job(), execute_store_job(), execute_sync_job(), and worker_loop().

Here is the caller graph for this function:

◆ is_job_paused()

bool kcenon::pacs::client::job_manager::impl::is_job_paused ( const std::string & job_id)
inline

Definition at line 242 of file job_manager.cpp.

242 {
243 std::lock_guard lock(paused_mutex);
244 return paused_job_ids.count(job_id) > 0;
245 }
std::unordered_set< std::string > paused_job_ids

References paused_job_ids, and paused_mutex.

Referenced by execute_prefetch_job(), execute_query_job(), execute_retrieve_job(), execute_store_job(), execute_sync_job(), and worker_loop().

Here is the caller graph for this function:

◆ load_pending_jobs_from_repo()

void kcenon::pacs::client::job_manager::impl::load_pending_jobs_from_repo ( )
inline

Definition at line 1099 of file job_manager.cpp.

1099 {
1100 if (!repo) return;
1101
1102#ifdef PACS_WITH_DATABASE_SYSTEM
1103 auto pending_result = repo->find_pending_jobs(config.max_queue_size);
1104 if (pending_result.is_err()) return;
1105 auto& pending = pending_result.value();
1106#else
1107 auto pending = repo->find_pending_jobs(config.max_queue_size);
1108#endif
1109 for (auto& job : pending) {
1110 // Add to cache
1111 {
1112 std::unique_lock lock(cache_mutex);
1113 job_cache[job.job_id] = job;
1114 }
1115 // Enqueue for execution
1116 enqueue_job(job.job_id, job.priority);
1117 }
1118
1119 if (!pending.empty()) {
1120 logger->info_fmt("Loaded {} pending jobs from repository", pending.size());
1121 }
1122 }
@ pending
Job created but not yet queued.
void enqueue_job(const std::string &job_id, job_priority priority)
std::shared_ptr< storage::job_repository > repo
size_t max_queue_size
Maximum jobs in queue.
Definition job_types.h:408

References cache_mutex, config, enqueue_job(), job_cache, logger, kcenon::pacs::client::job_manager_config::max_queue_size, kcenon::pacs::client::pending, and repo.

Referenced by kcenon::pacs::client::job_manager::job_manager().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ mark_job_active()

void kcenon::pacs::client::job_manager::impl::mark_job_active ( const std::string & job_id)
inline

Definition at line 247 of file job_manager.cpp.

247 {
248 std::lock_guard lock(active_mutex);
249 active_job_ids.insert(job_id);
250 }
std::unordered_set< std::string > active_job_ids

References active_job_ids, and active_mutex.

Referenced by worker_loop().

Here is the caller graph for this function:

◆ mark_job_inactive()

void kcenon::pacs::client::job_manager::impl::mark_job_inactive ( const std::string & job_id)
inline

Definition at line 252 of file job_manager.cpp.

252 {
253 std::lock_guard lock(active_mutex);
254 active_job_ids.erase(job_id);
255 }

References active_job_ids, and active_mutex.

Referenced by worker_loop().

Here is the caller graph for this function:

◆ notify_completion()

void kcenon::pacs::client::job_manager::impl::notify_completion ( const std::string & job_id,
const job_record & record )
inline

Definition at line 217 of file job_manager.cpp.

217 {
218 // Invoke callback
219 {
 220 std::shared_lock lock(callbacks_mutex);
 221 if (completion_callback) {
 222 completion_callback(job_id, record);
223 }
224 }
225
226 // Fulfill promise if exists
227 {
228 std::lock_guard lock(promises_mutex);
229 auto it = completion_promises.find(job_id);
230 if (it != completion_promises.end()) {
231 it->second->set_value(record);
232 completion_promises.erase(it);
233 }
234 }
235 }
job_completion_callback completion_callback
std::unordered_map< std::string, std::shared_ptr< std::promise< job_record > > > completion_promises

References callbacks_mutex, completion_callback, completion_promises, and promises_mutex.

Referenced by kcenon::pacs::client::job_manager::cancel_job(), and complete_job().

Here is the caller graph for this function:

◆ notify_progress()

void kcenon::pacs::client::job_manager::impl::notify_progress ( const std::string & job_id,
const job_progress & progress )
inline

Definition at line 210 of file job_manager.cpp.

210 {
211 std::shared_lock lock(callbacks_mutex);
212 if (progress_callback) {
213 progress_callback(job_id, progress);
214 }
215 }

References callbacks_mutex, and progress_callback.

Referenced by update_progress().

Here is the caller graph for this function:

◆ save_job()

void kcenon::pacs::client::job_manager::impl::save_job ( const job_record & job)
inline

Definition at line 149 of file job_manager.cpp.

149 {
150 // Save to cache
151 {
152 std::unique_lock lock(cache_mutex);
153 job_cache[job.job_id] = job;
154 }
155
156 // Persist to repository
157 if (repo) {
158 [[maybe_unused]] auto result = repo->save(job);
159 }
160 }

References cache_mutex, job_cache, and repo.

Referenced by kcenon::pacs::client::job_manager::create_prefetch_job(), kcenon::pacs::client::job_manager::create_query_job(), kcenon::pacs::client::job_manager::create_retrieve_job(), kcenon::pacs::client::job_manager::create_store_job(), and kcenon::pacs::client::job_manager::create_sync_job().

Here is the caller graph for this function:

◆ update_job_status()

void kcenon::pacs::client::job_manager::impl::update_job_status ( const std::string & job_id,
job_status status,
const std::string & error_msg = "",
const std::string & error_details = "" )
inline

Definition at line 171 of file job_manager.cpp.

173 {
174 {
175 std::unique_lock lock(cache_mutex);
176 auto it = job_cache.find(job_id);
177 if (it != job_cache.end()) {
178 it->second.status = status;
179 if (!error_msg.empty()) {
180 it->second.error_message = error_msg;
181 it->second.error_details = error_details;
182 }
183 if (status == job_status::running && !it->second.started_at.has_value()) {
184 it->second.started_at = std::chrono::system_clock::now();
185 }
186 if (is_terminal_status(status) && !it->second.completed_at.has_value()) {
187 it->second.completed_at = std::chrono::system_clock::now();
188 }
189 }
190 }
191
192 // Update repository
193 if (repo) {
194 if (is_terminal_status(status)) {
195 if (status == job_status::completed) {
196 [[maybe_unused]] auto result = repo->mark_completed(job_id);
197 } else if (status == job_status::failed) {
198 [[maybe_unused]] auto result = repo->mark_failed(job_id, error_msg, error_details);
199 } else {
200 [[maybe_unused]] auto result = repo->update_status(job_id, status, error_msg, error_details);
201 }
202 } else if (status == job_status::running) {
203 [[maybe_unused]] auto result = repo->mark_started(job_id);
204 } else {
205 [[maybe_unused]] auto result = repo->update_status(job_id, status, error_msg, error_details);
206 }
207 }
208 }
constexpr bool is_terminal_status(job_status status) noexcept
Check if job status is a terminal state.
Definition job_types.h:139
@ running
Job is currently executing.
constexpr dicom_tag status
Status.

References cache_mutex, kcenon::pacs::client::completed, kcenon::pacs::client::failed, kcenon::pacs::client::is_terminal_status(), job_cache, repo, and kcenon::pacs::client::running.

Referenced by kcenon::pacs::client::job_manager::cancel_job(), complete_job(), kcenon::pacs::client::job_manager::pause_job(), kcenon::pacs::client::job_manager::resume_job(), kcenon::pacs::client::job_manager::start_job(), and worker_loop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ update_progress()

void kcenon::pacs::client::job_manager::impl::update_progress ( const std::string & job_id,
const job_progress & progress )
inline

Definition at line 1061 of file job_manager.cpp.

1061 {
1062 // Update cache
1063 {
1064 std::unique_lock lock(cache_mutex);
1065 auto it = job_cache.find(job_id);
1066 if (it != job_cache.end()) {
1067 it->second.progress = progress;
1068 }
1069 }
1070
1071 // Update repository
1072 if (repo) {
1073 [[maybe_unused]] auto result = repo->update_progress(job_id, progress);
1074 }
1075
1076 // Notify callback
1077 notify_progress(job_id, progress);
1078 }
void notify_progress(const std::string &job_id, const job_progress &progress)

References cache_mutex, job_cache, notify_progress(), and repo.

Referenced by execute_prefetch_job(), execute_query_job(), execute_retrieve_job(), execute_store_job(), and execute_sync_job().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ worker_loop()

void kcenon::pacs::client::job_manager::impl::worker_loop ( )
inline

Definition at line 269 of file job_manager.cpp.

269 {
270 while (running.load()) {
 271 std::string job_id;
 272 job_priority priority;
 273
274 // Wait for a job
275 {
276 std::unique_lock lock(queue_mutex);
277 queue_cv.wait(lock, [this] {
278 return !running.load() || !job_queue.empty();
279 });
280
281 if (!running.load()) {
282 break;
283 }
284
285 if (job_queue.empty()) {
286 continue;
287 }
288
289 auto entry = job_queue.top();
290 job_queue.pop();
291 job_id = entry.job_id;
292 priority = entry.priority;
293 }
294
295 // Check if cancelled or paused
296 if (is_job_cancelled(job_id)) {
297 logger->debug_fmt("Job {} was cancelled before execution", job_id);
298 continue;
299 }
300
301 if (is_job_paused(job_id)) {
302 logger->debug_fmt("Job {} is paused, re-queueing", job_id);
303 // Re-queue with same priority for later
304 enqueue_job(job_id, priority);
305 std::this_thread::sleep_for(std::chrono::milliseconds(100));
306 continue;
307 }
308
309 // Get job record
310 auto job_opt = get_job_from_cache(job_id);
311 if (!job_opt) {
312 logger->warn_fmt("Job {} not found in cache", job_id);
313 continue;
314 }
315
316 auto job = *job_opt;
317
318 // Mark as running
 319 mark_job_active(job_id);
 320 update_job_status(job_id, job_status::running);
 321 logger->info_fmt("Starting job {}: type={}", job_id, to_string(job.type));
322
323 // Execute job
324 try {
325 execute_job(job);
326 } catch (const std::exception& e) {
327 logger->error_fmt("Job {} failed with exception: {}", job_id, e.what());
328 complete_job(job_id, job_status::failed, e.what());
329 }
330
331 mark_job_inactive(job_id);
332 }
333 }
job_priority
Priority level for job execution.
Definition job_types.h:154
void mark_job_active(const std::string &job_id)
void mark_job_inactive(const std::string &job_id)

References complete_job(), enqueue_job(), execute_job(), kcenon::pacs::client::failed, get_job_from_cache(), is_job_cancelled(), is_job_paused(), job_queue, logger, mark_job_active(), mark_job_inactive(), queue_cv, queue_mutex, running, kcenon::pacs::client::running, kcenon::pacs::client::to_string(), and update_job_status().

Referenced by kcenon::pacs::client::job_manager::start_workers().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ active_job_ids

std::unordered_set<std::string> kcenon::pacs::client::job_manager::impl::active_job_ids

◆ active_mutex

std::mutex kcenon::pacs::client::job_manager::impl::active_mutex
mutable

◆ cache_mutex

◆ callbacks_mutex

std::shared_mutex kcenon::pacs::client::job_manager::impl::callbacks_mutex
mutable

◆ cancelled_job_ids

std::unordered_set<std::string> kcenon::pacs::client::job_manager::impl::cancelled_job_ids

◆ cancelled_mutex

std::mutex kcenon::pacs::client::job_manager::impl::cancelled_mutex
mutable

◆ completion_callback

job_completion_callback kcenon::pacs::client::job_manager::impl::completion_callback

◆ completion_promises

std::unordered_map<std::string, std::shared_ptr<std::promise<job_record> > > kcenon::pacs::client::job_manager::impl::completion_promises

◆ config

◆ job_cache

◆ job_queue

std::priority_queue<queue_entry> kcenon::pacs::client::job_manager::impl::job_queue

◆ logger

◆ node_manager

std::shared_ptr<remote_node_manager> kcenon::pacs::client::job_manager::impl::node_manager

◆ paused_job_ids

std::unordered_set<std::string> kcenon::pacs::client::job_manager::impl::paused_job_ids

◆ paused_mutex

std::mutex kcenon::pacs::client::job_manager::impl::paused_mutex
mutable

◆ progress_callback

job_progress_callback kcenon::pacs::client::job_manager::impl::progress_callback

◆ promises_mutex

std::mutex kcenon::pacs::client::job_manager::impl::promises_mutex
mutable

◆ queue_cv

std::condition_variable kcenon::pacs::client::job_manager::impl::queue_cv

◆ queue_mutex

std::mutex kcenon::pacs::client::job_manager::impl::queue_mutex
mutable

◆ repo

◆ running

std::atomic<bool> kcenon::pacs::client::job_manager::impl::running {false}

◆ workers

std::vector<std::thread> kcenon::pacs::client::job_manager::impl::workers

The documentation for this struct was generated from the following file: