28#include <condition_variable>
34#include <shared_mutex>
37#include <unordered_map>
38#include <unordered_set>
51std::string generate_uuid() {
52 static thread_local std::random_device rd;
53 static thread_local std::mt19937_64 gen(rd());
54 static thread_local std::uniform_int_distribution<uint64_t> dis;
56 uint64_t ab = dis(gen);
57 uint64_t cd = dis(gen);
60 ab = (ab & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL;
61 cd = (cd & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL;
63 std::ostringstream oss;
64 oss << std::hex << std::setfill(
'0');
65 oss << std::setw(8) << (ab >> 32);
67 oss << std::setw(4) << ((ab >> 16) & 0xFFFF);
69 oss << std::setw(4) << (ab & 0xFFFF);
71 oss << std::setw(4) << (cd >> 48);
73 oss << std::setw(12) << (cd & 0xFFFFFFFFFFFFULL);
91 if (
static_cast<int>(
priority) !=
static_cast<int>(other.priority)) {
92 return static_cast<int>(
priority) <
static_cast<int>(other.priority);
107 std::shared_ptr<storage::job_repository>
repo;
158 [[maybe_unused]]
auto result =
repo->save(job);
164 auto it =
job_cache.find(std::string(job_id));
172 const std::string& error_msg =
"",
173 const std::string& error_details =
"") {
178 it->second.status = status;
179 if (!error_msg.empty()) {
180 it->second.error_message = error_msg;
181 it->second.error_details = error_details;
184 it->second.started_at = std::chrono::system_clock::now();
187 it->second.completed_at = std::chrono::system_clock::now();
196 [[maybe_unused]]
auto result =
repo->mark_completed(job_id);
198 [[maybe_unused]]
auto result =
repo->mark_failed(job_id, error_msg, error_details);
200 [[maybe_unused]]
auto result =
repo->update_status(job_id, status, error_msg, error_details);
203 [[maybe_unused]]
auto result =
repo->mark_started(job_id);
205 [[maybe_unused]]
auto result =
repo->update_status(job_id, status, error_msg, error_details);
231 it->second->set_value(record);
260 job_queue.push({job_id, priority, std::chrono::system_clock::now()});
291 job_id = entry.job_id;
292 priority = entry.priority;
297 logger->debug_fmt(
"Job {} was cancelled before execution", job_id);
302 logger->debug_fmt(
"Job {} is paused, re-queueing", job_id);
305 std::this_thread::sleep_for(std::chrono::milliseconds(100));
312 logger->warn_fmt(
"Job {} not found in cache", job_id);
321 logger->info_fmt(
"Starting job {}: type={}", job_id,
to_string(job.type));
326 }
catch (
const std::exception& e) {
327 logger->error_fmt(
"Job {} failed with exception: {}", job_id, e.what());
359 logger->info_fmt(
"Executing retrieve job {} from node {}",
360 job.job_id, job.source_node_id);
363 auto node_opt =
node_manager->get_node(job.source_node_id);
366 "Source node not found: " + job.source_node_id);
370 const auto& node = *node_opt;
371 if (!node.is_online()) {
373 "Source node is offline: " + job.source_node_id);
385 std::this_thread::sleep_for(std::chrono::milliseconds(100));
393 std::vector<std::string> sop_classes;
394 if (node.supports_move) {
396 }
else if (node.supports_get) {
400 "Node does not support C-MOVE or C-GET");
406 job.source_node_id, sop_classes);
407 if (assoc_result.is_err()) {
409 "Failed to acquire association: " + assoc_result.error().message);
413 auto assoc = std::move(assoc_result.value());
417 retrieve_config.
mode = node.supports_move
423 if (job.sop_instance_uid.has_value()) {
425 }
else if (job.series_uid.has_value()) {
452 std::this_thread::sleep_for(std::chrono::milliseconds(100));
460 progress.
elapsed = rp.elapsed();
465 if (!job.study_uid.has_value()) {
467 "No study UID specified for retrieve job");
468 node_manager->release_association(job.source_node_id, std::move(assoc));
473 auto result = job.series_uid.has_value()
478 node_manager->release_association(job.source_node_id, std::move(assoc));
481 if (result.is_err()) {
483 "Retrieve failed: " + result.error().message);
487 const auto& retrieve_result = result.value();
491 }
else if (retrieve_result.is_success()) {
500 }
else if (retrieve_result.has_failures()) {
501 std::ostringstream oss;
502 oss <<
"Retrieve completed with failures: " << retrieve_result.failed
503 <<
" of " << (retrieve_result.completed + retrieve_result.failed)
504 <<
" sub-operations failed";
512 logger->info_fmt(
"Executing store job {} to node {} ({} instances)",
513 job.job_id, job.destination_node_id, job.instance_uids.size());
516 if (job.instance_uids.empty()) {
518 "No instance UIDs specified for store job");
523 auto node_opt =
node_manager->get_node(job.destination_node_id);
526 "Destination node not found: " + job.destination_node_id);
530 const auto& node = *node_opt;
531 if (!node.is_online()) {
533 "Destination node is offline: " + job.destination_node_id);
537 if (!node.supports_store) {
539 "Destination node does not support C-STORE");
555 std::vector<std::string> sop_classes = {
556 "1.2.840.10008.5.1.4.1.1.2",
557 "1.2.840.10008.5.1.4.1.1.4",
558 "1.2.840.10008.5.1.4.1.1.7",
559 "1.2.840.10008.5.1.4.1.1.1.2",
564 job.destination_node_id, sop_classes);
565 if (assoc_result.is_err()) {
567 "Failed to acquire association: " + assoc_result.error().message);
571 auto assoc = std::move(assoc_result.value());
581 std::string last_error;
583 for (
size_t i = 0; i < job.instance_uids.size(); ++i) {
591 std::this_thread::sleep_for(std::chrono::milliseconds(100));
597 const auto& sop_instance_uid = job.instance_uids[i];
608 logger->debug_fmt(
"Storing instance {} ({}/{})",
609 sop_instance_uid, i + 1, job.instance_uids.size());
622 node_manager->release_association(job.destination_node_id, std::move(assoc));
633 std::ostringstream oss;
634 oss <<
"Store completed with failures: " <<
failed <<
" of "
635 << job.instance_uids.size() <<
" instances failed";
636 if (!last_error.empty()) {
637 oss <<
". Last error: " << last_error;
643 "All store operations failed: " + last_error);
648 logger->info_fmt(
"Executing query job {} on node {}",
649 job.job_id, job.source_node_id);
652 auto node_opt =
node_manager->get_node(job.source_node_id);
655 "Node not found: " + job.source_node_id);
659 const auto& node = *node_opt;
660 if (!node.is_online()) {
662 "Node is offline: " + job.source_node_id);
666 if (!node.supports_find) {
668 "Node does not support C-FIND");
680 std::this_thread::sleep_for(std::chrono::milliseconds(100));
688 std::string query_level =
"STUDY";
689 auto level_it = job.metadata.find(
"query_level");
690 if (level_it != job.metadata.end()) {
691 query_level = level_it->second;
695 std::vector<std::string> sop_classes = {
701 job.source_node_id, sop_classes);
702 if (assoc_result.is_err()) {
704 "Failed to acquire association: " + assoc_result.error().message);
708 auto assoc = std::move(assoc_result.value());
715 if (query_level ==
"PATIENT") {
717 }
else if (query_level ==
"STUDY") {
719 }
else if (query_level ==
"SERIES") {
721 }
else if (query_level ==
"IMAGE") {
732 for (
const auto& [key, value] : job.metadata) {
733 if (key.starts_with(
"query_") && key !=
"query_level") {
734 std::string tag_name = key.substr(6);
736 if (tag_name ==
"PatientID") {
738 }
else if (tag_name ==
"PatientName") {
740 }
else if (tag_name ==
"StudyDate") {
742 }
else if (tag_name ==
"StudyInstanceUID") {
744 }
else if (tag_name ==
"AccessionNumber") {
746 }
else if (tag_name ==
"Modality") {
748 }
else if (tag_name ==
"SeriesInstanceUID") {
760 auto result = scu.
find(*assoc, query_keys);
763 node_manager->release_association(job.source_node_id, std::move(assoc));
766 if (result.is_err()) {
768 "Query failed: " + result.error().message);
772 const auto& query_result = result.value();
776 }
else if (query_result.is_success()) {
788 it->second.metadata[
"result_count"] = std::to_string(query_result.matches.size());
793 }
else if (query_result.is_cancelled()) {
796 std::ostringstream oss;
797 oss <<
"Query completed with status: 0x" << std::hex << query_result.status;
803 logger->info_fmt(
"Executing sync job {} from node {}",
804 job.job_id, job.source_node_id);
810 auto node_opt =
node_manager->get_node(job.source_node_id);
813 "Source node not found: " + job.source_node_id);
817 const auto& node = *node_opt;
818 if (!node.is_online()) {
820 "Source node is offline: " + job.source_node_id);
824 if (!node.supports_find) {
826 "Node does not support C-FIND for sync");
838 std::this_thread::sleep_for(std::chrono::milliseconds(100));
852 std::vector<std::string> query_sop_classes = {
857 job.source_node_id, query_sop_classes);
858 if (assoc_result.is_err()) {
860 "Failed to acquire association: " + assoc_result.error().message);
864 auto assoc = std::move(assoc_result.value());
878 if (job.patient_id.has_value()) {
888 auto query_result_res =
query_scu.find(*assoc, query_keys);
891 node_manager->release_association(job.source_node_id, std::move(assoc));
893 if (query_result_res.is_err()) {
895 "Sync query failed: " + query_result_res.error().message);
899 const auto& query_result = query_result_res.value();
903 std::to_string(query_result.matches.size()) +
" studies found";
916 "Sync complete: " + std::to_string(query_result.matches.size()) +
" studies checked";
924 it->second.metadata[
"studies_found"] = std::to_string(query_result.matches.size());
932 logger->info_fmt(
"Executing prefetch job {} for patient {} from node {}",
933 job.job_id, job.patient_id.value_or(
"unknown"),
939 if (!job.patient_id.has_value()) {
941 "No patient ID specified for prefetch job");
946 auto node_opt =
node_manager->get_node(job.source_node_id);
949 "Source node not found: " + job.source_node_id);
953 const auto& node = *node_opt;
954 if (!node.is_online()) {
956 "Source node is offline: " + job.source_node_id);
960 if (!node.supports_find || !node.supports_query_retrieve()) {
962 "Node does not support query/retrieve for prefetch");
974 std::this_thread::sleep_for(std::chrono::milliseconds(100));
988 std::vector<std::string> query_sop_classes = {
993 job.source_node_id, query_sop_classes);
994 if (assoc_result.is_err()) {
996 "Failed to acquire association: " + assoc_result.error().message);
1000 auto assoc = std::move(assoc_result.value());
1018 auto query_result_res =
query_scu.find(*assoc, query_keys);
1021 node_manager->release_association(job.source_node_id, std::move(assoc));
1023 if (query_result_res.is_err()) {
1025 "Prefetch query failed: " + query_result_res.error().message);
1029 const auto& query_result = query_result_res.value();
1033 std::to_string(query_result.matches.size()) +
" prior studies found";
1046 "Prefetch complete: " + std::to_string(query_result.matches.size()) +
" studies identified";
1054 it->second.metadata[
"prior_studies"] = std::to_string(query_result.matches.size());
1067 it->second.progress = progress;
1073 [[maybe_unused]]
auto result =
repo->update_progress(job_id, progress);
1081 const std::string& error_msg =
"") {
1096 logger->info_fmt(
"Job {} completed with status: {}", job_id,
to_string(status));
1102#ifdef PACS_WITH_DATABASE_SYSTEM
1104 if (pending_result.is_err())
return;
1105 auto&
pending = pending_result.value();
1120 logger->info_fmt(
"Loaded {} pending jobs from repository",
pending.size());
1130 std::shared_ptr<storage::job_repository> repo,
1131 std::shared_ptr<remote_node_manager> node_manager,
1132 std::shared_ptr<di::ILogger> logger)
1138 std::shared_ptr<storage::job_repository> repo,
1139 std::shared_ptr<remote_node_manager> node_manager,
1140 std::shared_ptr<di::ILogger> logger)
1141 : impl_(std::make_unique<
impl>()) {
1161 std::string_view source_node_id,
1162 std::string_view study_uid,
1163 std::optional<std::string_view> series_uid,
1167 job.job_id = generate_uuid();
1170 job.priority = priority;
1171 job.source_node_id = std::string(source_node_id);
1172 job.study_uid = std::string(study_uid);
1174 job.series_uid = std::string(*series_uid);
1176 job.created_at = std::chrono::system_clock::now();
1179 impl_->
logger->info_fmt(
"Created retrieve job {}: study={}", job.job_id, study_uid);
1185 std::string_view destination_node_id,
1186 const std::vector<std::string>& instance_uids,
1190 job.job_id = generate_uuid();
1193 job.priority = priority;
1194 job.destination_node_id = std::string(destination_node_id);
1195 job.instance_uids = instance_uids;
1196 job.progress.total_items = instance_uids.size();
1197 job.created_at = std::chrono::system_clock::now();
1200 impl_->
logger->info_fmt(
"Created store job {}: {} instances to {}",
1201 job.job_id, instance_uids.size(), destination_node_id);
1207 std::string_view node_id,
1208 std::string_view query_level,
1209 const std::unordered_map<std::string, std::string>& query_keys,
1213 job.job_id = generate_uuid();
1216 job.priority = priority;
1217 job.source_node_id = std::string(node_id);
1218 job.metadata[
"query_level"] = std::string(query_level);
1219 for (
const auto& [key, value] : query_keys) {
1220 job.metadata[
"query_" + key] = value;
1222 job.created_at = std::chrono::system_clock::now();
1225 impl_->
logger->info_fmt(
"Created query job {}: level={}", job.job_id, query_level);
1231 std::string_view source_node_id,
1232 std::optional<std::string_view> patient_id,
1236 job.job_id = generate_uuid();
1239 job.priority = priority;
1240 job.source_node_id = std::string(source_node_id);
1242 job.patient_id = std::string(*patient_id);
1244 job.created_at = std::chrono::system_clock::now();
1247 impl_->
logger->info_fmt(
"Created sync job {}: from {}", job.job_id, source_node_id);
1253 std::string_view source_node_id,
1254 std::string_view patient_id,
1258 job.job_id = generate_uuid();
1261 job.priority = priority;
1262 job.source_node_id = std::string(source_node_id);
1263 job.patient_id = std::string(patient_id);
1264 job.created_at = std::chrono::system_clock::now();
1267 impl_->
logger->info_fmt(
"Created prefetch job {}: patient={}", job.job_id, patient_id);
1280 kcenon::pacs::error_codes::not_found,
1281 "Job not found: " + std::string(job_id));
1284 if (!job_opt->can_start()) {
1286 kcenon::pacs::error_codes::invalid_argument,
1287 "Job cannot be started in current state: " + std::string(
to_string(job_opt->status)));
1296 it->second.queued_at = std::chrono::system_clock::now();
1301 impl_->
logger->info_fmt(
"Started job {}", job_id);
1302 return kcenon::pacs::ok();
1309 kcenon::pacs::error_codes::not_found,
1310 "Job not found: " + std::string(job_id));
1313 if (!job_opt->can_pause()) {
1315 kcenon::pacs::error_codes::invalid_argument,
1316 "Job cannot be paused in current state: " + std::string(
to_string(job_opt->status)));
1326 return kcenon::pacs::ok();
1333 kcenon::pacs::error_codes::not_found,
1334 "Job not found: " + std::string(job_id));
1339 kcenon::pacs::error_codes::invalid_argument,
1340 "Job is not paused: " + std::string(
to_string(job_opt->status)));
1349 impl_->
logger->info_fmt(
"Resumed job {}", job_id);
1350 return kcenon::pacs::ok();
1357 kcenon::pacs::error_codes::not_found,
1358 "Job not found: " + std::string(job_id));
1361 if (!job_opt->can_cancel()) {
1363 kcenon::pacs::error_codes::invalid_argument,
1364 "Job cannot be cancelled in current state: " + std::string(
to_string(job_opt->status)));
1384 impl_->
logger->info_fmt(
"Cancelled job {}", job_id);
1385 return kcenon::pacs::ok();
1392 kcenon::pacs::error_codes::not_found,
1393 "Job not found: " + std::string(job_id));
1396 if (!job_opt->can_retry()) {
1398 kcenon::pacs::error_codes::invalid_argument,
1399 "Job cannot be retried: " + std::string(
to_string(job_opt->status)));
1408 it->second.retry_count++;
1409 it->second.error_message.clear();
1410 it->second.error_details.clear();
1411 it->second.started_at = std::nullopt;
1412 it->second.completed_at = std::nullopt;
1418 [[maybe_unused]]
auto result =
impl_->
repo->increment_retry(job_id);
1422 impl_->
logger->info_fmt(
"Retrying job {} (attempt {})", job_id, job_opt->retry_count + 1);
1423 return kcenon::pacs::ok();
1430 kcenon::pacs::error_codes::not_found,
1431 "Job not found: " + std::string(job_id));
1435 if (!job_opt->is_finished()) {
1437 if (cancel_result.is_err()) {
1438 return cancel_result;
1450 [[maybe_unused]]
auto result =
impl_->
repo->remove(std::string(job_id));
1453 impl_->
logger->info_fmt(
"Deleted job {}", job_id);
1454 return kcenon::pacs::ok();
1466 std::optional<job_status> status,
1467 std::optional<job_type> type,
1469 size_t offset)
const {
1474 options.type = type;
1475 options.limit = limit;
1476 options.offset = offset;
1477#ifdef PACS_WITH_DATABASE_SYSTEM
1478 auto result =
impl_->
repo->find_jobs(options);
1479 if (result.is_ok()) {
1480 return result.value();
1489 std::vector<job_record> result;
1492 if (status && job.status != *status)
continue;
1493 if (type && job.type != *type)
continue;
1498 result.push_back(job);
1499 if (result.size() >= limit)
break;
1506#ifdef PACS_WITH_DATABASE_SYSTEM
1507 auto result =
impl_->
repo->find_by_node(node_id);
1508 if (result.is_ok()) {
1509 return result.value();
1513 return impl_->
repo->find_by_node(node_id);
1518 std::vector<job_record> result;
1520 std::string node_str(node_id);
1522 if (job.source_node_id == node_str || job.destination_node_id == node_str) {
1523 result.push_back(job);
1536 return job_opt->progress;
1556 auto promise = std::make_shared<std::promise<job_record>>();
1557 auto future = promise->get_future();
1561 if (job_opt && job_opt->is_finished()) {
1562 promise->set_value(*job_opt);
1605 if (worker.joinable()) {
1634#ifdef PACS_WITH_DATABASE_SYSTEM
1635 auto result =
impl_->
repo->count_completed_today();
1636 if (result.is_ok()) {
1637 return result.value();
1641 return impl_->
repo->count_completed_today();
1649#ifdef PACS_WITH_DATABASE_SYSTEM
1650 auto result =
impl_->
repo->count_failed_today();
1651 if (result.is_ok()) {
1652 return result.value();
1656 return impl_->
repo->count_failed_today();
auto pending_jobs() const -> size_t
Get number of pending jobs.
void set_progress_callback(job_progress_callback callback)
Set the progress callback.
auto is_running() const noexcept -> bool
Check if workers are running.
auto delete_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Delete a job.
auto create_prefetch_job(std::string_view source_node_id, std::string_view patient_id, job_priority priority=job_priority::low) -> std::string
Create a prefetch job.
auto cancel_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Cancel a job.
auto create_retrieve_job(std::string_view source_node_id, std::string_view study_uid, std::optional< std::string_view > series_uid=std::nullopt, job_priority priority=job_priority::normal) -> std::string
Create a retrieve job (C-MOVE/C-GET)
void start_workers()
Start the worker threads.
auto pause_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Pause a running or queued job.
auto completed_jobs_today() const -> size_t
Get number of jobs completed today.
auto config() const noexcept -> const job_manager_config &
Get current configuration.
auto get_progress(std::string_view job_id) const -> job_progress
Get current progress for a job.
auto resume_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Resume a paused job.
auto list_jobs_by_node(std::string_view node_id) const -> std::vector< job_record >
List jobs by node ID.
auto create_store_job(std::string_view destination_node_id, const std::vector< std::string > &instance_uids, job_priority priority=job_priority::normal) -> std::string
Create a store job (C-STORE)
job_manager(std::shared_ptr< storage::job_repository > repo, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< di::ILogger > logger=nullptr)
Construct a job manager with default configuration.
std::unique_ptr< impl > impl_
auto create_sync_job(std::string_view source_node_id, std::optional< std::string_view > patient_id=std::nullopt, job_priority priority=job_priority::low) -> std::string
Create a sync job.
auto create_query_job(std::string_view node_id, std::string_view query_level, const std::unordered_map< std::string, std::string > &query_keys, job_priority priority=job_priority::normal) -> std::string
Create a query job (C-FIND)
auto list_jobs(std::optional< job_status > status=std::nullopt, std::optional< job_type > type=std::nullopt, size_t limit=100, size_t offset=0) const -> std::vector< job_record >
List jobs with optional filters.
auto wait_for_completion(std::string_view job_id) -> std::future< job_record >
Wait for a job to complete.
auto active_jobs() const -> size_t
Get number of active (running) jobs.
~job_manager()
Destructor - stops workers if running.
void set_completion_callback(job_completion_callback callback)
Set the completion callback.
auto get_job(std::string_view job_id) const -> std::optional< job_record >
Get a job by ID.
auto start_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Start a pending job.
auto failed_jobs_today() const -> size_t
Get number of jobs failed today.
auto retry_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Retry a failed job.
void stop_workers()
Stop the worker threads.
void set_string(dicom_tag tag, encoding::vr_type vr, std::string_view value)
Set a string value for the given tag.
network::Result< query_result > find(network::association &assoc, const core::dicom_dataset &query_keys)
Perform a C-FIND query with raw dataset.
network::Result< retrieve_result > retrieve_series(network::association &assoc, std::string_view series_uid, retrieve_progress_callback progress=nullptr)
Retrieve a series by Series Instance UID.
network::Result< retrieve_result > retrieve_study(network::association &assoc, std::string_view study_uid, retrieve_progress_callback progress=nullptr)
Retrieve a study by Study Instance UID.
DICOM Part 10 file handling for reading/writing DICOM files.
Compile-time constants for commonly used DICOM tags.
PACS index database for metadata storage and retrieval.
Job manager for asynchronous DICOM operations.
Repository for job persistence using base_repository pattern.
std::function< void( const std::string &job_id, const job_progress &progress)> job_progress_callback
Callback for job progress updates.
std::function< void( const std::string &job_id, const job_record &record)> job_completion_callback
Callback for job completion.
constexpr bool is_terminal_status(job_status status) noexcept
Check if job status is a terminal state.
@ prefetch
Prefetch prior studies.
@ store
C-STORE operation.
@ retrieve
C-MOVE/C-GET operation.
job_priority
Priority level for job execution.
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
job_status
Current status of a job.
@ failed
Job failed with error.
@ cancelled
Job was cancelled by user.
@ running
Job is currently executing.
@ pending
Job created but not yet queued.
@ queued
Job is in the execution queue.
@ completed
Job completed successfully.
std::shared_ptr< ILogger > null_logger()
Get a shared null logger instance.
@ DA
Date (8 chars, format: YYYYMMDD)
@ LO
Long String (64 chars max)
@ UI
Unique Identifier (64 chars max)
@ PN
Person Name (64 chars max per component group)
@ CS
Code String (16 chars max, uppercase + digits + space + underscore)
@ SH
Short String (16 chars max)
constexpr std::string_view study_root_find_sop_class_uid
Study Root Query/Retrieve Information Model - FIND.
@ c_get
Receive directly from the SCP on the same association.
@ c_move
Request the SCP to send to a third party (requires a move destination).
constexpr std::string_view study_root_move_sop_class_uid
Study Root Query/Retrieve Information Model - MOVE.
constexpr std::string_view study_root_get_sop_class_uid
Study Root Query/Retrieve Information Model - GET.
@ study
Study level - query study information.
@ image
Image (Instance) level - query instance information.
@ patient
Patient level - query patient demographics.
@ series
Series level - query series information.
@ study_root
Study Root Query/Retrieve Information Model.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
DICOM Query SCU service (C-FIND sender)
Remote PACS node manager for client operations.
DICOM Retrieve SCU service (C-MOVE/C-GET sender)
DICOM Storage SCU service (C-STORE sender)
void execute_store_job(job_record &job)
std::vector< std::thread > workers
void execute_retrieve_job(job_record &job)
bool is_job_paused(const std::string &job_id)
void load_pending_jobs_from_repo()
void execute_query_job(job_record &job)
void save_job(const job_record &job)
void update_job_status(const std::string &job_id, job_status status, const std::string &error_msg="", const std::string &error_details="")
std::unordered_set< std::string > active_job_ids
void notify_progress(const std::string &job_id, const job_progress &progress)
std::condition_variable queue_cv
void complete_job(const std::string &job_id, job_status status, const std::string &error_msg="")
std::mutex cancelled_mutex
std::atomic< bool > running
std::unordered_set< std::string > cancelled_job_ids
void execute_sync_job(job_record &job)
void mark_job_active(const std::string &job_id)
void update_progress(const std::string &job_id, const job_progress &progress)
bool is_job_cancelled(const std::string &job_id)
void execute_prefetch_job(job_record &job)
std::unordered_set< std::string > paused_job_ids
std::priority_queue< queue_entry > job_queue
job_manager_config config
job_completion_callback completion_callback
std::shared_ptr< di::ILogger > logger
void enqueue_job(const std::string &job_id, job_priority priority)
void mark_job_inactive(const std::string &job_id)
std::unordered_map< std::string, job_record > job_cache
std::shared_ptr< storage::job_repository > repo
void execute_job(job_record &job)
std::shared_ptr< remote_node_manager > node_manager
std::shared_mutex cache_mutex
void notify_completion(const std::string &job_id, const job_record &record)
std::unordered_map< std::string, std::shared_ptr< std::promise< job_record > > > completion_promises
job_progress_callback progress_callback
std::shared_mutex callbacks_mutex
std::optional< job_record > get_job_from_cache(std::string_view job_id) const
std::mutex promises_mutex
Configuration for the job manager.
size_t max_queue_size
Maximum jobs in queue.
size_t worker_count
Number of worker threads.
std::string local_ae_title
Local AE title for operations.
Progress tracking for a job.
std::chrono::milliseconds elapsed
Time elapsed since start.
size_t failed_items
Failed items.
std::string current_item
Current SOP Instance UID being processed.
size_t skipped_items
Skipped items.
std::string current_item_description
Human-readable description.
size_t completed_items
Successfully completed items.
size_t total_items
Total number of items to process.
float percent_complete
Completion percentage (0-100)
void calculate_percent() noexcept
Calculate completion percentage from item counts.
Complete job record with all metadata.
bool operator<(const queue_entry &other) const
std::chrono::system_clock::time_point queued_at
Configuration for Query SCU service.
query_model model
Query information model (Patient Root or Study Root)
query_level level
Query level (Patient, Study, Series, or Image)
Progress information for a retrieve operation.
Configuration for Retrieve SCU service.
query_level level
Query level (Study, Series, or Image)
query_model model
Query information model (Patient Root or Study Root)
retrieve_mode mode
Retrieve mode (C-MOVE or C-GET)
std::string move_destination
Move destination AE title (required for C-MOVE mode)
Configuration for Storage SCU service.
bool continue_on_error
Continue the batch operation on error (true) or stop on the first error (false).
Query options for listing jobs.
std::optional< client::job_status > status
Filter by status.