PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
sync_repository.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
15
16#include <chrono>
17#include <cstring>
18#include <sstream>
19
20#ifdef PACS_WITH_DATABASE_SYSTEM
21
22// =============================================================================
23// pacs_database_adapter Implementation
24// =============================================================================
25
26namespace kcenon::pacs::storage {
27
28namespace {
29
31[[nodiscard]] std::string to_timestamp_string(
32 std::chrono::system_clock::time_point tp) {
33 if (tp == std::chrono::system_clock::time_point{}) {
34 return "";
35 }
36 auto time = std::chrono::system_clock::to_time_t(tp);
37 std::tm tm{};
38#ifdef _WIN32
39 gmtime_s(&tm, &time);
40#else
41 gmtime_r(&time, &tm);
42#endif
43 char buf[32];
44 std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm);
45 return buf;
46}
47
49[[nodiscard]] std::chrono::system_clock::time_point from_timestamp_string(
50 const std::string& str) {
51 if (str.empty()) {
52 return {};
53 }
54 std::tm tm{};
55 if (std::sscanf(str.c_str(), "%d-%d-%d %d:%d:%d",
56 &tm.tm_year, &tm.tm_mon, &tm.tm_mday,
57 &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 6) {
58 return {};
59 }
60 tm.tm_year -= 1900;
61 tm.tm_mon -= 1;
62#ifdef _WIN32
63 auto time = _mkgmtime(&tm);
64#else
65 auto time = timegm(&tm);
66#endif
67 return std::chrono::system_clock::from_time_t(time);
68}
69
70} // namespace
71
72// =============================================================================
73// JSON Serialization
74// =============================================================================
75
76std::string sync_repository::serialize_vector(const std::vector<std::string>& vec) {
77 if (vec.empty()) return "[]";
78
79 std::ostringstream oss;
80 oss << "[";
81 for (size_t i = 0; i < vec.size(); ++i) {
82 if (i > 0) oss << ",";
83 oss << "\"";
84 for (char c : vec[i]) {
85 if (c == '"') oss << "\\\"";
86 else if (c == '\\') oss << "\\\\";
87 else oss << c;
88 }
89 oss << "\"";
90 }
91 oss << "]";
92 return oss.str();
93}
94
95std::vector<std::string> sync_repository::deserialize_vector(std::string_view json) {
96 std::vector<std::string> result;
97 if (json.empty() || json == "[]") return result;
98
99 size_t pos = 0;
100 while (pos < json.size()) {
101 auto start = json.find('"', pos);
102 if (start == std::string_view::npos) break;
103
104 size_t end = start + 1;
105 while (end < json.size()) {
106 if (json[end] == '\\' && end + 1 < json.size()) {
107 end += 2;
108 } else if (json[end] == '"') {
109 break;
110 } else {
111 ++end;
112 }
113 }
114
115 if (end < json.size()) {
116 std::string value{json.substr(start + 1, end - start - 1)};
117 std::string unescaped;
118 for (size_t i = 0; i < value.size(); ++i) {
119 if (value[i] == '\\' && i + 1 < value.size()) {
120 unescaped += value[++i];
121 } else {
122 unescaped += value[i];
123 }
124 }
125 result.push_back(std::move(unescaped));
126 }
127
128 pos = end + 1;
129 }
130
131 return result;
132}
133
134// =============================================================================
135// Construction / Destruction
136// =============================================================================
137
138sync_repository::sync_repository(std::shared_ptr<pacs_database_adapter> db)
139 : db_(std::move(db)) {}
140
141sync_repository::~sync_repository() = default;
142
143sync_repository::sync_repository(sync_repository&&) noexcept = default;
144auto sync_repository::operator=(sync_repository&&) noexcept -> sync_repository& = default;
145
146// =============================================================================
147// Timestamp Helpers
148// =============================================================================
149
150auto sync_repository::parse_timestamp(const std::string& str) const
151 -> std::chrono::system_clock::time_point {
152 return from_timestamp_string(str);
153}
154
155auto sync_repository::format_timestamp(
156 std::chrono::system_clock::time_point tp) const -> std::string {
157 return to_timestamp_string(tp);
158}
159
160// =============================================================================
161// Config Operations
162// =============================================================================
163
164VoidResult sync_repository::save_config(const client::sync_config& config) {
165 if (!db_ || !db_->is_connected()) {
166 return VoidResult(kcenon::common::error_info{
167 -1, "Database not connected", "sync_repository"});
168 }
169
170 auto builder = db_->open_session().create_query_builder();
171 std::ostringstream sql;
172 sql << R"(
173 INSERT INTO sync_configs (
174 config_id, source_node_id, name, enabled,
175 lookback_hours, modalities_json, patient_patterns_json,
176 sync_direction, delete_missing, overwrite_existing, sync_metadata_only,
177 schedule_cron, last_sync, last_successful_sync,
178 total_syncs, studies_synced
179 ) VALUES (
180 ')" << config.config_id << "', "
181 << "'" << config.source_node_id << "', "
182 << "'" << config.name << "', "
183 << (config.enabled ? 1 : 0) << ", "
184 << config.lookback.count() << ", "
185 << "'" << serialize_vector(config.modalities) << "', "
186 << "'" << serialize_vector(config.patient_id_patterns) << "', "
187 << "'" << to_string(config.direction) << "', "
188 << (config.delete_missing ? 1 : 0) << ", "
189 << (config.overwrite_existing ? 1 : 0) << ", "
190 << (config.sync_metadata_only ? 1 : 0) << ", "
191 << "'" << config.schedule_cron << "', "
192 << "'" << format_timestamp(config.last_sync) << "', "
193 << "'" << format_timestamp(config.last_successful_sync) << "', "
194 << config.total_syncs << ", "
195 << config.studies_synced << R"()
196 ON CONFLICT(config_id) DO UPDATE SET
197 source_node_id = excluded.source_node_id,
198 name = excluded.name,
199 enabled = excluded.enabled,
200 lookback_hours = excluded.lookback_hours,
201 modalities_json = excluded.modalities_json,
202 patient_patterns_json = excluded.patient_patterns_json,
203 sync_direction = excluded.sync_direction,
204 delete_missing = excluded.delete_missing,
205 overwrite_existing = excluded.overwrite_existing,
206 sync_metadata_only = excluded.sync_metadata_only,
207 schedule_cron = excluded.schedule_cron,
208 last_sync = excluded.last_sync,
209 last_successful_sync = excluded.last_successful_sync,
210 total_syncs = excluded.total_syncs,
211 studies_synced = excluded.studies_synced,
212 updated_at = datetime('now')
213 )";
214
215 auto result = db_->open_session().insert(sql.str());
216 if (result.is_err()) {
217 return VoidResult(result.error());
218 }
219
220 return kcenon::common::ok();
221}
222
223std::optional<client::sync_config> sync_repository::find_config(
224 std::string_view config_id) const {
225 if (!db_ || !db_->is_connected()) return std::nullopt;
226
227 std::ostringstream sql;
228 sql << R"(
229 SELECT pk, config_id, source_node_id, name, enabled,
230 lookback_hours, modalities_json, patient_patterns_json,
231 sync_direction, delete_missing, overwrite_existing, sync_metadata_only,
232 schedule_cron, last_sync, last_successful_sync,
233 total_syncs, studies_synced
234 FROM sync_configs WHERE config_id = ')" << config_id << "'";
235
236 auto result = db_->open_session().select(sql.str());
237 if (result.is_err() || result.value().empty()) {
238 return std::nullopt;
239 }
240
241 return map_row_to_config(result.value()[0]);
242}
243
244std::vector<client::sync_config> sync_repository::list_configs() const {
245 std::vector<client::sync_config> configs;
246 if (!db_ || !db_->is_connected()) return configs;
247
248 const char* sql = R"(
249 SELECT pk, config_id, source_node_id, name, enabled,
250 lookback_hours, modalities_json, patient_patterns_json,
251 sync_direction, delete_missing, overwrite_existing, sync_metadata_only,
252 schedule_cron, last_sync, last_successful_sync,
253 total_syncs, studies_synced
254 FROM sync_configs ORDER BY name
255 )";
256
257 auto result = db_->open_session().select(sql);
258 if (result.is_err()) return configs;
259
260 configs.reserve(result.value().size());
261 for (const auto& row : result.value()) {
262 configs.push_back(map_row_to_config(row));
263 }
264
265 return configs;
266}
267
268std::vector<client::sync_config> sync_repository::list_enabled_configs() const {
269 std::vector<client::sync_config> configs;
270 if (!db_ || !db_->is_connected()) return configs;
271
272 const char* sql = R"(
273 SELECT pk, config_id, source_node_id, name, enabled,
274 lookback_hours, modalities_json, patient_patterns_json,
275 sync_direction, delete_missing, overwrite_existing, sync_metadata_only,
276 schedule_cron, last_sync, last_successful_sync,
277 total_syncs, studies_synced
278 FROM sync_configs WHERE enabled = 1 ORDER BY name
279 )";
280
281 auto result = db_->open_session().select(sql);
282 if (result.is_err()) return configs;
283
284 configs.reserve(result.value().size());
285 for (const auto& row : result.value()) {
286 configs.push_back(map_row_to_config(row));
287 }
288
289 return configs;
290}
291
292VoidResult sync_repository::remove_config(std::string_view config_id) {
293 if (!db_ || !db_->is_connected()) {
294 return VoidResult(kcenon::common::error_info{
295 -1, "Database not connected", "sync_repository"});
296 }
297
298 std::ostringstream sql;
299 sql << "DELETE FROM sync_configs WHERE config_id = '" << config_id << "'";
300
301 auto result = db_->open_session().remove(sql.str());
302 if (result.is_err()) {
303 return VoidResult(result.error());
304 }
305
306 return kcenon::common::ok();
307}
308
309VoidResult sync_repository::update_config_stats(
310 std::string_view config_id,
311 bool success,
312 size_t studies_synced) {
313 if (!db_ || !db_->is_connected()) {
314 return VoidResult(kcenon::common::error_info{
315 -1, "Database not connected", "sync_repository"});
316 }
317
318 std::ostringstream sql;
319 if (success) {
320 sql << R"(
321 UPDATE sync_configs SET
322 total_syncs = total_syncs + 1,
323 studies_synced = studies_synced + )" << studies_synced << R"(,
324 last_sync = datetime('now'),
325 last_successful_sync = datetime('now'),
326 updated_at = datetime('now')
327 WHERE config_id = ')" << config_id << "'";
328 } else {
329 sql << R"(
330 UPDATE sync_configs SET
331 total_syncs = total_syncs + 1,
332 last_sync = datetime('now'),
333 updated_at = datetime('now')
334 WHERE config_id = ')" << config_id << "'";
335 }
336
337 auto result = db_->open_session().update(sql.str());
338 if (result.is_err()) {
339 return VoidResult(result.error());
340 }
341
342 return kcenon::common::ok();
343}
344
345// =============================================================================
346// Conflict Operations
347// =============================================================================
348
349VoidResult sync_repository::save_conflict(const client::sync_conflict& conflict) {
350 if (!db_ || !db_->is_connected()) {
351 return VoidResult(kcenon::common::error_info{
352 -1, "Database not connected", "sync_repository"});
353 }
354
355 std::ostringstream sql;
356 sql << R"(
357 INSERT INTO sync_conflicts (
358 config_id, study_uid, patient_id, conflict_type,
359 local_modified, remote_modified,
360 local_instance_count, remote_instance_count,
361 resolved, resolution, detected_at, resolved_at
362 ) VALUES (
363 ')" << conflict.config_id << "', "
364 << "'" << conflict.study_uid << "', "
365 << "'" << conflict.patient_id << "', "
366 << "'" << to_string(conflict.conflict_type) << "', "
367 << "'" << format_timestamp(conflict.local_modified) << "', "
368 << "'" << format_timestamp(conflict.remote_modified) << "', "
369 << conflict.local_instance_count << ", "
370 << conflict.remote_instance_count << ", "
371 << (conflict.resolved ? 1 : 0) << ", "
372 << "'" << (conflict.resolved ? to_string(conflict.resolution_used) : "") << "', "
373 << "'" << format_timestamp(conflict.detected_at) << "', ";
374
375 if (conflict.resolved_at.has_value()) {
376 sql << "'" << format_timestamp(conflict.resolved_at.value()) << "'";
377 } else {
378 sql << "NULL";
379 }
380
381 sql << R"()
382 ON CONFLICT(config_id, study_uid) DO UPDATE SET
383 patient_id = excluded.patient_id,
384 conflict_type = excluded.conflict_type,
385 local_modified = excluded.local_modified,
386 remote_modified = excluded.remote_modified,
387 local_instance_count = excluded.local_instance_count,
388 remote_instance_count = excluded.remote_instance_count,
389 resolved = excluded.resolved,
390 resolution = excluded.resolution,
391 detected_at = excluded.detected_at,
392 resolved_at = excluded.resolved_at
393 )";
394
395 auto result = db_->open_session().insert(sql.str());
396 if (result.is_err()) {
397 return VoidResult(result.error());
398 }
399
400 return kcenon::common::ok();
401}
402
403std::optional<client::sync_conflict> sync_repository::find_conflict(
404 std::string_view study_uid) const {
405 if (!db_ || !db_->is_connected()) return std::nullopt;
406
407 std::ostringstream sql;
408 sql << R"(
409 SELECT pk, config_id, study_uid, patient_id, conflict_type,
410 local_modified, remote_modified,
411 local_instance_count, remote_instance_count,
412 resolved, resolution, detected_at, resolved_at
413 FROM sync_conflicts WHERE study_uid = ')" << study_uid << "'";
414
415 auto result = db_->open_session().select(sql.str());
416 if (result.is_err() || result.value().empty()) {
417 return std::nullopt;
418 }
419
420 return map_row_to_conflict(result.value()[0]);
421}
422
423std::vector<client::sync_conflict> sync_repository::list_conflicts(
424 std::string_view config_id) const {
425 std::vector<client::sync_conflict> conflicts;
426 if (!db_ || !db_->is_connected()) return conflicts;
427
428 std::ostringstream sql;
429 sql << R"(
430 SELECT pk, config_id, study_uid, patient_id, conflict_type,
431 local_modified, remote_modified,
432 local_instance_count, remote_instance_count,
433 resolved, resolution, detected_at, resolved_at
434 FROM sync_conflicts WHERE config_id = ')" << config_id << R"(' ORDER BY detected_at DESC)";
435
436 auto result = db_->open_session().select(sql.str());
437 if (result.is_err()) return conflicts;
438
439 conflicts.reserve(result.value().size());
440 for (const auto& row : result.value()) {
441 conflicts.push_back(map_row_to_conflict(row));
442 }
443
444 return conflicts;
445}
446
447std::vector<client::sync_conflict> sync_repository::list_unresolved_conflicts() const {
448 std::vector<client::sync_conflict> conflicts;
449 if (!db_ || !db_->is_connected()) return conflicts;
450
451 const char* sql = R"(
452 SELECT pk, config_id, study_uid, patient_id, conflict_type,
453 local_modified, remote_modified,
454 local_instance_count, remote_instance_count,
455 resolved, resolution, detected_at, resolved_at
456 FROM sync_conflicts WHERE resolved = 0 ORDER BY detected_at DESC
457 )";
458
459 auto result = db_->open_session().select(sql);
460 if (result.is_err()) return conflicts;
461
462 conflicts.reserve(result.value().size());
463 for (const auto& row : result.value()) {
464 conflicts.push_back(map_row_to_conflict(row));
465 }
466
467 return conflicts;
468}
469
470VoidResult sync_repository::resolve_conflict(
471 std::string_view study_uid,
472 client::conflict_resolution resolution) {
473 if (!db_ || !db_->is_connected()) {
474 return VoidResult(kcenon::common::error_info{
475 -1, "Database not connected", "sync_repository"});
476 }
477
478 std::ostringstream sql;
479 sql << R"(
480 UPDATE sync_conflicts SET
481 resolved = 1,
482 resolution = ')" << to_string(resolution) << R"(',
483 resolved_at = datetime('now')
484 WHERE study_uid = ')" << study_uid << "' AND resolved = 0";
485
486 auto result = db_->open_session().update(sql.str());
487 if (result.is_err()) {
488 return VoidResult(result.error());
489 }
490
491 return kcenon::common::ok();
492}
493
494Result<size_t> sync_repository::cleanup_old_conflicts(std::chrono::hours max_age) {
495 if (!db_ || !db_->is_connected()) {
496 return kcenon::common::make_error<size_t>(-1,
497 "Database not connected", "sync_repository");
498 }
499
500 auto cutoff = std::chrono::system_clock::now() - max_age;
501 auto cutoff_str = format_timestamp(cutoff);
502
503 std::ostringstream sql;
504 sql << "DELETE FROM sync_conflicts WHERE resolved = 1 AND resolved_at < '"
505 << cutoff_str << "'";
506
507 auto result = db_->open_session().remove(sql.str());
508 if (result.is_err()) {
509 return Result<size_t>(result.error());
510 }
511
512 return static_cast<size_t>(result.value());
513}
514
515// =============================================================================
516// History Operations
517// =============================================================================
518
519VoidResult sync_repository::save_history(const client::sync_history& history) {
520 if (!db_ || !db_->is_connected()) {
521 return VoidResult(kcenon::common::error_info{
522 -1, "Database not connected", "sync_repository"});
523 }
524
525 std::ostringstream sql;
526 sql << R"(
527 INSERT INTO sync_history (
528 config_id, job_id, success,
529 studies_checked, studies_synced, conflicts_found,
530 errors_json, started_at, completed_at
531 ) VALUES (
532 ')" << history.config_id << "', "
533 << "'" << history.job_id << "', "
534 << (history.success ? 1 : 0) << ", "
535 << history.studies_checked << ", "
536 << history.studies_synced << ", "
537 << history.conflicts_found << ", "
538 << "'" << serialize_vector(history.errors) << "', "
539 << "'" << format_timestamp(history.started_at) << "', "
540 << "'" << format_timestamp(history.completed_at) << "')";
541
542 auto result = db_->open_session().insert(sql.str());
543 if (result.is_err()) {
544 return VoidResult(result.error());
545 }
546
547 return kcenon::common::ok();
548}
549
550std::vector<client::sync_history> sync_repository::list_history(
551 std::string_view config_id, size_t limit) const {
552 std::vector<client::sync_history> histories;
553 if (!db_ || !db_->is_connected()) return histories;
554
555 std::ostringstream sql;
556 sql << R"(
557 SELECT pk, config_id, job_id, success,
558 studies_checked, studies_synced, conflicts_found,
559 errors_json, started_at, completed_at
560 FROM sync_history WHERE config_id = ')" << config_id
561 << "' ORDER BY started_at DESC LIMIT " << limit;
562
563 auto result = db_->open_session().select(sql.str());
564 if (result.is_err()) return histories;
565
566 histories.reserve(result.value().size());
567 for (const auto& row : result.value()) {
568 histories.push_back(map_row_to_history(row));
569 }
570
571 return histories;
572}
573
574std::optional<client::sync_history> sync_repository::get_last_history(
575 std::string_view config_id) const {
576 if (!db_ || !db_->is_connected()) return std::nullopt;
577
578 std::ostringstream sql;
579 sql << R"(
580 SELECT pk, config_id, job_id, success,
581 studies_checked, studies_synced, conflicts_found,
582 errors_json, started_at, completed_at
583 FROM sync_history WHERE config_id = ')" << config_id
584 << "' ORDER BY started_at DESC LIMIT 1";
585
586 auto result = db_->open_session().select(sql.str());
587 if (result.is_err() || result.value().empty()) {
588 return std::nullopt;
589 }
590
591 return map_row_to_history(result.value()[0]);
592}
593
594Result<size_t> sync_repository::cleanup_old_history(std::chrono::hours max_age) {
595 if (!db_ || !db_->is_connected()) {
596 return kcenon::common::make_error<size_t>(-1,
597 "Database not connected", "sync_repository");
598 }
599
600 auto cutoff = std::chrono::system_clock::now() - max_age;
601 auto cutoff_str = format_timestamp(cutoff);
602
603 std::ostringstream sql;
604 sql << "DELETE FROM sync_history WHERE completed_at < '" << cutoff_str << "'";
605
606 auto result = db_->open_session().remove(sql.str());
607 if (result.is_err()) {
608 return Result<size_t>(result.error());
609 }
610
611 return static_cast<size_t>(result.value());
612}
613
614// =============================================================================
615// Statistics
616// =============================================================================
617
618size_t sync_repository::count_configs() const {
619 if (!db_ || !db_->is_connected()) return 0;
620
621 auto result = db_->open_session().select("SELECT COUNT(*) as count FROM sync_configs");
622 if (result.is_err() || result.value().empty()) return 0;
623
624 return std::stoull(result.value()[0].at("count"));
625}
626
627size_t sync_repository::count_unresolved_conflicts() const {
628 if (!db_ || !db_->is_connected()) return 0;
629
630 auto result = db_->open_session().select(
631 "SELECT COUNT(*) as count FROM sync_conflicts WHERE resolved = 0");
632 if (result.is_err() || result.value().empty()) return 0;
633
634 return std::stoull(result.value()[0].at("count"));
635}
636
637size_t sync_repository::count_syncs_today() const {
638 if (!db_ || !db_->is_connected()) return 0;
639
640 auto result = db_->open_session().select(R"(
641 SELECT COUNT(*) as count FROM sync_history
642 WHERE date(completed_at) = date('now')
643 )");
644 if (result.is_err() || result.value().empty()) return 0;
645
646 return std::stoull(result.value()[0].at("count"));
647}
648
649// =============================================================================
650// Database Information
651// =============================================================================
652
653bool sync_repository::is_valid() const noexcept {
654 return db_ && db_->is_connected();
655}
656
657// =============================================================================
658// Row Mapping
659// =============================================================================
660
661client::sync_config sync_repository::map_row_to_config(
662 const database_row& row) const {
663 client::sync_config config;
664
665 config.pk = std::stoll(row.at("pk"));
666 config.config_id = row.at("config_id");
667 config.source_node_id = row.at("source_node_id");
668 config.name = row.at("name");
669 config.enabled = (row.at("enabled") == "1");
670 config.lookback = std::chrono::hours(std::stoi(row.at("lookback_hours")));
671 config.modalities = deserialize_vector(row.at("modalities_json"));
672 config.patient_id_patterns = deserialize_vector(row.at("patient_patterns_json"));
673 config.direction = client::sync_direction_from_string(row.at("sync_direction"));
674 config.delete_missing = (row.at("delete_missing") == "1");
675 config.overwrite_existing = (row.at("overwrite_existing") == "1");
676 config.sync_metadata_only = (row.at("sync_metadata_only") == "1");
677 config.schedule_cron = row.at("schedule_cron");
678 config.last_sync = parse_timestamp(row.at("last_sync"));
679 config.last_successful_sync = parse_timestamp(row.at("last_successful_sync"));
680 config.total_syncs = std::stoull(row.at("total_syncs"));
681 config.studies_synced = std::stoull(row.at("studies_synced"));
682
683 return config;
684}
685
686client::sync_conflict sync_repository::map_row_to_conflict(
687 const database_row& row) const {
688 client::sync_conflict conflict;
689
690 conflict.pk = std::stoll(row.at("pk"));
691 conflict.config_id = row.at("config_id");
692 conflict.study_uid = row.at("study_uid");
693 conflict.patient_id = row.at("patient_id");
694 conflict.conflict_type = client::sync_conflict_type_from_string(
695 row.at("conflict_type"));
696 conflict.local_modified = parse_timestamp(row.at("local_modified"));
697 conflict.remote_modified = parse_timestamp(row.at("remote_modified"));
698 conflict.local_instance_count = std::stoull(row.at("local_instance_count"));
699 conflict.remote_instance_count = std::stoull(row.at("remote_instance_count"));
700 conflict.resolved = (row.at("resolved") == "1");
701 conflict.resolution_used = client::conflict_resolution_from_string(
702 row.at("resolution"));
703 conflict.detected_at = parse_timestamp(row.at("detected_at"));
704
705 auto resolved_at_it = row.find("resolved_at");
706 if (resolved_at_it != row.end() && !resolved_at_it->second.empty()) {
707 conflict.resolved_at = parse_timestamp(resolved_at_it->second);
708 }
709
710 return conflict;
711}
712
713client::sync_history sync_repository::map_row_to_history(
714 const database_row& row) const {
715 client::sync_history history;
716
717 history.pk = std::stoll(row.at("pk"));
718 history.config_id = row.at("config_id");
719 history.job_id = row.at("job_id");
720 history.success = (row.at("success") == "1");
721 history.studies_checked = std::stoull(row.at("studies_checked"));
722 history.studies_synced = std::stoull(row.at("studies_synced"));
723 history.conflicts_found = std::stoull(row.at("conflicts_found"));
724 history.errors = deserialize_vector(row.at("errors_json"));
725 history.started_at = parse_timestamp(row.at("started_at"));
726 history.completed_at = parse_timestamp(row.at("completed_at"));
727
728 return history;
729}
730
731} // namespace kcenon::pacs::storage
732
733#else // !PACS_WITH_DATABASE_SYSTEM
734
735// =============================================================================
736// Legacy SQLite Implementation
737// =============================================================================
738
739#include <sqlite3.h>
740
741namespace kcenon::pacs::storage {
742
743// =============================================================================
744// Helper Functions
745// =============================================================================
746
747namespace {
748
750[[nodiscard]] std::string to_timestamp_string(
751 std::chrono::system_clock::time_point tp) {
752 if (tp == std::chrono::system_clock::time_point{}) {
753 return "";
754 }
755 auto time = std::chrono::system_clock::to_time_t(tp);
756 std::tm tm{};
757#ifdef _WIN32
758 gmtime_s(&tm, &time);
759#else
760 gmtime_r(&time, &tm);
761#endif
762 char buf[32];
763 std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm);
764 return buf;
765}
766
768[[nodiscard]] std::chrono::system_clock::time_point from_timestamp_string(
769 const char* str) {
770 if (!str || str[0] == '\0') {
771 return {};
772 }
773 std::tm tm{};
774 if (std::sscanf(str, "%d-%d-%d %d:%d:%d",
775 &tm.tm_year, &tm.tm_mon, &tm.tm_mday,
776 &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 6) {
777 return {};
778 }
779 tm.tm_year -= 1900;
780 tm.tm_mon -= 1;
781#ifdef _WIN32
782 auto time = _mkgmtime(&tm);
783#else
784 auto time = timegm(&tm);
785#endif
786 return std::chrono::system_clock::from_time_t(time);
787}
788
790[[nodiscard]] std::string get_text_column(sqlite3_stmt* stmt, int col) {
791 auto text = reinterpret_cast<const char*>(sqlite3_column_text(stmt, col));
792 return text ? text : "";
793}
794
796[[nodiscard]] int get_int_column(sqlite3_stmt* stmt, int col, int default_val = 0) {
797 if (sqlite3_column_type(stmt, col) == SQLITE_NULL) {
798 return default_val;
799 }
800 return sqlite3_column_int(stmt, col);
801}
802
804[[nodiscard]] int64_t get_int64_column(sqlite3_stmt* stmt, int col, int64_t default_val = 0) {
805 if (sqlite3_column_type(stmt, col) == SQLITE_NULL) {
806 return default_val;
807 }
808 return sqlite3_column_int64(stmt, col);
809}
810
811} // namespace
812
813// =============================================================================
814// JSON Serialization
815// =============================================================================
816
817std::string sync_repository::serialize_vector(const std::vector<std::string>& vec) {
818 if (vec.empty()) return "[]";
819
820 std::ostringstream oss;
821 oss << "[";
822 for (size_t i = 0; i < vec.size(); ++i) {
823 if (i > 0) oss << ",";
824 oss << "\"";
825 for (char c : vec[i]) {
826 if (c == '"') oss << "\\\"";
827 else if (c == '\\') oss << "\\\\";
828 else oss << c;
829 }
830 oss << "\"";
831 }
832 oss << "]";
833 return oss.str();
834}
835
836std::vector<std::string> sync_repository::deserialize_vector(std::string_view json) {
837 std::vector<std::string> result;
838 if (json.empty() || json == "[]") return result;
839
840 size_t pos = 0;
841 while (pos < json.size()) {
842 auto start = json.find('"', pos);
843 if (start == std::string_view::npos) break;
844
845 size_t end = start + 1;
846 while (end < json.size()) {
847 if (json[end] == '\\' && end + 1 < json.size()) {
848 end += 2;
849 } else if (json[end] == '"') {
850 break;
851 } else {
852 ++end;
853 }
854 }
855
856 if (end < json.size()) {
857 std::string value{json.substr(start + 1, end - start - 1)};
858 std::string unescaped;
859 for (size_t i = 0; i < value.size(); ++i) {
860 if (value[i] == '\\' && i + 1 < value.size()) {
861 unescaped += value[++i];
862 } else {
863 unescaped += value[i];
864 }
865 }
866 result.push_back(std::move(unescaped));
867 }
868
869 pos = end + 1;
870 }
871
872 return result;
873}
874
875// =============================================================================
876// Construction / Destruction
877// =============================================================================
878
879sync_repository::sync_repository(sqlite3* db) : db_(db) {}
880
882
884auto sync_repository::operator=(sync_repository&&) noexcept -> sync_repository& = default;
885
886// =============================================================================
887// Config Operations
888// =============================================================================
889
890VoidResult sync_repository::save_config(const client::sync_config& config) {
891 if (!db_) {
892 return VoidResult(kcenon::common::error_info{
893 -1, "Database not initialized", "sync_repository"});
894 }
895
896 const char* sql = R"(
897 INSERT INTO sync_configs (
898 config_id, source_node_id, name, enabled,
899 lookback_hours, modalities_json, patient_patterns_json,
900 sync_direction, delete_missing, overwrite_existing, sync_metadata_only,
901 schedule_cron, last_sync, last_successful_sync,
902 total_syncs, studies_synced
903 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
904 ON CONFLICT(config_id) DO UPDATE SET
905 source_node_id = excluded.source_node_id,
906 name = excluded.name,
907 enabled = excluded.enabled,
908 lookback_hours = excluded.lookback_hours,
909 modalities_json = excluded.modalities_json,
910 patient_patterns_json = excluded.patient_patterns_json,
911 sync_direction = excluded.sync_direction,
912 delete_missing = excluded.delete_missing,
913 overwrite_existing = excluded.overwrite_existing,
914 sync_metadata_only = excluded.sync_metadata_only,
915 schedule_cron = excluded.schedule_cron,
916 last_sync = excluded.last_sync,
917 last_successful_sync = excluded.last_successful_sync,
918 total_syncs = excluded.total_syncs,
919 studies_synced = excluded.studies_synced,
920 updated_at = datetime('now')
921 )";
922
923 sqlite3_stmt* stmt = nullptr;
924 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
925 return VoidResult(kcenon::common::error_info{
926 -1, "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
927 "sync_repository"});
928 }
929
930 sqlite3_bind_text(stmt, 1, config.config_id.c_str(), -1, SQLITE_TRANSIENT);
931 sqlite3_bind_text(stmt, 2, config.source_node_id.c_str(), -1, SQLITE_TRANSIENT);
932 sqlite3_bind_text(stmt, 3, config.name.c_str(), -1, SQLITE_TRANSIENT);
933 sqlite3_bind_int(stmt, 4, config.enabled ? 1 : 0);
934 sqlite3_bind_int(stmt, 5, static_cast<int>(config.lookback.count()));
935 sqlite3_bind_text(stmt, 6, serialize_vector(config.modalities).c_str(), -1, SQLITE_TRANSIENT);
936 sqlite3_bind_text(stmt, 7, serialize_vector(config.patient_id_patterns).c_str(), -1, SQLITE_TRANSIENT);
937 sqlite3_bind_text(stmt, 8, to_string(config.direction), -1, SQLITE_TRANSIENT);
938 sqlite3_bind_int(stmt, 9, config.delete_missing ? 1 : 0);
939 sqlite3_bind_int(stmt, 10, config.overwrite_existing ? 1 : 0);
940 sqlite3_bind_int(stmt, 11, config.sync_metadata_only ? 1 : 0);
941 sqlite3_bind_text(stmt, 12, config.schedule_cron.c_str(), -1, SQLITE_TRANSIENT);
942 sqlite3_bind_text(stmt, 13, to_timestamp_string(config.last_sync).c_str(), -1, SQLITE_TRANSIENT);
943 sqlite3_bind_text(stmt, 14, to_timestamp_string(config.last_successful_sync).c_str(), -1, SQLITE_TRANSIENT);
944 sqlite3_bind_int64(stmt, 15, static_cast<int64_t>(config.total_syncs));
945 sqlite3_bind_int64(stmt, 16, static_cast<int64_t>(config.studies_synced));
946
947 int rc = sqlite3_step(stmt);
948 sqlite3_finalize(stmt);
949
950 if (rc != SQLITE_DONE) {
951 return VoidResult(kcenon::common::error_info{
952 -1, "Failed to save config: " + std::string(sqlite3_errmsg(db_)),
953 "sync_repository"});
954 }
955
956 return kcenon::common::ok();
957}
958
959std::optional<client::sync_config> sync_repository::find_config(
960 std::string_view config_id) const {
961 if (!db_) return std::nullopt;
962
963 const char* sql = R"(
964 SELECT pk, config_id, source_node_id, name, enabled,
965 lookback_hours, modalities_json, patient_patterns_json,
966 sync_direction, delete_missing, overwrite_existing, sync_metadata_only,
967 schedule_cron, last_sync, last_successful_sync,
968 total_syncs, studies_synced
969 FROM sync_configs WHERE config_id = ?
970 )";
971
972 sqlite3_stmt* stmt = nullptr;
973 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
974 return std::nullopt;
975 }
976
977 sqlite3_bind_text(stmt, 1, config_id.data(), static_cast<int>(config_id.size()), SQLITE_TRANSIENT);
978
979 std::optional<client::sync_config> result;
980 if (sqlite3_step(stmt) == SQLITE_ROW) {
981 result = parse_config_row(stmt);
982 }
983
984 sqlite3_finalize(stmt);
985 return result;
986}
987
988std::vector<client::sync_config> sync_repository::list_configs() const {
989 std::vector<client::sync_config> result;
990 if (!db_) return result;
991
992 const char* sql = R"(
993 SELECT pk, config_id, source_node_id, name, enabled,
994 lookback_hours, modalities_json, patient_patterns_json,
995 sync_direction, delete_missing, overwrite_existing, sync_metadata_only,
996 schedule_cron, last_sync, last_successful_sync,
997 total_syncs, studies_synced
998 FROM sync_configs ORDER BY name
999 )";
1000
1001 sqlite3_stmt* stmt = nullptr;
1002 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1003 return result;
1004 }
1005
1006 while (sqlite3_step(stmt) == SQLITE_ROW) {
1007 result.push_back(parse_config_row(stmt));
1008 }
1009
1010 sqlite3_finalize(stmt);
1011 return result;
1012}
1013
1014std::vector<client::sync_config> sync_repository::list_enabled_configs() const {
1015 std::vector<client::sync_config> result;
1016 if (!db_) return result;
1017
1018 const char* sql = R"(
1019 SELECT pk, config_id, source_node_id, name, enabled,
1020 lookback_hours, modalities_json, patient_patterns_json,
1021 sync_direction, delete_missing, overwrite_existing, sync_metadata_only,
1022 schedule_cron, last_sync, last_successful_sync,
1023 total_syncs, studies_synced
1024 FROM sync_configs WHERE enabled = 1 ORDER BY name
1025 )";
1026
1027 sqlite3_stmt* stmt = nullptr;
1028 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1029 return result;
1030 }
1031
1032 while (sqlite3_step(stmt) == SQLITE_ROW) {
1033 result.push_back(parse_config_row(stmt));
1034 }
1035
1036 sqlite3_finalize(stmt);
1037 return result;
1038}
1039
1040VoidResult sync_repository::remove_config(std::string_view config_id) {
1041 if (!db_) {
1042 return VoidResult(kcenon::common::error_info{
1043 -1, "Database not initialized", "sync_repository"});
1044 }
1045
1046 const char* sql = "DELETE FROM sync_configs WHERE config_id = ?";
1047
1048 sqlite3_stmt* stmt = nullptr;
1049 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1050 return VoidResult(kcenon::common::error_info{
1051 -1, "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1052 "sync_repository"});
1053 }
1054
1055 sqlite3_bind_text(stmt, 1, config_id.data(), static_cast<int>(config_id.size()), SQLITE_TRANSIENT);
1056
1057 int rc = sqlite3_step(stmt);
1058 sqlite3_finalize(stmt);
1059
1060 if (rc != SQLITE_DONE) {
1061 return VoidResult(kcenon::common::error_info{
1062 -1, "Failed to delete config: " + std::string(sqlite3_errmsg(db_)),
1063 "sync_repository"});
1064 }
1065
1066 return kcenon::common::ok();
1067}
1068
1070 std::string_view config_id,
1071 bool success,
1072 size_t studies_synced) {
1073 if (!db_) {
1074 return VoidResult(kcenon::common::error_info{
1075 -1, "Database not initialized", "sync_repository"});
1076 }
1077
1078 std::string sql;
1079 if (success) {
1080 sql = R"(
1081 UPDATE sync_configs SET
1082 total_syncs = total_syncs + 1,
1083 studies_synced = studies_synced + ?,
1084 last_sync = datetime('now'),
1085 last_successful_sync = datetime('now'),
1086 updated_at = datetime('now')
1087 WHERE config_id = ?
1088 )";
1089 } else {
1090 sql = R"(
1091 UPDATE sync_configs SET
1092 total_syncs = total_syncs + 1,
1093 last_sync = datetime('now'),
1094 updated_at = datetime('now')
1095 WHERE config_id = ?
1096 )";
1097 }
1098
1099 sqlite3_stmt* stmt = nullptr;
1100 if (sqlite3_prepare_v2(db_, sql.c_str(), -1, &stmt, nullptr) != SQLITE_OK) {
1101 return VoidResult(kcenon::common::error_info{
1102 -1, "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1103 "sync_repository"});
1104 }
1105
1106 if (success) {
1107 sqlite3_bind_int64(stmt, 1, static_cast<int64_t>(studies_synced));
1108 sqlite3_bind_text(stmt, 2, config_id.data(), static_cast<int>(config_id.size()), SQLITE_TRANSIENT);
1109 } else {
1110 sqlite3_bind_text(stmt, 1, config_id.data(), static_cast<int>(config_id.size()), SQLITE_TRANSIENT);
1111 }
1112
1113 int rc = sqlite3_step(stmt);
1114 sqlite3_finalize(stmt);
1115
1116 if (rc != SQLITE_DONE) {
1117 return VoidResult(kcenon::common::error_info{
1118 -1, "Failed to update config stats: " + std::string(sqlite3_errmsg(db_)),
1119 "sync_repository"});
1120 }
1121
1122 return kcenon::common::ok();
1123}
1124
1125// =============================================================================
1126// Conflict Operations
1127// =============================================================================
1128
1130 if (!db_) {
1131 return VoidResult(kcenon::common::error_info{
1132 -1, "Database not initialized", "sync_repository"});
1133 }
1134
1135 const char* sql = R"(
1136 INSERT INTO sync_conflicts (
1137 config_id, study_uid, patient_id, conflict_type,
1138 local_modified, remote_modified,
1139 local_instance_count, remote_instance_count,
1140 resolved, resolution, detected_at, resolved_at
1141 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1142 ON CONFLICT(config_id, study_uid) DO UPDATE SET
1143 patient_id = excluded.patient_id,
1144 conflict_type = excluded.conflict_type,
1145 local_modified = excluded.local_modified,
1146 remote_modified = excluded.remote_modified,
1147 local_instance_count = excluded.local_instance_count,
1148 remote_instance_count = excluded.remote_instance_count,
1149 resolved = excluded.resolved,
1150 resolution = excluded.resolution,
1151 detected_at = excluded.detected_at,
1152 resolved_at = excluded.resolved_at
1153 )";
1154
1155 sqlite3_stmt* stmt = nullptr;
1156 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1157 return VoidResult(kcenon::common::error_info{
1158 -1, "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1159 "sync_repository"});
1160 }
1161
1162 sqlite3_bind_text(stmt, 1, conflict.config_id.c_str(), -1, SQLITE_TRANSIENT);
1163 sqlite3_bind_text(stmt, 2, conflict.study_uid.c_str(), -1, SQLITE_TRANSIENT);
1164 sqlite3_bind_text(stmt, 3, conflict.patient_id.c_str(), -1, SQLITE_TRANSIENT);
1165 sqlite3_bind_text(stmt, 4, to_string(conflict.conflict_type), -1, SQLITE_TRANSIENT);
1166 sqlite3_bind_text(stmt, 5, to_timestamp_string(conflict.local_modified).c_str(), -1, SQLITE_TRANSIENT);
1167 sqlite3_bind_text(stmt, 6, to_timestamp_string(conflict.remote_modified).c_str(), -1, SQLITE_TRANSIENT);
1168 sqlite3_bind_int64(stmt, 7, static_cast<int64_t>(conflict.local_instance_count));
1169 sqlite3_bind_int64(stmt, 8, static_cast<int64_t>(conflict.remote_instance_count));
1170 sqlite3_bind_int(stmt, 9, conflict.resolved ? 1 : 0);
1171 sqlite3_bind_text(stmt, 10, conflict.resolved ? to_string(conflict.resolution_used) : "", -1, SQLITE_TRANSIENT);
1172 sqlite3_bind_text(stmt, 11, to_timestamp_string(conflict.detected_at).c_str(), -1, SQLITE_TRANSIENT);
1173 if (conflict.resolved_at.has_value()) {
1174 sqlite3_bind_text(stmt, 12, to_timestamp_string(conflict.resolved_at.value()).c_str(), -1, SQLITE_TRANSIENT);
1175 } else {
1176 sqlite3_bind_null(stmt, 12);
1177 }
1178
1179 int rc = sqlite3_step(stmt);
1180 sqlite3_finalize(stmt);
1181
1182 if (rc != SQLITE_DONE) {
1183 return VoidResult(kcenon::common::error_info{
1184 -1, "Failed to save conflict: " + std::string(sqlite3_errmsg(db_)),
1185 "sync_repository"});
1186 }
1187
1188 return kcenon::common::ok();
1189}
1190
1191std::optional<client::sync_conflict> sync_repository::find_conflict(
1192 std::string_view study_uid) const {
1193 if (!db_) return std::nullopt;
1194
1195 const char* sql = R"(
1196 SELECT pk, config_id, study_uid, patient_id, conflict_type,
1197 local_modified, remote_modified,
1198 local_instance_count, remote_instance_count,
1199 resolved, resolution, detected_at, resolved_at
1200 FROM sync_conflicts WHERE study_uid = ?
1201 )";
1202
1203 sqlite3_stmt* stmt = nullptr;
1204 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1205 return std::nullopt;
1206 }
1207
1208 sqlite3_bind_text(stmt, 1, study_uid.data(), static_cast<int>(study_uid.size()), SQLITE_TRANSIENT);
1209
1210 std::optional<client::sync_conflict> result;
1211 if (sqlite3_step(stmt) == SQLITE_ROW) {
1212 result = parse_conflict_row(stmt);
1213 }
1214
1215 sqlite3_finalize(stmt);
1216 return result;
1217}
1218
1219std::vector<client::sync_conflict> sync_repository::list_conflicts(
1220 std::string_view config_id) const {
1221 std::vector<client::sync_conflict> result;
1222 if (!db_) return result;
1223
1224 const char* sql = R"(
1225 SELECT pk, config_id, study_uid, patient_id, conflict_type,
1226 local_modified, remote_modified,
1227 local_instance_count, remote_instance_count,
1228 resolved, resolution, detected_at, resolved_at
1229 FROM sync_conflicts WHERE config_id = ? ORDER BY detected_at DESC
1230 )";
1231
1232 sqlite3_stmt* stmt = nullptr;
1233 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1234 return result;
1235 }
1236
1237 sqlite3_bind_text(stmt, 1, config_id.data(), static_cast<int>(config_id.size()), SQLITE_TRANSIENT);
1238
1239 while (sqlite3_step(stmt) == SQLITE_ROW) {
1240 result.push_back(parse_conflict_row(stmt));
1241 }
1242
1243 sqlite3_finalize(stmt);
1244 return result;
1245}
1246
1247std::vector<client::sync_conflict> sync_repository::list_unresolved_conflicts() const {
1248 std::vector<client::sync_conflict> result;
1249 if (!db_) return result;
1250
1251 const char* sql = R"(
1252 SELECT pk, config_id, study_uid, patient_id, conflict_type,
1253 local_modified, remote_modified,
1254 local_instance_count, remote_instance_count,
1255 resolved, resolution, detected_at, resolved_at
1256 FROM sync_conflicts WHERE resolved = 0 ORDER BY detected_at DESC
1257 )";
1258
1259 sqlite3_stmt* stmt = nullptr;
1260 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1261 return result;
1262 }
1263
1264 while (sqlite3_step(stmt) == SQLITE_ROW) {
1265 result.push_back(parse_conflict_row(stmt));
1266 }
1267
1268 sqlite3_finalize(stmt);
1269 return result;
1270}
1271
1273 std::string_view study_uid,
1274 client::conflict_resolution resolution) {
1275 if (!db_) {
1276 return VoidResult(kcenon::common::error_info{
1277 -1, "Database not initialized", "sync_repository"});
1278 }
1279
1280 const char* sql = R"(
1281 UPDATE sync_conflicts SET
1282 resolved = 1,
1283 resolution = ?,
1284 resolved_at = datetime('now')
1285 WHERE study_uid = ? AND resolved = 0
1286 )";
1287
1288 sqlite3_stmt* stmt = nullptr;
1289 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1290 return VoidResult(kcenon::common::error_info{
1291 -1, "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1292 "sync_repository"});
1293 }
1294
1295 sqlite3_bind_text(stmt, 1, to_string(resolution), -1, SQLITE_TRANSIENT);
1296 sqlite3_bind_text(stmt, 2, study_uid.data(), static_cast<int>(study_uid.size()), SQLITE_TRANSIENT);
1297
1298 int rc = sqlite3_step(stmt);
1299 sqlite3_finalize(stmt);
1300
1301 if (rc != SQLITE_DONE) {
1302 return VoidResult(kcenon::common::error_info{
1303 -1, "Failed to resolve conflict: " + std::string(sqlite3_errmsg(db_)),
1304 "sync_repository"});
1305 }
1306
1307 return kcenon::common::ok();
1308}
1309
1311 if (!db_) {
1312 return kcenon::common::make_error<size_t>(-1,
1313 "Database not initialized", "sync_repository");
1314 }
1315
1316 auto cutoff = std::chrono::system_clock::now() - max_age;
1317 auto cutoff_str = to_timestamp_string(cutoff);
1318
1319 const char* sql = R"(
1320 DELETE FROM sync_conflicts WHERE resolved = 1 AND resolved_at < ?
1321 )";
1322
1323 sqlite3_stmt* stmt = nullptr;
1324 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1325 return kcenon::common::make_error<size_t>(-1,
1326 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1327 "sync_repository");
1328 }
1329
1330 sqlite3_bind_text(stmt, 1, cutoff_str.c_str(), -1, SQLITE_TRANSIENT);
1331
1332 int rc = sqlite3_step(stmt);
1333 sqlite3_finalize(stmt);
1334
1335 if (rc != SQLITE_DONE) {
1336 return kcenon::common::make_error<size_t>(-1,
1337 "Failed to cleanup conflicts: " + std::string(sqlite3_errmsg(db_)),
1338 "sync_repository");
1339 }
1340
1341 return static_cast<size_t>(sqlite3_changes(db_));
1342}
1343
1344// =============================================================================
1345// History Operations
1346// =============================================================================
1347
1349 if (!db_) {
1350 return VoidResult(kcenon::common::error_info{
1351 -1, "Database not initialized", "sync_repository"});
1352 }
1353
1354 const char* sql = R"(
1355 INSERT INTO sync_history (
1356 config_id, job_id, success,
1357 studies_checked, studies_synced, conflicts_found,
1358 errors_json, started_at, completed_at
1359 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
1360 )";
1361
1362 sqlite3_stmt* stmt = nullptr;
1363 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1364 return VoidResult(kcenon::common::error_info{
1365 -1, "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1366 "sync_repository"});
1367 }
1368
1369 sqlite3_bind_text(stmt, 1, history.config_id.c_str(), -1, SQLITE_TRANSIENT);
1370 sqlite3_bind_text(stmt, 2, history.job_id.c_str(), -1, SQLITE_TRANSIENT);
1371 sqlite3_bind_int(stmt, 3, history.success ? 1 : 0);
1372 sqlite3_bind_int64(stmt, 4, static_cast<int64_t>(history.studies_checked));
1373 sqlite3_bind_int64(stmt, 5, static_cast<int64_t>(history.studies_synced));
1374 sqlite3_bind_int64(stmt, 6, static_cast<int64_t>(history.conflicts_found));
1375 sqlite3_bind_text(stmt, 7, serialize_vector(history.errors).c_str(), -1, SQLITE_TRANSIENT);
1376 sqlite3_bind_text(stmt, 8, to_timestamp_string(history.started_at).c_str(), -1, SQLITE_TRANSIENT);
1377 sqlite3_bind_text(stmt, 9, to_timestamp_string(history.completed_at).c_str(), -1, SQLITE_TRANSIENT);
1378
1379 int rc = sqlite3_step(stmt);
1380 sqlite3_finalize(stmt);
1381
1382 if (rc != SQLITE_DONE) {
1383 return VoidResult(kcenon::common::error_info{
1384 -1, "Failed to save history: " + std::string(sqlite3_errmsg(db_)),
1385 "sync_repository"});
1386 }
1387
1388 return kcenon::common::ok();
1389}
1390
1391std::vector<client::sync_history> sync_repository::list_history(
1392 std::string_view config_id, size_t limit) const {
1393 std::vector<client::sync_history> result;
1394 if (!db_) return result;
1395
1396 const char* sql = R"(
1397 SELECT pk, config_id, job_id, success,
1398 studies_checked, studies_synced, conflicts_found,
1399 errors_json, started_at, completed_at
1400 FROM sync_history WHERE config_id = ? ORDER BY started_at DESC LIMIT ?
1401 )";
1402
1403 sqlite3_stmt* stmt = nullptr;
1404 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1405 return result;
1406 }
1407
1408 sqlite3_bind_text(stmt, 1, config_id.data(), static_cast<int>(config_id.size()), SQLITE_TRANSIENT);
1409 sqlite3_bind_int64(stmt, 2, static_cast<int64_t>(limit));
1410
1411 while (sqlite3_step(stmt) == SQLITE_ROW) {
1412 result.push_back(parse_history_row(stmt));
1413 }
1414
1415 sqlite3_finalize(stmt);
1416 return result;
1417}
1418
1419std::optional<client::sync_history> sync_repository::get_last_history(
1420 std::string_view config_id) const {
1421 if (!db_) return std::nullopt;
1422
1423 const char* sql = R"(
1424 SELECT pk, config_id, job_id, success,
1425 studies_checked, studies_synced, conflicts_found,
1426 errors_json, started_at, completed_at
1427 FROM sync_history WHERE config_id = ? ORDER BY started_at DESC LIMIT 1
1428 )";
1429
1430 sqlite3_stmt* stmt = nullptr;
1431 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1432 return std::nullopt;
1433 }
1434
1435 sqlite3_bind_text(stmt, 1, config_id.data(), static_cast<int>(config_id.size()), SQLITE_TRANSIENT);
1436
1437 std::optional<client::sync_history> result;
1438 if (sqlite3_step(stmt) == SQLITE_ROW) {
1439 result = parse_history_row(stmt);
1440 }
1441
1442 sqlite3_finalize(stmt);
1443 return result;
1444}
1445
1447 if (!db_) {
1448 return kcenon::common::make_error<size_t>(-1,
1449 "Database not initialized", "sync_repository");
1450 }
1451
1452 auto cutoff = std::chrono::system_clock::now() - max_age;
1453 auto cutoff_str = to_timestamp_string(cutoff);
1454
1455 const char* sql = "DELETE FROM sync_history WHERE completed_at < ?";
1456
1457 sqlite3_stmt* stmt = nullptr;
1458 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1459 return kcenon::common::make_error<size_t>(-1,
1460 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1461 "sync_repository");
1462 }
1463
1464 sqlite3_bind_text(stmt, 1, cutoff_str.c_str(), -1, SQLITE_TRANSIENT);
1465
1466 int rc = sqlite3_step(stmt);
1467 sqlite3_finalize(stmt);
1468
1469 if (rc != SQLITE_DONE) {
1470 return kcenon::common::make_error<size_t>(-1,
1471 "Failed to cleanup history: " + std::string(sqlite3_errmsg(db_)),
1472 "sync_repository");
1473 }
1474
1475 return static_cast<size_t>(sqlite3_changes(db_));
1476}
1477
1478// =============================================================================
1479// Statistics
1480// =============================================================================
1481
1483 if (!db_) return 0;
1484
1485 const char* sql = "SELECT COUNT(*) FROM sync_configs";
1486
1487 sqlite3_stmt* stmt = nullptr;
1488 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1489 return 0;
1490 }
1491
1492 size_t count = 0;
1493 if (sqlite3_step(stmt) == SQLITE_ROW) {
1494 count = static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1495 }
1496
1497 sqlite3_finalize(stmt);
1498 return count;
1499}
1500
1502 if (!db_) return 0;
1503
1504 const char* sql = "SELECT COUNT(*) FROM sync_conflicts WHERE resolved = 0";
1505
1506 sqlite3_stmt* stmt = nullptr;
1507 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1508 return 0;
1509 }
1510
1511 size_t count = 0;
1512 if (sqlite3_step(stmt) == SQLITE_ROW) {
1513 count = static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1514 }
1515
1516 sqlite3_finalize(stmt);
1517 return count;
1518}
1519
1521 if (!db_) return 0;
1522
1523 const char* sql = R"(
1524 SELECT COUNT(*) FROM sync_history
1525 WHERE date(completed_at) = date('now')
1526 )";
1527
1528 sqlite3_stmt* stmt = nullptr;
1529 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1530 return 0;
1531 }
1532
1533 size_t count = 0;
1534 if (sqlite3_step(stmt) == SQLITE_ROW) {
1535 count = static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1536 }
1537
1538 sqlite3_finalize(stmt);
1539 return count;
1540}
1541
1542// =============================================================================
1543// Database Information
1544// =============================================================================
1545
1546bool sync_repository::is_valid() const noexcept {
1547 return db_ != nullptr;
1548}
1549
1550// =============================================================================
1551// Private Implementation
1552// =============================================================================
1553
1555 auto* stmt = static_cast<sqlite3_stmt*>(stmt_ptr);
1556 client::sync_config config;
1557
1558 config.pk = get_int64_column(stmt, 0);
1559 config.config_id = get_text_column(stmt, 1);
1560 config.source_node_id = get_text_column(stmt, 2);
1561 config.name = get_text_column(stmt, 3);
1562 config.enabled = get_int_column(stmt, 4) != 0;
1563 config.lookback = std::chrono::hours(get_int_column(stmt, 5, 24));
1564 config.modalities = deserialize_vector(get_text_column(stmt, 6));
1565 config.patient_id_patterns = deserialize_vector(get_text_column(stmt, 7));
1566 config.direction = client::sync_direction_from_string(get_text_column(stmt, 8));
1567 config.delete_missing = get_int_column(stmt, 9) != 0;
1568 config.overwrite_existing = get_int_column(stmt, 10) != 0;
1569 config.sync_metadata_only = get_int_column(stmt, 11) != 0;
1570 config.schedule_cron = get_text_column(stmt, 12);
1571 config.last_sync = from_timestamp_string(
1572 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 13)));
1573 config.last_successful_sync = from_timestamp_string(
1574 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 14)));
1575 config.total_syncs = static_cast<size_t>(get_int64_column(stmt, 15));
1576 config.studies_synced = static_cast<size_t>(get_int64_column(stmt, 16));
1577
1578 return config;
1579}
1580
1582 auto* stmt = static_cast<sqlite3_stmt*>(stmt_ptr);
1583 client::sync_conflict conflict;
1584
1585 conflict.pk = get_int64_column(stmt, 0);
1586 conflict.config_id = get_text_column(stmt, 1);
1587 conflict.study_uid = get_text_column(stmt, 2);
1588 conflict.patient_id = get_text_column(stmt, 3);
1589 conflict.conflict_type = client::sync_conflict_type_from_string(get_text_column(stmt, 4));
1590 conflict.local_modified = from_timestamp_string(
1591 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 5)));
1592 conflict.remote_modified = from_timestamp_string(
1593 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 6)));
1594 conflict.local_instance_count = static_cast<size_t>(get_int64_column(stmt, 7));
1595 conflict.remote_instance_count = static_cast<size_t>(get_int64_column(stmt, 8));
1596 conflict.resolved = get_int_column(stmt, 9) != 0;
1597 conflict.resolution_used = client::conflict_resolution_from_string(get_text_column(stmt, 10));
1598 conflict.detected_at = from_timestamp_string(
1599 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 11)));
1600
1601 auto resolved_at_str = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 12));
1602 if (resolved_at_str && resolved_at_str[0] != '\0') {
1603 conflict.resolved_at = from_timestamp_string(resolved_at_str);
1604 }
1605
1606 return conflict;
1607}
1608
1610 auto* stmt = static_cast<sqlite3_stmt*>(stmt_ptr);
1611 client::sync_history history;
1612
1613 history.pk = get_int64_column(stmt, 0);
1614 history.config_id = get_text_column(stmt, 1);
1615 history.job_id = get_text_column(stmt, 2);
1616 history.success = get_int_column(stmt, 3) != 0;
1617 history.studies_checked = static_cast<size_t>(get_int64_column(stmt, 4));
1618 history.studies_synced = static_cast<size_t>(get_int64_column(stmt, 5));
1619 history.conflicts_found = static_cast<size_t>(get_int64_column(stmt, 6));
1620 history.errors = deserialize_vector(get_text_column(stmt, 7));
1621 history.started_at = from_timestamp_string(
1622 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 8)));
1623 history.completed_at = from_timestamp_string(
1624 reinterpret_cast<const char*>(sqlite3_column_text(stmt, 9)));
1625
1626 return history;
1627}
1628
1629} // namespace kcenon::pacs::storage
1630
1631#endif // PACS_WITH_DATABASE_SYSTEM
Repository for sync persistence (legacy SQLite interface)
auto find_config(std::string_view config_id) const -> std::optional< client::sync_config >
auto list_unresolved_conflicts() const -> std::vector< client::sync_conflict >
auto remove_config(std::string_view config_id) -> VoidResult
auto count_unresolved_conflicts() const -> size_t
static auto serialize_vector(const std::vector< std::string > &vec) -> std::string
auto get_last_history(std::string_view config_id) const -> std::optional< client::sync_history >
auto save_history(const client::sync_history &history) -> VoidResult
auto list_enabled_configs() const -> std::vector< client::sync_config >
auto parse_config_row(void *stmt) const -> client::sync_config
auto find_conflict(std::string_view study_uid) const -> std::optional< client::sync_conflict >
auto resolve_conflict(std::string_view study_uid, client::conflict_resolution resolution) -> VoidResult
auto list_configs() const -> std::vector< client::sync_config >
auto cleanup_old_history(std::chrono::hours max_age) -> Result< size_t >
auto parse_conflict_row(void *stmt) const -> client::sync_conflict
static auto deserialize_vector(std::string_view json) -> std::vector< std::string >
auto is_valid() const noexcept -> bool
auto update_config_stats(std::string_view config_id, bool success, size_t studies_synced) -> VoidResult
auto list_conflicts(std::string_view config_id) const -> std::vector< client::sync_conflict >
auto parse_history_row(void *stmt) const -> client::sync_history
auto list_history(std::string_view config_id, size_t limit=100) const -> std::vector< client::sync_history >
auto save_conflict(const client::sync_conflict &conflict) -> VoidResult
auto cleanup_old_conflicts(std::chrono::hours max_age) -> Result< size_t >
conflict_resolution
Strategy for resolving synchronization conflicts.
Definition sync_types.h:121
conflict_resolution conflict_resolution_from_string(std::string_view str) noexcept
Parse conflict_resolution from string.
Definition sync_types.h:147
sync_conflict_type sync_conflict_type_from_string(std::string_view str) noexcept
Parse sync_conflict_type from string.
Definition sync_types.h:105
sync_direction sync_direction_from_string(std::string_view str) noexcept
Parse sync_direction from string.
Definition sync_types.h:63
@ move
C-MOVE move request/response.
auto to_string(annotation_type type) -> std::string
Convert annotation_type to string.
Configuration for a synchronization task.
Definition sync_types.h:164
std::vector< std::string > modalities
Modality filter (empty = all)
Definition sync_types.h:179
std::chrono::hours lookback
How far back to sync.
Definition sync_types.h:178
std::string name
Human-readable name.
Definition sync_types.h:171
int64_t pk
Primary key (0 if not persisted)
Definition sync_types.h:210
bool delete_missing
Delete local if not on remote.
Definition sync_types.h:187
std::string schedule_cron
Cron expression for scheduling.
Definition sync_types.h:195
sync_direction direction
Direction of sync.
Definition sync_types.h:186
bool overwrite_existing
Overwrite if different.
Definition sync_types.h:188
std::chrono::system_clock::time_point last_successful_sync
Definition sync_types.h:202
std::string source_node_id
Remote node to sync with.
Definition sync_types.h:170
bool enabled
Whether this config is active.
Definition sync_types.h:172
std::chrono::system_clock::time_point last_sync
Definition sync_types.h:201
std::string config_id
Unique configuration identifier.
Definition sync_types.h:169
std::vector< std::string > patient_id_patterns
Patient ID patterns (empty = all)
Definition sync_types.h:180
bool sync_metadata_only
Only sync metadata, not images.
Definition sync_types.h:189
Represents a conflict detected during synchronization.
Definition sync_types.h:220
Historical record of a sync operation.
Definition sync_types.h:291
std::vector< std::string > errors
Definition sync_types.h:300
std::chrono::system_clock::time_point started_at
Definition sync_types.h:302
std::chrono::system_clock::time_point completed_at
Definition sync_types.h:303
Repository for sync persistence.