PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
job_manager.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
24
25#include <algorithm>
26#include <atomic>
27#include <chrono>
28#include <condition_variable>
29#include <deque>
30#include <future>
31#include <mutex>
32#include <queue>
33#include <random>
34#include <shared_mutex>
35#include <sstream>
36#include <thread>
37#include <unordered_map>
38#include <unordered_set>
39
40namespace kcenon::pacs::client {
41
42// =============================================================================
43// UUID Generation
44// =============================================================================
45
46namespace {
47
/// @brief Generate a random RFC 4122 version-4 UUID string.
///
/// Uses a thread-local mt19937_64 seeded from std::random_device, so
/// concurrent callers never contend on shared generator state.
///
/// @return UUID in canonical lowercase 8-4-4-4-12 hex form, e.g.
///         "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" with y in [8, 9, a, b].
std::string generate_uuid() {
    static thread_local std::random_device rd;
    static thread_local std::mt19937_64 gen(rd());
    static thread_local std::uniform_int_distribution<uint64_t> dis;

    uint64_t ab = dis(gen);
    uint64_t cd = dis(gen);

    // Set version (4) and variant (8, 9, A, or B) per RFC 4122.
    ab = (ab & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL;
    cd = (cd & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL;

    // Emit hex digits directly instead of ostringstream + std::setw/setfill:
    // the original relied on <iomanip>, which this file never includes, and
    // direct emission avoids stream-formatting overhead entirely.
    static constexpr char kHexDigits[] = "0123456789abcdef";
    std::string out;
    out.reserve(36);
    auto emit = [&out](uint64_t value, int nibbles) {
        for (int shift = (nibbles - 1) * 4; shift >= 0; shift -= 4) {
            out.push_back(kHexDigits[(value >> shift) & 0xF]);
        }
    };
    emit(ab >> 32, 8);               // time_low
    out.push_back('-');
    emit((ab >> 16) & 0xFFFF, 4);    // time_mid
    out.push_back('-');
    emit(ab & 0xFFFF, 4);            // time_hi_and_version (starts with '4')
    out.push_back('-');
    emit(cd >> 48, 4);               // clock_seq (variant nibble 8-b)
    out.push_back('-');
    emit(cd & 0xFFFFFFFFFFFFULL, 12); // node

    return out;
}
77
78} // namespace
79
80// =============================================================================
81// Priority Queue Entry
82// =============================================================================
83
    // Identifier of the queued job; key into the job cache / repository.
    std::string job_id;
    // Wall-clock time the entry was enqueued; used to break priority ties.
    // NOTE(review): the `priority` member declaration was dropped by the
    // listing extraction (it sits between these two fields) — verify in VCS.
    std::chrono::system_clock::time_point queued_at;

    // Higher priority first, then earlier queued time (FIFO within priority).
    // std::priority_queue is a max-heap, so "less-than" here means
    // "dequeued later".
    bool operator<(const queue_entry& other) const {
        if (static_cast<int>(priority) != static_cast<int>(other.priority)) {
            return static_cast<int>(priority) < static_cast<int>(other.priority);
        }
        return queued_at > other.queued_at; // Earlier time = higher priority
    }
};
97
98// =============================================================================
99// Implementation Structure
100// =============================================================================
101
103 // Configuration
105
106 // Dependencies
107 std::shared_ptr<storage::job_repository> repo;
108 std::shared_ptr<remote_node_manager> node_manager;
109 std::shared_ptr<di::ILogger> logger;
110
111 // Job cache (for quick access)
112 std::unordered_map<std::string, job_record> job_cache;
113 mutable std::shared_mutex cache_mutex;
114
115 // Priority queue for pending jobs
116 std::priority_queue<queue_entry> job_queue;
117 mutable std::mutex queue_mutex;
118 std::condition_variable queue_cv;
119
120 // Active jobs tracking
121 std::unordered_set<std::string> active_job_ids;
122 mutable std::mutex active_mutex;
123
124 // Paused jobs
125 std::unordered_set<std::string> paused_job_ids;
126 mutable std::mutex paused_mutex;
127
128 // Cancelled jobs (for checking in worker loop)
129 std::unordered_set<std::string> cancelled_job_ids;
130 mutable std::mutex cancelled_mutex;
131
132 // Worker threads
133 std::vector<std::thread> workers;
134 std::atomic<bool> running{false};
135
136 // Callbacks
139 mutable std::shared_mutex callbacks_mutex;
140
141 // Completion promises (for wait_for_completion)
142 std::unordered_map<std::string, std::shared_ptr<std::promise<job_record>>> completion_promises;
143 mutable std::mutex promises_mutex;
144
145 // =========================================================================
146 // Helper Methods
147 // =========================================================================
148
149 void save_job(const job_record& job) {
150 // Save to cache
151 {
152 std::unique_lock lock(cache_mutex);
153 job_cache[job.job_id] = job;
154 }
155
156 // Persist to repository
157 if (repo) {
158 [[maybe_unused]] auto result = repo->save(job);
159 }
160 }
161
162 std::optional<job_record> get_job_from_cache(std::string_view job_id) const {
163 std::shared_lock lock(cache_mutex);
164 auto it = job_cache.find(std::string(job_id));
165 if (it != job_cache.end()) {
166 return it->second;
167 }
168 return std::nullopt;
169 }
170
    // Apply a status transition to the cached record and mirror it into the
    // repository. Timestamps (started_at / completed_at) are stamped exactly
    // once: only when transitioning into running / a terminal state for the
    // first time. Error text is only written when non-empty, so later benign
    // transitions do not clobber an earlier error message.
    void update_job_status(const std::string& job_id, job_status status,
                           const std::string& error_msg = "",
                           const std::string& error_details = "") {
        // Phase 1: mutate the cached record under the writer lock.
        {
            std::unique_lock lock(cache_mutex);
            auto it = job_cache.find(job_id);
            if (it != job_cache.end()) {
                it->second.status = status;
                if (!error_msg.empty()) {
                    it->second.error_message = error_msg;
                    it->second.error_details = error_details;
                }
                if (status == job_status::running && !it->second.started_at.has_value()) {
                    it->second.started_at = std::chrono::system_clock::now();
                }
                if (is_terminal_status(status) && !it->second.completed_at.has_value()) {
                    it->second.completed_at = std::chrono::system_clock::now();
                }
            }
        }

        // Update repository
        // Phase 2: map the transition onto the repository's specialized
        // entry points (mark_completed/mark_failed/mark_started), falling
        // back to the generic update_status for everything else. Results are
        // deliberately ignored — persistence here is best-effort.
        if (repo) {
            if (is_terminal_status(status)) {
                if (status == job_status::completed) {
                    [[maybe_unused]] auto result = repo->mark_completed(job_id);
                } else if (status == job_status::failed) {
                    [[maybe_unused]] auto result = repo->mark_failed(job_id, error_msg, error_details);
                } else {
                    [[maybe_unused]] auto result = repo->update_status(job_id, status, error_msg, error_details);
                }
            } else if (status == job_status::running) {
                [[maybe_unused]] auto result = repo->mark_started(job_id);
            } else {
                [[maybe_unused]] auto result = repo->update_status(job_id, status, error_msg, error_details);
            }
        }
    }
209
210 void notify_progress(const std::string& job_id, const job_progress& progress) {
211 std::shared_lock lock(callbacks_mutex);
212 if (progress_callback) {
213 progress_callback(job_id, progress);
214 }
215 }
216
    // Notify observers that a job reached a terminal state: invoke the
    // completion callback (if registered) and fulfil any promise created by
    // wait_for_completion, erasing it so it fires exactly once.
    // NOTE(review): the extraction dropped one line here — almost certainly
    // an `if (completion_callback) {` guard before the invocation (the
    // brace structure implies it); verify against version control.
    void notify_completion(const std::string& job_id, const job_record& record) {
        // Invoke callback
        {
            std::shared_lock lock(callbacks_mutex);
            completion_callback(job_id, record);
        }
    }

        // Fulfill promise if exists
        {
            std::lock_guard lock(promises_mutex);
            auto it = completion_promises.find(job_id);
            if (it != completion_promises.end()) {
                it->second->set_value(record);
                completion_promises.erase(it);
            }
        }
    }
236
237 bool is_job_cancelled(const std::string& job_id) {
238 std::lock_guard lock(cancelled_mutex);
239 return cancelled_job_ids.count(job_id) > 0;
240 }
241
242 bool is_job_paused(const std::string& job_id) {
243 std::lock_guard lock(paused_mutex);
244 return paused_job_ids.count(job_id) > 0;
245 }
246
247 void mark_job_active(const std::string& job_id) {
248 std::lock_guard lock(active_mutex);
249 active_job_ids.insert(job_id);
250 }
251
252 void mark_job_inactive(const std::string& job_id) {
253 std::lock_guard lock(active_mutex);
254 active_job_ids.erase(job_id);
255 }
256
257 void enqueue_job(const std::string& job_id, job_priority priority) {
258 {
259 std::lock_guard lock(queue_mutex);
260 job_queue.push({job_id, priority, std::chrono::system_clock::now()});
261 }
262 queue_cv.notify_one();
263 }
264
265 // =========================================================================
266 // Worker Loop
267 // =========================================================================
268
    // Main body of each worker thread: block until a job is queued (or
    // shutdown), pop the highest-priority entry, then execute it. Cancelled
    // jobs are dropped; paused jobs are re-queued with a short back-off.
    // Exceptions thrown by executors are caught and turned into a failed job
    // so one bad job never kills the worker.
    // NOTE(review): the extraction dropped one line after mark_job_active()
    // — plausibly an update_job_status(job_id, job_status::running) call;
    // confirm against version control.
    void worker_loop() {
        while (running.load()) {
            std::string job_id;
            job_priority priority{};

            // Wait for a job
            {
                std::unique_lock lock(queue_mutex);
                // Wake on shutdown or when work is available.
                queue_cv.wait(lock, [this] {
                    return !running.load() || !job_queue.empty();
                });

                if (!running.load()) {
                    break;
                }

                // Another worker may have drained the queue between notify
                // and wake-up (spurious/competing wake).
                if (job_queue.empty()) {
                    continue;
                }

                auto entry = job_queue.top();
                job_queue.pop();
                job_id = entry.job_id;
                priority = entry.priority;
            }

            // Check if cancelled or paused
            if (is_job_cancelled(job_id)) {
                logger->debug_fmt("Job {} was cancelled before execution", job_id);
                continue;
            }

            if (is_job_paused(job_id)) {
                logger->debug_fmt("Job {} is paused, re-queueing", job_id);
                // Re-queue with same priority for later
                enqueue_job(job_id, priority);
                // Brief sleep so a lone paused job does not spin this worker.
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                continue;
            }

            // Get job record
            auto job_opt = get_job_from_cache(job_id);
            if (!job_opt) {
                logger->warn_fmt("Job {} not found in cache", job_id);
                continue;
            }

            // Work on a private copy; the cache remains the shared source of
            // truth and is updated via update_progress / complete_job.
            auto job = *job_opt;

            // Mark as running
            mark_job_active(job_id);
            logger->info_fmt("Starting job {}: type={}", job_id, to_string(job.type));

            // Execute job
            try {
                execute_job(job);
            } catch (const std::exception& e) {
                logger->error_fmt("Job {} failed with exception: {}", job_id, e.what());
                complete_job(job_id, job_status::failed, e.what());
            }

            mark_job_inactive(job_id);
        }
    }
334
        // Dispatch a job to its type-specific executor; unknown types fail
        // the job explicitly rather than silently dropping it.
        // NOTE(review): the extraction dropped this function's signature and
        // several hyperlinked lines (the retrieve/store/query/prefetch case
        // labels and executor calls). Only the sync branch survives intact
        // in this listing — verify the full switch against version control.
        switch (job.type) {
            break;
        case job_type::store:
            break;
        case job_type::query:
            break;
        case job_type::sync:
            execute_sync_job(job);
            break;
            break;
        default:
            complete_job(job.job_id, job_status::failed, "Unsupported job type");
            break;
        }
    }
357
        // Execute a retrieve (C-MOVE or C-GET) job: validate the source node,
        // honor pause/cancel, pick the retrieve mode from the node's
        // capabilities, run the retrieve with live progress reporting, and
        // finish the job according to the sub-operation outcome.
        // NOTE(review): the extraction dropped the function signature and the
        // first line of each complete_job(job.job_id, job_status::..., ...)
        // call (they were hyperlinks) — the dangling string-literal lines
        // below are those calls' message arguments. Verify against VCS.
        logger->info_fmt("Executing retrieve job {} from node {}",
                         job.job_id, job.source_node_id);

        // Get the source node information
        auto node_opt = node_manager->get_node(job.source_node_id);
        if (!node_opt) {
                         "Source node not found: " + job.source_node_id);
            return;
        }

        const auto& node = *node_opt;
        if (!node.is_online()) {
                         "Source node is offline: " + job.source_node_id);
            return;
        }

        // Check for cancellation
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Wait if paused
        while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Determine SOP classes needed based on retrieve type
        // (C-MOVE preferred when the node supports both).
        std::vector<std::string> sop_classes;
        if (node.supports_move) {
            sop_classes.push_back(std::string(services::study_root_move_sop_class_uid));
        } else if (node.supports_get) {
            sop_classes.push_back(std::string(services::study_root_get_sop_class_uid));
        } else {
                         "Node does not support C-MOVE or C-GET");
            return;
        }

        // Acquire association
        auto assoc_result = node_manager->acquire_association(
            job.source_node_id, sop_classes);
        if (assoc_result.is_err()) {
                         "Failed to acquire association: " + assoc_result.error().message);
            return;
        }

        auto assoc = std::move(assoc_result.value());

        // Configure retrieve SCU
        services::retrieve_scu_config retrieve_config;
        retrieve_config.mode = node.supports_move

        // Set level based on job scope: narrowest UID present wins
        // (image > series > study).
        if (job.sop_instance_uid.has_value()) {
            retrieve_config.level = services::query_level::image;
        } else if (job.series_uid.has_value()) {
            retrieve_config.level = services::query_level::series;
        } else {
            retrieve_config.level = services::query_level::study;
        }

        // Set move destination if using C-MOVE
        if (retrieve_config.mode == services::retrieve_mode::c_move) {
            retrieve_config.move_destination = config.local_ae_title;
        }

        services::retrieve_scu scu(retrieve_config, logger);

        // Track progress
        job_progress progress;
        progress.total_items = 0; // Will be updated from pending responses

        // Progress callback — runs on the retrieve's response path; also
        // enforces pause/cancel between sub-operations.
        auto progress_callback = [this, &job, &progress](
            const services::retrieve_progress& rp) {
            // Check for pause/cancel
            if (is_job_cancelled(job.job_id)) {
                return;
            }

            // Wait if paused
            while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }

            progress.total_items = rp.total();
            progress.completed_items = rp.completed;
            progress.failed_items = rp.failed;
            progress.skipped_items = rp.warning;
            progress.percent_complete = rp.percent();
            progress.elapsed = rp.elapsed();
            update_progress(job.job_id, progress);
        };

        // Validate study UID is present
        if (!job.study_uid.has_value()) {
                         "No study UID specified for retrieve job");
            node_manager->release_association(job.source_node_id, std::move(assoc));
            return;
        }

        // Execute retrieve based on level
        auto result = job.series_uid.has_value()
            ? scu.retrieve_series(*assoc, *job.series_uid, progress_callback)
            : scu.retrieve_study(*assoc, *job.study_uid, progress_callback);

        // Release association back to pool
        node_manager->release_association(job.source_node_id, std::move(assoc));

        // Handle result
        if (result.is_err()) {
                         "Retrieve failed: " + result.error().message);
            return;
        }

        const auto& retrieve_result = result.value();

        // Final disposition: cancelled / success / partial failure / other.
        if (is_job_cancelled(job.job_id)) {
        } else if (retrieve_result.is_success()) {
            // Update final progress
            progress.completed_items = retrieve_result.completed;
            progress.failed_items = retrieve_result.failed;
            progress.skipped_items = retrieve_result.warning;
            progress.percent_complete = 100.0f;
            update_progress(job.job_id, progress);

        } else if (retrieve_result.has_failures()) {
            std::ostringstream oss;
            oss << "Retrieve completed with failures: " << retrieve_result.failed
                << " of " << (retrieve_result.completed + retrieve_result.failed)
                << " sub-operations failed";
            complete_job(job.job_id, job_status::failed, oss.str());
        } else {
        }
    }
510
        // Execute a store (C-STORE) job: push the listed SOP instances to the
        // destination node, reporting per-instance progress and honoring
        // pause/cancel between instances. The store itself is currently
        // simulated (see inline comments) — no dataset is actually sent.
        // NOTE(review): the extraction dropped the function signature and the
        // first line of each complete_job(...) call; the dangling string
        // literals below are those calls' message arguments. Verify in VCS.
        logger->info_fmt("Executing store job {} to node {} ({} instances)",
                         job.job_id, job.destination_node_id, job.instance_uids.size());

        // Validate input
        if (job.instance_uids.empty()) {
                         "No instance UIDs specified for store job");
            return;
        }

        // Get destination node information
        auto node_opt = node_manager->get_node(job.destination_node_id);
        if (!node_opt) {
                         "Destination node not found: " + job.destination_node_id);
            return;
        }

        const auto& node = *node_opt;
        if (!node.is_online()) {
                         "Destination node is offline: " + job.destination_node_id);
            return;
        }

        if (!node.supports_store) {
                         "Destination node does not support C-STORE");
            return;
        }

        // Check for cancellation
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Initialize progress tracking
        job_progress progress;
        progress.total_items = job.instance_uids.size();

        // We need to determine the SOP classes from the instances to be stored
        // For now, use a placeholder set of common storage SOP classes
        std::vector<std::string> sop_classes = {
            "1.2.840.10008.5.1.4.1.1.2", // CT Image Storage
            "1.2.840.10008.5.1.4.1.1.4", // MR Image Storage
            "1.2.840.10008.5.1.4.1.1.7", // Secondary Capture
            "1.2.840.10008.5.1.4.1.1.1.2", // Digital X-Ray
        };

        // Acquire association
        auto assoc_result = node_manager->acquire_association(
            job.destination_node_id, sop_classes);
        if (assoc_result.is_err()) {
                         "Failed to acquire association: " + assoc_result.error().message);
            return;
        }

        auto assoc = std::move(assoc_result.value());

        // Create storage SCU
        services::storage_scu_config store_config;
        store_config.continue_on_error = true;
        services::storage_scu scu(store_config, logger);

        // Process each instance
        size_t completed = 0;
        size_t failed = 0;
        std::string last_error;

        for (size_t i = 0; i < job.instance_uids.size(); ++i) {
            // Check for cancellation
            if (is_job_cancelled(job.job_id)) {
                break;
            }

            // Wait if paused
            while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
            if (is_job_cancelled(job.job_id)) {
                break;
            }

            const auto& sop_instance_uid = job.instance_uids[i];

            // Update progress - current item
            progress.current_item = sop_instance_uid;
            progress.current_item_description = "Storing instance " + std::to_string(i + 1);
            update_progress(job.job_id, progress);

            // Load the DICOM file for this instance
            // In a real implementation, this would query the local storage
            // to get the file path for the SOP Instance UID
            // For now, we simulate the store operation
            logger->debug_fmt("Storing instance {} ({}/{})",
                              sop_instance_uid, i + 1, job.instance_uids.size());

            // Simulated store result - in real implementation, would call:
            // auto result = scu.store(*assoc, dataset);
            // For now, mark as successful
            ++completed;
            progress.completed_items = completed;
            progress.failed_items = failed;
            progress.calculate_percent();
            update_progress(job.job_id, progress);
        }

        // Release association back to pool
        node_manager->release_association(job.destination_node_id, std::move(assoc));

        // Determine final status
        // (failed can only stay 0 in the simulated path above; the partial /
        // total-failure branches exist for the real store implementation.)
        if (is_job_cancelled(job.job_id)) {
        } else if (failed == 0) {
            progress.percent_complete = 100.0f;
            update_progress(job.job_id, progress);
        } else if (completed > 0) {
            // Partial success
            std::ostringstream oss;
            oss << "Store completed with failures: " << failed << " of "
                << job.instance_uids.size() << " instances failed";
            if (!last_error.empty()) {
                oss << ". Last error: " << last_error;
            }
            complete_job(job.job_id, job_status::failed, oss.str());
        } else {
            // Complete failure
                         "All store operations failed: " + last_error);
        }
    }
646
        // Execute a query (C-FIND) job: build query keys from the job's
        // metadata (keys prefixed "query_"), run the find against the node,
        // and record the match count back into the job's metadata.
        // NOTE(review): the extraction dropped the function signature, the
        // first line of each complete_job(...) call, the SOP-class list
        // element, and the assignment bodies of several branches below
        // (PATIENT/SERIES level, the tag set_string calls). Verify in VCS.
        logger->info_fmt("Executing query job {} on node {}",
                         job.job_id, job.source_node_id);

        // Get the node information
        auto node_opt = node_manager->get_node(job.source_node_id);
        if (!node_opt) {
                         "Node not found: " + job.source_node_id);
            return;
        }

        const auto& node = *node_opt;
        if (!node.is_online()) {
                         "Node is offline: " + job.source_node_id);
            return;
        }

        if (!node.supports_find) {
                         "Node does not support C-FIND");
            return;
        }

        // Check for cancellation
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Wait if paused
        while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Get query level from metadata
        std::string query_level = "STUDY"; // Default
        auto level_it = job.metadata.find("query_level");
        if (level_it != job.metadata.end()) {
            query_level = level_it->second;
        }

        // Determine SOP class based on model (default to study root)
        std::vector<std::string> sop_classes = {
        };

        // Acquire association
        auto assoc_result = node_manager->acquire_association(
            job.source_node_id, sop_classes);
        if (assoc_result.is_err()) {
                         "Failed to acquire association: " + assoc_result.error().message);
            return;
        }

        auto assoc = std::move(assoc_result.value());

        // Configure query SCU
        services::query_scu_config query_config;

        // Set level (PATIENT and SERIES branch bodies lost to extraction)
        if (query_level == "PATIENT") {
        } else if (query_level == "STUDY") {
            query_config.level = services::query_level::study;
        } else if (query_level == "SERIES") {
        } else if (query_level == "IMAGE") {
            query_config.level = services::query_level::image;
        }

        services::query_scu scu(query_config, logger);

        // Build query keys from metadata
        core::dicom_dataset query_keys;

        // Copy query keys from job metadata
        for (const auto& [key, value] : job.metadata) {
            if (key.starts_with("query_") && key != "query_level") {
                std::string tag_name = key.substr(6); // Remove "query_" prefix
                // Map common tag names to DICOM tags
                // (set_string call bodies lost to extraction)
                if (tag_name == "PatientID") {
                } else if (tag_name == "PatientName") {
                } else if (tag_name == "StudyDate") {
                } else if (tag_name == "StudyInstanceUID") {
                } else if (tag_name == "AccessionNumber") {
                } else if (tag_name == "Modality") {
                } else if (tag_name == "SeriesInstanceUID") {
                }
            }
        }

        // Initialize progress
        job_progress progress;
        progress.total_items = 1; // Query is a single operation
        update_progress(job.job_id, progress);

        // Execute query
        auto result = scu.find(*assoc, query_keys);

        // Release association
        node_manager->release_association(job.source_node_id, std::move(assoc));

        // Handle result
        if (result.is_err()) {
                         "Query failed: " + result.error().message);
            return;
        }

        const auto& query_result = result.value();

        if (is_job_cancelled(job.job_id)) {
        } else if (query_result.is_success()) {
            progress.completed_items = 1;
            progress.percent_complete = 100.0f;
            progress.current_item_description = std::to_string(query_result.matches.size())
                                                + " matches found";
            update_progress(job.job_id, progress);

            // Store result count in metadata
            {
                std::unique_lock lock(cache_mutex);
                auto it = job_cache.find(job.job_id);
                if (it != job_cache.end()) {
                    it->second.metadata["result_count"] = std::to_string(query_result.matches.size());
                }
            }

        } else if (query_result.is_cancelled()) {
        } else {
            std::ostringstream oss;
            oss << "Query completed with status: 0x" << std::hex << query_result.status;
            complete_job(job.job_id, job_status::failed, oss.str());
        }
    }
801
        // Execute a sync job: query the remote node for studies (optionally
        // filtered by patient), record how many were found, and finish. The
        // compare-with-local-storage / retrieve-missing phase is noted below
        // as not yet implemented.
        // NOTE(review): the extraction dropped the function signature, the
        // first line of each complete_job(...) call, the SOP-class list
        // element, and the return-attribute set-up lines. Verify in VCS.
        logger->info_fmt("Executing sync job {} from node {}",
                         job.job_id, job.source_node_id);

        // Synchronization job queries remote node for studies matching criteria
        // and retrieves any that are not in local storage

        // Get the source node
        auto node_opt = node_manager->get_node(job.source_node_id);
        if (!node_opt) {
                         "Source node not found: " + job.source_node_id);
            return;
        }

        const auto& node = *node_opt;
        if (!node.is_online()) {
                         "Source node is offline: " + job.source_node_id);
            return;
        }

        if (!node.supports_find) {
                         "Node does not support C-FIND for sync");
            return;
        }

        // Check for cancellation
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Wait if paused
        while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Initialize progress
        job_progress progress;
        progress.total_items = 2; // Query + Retrieve phases
        progress.current_item_description = "Querying remote node for studies";
        update_progress(job.job_id, progress);

        // Acquire association for query
        std::vector<std::string> query_sop_classes = {
        };

        auto assoc_result = node_manager->acquire_association(
            job.source_node_id, query_sop_classes);
        if (assoc_result.is_err()) {
                         "Failed to acquire association: " + assoc_result.error().message);
            return;
        }

        auto assoc = std::move(assoc_result.value());

        // Configure query for studies
        services::query_scu_config query_config;
        query_config.level = services::query_level::study;

        services::query_scu query_scu(query_config, logger);

        // Build query keys
        core::dicom_dataset query_keys;

        // Filter by patient if specified
        if (job.patient_id.has_value()) {
            query_keys.set_string(core::tags::patient_id, encoding::vr_type::LO, *job.patient_id);
        }

        // Request return attributes

        // Execute query
        auto query_result_res = query_scu.find(*assoc, query_keys);

        // Release query association
        node_manager->release_association(job.source_node_id, std::move(assoc));

        if (query_result_res.is_err()) {
                         "Sync query failed: " + query_result_res.error().message);
            return;
        }

        const auto& query_result = query_result_res.value();

        // Phase 1 of 2 done (query).
        progress.completed_items = 1;
        progress.current_item_description =
            std::to_string(query_result.matches.size()) + " studies found";
        update_progress(job.job_id, progress);

        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Sync complete - in full implementation, would compare with local
        // storage and create retrieve jobs for missing studies
        progress.completed_items = 2;
        progress.percent_complete = 100.0f;
        progress.current_item_description =
            "Sync complete: " + std::to_string(query_result.matches.size()) + " studies checked";
        update_progress(job.job_id, progress);

        // Store result in metadata
        {
            std::unique_lock lock(cache_mutex);
            auto it = job_cache.find(job.job_id);
            if (it != job_cache.end()) {
                it->second.metadata["studies_found"] = std::to_string(query_result.matches.size());
            }
        }

    }
930
        // Execute a prefetch job: find all prior studies for the job's
        // patient on the source node and record the count. Creating retrieve
        // jobs per prior study is noted below as not yet implemented.
        // NOTE(review): the extraction dropped the function signature, the
        // first line of most complete_job(...) calls, the SOP-class list
        // element, and the extra query-key set-up lines. Verify in VCS.
        logger->info_fmt("Executing prefetch job {} for patient {} from node {}",
                         job.job_id, job.patient_id.value_or("unknown"),
                         job.source_node_id);

        // Prefetch retrieves prior studies for a patient before they arrive

        // Validate patient ID
        if (!job.patient_id.has_value()) {
                         "No patient ID specified for prefetch job");
            return;
        }

        // Get the source node
        auto node_opt = node_manager->get_node(job.source_node_id);
        if (!node_opt) {
                         "Source node not found: " + job.source_node_id);
            return;
        }

        const auto& node = *node_opt;
        if (!node.is_online()) {
                         "Source node is offline: " + job.source_node_id);
            return;
        }

        if (!node.supports_find || !node.supports_query_retrieve()) {
                         "Node does not support query/retrieve for prefetch");
            return;
        }

        // Check for cancellation
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Wait if paused
        while (is_job_paused(job.job_id) && !is_job_cancelled(job.job_id)) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Initialize progress
        job_progress progress;
        progress.total_items = 2; // Query + Retrieve phases
        progress.current_item_description = "Querying prior studies";
        update_progress(job.job_id, progress);

        // Acquire association for query
        std::vector<std::string> query_sop_classes = {
        };

        auto assoc_result = node_manager->acquire_association(
            job.source_node_id, query_sop_classes);
        if (assoc_result.is_err()) {
                         "Failed to acquire association: " + assoc_result.error().message);
            return;
        }

        auto assoc = std::move(assoc_result.value());

        // Configure query for patient's studies
        services::query_scu_config query_config;
        query_config.level = services::query_level::study;

        services::query_scu query_scu(query_config, logger);

        // Query for all studies of this patient
        core::dicom_dataset query_keys;
        query_keys.set_string(core::tags::patient_id, encoding::vr_type::LO, *job.patient_id);

        // Execute query
        auto query_result_res = query_scu.find(*assoc, query_keys);

        // Release query association
        node_manager->release_association(job.source_node_id, std::move(assoc));

        if (query_result_res.is_err()) {
            complete_job(job.job_id, job_status::failed,
                         "Prefetch query failed: " + query_result_res.error().message);
            return;
        }

        const auto& query_result = query_result_res.value();

        // Phase 1 of 2 done (query).
        progress.completed_items = 1;
        progress.current_item_description =
            std::to_string(query_result.matches.size()) + " prior studies found";
        update_progress(job.job_id, progress);

        if (is_job_cancelled(job.job_id)) {
            return;
        }

        // Complete prefetch - in full implementation would create
        // retrieve jobs for each prior study
        progress.completed_items = 2;
        progress.percent_complete = 100.0f;
        progress.current_item_description =
            "Prefetch complete: " + std::to_string(query_result.matches.size()) + " studies identified";
        update_progress(job.job_id, progress);

        // Store result in metadata
        {
            std::unique_lock lock(cache_mutex);
            auto it = job_cache.find(job.job_id);
            if (it != job_cache.end()) {
                it->second.metadata["prior_studies"] = std::to_string(query_result.matches.size());
            }
        }

    }
1060
1061 void update_progress(const std::string& job_id, const job_progress& progress) {
1062 // Update cache
1063 {
1064 std::unique_lock lock(cache_mutex);
1065 auto it = job_cache.find(job_id);
1066 if (it != job_cache.end()) {
1067 it->second.progress = progress;
1068 }
1069 }
1070
1071 // Update repository
1072 if (repo) {
1073 [[maybe_unused]] auto result = repo->update_progress(job_id, progress);
1074 }
1075
1076 // Notify callback
1077 notify_progress(job_id, progress);
1078 }
1079
1080 void complete_job(const std::string& job_id, job_status status,
1081 const std::string& error_msg = "") {
1082 update_job_status(job_id, status, error_msg);
1083
1084 // Get final job record for notification
1085 auto job_opt = get_job_from_cache(job_id);
1086 if (job_opt) {
1087 notify_completion(job_id, *job_opt);
1088 }
1089
1090 // Clean up cancelled set
1091 if (status == job_status::cancelled) {
1092 std::lock_guard lock(cancelled_mutex);
1093 cancelled_job_ids.erase(job_id);
1094 }
1095
1096 logger->info_fmt("Job {} completed with status: {}", job_id, to_string(status));
1097 }
1098
        // Re-hydrate pending jobs from the repository at startup: cache each
        // record and enqueue it for execution. No-op without a repository.
        // NOTE(review): this function's signature line was dropped by the
        // extraction; the #ifdef below handles the repository returning a
        // result type vs. a plain container depending on build configuration.
        if (!repo) return;

#ifdef PACS_WITH_DATABASE_SYSTEM
        auto pending_result = repo->find_pending_jobs(config.max_queue_size);
        if (pending_result.is_err()) return;
        auto& pending = pending_result.value();
#else
        auto pending = repo->find_pending_jobs(config.max_queue_size);
#endif
        for (auto& job : pending) {
            // Add to cache
            {
                std::unique_lock lock(cache_mutex);
                job_cache[job.job_id] = job;
            }
            // Enqueue for execution
            enqueue_job(job.job_id, job.priority);
        }

        if (!pending.empty()) {
            logger->info_fmt("Loaded {} pending jobs from repository", pending.size());
        }
    }
1123};
1124
1125// =============================================================================
1126// Construction / Destruction
1127// =============================================================================
1128
    // Convenience constructor: delegates to the main constructor with a
    // default-constructed job_manager_config.
    // NOTE(review): the `job_manager::job_manager(` signature line was
    // dropped by the extraction; these are its parameter lines.
    std::shared_ptr<storage::job_repository> repo,
    std::shared_ptr<remote_node_manager> node_manager,
    std::shared_ptr<di::ILogger> logger)
    : job_manager(job_manager_config{}, std::move(repo), std::move(node_manager), std::move(logger)) {
}
1135
    // Main constructor: wires configuration and dependencies into the pimpl.
    // A null logger is replaced with the no-op di::null_logger() so impl code
    // can call logger-> unconditionally.
    // NOTE(review): the signature line and the trailing statement under
    // "Load any pending jobs" (presumably impl_->load_pending_jobs()) were
    // dropped by the extraction; verify in VCS.
    const job_manager_config& config,
    std::shared_ptr<storage::job_repository> repo,
    std::shared_ptr<remote_node_manager> node_manager,
    std::shared_ptr<di::ILogger> logger)
    : impl_(std::make_unique<impl>()) {

    impl_->config = config;
    impl_->repo = std::move(repo);
    impl_->node_manager = std::move(node_manager);
    impl_->logger = logger ? std::move(logger) : di::null_logger();

    // Load any pending jobs from repository
}
1151
1155
1156// =============================================================================
1157// Job Creation
1158// =============================================================================
1159
    // Create and persist a pending retrieve job for a study (optionally
    // narrowed to one series) and return its generated id. The job is saved
    // via impl_->save_job; queueing for execution happens elsewhere.
    // NOTE(review): the function's first signature line was dropped by the
    // extraction; these are its remaining parameter lines.
    std::string_view source_node_id,
    std::string_view study_uid,
    std::optional<std::string_view> series_uid,
    job_priority priority) {

    job_record job;
    job.job_id = generate_uuid();
    job.type = job_type::retrieve;
    job.status = job_status::pending;
    job.priority = priority;
    job.source_node_id = std::string(source_node_id);
    job.study_uid = std::string(study_uid);
    if (series_uid) {
        job.series_uid = std::string(*series_uid);
    }
    job.created_at = std::chrono::system_clock::now();

    impl_->save_job(job);
    impl_->logger->info_fmt("Created retrieve job {}: study={}", job.job_id, study_uid);

    return job.job_id;
}
1183
1185 std::string_view destination_node_id,
1186 const std::vector<std::string>& instance_uids,
1187 job_priority priority) {
1188
1189 job_record job;
1190 job.job_id = generate_uuid();
1191 job.type = job_type::store;
1192 job.status = job_status::pending;
1193 job.priority = priority;
1194 job.destination_node_id = std::string(destination_node_id);
1195 job.instance_uids = instance_uids;
1196 job.progress.total_items = instance_uids.size();
1197 job.created_at = std::chrono::system_clock::now();
1198
1199 impl_->save_job(job);
1200 impl_->logger->info_fmt("Created store job {}: {} instances to {}",
1201 job.job_id, instance_uids.size(), destination_node_id);
1202
1203 return job.job_id;
1204}
1205
1207 std::string_view node_id,
1208 std::string_view query_level,
1209 const std::unordered_map<std::string, std::string>& query_keys,
1210 job_priority priority) {
1211
1212 job_record job;
1213 job.job_id = generate_uuid();
1214 job.type = job_type::query;
1215 job.status = job_status::pending;
1216 job.priority = priority;
1217 job.source_node_id = std::string(node_id);
1218 job.metadata["query_level"] = std::string(query_level);
1219 for (const auto& [key, value] : query_keys) {
1220 job.metadata["query_" + key] = value;
1221 }
1222 job.created_at = std::chrono::system_clock::now();
1223
1224 impl_->save_job(job);
1225 impl_->logger->info_fmt("Created query job {}: level={}", job.job_id, query_level);
1226
1227 return job.job_id;
1228}
1229
1231 std::string_view source_node_id,
1232 std::optional<std::string_view> patient_id,
1233 job_priority priority) {
1234
1235 job_record job;
1236 job.job_id = generate_uuid();
1237 job.type = job_type::sync;
1238 job.status = job_status::pending;
1239 job.priority = priority;
1240 job.source_node_id = std::string(source_node_id);
1241 if (patient_id) {
1242 job.patient_id = std::string(*patient_id);
1243 }
1244 job.created_at = std::chrono::system_clock::now();
1245
1246 impl_->save_job(job);
1247 impl_->logger->info_fmt("Created sync job {}: from {}", job.job_id, source_node_id);
1248
1249 return job.job_id;
1250}
1251
1253 std::string_view source_node_id,
1254 std::string_view patient_id,
1255 job_priority priority) {
1256
1257 job_record job;
1258 job.job_id = generate_uuid();
1259 job.type = job_type::prefetch;
1260 job.status = job_status::pending;
1261 job.priority = priority;
1262 job.source_node_id = std::string(source_node_id);
1263 job.patient_id = std::string(patient_id);
1264 job.created_at = std::chrono::system_clock::now();
1265
1266 impl_->save_job(job);
1267 impl_->logger->info_fmt("Created prefetch job {}: patient={}", job.job_id, patient_id);
1268
1269 return job.job_id;
1270}
1271
1272// =============================================================================
1273// Job Control
1274// =============================================================================
1275
1276kcenon::pacs::VoidResult job_manager::start_job(std::string_view job_id) {
1277 auto job_opt = impl_->get_job_from_cache(job_id);
1278 if (!job_opt) {
1280 kcenon::pacs::error_codes::not_found,
1281 "Job not found: " + std::string(job_id));
1282 }
1283
1284 if (!job_opt->can_start()) {
1286 kcenon::pacs::error_codes::invalid_argument,
1287 "Job cannot be started in current state: " + std::string(to_string(job_opt->status)));
1288 }
1289
1290 // Update status and enqueue
1291 impl_->update_job_status(std::string(job_id), job_status::queued);
1292 {
1293 std::unique_lock lock(impl_->cache_mutex);
1294 auto it = impl_->job_cache.find(std::string(job_id));
1295 if (it != impl_->job_cache.end()) {
1296 it->second.queued_at = std::chrono::system_clock::now();
1297 }
1298 }
1299 impl_->enqueue_job(std::string(job_id), job_opt->priority);
1300
1301 impl_->logger->info_fmt("Started job {}", job_id);
1302 return kcenon::pacs::ok();
1303}
1304
1305kcenon::pacs::VoidResult job_manager::pause_job(std::string_view job_id) {
1306 auto job_opt = impl_->get_job_from_cache(job_id);
1307 if (!job_opt) {
1309 kcenon::pacs::error_codes::not_found,
1310 "Job not found: " + std::string(job_id));
1311 }
1312
1313 if (!job_opt->can_pause()) {
1315 kcenon::pacs::error_codes::invalid_argument,
1316 "Job cannot be paused in current state: " + std::string(to_string(job_opt->status)));
1317 }
1318
1319 {
1320 std::lock_guard lock(impl_->paused_mutex);
1321 impl_->paused_job_ids.insert(std::string(job_id));
1322 }
1323 impl_->update_job_status(std::string(job_id), job_status::paused);
1324
1325 impl_->logger->info_fmt("Paused job {}", job_id);
1326 return kcenon::pacs::ok();
1327}
1328
1329kcenon::pacs::VoidResult job_manager::resume_job(std::string_view job_id) {
1330 auto job_opt = impl_->get_job_from_cache(job_id);
1331 if (!job_opt) {
1333 kcenon::pacs::error_codes::not_found,
1334 "Job not found: " + std::string(job_id));
1335 }
1336
1337 if (job_opt->status != job_status::paused) {
1339 kcenon::pacs::error_codes::invalid_argument,
1340 "Job is not paused: " + std::string(to_string(job_opt->status)));
1341 }
1342
1343 {
1344 std::lock_guard lock(impl_->paused_mutex);
1345 impl_->paused_job_ids.erase(std::string(job_id));
1346 }
1347 impl_->update_job_status(std::string(job_id), job_status::queued);
1348
1349 impl_->logger->info_fmt("Resumed job {}", job_id);
1350 return kcenon::pacs::ok();
1351}
1352
1353kcenon::pacs::VoidResult job_manager::cancel_job(std::string_view job_id) {
1354 auto job_opt = impl_->get_job_from_cache(job_id);
1355 if (!job_opt) {
1357 kcenon::pacs::error_codes::not_found,
1358 "Job not found: " + std::string(job_id));
1359 }
1360
1361 if (!job_opt->can_cancel()) {
1363 kcenon::pacs::error_codes::invalid_argument,
1364 "Job cannot be cancelled in current state: " + std::string(to_string(job_opt->status)));
1365 }
1366
1367 {
1368 std::lock_guard lock(impl_->cancelled_mutex);
1369 impl_->cancelled_job_ids.insert(std::string(job_id));
1370 }
1371
1372 // If not actively running, update status immediately
1373 {
1374 std::lock_guard lock(impl_->active_mutex);
1375 if (impl_->active_job_ids.count(std::string(job_id)) == 0) {
1376 impl_->update_job_status(std::string(job_id), job_status::cancelled);
1377 auto final_job = impl_->get_job_from_cache(job_id);
1378 if (final_job) {
1379 impl_->notify_completion(std::string(job_id), *final_job);
1380 }
1381 }
1382 }
1383
1384 impl_->logger->info_fmt("Cancelled job {}", job_id);
1385 return kcenon::pacs::ok();
1386}
1387
1388kcenon::pacs::VoidResult job_manager::retry_job(std::string_view job_id) {
1389 auto job_opt = impl_->get_job_from_cache(job_id);
1390 if (!job_opt) {
1392 kcenon::pacs::error_codes::not_found,
1393 "Job not found: " + std::string(job_id));
1394 }
1395
1396 if (!job_opt->can_retry()) {
1398 kcenon::pacs::error_codes::invalid_argument,
1399 "Job cannot be retried: " + std::string(to_string(job_opt->status)));
1400 }
1401
1402 // Reset job state
1403 {
1404 std::unique_lock lock(impl_->cache_mutex);
1405 auto it = impl_->job_cache.find(std::string(job_id));
1406 if (it != impl_->job_cache.end()) {
1407 it->second.status = job_status::pending;
1408 it->second.retry_count++;
1409 it->second.error_message.clear();
1410 it->second.error_details.clear();
1411 it->second.started_at = std::nullopt;
1412 it->second.completed_at = std::nullopt;
1413 it->second.progress = job_progress{};
1414 }
1415 }
1416
1417 if (impl_->repo) {
1418 [[maybe_unused]] auto result = impl_->repo->increment_retry(job_id);
1419 [[maybe_unused]] auto status_result = impl_->repo->update_status(job_id, job_status::pending);
1420 }
1421
1422 impl_->logger->info_fmt("Retrying job {} (attempt {})", job_id, job_opt->retry_count + 1);
1423 return kcenon::pacs::ok();
1424}
1425
1426kcenon::pacs::VoidResult job_manager::delete_job(std::string_view job_id) {
1427 auto job_opt = impl_->get_job_from_cache(job_id);
1428 if (!job_opt) {
1430 kcenon::pacs::error_codes::not_found,
1431 "Job not found: " + std::string(job_id));
1432 }
1433
1434 // Cancel if not terminal
1435 if (!job_opt->is_finished()) {
1436 auto cancel_result = cancel_job(job_id);
1437 if (cancel_result.is_err()) {
1438 return cancel_result;
1439 }
1440 }
1441
1442 // Remove from cache
1443 {
1444 std::unique_lock lock(impl_->cache_mutex);
1445 impl_->job_cache.erase(std::string(job_id));
1446 }
1447
1448 // Remove from repository
1449 if (impl_->repo) {
1450 [[maybe_unused]] auto result = impl_->repo->remove(std::string(job_id));
1451 }
1452
1453 impl_->logger->info_fmt("Deleted job {}", job_id);
1454 return kcenon::pacs::ok();
1455}
1456
1457// =============================================================================
1458// Job Queries
1459// =============================================================================
1460
1461std::optional<job_record> job_manager::get_job(std::string_view job_id) const {
1462 return impl_->get_job_from_cache(job_id);
1463}
1464
// List jobs with optional status/type filters plus limit/offset paging.
// Prefers the persistent repository; falls back to the in-memory cache
// (unordered iteration — paging order over the cache is unspecified).
std::vector<job_record> job_manager::list_jobs(
    std::optional<job_status> status,
    std::optional<job_type> type,
    size_t limit,
    size_t offset) const {

    if (impl_->repo) {
        // NOTE(review): the declaration of `options` (a repository query-options
        // struct carrying status/type/limit/offset — see "Query options for
        // listing jobs" in the repository header) was dropped by the doc
        // extraction (original line 1472); confirm its type name in source.
        options.status = status;
        options.type = type;
        options.limit = limit;
        options.offset = offset;
#ifdef PACS_WITH_DATABASE_SYSTEM
        // Result-wrapped API: unwrap on success, empty list on error.
        auto result = impl_->repo->find_jobs(options);
        if (result.is_ok()) {
            return result.value();
        }
        return {};
#else
        return impl_->repo->find_jobs(options);
#endif
    }

    // Fallback to cache
    std::vector<job_record> result;
    std::shared_lock lock(impl_->cache_mutex);
    for (const auto& [_, job] : impl_->job_cache) {
        if (status && job.status != *status) continue;
        if (type && job.type != *type) continue;
        // Consume the offset before collecting any matches.
        if (offset > 0) {
            --offset;
            continue;
        }
        result.push_back(job);
        if (result.size() >= limit) break;
    }
    return result;
}
1503
1504std::vector<job_record> job_manager::list_jobs_by_node(std::string_view node_id) const {
1505 if (impl_->repo) {
1506#ifdef PACS_WITH_DATABASE_SYSTEM
1507 auto result = impl_->repo->find_by_node(node_id);
1508 if (result.is_ok()) {
1509 return result.value();
1510 }
1511 return {};
1512#else
1513 return impl_->repo->find_by_node(node_id);
1514#endif
1515 }
1516
1517 // Fallback to cache
1518 std::vector<job_record> result;
1519 std::shared_lock lock(impl_->cache_mutex);
1520 std::string node_str(node_id);
1521 for (const auto& [_, job] : impl_->job_cache) {
1522 if (job.source_node_id == node_str || job.destination_node_id == node_str) {
1523 result.push_back(job);
1524 }
1525 }
1526 return result;
1527}
1528
1529// =============================================================================
1530// Progress Monitoring
1531// =============================================================================
1532
1533job_progress job_manager::get_progress(std::string_view job_id) const {
1534 auto job_opt = impl_->get_job_from_cache(job_id);
1535 if (job_opt) {
1536 return job_opt->progress;
1537 }
1538 return {};
1539}
1540
1542 std::unique_lock lock(impl_->callbacks_mutex);
1543 impl_->progress_callback = std::move(callback);
1544}
1545
1547 std::unique_lock lock(impl_->callbacks_mutex);
1548 impl_->completion_callback = std::move(callback);
1549}
1550
1551// =============================================================================
1552// Wait for Completion
1553// =============================================================================
1554
1555std::future<job_record> job_manager::wait_for_completion(std::string_view job_id) {
1556 auto promise = std::make_shared<std::promise<job_record>>();
1557 auto future = promise->get_future();
1558
1559 // Check if already completed
1560 auto job_opt = impl_->get_job_from_cache(job_id);
1561 if (job_opt && job_opt->is_finished()) {
1562 promise->set_value(*job_opt);
1563 return future;
1564 }
1565
1566 // Store promise for later fulfillment
1567 {
1568 std::lock_guard lock(impl_->promises_mutex);
1569 impl_->completion_promises[std::string(job_id)] = promise;
1570 }
1571
1572 return future;
1573}
1574
1575// =============================================================================
1576// Worker Management
1577// =============================================================================
1578
    // Start the worker thread pool; idempotent (no-op when already running).
    // NOTE(review): the doc extraction dropped this function's signature line
    // (void job_manager::start_workers(), original line 1579).
    if (impl_->running.load()) {
        return;
    }

    impl_->running.store(true);
    // NOTE(review): one statement between the store above and the spawn loop
    // was dropped by the doc extraction (original line 1585) — confirm
    // against the repository source before relying on this listing.

    // Spawn config.worker_count threads, each running impl::worker_loop.
    for (size_t i = 0; i < impl_->config.worker_count; ++i) {
        impl_->workers.emplace_back([this]() {
            impl_->worker_loop();
        });
    }

    impl_->logger->info_fmt("Started {} worker threads", impl_->config.worker_count);
}
1595
1597 if (!impl_->running.load()) {
1598 return;
1599 }
1600
1601 impl_->running.store(false);
1602 impl_->queue_cv.notify_all();
1603
1604 for (auto& worker : impl_->workers) {
1605 if (worker.joinable()) {
1606 worker.join();
1607 }
1608 }
1609 impl_->workers.clear();
1610
1611 impl_->logger->info("Stopped worker threads");
1612}
1613
1614bool job_manager::is_running() const noexcept {
1615 return impl_->running.load();
1616}
1617
1618// =============================================================================
1619// Statistics
1620// =============================================================================
1621
1623 std::lock_guard lock(impl_->active_mutex);
1624 return impl_->active_job_ids.size();
1625}
1626
1628 std::lock_guard lock(impl_->queue_mutex);
1629 return impl_->job_queue.size();
1630}
1631
1633 if (impl_->repo) {
1634#ifdef PACS_WITH_DATABASE_SYSTEM
1635 auto result = impl_->repo->count_completed_today();
1636 if (result.is_ok()) {
1637 return result.value();
1638 }
1639 return 0;
1640#else
1641 return impl_->repo->count_completed_today();
1642#endif
1643 }
1644 return 0;
1645}
1646
1648 if (impl_->repo) {
1649#ifdef PACS_WITH_DATABASE_SYSTEM
1650 auto result = impl_->repo->count_failed_today();
1651 if (result.is_ok()) {
1652 return result.value();
1653 }
1654 return 0;
1655#else
1656 return impl_->repo->count_failed_today();
1657#endif
1658 }
1659 return 0;
1660}
1661
1662// =============================================================================
1663// Configuration
1664// =============================================================================
1665
1667 return impl_->config;
1668}
1669
1670} // namespace kcenon::pacs::client
auto pending_jobs() const -> size_t
Get number of pending jobs.
void set_progress_callback(job_progress_callback callback)
Set the progress callback.
auto is_running() const noexcept -> bool
Check if workers are running.
auto delete_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Delete a job.
auto create_prefetch_job(std::string_view source_node_id, std::string_view patient_id, job_priority priority=job_priority::low) -> std::string
Create a prefetch job.
auto cancel_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Cancel a job.
auto create_retrieve_job(std::string_view source_node_id, std::string_view study_uid, std::optional< std::string_view > series_uid=std::nullopt, job_priority priority=job_priority::normal) -> std::string
Create a retrieve job (C-MOVE/C-GET)
void start_workers()
Start the worker threads.
auto pause_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Pause a running or queued job.
auto completed_jobs_today() const -> size_t
Get number of jobs completed today.
auto config() const noexcept -> const job_manager_config &
Get current configuration.
auto get_progress(std::string_view job_id) const -> job_progress
Get current progress for a job.
auto resume_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Resume a paused job.
auto list_jobs_by_node(std::string_view node_id) const -> std::vector< job_record >
List jobs by node ID.
auto create_store_job(std::string_view destination_node_id, const std::vector< std::string > &instance_uids, job_priority priority=job_priority::normal) -> std::string
Create a store job (C-STORE)
job_manager(std::shared_ptr< storage::job_repository > repo, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< di::ILogger > logger=nullptr)
Construct a job manager with default configuration.
std::unique_ptr< impl > impl_
auto create_sync_job(std::string_view source_node_id, std::optional< std::string_view > patient_id=std::nullopt, job_priority priority=job_priority::low) -> std::string
Create a sync job.
auto create_query_job(std::string_view node_id, std::string_view query_level, const std::unordered_map< std::string, std::string > &query_keys, job_priority priority=job_priority::normal) -> std::string
Create a query job (C-FIND)
auto list_jobs(std::optional< job_status > status=std::nullopt, std::optional< job_type > type=std::nullopt, size_t limit=100, size_t offset=0) const -> std::vector< job_record >
List jobs with optional filters.
auto wait_for_completion(std::string_view job_id) -> std::future< job_record >
Wait for a job to complete.
auto active_jobs() const -> size_t
Get number of active (running) jobs.
~job_manager()
Destructor - stops workers if running.
void set_completion_callback(job_completion_callback callback)
Set the completion callback.
auto get_job(std::string_view job_id) const -> std::optional< job_record >
Get a job by ID.
auto start_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Start a pending job.
auto failed_jobs_today() const -> size_t
Get number of jobs failed today.
auto retry_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Retry a failed job.
void stop_workers()
Stop the worker threads.
void set_string(dicom_tag tag, encoding::vr_type vr, std::string_view value)
Set a string value for the given tag.
network::Result< query_result > find(network::association &assoc, const core::dicom_dataset &query_keys)
Perform a C-FIND query with raw dataset.
Definition query_scu.cpp:36
network::Result< retrieve_result > retrieve_series(network::association &assoc, std::string_view series_uid, retrieve_progress_callback progress=nullptr)
Retrieve a series by Series Instance UID.
network::Result< retrieve_result > retrieve_study(network::association &assoc, std::string_view study_uid, retrieve_progress_callback progress=nullptr)
Retrieve a study by Study Instance UID.
DICOM Part 10 file handling for reading/writing DICOM files.
Compile-time constants for commonly used DICOM tags.
PACS index database for metadata storage and retrieval.
Job manager for asynchronous DICOM operations.
Repository for job persistence using base_repository pattern.
std::function< void( const std::string &job_id, const job_progress &progress)> job_progress_callback
Callback for job progress updates.
Definition job_types.h:385
std::function< void( const std::string &job_id, const job_record &record)> job_completion_callback
Callback for job completion.
Definition job_types.h:395
constexpr bool is_terminal_status(job_status status) noexcept
Check if job status is a terminal state.
Definition job_types.h:139
@ prefetch
Prefetch prior studies.
@ store
C-STORE operation.
@ retrieve
C-MOVE/C-GET operation.
job_priority
Priority level for job execution.
Definition job_types.h:154
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
Definition job_types.h:54
job_status
Current status of a job.
Definition job_types.h:90
@ failed
Job failed with error.
@ cancelled
Job was cancelled by user.
@ running
Job is currently executing.
@ pending
Job created but not yet queued.
@ queued
Job is in the execution queue.
@ completed
Job completed successfully.
constexpr dicom_tag patient_id
Patient ID.
constexpr dicom_tag query_retrieve_level
Query/Retrieve Level.
constexpr dicom_tag accession_number
Accession Number.
constexpr dicom_tag modality
Modality.
constexpr dicom_tag study_instance_uid
Study Instance UID.
constexpr dicom_tag patient_name
Patient's Name.
constexpr dicom_tag study_date
Study Date.
constexpr dicom_tag series_instance_uid
Series Instance UID.
std::shared_ptr< ILogger > null_logger()
Get a shared null logger instance.
Definition ilogger.h:271
@ DA
Date (8 chars, format: YYYYMMDD)
@ LO
Long String (64 chars max)
@ UI
Unique Identifier (64 chars max)
@ PN
Person Name (64 chars max per component group)
@ CS
Code String (16 chars max, uppercase + digits + space + underscore)
@ SH
Short String (16 chars max)
constexpr std::string_view study_root_find_sop_class_uid
Study Root Query/Retrieve Information Model - FIND.
Definition query_scp.h:42
@ c_get
Receive directly from SCP on same association.
@ c_move
Request SCP to send to third party (requires move destination)
constexpr std::string_view study_root_move_sop_class_uid
Study Root Query/Retrieve Information Model - MOVE.
constexpr std::string_view study_root_get_sop_class_uid
Study Root Query/Retrieve Information Model - GET.
@ study
Study level - query study information.
@ image
Image (Instance) level - query instance information.
@ patient
Patient level - query patient demographics.
@ series
Series level - query series information.
@ study_root
Study Root Query/Retrieve Information Model.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Definition result.h:249
DICOM Query SCU service (C-FIND sender)
Remote PACS node manager for client operations.
DICOM Retrieve SCU service (C-MOVE/C-GET sender)
DICOM Storage SCU service (C-STORE sender)
std::vector< std::thread > workers
bool is_job_paused(const std::string &job_id)
void save_job(const job_record &job)
void update_job_status(const std::string &job_id, job_status status, const std::string &error_msg="", const std::string &error_details="")
std::unordered_set< std::string > active_job_ids
void notify_progress(const std::string &job_id, const job_progress &progress)
void complete_job(const std::string &job_id, job_status status, const std::string &error_msg="")
std::unordered_set< std::string > cancelled_job_ids
void mark_job_active(const std::string &job_id)
void update_progress(const std::string &job_id, const job_progress &progress)
bool is_job_cancelled(const std::string &job_id)
std::unordered_set< std::string > paused_job_ids
std::priority_queue< queue_entry > job_queue
job_completion_callback completion_callback
std::shared_ptr< di::ILogger > logger
void enqueue_job(const std::string &job_id, job_priority priority)
void mark_job_inactive(const std::string &job_id)
std::unordered_map< std::string, job_record > job_cache
std::shared_ptr< storage::job_repository > repo
std::shared_ptr< remote_node_manager > node_manager
void notify_completion(const std::string &job_id, const job_record &record)
std::unordered_map< std::string, std::shared_ptr< std::promise< job_record > > > completion_promises
std::optional< job_record > get_job_from_cache(std::string_view job_id) const
Configuration for the job manager.
Definition job_types.h:406
size_t max_queue_size
Maximum jobs in queue.
Definition job_types.h:408
size_t worker_count
Number of worker threads.
Definition job_types.h:407
std::string local_ae_title
Local AE title for operations.
Definition job_types.h:413
Progress tracking for a job.
Definition job_types.h:211
std::chrono::milliseconds elapsed
Time elapsed since start.
Definition job_types.h:223
size_t failed_items
Failed items.
Definition job_types.h:214
std::string current_item
Current SOP Instance UID being processed.
Definition job_types.h:220
size_t skipped_items
Skipped items.
Definition job_types.h:215
std::string current_item_description
Human-readable description.
Definition job_types.h:221
size_t completed_items
Successfully completed items.
Definition job_types.h:213
size_t total_items
Total number of items to process.
Definition job_types.h:212
float percent_complete
Completion percentage (0-100)
Definition job_types.h:218
void calculate_percent() noexcept
Calculate completion percentage from item counts.
Definition job_types.h:229
Complete job record with all metadata.
Definition job_types.h:255
bool operator<(const queue_entry &other) const
job_priority priority
std::string job_id
std::chrono::system_clock::time_point queued_at
Configuration for Query SCU service.
Definition query_scu.h:168
query_model model
Query information model (Patient Root or Study Root)
Definition query_scu.h:170
query_level level
Query level (Patient, Study, Series, or Image)
Definition query_scu.h:173
Progress information for a retrieve operation.
Configuration for Retrieve SCU service.
query_level level
Query level (Study, Series, or Image)
query_model model
Query information model (Patient Root or Study Root)
retrieve_mode mode
Retrieve mode (C-MOVE or C-GET)
std::string move_destination
Move destination AE title (required for C-MOVE mode)
Configuration for Storage SCU service.
Definition storage_scu.h:80
bool continue_on_error
Continue batch operation on error (true) or stop on first error (false)
Definition storage_scu.h:88
Query options for listing jobs.
std::optional< client::job_status > status
Filter by status.