16#ifdef PACS_WITH_DATABASE_SYSTEM
29#include <condition_variable>
33#include <shared_mutex>
36#include <unordered_map>
37#include <unordered_set>
47std::string generate_uuid() {
48 static thread_local std::random_device rd;
49 static thread_local std::mt19937_64 gen(rd());
50 static thread_local std::uniform_int_distribution<uint64_t> dis;
52 uint64_t ab = dis(gen);
53 uint64_t cd = dis(gen);
55 ab = (ab & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL;
56 cd = (cd & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL;
58 std::ostringstream oss;
59 oss << std::hex << std::setfill(
'0');
60 oss << std::setw(8) << (ab >> 32);
62 oss << std::setw(4) << ((ab >> 16) & 0xFFFF);
64 oss << std::setw(4) << (ab & 0xFFFF);
66 oss << std::setw(4) << (cd >> 48);
68 oss << std::setw(12) << (cd & 0xFFFFFFFFFFFFULL);
120 std::unordered_map<std::string, std::shared_ptr<std::promise<sync_result>>>
142 return c.config_id == cfg.config_id;
151 #ifdef PACS_WITH_DATABASE_SYSTEM
168 return c.config_id == config_id;
177 size_t studies_synced) {
181 return c.config_id == config_id;
185 it->studies_synced += studies_synced;
186 it->last_sync = std::chrono::system_clock::now();
188 it->last_successful_sync = std::chrono::system_clock::now();
192 #ifdef PACS_WITH_DATABASE_SYSTEM
194 [[maybe_unused]]
auto result =
198 [[maybe_unused]]
auto result =
204 [[maybe_unused]]
auto result =
251 it->second->set_value(result);
270 return c.study_uid == conflict.study_uid &&
271 c.config_id == conflict.config_id;
280 #ifdef PACS_WITH_DATABASE_SYSTEM
284 [[maybe_unused]]
auto result =
289 [[maybe_unused]]
auto result =
305 result.
job_id = generate_uuid();
306 result.
started_at = std::chrono::system_clock::now();
309 logger->info_fmt(
"Starting {} sync for config '{}' from node '{}'",
319 result.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
324 const auto& node = *node_opt;
325 if (!node.is_online()) {
329 result.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
334 if (!node.supports_find) {
336 result.
errors.push_back(
"Node does not support C-FIND");
338 result.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
345 ? std::chrono::system_clock::now() - cfg.
lookback
349 if (!
full_sync && sync_start_time == std::chrono::system_clock::time_point{}) {
350 sync_start_time = std::chrono::system_clock::now() - cfg.
lookback;
355 if (!result.
errors.empty()) {
358 result.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
372 for (
const auto& conflict : comparison) {
375 if (
job_mgr && node.supports_query_retrieve()) {
376 auto job_id =
job_mgr->create_retrieve_job(
379 if (!job_id.empty()) {
401 for (
const auto& conflict : comparison) {
405 logger->info_fmt(
"Study {} is missing on remote, would create store job",
414 total_syncs.fetch_add(1, std::memory_order_relaxed);
419 result.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
446 #ifdef PACS_WITH_DATABASE_SYSTEM
450 [[maybe_unused]]
auto save_result =
455 [[maybe_unused]]
auto save_result =
461 logger->info_fmt(
"Sync completed for config '{}': {} checked, {} synced, {} conflicts",
471 std::chrono::system_clock::time_point since,
474 std::vector<std::string> study_uids;
477 std::vector<std::string> sop_classes = {
483 if (assoc_result.is_err()) {
484 result.
errors.push_back(
"Failed to acquire association: " +
485 assoc_result.error().message);
489 auto assoc = std::move(assoc_result.value());
498 std::string modality_filter;
499 for (
size_t i = 0; i < cfg.
modalities.size(); ++i) {
500 if (i > 0) modality_filter +=
"\\";
507 if (since != std::chrono::system_clock::time_point{}) {
508 auto since_time_t = std::chrono::system_clock::to_time_t(since);
511 localtime_s(&tm_buf, &since_time_t);
513 localtime_r(&since_time_t, &tm_buf);
515 std::ostringstream date_oss;
516 date_oss << std::put_time(&tm_buf,
"%Y%m%d") <<
"-";
526 auto query_result = scu.
find(*assoc, query_keys);
531 if (query_result.is_err()) {
532 result.
errors.push_back(
"Query failed: " + query_result.error().message);
536 const auto& qr = query_result.value();
537 for (
const auto& match : qr.matches) {
540 study_uids.push_back(
uid);
549 const std::vector<std::string>& remote_study_uids,
553 std::vector<sync_conflict> comparison_conflicts;
556 for (
const auto& study_uid : remote_study_uids) {
559 bool exists_locally =
false;
561 if (!exists_locally) {
564 conflict.study_uid = study_uid;
566 conflict.remote_modified = std::chrono::system_clock::now();
567 conflict.detected_at = std::chrono::system_clock::now();
568 comparison_conflicts.push_back(conflict);
572 return comparison_conflicts;
578 switch (resolution) {
582 auto job_id =
job_mgr->create_retrieve_job(
583 conflict.config_id, conflict.study_uid, std::nullopt,
585 if (!job_id.empty()) {
598 if (conflict.remote_modified > conflict.local_modified) {
600 auto job_id =
job_mgr->create_retrieve_job(
601 conflict.config_id, conflict.study_uid, std::nullopt,
603 if (!job_id.empty()) {
623 scheduler_cv.wait_for(lock, std::chrono::minutes(1), [
this] {
632 std::vector<sync_config> configs_copy;
638 for (
const auto& cfg : configs_copy) {
639 if (!cfg.enabled || cfg.schedule_cron.empty()) {
645 auto now = std::chrono::system_clock::now();
646 auto since_last = now - cfg.last_sync;
649 if (since_last >= std::chrono::hours(1)) {
652 logger->info_fmt(
"Scheduler triggering sync for config '{}'",
667 std::vector<sync_config> loaded;
668 #ifdef PACS_WITH_DATABASE_SYSTEM
671 if (result.is_err()) {
674 loaded = std::move(result.value());
693 logger->info_fmt(
"Loaded {} sync configs from repository",
configs.size());
698 std::vector<sync_conflict> loaded;
699 #ifdef PACS_WITH_DATABASE_SYSTEM
702 if (result.is_err()) {
705 loaded = std::move(result.value());
724 logger->info_fmt(
"Loaded {} unresolved conflicts from repository",
conflicts.size());
735 std::shared_ptr<remote_node_manager> node_manager,
737 std::shared_ptr<services::query_scu>
query_scu,
738 std::shared_ptr<di::ILogger> logger)
741 std::move(
query_scu), std::move(logger)) {}
746 std::shared_ptr<remote_node_manager> node_manager,
748 std::shared_ptr<services::query_scu>
query_scu,
749 std::shared_ptr<di::ILogger> logger)
750 : impl_(std::make_unique<
impl>()) {
764 std::shared_ptr<storage::sync_repository> repo,
765 std::shared_ptr<remote_node_manager> node_manager,
767 std::shared_ptr<services::query_scu>
query_scu,
768 std::shared_ptr<di::ILogger> logger)
775 std::shared_ptr<storage::sync_repository> repo,
776 std::shared_ptr<remote_node_manager> node_manager,
778 std::shared_ptr<services::query_scu>
query_scu,
779 std::shared_ptr<di::ILogger> logger)
780 : impl_(std::make_unique<
impl>()) {
802 if (
config.config_id.empty()) {
804 kcenon::pacs::error_codes::invalid_argument,
805 "Config ID cannot be empty");
808 if (
config.source_node_id.empty()) {
810 kcenon::pacs::error_codes::invalid_argument,
811 "Source node ID cannot be empty");
817 kcenon::pacs::error_codes::already_exists,
818 "Config already exists: " +
config.config_id);
824 impl_->
logger->info_fmt(
"Added sync config '{}' for node '{}'",
828 return kcenon::pacs::ok();
834 kcenon::pacs::error_codes::not_found,
835 "Config not found: " +
config.config_id);
844 return kcenon::pacs::ok();
852 return c.config_id == config_id;
856 kcenon::pacs::error_codes::not_found,
857 "Config not found: " + std::string(config_id));
862 #ifdef PACS_WITH_DATABASE_SYSTEM
864 [[maybe_unused]]
auto result =
867 [[maybe_unused]]
auto result =
872 [[maybe_unused]]
auto result =
878 impl_->
logger->info_fmt(
"Removed sync config '{}'", config_id);
881 return kcenon::pacs::ok();
906 bool full = config_opt->last_successful_sync == std::chrono::system_clock::time_point{};
912 return result.job_id;
926 return result.job_id;
940 return result.job_id;
944 auto promise = std::make_shared<std::promise<sync_result>>();
945 auto future = promise->get_future();
961 result.
config_id = std::string(config_id);
962 result.
started_at = std::chrono::system_clock::now();
966 result.
errors.push_back(
"Config not found: " + std::string(config_id));
971 const auto& cfg = *config_opt;
974 auto sync_start_time = std::chrono::system_clock::now() - cfg.lookback;
977 if (!result.
errors.empty()) {
990 result.
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
997 return std::async(std::launch::async, [
this,
id = std::string(config_id)]() {
1008 std::vector<sync_conflict> unresolved;
1010 std::back_inserter(unresolved),
1017 std::vector<sync_conflict> filtered;
1019 std::back_inserter(filtered),
1021 return !c.resolved && c.config_id == config_id;
1027 std::string_view study_uid,
1034 return c.study_uid == study_uid && !c.resolved;
1039 kcenon::pacs::error_codes::not_found,
1040 "Conflict not found: " + std::string(study_uid));
1047 it->resolved =
true;
1048 it->resolution_used = resolution;
1049 it->resolved_at = std::chrono::system_clock::now();
1051 #ifdef PACS_WITH_DATABASE_SYSTEM
1053 [[maybe_unused]]
auto result =
1056 [[maybe_unused]]
auto result =
1061 [[maybe_unused]]
auto result =
1067 impl_->
logger->info_fmt(
"Resolved conflict for study '{}' using {}",
1071 return kcenon::pacs::ok();
1075 std::string_view config_id,
1078 std::vector<std::string> study_uids;
1083 if (c.config_id == config_id && !c.resolved) {
1084 study_uids.push_back(c.study_uid);
1089 for (
const auto&
uid : study_uids) {
1091 if (result.is_err()) {
1097 impl_->
logger->info_fmt(
"Resolved {} conflicts for config '{}'",
1098 study_uids.size(), config_id);
1101 return kcenon::pacs::ok();
1182 stats.total_syncs = config_opt->total_syncs;
1183 stats.total_studies_synced = config_opt->studies_synced;
1190 if (c.config_id == config_id) {
1191 stats.total_conflicts_detected++;
1193 stats.total_conflicts_resolved++;
auto get_conflicts() const -> std::vector< sync_conflict >
Get all unresolved conflicts.
void start_scheduler()
Start the sync scheduler.
auto get_last_result(std::string_view config_id) const -> sync_result
Get the last sync result for a configuration.
auto sync_now(std::string_view config_id) -> std::string
Start sync immediately for a configuration.
auto compare_async(std::string_view config_id) -> std::future< sync_result >
Compare local and remote data asynchronously.
auto is_scheduler_running() const noexcept -> bool
Check if scheduler is running.
auto config() const noexcept -> const sync_manager_config &
Get current manager configuration.
auto add_config(const sync_config &config) -> kcenon::pacs::VoidResult
Add a new sync configuration.
auto resolve_conflict(std::string_view study_uid, conflict_resolution resolution) -> kcenon::pacs::VoidResult
Resolve a specific conflict.
auto compare(std::string_view config_id) -> sync_result
Compare local and remote data without syncing.
auto full_sync(std::string_view config_id) -> std::string
Perform a full sync for a configuration.
std::unique_ptr< impl > impl_
auto resolve_all_conflicts(std::string_view config_id, conflict_resolution resolution) -> kcenon::pacs::VoidResult
Resolve all conflicts for a configuration.
auto incremental_sync(std::string_view config_id) -> std::string
Perform an incremental sync for a configuration.
void set_conflict_callback(sync_conflict_callback callback)
Set callback for conflict detection.
void set_completion_callback(sync_completion_callback callback)
Set callback for sync completion.
auto remove_config(std::string_view config_id) -> kcenon::pacs::VoidResult
Remove a sync configuration.
void stop_scheduler()
Stop the sync scheduler.
auto is_syncing(std::string_view config_id) const -> bool
Check if a sync is currently running for a configuration.
auto update_config(const sync_config &config) -> kcenon::pacs::VoidResult
Update an existing sync configuration.
auto wait_for_sync(std::string_view job_id) -> std::future< sync_result >
Wait for a sync operation to complete.
void set_progress_callback(sync_progress_callback callback)
Set callback for sync progress updates.
~sync_manager()
Destructor - stops scheduler if running.
sync_manager(sync_repositories repositories, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< job_manager > job_manager, std::shared_ptr< services::query_scu > query_scu, std::shared_ptr< di::ILogger > logger=nullptr)
Construct a sync manager from split repositories.
auto list_configs() const -> std::vector< sync_config >
List all sync configurations.
auto get_statistics() const -> sync_statistics
Get overall sync statistics.
auto get_config(std::string_view config_id) const -> std::optional< sync_config >
Get a sync configuration by ID.
void set_string(dicom_tag tag, encoding::vr_type vr, std::string_view value)
Set a string value for the given tag.
network::Result< query_result > find(network::association &assoc, const core::dicom_dataset &query_keys)
Perform a C-FIND query with raw dataset.
Compile-time constants for commonly used DICOM tags.
Job manager for asynchronous DICOM operations.
conflict_resolution
Strategy for resolving synchronization conflicts.
@ prefer_local
Keep local version.
@ prefer_newer
Use the newer version based on timestamp.
@ prefer_remote
Use remote version.
std::function< void( const std::string &config_id, size_t studies_synced, size_t studies_total)> sync_progress_callback
Callback for sync progress updates.
@ missing_remote
Study exists locally but not on remote.
@ missing_local
Study exists on remote but not locally.
@ modified
Study modified on both sides.
@ count_mismatch
Instance counts differ.
@ normal
Standard priority.
std::function< void(const sync_conflict &conflict)> sync_conflict_callback
Callback for conflict detection.
@ push
Push from local to remote.
@ bidirectional
Both directions.
@ pull
Pull from remote to local.
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
std::function< void( const std::string &config_id, const sync_result &result)> sync_completion_callback
Callback for sync completion.
std::shared_ptr< ILogger > null_logger()
Get a shared null logger instance.
@ DA
Date (8 chars, format: YYYYMMDD)
@ UI
Unique Identifier (64 chars max)
@ CS
Code String (16 chars max, uppercase + digits + space + underscore)
constexpr std::string_view study_root_find_sop_class_uid
Study Root Query/Retrieve Information Model - FIND.
@ study
Study level - query study information.
@ study_root
Study Root Query/Retrieve Information Model.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
DICOM Query SCU service (C-FIND sender)
Remote PACS node manager for client operations.
Configuration for a synchronization task.
std::vector< std::string > modalities
Modality filter (empty = all)
std::chrono::hours lookback
How far back to sync.
sync_direction direction
Direction of sync.
std::chrono::system_clock::time_point last_successful_sync
std::string source_node_id
Remote node to sync with.
std::string config_id
Unique configuration identifier.
Represents a conflict detected during synchronization.
Historical record of a sync operation.
std::vector< std::string > errors
std::chrono::system_clock::time_point started_at
std::chrono::system_clock::time_point completed_at
void update_config_stats(std::string_view config_id, bool success, size_t studies_synced)
void save_config(const sync_config &cfg)
std::vector< sync_conflict > conflicts
sync_repositories repositories
std::optional< sync_config > get_config_from_cache(std::string_view config_id) const
void add_conflict(const sync_conflict &conflict)
std::atomic< size_t > total_bytes_transferred
std::atomic< size_t > successful_syncs
std::atomic< size_t > total_syncs
std::atomic< size_t > failed_syncs
std::mutex conflicts_mutex
sync_result perform_sync(const sync_config &cfg, bool full_sync)
std::unordered_map< std::string, sync_result > last_results
void store_last_result(const std::string &config_id, const sync_result &result)
void load_conflicts_from_repo()
sync_completion_callback completion_callback
std::vector< sync_config > configs
std::thread scheduler_thread
bool is_sync_active(std::string_view config_id) const
sync_manager_config config
void notify_progress(const std::string &config_id, size_t synced, size_t total)
std::atomic< bool > scheduler_running
std::shared_ptr< job_manager > job_mgr
std::atomic< size_t > total_conflicts_detected
std::mutex scheduler_mutex
void notify_conflict(const sync_conflict &conflict)
void mark_sync_inactive(std::string_view config_id)
std::vector< sync_conflict > compare_with_local(const sync_config &cfg, const std::vector< std::string > &remote_study_uids, sync_result &result)
std::shared_mutex configs_mutex
void mark_sync_active(std::string_view config_id)
std::atomic< size_t > total_studies_synced
sync_conflict_callback conflict_callback
std::unordered_map< std::string, std::shared_ptr< std::promise< sync_result > > > completion_promises
std::unordered_set< std::string > active_sync_config_ids
std::shared_ptr< storage::sync_repository > compatibility_repo
std::shared_mutex results_mutex
std::atomic< size_t > total_conflicts_resolved
std::shared_ptr< di::ILogger > logger
void resolve_conflict_internal(const sync_conflict &conflict, conflict_resolution resolution, sync_result &result)
std::vector< std::string > query_remote_studies(const sync_config &cfg, std::chrono::system_clock::time_point since, sync_result &result)
std::mutex promises_mutex
std::shared_ptr< services::query_scu > query_scu
std::shared_ptr< remote_node_manager > node_manager
void notify_completion(const std::string &config_id, const sync_result &result)
void load_configs_from_repo()
sync_progress_callback progress_callback
std::shared_mutex callbacks_mutex
std::condition_variable scheduler_cv
Configuration for the sync manager.
conflict_resolution default_resolution
bool auto_resolve_conflicts
Auto-resolve conflicts.
std::shared_ptr< storage::sync_conflict_repository > conflicts
std::shared_ptr< storage::sync_history_repository > history
std::shared_ptr< storage::sync_config_repository > configs
Result of a synchronization operation.
std::chrono::milliseconds elapsed
bool success
Overall success.
std::vector< std::string > errors
Error messages.
size_t studies_synced
Studies actually synced.
std::string job_id
Job ID if async.
size_t studies_skipped
Studies skipped.
std::string config_id
Configuration used.
std::vector< sync_conflict > conflicts
Conflicts detected.
size_t studies_checked
Total studies compared.
std::chrono::system_clock::time_point started_at
std::chrono::system_clock::time_point completed_at
Aggregate statistics for synchronization operations.
Configuration for Query SCU service.
query_model model
Query information model (Patient Root or Study Root)
query_level level
Query level (Patient, Study, Series, or Image)
Repository for sync config records using base_repository pattern.
Repository for sync conflict records using base_repository pattern.
Repository for sync history records using base_repository pattern.
Sync manager for bidirectional DICOM data synchronization.
Repository for sync persistence.