PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
kcenon::pacs::client::sync_manager::impl Struct Reference
Collaboration diagram for kcenon::pacs::client::sync_manager::impl:
Collaboration graph

Public Member Functions

void save_config (const sync_config &cfg)
 
std::optional< sync_config > get_config_from_cache (std::string_view config_id) const
 
void update_config_stats (std::string_view config_id, bool success, size_t studies_synced)
 
void mark_sync_active (std::string_view config_id)
 
void mark_sync_inactive (std::string_view config_id)
 
bool is_sync_active (std::string_view config_id) const
 
void store_last_result (const std::string &config_id, const sync_result &result)
 
void notify_progress (const std::string &config_id, size_t synced, size_t total)
 
void notify_completion (const std::string &config_id, const sync_result &result)
 
void notify_conflict (const sync_conflict &conflict)
 
void add_conflict (const sync_conflict &conflict)
 
sync_result perform_sync (const sync_config &cfg, bool full_sync)
 
std::vector< std::string > query_remote_studies (const sync_config &cfg, std::chrono::system_clock::time_point since, sync_result &result)
 
std::vector< sync_conflict > compare_with_local (const sync_config &cfg, const std::vector< std::string > &remote_study_uids, sync_result &result)
 
void resolve_conflict_internal (const sync_conflict &conflict, conflict_resolution resolution, sync_result &result)
 
void scheduler_loop ()
 
void load_configs_from_repo ()
 
void load_conflicts_from_repo ()
 

Public Attributes

sync_manager_config config
 
sync_repositories repositories
 
std::shared_ptr< storage::sync_repository > compatibility_repo
 
std::shared_ptr< remote_node_manager > node_manager
 
std::shared_ptr< job_manager > job_mgr
 
std::shared_ptr< services::query_scu > query_scu
 
std::shared_ptr< di::ILogger > logger
 
std::vector< sync_config > configs
 
std::shared_mutex configs_mutex
 
std::unordered_set< std::string > active_sync_config_ids
 
std::mutex active_mutex
 
std::unordered_map< std::string, sync_result > last_results
 
std::shared_mutex results_mutex
 
std::vector< sync_conflict > conflicts
 
std::mutex conflicts_mutex
 
std::thread scheduler_thread
 
std::atomic< bool > scheduler_running {false}
 
std::condition_variable scheduler_cv
 
std::mutex scheduler_mutex
 
sync_progress_callback progress_callback
 
sync_completion_callback completion_callback
 
sync_conflict_callback conflict_callback
 
std::shared_mutex callbacks_mutex
 
std::unordered_map< std::string, std::shared_ptr< std::promise< sync_result > > > completion_promises
 
std::mutex promises_mutex
 
std::atomic< size_t > total_syncs {0}
 
std::atomic< size_t > successful_syncs {0}
 
std::atomic< size_t > failed_syncs {0}
 
std::atomic< size_t > total_studies_synced {0}
 
std::atomic< size_t > total_bytes_transferred {0}
 
std::atomic< size_t > total_conflicts_detected {0}
 
std::atomic< size_t > total_conflicts_resolved {0}
 

Detailed Description

Definition at line 79 of file sync_manager.cpp.

Member Function Documentation

◆ add_conflict()

void kcenon::pacs::client::sync_manager::impl::add_conflict ( const sync_conflict & conflict)
inline

Definition at line 264 of file sync_manager.cpp.

264 {
265 {
266 std::lock_guard lock(conflicts_mutex);
267 // Check for existing conflict for same study
268 auto it = std::find_if(conflicts.begin(), conflicts.end(),
269 [&conflict](const sync_conflict& c) {
270 return c.study_uid == conflict.study_uid &&
271 c.config_id == conflict.config_id;
272 });
273 if (it != conflicts.end()) {
274 *it = conflict; // Update existing
275 } else {
276 conflicts.push_back(conflict);
277 }
278 }
279
280 #ifdef PACS_WITH_DATABASE_SYSTEM
281 if (repositories.conflicts) {
282 [[maybe_unused]] auto result = repositories.conflicts->save(conflict);
283 } else if (compatibility_repo) {
284 [[maybe_unused]] auto result =
285 compatibility_repo->save_conflict(conflict);
286 }
287 #else
288 if (compatibility_repo) {
289 [[maybe_unused]] auto result =
290 compatibility_repo->save_conflict(conflict);
291 }
292 #endif
293
294 total_conflicts_detected.fetch_add(1, std::memory_order_relaxed);
295 notify_conflict(conflict);
296 }
std::vector< sync_conflict > conflicts
void notify_conflict(const sync_conflict &conflict)
std::shared_ptr< storage::sync_repository > compatibility_repo
std::shared_ptr< storage::sync_conflict_repository > conflicts

References compatibility_repo, conflicts, kcenon::pacs::client::sync_repositories::conflicts, conflicts_mutex, notify_conflict(), repositories, and total_conflicts_detected.

Referenced by perform_sync().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ compare_with_local()

std::vector< sync_conflict > kcenon::pacs::client::sync_manager::impl::compare_with_local ( const sync_config & cfg,
const std::vector< std::string > & remote_study_uids,
sync_result & result )
inline

Definition at line 547 of file sync_manager.cpp.

550 {
551 (void)result; // Reserved for future result aggregation
552
553 std::vector<sync_conflict> comparison_conflicts;
554
555 // For each remote study, check if it exists locally
556 for (const auto& study_uid : remote_study_uids) {
557 // In real implementation, query local database
558 // For now, we'll simulate that all remote studies are missing locally
559 bool exists_locally = false; // Would check storage index here
560
561 if (!exists_locally) {
562 sync_conflict conflict;
563 conflict.config_id = cfg.config_id;
564 conflict.study_uid = study_uid;
565 conflict.conflict_type = sync_conflict_type::missing_local;
566 conflict.remote_modified = std::chrono::system_clock::now();
567 conflict.detected_at = std::chrono::system_clock::now();
568 comparison_conflicts.push_back(conflict);
569 }
570 }
571
572 return comparison_conflicts;
573 }
@ missing_local
Study exists on remote but not locally.

References kcenon::pacs::client::sync_config::config_id, and kcenon::pacs::client::missing_local.

Referenced by kcenon::pacs::client::sync_manager::compare(), and perform_sync().

Here is the caller graph for this function:

◆ get_config_from_cache()

std::optional< sync_config > kcenon::pacs::client::sync_manager::impl::get_config_from_cache ( std::string_view config_id) const
inline

Definition at line 164 of file sync_manager.cpp.

164 {
165 std::shared_lock lock(configs_mutex);
166 auto it = std::find_if(configs.begin(), configs.end(),
167 [&config_id](const sync_config& c) {
168 return c.config_id == config_id;
169 });
170 if (it != configs.end()) {
171 return *it;
172 }
173 return std::nullopt;
174 }

References configs, and configs_mutex.

Referenced by kcenon::pacs::client::sync_manager::add_config(), kcenon::pacs::client::sync_manager::compare(), kcenon::pacs::client::sync_manager::full_sync(), kcenon::pacs::client::sync_manager::get_config(), kcenon::pacs::client::sync_manager::get_statistics(), kcenon::pacs::client::sync_manager::incremental_sync(), kcenon::pacs::client::sync_manager::sync_now(), and kcenon::pacs::client::sync_manager::update_config().

Here is the caller graph for this function:

◆ is_sync_active()

bool kcenon::pacs::client::sync_manager::impl::is_sync_active ( std::string_view config_id) const
inline

Definition at line 221 of file sync_manager.cpp.

221 {
222 std::lock_guard lock(active_mutex);
223 return active_sync_config_ids.count(std::string(config_id)) > 0;
224 }
std::unordered_set< std::string > active_sync_config_ids

References active_mutex, and active_sync_config_ids.

Referenced by kcenon::pacs::client::sync_manager::is_syncing(), and scheduler_loop().

Here is the caller graph for this function:

◆ load_configs_from_repo()

void kcenon::pacs::client::sync_manager::impl::load_configs_from_repo ( )
inline

Definition at line 666 of file sync_manager.cpp.

666 {
667 std::vector<sync_config> loaded;
668 #ifdef PACS_WITH_DATABASE_SYSTEM
669 if (repositories.configs) {
670 auto result = repositories.configs->find_all();
671 if (result.is_err()) {
672 return;
673 }
674 loaded = std::move(result.value());
675 } else if (compatibility_repo) {
676 loaded = compatibility_repo->list_configs();
677 } else {
678 return;
679 }
680 #else
681 if (compatibility_repo) {
682 loaded = compatibility_repo->list_configs();
683 } else {
684 return;
685 }
686 #endif
687 {
688 std::unique_lock lock(configs_mutex);
689 configs = std::move(loaded);
690 }
691
692 if (logger && !configs.empty()) {
693 logger->info_fmt("Loaded {} sync configs from repository", configs.size());
694 }
695 }
std::shared_ptr< di::ILogger > logger
std::shared_ptr< storage::sync_config_repository > configs

References compatibility_repo, configs, kcenon::pacs::client::sync_repositories::configs, configs_mutex, logger, and repositories.

Referenced by kcenon::pacs::client::sync_manager::sync_manager(), and kcenon::pacs::client::sync_manager::sync_manager().

Here is the caller graph for this function:

◆ load_conflicts_from_repo()

void kcenon::pacs::client::sync_manager::impl::load_conflicts_from_repo ( )
inline

Definition at line 697 of file sync_manager.cpp.

697 {
698 std::vector<sync_conflict> loaded;
699 #ifdef PACS_WITH_DATABASE_SYSTEM
700 if (repositories.conflicts) {
701 auto result = repositories.conflicts->find_unresolved();
702 if (result.is_err()) {
703 return;
704 }
705 loaded = std::move(result.value());
706 } else if (compatibility_repo) {
707 loaded = compatibility_repo->list_unresolved_conflicts();
708 } else {
709 return;
710 }
711 #else
712 if (compatibility_repo) {
713 loaded = compatibility_repo->list_unresolved_conflicts();
714 } else {
715 return;
716 }
717 #endif
718 {
719 std::lock_guard lock(conflicts_mutex);
720 conflicts = std::move(loaded);
721 }
722
723 if (logger && !conflicts.empty()) {
724 logger->info_fmt("Loaded {} unresolved conflicts from repository", conflicts.size());
725 }
726 }

References compatibility_repo, conflicts, kcenon::pacs::client::sync_repositories::conflicts, conflicts_mutex, logger, and repositories.

Referenced by kcenon::pacs::client::sync_manager::sync_manager(), and kcenon::pacs::client::sync_manager::sync_manager().

Here is the caller graph for this function:

◆ mark_sync_active()

void kcenon::pacs::client::sync_manager::impl::mark_sync_active ( std::string_view config_id)
inline

Definition at line 211 of file sync_manager.cpp.

211 {
212 std::lock_guard lock(active_mutex);
213 active_sync_config_ids.insert(std::string(config_id));
214 }

References active_mutex, and active_sync_config_ids.

Referenced by kcenon::pacs::client::sync_manager::full_sync(), kcenon::pacs::client::sync_manager::incremental_sync(), scheduler_loop(), and kcenon::pacs::client::sync_manager::sync_now().

Here is the caller graph for this function:

◆ mark_sync_inactive()

void kcenon::pacs::client::sync_manager::impl::mark_sync_inactive ( std::string_view config_id)
inline

Definition at line 216 of file sync_manager.cpp.

216 {
217 std::lock_guard lock(active_mutex);
218 active_sync_config_ids.erase(std::string(config_id));
219 }

References active_mutex, and active_sync_config_ids.

Referenced by kcenon::pacs::client::sync_manager::full_sync(), kcenon::pacs::client::sync_manager::incremental_sync(), scheduler_loop(), and kcenon::pacs::client::sync_manager::sync_now().

Here is the caller graph for this function:

◆ notify_completion()

void kcenon::pacs::client::sync_manager::impl::notify_completion ( const std::string & config_id,
const sync_result & result )
inline

Definition at line 238 of file sync_manager.cpp.

238 {
239 {
240 std::shared_lock lock(callbacks_mutex);
241 if (completion_callback) {
242 completion_callback(config_id, result);
243 }
244 }
245
246 // Fulfill promise if exists
247 {
248 std::lock_guard lock(promises_mutex);
249 auto it = completion_promises.find(result.job_id);
250 if (it != completion_promises.end()) {
251 it->second->set_value(result);
252 completion_promises.erase(it);
253 }
254 }
255 }
sync_completion_callback completion_callback
std::unordered_map< std::string, std::shared_ptr< std::promise< sync_result > > > completion_promises

References callbacks_mutex, completion_callback, completion_promises, kcenon::pacs::client::sync_result::job_id, and promises_mutex.

Referenced by kcenon::pacs::client::sync_manager::full_sync(), kcenon::pacs::client::sync_manager::incremental_sync(), scheduler_loop(), and kcenon::pacs::client::sync_manager::sync_now().

Here is the caller graph for this function:

◆ notify_conflict()

void kcenon::pacs::client::sync_manager::impl::notify_conflict ( const sync_conflict & conflict)
inline

Definition at line 257 of file sync_manager.cpp.

257 {
258 std::shared_lock lock(callbacks_mutex);
259 if (conflict_callback) {
260 conflict_callback(conflict);
261 }
262 }

References callbacks_mutex, and conflict_callback.

Referenced by add_conflict().

Here is the caller graph for this function:

◆ notify_progress()

void kcenon::pacs::client::sync_manager::impl::notify_progress ( const std::string & config_id,
size_t synced,
size_t total )
inline

Definition at line 231 of file sync_manager.cpp.

231 {
232 std::shared_lock lock(callbacks_mutex);
233 if (progress_callback) {
234 progress_callback(config_id, synced, total);
235 }
236 }

References callbacks_mutex, and progress_callback.

Referenced by perform_sync().

Here is the caller graph for this function:

◆ perform_sync()

sync_result kcenon::pacs::client::sync_manager::impl::perform_sync ( const sync_config & cfg,
bool full_sync )
inline

Definition at line 302 of file sync_manager.cpp.

302 {
303 sync_result result;
304 result.config_id = cfg.config_id;
305 result.job_id = generate_uuid();
306 result.started_at = std::chrono::system_clock::now();
307
308 if (logger) {
309 logger->info_fmt("Starting {} sync for config '{}' from node '{}'",
310 full_sync ? "full" : "incremental", cfg.config_id, cfg.source_node_id);
311 }
312
313 // Get the source node
314 auto node_opt = node_manager->get_node(cfg.source_node_id);
315 if (!node_opt) {
316 result.success = false;
317 result.errors.push_back("Source node not found: " + cfg.source_node_id);
318 result.completed_at = std::chrono::system_clock::now();
319 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
320 result.completed_at - result.started_at);
321 return result;
322 }
323
324 const auto& node = *node_opt;
325 if (!node.is_online()) {
326 result.success = false;
327 result.errors.push_back("Source node is offline: " + cfg.source_node_id);
328 result.completed_at = std::chrono::system_clock::now();
329 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
330 result.completed_at - result.started_at);
331 return result;
332 }
333
334 if (!node.supports_find) {
335 result.success = false;
336 result.errors.push_back("Node does not support C-FIND");
337 result.completed_at = std::chrono::system_clock::now();
338 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
339 result.completed_at - result.started_at);
340 return result;
341 }
342
343 // Determine sync start time based on full_sync or incremental
344 auto sync_start_time = full_sync
345 ? std::chrono::system_clock::now() - cfg.lookback
346 : cfg.last_successful_sync;
347
348 // If no successful sync and not full sync, use lookback
349 if (!full_sync && sync_start_time == std::chrono::system_clock::time_point{}) {
350 sync_start_time = std::chrono::system_clock::now() - cfg.lookback;
351 }
352
353 // Query remote studies
354 auto remote_studies = query_remote_studies(cfg, sync_start_time, result);
355 if (!result.errors.empty()) {
356 result.success = false;
357 result.completed_at = std::chrono::system_clock::now();
358 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
359 result.completed_at - result.started_at);
360 return result;
361 }
362
363 result.studies_checked = remote_studies.size();
364
365 // Compare with local studies
366 auto comparison = compare_with_local(cfg, remote_studies, result);
367
368 // Process differences based on sync direction
369 if (cfg.direction == sync_direction::pull ||
370 cfg.direction == sync_direction::bidirectional) {
371 // Pull missing studies from remote
372 for (const auto& conflict : comparison) {
373 if (conflict.conflict_type == sync_conflict_type::missing_local) {
374 // Create retrieve job
375 if (job_mgr && node.supports_query_retrieve()) {
376 auto job_id = job_mgr->create_retrieve_job(
377 cfg.source_node_id, conflict.study_uid, std::nullopt,
379 if (!job_id.empty()) {
380 result.studies_synced++;
381 notify_progress(cfg.config_id, result.studies_synced,
382 result.studies_checked);
383 }
384 }
385 } else if (conflict.conflict_type == sync_conflict_type::count_mismatch ||
386 conflict.conflict_type == sync_conflict_type::modified) {
387 // Handle conflicts
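388 if (config.auto_resolve_conflicts) {
389 resolve_conflict_internal(conflict, config.default_resolution, result);
390 } else {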
391 add_conflict(conflict);
392 result.conflicts.push_back(conflict);
393 }
394 }
395 }
396 }
397
398 if (cfg.direction == sync_direction::push ||
399 cfg.direction == sync_direction::bidirectional) {
400 // Push studies missing on remote
401 for (const auto& conflict : comparison) {
402 if (conflict.conflict_type == sync_conflict_type::missing_remote) {
403 // Store job would be created here for push
404 if (logger) {
405 logger->info_fmt("Study {} is missing on remote, would create store job",
406 conflict.study_uid);
407 }
408 result.studies_synced++;
409 }
410 }
411 }
412
413 // Update statistics
414 total_syncs.fetch_add(1, std::memory_order_relaxed);
415 total_studies_synced.fetch_add(result.studies_synced, std::memory_order_relaxed);
416
417 result.success = result.errors.empty();
418 result.completed_at = std::chrono::system_clock::now();
419 result.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
420 result.completed_at - result.started_at);
421
422 if (result.success) {
423 successful_syncs.fetch_add(1, std::memory_order_relaxed);
424 } else {
425 failed_syncs.fetch_add(1, std::memory_order_relaxed);
426 }
427
428 // Update config statistics
429 update_config_stats(cfg.config_id, result.success, result.studies_synced);
430
431 // Store result
432 store_last_result(cfg.config_id, result);
433
434 // Save history
435 sync_history history;
436 history.config_id = cfg.config_id;
437 history.job_id = result.job_id;
438 history.success = result.success;
439 history.studies_checked = result.studies_checked;
440 history.studies_synced = result.studies_synced;
441 history.conflicts_found = result.conflicts.size();
442 history.errors = result.errors;
443 history.started_at = result.started_at;
444 history.completed_at = result.completed_at;
445
446 #ifdef PACS_WITH_DATABASE_SYSTEM
447 if (repositories.history) {
448 [[maybe_unused]] auto save_result = repositories.history->save(history);
449 } else if (compatibility_repo) {
450 [[maybe_unused]] auto save_result =
451 compatibility_repo->save_history(history);
452 }
453 #else
454 if (compatibility_repo) {
455 [[maybe_unused]] auto save_result =
456 compatibility_repo->save_history(history);
457 }
458 #endif
459
460 if (logger) {
461 logger->info_fmt("Sync completed for config '{}': {} checked, {} synced, {} conflicts",
462 cfg.config_id, result.studies_checked, result.studies_synced,
463 result.conflicts.size());
464 }
465
466 return result;
467 }
auto full_sync(std::string_view config_id) -> std::string
Perform a full sync for a configuration.
@ missing_remote
Study exists locally but not on remote.
@ modified
Study modified on both sides.
@ count_mismatch
Instance counts differ.
@ push
Push from local to remote.
@ pull
Pull from remote to local.
void update_config_stats(std::string_view config_id, bool success, size_t studies_synced)
void add_conflict(const sync_conflict &conflict)
void store_last_result(const std::string &config_id, const sync_result &result)
void notify_progress(const std::string &config_id, size_t synced, size_t total)
std::shared_ptr< job_manager > job_mgr
std::vector< sync_conflict > compare_with_local(const sync_config &cfg, const std::vector< std::string > &remote_study_uids, sync_result &result)
void resolve_conflict_internal(const sync_conflict &conflict, conflict_resolution resolution, sync_result &result)
std::vector< std::string > query_remote_studies(const sync_config &cfg, std::chrono::system_clock::time_point since, sync_result &result)
std::shared_ptr< remote_node_manager > node_manager
bool auto_resolve_conflicts
Auto-resolve conflicts.
Definition sync_types.h:318
std::shared_ptr< storage::sync_history_repository > history

References add_conflict(), kcenon::pacs::client::sync_manager_config::auto_resolve_conflicts, kcenon::pacs::client::bidirectional, compare_with_local(), compatibility_repo, kcenon::pacs::client::sync_history::completed_at, kcenon::pacs::client::sync_result::completed_at, config, kcenon::pacs::client::sync_config::config_id, kcenon::pacs::client::sync_history::config_id, kcenon::pacs::client::sync_result::config_id, kcenon::pacs::client::sync_result::conflicts, kcenon::pacs::client::sync_history::conflicts_found, kcenon::pacs::client::count_mismatch, kcenon::pacs::client::sync_manager_config::default_resolution, kcenon::pacs::client::sync_config::direction, kcenon::pacs::client::sync_result::elapsed, kcenon::pacs::client::sync_history::errors, kcenon::pacs::client::sync_result::errors, failed_syncs, kcenon::pacs::client::sync_manager::full_sync(), kcenon::pacs::client::sync_repositories::history, kcenon::pacs::client::sync_history::job_id, kcenon::pacs::client::sync_result::job_id, job_mgr, kcenon::pacs::client::sync_config::last_successful_sync, logger, kcenon::pacs::client::sync_config::lookback, kcenon::pacs::client::missing_local, kcenon::pacs::client::missing_remote, kcenon::pacs::client::modified, node_manager, kcenon::pacs::client::normal, notify_progress(), kcenon::pacs::client::pull, kcenon::pacs::client::push, query_remote_studies(), repositories, resolve_conflict_internal(), kcenon::pacs::client::sync_config::source_node_id, kcenon::pacs::client::sync_history::started_at, kcenon::pacs::client::sync_result::started_at, store_last_result(), kcenon::pacs::client::sync_history::studies_checked, kcenon::pacs::client::sync_result::studies_checked, kcenon::pacs::client::sync_history::studies_synced, kcenon::pacs::client::sync_result::studies_synced, kcenon::pacs::client::sync_history::success, kcenon::pacs::client::sync_result::success, successful_syncs, total_studies_synced, total_syncs, and update_config_stats().

Referenced by kcenon::pacs::client::sync_manager::full_sync(), kcenon::pacs::client::sync_manager::incremental_sync(), scheduler_loop(), and kcenon::pacs::client::sync_manager::sync_now().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ query_remote_studies()

std::vector< std::string > kcenon::pacs::client::sync_manager::impl::query_remote_studies ( const sync_config & cfg,
std::chrono::system_clock::time_point since,
sync_result & result )
inline

Definition at line 469 of file sync_manager.cpp.

472 {
473
474 std::vector<std::string> study_uids;
475
476 // Get association for query
477 std::vector<std::string> sop_classes = {
479 };
480
481 auto assoc_result = node_manager->acquire_association(
482 cfg.source_node_id, sop_classes);
483 if (assoc_result.is_err()) {
484 result.errors.push_back("Failed to acquire association: " +
485 assoc_result.error().message);
486 return study_uids;
487 }
488
489 auto assoc = std::move(assoc_result.value());
490
491 // Build query
492 core::dicom_dataset query_keys;
493 query_keys.set_string(core::tags::query_retrieve_level, encoding::vr_type::CS, "STUDY");
494 query_keys.set_string(core::tags::study_instance_uid, encoding::vr_type::UI, "");
495
496 // Apply filters
497 if (!cfg.modalities.empty()) {
498 std::string modality_filter;
499 for (size_t i = 0; i < cfg.modalities.size(); ++i) {
500 if (i > 0) modality_filter += "\\";
501 modality_filter += cfg.modalities[i];
502 }
503 query_keys.set_string(core::tags::modality, encoding::vr_type::CS, modality_filter);
504 }
505
506 // Date range filter
507 if (since != std::chrono::system_clock::time_point{}) {
508 auto since_time_t = std::chrono::system_clock::to_time_t(since);
509 std::tm tm_buf;
510#ifdef _WIN32
511 localtime_s(&tm_buf, &since_time_t);
512#else
513 localtime_r(&since_time_t, &tm_buf);
514#endif
515 std::ostringstream date_oss;
516 date_oss << std::put_time(&tm_buf, "%Y%m%d") << "-";
517 query_keys.set_string(core::tags::study_date, encoding::vr_type::DA, date_oss.str());
518 }
519
520 // Execute query
521 services::query_scu_config query_config;
522 query_config.model = services::query_model::study_root;
523 query_config.level = services::query_level::study;
524
525 services::query_scu scu(query_config, logger);
526 auto query_result = scu.find(*assoc, query_keys);
527
528 // Release association
529 node_manager->release_association(cfg.source_node_id, std::move(assoc));
530
531 if (query_result.is_err()) {
532 result.errors.push_back("Query failed: " + query_result.error().message);
533 return study_uids;
534 }
535
536 const auto& qr = query_result.value();
537 for (const auto& match : qr.matches) {
538 auto uid = match.get_string(core::tags::study_instance_uid);
539 if (!uid.empty()) {
540 study_uids.push_back(uid);
541 }
542 }
543
544 return study_uids;
545 }
constexpr dicom_tag query_retrieve_level
Query/Retrieve Level.
constexpr dicom_tag modality
Modality.
constexpr dicom_tag study_instance_uid
Study Instance UID.
constexpr dicom_tag study_date
Study Date.
@ DA
Date (8 chars, format: YYYYMMDD)
@ UI
Unique Identifier (64 chars max)
@ CS
Code String (16 chars max, uppercase + digits + space + underscore)
constexpr std::string_view study_root_find_sop_class_uid
Study Root Query/Retrieve Information Model - FIND.
Definition query_scp.h:42
@ study
Study level - query study information.
@ study_root
Study Root Query/Retrieve Information Model.
std::string_view uid

References kcenon::pacs::encoding::CS, kcenon::pacs::encoding::DA, kcenon::pacs::client::sync_result::errors, kcenon::pacs::services::query_scu::find(), kcenon::pacs::services::query_scu_config::level, logger, kcenon::pacs::client::sync_config::modalities, kcenon::pacs::core::tags::modality, kcenon::pacs::services::query_scu_config::model, node_manager, kcenon::pacs::core::tags::query_retrieve_level, kcenon::pacs::core::dicom_dataset::set_string(), kcenon::pacs::client::sync_config::source_node_id, kcenon::pacs::services::study, kcenon::pacs::core::tags::study_date, kcenon::pacs::core::tags::study_instance_uid, kcenon::pacs::services::study_root, kcenon::pacs::services::study_root_find_sop_class_uid, kcenon::pacs::encoding::UI, and uid.

Referenced by kcenon::pacs::client::sync_manager::compare(), and perform_sync().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ resolve_conflict_internal()

void kcenon::pacs::client::sync_manager::impl::resolve_conflict_internal ( const sync_conflict & conflict,
conflict_resolution resolution,
sync_result & result )
inline

Definition at line 575 of file sync_manager.cpp.

577 {
578 switch (resolution) {
579 case conflict_resolution::prefer_remote:
580 // Create retrieve job
581 if (job_mgr) {
582 auto job_id = job_mgr->create_retrieve_job(
583 conflict.config_id, conflict.study_uid, std::nullopt,
585 if (!job_id.empty()) {
586 result.studies_synced++;
587 }
588 }
589 break;
590
591 case conflict_resolution::prefer_local:
592 // Keep local, skip
593 result.studies_skipped++;
594 break;
595
596 case conflict_resolution::prefer_newer:
597 // Compare timestamps and choose
598 if (conflict.remote_modified > conflict.local_modified) {
599 if (job_mgr) {
600 auto job_id = job_mgr->create_retrieve_job(
601 conflict.config_id, conflict.study_uid, std::nullopt,
603 if (!job_id.empty()) {
604 result.studies_synced++;
605 }
606 }
607 } else {
608 result.studies_skipped++;
609 }
610 break;
611 }
612
613 total_conflicts_resolved.fetch_add(1, std::memory_order_relaxed);
614 }
@ prefer_newer
Use the newer version based on timestamp.

References job_mgr, kcenon::pacs::client::normal, kcenon::pacs::client::prefer_local, kcenon::pacs::client::prefer_newer, kcenon::pacs::client::prefer_remote, kcenon::pacs::client::sync_result::studies_skipped, kcenon::pacs::client::sync_result::studies_synced, and total_conflicts_resolved.

Referenced by perform_sync(), and kcenon::pacs::client::sync_manager::resolve_conflict().

Here is the caller graph for this function:

◆ save_config()

void kcenon::pacs::client::sync_manager::impl::save_config ( const sync_config & cfg)
inline

Definition at line 137 of file sync_manager.cpp.

137 {
138 {
139 std::unique_lock lock(configs_mutex);
140 auto it = std::find_if(configs.begin(), configs.end(),
141 [&cfg](const sync_config& c) {
142 return c.config_id == cfg.config_id;
143 });
144 if (it != configs.end()) {
145 *it = cfg;
146 } else {
147 configs.push_back(cfg);
148 }
149 }
150
151 #ifdef PACS_WITH_DATABASE_SYSTEM
152 if (repositories.configs) {
153 [[maybe_unused]] auto result = repositories.configs->save(cfg);
154 } else if (compatibility_repo) {
155 [[maybe_unused]] auto result = compatibility_repo->save_config(cfg);
156 }
157 #else
158 if (compatibility_repo) {
159 [[maybe_unused]] auto result = compatibility_repo->save_config(cfg);
160 }
161 #endif
162 }

References compatibility_repo, configs, kcenon::pacs::client::sync_repositories::configs, configs_mutex, and repositories.

Referenced by kcenon::pacs::client::sync_manager::add_config(), and kcenon::pacs::client::sync_manager::update_config().

Here is the caller graph for this function:

◆ scheduler_loop()

void kcenon::pacs::client::sync_manager::impl::scheduler_loop ( )
inline

Definition at line 620 of file sync_manager.cpp.

620 {
621 while (scheduler_running.load()) {
622 std::unique_lock lock(scheduler_mutex);
623 scheduler_cv.wait_for(lock, std::chrono::minutes(1), [this] {
624 return !scheduler_running.load();
625 });
626
627 if (!scheduler_running.load()) {
628 break;
629 }
630
631 // Check each config for scheduled syncs
632 std::vector<sync_config> configs_copy;
633 {
634 std::shared_lock cfg_lock(configs_mutex);
635 configs_copy = configs;
636 }
637
638 for (const auto& cfg : configs_copy) {
639 if (!cfg.enabled || cfg.schedule_cron.empty()) {
640 continue;
641 }
642
643 // Simple cron check - in production would parse cron expression
644 // For now, check if it's time based on last sync
645 auto now = std::chrono::system_clock::now();
646 auto since_last = now - cfg.last_sync;
647
648 // Default to hourly if cron not parsed
649 if (since_last >= std::chrono::hours(1)) {
650 if (!is_sync_active(cfg.config_id)) {
651 if (logger) {
652 logger->info_fmt("Scheduler triggering sync for config '{}'",
653 cfg.config_id);
654 }
655
656 mark_sync_active(cfg.config_id);
657 auto result = perform_sync(cfg, false);
658 mark_sync_inactive(cfg.config_id);
659 notify_completion(cfg.config_id, result);
660 }
661 }
662 }
663 }
664 }
sync_result perform_sync(const sync_config &cfg, bool full_sync)
bool is_sync_active(std::string_view config_id) const
void mark_sync_inactive(std::string_view config_id)
void mark_sync_active(std::string_view config_id)
void notify_completion(const std::string &config_id, const sync_result &result)

References configs, configs_mutex, is_sync_active(), logger, mark_sync_active(), mark_sync_inactive(), notify_completion(), perform_sync(), scheduler_cv, scheduler_mutex, and scheduler_running.

Referenced by kcenon::pacs::client::sync_manager::start_scheduler().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ store_last_result()

void kcenon::pacs::client::sync_manager::impl::store_last_result ( const std::string & config_id,
const sync_result & result )
inline

Definition at line 226 of file sync_manager.cpp.

226 {
227 std::unique_lock lock(results_mutex);
228 last_results[config_id] = result;
229 }
std::unordered_map< std::string, sync_result > last_results

References last_results, and results_mutex.

Referenced by perform_sync().

Here is the caller graph for this function:

◆ update_config_stats()

void kcenon::pacs::client::sync_manager::impl::update_config_stats ( std::string_view config_id,
bool success,
size_t studies_synced )
inline

Definition at line 176 of file sync_manager.cpp.

177 {
178 std::unique_lock lock(configs_mutex);
179 auto it = std::find_if(configs.begin(), configs.end(),
180 [&config_id](const sync_config& c) {
181 return c.config_id == config_id;
182 });
183 if (it != configs.end()) {
184 it->total_syncs++;
185 it->studies_synced += studies_synced;
186 it->last_sync = std::chrono::system_clock::now();
187 if (success) {
188 it->last_successful_sync = std::chrono::system_clock::now();
189 }
190 }
191
192 #ifdef PACS_WITH_DATABASE_SYSTEM
193 if (repositories.configs) {
194 [[maybe_unused]] auto result =
195 repositories.configs->update_stats(config_id, success,
196 studies_synced);
197 } else if (compatibility_repo) {
198 [[maybe_unused]] auto result =
199 compatibility_repo->update_config_stats(config_id, success,
200 studies_synced);
201 }
202 #else
203 if (compatibility_repo) {
204 [[maybe_unused]] auto result =
205 compatibility_repo->update_config_stats(config_id, success,
206 studies_synced);
207 }
208 #endif
209 }

References compatibility_repo, configs, kcenon::pacs::client::sync_repositories::configs, configs_mutex, and repositories.

Referenced by perform_sync().

Here is the caller graph for this function:

Member Data Documentation

◆ active_mutex

std::mutex kcenon::pacs::client::sync_manager::impl::active_mutex
mutable

Definition at line 97 of file sync_manager.cpp.

Referenced by is_sync_active(), mark_sync_active(), and mark_sync_inactive().

◆ active_sync_config_ids

std::unordered_set<std::string> kcenon::pacs::client::sync_manager::impl::active_sync_config_ids

Definition at line 96 of file sync_manager.cpp.

Referenced by is_sync_active(), mark_sync_active(), and mark_sync_inactive().

◆ callbacks_mutex

◆ compatibility_repo

◆ completion_callback

sync_completion_callback kcenon::pacs::client::sync_manager::impl::completion_callback

◆ completion_promises

std::unordered_map<std::string, std::shared_ptr<std::promise<sync_result> > > kcenon::pacs::client::sync_manager::impl::completion_promises

◆ config

◆ configs

◆ configs_mutex

std::shared_mutex kcenon::pacs::client::sync_manager::impl::configs_mutex
mutable

◆ conflict_callback

sync_conflict_callback kcenon::pacs::client::sync_manager::impl::conflict_callback

◆ conflicts

◆ conflicts_mutex

◆ failed_syncs

std::atomic<size_t> kcenon::pacs::client::sync_manager::impl::failed_syncs {0}

Definition at line 127 of file sync_manager.cpp.

127 {0};

Referenced by kcenon::pacs::client::sync_manager::get_statistics(), and perform_sync().

◆ job_mgr

std::shared_ptr<job_manager> kcenon::pacs::client::sync_manager::impl::job_mgr

◆ last_results

std::unordered_map<std::string, sync_result> kcenon::pacs::client::sync_manager::impl::last_results

◆ logger

◆ node_manager

std::shared_ptr<remote_node_manager> kcenon::pacs::client::sync_manager::impl::node_manager

◆ progress_callback

sync_progress_callback kcenon::pacs::client::sync_manager::impl::progress_callback

◆ promises_mutex

std::mutex kcenon::pacs::client::sync_manager::impl::promises_mutex
mutable

◆ query_scu

std::shared_ptr<services::query_scu> kcenon::pacs::client::sync_manager::impl::query_scu

◆ repositories

◆ results_mutex

std::shared_mutex kcenon::pacs::client::sync_manager::impl::results_mutex
mutable

◆ scheduler_cv

std::condition_variable kcenon::pacs::client::sync_manager::impl::scheduler_cv

◆ scheduler_mutex

std::mutex kcenon::pacs::client::sync_manager::impl::scheduler_mutex

Definition at line 111 of file sync_manager.cpp.

Referenced by scheduler_loop().

◆ scheduler_running

std::atomic<bool> kcenon::pacs::client::sync_manager::impl::scheduler_running {false}

◆ scheduler_thread

std::thread kcenon::pacs::client::sync_manager::impl::scheduler_thread

◆ successful_syncs

std::atomic<size_t> kcenon::pacs::client::sync_manager::impl::successful_syncs {0}

Definition at line 126 of file sync_manager.cpp.

126 {0};

Referenced by kcenon::pacs::client::sync_manager::get_statistics(), and perform_sync().

◆ total_bytes_transferred

std::atomic<size_t> kcenon::pacs::client::sync_manager::impl::total_bytes_transferred {0}

Definition at line 129 of file sync_manager.cpp.

129 {0};

Referenced by kcenon::pacs::client::sync_manager::get_statistics().

◆ total_conflicts_detected

std::atomic<size_t> kcenon::pacs::client::sync_manager::impl::total_conflicts_detected {0}

Definition at line 130 of file sync_manager.cpp.

130 {0};

Referenced by add_conflict(), and kcenon::pacs::client::sync_manager::get_statistics().

◆ total_conflicts_resolved

std::atomic<size_t> kcenon::pacs::client::sync_manager::impl::total_conflicts_resolved {0}

◆ total_studies_synced

std::atomic<size_t> kcenon::pacs::client::sync_manager::impl::total_studies_synced {0}

Definition at line 128 of file sync_manager.cpp.

128 {0};

Referenced by kcenon::pacs::client::sync_manager::get_statistics(), and perform_sync().

◆ total_syncs

std::atomic<size_t> kcenon::pacs::client::sync_manager::impl::total_syncs {0}

Definition at line 125 of file sync_manager.cpp.

125 {0};

Referenced by kcenon::pacs::client::sync_manager::get_statistics(), and perform_sync().


The documentation for this struct was generated from the following file: