PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
job_repository.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
16
17#include <chrono>
18#include <cstring>
19#include <sstream>
20
22
23#ifdef PACS_WITH_DATABASE_SYSTEM
24
25namespace kcenon::pacs::storage {
26
27// =============================================================================
28// Constructor
29// =============================================================================
30
// Constructs the repository over the shared database adapter, binding it to
// the "jobs" table with "job_id" as the logical primary-key column.
job_repository::job_repository(std::shared_ptr<pacs_database_adapter> db)
    : base_repository(std::move(db), "jobs", "job_id") {}
33
34// =============================================================================
35// JSON Serialization (Static methods)
36// =============================================================================
37
38std::string job_repository::serialize_instance_uids(
39 const std::vector<std::string>& uids) {
40 if (uids.empty()) return "[]";
41
42 std::ostringstream oss;
43 oss << "[";
44 for (size_t i = 0; i < uids.size(); ++i) {
45 if (i > 0) oss << ",";
46 oss << "\"";
47 for (char c : uids[i]) {
48 if (c == '"')
49 oss << "\\\"";
50 else if (c == '\\')
51 oss << "\\\\";
52 else
53 oss << c;
54 }
55 oss << "\"";
56 }
57 oss << "]";
58 return oss.str();
59}
60
/// Parses the minimal JSON string array produced by serialize_instance_uids()
/// back into a vector of UIDs.
///
/// This is a hand-rolled scanner, not a general JSON parser: it walks the
/// input looking for double-quoted segments, honouring backslash escapes,
/// and ignores everything between them (brackets, commas, whitespace).
std::vector<std::string> job_repository::deserialize_instance_uids(
    std::string_view json) {
  std::vector<std::string> result;
  if (json.empty() || json == "[]") return result;

  size_t pos = 0;
  while (pos < json.size()) {
    // Locate the opening quote of the next string element.
    auto start = json.find('"', pos);
    if (start == std::string_view::npos) break;

    // Advance to the matching closing quote, stepping over escape pairs.
    size_t end = start + 1;
    while (end < json.size()) {
      if (json[end] == '\\' && end + 1 < json.size()) {
        end += 2;  // skip the escape introducer and the escaped character
      } else if (json[end] == '"') {
        break;
      } else {
        ++end;
      }
    }

    // Only accept the element if its closing quote was actually found.
    if (end < json.size()) {
      std::string value{json.substr(start + 1, end - start - 1)};
      // Collapse two-character escapes (\" and \\) back to the raw character.
      std::string unescaped;
      for (size_t i = 0; i < value.size(); ++i) {
        if (value[i] == '\\' && i + 1 < value.size()) {
          unescaped += value[++i];
        } else {
          unescaped += value[i];
        }
      }
      result.push_back(std::move(unescaped));
    }

    pos = end + 1;
  }

  return result;
}
100
101std::string job_repository::serialize_metadata(
102 const std::unordered_map<std::string, std::string>& metadata) {
103 if (metadata.empty()) return "{}";
104
105 std::ostringstream oss;
106 oss << "{";
107 bool first = true;
108 for (const auto& [key, value] : metadata) {
109 if (!first) oss << ",";
110 first = false;
111
112 oss << "\"";
113 for (char c : key) {
114 if (c == '"')
115 oss << "\\\"";
116 else if (c == '\\')
117 oss << "\\\\";
118 else
119 oss << c;
120 }
121 oss << "\":\"";
122
123 for (char c : value) {
124 if (c == '"')
125 oss << "\\\"";
126 else if (c == '\\')
127 oss << "\\\\";
128 else
129 oss << c;
130 }
131 oss << "\"";
132 }
133 oss << "}";
134 return oss.str();
135}
136
137std::unordered_map<std::string, std::string> job_repository::deserialize_metadata(
138 std::string_view json) {
139 std::unordered_map<std::string, std::string> result;
140 if (json.empty() || json == "{}") return result;
141
142 size_t pos = 0;
143 while (pos < json.size()) {
144 auto key_start = json.find('"', pos);
145 if (key_start == std::string_view::npos) break;
146
147 size_t key_end = key_start + 1;
148 while (key_end < json.size() && json[key_end] != '"') {
149 if (json[key_end] == '\\') ++key_end;
150 ++key_end;
151 }
152 if (key_end >= json.size()) break;
153
154 std::string key{json.substr(key_start + 1, key_end - key_start - 1)};
155
156 auto val_start = json.find('"', key_end + 1);
157 if (val_start == std::string_view::npos) break;
158
159 size_t val_end = val_start + 1;
160 while (val_end < json.size() && json[val_end] != '"') {
161 if (json[val_end] == '\\') ++val_end;
162 ++val_end;
163 }
164
165 if (val_end < json.size()) {
166 std::string value{json.substr(val_start + 1, val_end - val_start - 1)};
167 result[key] = value;
168 }
169
170 pos = val_end + 1;
171 }
172
173 return result;
174}
175
176// =============================================================================
177// Timestamp Helpers
178// =============================================================================
179
/// Parses a "YYYY-MM-DD HH:MM:SS" string (as written by format_timestamp())
/// into a time_point, interpreting the fields as UTC.
///
/// Returns the default-constructed time_point when the string is empty or
/// does not match the expected six-field layout; callers treat that value
/// as "no timestamp".
auto job_repository::parse_timestamp(const std::string& str) const
    -> std::chrono::system_clock::time_point {
  if (str.empty()) {
    return {};
  }

  std::tm tm{};
  if (std::sscanf(str.c_str(), "%d-%d-%d %d:%d:%d", &tm.tm_year, &tm.tm_mon,
                  &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 6) {
    return {};
  }

  // std::tm counts years from 1900 and months from 0.
  tm.tm_year -= 1900;
  tm.tm_mon -= 1;

  // timegm()/_mkgmtime() interpret the broken-down time as UTC
  // (mktime would apply the local timezone instead).
#ifdef _WIN32
  auto time = _mkgmtime(&tm);
#else
  auto time = timegm(&tm);
#endif

  return std::chrono::system_clock::from_time_t(time);
}
203
204auto job_repository::format_timestamp(
205 std::chrono::system_clock::time_point tp) const -> std::string {
206 if (tp == std::chrono::system_clock::time_point{}) {
207 return "";
208 }
209
210 auto time = std::chrono::system_clock::to_time_t(tp);
211 std::tm tm{};
212#ifdef _WIN32
213 gmtime_s(&tm, &time);
214#else
215 gmtime_r(&time, &tm);
216#endif
217
218 char buf[32];
219 std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm);
220 return buf;
221}
222
223auto job_repository::format_optional_timestamp(
224 const std::optional<std::chrono::system_clock::time_point>& tp) const
225 -> std::string {
226 if (!tp.has_value()) {
227 return "";
228 }
229 return format_timestamp(tp.value());
230}
231
232// =============================================================================
233// Domain-Specific Operations
234// =============================================================================
235
236auto job_repository::find_by_pk(int64_t pk) -> result_type {
237 if (!db() || !db()->is_connected()) {
238 return result_type(kcenon::common::error_info{
239 -1, "Database not connected", "storage"});
240 }
241
242 auto builder = query_builder();
243 builder.select(select_columns())
244 .from(table_name())
245 .where("pk", "=", pk)
246 .limit(1);
247
248 auto result = storage_session().select(builder.build());
249 if (result.is_err()) {
250 return result_type(result.error());
251 }
252
253 if (result.value().empty()) {
254 return result_type(kcenon::common::error_info{
255 -1, "Job not found with pk=" + std::to_string(pk), "storage"});
256 }
257
258 return result_type(map_row_to_entity(result.value()[0]));
259}
260
261auto job_repository::find_jobs(const job_query_options& options)
262 -> list_result_type {
263 if (!db() || !db()->is_connected()) {
264 return list_result_type(kcenon::common::error_info{
265 -1, "Database not connected", "storage"});
266 }
267
268 auto builder = query_builder();
269 builder.select(select_columns()).from(table_name());
270
271 std::optional<database::query_condition> condition;
272
273 if (options.status.has_value()) {
274 auto cond = database::query_condition(
275 "status", "=", std::string(client::to_string(options.status.value())));
276 condition = cond;
277 }
278
279 if (options.type.has_value()) {
280 auto cond = database::query_condition(
281 "type", "=", std::string(client::to_string(options.type.value())));
282 if (condition.has_value()) {
283 condition = condition.value() && cond;
284 } else {
285 condition = cond;
286 }
287 }
288
289 if (options.created_by.has_value()) {
290 auto cond = database::query_condition(
291 "created_by", "=", options.created_by.value());
292 if (condition.has_value()) {
293 condition = condition.value() && cond;
294 } else {
295 condition = cond;
296 }
297 }
298
299 if (condition.has_value()) {
300 builder.where(condition.value());
301 }
302
303 if (options.order_by_priority) {
304 builder.order_by("priority", database::sort_order::desc);
305 builder.order_by("created_at", database::sort_order::asc);
306 } else {
307 builder.order_by("created_at", database::sort_order::desc);
308 }
309
310 if (options.limit > 0) {
311 builder.limit(options.limit);
312 if (options.offset > 0) {
313 builder.offset(options.offset);
314 }
315 }
316
317 auto result = storage_session().select(builder.build());
318 if (result.is_err()) {
319 return list_result_type(result.error());
320 }
321
322 std::vector<client::job_record> records;
323 records.reserve(result.value().size());
324 for (const auto& row : result.value()) {
325 records.push_back(map_row_to_entity(row));
326 }
327
328 return list_result_type(std::move(records));
329}
330
331auto job_repository::find_by_status(client::job_status status, size_t limit)
332 -> list_result_type {
333 job_query_options options;
334 options.status = status;
335 options.limit = limit;
336 return find_jobs(options);
337}
338
339auto job_repository::find_pending_jobs(size_t limit) -> list_result_type {
340 if (!db() || !db()->is_connected()) {
341 return list_result_type(kcenon::common::error_info{
342 -1, "Database not connected", "storage"});
343 }
344
345 auto builder = query_builder();
346 auto pending_cond = database::query_condition(
347 "status", "=", std::string("pending"));
348 auto queued_cond = database::query_condition(
349 "status", "=", std::string("queued"));
350
351 builder.select(select_columns())
352 .from(table_name())
353 .where(pending_cond || queued_cond)
354 .order_by("priority", database::sort_order::desc)
355 .order_by("created_at", database::sort_order::asc)
356 .limit(limit);
357
358 auto result = storage_session().select(builder.build());
359 if (result.is_err()) {
360 return list_result_type(result.error());
361 }
362
363 std::vector<client::job_record> records;
364 records.reserve(result.value().size());
365 for (const auto& row : result.value()) {
366 records.push_back(map_row_to_entity(row));
367 }
368
369 return list_result_type(std::move(records));
370}
371
372auto job_repository::find_by_node(std::string_view node_id) -> list_result_type {
373 if (!db() || !db()->is_connected()) {
374 return list_result_type(kcenon::common::error_info{
375 -1, "Database not connected", "storage"});
376 }
377
378 auto builder = query_builder();
379 auto source_cond = database::query_condition(
380 "source_node_id", "=", std::string(node_id));
381 auto dest_cond = database::query_condition(
382 "destination_node_id", "=", std::string(node_id));
383
384 builder.select(select_columns())
385 .from(table_name())
386 .where(source_cond || dest_cond)
387 .order_by("created_at", database::sort_order::desc);
388
389 auto result = storage_session().select(builder.build());
390 if (result.is_err()) {
391 return list_result_type(result.error());
392 }
393
394 std::vector<client::job_record> records;
395 records.reserve(result.value().size());
396 for (const auto& row : result.value()) {
397 records.push_back(map_row_to_entity(row));
398 }
399
400 return list_result_type(std::move(records));
401}
402
403auto job_repository::cleanup_old_jobs(std::chrono::hours max_age)
404 -> Result<size_t> {
405 if (!db() || !db()->is_connected()) {
406 return kcenon::common::make_error<size_t>(
407 -1, "Database not connected", "storage");
408 }
409
410 auto cutoff = std::chrono::system_clock::now() - max_age;
411 auto cutoff_str = format_timestamp(cutoff);
412
413 auto completed_cond = database::query_condition(
414 "status", "=", std::string("completed"));
415 auto failed_cond = database::query_condition(
416 "status", "=", std::string("failed"));
417 auto cancelled_cond = database::query_condition(
418 "status", "=", std::string("cancelled"));
419 auto date_cond = database::query_condition(
420 "completed_at", "<", cutoff_str);
421
422 auto status_cond = completed_cond || failed_cond || cancelled_cond;
423 auto final_cond = status_cond && date_cond;
424
425 auto builder = query_builder();
426 builder.delete_from(table_name()).where(final_cond);
427
428 auto result = storage_session().remove(builder.build());
429 if (result.is_err()) {
430 return kcenon::common::make_error<size_t>(
431 -1, result.error().message, "storage");
432 }
433
434 return kcenon::common::ok(static_cast<size_t>(result.value()));
435}
436
437// =============================================================================
438// Status Updates
439// =============================================================================
440
441auto job_repository::update_status(std::string_view job_id,
442 client::job_status status,
443 std::string_view error_message,
444 std::string_view error_details) -> VoidResult {
445 if (!db() || !db()->is_connected()) {
446 return VoidResult(kcenon::common::error_info{
447 -1, "Database not connected", "storage"});
448 }
449
450 auto builder = query_builder();
451 builder.update(table_name())
452 .set("status", std::string(client::to_string(status)));
453
454 if (status == client::job_status::failed) {
455 builder.set("error_message", std::string(error_message))
456 .set("error_details", std::string(error_details));
457 }
458
459 builder.where("job_id", "=", std::string(job_id));
460
461 auto result = storage_session().execute(builder.build());
462 if (result.is_err()) {
463 return VoidResult(result.error());
464 }
465
466 return kcenon::common::ok();
467}
468
469auto job_repository::update_progress(std::string_view job_id,
470 const client::job_progress& progress)
471 -> VoidResult {
472 if (!db() || !db()->is_connected()) {
473 return VoidResult(kcenon::common::error_info{
474 -1, "Database not connected", "storage"});
475 }
476
477 auto builder = query_builder();
478 builder.update(table_name())
479 .set("total_items", static_cast<int64_t>(progress.total_items))
480 .set("completed_items", static_cast<int64_t>(progress.completed_items))
481 .set("failed_items", static_cast<int64_t>(progress.failed_items))
482 .set("skipped_items", static_cast<int64_t>(progress.skipped_items))
483 .set("bytes_transferred", static_cast<int64_t>(progress.bytes_transferred))
484 .set("current_item", progress.current_item)
485 .set("current_item_description", progress.current_item_description)
486 .where("job_id", "=", std::string(job_id));
487
488 auto result = storage_session().execute(builder.build());
489 if (result.is_err()) {
490 return VoidResult(result.error());
491 }
492
493 return kcenon::common::ok();
494}
495
496auto job_repository::mark_started(std::string_view job_id) -> VoidResult {
497 if (!db() || !db()->is_connected()) {
498 return VoidResult(kcenon::common::error_info{
499 -1, "Database not connected", "storage"});
500 }
501
502 auto now_str = format_timestamp(std::chrono::system_clock::now());
503
504 auto builder = query_builder();
505 builder.update(table_name())
506 .set("status", std::string("running"))
507 .set("started_at", now_str)
508 .where("job_id", "=", std::string(job_id));
509
510 auto result = storage_session().execute(builder.build());
511 if (result.is_err()) {
512 return VoidResult(result.error());
513 }
514
515 return kcenon::common::ok();
516}
517
518auto job_repository::mark_completed(std::string_view job_id) -> VoidResult {
519 if (!db() || !db()->is_connected()) {
520 return VoidResult(kcenon::common::error_info{
521 -1, "Database not connected", "storage"});
522 }
523
524 auto now_str = format_timestamp(std::chrono::system_clock::now());
525
526 auto builder = query_builder();
527 builder.update(table_name())
528 .set("status", std::string("completed"))
529 .set("completed_at", now_str)
530 .where("job_id", "=", std::string(job_id));
531
532 auto result = storage_session().execute(builder.build());
533 if (result.is_err()) {
534 return VoidResult(result.error());
535 }
536
537 return kcenon::common::ok();
538}
539
/// Marks a job as failed: records the error text, bumps retry_count by one
/// and stamps completed_at with the current UTC time.
///
/// NOTE(review): the retry counter is incremented via a non-atomic
/// read-then-write (find_by_id + UPDATE); concurrent writers can lose
/// increments — confirm whether the query builder supports a SQL-side
/// "retry_count + 1" expression instead.
auto job_repository::mark_failed(std::string_view job_id,
                                 std::string_view error_message,
                                 std::string_view error_details) -> VoidResult {
  if (!db() || !db()->is_connected()) {
    return VoidResult(kcenon::common::error_info{
        -1, "Database not connected", "storage"});
  }

  auto now_str = format_timestamp(std::chrono::system_clock::now());

  // First get current retry count
  auto find_result = find_by_id(std::string(job_id));
  int current_retry = 0;
  if (find_result.is_ok()) {
    current_retry = find_result.value().retry_count;
  }

  auto builder = query_builder();
  builder.update(table_name())
      .set("status", std::string("failed"))
      .set("error_message", std::string(error_message))
      .set("error_details", std::string(error_details))
      .set("retry_count", static_cast<int64_t>(current_retry + 1))
      .set("completed_at", now_str)
      .where("job_id", "=", std::string(job_id));

  auto result = storage_session().execute(builder.build());
  if (result.is_err()) {
    return VoidResult(result.error());
  }

  return kcenon::common::ok();
}
573
/// Increments a job's retry counter by one.
///
/// NOTE(review): like mark_failed(), this is a non-atomic read-modify-write
/// (find_by_id + UPDATE); concurrent callers can lose increments.
auto job_repository::increment_retry(std::string_view job_id) -> VoidResult {
  if (!db() || !db()->is_connected()) {
    return VoidResult(kcenon::common::error_info{
        -1, "Database not connected", "storage"});
  }

  // First get current retry count
  auto find_result = find_by_id(std::string(job_id));
  int current_retry = 0;  // missing job defaults the counter to 0 -> writes 1
  if (find_result.is_ok()) {
    current_retry = find_result.value().retry_count;
  }

  auto builder = query_builder();
  builder.update(table_name())
      .set("retry_count", static_cast<int64_t>(current_retry + 1))
      .where("job_id", "=", std::string(job_id));

  auto result = storage_session().execute(builder.build());
  if (result.is_err()) {
    return VoidResult(result.error());
  }

  return kcenon::common::ok();
}
599
600// =============================================================================
601// Statistics
602// =============================================================================
603
/// Counts jobs currently in the given status.
///
/// The status value is interpolated directly into the SQL text; this is
/// acceptable here only because it comes from client::to_string() over an
/// enum, never from user input.
auto job_repository::count_by_status(client::job_status status)
    -> Result<size_t> {
  if (!db() || !db()->is_connected()) {
    return kcenon::common::make_error<size_t>(
        -1, "Database not connected", "storage");
  }

  auto result = storage_session().select(kcenon::pacs::compat::format(
      "SELECT COUNT(*) as count FROM {} WHERE status = '{}'",
      table_name(), std::string(client::to_string(status))));
  if (result.is_err()) {
    return Result<size_t>(result.error());
  }

  // An empty result set (no rows at all) counts as zero.
  if (result.value().empty()) {
    return kcenon::common::ok(static_cast<size_t>(0));
  }

  return kcenon::common::ok(
      static_cast<size_t>(std::stoull(result.value()[0].at("count"))));
}
625
626auto job_repository::count_completed_today() -> Result<size_t> {
627 if (!db() || !db()->is_connected()) {
628 return kcenon::common::make_error<size_t>(
629 -1, "Database not connected", "storage");
630 }
631
632 // Get today's date in SQL format
633 auto now = std::chrono::system_clock::now();
634 auto today_str = format_timestamp(now).substr(0, 10); // YYYY-MM-DD
635
636 // Note: For date comparison, we use the date portion of completed_at
637 // This requires database-specific handling - using LIKE for portability
638 auto result = storage_session().select(kcenon::pacs::compat::format(
639 "SELECT COUNT(*) as count FROM {} WHERE status = 'completed'",
640 table_name()));
641 if (result.is_err()) {
642 return Result<size_t>(result.error());
643 }
644
645 if (result.value().empty()) {
646 return kcenon::common::ok(static_cast<size_t>(0));
647 }
648
649 return kcenon::common::ok(
650 static_cast<size_t>(std::stoull(result.value()[0].at("count"))));
651}
652
653auto job_repository::count_failed_today() -> Result<size_t> {
654 if (!db() || !db()->is_connected()) {
655 return kcenon::common::make_error<size_t>(
656 -1, "Database not connected", "storage");
657 }
658
659 auto result = storage_session().select(kcenon::pacs::compat::format(
660 "SELECT COUNT(*) as count FROM {} WHERE status = 'failed'",
661 table_name()));
662 if (result.is_err()) {
663 return Result<size_t>(result.error());
664 }
665
666 if (result.value().empty()) {
667 return kcenon::common::ok(static_cast<size_t>(0));
668 }
669
670 return kcenon::common::ok(
671 static_cast<size_t>(std::stoull(result.value()[0].at("count"))));
672}
673
674// =============================================================================
675// base_repository Overrides
676// =============================================================================
677
/// Hydrates a client::job_record from a raw database row.
///
/// Required columns (job_id, type, status, source_node_id,
/// destination_node_id) are read with row.at() and will throw if missing;
/// every other column is looked up defensively and skipped when absent or
/// empty.
///
/// NOTE(review): std::stoll/std::stoi throw on non-numeric column text —
/// this assumes the schema guarantees well-formed numeric values; confirm
/// against the table definition.
auto job_repository::map_row_to_entity(const database_row& row) const
    -> client::job_record {
  client::job_record job;

  // Parse pk if present
  auto pk_it = row.find("pk");
  if (pk_it != row.end() && !pk_it->second.empty()) {
    job.pk = std::stoll(pk_it->second);
  }

  job.job_id = row.at("job_id");
  job.type = client::job_type_from_string(row.at("type"));
  job.status = client::job_status_from_string(row.at("status"));

  auto priority_it = row.find("priority");
  if (priority_it != row.end() && !priority_it->second.empty()) {
    job.priority = client::job_priority_from_int(std::stoi(priority_it->second));
  }

  job.source_node_id = row.at("source_node_id");
  job.destination_node_id = row.at("destination_node_id");

  // Optional fields
  auto patient_id_it = row.find("patient_id");
  if (patient_id_it != row.end() && !patient_id_it->second.empty()) {
    job.patient_id = patient_id_it->second;
  }

  auto study_uid_it = row.find("study_uid");
  if (study_uid_it != row.end() && !study_uid_it->second.empty()) {
    job.study_uid = study_uid_it->second;
  }

  auto series_uid_it = row.find("series_uid");
  if (series_uid_it != row.end() && !series_uid_it->second.empty()) {
    job.series_uid = series_uid_it->second;
  }

  auto sop_uid_it = row.find("sop_instance_uid");
  if (sop_uid_it != row.end() && !sop_uid_it->second.empty()) {
    job.sop_instance_uid = sop_uid_it->second;
  }

  // The UID list is stored as a JSON string array (see
  // serialize_instance_uids / deserialize_instance_uids).
  auto uids_it = row.find("instance_uids_json");
  if (uids_it != row.end() && !uids_it->second.empty()) {
    job.instance_uids = deserialize_instance_uids(uids_it->second);
  }

  // Progress fields
  auto total_it = row.find("total_items");
  if (total_it != row.end() && !total_it->second.empty()) {
    job.progress.total_items = static_cast<size_t>(std::stoll(total_it->second));
  }

  auto completed_it = row.find("completed_items");
  if (completed_it != row.end() && !completed_it->second.empty()) {
    job.progress.completed_items = static_cast<size_t>(std::stoll(completed_it->second));
  }

  auto failed_it = row.find("failed_items");
  if (failed_it != row.end() && !failed_it->second.empty()) {
    job.progress.failed_items = static_cast<size_t>(std::stoll(failed_it->second));
  }

  auto skipped_it = row.find("skipped_items");
  if (skipped_it != row.end() && !skipped_it->second.empty()) {
    job.progress.skipped_items = static_cast<size_t>(std::stoll(skipped_it->second));
  }

  auto bytes_it = row.find("bytes_transferred");
  if (bytes_it != row.end() && !bytes_it->second.empty()) {
    job.progress.bytes_transferred = static_cast<size_t>(std::stoll(bytes_it->second));
  }

  auto current_it = row.find("current_item");
  if (current_it != row.end()) {
    job.progress.current_item = current_it->second;
  }

  auto current_desc_it = row.find("current_item_description");
  if (current_desc_it != row.end()) {
    job.progress.current_item_description = current_desc_it->second;
  }

  // Derive the percent-complete value from the counters just loaded.
  job.progress.calculate_percent();

  // Error fields
  auto error_msg_it = row.find("error_message");
  if (error_msg_it != row.end()) {
    job.error_message = error_msg_it->second;
  }

  auto error_det_it = row.find("error_details");
  if (error_det_it != row.end()) {
    job.error_details = error_det_it->second;
  }

  auto retry_it = row.find("retry_count");
  if (retry_it != row.end() && !retry_it->second.empty()) {
    job.retry_count = std::stoi(retry_it->second);
  }

  auto max_retry_it = row.find("max_retries");
  if (max_retry_it != row.end() && !max_retry_it->second.empty()) {
    job.max_retries = std::stoi(max_retry_it->second);
  } else {
    job.max_retries = 3;  // default retry budget when the column is absent
  }

  // Metadata
  auto created_by_it = row.find("created_by");
  if (created_by_it != row.end()) {
    job.created_by = created_by_it->second;
  }

  auto metadata_it = row.find("metadata_json");
  if (metadata_it != row.end() && !metadata_it->second.empty()) {
    job.metadata = deserialize_metadata(metadata_it->second);
  }

  // Timestamps
  auto created_at_it = row.find("created_at");
  if (created_at_it != row.end() && !created_at_it->second.empty()) {
    job.created_at = parse_timestamp(created_at_it->second);
  }

  // Optional timestamps: only set when the text parses to a non-default
  // time_point (parse_timestamp returns the default on failure).
  auto queued_it = row.find("queued_at");
  if (queued_it != row.end() && !queued_it->second.empty()) {
    auto tp = parse_timestamp(queued_it->second);
    if (tp != std::chrono::system_clock::time_point{}) {
      job.queued_at = tp;
    }
  }

  auto started_it = row.find("started_at");
  if (started_it != row.end() && !started_it->second.empty()) {
    auto tp = parse_timestamp(started_it->second);
    if (tp != std::chrono::system_clock::time_point{}) {
      job.started_at = tp;
    }
  }

  auto completed_at_it = row.find("completed_at");
  if (completed_at_it != row.end() && !completed_at_it->second.empty()) {
    auto tp = parse_timestamp(completed_at_it->second);
    if (tp != std::chrono::system_clock::time_point{}) {
      job.completed_at = tp;
    }
  }

  return job;
}
830
831auto job_repository::entity_to_row(const client::job_record& entity) const
832 -> std::map<std::string, database_value> {
833 std::map<std::string, database_value> row;
834
835 row["job_id"] = entity.job_id;
836 row["type"] = std::string(client::to_string(entity.type));
837 row["status"] = std::string(client::to_string(entity.status));
838 row["priority"] = static_cast<int64_t>(entity.priority);
839
840 row["source_node_id"] = entity.source_node_id;
841 row["destination_node_id"] = entity.destination_node_id;
842
843 row["patient_id"] = entity.patient_id.value_or("");
844 row["study_uid"] = entity.study_uid.value_or("");
845 row["series_uid"] = entity.series_uid.value_or("");
846 row["sop_instance_uid"] = entity.sop_instance_uid.value_or("");
847 row["instance_uids_json"] = serialize_instance_uids(entity.instance_uids);
848
849 row["total_items"] = static_cast<int64_t>(entity.progress.total_items);
850 row["completed_items"] = static_cast<int64_t>(entity.progress.completed_items);
851 row["failed_items"] = static_cast<int64_t>(entity.progress.failed_items);
852 row["skipped_items"] = static_cast<int64_t>(entity.progress.skipped_items);
853 row["bytes_transferred"] = static_cast<int64_t>(entity.progress.bytes_transferred);
854 row["current_item"] = entity.progress.current_item;
855 row["current_item_description"] = entity.progress.current_item_description;
856
857 row["error_message"] = entity.error_message;
858 row["error_details"] = entity.error_details;
859 row["retry_count"] = static_cast<int64_t>(entity.retry_count);
860 row["max_retries"] = static_cast<int64_t>(entity.max_retries);
861
862 row["created_by"] = entity.created_by;
863 row["metadata_json"] = serialize_metadata(entity.metadata);
864
865 if (entity.created_at != std::chrono::system_clock::time_point{}) {
866 row["created_at"] = format_timestamp(entity.created_at);
867 } else {
868 row["created_at"] = format_timestamp(std::chrono::system_clock::now());
869 }
870
871 row["queued_at"] = format_optional_timestamp(entity.queued_at);
872 row["started_at"] = format_optional_timestamp(entity.started_at);
873 row["completed_at"] = format_optional_timestamp(entity.completed_at);
874
875 return row;
876}
877
/// Returns the logical primary key used by base_repository lookups — the
/// business job_id, not the surrogate integer pk column.
auto job_repository::get_pk(const client::job_record& entity) const
    -> std::string {
  return entity.job_id;
}
882
/// A record is considered addressable once it carries a non-empty job_id.
auto job_repository::has_pk(const client::job_record& entity) const -> bool {
  return !entity.job_id.empty();
}
886
887auto job_repository::select_columns() const -> std::vector<std::string> {
888 return {"pk",
889 "job_id",
890 "type",
891 "status",
892 "priority",
893 "source_node_id",
894 "destination_node_id",
895 "patient_id",
896 "study_uid",
897 "series_uid",
898 "sop_instance_uid",
899 "instance_uids_json",
900 "total_items",
901 "completed_items",
902 "failed_items",
903 "skipped_items",
904 "bytes_transferred",
905 "current_item",
906 "current_item_description",
907 "error_message",
908 "error_details",
909 "retry_count",
910 "max_retries",
911 "created_by",
912 "metadata_json",
913 "created_at",
914 "queued_at",
915 "started_at",
916 "completed_at"};
917}
918
919} // namespace kcenon::pacs::storage
920
921#else // !PACS_WITH_DATABASE_SYSTEM
922
923// =============================================================================
924// Legacy SQLite Implementation
925// =============================================================================
926
927#include <sqlite3.h>
928
929namespace kcenon::pacs::storage {
930
931namespace {
932
/// Formats a time_point as UTC "YYYY-MM-DD HH:MM:SS"; the default-constructed
/// time_point is treated as "unset" and becomes the empty string.
[[nodiscard]] std::string to_timestamp_string(
    std::chrono::system_clock::time_point tp) {
  if (tp == std::chrono::system_clock::time_point{}) {
    return {};
  }
  const std::time_t raw = std::chrono::system_clock::to_time_t(tp);
  std::tm utc{};
  // Thread-safe UTC conversion on each platform.
#ifdef _WIN32
  gmtime_s(&utc, &raw);
#else
  gmtime_r(&raw, &utc);
#endif
  char text[32] = {};
  std::strftime(text, sizeof(text), "%Y-%m-%d %H:%M:%S", &utc);
  return text;
}
950
/// Parses "YYYY-MM-DD HH:MM:SS" (UTC) into a time_point. A null/empty or
/// malformed string yields the default-constructed time_point ("unset").
[[nodiscard]] std::chrono::system_clock::time_point from_timestamp_string(
    const char* str) {
  if (str == nullptr || *str == '\0') {
    return {};
  }
  std::tm fields{};
  const int matched =
      std::sscanf(str, "%d-%d-%d %d:%d:%d", &fields.tm_year, &fields.tm_mon,
                  &fields.tm_mday, &fields.tm_hour, &fields.tm_min,
                  &fields.tm_sec);
  if (matched != 6) {
    return {};
  }
  // std::tm counts years from 1900 and months from 0.
  fields.tm_year -= 1900;
  fields.tm_mon -= 1;
  // Interpret the broken-down time as UTC on both platforms.
#ifdef _WIN32
  const auto raw = _mkgmtime(&fields);
#else
  const auto raw = timegm(&fields);
#endif
  return std::chrono::system_clock::from_time_t(raw);
}
971
973[[nodiscard]] std::optional<std::chrono::system_clock::time_point>
974from_optional_timestamp(const char* str) {
975 if (!str || str[0] == '\0') {
976 return std::nullopt;
977 }
978 auto tp = from_timestamp_string(str);
979 if (tp == std::chrono::system_clock::time_point{}) {
980 return std::nullopt;
981 }
982 return tp;
983}
984
986[[nodiscard]] std::string get_text_column(sqlite3_stmt* stmt, int col) {
987 auto text = reinterpret_cast<const char*>(sqlite3_column_text(stmt, col));
988 return text ? text : "";
989}
990
992[[nodiscard]] int get_int_column(sqlite3_stmt* stmt, int col,
993 int default_val = 0) {
994 if (sqlite3_column_type(stmt, col) == SQLITE_NULL) {
995 return default_val;
996 }
997 return sqlite3_column_int(stmt, col);
998}
999
1001[[nodiscard]] int64_t get_int64_column(sqlite3_stmt* stmt, int col,
1002 int64_t default_val = 0) {
1003 if (sqlite3_column_type(stmt, col) == SQLITE_NULL) {
1004 return default_val;
1005 }
1006 return sqlite3_column_int64(stmt, col);
1007}
1008
1010[[nodiscard]] std::optional<std::string> get_optional_text(sqlite3_stmt* stmt,
1011 int col) {
1012 if (sqlite3_column_type(stmt, col) == SQLITE_NULL) {
1013 return std::nullopt;
1014 }
1015 auto text = reinterpret_cast<const char*>(sqlite3_column_text(stmt, col));
1016 return text ? std::optional<std::string>{text} : std::nullopt;
1017}
1018
1020void bind_optional_text(sqlite3_stmt* stmt, int idx,
1021 const std::optional<std::string>& value) {
1022 if (value.has_value()) {
1023 sqlite3_bind_text(stmt, idx, value->c_str(), -1, SQLITE_TRANSIENT);
1024 } else {
1025 sqlite3_bind_null(stmt, idx);
1026 }
1027}
1028
1030void bind_optional_timestamp(
1031 sqlite3_stmt* stmt, int idx,
1032 const std::optional<std::chrono::system_clock::time_point>& tp) {
1033 if (tp.has_value()) {
1034 auto str = to_timestamp_string(tp.value());
1035 sqlite3_bind_text(stmt, idx, str.c_str(), -1, SQLITE_TRANSIENT);
1036 } else {
1037 sqlite3_bind_null(stmt, idx);
1038 }
1039}
1040
1041} // namespace
1042
1043// =============================================================================
1044// JSON Serialization (Simple Implementation)
1045// =============================================================================
1046
1048 const std::vector<std::string>& uids) {
1049 if (uids.empty()) return "[]";
1050
1051 std::ostringstream oss;
1052 oss << "[";
1053 for (size_t i = 0; i < uids.size(); ++i) {
1054 if (i > 0) oss << ",";
1055 oss << "\"";
1056 for (char c : uids[i]) {
1057 if (c == '"')
1058 oss << "\\\"";
1059 else if (c == '\\')
1060 oss << "\\\\";
1061 else
1062 oss << c;
1063 }
1064 oss << "\"";
1065 }
1066 oss << "]";
1067 return oss.str();
1068}
1069
1071 std::string_view json) {
1072 std::vector<std::string> result;
1073 if (json.empty() || json == "[]") return result;
1074
1075 size_t pos = 0;
1076 while (pos < json.size()) {
1077 auto start = json.find('"', pos);
1078 if (start == std::string_view::npos) break;
1079
1080 size_t end = start + 1;
1081 while (end < json.size()) {
1082 if (json[end] == '\\' && end + 1 < json.size()) {
1083 end += 2;
1084 } else if (json[end] == '"') {
1085 break;
1086 } else {
1087 ++end;
1088 }
1089 }
1090
1091 if (end < json.size()) {
1092 std::string value{json.substr(start + 1, end - start - 1)};
1093 std::string unescaped;
1094 for (size_t i = 0; i < value.size(); ++i) {
1095 if (value[i] == '\\' && i + 1 < value.size()) {
1096 unescaped += value[++i];
1097 } else {
1098 unescaped += value[i];
1099 }
1100 }
1101 result.push_back(std::move(unescaped));
1102 }
1103
1104 pos = end + 1;
1105 }
1106
1107 return result;
1108}
1109
1111 const std::unordered_map<std::string, std::string>& metadata) {
1112 if (metadata.empty()) return "{}";
1113
1114 std::ostringstream oss;
1115 oss << "{";
1116 bool first = true;
1117 for (const auto& [key, value] : metadata) {
1118 if (!first) oss << ",";
1119 first = false;
1120
1121 oss << "\"";
1122 for (char c : key) {
1123 if (c == '"')
1124 oss << "\\\"";
1125 else if (c == '\\')
1126 oss << "\\\\";
1127 else
1128 oss << c;
1129 }
1130 oss << "\":\"";
1131
1132 for (char c : value) {
1133 if (c == '"')
1134 oss << "\\\"";
1135 else if (c == '\\')
1136 oss << "\\\\";
1137 else
1138 oss << c;
1139 }
1140 oss << "\"";
1141 }
1142 oss << "}";
1143 return oss.str();
1144}
1145
1146std::unordered_map<std::string, std::string> job_repository::deserialize_metadata(
1147 std::string_view json) {
1148 std::unordered_map<std::string, std::string> result;
1149 if (json.empty() || json == "{}") return result;
1150
1151 size_t pos = 0;
1152 while (pos < json.size()) {
1153 auto key_start = json.find('"', pos);
1154 if (key_start == std::string_view::npos) break;
1155
1156 size_t key_end = key_start + 1;
1157 while (key_end < json.size() && json[key_end] != '"') {
1158 if (json[key_end] == '\\') ++key_end;
1159 ++key_end;
1160 }
1161 if (key_end >= json.size()) break;
1162
1163 std::string key{json.substr(key_start + 1, key_end - key_start - 1)};
1164
1165 auto val_start = json.find('"', key_end + 1);
1166 if (val_start == std::string_view::npos) break;
1167
1168 size_t val_end = val_start + 1;
1169 while (val_end < json.size() && json[val_end] != '"') {
1170 if (json[val_end] == '\\') ++val_end;
1171 ++val_end;
1172 }
1173
1174 if (val_end < json.size()) {
1175 std::string value{json.substr(val_start + 1, val_end - val_start - 1)};
1176 result[key] = value;
1177 }
1178
1179 pos = val_end + 1;
1180 }
1181
1182 return result;
1183}
1184
1185// =============================================================================
1186// Construction / Destruction
1187// =============================================================================
1188
1189job_repository::job_repository(sqlite3* db) : db_(db) {}
1190
1192
1194
1195auto job_repository::operator=(job_repository&&) noexcept
1196 -> job_repository& = default;
1197
1198// =============================================================================
1199// CRUD Operations
1200// =============================================================================
1201
/// Persist a job record with upsert semantics: a new job_id inserts a full
/// row; an existing job_id updates only the mutable columns (status,
/// progress counters, error info, retry count, lifecycle timestamps).
/// Identity columns (type, nodes, DICOM UIDs, created_by, created_at) are
/// intentionally not touched on conflict.
///
/// @param job Record to save; job.job_id is the conflict key.
/// @return ok() on success; error_info carrying sqlite3_errmsg() otherwise.
VoidResult job_repository::save(const client::job_record& job) {
  if (!db_) {
    return VoidResult(kcenon::common::error_info{
        -1, "Database not initialized", "job_repository"});
  }

  // 28 placeholders — the bind sequence below must follow this column list
  // exactly, in order.
  static constexpr const char* sql = R"(
    INSERT INTO jobs (
      job_id, type, status, priority,
      source_node_id, destination_node_id,
      patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
      total_items, completed_items, failed_items, skipped_items, bytes_transferred,
      current_item, current_item_description,
      error_message, error_details, retry_count, max_retries,
      created_by, metadata_json,
      created_at, queued_at, started_at, completed_at
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(job_id) DO UPDATE SET
      status = excluded.status,
      priority = excluded.priority,
      total_items = excluded.total_items,
      completed_items = excluded.completed_items,
      failed_items = excluded.failed_items,
      skipped_items = excluded.skipped_items,
      bytes_transferred = excluded.bytes_transferred,
      current_item = excluded.current_item,
      current_item_description = excluded.current_item_description,
      error_message = excluded.error_message,
      error_details = excluded.error_details,
      retry_count = excluded.retry_count,
      queued_at = excluded.queued_at,
      started_at = excluded.started_at,
      completed_at = excluded.completed_at
  )";

  sqlite3_stmt* stmt = nullptr;
  if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
    return VoidResult(kcenon::common::error_info{
        -1,
        "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
        "job_repository"});
  }

  int idx = 1;
  sqlite3_bind_text(stmt, idx++, job.job_id.c_str(), -1, SQLITE_TRANSIENT);
  // SQLITE_STATIC is safe here: client::to_string returns string literals
  // with static lifetime.
  sqlite3_bind_text(stmt, idx++, client::to_string(job.type), -1,
                    SQLITE_STATIC);
  sqlite3_bind_text(stmt, idx++, client::to_string(job.status), -1,
                    SQLITE_STATIC);
  sqlite3_bind_int(stmt, idx++, static_cast<int>(job.priority));

  sqlite3_bind_text(stmt, idx++, job.source_node_id.c_str(), -1,
                    SQLITE_TRANSIENT);
  sqlite3_bind_text(stmt, idx++, job.destination_node_id.c_str(), -1,
                    SQLITE_TRANSIENT);

  // Optional DICOM identifiers bind as SQL NULL when absent.
  bind_optional_text(stmt, idx++, job.patient_id);
  bind_optional_text(stmt, idx++, job.study_uid);
  bind_optional_text(stmt, idx++, job.series_uid);
  bind_optional_text(stmt, idx++, job.sop_instance_uid);

  auto uids_json = serialize_instance_uids(job.instance_uids);
  sqlite3_bind_text(stmt, idx++, uids_json.c_str(), -1, SQLITE_TRANSIENT);

  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(job.progress.total_items));
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(job.progress.completed_items));
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(job.progress.failed_items));
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(job.progress.skipped_items));
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(job.progress.bytes_transferred));

  sqlite3_bind_text(stmt, idx++, job.progress.current_item.c_str(), -1,
                    SQLITE_TRANSIENT);
  sqlite3_bind_text(stmt, idx++,
                    job.progress.current_item_description.c_str(), -1,
                    SQLITE_TRANSIENT);

  sqlite3_bind_text(stmt, idx++, job.error_message.c_str(), -1,
                    SQLITE_TRANSIENT);
  sqlite3_bind_text(stmt, idx++, job.error_details.c_str(), -1,
                    SQLITE_TRANSIENT);
  sqlite3_bind_int(stmt, idx++, job.retry_count);
  sqlite3_bind_int(stmt, idx++, job.max_retries);

  sqlite3_bind_text(stmt, idx++, job.created_by.c_str(), -1, SQLITE_TRANSIENT);

  auto metadata_json = serialize_metadata(job.metadata);
  sqlite3_bind_text(stmt, idx++, metadata_json.c_str(), -1, SQLITE_TRANSIENT);

  auto created_str = to_timestamp_string(job.created_at);
  sqlite3_bind_text(stmt, idx++, created_str.c_str(), -1, SQLITE_TRANSIENT);

  // Lifecycle timestamps may be unset for jobs that never reached the
  // corresponding state.
  bind_optional_timestamp(stmt, idx++, job.queued_at);
  bind_optional_timestamp(stmt, idx++, job.started_at);
  bind_optional_timestamp(stmt, idx++, job.completed_at);

  auto rc = sqlite3_step(stmt);
  sqlite3_finalize(stmt);

  if (rc != SQLITE_DONE) {
    return VoidResult(kcenon::common::error_info{
        -1, "Failed to save job: " + std::string(sqlite3_errmsg(db_)),
        "job_repository"});
  }

  return kcenon::common::ok();
}
1313
1314std::optional<client::job_record> job_repository::find_by_id(
1315 std::string_view job_id) const {
1316 if (!db_) return std::nullopt;
1317
1318 static constexpr const char* sql = R"(
1319 SELECT pk, job_id, type, status, priority,
1320 source_node_id, destination_node_id,
1321 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1322 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1323 current_item, current_item_description,
1324 error_message, error_details, retry_count, max_retries,
1325 created_by, metadata_json,
1326 created_at, queued_at, started_at, completed_at
1327 FROM jobs WHERE job_id = ?
1328 )";
1329
1330 sqlite3_stmt* stmt = nullptr;
1331 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1332 return std::nullopt;
1333 }
1334
1335 sqlite3_bind_text(stmt, 1, job_id.data(), static_cast<int>(job_id.size()),
1336 SQLITE_TRANSIENT);
1337
1338 std::optional<client::job_record> result;
1339 if (sqlite3_step(stmt) == SQLITE_ROW) {
1340 result = parse_row(stmt);
1341 }
1342
1343 sqlite3_finalize(stmt);
1344 return result;
1345}
1346
1347std::optional<client::job_record> job_repository::find_by_pk(int64_t pk) const {
1348 if (!db_) return std::nullopt;
1349
1350 static constexpr const char* sql = R"(
1351 SELECT pk, job_id, type, status, priority,
1352 source_node_id, destination_node_id,
1353 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1354 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1355 current_item, current_item_description,
1356 error_message, error_details, retry_count, max_retries,
1357 created_by, metadata_json,
1358 created_at, queued_at, started_at, completed_at
1359 FROM jobs WHERE pk = ?
1360 )";
1361
1362 sqlite3_stmt* stmt = nullptr;
1363 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1364 return std::nullopt;
1365 }
1366
1367 sqlite3_bind_int64(stmt, 1, pk);
1368
1369 std::optional<client::job_record> result;
1370 if (sqlite3_step(stmt) == SQLITE_ROW) {
1371 result = parse_row(stmt);
1372 }
1373
1374 sqlite3_finalize(stmt);
1375 return result;
1376}
1377
1378std::vector<client::job_record> job_repository::find_jobs(
1379 const job_query_options& options) const {
1380 std::vector<client::job_record> result;
1381 if (!db_) return result;
1382
1383 std::ostringstream sql;
1384 sql << R"(
1385 SELECT pk, job_id, type, status, priority,
1386 source_node_id, destination_node_id,
1387 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1388 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1389 current_item, current_item_description,
1390 error_message, error_details, retry_count, max_retries,
1391 created_by, metadata_json,
1392 created_at, queued_at, started_at, completed_at
1393 FROM jobs WHERE 1=1
1394 )";
1395
1396 std::vector<std::pair<int, std::string>> bindings;
1397 int param_idx = 1;
1398
1399 if (options.status.has_value()) {
1400 sql << " AND status = ?";
1401 bindings.emplace_back(
1402 param_idx++, std::string(client::to_string(options.status.value())));
1403 }
1404
1405 if (options.type.has_value()) {
1406 sql << " AND type = ?";
1407 bindings.emplace_back(param_idx++,
1408 std::string(client::to_string(options.type.value())));
1409 }
1410
1411 if (options.node_id.has_value()) {
1412 sql << " AND (source_node_id = ? OR destination_node_id = ?)";
1413 bindings.emplace_back(param_idx++, options.node_id.value());
1414 bindings.emplace_back(param_idx++, options.node_id.value());
1415 }
1416
1417 if (options.created_by.has_value()) {
1418 sql << " AND created_by = ?";
1419 bindings.emplace_back(param_idx++, options.created_by.value());
1420 }
1421
1422 if (options.order_by_priority) {
1423 sql << " ORDER BY priority DESC, created_at ASC";
1424 } else {
1425 sql << " ORDER BY created_at DESC";
1426 }
1427
1428 sql << " LIMIT " << options.limit << " OFFSET " << options.offset;
1429
1430 sqlite3_stmt* stmt = nullptr;
1431 auto sql_str = sql.str();
1432 if (sqlite3_prepare_v2(db_, sql_str.c_str(), -1, &stmt, nullptr) !=
1433 SQLITE_OK) {
1434 return result;
1435 }
1436
1437 for (const auto& [idx, value] : bindings) {
1438 sqlite3_bind_text(stmt, idx, value.c_str(), -1, SQLITE_TRANSIENT);
1440
1441 while (sqlite3_step(stmt) == SQLITE_ROW) {
1442 result.push_back(parse_row(stmt));
1443 }
1444
1445 sqlite3_finalize(stmt);
1446 return result;
1448
1449std::vector<client::job_record> job_repository::find_by_status(
1450 client::job_status status, size_t limit) const {
1451 job_query_options options;
1452 options.status = status;
1453 options.limit = limit;
1454 return find_jobs(options);
1455}
1456
1457std::vector<client::job_record> job_repository::find_pending_jobs(
1458 size_t limit) const {
1459 std::vector<client::job_record> result;
1460 if (!db_) return result;
1461
1462 static constexpr const char* sql = R"(
1463 SELECT pk, job_id, type, status, priority,
1464 source_node_id, destination_node_id,
1465 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1466 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1467 current_item, current_item_description,
1468 error_message, error_details, retry_count, max_retries,
1469 created_by, metadata_json,
1470 created_at, queued_at, started_at, completed_at
1471 FROM jobs
1472 WHERE status IN ('pending', 'queued')
1473 ORDER BY priority DESC, created_at ASC
1474 LIMIT ?
1475 )";
1476
1477 sqlite3_stmt* stmt = nullptr;
1478 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1479 return result;
1480 }
1481
1482 sqlite3_bind_int64(stmt, 1, static_cast<int64_t>(limit));
1483
1484 while (sqlite3_step(stmt) == SQLITE_ROW) {
1485 result.push_back(parse_row(stmt));
1486 }
1487
1488 sqlite3_finalize(stmt);
1489 return result;
1490}
1491
1492std::vector<client::job_record> job_repository::find_by_node(
1493 std::string_view node_id) const {
1494 job_query_options options;
1495 options.node_id = std::string(node_id);
1496 return find_jobs(options);
1497}
1498
1499VoidResult job_repository::remove(std::string_view job_id) {
1500 if (!db_) {
1501 return VoidResult(kcenon::common::error_info{
1502 -1, "Database not initialized", "job_repository"});
1503 }
1504
1505 static constexpr const char* sql = "DELETE FROM jobs WHERE job_id = ?";
1506
1507 sqlite3_stmt* stmt = nullptr;
1508 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1509 return VoidResult(kcenon::common::error_info{
1510 -1,
1511 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1512 "job_repository"});
1513 }
1514
1515 sqlite3_bind_text(stmt, 1, job_id.data(), static_cast<int>(job_id.size()),
1516 SQLITE_TRANSIENT);
1517
1518 auto rc = sqlite3_step(stmt);
1519 sqlite3_finalize(stmt);
1521 if (rc != SQLITE_DONE) {
1522 return VoidResult(kcenon::common::error_info{
1523 -1, "Failed to delete job: " + std::string(sqlite3_errmsg(db_)),
1524 "job_repository"});
1525 }
1526
1527 return kcenon::common::ok();
1528}
1529
1530Result<size_t> job_repository::cleanup_old_jobs(std::chrono::hours max_age) {
1531 if (!db_) {
1532 return kcenon::common::make_error<size_t>(
1533 -1, "Database not initialized", "job_repository");
1534 }
1535
1536 auto cutoff = std::chrono::system_clock::now() - max_age;
1537 auto cutoff_str = to_timestamp_string(cutoff);
1538
1539 static constexpr const char* sql = R"(
1540 DELETE FROM jobs
1541 WHERE status IN ('completed', 'failed', 'cancelled')
1542 AND completed_at < ?
1543 )";
1544
1545 sqlite3_stmt* stmt = nullptr;
1546 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1547 return kcenon::common::make_error<size_t>(
1548 -1,
1549 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1550 "job_repository");
1551 }
1552
1553 sqlite3_bind_text(stmt, 1, cutoff_str.c_str(), -1, SQLITE_TRANSIENT);
1554
1555 auto rc = sqlite3_step(stmt);
1556 sqlite3_finalize(stmt);
1558 if (rc != SQLITE_DONE) {
1559 return kcenon::common::make_error<size_t>(
1560 -1, "Failed to cleanup jobs: " + std::string(sqlite3_errmsg(db_)),
1561 "job_repository");
1562 }
1563
1564 return kcenon::common::ok(static_cast<size_t>(sqlite3_changes(db_)));
1565}
1566
1567bool job_repository::exists(std::string_view job_id) const {
1568 if (!db_) return false;
1569
1570 static constexpr const char* sql = "SELECT 1 FROM jobs WHERE job_id = ?";
1571
1572 sqlite3_stmt* stmt = nullptr;
1573 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1574 return false;
1575 }
1576
1577 sqlite3_bind_text(stmt, 1, job_id.data(), static_cast<int>(job_id.size()),
1578 SQLITE_TRANSIENT);
1580 bool found = (sqlite3_step(stmt) == SQLITE_ROW);
1581 sqlite3_finalize(stmt);
1582 return found;
1583}
1584
1585// =============================================================================
1586// Status Updates
1587// =============================================================================
1588
/// Update a job's status column. When transitioning to 'failed', the error
/// message and details are also written; for any other status the previous
/// error columns are left untouched.
///
/// @param job_id        Job to update.
/// @param error_message Persisted only when status == failed.
/// @param error_details Persisted only when status == failed.
VoidResult job_repository::update_status(std::string_view job_id,
                                         client::job_status status,
                                         std::string_view error_message,
                                         std::string_view error_details) {
  if (!db_) {
    return VoidResult(kcenon::common::error_info{
        -1, "Database not initialized", "job_repository"});
  }

  // Two SQL shapes; the conditional bind block below must mirror the
  // placeholder count of whichever statement is selected here.
  const char* sql = nullptr;
  if (status == client::job_status::failed) {
    sql = R"(
      UPDATE jobs SET
        status = ?,
        error_message = ?,
        error_details = ?
      WHERE job_id = ?
    )";
  } else {
    sql = "UPDATE jobs SET status = ? WHERE job_id = ?";
  }

  sqlite3_stmt* stmt = nullptr;
  if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
    return VoidResult(kcenon::common::error_info{
        -1,
        "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
        "job_repository"});
  }

  int idx = 1;
  // SQLITE_STATIC: client::to_string returns a static-lifetime literal.
  sqlite3_bind_text(stmt, idx++, client::to_string(status), -1, SQLITE_STATIC);

  if (status == client::job_status::failed) {
    // string_views are not NUL-terminated; pass explicit lengths.
    sqlite3_bind_text(stmt, idx++, error_message.data(),
                      static_cast<int>(error_message.size()),
                      SQLITE_TRANSIENT);
    sqlite3_bind_text(stmt, idx++, error_details.data(),
                      static_cast<int>(error_details.size()),
                      SQLITE_TRANSIENT);
  }

  sqlite3_bind_text(stmt, idx++, job_id.data(),
                    static_cast<int>(job_id.size()), SQLITE_TRANSIENT);

  auto rc = sqlite3_step(stmt);
  sqlite3_finalize(stmt);

  if (rc != SQLITE_DONE) {
    return VoidResult(kcenon::common::error_info{
        -1, "Failed to update status: " + std::string(sqlite3_errmsg(db_)),
        "job_repository"});
  }

  return kcenon::common::ok();
}
1645
/// Overwrite all progress-tracking columns of a job in one UPDATE.
/// The bind sequence must match the SET column order exactly.
///
/// @param job_id   Job to update.
/// @param progress New counters and current-item description.
VoidResult job_repository::update_progress(std::string_view job_id,
                                           const client::job_progress& progress) {
  if (!db_) {
    return VoidResult(kcenon::common::error_info{
        -1, "Database not initialized", "job_repository"});
  }

  static constexpr const char* sql = R"(
    UPDATE jobs SET
      total_items = ?,
      completed_items = ?,
      failed_items = ?,
      skipped_items = ?,
      bytes_transferred = ?,
      current_item = ?,
      current_item_description = ?
    WHERE job_id = ?
  )";

  sqlite3_stmt* stmt = nullptr;
  if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
    return VoidResult(kcenon::common::error_info{
        -1,
        "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
        "job_repository"});
  }

  // size_t counters are stored as SQLite INTEGER (signed 64-bit).
  int idx = 1;
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(progress.total_items));
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(progress.completed_items));
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(progress.failed_items));
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(progress.skipped_items));
  sqlite3_bind_int64(stmt, idx++,
                     static_cast<int64_t>(progress.bytes_transferred));
  sqlite3_bind_text(stmt, idx++, progress.current_item.c_str(), -1,
                    SQLITE_TRANSIENT);
  sqlite3_bind_text(stmt, idx++, progress.current_item_description.c_str(),
                    -1, SQLITE_TRANSIENT);
  sqlite3_bind_text(stmt, idx++, job_id.data(),
                    static_cast<int>(job_id.size()), SQLITE_TRANSIENT);

  auto rc = sqlite3_step(stmt);
  sqlite3_finalize(stmt);

  if (rc != SQLITE_DONE) {
    return VoidResult(kcenon::common::error_info{
        -1, "Failed to update progress: " + std::string(sqlite3_errmsg(db_)),
        "job_repository"});
  }

  return kcenon::common::ok();
}
1702
1703VoidResult job_repository::mark_started(std::string_view job_id) {
1704 if (!db_) {
1705 return VoidResult(kcenon::common::error_info{
1706 -1, "Database not initialized", "job_repository"});
1707 }
1708
1709 static constexpr const char* sql = R"(
1710 UPDATE jobs SET
1711 status = 'running',
1712 started_at = CURRENT_TIMESTAMP
1713 WHERE job_id = ?
1714 )";
1715
1716 sqlite3_stmt* stmt = nullptr;
1717 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1718 return VoidResult(kcenon::common::error_info{
1719 -1,
1720 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1721 "job_repository"});
1722 }
1723
1724 sqlite3_bind_text(stmt, 1, job_id.data(), static_cast<int>(job_id.size()),
1725 SQLITE_TRANSIENT);
1726
1727 auto rc = sqlite3_step(stmt);
1728 sqlite3_finalize(stmt);
1730 if (rc != SQLITE_DONE) {
1731 return VoidResult(kcenon::common::error_info{
1732 -1, "Failed to mark started: " + std::string(sqlite3_errmsg(db_)),
1733 "job_repository"});
1734 }
1735
1736 return kcenon::common::ok();
1737}
1738
1739VoidResult job_repository::mark_completed(std::string_view job_id) {
1740 if (!db_) {
1741 return VoidResult(kcenon::common::error_info{
1742 -1, "Database not initialized", "job_repository"});
1743 }
1744
1745 static constexpr const char* sql = R"(
1746 UPDATE jobs SET
1747 status = 'completed',
1748 completed_at = CURRENT_TIMESTAMP
1749 WHERE job_id = ?
1750 )";
1751
1752 sqlite3_stmt* stmt = nullptr;
1753 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1754 return VoidResult(kcenon::common::error_info{
1755 -1,
1756 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1757 "job_repository"});
1758 }
1759
1760 sqlite3_bind_text(stmt, 1, job_id.data(), static_cast<int>(job_id.size()),
1761 SQLITE_TRANSIENT);
1762
1763 auto rc = sqlite3_step(stmt);
1764 sqlite3_finalize(stmt);
1766 if (rc != SQLITE_DONE) {
1767 return VoidResult(kcenon::common::error_info{
1768 -1, "Failed to mark completed: " + std::string(sqlite3_errmsg(db_)),
1769 "job_repository"});
1770 }
1771
1772 return kcenon::common::ok();
1773}
1774
1775VoidResult job_repository::mark_failed(std::string_view job_id,
1776 std::string_view error_message,
1777 std::string_view error_details) {
1778 if (!db_) {
1779 return VoidResult(kcenon::common::error_info{
1780 -1, "Database not initialized", "job_repository"});
1781 }
1782
1783 static constexpr const char* sql = R"(
1784 UPDATE jobs SET
1785 status = 'failed',
1786 error_message = ?,
1787 error_details = ?,
1788 retry_count = retry_count + 1,
1789 completed_at = CURRENT_TIMESTAMP
1790 WHERE job_id = ?
1791 )";
1792
1793 sqlite3_stmt* stmt = nullptr;
1794 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1795 return VoidResult(kcenon::common::error_info{
1796 -1,
1797 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1798 "job_repository"});
1799 }
1800
1801 sqlite3_bind_text(stmt, 1, error_message.data(),
1802 static_cast<int>(error_message.size()), SQLITE_TRANSIENT);
1803 sqlite3_bind_text(stmt, 2, error_details.data(),
1804 static_cast<int>(error_details.size()), SQLITE_TRANSIENT);
1805 sqlite3_bind_text(stmt, 3, job_id.data(), static_cast<int>(job_id.size()),
1806 SQLITE_TRANSIENT);
1807
1808 auto rc = sqlite3_step(stmt);
1809 sqlite3_finalize(stmt);
1811 if (rc != SQLITE_DONE) {
1812 return VoidResult(kcenon::common::error_info{
1813 -1, "Failed to mark failed: " + std::string(sqlite3_errmsg(db_)),
1814 "job_repository"});
1815 }
1816
1817 return kcenon::common::ok();
1818}
1819
1820VoidResult job_repository::increment_retry(std::string_view job_id) {
1821 if (!db_) {
1822 return VoidResult(kcenon::common::error_info{
1823 -1, "Database not initialized", "job_repository"});
1824 }
1825
1826 static constexpr const char* sql = R"(
1827 UPDATE jobs SET retry_count = retry_count + 1 WHERE job_id = ?
1828 )";
1829
1830 sqlite3_stmt* stmt = nullptr;
1831 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1832 return VoidResult(kcenon::common::error_info{
1833 -1,
1834 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1835 "job_repository"});
1836 }
1837
1838 sqlite3_bind_text(stmt, 1, job_id.data(), static_cast<int>(job_id.size()),
1839 SQLITE_TRANSIENT);
1840
1841 auto rc = sqlite3_step(stmt);
1842 sqlite3_finalize(stmt);
1843
1844 if (rc != SQLITE_DONE) {
1845 return VoidResult(kcenon::common::error_info{
1846 -1, "Failed to increment retry: " + std::string(sqlite3_errmsg(db_)),
1847 "job_repository"});
1848 }
1849
1850 return kcenon::common::ok();
1851}
1852
1853// =============================================================================
1854// Statistics
1855// =============================================================================
1856
1857size_t job_repository::count() const {
1858 if (!db_) return 0;
1859
1860 static constexpr const char* sql = "SELECT COUNT(*) FROM jobs";
1861
1862 sqlite3_stmt* stmt = nullptr;
1863 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1864 return 0;
1865 }
1867 size_t result = 0;
1868 if (sqlite3_step(stmt) == SQLITE_ROW) {
1869 result = static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1870 }
1871
1872 sqlite3_finalize(stmt);
1873 return result;
1874}
1875
1877 if (!db_) return 0;
1878
1879 static constexpr const char* sql =
1880 "SELECT COUNT(*) FROM jobs WHERE status = ?";
1881
1882 sqlite3_stmt* stmt = nullptr;
1883 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1884 return 0;
1885 }
1886
1887 sqlite3_bind_text(stmt, 1, client::to_string(status), -1, SQLITE_STATIC);
1889 size_t result = 0;
1890 if (sqlite3_step(stmt) == SQLITE_ROW) {
1891 result = static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1892 }
1893
1894 sqlite3_finalize(stmt);
1895 return result;
1896}
1897
1899 if (!db_) return 0;
1900
1901 static constexpr const char* sql = R"(
1902 SELECT COUNT(*) FROM jobs
1903 WHERE status = 'completed'
1904 AND date(completed_at) = date('now')
1905 )";
1906
1907 sqlite3_stmt* stmt = nullptr;
1908 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1909 return 0;
1910 }
1912 size_t result = 0;
1913 if (sqlite3_step(stmt) == SQLITE_ROW) {
1914 result = static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1915 }
1916
1917 sqlite3_finalize(stmt);
1918 return result;
1919}
1920
1922 if (!db_) return 0;
1923
1924 static constexpr const char* sql = R"(
1925 SELECT COUNT(*) FROM jobs
1926 WHERE status = 'failed'
1927 AND date(completed_at) = date('now')
1928 )";
1929
1930 sqlite3_stmt* stmt = nullptr;
1931 if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
1932 return 0;
1933 }
1934
1935 size_t result = 0;
1936 if (sqlite3_step(stmt) == SQLITE_ROW) {
1937 result = static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1939
1940 sqlite3_finalize(stmt);
1941 return result;
1942}
1943
1944// =============================================================================
1945// Database Information
1946// =============================================================================
1947
1948bool job_repository::is_valid() const noexcept { return db_ != nullptr; }
1949
1950// =============================================================================
1951// Private Implementation
1952// =============================================================================
1953
1954client::job_record job_repository::parse_row(void* stmt_ptr) const {
1955 auto* stmt = static_cast<sqlite3_stmt*>(stmt_ptr);
1957
1958 int col = 0;
1959 job.pk = get_int64_column(stmt, col++);
1960 job.job_id = get_text_column(stmt, col++);
1961 job.type = client::job_type_from_string(get_text_column(stmt, col++));
1962 job.status = client::job_status_from_string(get_text_column(stmt, col++));
1963 job.priority = client::job_priority_from_int(get_int_column(stmt, col++));
1964
1965 job.source_node_id = get_text_column(stmt, col++);
1966 job.destination_node_id = get_text_column(stmt, col++);
1967
1968 job.patient_id = get_optional_text(stmt, col++);
1969 job.study_uid = get_optional_text(stmt, col++);
1970 job.series_uid = get_optional_text(stmt, col++);
1971 job.sop_instance_uid = get_optional_text(stmt, col++);
1972
1973 auto uids_json = get_text_column(stmt, col++);
1974 job.instance_uids = deserialize_instance_uids(uids_json);
1975
1976 job.progress.total_items =
1977 static_cast<size_t>(get_int64_column(stmt, col++));
1978 job.progress.completed_items =
1979 static_cast<size_t>(get_int64_column(stmt, col++));
1980 job.progress.failed_items =
1981 static_cast<size_t>(get_int64_column(stmt, col++));
1982 job.progress.skipped_items =
1983 static_cast<size_t>(get_int64_column(stmt, col++));
1984 job.progress.bytes_transferred =
1985 static_cast<size_t>(get_int64_column(stmt, col++));
1986 job.progress.current_item = get_text_column(stmt, col++);
1987 job.progress.current_item_description = get_text_column(stmt, col++);
1988 job.progress.calculate_percent();
1989
1990 job.error_message = get_text_column(stmt, col++);
1991 job.error_details = get_text_column(stmt, col++);
1992 job.retry_count = get_int_column(stmt, col++);
1993 job.max_retries = get_int_column(stmt, col++, 3);
1994
1995 job.created_by = get_text_column(stmt, col++);
1996
1997 auto metadata_json = get_text_column(stmt, col++);
1998 job.metadata = deserialize_metadata(metadata_json);
1999
2000 auto created_str = get_text_column(stmt, col++);
2001 job.created_at = from_timestamp_string(created_str.c_str());
2002
2003 auto queued_str = get_text_column(stmt, col++);
2004 job.queued_at = from_optional_timestamp(queued_str.c_str());
2005
2006 auto started_str = get_text_column(stmt, col++);
2007 job.started_at = from_optional_timestamp(started_str.c_str());
2008
2009 auto completed_str = get_text_column(stmt, col++);
2010 job.completed_at = from_optional_timestamp(completed_str.c_str());
2011
2012 return job;
2013}
2014
2015} // namespace kcenon::pacs::storage
2016
2017#endif // PACS_WITH_DATABASE_SYSTEM
Repository for job persistence (legacy SQLite interface)
auto cleanup_old_jobs(std::chrono::hours max_age) -> Result< size_t >
static auto deserialize_metadata(std::string_view json) -> std::unordered_map< std::string, std::string >
auto find_by_node(std::string_view node_id) const -> std::vector< client::job_record >
auto mark_completed(std::string_view job_id) -> VoidResult
static auto serialize_metadata(const std::unordered_map< std::string, std::string > &metadata) -> std::string
auto increment_retry(std::string_view job_id) -> VoidResult
auto remove(std::string_view job_id) -> VoidResult
auto update_status(std::string_view job_id, client::job_status status, std::string_view error_message="", std::string_view error_details="") -> VoidResult
auto mark_started(std::string_view job_id) -> VoidResult
auto find_by_status(client::job_status status, size_t limit=100) const -> std::vector< client::job_record >
auto find_pending_jobs(size_t limit=10) const -> std::vector< client::job_record >
static auto deserialize_instance_uids(std::string_view json) -> std::vector< std::string >
auto find_by_id(std::string_view job_id) const -> std::optional< client::job_record >
auto is_valid() const noexcept -> bool
auto mark_failed(std::string_view job_id, std::string_view error_message, std::string_view error_details="") -> VoidResult
static auto serialize_instance_uids(const std::vector< std::string > &uids) -> std::string
auto find_jobs(const job_query_options &options={}) const -> std::vector< client::job_record >
auto exists(std::string_view job_id) const -> bool
auto count_by_status(client::job_status status) const -> size_t
auto parse_row(void *stmt) const -> client::job_record
auto update_progress(std::string_view job_id, const client::job_progress &progress) -> VoidResult
auto find_by_pk(int64_t pk) const -> std::optional< client::job_record >
Compatibility header providing kcenon::pacs::compat::format as an alias for std::format.
Repository for job persistence using base_repository pattern.
job_priority job_priority_from_int(int value) noexcept
Parse job_priority from integer.
Definition job_types.h:194
job_type job_type_from_string(std::string_view str) noexcept
Parse job_type from string.
Definition job_types.h:72
job_status job_status_from_string(std::string_view str) noexcept
Parse job_status from string.
Definition job_types.h:123
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
Definition job_types.h:54
job_status
Current status of a job.
Definition job_types.h:90
@ failed
Job failed with error.
constexpr dicom_tag status
Status.
@ move
C-MOVE move request/response.
Progress tracking for a job.
Definition job_types.h:211
size_t failed_items
Failed items.
Definition job_types.h:214
std::string current_item
Current SOP Instance UID being processed.
Definition job_types.h:220
size_t skipped_items
Skipped items.
Definition job_types.h:215
std::string current_item_description
Human-readable description.
Definition job_types.h:221
size_t bytes_transferred
Total bytes transferred.
Definition job_types.h:216
size_t completed_items
Successfully completed items.
Definition job_types.h:213
size_t total_items
Total number of items to process.
Definition job_types.h:212
Complete job record with all metadata.
Definition job_types.h:255
Query options for listing jobs.
std::optional< std::string > node_id
Filter by source or destination node.
std::optional< client::job_status > status
Filter by status.