PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
sync_manager.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
16#ifdef PACS_WITH_DATABASE_SYSTEM
20#endif
25
26#include <algorithm>
27#include <atomic>
28#include <chrono>
29#include <condition_variable>
30#include <iomanip>
31#include <mutex>
32#include <random>
33#include <shared_mutex>
34#include <sstream>
35#include <thread>
36#include <unordered_map>
37#include <unordered_set>
38
39namespace kcenon::pacs::client {
40
41// =============================================================================
42// UUID Generation
43// =============================================================================
44
45namespace {
46
// Builds a random version-4 UUID and renders it as lowercase hex in the
// 8-4-4-4-12 layout. Each thread owns its own seed source, engine and
// distribution.
std::string generate_uuid() {
    static thread_local std::random_device entropy;
    static thread_local std::mt19937_64 engine(entropy());
    static thread_local std::uniform_int_distribution<uint64_t> any_u64;

    // Two 64-bit draws, taken in a fixed order: high half first, low half second.
    const uint64_t raw_hi = any_u64(engine);
    const uint64_t raw_lo = any_u64(engine);

    // Force the version nibble to 4 and the two variant bits to binary 10.
    const uint64_t hi = (raw_hi & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL;
    const uint64_t lo = (raw_lo & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL;

    std::ostringstream text;
    text << std::hex << std::setfill('0')
         << std::setw(8) << (hi >> 32) << '-'
         << std::setw(4) << ((hi >> 16) & 0xFFFF) << '-'
         << std::setw(4) << (hi & 0xFFFF) << '-'
         << std::setw(4) << (lo >> 48) << '-'
         << std::setw(12) << (lo & 0xFFFFFFFFFFFFULL);
    return text.str();
}
72
73} // namespace
74
75// =============================================================================
76// Implementation Structure
77// =============================================================================
78
80 // Configuration
82
83 // Dependencies
85 std::shared_ptr<storage::sync_repository> compatibility_repo;
86 std::shared_ptr<remote_node_manager> node_manager;
87 std::shared_ptr<job_manager> job_mgr;
88 std::shared_ptr<services::query_scu> query_scu;
89 std::shared_ptr<di::ILogger> logger;
90
91 // Config cache
92 std::vector<sync_config> configs;
93 mutable std::shared_mutex configs_mutex;
94
95 // Active syncs tracking
96 std::unordered_set<std::string> active_sync_config_ids;
97 mutable std::mutex active_mutex;
98
99 // Last results cache
100 std::unordered_map<std::string, sync_result> last_results;
101 mutable std::shared_mutex results_mutex;
102
103 // Conflicts storage
104 std::vector<sync_conflict> conflicts;
105 mutable std::mutex conflicts_mutex;
106
107 // Scheduler
108 std::thread scheduler_thread;
109 std::atomic<bool> scheduler_running{false};
110 std::condition_variable scheduler_cv;
111 std::mutex scheduler_mutex;
112
113 // Callbacks
117 mutable std::shared_mutex callbacks_mutex;
118
119 // Completion promises
120 std::unordered_map<std::string, std::shared_ptr<std::promise<sync_result>>>
122 mutable std::mutex promises_mutex;
123
124 // Statistics
125 std::atomic<size_t> total_syncs{0};
126 std::atomic<size_t> successful_syncs{0};
127 std::atomic<size_t> failed_syncs{0};
128 std::atomic<size_t> total_studies_synced{0};
129 std::atomic<size_t> total_bytes_transferred{0};
130 std::atomic<size_t> total_conflicts_detected{0};
131 std::atomic<size_t> total_conflicts_resolved{0};
132
133 // =========================================================================
134 // Helper Methods
135 // =========================================================================
136
137 void save_config(const sync_config& cfg) {
138 {
139 std::unique_lock lock(configs_mutex);
140 auto it = std::find_if(configs.begin(), configs.end(),
141 [&cfg](const sync_config& c) {
142 return c.config_id == cfg.config_id;
143 });
144 if (it != configs.end()) {
145 *it = cfg;
146 } else {
147 configs.push_back(cfg);
148 }
149 }
150
151 #ifdef PACS_WITH_DATABASE_SYSTEM
152 if (repositories.configs) {
153 [[maybe_unused]] auto result = repositories.configs->save(cfg);
154 } else if (compatibility_repo) {
155 [[maybe_unused]] auto result = compatibility_repo->save_config(cfg);
156 }
157 #else
158 if (compatibility_repo) {
159 [[maybe_unused]] auto result = compatibility_repo->save_config(cfg);
160 }
161 #endif
162 }
163
164 std::optional<sync_config> get_config_from_cache(std::string_view config_id) const {
165 std::shared_lock lock(configs_mutex);
166 auto it = std::find_if(configs.begin(), configs.end(),
167 [&config_id](const sync_config& c) {
168 return c.config_id == config_id;
169 });
170 if (it != configs.end()) {
171 return *it;
172 }
173 return std::nullopt;
174 }
175
176 void update_config_stats(std::string_view config_id, bool success,
177 size_t studies_synced) {
178 std::unique_lock lock(configs_mutex);
179 auto it = std::find_if(configs.begin(), configs.end(),
180 [&config_id](const sync_config& c) {
181 return c.config_id == config_id;
182 });
183 if (it != configs.end()) {
184 it->total_syncs++;
185 it->studies_synced += studies_synced;
186 it->last_sync = std::chrono::system_clock::now();
187 if (success) {
188 it->last_successful_sync = std::chrono::system_clock::now();
189 }
190 }
191
192 #ifdef PACS_WITH_DATABASE_SYSTEM
193 if (repositories.configs) {
194 [[maybe_unused]] auto result =
195 repositories.configs->update_stats(config_id, success,
196 studies_synced);
197 } else if (compatibility_repo) {
198 [[maybe_unused]] auto result =
199 compatibility_repo->update_config_stats(config_id, success,
200 studies_synced);
201 }
202 #else
203 if (compatibility_repo) {
204 [[maybe_unused]] auto result =
205 compatibility_repo->update_config_stats(config_id, success,
206 studies_synced);
207 }
208 #endif
209 }
210
211 void mark_sync_active(std::string_view config_id) {
212 std::lock_guard lock(active_mutex);
213 active_sync_config_ids.insert(std::string(config_id));
214 }
215
216 void mark_sync_inactive(std::string_view config_id) {
217 std::lock_guard lock(active_mutex);
218 active_sync_config_ids.erase(std::string(config_id));
219 }
220
221 bool is_sync_active(std::string_view config_id) const {
222 std::lock_guard lock(active_mutex);
223 return active_sync_config_ids.count(std::string(config_id)) > 0;
224 }
225
226 void store_last_result(const std::string& config_id, const sync_result& result) {
227 std::unique_lock lock(results_mutex);
228 last_results[config_id] = result;
229 }
230
231 void notify_progress(const std::string& config_id, size_t synced, size_t total) {
232 std::shared_lock lock(callbacks_mutex);
233 if (progress_callback) {
234 progress_callback(config_id, synced, total);
235 }
236 }
237
// Announces that a sync finished: first calls completion_callback with
// (config_id, result) while holding a shared lock on callbacks_mutex, then
// fulfils the promise registered under result.job_id, if there is one.
//
// NOTE(review): listing line 241 is missing from this extract. The closing
// brace on line 243 suggests the callback call is wrapped in a guard
// (presumably a null check on completion_callback) — confirm against the
// real source file.
238 void notify_completion(const std::string& config_id, const sync_result& result) {
239 {
240 std::shared_lock lock(callbacks_mutex);
242 completion_callback(config_id, result);
243 }
244 }
245
246 // Fulfill promise if exists
// The promise is looked up by job id (not config id) and erased right after
// set_value, so an entry is fulfilled at most once from this function.
247 {
248 std::lock_guard lock(promises_mutex);
249 auto it = completion_promises.find(result.job_id);
250 if (it != completion_promises.end()) {
251 it->second->set_value(result);
252 completion_promises.erase(it);
253 }
254 }
255 }
256
257 void notify_conflict(const sync_conflict& conflict) {
258 std::shared_lock lock(callbacks_mutex);
259 if (conflict_callback) {
260 conflict_callback(conflict);
261 }
262 }
263
// Records a detected conflict: upserts it into the in-memory list (keyed on
// study_uid + config_id), hands it to the configured repository, bumps the
// total_conflicts_detected counter and finally invokes notify_conflict.
//
// NOTE(review): the counter is incremented on every call, including when an
// existing entry is merely overwritten — confirm that is intended.
264 void add_conflict(const sync_conflict& conflict) {
265 {
266 std::lock_guard lock(conflicts_mutex);
267 // Check for existing conflict for same study
268 auto it = std::find_if(conflicts.begin(), conflicts.end(),
269 [&conflict](const sync_conflict& c) {
270 return c.study_uid == conflict.study_uid &&
271 c.config_id == conflict.config_id;
272 });
273 if (it != conflicts.end()) {
274 *it = conflict; // Update existing
275 } else {
276 conflicts.push_back(conflict);
277 }
278 }
279
// Persistence happens after conflicts_mutex has been released; the repository
// result is deliberately ignored.
// NOTE(review): listing line 281 is missing from this extract; the
// "} else if" on line 283 implies it opens an `if`, presumably testing
// repositories.conflicts — confirm against the real source file.
280 #ifdef PACS_WITH_DATABASE_SYSTEM
282 [[maybe_unused]] auto result = repositories.conflicts->save(conflict);
283 } else if (compatibility_repo) {
284 [[maybe_unused]] auto result =
285 compatibility_repo->save_conflict(conflict);
286 }
287 #else
288 if (compatibility_repo) {
289 [[maybe_unused]] auto result =
290 compatibility_repo->save_conflict(conflict);
291 }
292 #endif
293
294 total_conflicts_detected.fetch_add(1, std::memory_order_relaxed);
295 notify_conflict(conflict);
296 }
297
298 // =========================================================================
299 // Sync Execution
300 // =========================================================================
301
303 sync_result result;
304 result.config_id = cfg.config_id;
305 result.job_id = generate_uuid();
306 result.started_at = std::chrono::system_clock::now();
307
308 if (logger) {
309 logger->info_fmt("Starting {} sync for config '{}' from node '{}'",
310 full_sync ? "full" : "incremental", cfg.config_id, cfg.source_node_id);
311 }
312
313 // Get the source node
314 auto node_opt = node_manager->get_node(cfg.source_node_id);
315 if (!node_opt) {
316 result.success = false;
317 result.errors.push_back("Source node not found: " + cfg.source_node_id);
318 result.completed_at = std::chrono::system_clock::now();
319 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
320 result.completed_at - result.started_at);
321 return result;
322 }
323
324 const auto& node = *node_opt;
325 if (!node.is_online()) {
326 result.success = false;
327 result.errors.push_back("Source node is offline: " + cfg.source_node_id);
328 result.completed_at = std::chrono::system_clock::now();
329 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
330 result.completed_at - result.started_at);
331 return result;
332 }
333
334 if (!node.supports_find) {
335 result.success = false;
336 result.errors.push_back("Node does not support C-FIND");
337 result.completed_at = std::chrono::system_clock::now();
338 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
339 result.completed_at - result.started_at);
340 return result;
341 }
342
343 // Determine sync start time based on full_sync or incremental
344 auto sync_start_time = full_sync
345 ? std::chrono::system_clock::now() - cfg.lookback
347
348 // If no successful sync and not full sync, use lookback
349 if (!full_sync && sync_start_time == std::chrono::system_clock::time_point{}) {
350 sync_start_time = std::chrono::system_clock::now() - cfg.lookback;
351 }
352
353 // Query remote studies
354 auto remote_studies = query_remote_studies(cfg, sync_start_time, result);
355 if (!result.errors.empty()) {
356 result.success = false;
357 result.completed_at = std::chrono::system_clock::now();
358 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
359 result.completed_at - result.started_at);
360 return result;
361 }
362
363 result.studies_checked = remote_studies.size();
364
365 // Compare with local studies
366 auto comparison = compare_with_local(cfg, remote_studies, result);
367
368 // Process differences based on sync direction
369 if (cfg.direction == sync_direction::pull ||
371 // Pull missing studies from remote
372 for (const auto& conflict : comparison) {
373 if (conflict.conflict_type == sync_conflict_type::missing_local) {
374 // Create retrieve job
375 if (job_mgr && node.supports_query_retrieve()) {
376 auto job_id = job_mgr->create_retrieve_job(
377 cfg.source_node_id, conflict.study_uid, std::nullopt,
379 if (!job_id.empty()) {
380 result.studies_synced++;
382 result.studies_checked);
383 }
384 }
385 } else if (conflict.conflict_type == sync_conflict_type::count_mismatch ||
386 conflict.conflict_type == sync_conflict_type::modified) {
387 // Handle conflicts
390 } else {
391 add_conflict(conflict);
392 result.conflicts.push_back(conflict);
393 }
394 }
395 }
396 }
397
398 if (cfg.direction == sync_direction::push ||
400 // Push studies missing on remote
401 for (const auto& conflict : comparison) {
402 if (conflict.conflict_type == sync_conflict_type::missing_remote) {
403 // Store job would be created here for push
404 if (logger) {
405 logger->info_fmt("Study {} is missing on remote, would create store job",
406 conflict.study_uid);
407 }
408 result.studies_synced++;
409 }
410 }
411 }
412
413 // Update statistics
414 total_syncs.fetch_add(1, std::memory_order_relaxed);
415 total_studies_synced.fetch_add(result.studies_synced, std::memory_order_relaxed);
416
417 result.success = result.errors.empty();
418 result.completed_at = std::chrono::system_clock::now();
419 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
420 result.completed_at - result.started_at);
421
422 if (result.success) {
423 successful_syncs.fetch_add(1, std::memory_order_relaxed);
424 } else {
425 failed_syncs.fetch_add(1, std::memory_order_relaxed);
426 }
427
428 // Update config statistics
430
431 // Store result
432 store_last_result(cfg.config_id, result);
433
434 // Save history
435 sync_history history;
436 history.config_id = cfg.config_id;
437 history.job_id = result.job_id;
438 history.success = result.success;
439 history.studies_checked = result.studies_checked;
440 history.studies_synced = result.studies_synced;
441 history.conflicts_found = result.conflicts.size();
442 history.errors = result.errors;
443 history.started_at = result.started_at;
444 history.completed_at = result.completed_at;
445
446 #ifdef PACS_WITH_DATABASE_SYSTEM
447 if (repositories.history) {
448 [[maybe_unused]] auto save_result = repositories.history->save(history);
449 } else if (compatibility_repo) {
450 [[maybe_unused]] auto save_result =
451 compatibility_repo->save_history(history);
452 }
453 #else
454 if (compatibility_repo) {
455 [[maybe_unused]] auto save_result =
456 compatibility_repo->save_history(history);
457 }
458 #endif
459
460 if (logger) {
461 logger->info_fmt("Sync completed for config '{}': {} checked, {} synced, {} conflicts",
462 cfg.config_id, result.studies_checked, result.studies_synced,
463 result.conflicts.size());
464 }
465
466 return result;
467 }
468
// Queries the node named by cfg.source_node_id and returns the study instance
// UIDs of all matches.
//
// cfg    - supplies the source node id and the optional modality filter.
// since  - when not the epoch-default time_point, adds a study-date filter
//          starting at that date.
// result - failures are reported by appending a message to result.errors;
//          in that case the returned vector is empty.
//
// NOTE(review): listing lines 478, 493-494 and 522 are missing from this
// extract (contents of sop_classes, initial query keys, and one
// query_config field). Confirm against the real source file.
469 std::vector<std::string> query_remote_studies(
470 const sync_config& cfg,
471 std::chrono::system_clock::time_point since,
472 sync_result& result) {
473
474 std::vector<std::string> study_uids;
475
476 // Get association for query
477 std::vector<std::string> sop_classes = {
479 };
480
481 auto assoc_result = node_manager->acquire_association(
482 cfg.source_node_id, sop_classes);
483 if (assoc_result.is_err()) {
484 result.errors.push_back("Failed to acquire association: " +
485 assoc_result.error().message);
486 return study_uids;
487 }
488
489 auto assoc = std::move(assoc_result.value());
490
491 // Build query
492 core::dicom_dataset query_keys;
495
496 // Apply filters
// All configured modalities are joined with a single backslash into one value.
497 if (!cfg.modalities.empty()) {
498 std::string modality_filter;
499 for (size_t i = 0; i < cfg.modalities.size(); ++i) {
500 if (i > 0) modality_filter += "\\";
501 modality_filter += cfg.modalities[i];
502 }
503 query_keys.set_string(core::tags::modality, encoding::vr_type::CS, modality_filter);
504 }
505
506 // Date range filter
// `since` is converted using the process's local time zone and rendered as
// "YYYYMMDD-" (trailing dash, no end date).
507 if (since != std::chrono::system_clock::time_point{}) {
508 auto since_time_t = std::chrono::system_clock::to_time_t(since);
509 std::tm tm_buf;
510#ifdef _WIN32
511 localtime_s(&tm_buf, &since_time_t);
512#else
513 localtime_r(&since_time_t, &tm_buf);
514#endif
515 std::ostringstream date_oss;
516 date_oss << std::put_time(&tm_buf, "%Y%m%d") << "-";
517 query_keys.set_string(core::tags::study_date, encoding::vr_type::DA, date_oss.str());
518 }
519
520 // Execute query
// A fresh query_scu is constructed locally for this call; the `query_scu`
// member of the enclosing struct is not used here.
521 services::query_scu_config query_config;
523 query_config.level = services::query_level::study;
524
525 services::query_scu scu(query_config, logger);
526 auto query_result = scu.find(*assoc, query_keys);
527
528 // Release association
// Released before the query result is inspected, so both the success and the
// failure path below give the association back.
529 node_manager->release_association(cfg.source_node_id, std::move(assoc));
530
531 if (query_result.is_err()) {
532 result.errors.push_back("Query failed: " + query_result.error().message);
533 return study_uids;
534 }
535
// Matches with an empty study instance UID are dropped.
536 const auto& qr = query_result.value();
537 for (const auto& match : qr.matches) {
538 auto uid = match.get_string(core::tags::study_instance_uid);
539 if (!uid.empty()) {
540 study_uids.push_back(uid);
541 }
542 }
543
544 return study_uids;
545 }
546
547 std::vector<sync_conflict> compare_with_local(
548 const sync_config& cfg,
549 const std::vector<std::string>& remote_study_uids,
550 sync_result& result) {
551 (void)result; // Reserved for future result aggregation
552
553 std::vector<sync_conflict> comparison_conflicts;
554
555 // For each remote study, check if it exists locally
556 for (const auto& study_uid : remote_study_uids) {
557 // In real implementation, query local database
558 // For now, we'll simulate that all remote studies are missing locally
559 bool exists_locally = false; // Would check storage index here
560
561 if (!exists_locally) {
562 sync_conflict conflict;
563 conflict.config_id = cfg.config_id;
564 conflict.study_uid = study_uid;
565 conflict.conflict_type = sync_conflict_type::missing_local;
566 conflict.remote_modified = std::chrono::system_clock::now();
567 conflict.detected_at = std::chrono::system_clock::now();
568 comparison_conflicts.push_back(conflict);
569 }
570 }
571
572 return comparison_conflicts;
573 }
574
576 conflict_resolution resolution,
577 sync_result& result) {
578 switch (resolution) {
580 // Create retrieve job
581 if (job_mgr) {
582 auto job_id = job_mgr->create_retrieve_job(
583 conflict.config_id, conflict.study_uid, std::nullopt,
585 if (!job_id.empty()) {
586 result.studies_synced++;
587 }
588 }
589 break;
590
592 // Keep local, skip
593 result.studies_skipped++;
594 break;
595
597 // Compare timestamps and choose
598 if (conflict.remote_modified > conflict.local_modified) {
599 if (job_mgr) {
600 auto job_id = job_mgr->create_retrieve_job(
601 conflict.config_id, conflict.study_uid, std::nullopt,
603 if (!job_id.empty()) {
604 result.studies_synced++;
605 }
606 }
607 } else {
608 result.studies_skipped++;
609 }
610 break;
611 }
612
613 total_conflicts_resolved.fetch_add(1, std::memory_order_relaxed);
614 }
615
616 // =========================================================================
617 // Scheduler
618 // =========================================================================
619
621 while (scheduler_running.load()) {
622 std::unique_lock lock(scheduler_mutex);
623 scheduler_cv.wait_for(lock, std::chrono::minutes(1), [this] {
624 return !scheduler_running.load();
625 });
626
627 if (!scheduler_running.load()) {
628 break;
629 }
630
631 // Check each config for scheduled syncs
632 std::vector<sync_config> configs_copy;
633 {
634 std::shared_lock cfg_lock(configs_mutex);
635 configs_copy = configs;
636 }
637
638 for (const auto& cfg : configs_copy) {
639 if (!cfg.enabled || cfg.schedule_cron.empty()) {
640 continue;
641 }
642
643 // Simple cron check - in production would parse cron expression
644 // For now, check if it's time based on last sync
645 auto now = std::chrono::system_clock::now();
646 auto since_last = now - cfg.last_sync;
647
648 // Default to hourly if cron not parsed
649 if (since_last >= std::chrono::hours(1)) {
650 if (!is_sync_active(cfg.config_id)) {
651 if (logger) {
652 logger->info_fmt("Scheduler triggering sync for config '{}'",
653 cfg.config_id);
654 }
655
656 mark_sync_active(cfg.config_id);
657 auto result = perform_sync(cfg, false);
658 mark_sync_inactive(cfg.config_id);
659 notify_completion(cfg.config_id, result);
660 }
661 }
662 }
663 }
664 }
665
667 std::vector<sync_config> loaded;
668 #ifdef PACS_WITH_DATABASE_SYSTEM
669 if (repositories.configs) {
670 auto result = repositories.configs->find_all();
671 if (result.is_err()) {
672 return;
673 }
674 loaded = std::move(result.value());
675 } else if (compatibility_repo) {
676 loaded = compatibility_repo->list_configs();
677 } else {
678 return;
679 }
680 #else
681 if (compatibility_repo) {
682 loaded = compatibility_repo->list_configs();
683 } else {
684 return;
685 }
686 #endif
687 {
688 std::unique_lock lock(configs_mutex);
689 configs = std::move(loaded);
690 }
691
692 if (logger && !configs.empty()) {
693 logger->info_fmt("Loaded {} sync configs from repository", configs.size());
694 }
695 }
696
698 std::vector<sync_conflict> loaded;
699 #ifdef PACS_WITH_DATABASE_SYSTEM
701 auto result = repositories.conflicts->find_unresolved();
702 if (result.is_err()) {
703 return;
704 }
705 loaded = std::move(result.value());
706 } else if (compatibility_repo) {
707 loaded = compatibility_repo->list_unresolved_conflicts();
708 } else {
709 return;
710 }
711 #else
712 if (compatibility_repo) {
713 loaded = compatibility_repo->list_unresolved_conflicts();
714 } else {
715 return;
716 }
717 #endif
718 {
719 std::lock_guard lock(conflicts_mutex);
720 conflicts = std::move(loaded);
721 }
722
723 if (logger && !conflicts.empty()) {
724 logger->info_fmt("Loaded {} unresolved conflicts from repository", conflicts.size());
725 }
726 }
727};
728
729// =============================================================================
730// Construction / Destruction
731// =============================================================================
732
734 sync_repositories repositories,
735 std::shared_ptr<remote_node_manager> node_manager,
736 std::shared_ptr<job_manager> job_manager,
737 std::shared_ptr<services::query_scu> query_scu,
738 std::shared_ptr<di::ILogger> logger)
739 : sync_manager(sync_manager_config{}, std::move(repositories),
740 std::move(node_manager), std::move(job_manager),
741 std::move(query_scu), std::move(logger)) {}
742
744 const sync_manager_config& config,
745 sync_repositories repositories,
746 std::shared_ptr<remote_node_manager> node_manager,
747 std::shared_ptr<job_manager> job_manager,
748 std::shared_ptr<services::query_scu> query_scu,
749 std::shared_ptr<di::ILogger> logger)
750 : impl_(std::make_unique<impl>()) {
751
753 impl_->repositories = std::move(repositories);
754 impl_->node_manager = std::move(node_manager);
755 impl_->job_mgr = std::move(job_manager);
756 impl_->query_scu = std::move(query_scu);
757 impl_->logger = logger ? std::move(logger) : di::null_logger();
758
761}
762
764 std::shared_ptr<storage::sync_repository> repo,
765 std::shared_ptr<remote_node_manager> node_manager,
766 std::shared_ptr<job_manager> job_manager,
767 std::shared_ptr<services::query_scu> query_scu,
768 std::shared_ptr<di::ILogger> logger)
769 : sync_manager(sync_manager_config{}, std::move(repo), std::move(node_manager),
770 std::move(job_manager), std::move(query_scu), std::move(logger)) {
771}
772
774 const sync_manager_config& config,
775 std::shared_ptr<storage::sync_repository> repo,
776 std::shared_ptr<remote_node_manager> node_manager,
777 std::shared_ptr<job_manager> job_manager,
778 std::shared_ptr<services::query_scu> query_scu,
779 std::shared_ptr<di::ILogger> logger)
780 : impl_(std::make_unique<impl>()) {
781
783 impl_->compatibility_repo = std::move(repo);
784 impl_->node_manager = std::move(node_manager);
785 impl_->job_mgr = std::move(job_manager);
786 impl_->query_scu = std::move(query_scu);
787 impl_->logger = logger ? std::move(logger) : di::null_logger();
788
791}
792
796
797// =============================================================================
798// Config CRUD
799// =============================================================================
800
// Registers a new sync configuration.
//
// Rejects the request with invalid_argument when config_id or source_node_id
// is empty, and with already_exists when the cache already holds an entry
// with the same config_id. On success logs the addition and returns ok().
//
// NOTE(review): listing lines 803, 809, 816 and 821 are missing from this
// extract. The first three presumably open the error-return expressions
// whose arguments are visible below, and 821 presumably stores the config
// (nothing visible here does) — confirm against the real source file.
// NOTE(review): the duplicate check and the store are separate steps, so two
// concurrent calls with the same id could both pass the check — confirm
// whether callers serialise this.
801kcenon::pacs::VoidResult sync_manager::add_config(const sync_config& config) {
802 if (config.config_id.empty()) {
804 kcenon::pacs::error_codes::invalid_argument,
805 "Config ID cannot be empty");
806 }
807
808 if (config.source_node_id.empty()) {
810 kcenon::pacs::error_codes::invalid_argument,
811 "Source node ID cannot be empty");
812 }
813
814 // Check for duplicate
815 if (impl_->get_config_from_cache(config.config_id).has_value()) {
817 kcenon::pacs::error_codes::already_exists,
818 "Config already exists: " + config.config_id);
819 }
820
822
823 if (impl_->logger) {
824 impl_->logger->info_fmt("Added sync config '{}' for node '{}'",
825 config.config_id, config.source_node_id);
826 }
827
828 return kcenon::pacs::ok();
829}
830
// Replaces an existing sync configuration.
//
// Fails with not_found when the cache has no entry for config.config_id;
// otherwise logs the update and returns ok().
//
// NOTE(review): listing lines 833 and 838 are missing from this extract.
// 833 presumably opens the error-return expression, and 838 presumably
// performs the actual store — confirm against the real source file.
831kcenon::pacs::VoidResult sync_manager::update_config(const sync_config& config) {
832 if (!impl_->get_config_from_cache(config.config_id).has_value()) {
834 kcenon::pacs::error_codes::not_found,
835 "Config not found: " + config.config_id);
836 }
837
839
840 if (impl_->logger) {
841 impl_->logger->info_fmt("Updated sync config '{}'", config.config_id);
842 }
843
844 return kcenon::pacs::ok();
845}
846
// Deletes the sync configuration identified by `config_id`.
//
// The entry is erased from the cache under an exclusive lock; not_found is
// reported when no cached entry matches. The removal is then forwarded to
// the configured repository (result ignored) and logged.
//
// NOTE(review): listing lines 855, 863 and 871 are missing from this
// extract. 855 presumably opens the error-return expression; 863 and 871
// presumably open the `if` guards around the repository calls — confirm
// against the real source file.
847kcenon::pacs::VoidResult sync_manager::remove_config(std::string_view config_id) {
848 {
849 std::unique_lock lock(impl_->configs_mutex);
850 auto it = std::find_if(impl_->configs.begin(), impl_->configs.end(),
851 [&config_id](const sync_config& c) {
852 return c.config_id == config_id;
853 });
854 if (it == impl_->configs.end()) {
856 kcenon::pacs::error_codes::not_found,
857 "Config not found: " + std::string(config_id));
858 }
859 impl_->configs.erase(it);
860 }
861
// Repository removal runs after the cache lock has been released.
862 #ifdef PACS_WITH_DATABASE_SYSTEM
864 [[maybe_unused]] auto result =
865 impl_->repositories.configs->remove(std::string(config_id));
866 } else if (impl_->compatibility_repo) {
867 [[maybe_unused]] auto result =
868 impl_->compatibility_repo->remove_config(config_id);
869 }
870 #else
872 [[maybe_unused]] auto result =
873 impl_->compatibility_repo->remove_config(config_id);
874 }
875 #endif
876
877 if (impl_->logger) {
878 impl_->logger->info_fmt("Removed sync config '{}'", config_id);
879 }
880
881 return kcenon::pacs::ok();
882}
883
884std::optional<sync_config> sync_manager::get_config(std::string_view config_id) const {
885 return impl_->get_config_from_cache(config_id);
886}
887
888std::vector<sync_config> sync_manager::list_configs() const {
889 std::shared_lock lock(impl_->configs_mutex);
890 return impl_->configs;
891}
892
893// =============================================================================
894// Manual Sync Operations
895// =============================================================================
896
897std::string sync_manager::sync_now(std::string_view config_id) {
898 auto config_opt = impl_->get_config_from_cache(config_id);
899 if (!config_opt) {
900 return "";
901 }
902
903 impl_->mark_sync_active(config_id);
904
905 // Determine if incremental or full based on last_successful_sync
906 bool full = config_opt->last_successful_sync == std::chrono::system_clock::time_point{};
907 auto result = impl_->perform_sync(*config_opt, full);
908
909 impl_->mark_sync_inactive(config_id);
910 impl_->notify_completion(std::string(config_id), result);
911
912 return result.job_id;
913}
914
915std::string sync_manager::full_sync(std::string_view config_id) {
916 auto config_opt = impl_->get_config_from_cache(config_id);
917 if (!config_opt) {
918 return "";
919 }
920
921 impl_->mark_sync_active(config_id);
922 auto result = impl_->perform_sync(*config_opt, true);
923 impl_->mark_sync_inactive(config_id);
924 impl_->notify_completion(std::string(config_id), result);
925
926 return result.job_id;
927}
928
929std::string sync_manager::incremental_sync(std::string_view config_id) {
930 auto config_opt = impl_->get_config_from_cache(config_id);
931 if (!config_opt) {
932 return "";
933 }
934
935 impl_->mark_sync_active(config_id);
936 auto result = impl_->perform_sync(*config_opt, false);
937 impl_->mark_sync_inactive(config_id);
938 impl_->notify_completion(std::string(config_id), result);
939
940 return result.job_id;
941}
942
943std::future<sync_result> sync_manager::wait_for_sync(std::string_view job_id) {
944 auto promise = std::make_shared<std::promise<sync_result>>();
945 auto future = promise->get_future();
946
947 {
948 std::lock_guard lock(impl_->promises_mutex);
949 impl_->completion_promises[std::string(job_id)] = promise;
950 }
951
952 return future;
953}
954
955// =============================================================================
956// Comparison (Dry Run)
957// =============================================================================
958
959sync_result sync_manager::compare(std::string_view config_id) {
960 sync_result result;
961 result.config_id = std::string(config_id);
962 result.started_at = std::chrono::system_clock::now();
963
964 auto config_opt = impl_->get_config_from_cache(config_id);
965 if (!config_opt) {
966 result.errors.push_back("Config not found: " + std::string(config_id));
967 result.completed_at = std::chrono::system_clock::now();
968 return result;
969 }
970
971 const auto& cfg = *config_opt;
972
973 // Query remote studies
974 auto sync_start_time = std::chrono::system_clock::now() - cfg.lookback;
975 auto remote_studies = impl_->query_remote_studies(cfg, sync_start_time, result);
976
977 if (!result.errors.empty()) {
978 result.completed_at = std::chrono::system_clock::now();
979 return result;
980 }
981
982 result.studies_checked = remote_studies.size();
983
984 // Compare with local
985 auto conflicts = impl_->compare_with_local(cfg, remote_studies, result);
986 result.conflicts = conflicts;
987
988 result.success = true;
989 result.completed_at = std::chrono::system_clock::now();
990 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
991 result.completed_at - result.started_at);
992
993 return result;
994}
995
996std::future<sync_result> sync_manager::compare_async(std::string_view config_id) {
997 return std::async(std::launch::async, [this, id = std::string(config_id)]() {
998 return compare(id);
999 });
1000}
1001
1002// =============================================================================
1003// Conflict Management
1004// =============================================================================
1005
1006std::vector<sync_conflict> sync_manager::get_conflicts() const {
1007 std::lock_guard lock(impl_->conflicts_mutex);
1008 std::vector<sync_conflict> unresolved;
1009 std::copy_if(impl_->conflicts.begin(), impl_->conflicts.end(),
1010 std::back_inserter(unresolved),
1011 [](const sync_conflict& c) { return !c.resolved; });
1012 return unresolved;
1013}
1014
1015std::vector<sync_conflict> sync_manager::get_conflicts(std::string_view config_id) const {
1016 std::lock_guard lock(impl_->conflicts_mutex);
1017 std::vector<sync_conflict> filtered;
1018 std::copy_if(impl_->conflicts.begin(), impl_->conflicts.end(),
1019 std::back_inserter(filtered),
1020 [&config_id](const sync_conflict& c) {
1021 return !c.resolved && c.config_id == config_id;
1022 });
1023 return filtered;
1024}
1025
// Resolves the first unresolved conflict recorded for `study_uid` using the
// given strategy.
//
// Reports not_found when no unresolved conflict matches. Otherwise applies
// the resolution, marks the entry resolved with the strategy and a
// timestamp, forwards the resolution to the configured repository (result
// ignored), logs it and returns ok().
//
// The lookup matches on study_uid only, so when several configs hold a
// conflict for the same study the first one in the list is the one resolved.
//
// NOTE(review): listing lines 1038, 1052 and 1060 are missing from this
// extract. 1038 presumably opens the error-return expression; 1052 and
// 1060 presumably open the `if` guards around the repository calls —
// confirm against the real source file.
1026kcenon::pacs::VoidResult sync_manager::resolve_conflict(
1027 std::string_view study_uid,
1028 conflict_resolution resolution) {
1029
// conflicts_mutex is held for the whole function, including the internal
// resolution step, the repository call and logging.
1030 std::lock_guard lock(impl_->conflicts_mutex);
1031
1032 auto it = std::find_if(impl_->conflicts.begin(), impl_->conflicts.end(),
1033 [&study_uid](const sync_conflict& c) {
1034 return c.study_uid == study_uid && !c.resolved;
1035 });
1036
1037 if (it == impl_->conflicts.end()) {
1039 kcenon::pacs::error_codes::not_found,
1040 "Conflict not found: " + std::string(study_uid));
1041 }
1042
1043 // Apply resolution
// The counters written into dummy_result are discarded; only the side
// effects of the internal resolution are kept.
1044 sync_result dummy_result;
1045 impl_->resolve_conflict_internal(*it, resolution, dummy_result);
1046
1047 it->resolved = true;
1048 it->resolution_used = resolution;
1049 it->resolved_at = std::chrono::system_clock::now();
1050
1051 #ifdef PACS_WITH_DATABASE_SYSTEM
1053 [[maybe_unused]] auto result =
1054 impl_->repositories.conflicts->resolve(study_uid, resolution);
1055 } else if (impl_->compatibility_repo) {
1056 [[maybe_unused]] auto result =
1057 impl_->compatibility_repo->resolve_conflict(study_uid, resolution);
1058 }
1059 #else
1061 [[maybe_unused]] auto result =
1062 impl_->compatibility_repo->resolve_conflict(study_uid, resolution);
1063 }
1064 #endif
1065
1066 if (impl_->logger) {
1067 impl_->logger->info_fmt("Resolved conflict for study '{}' using {}",
1068 study_uid, to_string(resolution));
1069 }
1070
1071 return kcenon::pacs::ok();
1072}
1073
1075 std::string_view config_id,
1076 conflict_resolution resolution) {
1077
1078 std::vector<std::string> study_uids;
1079
1080 {
1081 std::lock_guard lock(impl_->conflicts_mutex);
1082 for (const auto& c : impl_->conflicts) {
1083 if (c.config_id == config_id && !c.resolved) {
1084 study_uids.push_back(c.study_uid);
1085 }
1086 }
1087 }
1088
1089 for (const auto& uid : study_uids) {
1090 auto result = resolve_conflict(uid, resolution);
1091 if (result.is_err()) {
1092 return result;
1093 }
1094 }
1095
1096 if (impl_->logger) {
1097 impl_->logger->info_fmt("Resolved {} conflicts for config '{}'",
1098 study_uids.size(), config_id);
1099 }
1100
1101 return kcenon::pacs::ok();
1102}
1103
1104// =============================================================================
1105// Scheduler
1106// =============================================================================
1107
// Body of the scheduler start routine (the member index lists it as
// `void start_scheduler()` - "Start the sync scheduler").
// NOTE(review): the definition's opening line (listing line 1108) and the
// statement inside the thread lambda (listing line 1115) are absent from this
// listing; restore them from the real source before compiling.
//
// Does nothing when the scheduler is already flagged as running.
1109 if (impl_->scheduler_running.load()) {
1110 return;
1111 }
1112
// Flag the scheduler as running, then launch its worker thread.
// NOTE(review): the load() above and this store() are separate steps, so two
// concurrent callers could both pass the check - confirm whether concurrent
// calls are possible; a single exchange(true) would close that window.
1113 impl_->scheduler_running.store(true);
1114 impl_->scheduler_thread = std::thread([this]() {
1116 });
1117
// Logging is optional: the logger pointer may be null.
1118 if (impl_->logger) {
1119 impl_->logger->info("Sync scheduler started");
1120 }
1121}
1122
1124 if (!impl_->scheduler_running.load()) {
1125 return;
1126 }
1127
1128 impl_->scheduler_running.store(false);
1129 impl_->scheduler_cv.notify_all();
1130
1131 if (impl_->scheduler_thread.joinable()) {
1132 impl_->scheduler_thread.join();
1133 }
1134
1135 if (impl_->logger) {
1136 impl_->logger->info("Sync scheduler stopped");
1137 }
1138}
1139
1141 return impl_->scheduler_running.load();
1142}
1143
1144// =============================================================================
1145// Status
1146// =============================================================================
1147
1148bool sync_manager::is_syncing(std::string_view config_id) const {
1149 return impl_->is_sync_active(config_id);
1150}
1151
1152sync_result sync_manager::get_last_result(std::string_view config_id) const {
1153 std::shared_lock lock(impl_->results_mutex);
1154 auto it = impl_->last_results.find(std::string(config_id));
1155 if (it != impl_->last_results.end()) {
1156 return it->second;
1157 }
1158 return sync_result{};
1159}
1160
1161// =============================================================================
1162// Statistics
1163// =============================================================================
1164
1166 sync_statistics stats;
1167 stats.total_syncs = impl_->total_syncs.load(std::memory_order_relaxed);
1168 stats.successful_syncs = impl_->successful_syncs.load(std::memory_order_relaxed);
1169 stats.failed_syncs = impl_->failed_syncs.load(std::memory_order_relaxed);
1170 stats.total_studies_synced = impl_->total_studies_synced.load(std::memory_order_relaxed);
1171 stats.total_bytes_transferred = impl_->total_bytes_transferred.load(std::memory_order_relaxed);
1172 stats.total_conflicts_detected = impl_->total_conflicts_detected.load(std::memory_order_relaxed);
1173 stats.total_conflicts_resolved = impl_->total_conflicts_resolved.load(std::memory_order_relaxed);
1174 return stats;
1175}
1176
1177sync_statistics sync_manager::get_statistics(std::string_view config_id) const {
1178 sync_statistics stats;
1179
1180 auto config_opt = impl_->get_config_from_cache(config_id);
1181 if (config_opt) {
1182 stats.total_syncs = config_opt->total_syncs;
1183 stats.total_studies_synced = config_opt->studies_synced;
1184 }
1185
1186 // Count conflicts for this config
1187 {
1188 std::lock_guard lock(impl_->conflicts_mutex);
1189 for (const auto& c : impl_->conflicts) {
1190 if (c.config_id == config_id) {
1191 stats.total_conflicts_detected++;
1192 if (c.resolved) {
1193 stats.total_conflicts_resolved++;
1194 }
1195 }
1196 }
1197 }
1198
1199 return stats;
1200}
1201
1202// =============================================================================
1203// Callbacks
1204// =============================================================================
1205
1207 std::unique_lock lock(impl_->callbacks_mutex);
1208 impl_->progress_callback = std::move(callback);
1209}
1210
1212 std::unique_lock lock(impl_->callbacks_mutex);
1213 impl_->completion_callback = std::move(callback);
1214}
1215
1217 std::unique_lock lock(impl_->callbacks_mutex);
1218 impl_->conflict_callback = std::move(callback);
1219}
1220
1221// =============================================================================
1222// Configuration
1223// =============================================================================
1224
1226 return impl_->config;
1227}
1228
1229} // namespace kcenon::pacs::client
auto get_conflicts() const -> std::vector< sync_conflict >
Get all unresolved conflicts.
void start_scheduler()
Start the sync scheduler.
auto get_last_result(std::string_view config_id) const -> sync_result
Get the last sync result for a configuration.
auto sync_now(std::string_view config_id) -> std::string
Start sync immediately for a configuration.
auto compare_async(std::string_view config_id) -> std::future< sync_result >
Compare local and remote data asynchronously.
auto is_scheduler_running() const noexcept -> bool
Check if scheduler is running.
auto config() const noexcept -> const sync_manager_config &
Get current manager configuration.
auto add_config(const sync_config &config) -> kcenon::pacs::VoidResult
Add a new sync configuration.
auto resolve_conflict(std::string_view study_uid, conflict_resolution resolution) -> kcenon::pacs::VoidResult
Resolve a specific conflict.
auto compare(std::string_view config_id) -> sync_result
Compare local and remote data without syncing.
auto full_sync(std::string_view config_id) -> std::string
Perform a full sync for a configuration.
std::unique_ptr< impl > impl_
auto resolve_all_conflicts(std::string_view config_id, conflict_resolution resolution) -> kcenon::pacs::VoidResult
Resolve all conflicts for a configuration.
auto incremental_sync(std::string_view config_id) -> std::string
Perform an incremental sync for a configuration.
void set_conflict_callback(sync_conflict_callback callback)
Set callback for conflict detection.
void set_completion_callback(sync_completion_callback callback)
Set callback for sync completion.
auto remove_config(std::string_view config_id) -> kcenon::pacs::VoidResult
Remove a sync configuration.
void stop_scheduler()
Stop the sync scheduler.
auto is_syncing(std::string_view config_id) const -> bool
Check if a sync is currently running for a configuration.
auto update_config(const sync_config &config) -> kcenon::pacs::VoidResult
Update an existing sync configuration.
auto wait_for_sync(std::string_view job_id) -> std::future< sync_result >
Wait for a sync operation to complete.
void set_progress_callback(sync_progress_callback callback)
Set callback for sync progress updates.
~sync_manager()
Destructor - stops scheduler if running.
sync_manager(sync_repositories repositories, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< job_manager > job_manager, std::shared_ptr< services::query_scu > query_scu, std::shared_ptr< di::ILogger > logger=nullptr)
Construct a sync manager from split repositories.
auto list_configs() const -> std::vector< sync_config >
List all sync configurations.
auto get_statistics() const -> sync_statistics
Get overall sync statistics.
auto get_config(std::string_view config_id) const -> std::optional< sync_config >
Get a sync configuration by ID.
void set_string(dicom_tag tag, encoding::vr_type vr, std::string_view value)
Set a string value for the given tag.
network::Result< query_result > find(network::association &assoc, const core::dicom_dataset &query_keys)
Perform a C-FIND query with raw dataset.
Definition query_scu.cpp:36
Compile-time constants for commonly used DICOM tags.
Job manager for asynchronous DICOM operations.
conflict_resolution
Strategy for resolving synchronization conflicts.
Definition sync_types.h:121
@ prefer_newer
Use the newer version based on timestamp.
std::function< void( const std::string &config_id, size_t studies_synced, size_t studies_total)> sync_progress_callback
Callback for sync progress updates.
Definition sync_types.h:350
@ missing_remote
Study exists locally but not on remote.
@ missing_local
Study exists on remote but not locally.
@ modified
Study modified on both sides.
@ count_mismatch
Instance counts differ.
std::function< void(const sync_conflict &conflict)> sync_conflict_callback
Callback for conflict detection.
Definition sync_types.h:370
@ push
Push from local to remote.
@ pull
Pull from remote to local.
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
Definition job_types.h:54
std::function< void( const std::string &config_id, const sync_result &result)> sync_completion_callback
Callback for sync completion.
Definition sync_types.h:361
constexpr dicom_tag query_retrieve_level
Query/Retrieve Level.
constexpr dicom_tag modality
Modality.
constexpr dicom_tag study_instance_uid
Study Instance UID.
constexpr dicom_tag study_date
Study Date.
std::shared_ptr< ILogger > null_logger()
Get a shared null logger instance.
Definition ilogger.h:271
@ DA
Date (8 chars, format: YYYYMMDD)
@ UI
Unique Identifier (64 chars max)
@ CS
Code String (16 chars max, uppercase + digits + space + underscore)
constexpr std::string_view study_root_find_sop_class_uid
Study Root Query/Retrieve Information Model - FIND.
Definition query_scp.h:42
@ study
Study level - query study information.
@ study_root
Study Root Query/Retrieve Information Model.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Definition result.h:249
DICOM Query SCU service (C-FIND sender)
Remote PACS node manager for client operations.
Configuration for a synchronization task.
Definition sync_types.h:164
std::vector< std::string > modalities
Modality filter (empty = all)
Definition sync_types.h:179
std::chrono::hours lookback
How far back to sync.
Definition sync_types.h:178
sync_direction direction
Direction of sync.
Definition sync_types.h:186
std::chrono::system_clock::time_point last_successful_sync
Definition sync_types.h:202
std::string source_node_id
Remote node to sync with.
Definition sync_types.h:170
std::string config_id
Unique configuration identifier.
Definition sync_types.h:169
Represents a conflict detected during synchronization.
Definition sync_types.h:220
Historical record of a sync operation.
Definition sync_types.h:291
std::vector< std::string > errors
Definition sync_types.h:300
std::chrono::system_clock::time_point started_at
Definition sync_types.h:302
std::chrono::system_clock::time_point completed_at
Definition sync_types.h:303
void update_config_stats(std::string_view config_id, bool success, size_t studies_synced)
void save_config(const sync_config &cfg)
std::vector< sync_conflict > conflicts
std::optional< sync_config > get_config_from_cache(std::string_view config_id) const
void add_conflict(const sync_conflict &conflict)
sync_result perform_sync(const sync_config &cfg, bool full_sync)
std::unordered_map< std::string, sync_result > last_results
void store_last_result(const std::string &config_id, const sync_result &result)
sync_completion_callback completion_callback
bool is_sync_active(std::string_view config_id) const
void notify_progress(const std::string &config_id, size_t synced, size_t total)
std::shared_ptr< job_manager > job_mgr
void notify_conflict(const sync_conflict &conflict)
void mark_sync_inactive(std::string_view config_id)
std::vector< sync_conflict > compare_with_local(const sync_config &cfg, const std::vector< std::string > &remote_study_uids, sync_result &result)
void mark_sync_active(std::string_view config_id)
std::unordered_map< std::string, std::shared_ptr< std::promise< sync_result > > > completion_promises
std::unordered_set< std::string > active_sync_config_ids
std::shared_ptr< storage::sync_repository > compatibility_repo
std::shared_ptr< di::ILogger > logger
void resolve_conflict_internal(const sync_conflict &conflict, conflict_resolution resolution, sync_result &result)
std::vector< std::string > query_remote_studies(const sync_config &cfg, std::chrono::system_clock::time_point since, sync_result &result)
std::shared_ptr< services::query_scu > query_scu
std::shared_ptr< remote_node_manager > node_manager
void notify_completion(const std::string &config_id, const sync_result &result)
Configuration for the sync manager.
Definition sync_types.h:315
bool auto_resolve_conflicts
Auto-resolve conflicts.
Definition sync_types.h:318
std::shared_ptr< storage::sync_conflict_repository > conflicts
std::shared_ptr< storage::sync_history_repository > history
std::shared_ptr< storage::sync_config_repository > configs
Result of a synchronization operation.
Definition sync_types.h:253
std::chrono::milliseconds elapsed
Definition sync_types.h:281
bool success
Overall success.
Definition sync_types.h:256
std::vector< std::string > errors
Error messages.
Definition sync_types.h:273
size_t studies_synced
Studies actually synced.
Definition sync_types.h:263
std::string job_id
Job ID if async.
Definition sync_types.h:255
size_t studies_skipped
Studies skipped.
Definition sync_types.h:264
std::string config_id
Configuration used.
Definition sync_types.h:254
std::vector< sync_conflict > conflicts
Conflicts detected.
Definition sync_types.h:272
size_t studies_checked
Total studies compared.
Definition sync_types.h:262
std::chrono::system_clock::time_point started_at
Definition sync_types.h:279
std::chrono::system_clock::time_point completed_at
Definition sync_types.h:280
Aggregate statistics for synchronization operations.
Definition sync_types.h:329
Configuration for Query SCU service.
Definition query_scu.h:168
query_model model
Query information model (Patient Root or Study Root)
Definition query_scu.h:170
query_level level
Query level (Patient, Study, Series, or Image)
Definition query_scu.h:173
Repository for sync config records using base_repository pattern.
Repository for sync conflict records using base_repository pattern.
Repository for sync history records using base_repository pattern.
Sync manager for bidirectional DICOM data synchronization.
Repository for sync persistence.
std::string_view uid