23#ifdef PACS_WITH_DATABASE_SYSTEM
32 : base_repository(std::
move(db),
"jobs",
"job_id") {}
38std::string job_repository::serialize_instance_uids(
39 const std::vector<std::string>& uids) {
40 if (uids.empty())
return "[]";
42 std::ostringstream oss;
44 for (
size_t i = 0; i < uids.size(); ++i) {
45 if (i > 0) oss <<
",";
47 for (
char c : uids[i]) {
61std::vector<std::string> job_repository::deserialize_instance_uids(
62 std::string_view json) {
63 std::vector<std::string> result;
64 if (json.empty() || json ==
"[]")
return result;
67 while (pos < json.size()) {
68 auto start = json.find(
'"', pos);
69 if (start == std::string_view::npos)
break;
71 size_t end = start + 1;
72 while (end < json.size()) {
73 if (json[end] ==
'\\' && end + 1 < json.size()) {
75 }
else if (json[end] ==
'"') {
82 if (end < json.size()) {
83 std::string value{json.substr(start + 1, end - start - 1)};
84 std::string unescaped;
85 for (
size_t i = 0; i < value.size(); ++i) {
86 if (value[i] ==
'\\' && i + 1 < value.size()) {
87 unescaped += value[++i];
89 unescaped += value[i];
92 result.push_back(std::move(unescaped));
101std::string job_repository::serialize_metadata(
102 const std::unordered_map<std::string, std::string>& metadata) {
103 if (metadata.empty())
return "{}";
105 std::ostringstream oss;
108 for (
const auto& [key, value] : metadata) {
109 if (!first) oss <<
",";
123 for (
char c : value) {
137std::unordered_map<std::string, std::string> job_repository::deserialize_metadata(
138 std::string_view json) {
139 std::unordered_map<std::string, std::string> result;
140 if (json.empty() || json ==
"{}")
return result;
143 while (pos < json.size()) {
144 auto key_start = json.find(
'"', pos);
145 if (key_start == std::string_view::npos)
break;
147 size_t key_end = key_start + 1;
148 while (key_end < json.size() && json[key_end] !=
'"') {
149 if (json[key_end] ==
'\\') ++key_end;
152 if (key_end >= json.size())
break;
154 std::string key{json.substr(key_start + 1, key_end - key_start - 1)};
156 auto val_start = json.find(
'"', key_end + 1);
157 if (val_start == std::string_view::npos)
break;
159 size_t val_end = val_start + 1;
160 while (val_end < json.size() && json[val_end] !=
'"') {
161 if (json[val_end] ==
'\\') ++val_end;
165 if (val_end < json.size()) {
166 std::string value{json.substr(val_start + 1, val_end - val_start - 1)};
180auto job_repository::parse_timestamp(
const std::string& str)
const
181 -> std::chrono::system_clock::time_point {
187 if (std::sscanf(str.c_str(),
"%d-%d-%d %d:%d:%d", &tm.tm_year, &tm.tm_mon,
188 &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 6) {
196 auto time = _mkgmtime(&tm);
198 auto time = timegm(&tm);
201 return std::chrono::system_clock::from_time_t(time);
204auto job_repository::format_timestamp(
205 std::chrono::system_clock::time_point tp)
const -> std::string {
206 if (tp == std::chrono::system_clock::time_point{}) {
210 auto time = std::chrono::system_clock::to_time_t(tp);
213 gmtime_s(&tm, &time);
215 gmtime_r(&time, &tm);
219 std::strftime(buf,
sizeof(buf),
"%Y-%m-%d %H:%M:%S", &tm);
223auto job_repository::format_optional_timestamp(
224 const std::optional<std::chrono::system_clock::time_point>& tp)
const
226 if (!tp.has_value()) {
229 return format_timestamp(tp.value());
// Fetch a single job row by its numeric primary key.
// Returns an error result when the DB handle is absent or disconnected,
// a "not found" error when the result set is empty, and otherwise the
// first row mapped to a client::job_record.
236auto job_repository::find_by_pk(int64_t pk) -> result_type {
237 if (!db() || !db()->is_connected()) {
238 return result_type(kcenon::common::error_info{
239 -1,
"Database not connected",
"storage"});
// Builder-based query: SELECT <columns> ... WHERE pk = <pk>.
// NOTE(review): intermediate chain lines (e.g. the .from(...) call and
// closing braces) are not visible in this chunk.
242 auto builder = query_builder();
243 builder.select(select_columns())
245 .where(
"pk",
"=", pk)
248 auto result = storage_session().select(builder.build());
249 if (result.is_err()) {
250 return result_type(result.error());
// An empty result set is reported as an explicit "not found" error
// rather than returning a default-constructed record.
253 if (result.value().empty()) {
254 return result_type(kcenon::common::error_info{
255 -1,
"Job not found with pk=" + std::to_string(pk),
"storage"});
258 return result_type(map_row_to_entity(result.value()[0]));
// Query jobs with optional filters (status, type, created_by), ordering
// (priority-first or creation time) and limit/offset pagination.
// Filters are AND-combined into a single optional query_condition.
261auto job_repository::find_jobs(
const job_query_options& options)
262 -> list_result_type {
263 if (!db() || !db()->is_connected()) {
264 return list_result_type(kcenon::common::error_info{
265 -1,
"Database not connected",
"storage"});
268 auto builder = query_builder();
269 builder.select(select_columns()).from(table_name());
// Accumulates all requested filters; remains empty when no filter is set.
271 std::optional<database::query_condition> condition;
273 if (options.status.has_value()) {
274 auto cond = database::query_condition(
275 "status",
"=", std::string(client::to_string(options.status.value())));
// NOTE(review): the assignment of `cond` into `condition` for the status
// branch (original lines 276-278) is not visible in this chunk.
279 if (options.type.has_value()) {
280 auto cond = database::query_condition(
281 "type",
"=", std::string(client::to_string(options.type.value())));
282 if (condition.has_value()) {
283 condition = condition.value() && cond;
289 if (options.created_by.has_value()) {
290 auto cond = database::query_condition(
291 "created_by",
"=", options.created_by.value());
292 if (condition.has_value()) {
293 condition = condition.value() && cond;
299 if (condition.has_value()) {
300 builder.where(condition.value());
// Priority ordering: high priority first, oldest first as tie-breaker;
// otherwise newest jobs first.
303 if (options.order_by_priority) {
304 builder.order_by(
"priority", database::sort_order::desc);
305 builder.order_by(
"created_at", database::sort_order::asc);
307 builder.order_by(
"created_at", database::sort_order::desc);
// offset is only applied together with a positive limit.
310 if (options.limit > 0) {
311 builder.limit(options.limit);
312 if (options.offset > 0) {
313 builder.offset(options.offset);
317 auto result = storage_session().select(builder.build());
318 if (result.is_err()) {
319 return list_result_type(result.error());
322 std::vector<client::job_record> records;
323 records.reserve(result.value().size());
324 for (
const auto& row : result.value()) {
325 records.push_back(map_row_to_entity(row));
328 return list_result_type(std::move(records));
// Convenience wrapper over find_jobs(): fetch jobs by status with a
// result-count cap.
// NOTE(review): the line assigning options.status (original line 334) is
// not visible in this chunk — presumably `options.status = status;`.
331auto job_repository::find_by_status(client::job_status status,
size_t limit)
332 -> list_result_type {
333 job_query_options options;
335 options.limit = limit;
336 return find_jobs(options);
// Fetch jobs that are waiting to run: status 'pending' OR 'queued',
// ordered by priority (highest first) then creation time (oldest first),
// capped at `limit` rows.
339auto job_repository::find_pending_jobs(
size_t limit) -> list_result_type {
340 if (!db() || !db()->is_connected()) {
341 return list_result_type(kcenon::common::error_info{
342 -1,
"Database not connected",
"storage"});
345 auto builder = query_builder();
346 auto pending_cond = database::query_condition(
347 "status",
"=", std::string(
"pending"));
348 auto queued_cond = database::query_condition(
349 "status",
"=", std::string(
"queued"));
// OR-combined: a job counts as pending when in either state.
351 builder.select(select_columns())
353 .where(pending_cond || queued_cond)
354 .order_by(
"priority", database::sort_order::desc)
355 .order_by(
"created_at", database::sort_order::asc)
358 auto result = storage_session().select(builder.build());
359 if (result.is_err()) {
360 return list_result_type(result.error());
363 std::vector<client::job_record> records;
364 records.reserve(result.value().size());
365 for (
const auto& row : result.value()) {
366 records.push_back(map_row_to_entity(row));
369 return list_result_type(std::move(records));
// Fetch all jobs where the given node participates on either side of the
// transfer (source OR destination), newest first.
372auto job_repository::find_by_node(std::string_view node_id) -> list_result_type {
373 if (!db() || !db()->is_connected()) {
374 return list_result_type(kcenon::common::error_info{
375 -1,
"Database not connected",
"storage"});
378 auto builder = query_builder();
379 auto source_cond = database::query_condition(
380 "source_node_id",
"=", std::string(node_id));
381 auto dest_cond = database::query_condition(
382 "destination_node_id",
"=", std::string(node_id));
384 builder.select(select_columns())
386 .where(source_cond || dest_cond)
387 .order_by(
"created_at", database::sort_order::desc);
389 auto result = storage_session().select(builder.build());
390 if (result.is_err()) {
391 return list_result_type(result.error());
394 std::vector<client::job_record> records;
395 records.reserve(result.value().size());
396 for (
const auto& row : result.value()) {
397 records.push_back(map_row_to_entity(row));
400 return list_result_type(std::move(records));
// Delete terminal jobs (completed/failed/cancelled) whose completed_at is
// older than now - max_age. Returns the number of rows removed.
// Timestamps are compared as formatted strings ("YYYY-MM-DD HH:MM:SS"),
// which orders correctly lexicographically.
403auto job_repository::cleanup_old_jobs(std::chrono::hours max_age)
405 if (!db() || !db()->is_connected()) {
406 return kcenon::common::make_error<size_t>(
407 -1,
"Database not connected",
"storage");
410 auto cutoff = std::chrono::system_clock::now() - max_age;
411 auto cutoff_str = format_timestamp(cutoff);
// Only jobs in a terminal state are eligible for cleanup.
413 auto completed_cond = database::query_condition(
414 "status",
"=", std::string(
"completed"));
415 auto failed_cond = database::query_condition(
416 "status",
"=", std::string(
"failed"));
417 auto cancelled_cond = database::query_condition(
418 "status",
"=", std::string(
"cancelled"));
419 auto date_cond = database::query_condition(
420 "completed_at",
"<", cutoff_str);
// (completed OR failed OR cancelled) AND completed_at < cutoff.
422 auto status_cond = completed_cond || failed_cond || cancelled_cond;
423 auto final_cond = status_cond && date_cond;
425 auto builder = query_builder();
426 builder.delete_from(table_name()).where(final_cond);
428 auto result = storage_session().remove(builder.build());
429 if (result.is_err()) {
430 return kcenon::common::make_error<size_t>(
431 -1, result.error().message,
"storage");
434 return kcenon::common::ok(
static_cast<size_t>(result.value()));
// Update a job's status by job_id. When the new status is 'failed', the
// error message/details columns are written as well; for other statuses
// the error columns are left untouched.
441auto job_repository::update_status(std::string_view job_id,
442 client::job_status status,
443 std::string_view error_message,
444 std::string_view error_details) -> VoidResult {
445 if (!db() || !db()->is_connected()) {
446 return VoidResult(kcenon::common::error_info{
447 -1,
"Database not connected",
"storage"});
450 auto builder = query_builder();
451 builder.update(table_name())
452 .set(
"status", std::string(client::to_string(status)));
// Error columns only accompany a transition to the failed state.
454 if (status == client::job_status::failed) {
455 builder.set(
"error_message", std::string(error_message))
456 .set(
"error_details", std::string(error_details));
459 builder.where(
"job_id",
"=", std::string(job_id));
461 auto result = storage_session().execute(builder.build());
462 if (result.is_err()) {
463 return VoidResult(result.error());
// NOTE(review): updating a non-existent job_id appears to succeed here
// (no affected-row check visible) — confirm whether that is intended.
466 return kcenon::common::ok();
// Persist the full progress snapshot (item counters, bytes transferred,
// current item text) for the job identified by job_id.
// size_t counters are narrowed to int64_t for the DB layer; assumes
// values fit in a signed 64-bit column.
469auto job_repository::update_progress(std::string_view job_id,
470 const client::job_progress& progress)
472 if (!db() || !db()->is_connected()) {
473 return VoidResult(kcenon::common::error_info{
474 -1,
"Database not connected",
"storage"});
477 auto builder = query_builder();
478 builder.update(table_name())
479 .set(
"total_items",
static_cast<int64_t
>(progress.total_items))
480 .set(
"completed_items",
static_cast<int64_t
>(progress.completed_items))
481 .set(
"failed_items",
static_cast<int64_t
>(progress.failed_items))
482 .set(
"skipped_items",
static_cast<int64_t
>(progress.skipped_items))
483 .set(
"bytes_transferred",
static_cast<int64_t
>(progress.bytes_transferred))
484 .set(
"current_item", progress.current_item)
485 .set(
"current_item_description", progress.current_item_description)
486 .where(
"job_id",
"=", std::string(job_id));
488 auto result = storage_session().execute(builder.build());
489 if (result.is_err()) {
490 return VoidResult(result.error());
493 return kcenon::common::ok();
// Transition a job to 'running' and stamp started_at with the current
// (formatted) time.
496auto job_repository::mark_started(std::string_view job_id) -> VoidResult {
497 if (!db() || !db()->is_connected()) {
498 return VoidResult(kcenon::common::error_info{
499 -1,
"Database not connected",
"storage"});
502 auto now_str = format_timestamp(std::chrono::system_clock::now());
504 auto builder = query_builder();
505 builder.update(table_name())
506 .set(
"status", std::string(
"running"))
507 .set(
"started_at", now_str)
508 .where(
"job_id",
"=", std::string(job_id));
510 auto result = storage_session().execute(builder.build());
511 if (result.is_err()) {
512 return VoidResult(result.error());
515 return kcenon::common::ok();
// Transition a job to 'completed' and stamp completed_at with the
// current (formatted) time.
518auto job_repository::mark_completed(std::string_view job_id) -> VoidResult {
519 if (!db() || !db()->is_connected()) {
520 return VoidResult(kcenon::common::error_info{
521 -1,
"Database not connected",
"storage"});
524 auto now_str = format_timestamp(std::chrono::system_clock::now());
526 auto builder = query_builder();
527 builder.update(table_name())
528 .set(
"status", std::string(
"completed"))
529 .set(
"completed_at", now_str)
530 .where(
"job_id",
"=", std::string(job_id));
532 auto result = storage_session().execute(builder.build());
533 if (result.is_err()) {
534 return VoidResult(result.error());
537 return kcenon::common::ok();
// Transition a job to 'failed': record error message/details, bump
// retry_count by one, and stamp completed_at.
// NOTE(review): retry_count is incremented via read-modify-write (SELECT
// then UPDATE) rather than an atomic `retry_count = retry_count + 1` —
// concurrent failures of the same job could lose an increment. Confirm
// whether callers serialize per-job updates.
540auto job_repository::mark_failed(std::string_view job_id,
541 std::string_view error_message,
542 std::string_view error_details) -> VoidResult {
543 if (!db() || !db()->is_connected()) {
544 return VoidResult(kcenon::common::error_info{
545 -1,
"Database not connected",
"storage"});
548 auto now_str = format_timestamp(std::chrono::system_clock::now());
// Best-effort read of the current retry count; defaults to 0 when the
// job cannot be loaded.
551 auto find_result = find_by_id(std::string(job_id));
552 int current_retry = 0;
553 if (find_result.is_ok()) {
554 current_retry = find_result.value().retry_count;
557 auto builder = query_builder();
558 builder.update(table_name())
559 .set(
"status", std::string(
"failed"))
560 .set(
"error_message", std::string(error_message))
561 .set(
"error_details", std::string(error_details))
562 .set(
"retry_count",
static_cast<int64_t
>(current_retry + 1))
563 .set(
"completed_at", now_str)
564 .where(
"job_id",
"=", std::string(job_id));
566 auto result = storage_session().execute(builder.build());
567 if (result.is_err()) {
568 return VoidResult(result.error());
571 return kcenon::common::ok();
// Increment a job's retry_count by one (without changing its status).
// NOTE(review): same non-atomic read-modify-write pattern as mark_failed;
// concurrent increments can be lost. Confirm per-job serialization.
574auto job_repository::increment_retry(std::string_view job_id) -> VoidResult {
575 if (!db() || !db()->is_connected()) {
576 return VoidResult(kcenon::common::error_info{
577 -1,
"Database not connected",
"storage"});
// Defaults to 0 when the job cannot be loaded, so the write still
// proceeds with retry_count = 1.
581 auto find_result = find_by_id(std::string(job_id));
582 int current_retry = 0;
583 if (find_result.is_ok()) {
584 current_retry = find_result.value().retry_count;
587 auto builder = query_builder();
588 builder.update(table_name())
589 .set(
"retry_count",
static_cast<int64_t
>(current_retry + 1))
590 .where(
"job_id",
"=", std::string(job_id));
592 auto result = storage_session().execute(builder.build());
593 if (result.is_err()) {
594 return VoidResult(result.error());
597 return kcenon::common::ok();
// Count jobs having the given status. The status string is derived from
// the job_status enum (not user input), so interpolating it into the SQL
// text is injection-safe here.
604auto job_repository::count_by_status(client::job_status status)
606 if (!db() || !db()->is_connected()) {
607 return kcenon::common::make_error<size_t>(
608 -1,
"Database not connected",
"storage");
611 auto result = storage_session().select(kcenon::pacs::compat::format(
612 "SELECT COUNT(*) as count FROM {} WHERE status = '{}'",
613 table_name(), std::string(client::to_string(status))));
614 if (result.is_err()) {
615 return Result<size_t>(result.error());
// COUNT(*) always yields a row; an empty result set is treated as zero
// defensively.
618 if (result.value().empty()) {
619 return kcenon::common::ok(
static_cast<size_t>(0));
622 return kcenon::common::ok(
623 static_cast<size_t>(std::stoull(result.value()[0].at(
"count"))));
626auto job_repository::count_completed_today() -> Result<size_t> {
627 if (!db() || !db()->is_connected()) {
628 return kcenon::common::make_error<size_t>(
629 -1,
"Database not connected",
"storage");
633 auto now = std::chrono::system_clock::now();
634 auto today_str = format_timestamp(now).substr(0, 10);
638 auto result = storage_session().select(kcenon::pacs::compat::format(
639 "SELECT COUNT(*) as count FROM {} WHERE status = 'completed'",
641 if (result.is_err()) {
642 return Result<size_t>(result.error());
645 if (result.value().empty()) {
646 return kcenon::common::ok(
static_cast<size_t>(0));
649 return kcenon::common::ok(
650 static_cast<size_t>(std::stoull(result.value()[0].at(
"count"))));
653auto job_repository::count_failed_today() -> Result<size_t> {
654 if (!db() || !db()->is_connected()) {
655 return kcenon::common::make_error<size_t>(
656 -1,
"Database not connected",
"storage");
659 auto result = storage_session().select(kcenon::pacs::compat::format(
660 "SELECT COUNT(*) as count FROM {} WHERE status = 'failed'",
662 if (result.is_err()) {
663 return Result<size_t>(result.error());
666 if (result.value().empty()) {
667 return kcenon::common::ok(
static_cast<size_t>(0));
670 return kcenon::common::ok(
671 static_cast<size_t>(std::stoull(result.value()[0].at(
"count"))));
678auto job_repository::map_row_to_entity(
const database_row& row)
const
679 -> client::job_record {
680 client::job_record
job;
683 auto pk_it = row.find(
"pk");
684 if (pk_it != row.end() && !pk_it->second.empty()) {
685 job.pk = std::stoll(pk_it->second);
688 job.job_id = row.at(
"job_id");
689 job.type = client::job_type_from_string(row.at(
"type"));
690 job.status = client::job_status_from_string(row.at(
"status"));
692 auto priority_it = row.find(
"priority");
693 if (priority_it != row.end() && !priority_it->second.empty()) {
694 job.priority = client::job_priority_from_int(std::stoi(priority_it->second));
697 job.source_node_id = row.at(
"source_node_id");
698 job.destination_node_id = row.at(
"destination_node_id");
701 auto patient_id_it = row.find(
"patient_id");
702 if (patient_id_it != row.end() && !patient_id_it->second.empty()) {
703 job.patient_id = patient_id_it->second;
706 auto study_uid_it = row.find(
"study_uid");
707 if (study_uid_it != row.end() && !study_uid_it->second.empty()) {
708 job.study_uid = study_uid_it->second;
711 auto series_uid_it = row.find(
"series_uid");
712 if (series_uid_it != row.end() && !series_uid_it->second.empty()) {
713 job.series_uid = series_uid_it->second;
716 auto sop_uid_it = row.find(
"sop_instance_uid");
717 if (sop_uid_it != row.end() && !sop_uid_it->second.empty()) {
718 job.sop_instance_uid = sop_uid_it->second;
721 auto uids_it = row.find(
"instance_uids_json");
722 if (uids_it != row.end() && !uids_it->second.empty()) {
723 job.instance_uids = deserialize_instance_uids(uids_it->second);
727 auto total_it = row.find(
"total_items");
728 if (total_it != row.end() && !total_it->second.empty()) {
729 job.progress.total_items =
static_cast<size_t>(std::stoll(total_it->second));
732 auto completed_it = row.find(
"completed_items");
733 if (completed_it != row.end() && !completed_it->second.empty()) {
734 job.progress.completed_items =
static_cast<size_t>(std::stoll(completed_it->second));
737 auto failed_it = row.find(
"failed_items");
738 if (failed_it != row.end() && !failed_it->second.empty()) {
739 job.progress.failed_items =
static_cast<size_t>(std::stoll(failed_it->second));
742 auto skipped_it = row.find(
"skipped_items");
743 if (skipped_it != row.end() && !skipped_it->second.empty()) {
744 job.progress.skipped_items =
static_cast<size_t>(std::stoll(skipped_it->second));
747 auto bytes_it = row.find(
"bytes_transferred");
748 if (bytes_it != row.end() && !bytes_it->second.empty()) {
749 job.progress.bytes_transferred =
static_cast<size_t>(std::stoll(bytes_it->second));
752 auto current_it = row.find(
"current_item");
753 if (current_it != row.end()) {
754 job.progress.current_item = current_it->second;
757 auto current_desc_it = row.find(
"current_item_description");
758 if (current_desc_it != row.end()) {
759 job.progress.current_item_description = current_desc_it->second;
762 job.progress.calculate_percent();
765 auto error_msg_it = row.find(
"error_message");
766 if (error_msg_it != row.end()) {
767 job.error_message = error_msg_it->second;
770 auto error_det_it = row.find(
"error_details");
771 if (error_det_it != row.end()) {
772 job.error_details = error_det_it->second;
775 auto retry_it = row.find(
"retry_count");
776 if (retry_it != row.end() && !retry_it->second.empty()) {
777 job.retry_count = std::stoi(retry_it->second);
780 auto max_retry_it = row.find(
"max_retries");
781 if (max_retry_it != row.end() && !max_retry_it->second.empty()) {
782 job.max_retries = std::stoi(max_retry_it->second);
788 auto created_by_it = row.find(
"created_by");
789 if (created_by_it != row.end()) {
790 job.created_by = created_by_it->second;
793 auto metadata_it = row.find(
"metadata_json");
794 if (metadata_it != row.end() && !metadata_it->second.empty()) {
795 job.metadata = deserialize_metadata(metadata_it->second);
799 auto created_at_it = row.find(
"created_at");
800 if (created_at_it != row.end() && !created_at_it->second.empty()) {
801 job.created_at = parse_timestamp(created_at_it->second);
804 auto queued_it = row.find(
"queued_at");
805 if (queued_it != row.end() && !queued_it->second.empty()) {
806 auto tp = parse_timestamp(queued_it->second);
807 if (tp != std::chrono::system_clock::time_point{}) {
812 auto started_it = row.find(
"started_at");
813 if (started_it != row.end() && !started_it->second.empty()) {
814 auto tp = parse_timestamp(started_it->second);
815 if (tp != std::chrono::system_clock::time_point{}) {
820 auto completed_at_it = row.find(
"completed_at");
821 if (completed_at_it != row.end() && !completed_at_it->second.empty()) {
822 auto tp = parse_timestamp(completed_at_it->second);
823 if (tp != std::chrono::system_clock::time_point{}) {
824 job.completed_at = tp;
831auto job_repository::entity_to_row(
const client::job_record& entity)
const
832 -> std::map<std::string, database_value> {
833 std::map<std::string, database_value> row;
835 row[
"job_id"] = entity.job_id;
836 row[
"type"] = std::string(client::to_string(entity.type));
837 row[
"status"] = std::string(client::to_string(entity.status));
838 row[
"priority"] =
static_cast<int64_t
>(entity.priority);
840 row[
"source_node_id"] = entity.source_node_id;
841 row[
"destination_node_id"] = entity.destination_node_id;
843 row[
"patient_id"] = entity.patient_id.value_or(
"");
844 row[
"study_uid"] = entity.study_uid.value_or(
"");
845 row[
"series_uid"] = entity.series_uid.value_or(
"");
846 row[
"sop_instance_uid"] = entity.sop_instance_uid.value_or(
"");
847 row[
"instance_uids_json"] = serialize_instance_uids(entity.instance_uids);
849 row[
"total_items"] =
static_cast<int64_t
>(entity.progress.total_items);
850 row[
"completed_items"] =
static_cast<int64_t
>(entity.progress.completed_items);
851 row[
"failed_items"] =
static_cast<int64_t
>(entity.progress.failed_items);
852 row[
"skipped_items"] =
static_cast<int64_t
>(entity.progress.skipped_items);
853 row[
"bytes_transferred"] =
static_cast<int64_t
>(entity.progress.bytes_transferred);
854 row[
"current_item"] = entity.progress.current_item;
855 row[
"current_item_description"] = entity.progress.current_item_description;
857 row[
"error_message"] = entity.error_message;
858 row[
"error_details"] = entity.error_details;
859 row[
"retry_count"] =
static_cast<int64_t
>(entity.retry_count);
860 row[
"max_retries"] =
static_cast<int64_t
>(entity.max_retries);
862 row[
"created_by"] = entity.created_by;
863 row[
"metadata_json"] = serialize_metadata(entity.metadata);
865 if (entity.created_at != std::chrono::system_clock::time_point{}) {
866 row[
"created_at"] = format_timestamp(entity.created_at);
868 row[
"created_at"] = format_timestamp(std::chrono::system_clock::now());
871 row[
"queued_at"] = format_optional_timestamp(entity.queued_at);
872 row[
"started_at"] = format_optional_timestamp(entity.started_at);
873 row[
"completed_at"] = format_optional_timestamp(entity.completed_at);
878auto job_repository::get_pk(
const client::job_record& entity)
const
880 return entity.job_id;
883auto job_repository::has_pk(
const client::job_record& entity)
const ->
bool {
884 return !entity.job_id.empty();
887auto job_repository::select_columns() const -> std::vector<std::
string> {
894 "destination_node_id",
899 "instance_uids_json",
906 "current_item_description",
934[[nodiscard]] std::string to_timestamp_string(
935 std::chrono::system_clock::time_point tp) {
936 if (tp == std::chrono::system_clock::time_point{}) {
939 auto time = std::chrono::system_clock::to_time_t(tp);
942 gmtime_s(&tm, &time);
944 gmtime_r(&time, &tm);
947 std::strftime(buf,
sizeof(buf),
"%Y-%m-%d %H:%M:%S", &tm);
952[[nodiscard]] std::chrono::system_clock::time_point from_timestamp_string(
954 if (!str || str[0] ==
'\0') {
958 if (std::sscanf(str,
"%d-%d-%d %d:%d:%d", &tm.tm_year, &tm.tm_mon,
959 &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 6) {
965 auto time = _mkgmtime(&tm);
967 auto time = timegm(&tm);
969 return std::chrono::system_clock::from_time_t(time);
973[[nodiscard]] std::optional<std::chrono::system_clock::time_point>
974from_optional_timestamp(
const char* str) {
975 if (!str || str[0] ==
'\0') {
978 auto tp = from_timestamp_string(str);
979 if (tp == std::chrono::system_clock::time_point{}) {
986[[nodiscard]] std::string get_text_column(sqlite3_stmt* stmt,
int col) {
987 auto text =
reinterpret_cast<const char*
>(sqlite3_column_text(stmt, col));
992[[nodiscard]]
int get_int_column(sqlite3_stmt* stmt,
int col,
993 int default_val = 0) {
994 if (sqlite3_column_type(stmt, col) == SQLITE_NULL) {
997 return sqlite3_column_int(stmt, col);
1001[[nodiscard]] int64_t get_int64_column(sqlite3_stmt* stmt,
int col,
1002 int64_t default_val = 0) {
1003 if (sqlite3_column_type(stmt, col) == SQLITE_NULL) {
1006 return sqlite3_column_int64(stmt, col);
1010[[nodiscard]] std::optional<std::string> get_optional_text(sqlite3_stmt* stmt,
1012 if (sqlite3_column_type(stmt, col) == SQLITE_NULL) {
1013 return std::nullopt;
1015 auto text =
reinterpret_cast<const char*
>(sqlite3_column_text(stmt, col));
1016 return text ? std::optional<std::string>{
text} : std::nullopt;
1020void bind_optional_text(sqlite3_stmt* stmt,
int idx,
1021 const std::optional<std::string>& value) {
1022 if (value.has_value()) {
1023 sqlite3_bind_text(stmt, idx, value->c_str(), -1, SQLITE_TRANSIENT);
1025 sqlite3_bind_null(stmt, idx);
1030void bind_optional_timestamp(
1031 sqlite3_stmt* stmt,
int idx,
1032 const std::optional<std::chrono::system_clock::time_point>& tp) {
1033 if (tp.has_value()) {
1034 auto str = to_timestamp_string(tp.value());
1035 sqlite3_bind_text(stmt, idx, str.c_str(), -1, SQLITE_TRANSIENT);
1037 sqlite3_bind_null(stmt, idx);
1048 const std::vector<std::string>& uids) {
1049 if (uids.empty())
return "[]";
1051 std::ostringstream oss;
1053 for (
size_t i = 0; i < uids.size(); ++i) {
1054 if (i > 0) oss <<
",";
1056 for (
char c : uids[i]) {
1071 std::string_view json) {
1072 std::vector<std::string> result;
1073 if (json.empty() || json ==
"[]")
return result;
1076 while (pos < json.size()) {
1077 auto start = json.find(
'"', pos);
1078 if (start == std::string_view::npos)
break;
1080 size_t end = start + 1;
1081 while (end < json.size()) {
1082 if (json[end] ==
'\\' && end + 1 < json.size()) {
1084 }
else if (json[end] ==
'"') {
1091 if (end < json.size()) {
1092 std::string value{json.substr(start + 1, end - start - 1)};
1093 std::string unescaped;
1094 for (
size_t i = 0; i < value.size(); ++i) {
1095 if (value[i] ==
'\\' && i + 1 < value.size()) {
1096 unescaped += value[++i];
1098 unescaped += value[i];
1101 result.push_back(std::move(unescaped));
1111 const std::unordered_map<std::string, std::string>& metadata) {
1112 if (metadata.empty())
return "{}";
1114 std::ostringstream oss;
1117 for (
const auto& [key, value] : metadata) {
1118 if (!first) oss <<
",";
1122 for (
char c : key) {
1132 for (
char c : value) {
1147 std::string_view json) {
1148 std::unordered_map<std::string, std::string> result;
1149 if (json.empty() || json ==
"{}")
return result;
1152 while (pos < json.size()) {
1153 auto key_start = json.find(
'"', pos);
1154 if (key_start == std::string_view::npos)
break;
1156 size_t key_end = key_start + 1;
1157 while (key_end < json.size() && json[key_end] !=
'"') {
1158 if (json[key_end] ==
'\\') ++key_end;
1161 if (key_end >= json.size())
break;
1163 std::string key{json.substr(key_start + 1, key_end - key_start - 1)};
1165 auto val_start = json.find(
'"', key_end + 1);
1166 if (val_start == std::string_view::npos)
break;
1168 size_t val_end = val_start + 1;
1169 while (val_end < json.size() && json[val_end] !=
'"') {
1170 if (json[val_end] ==
'\\') ++val_end;
1174 if (val_end < json.size()) {
1175 std::string value{json.substr(val_start + 1, val_end - val_start - 1)};
1176 result[key] = value;
1204 return VoidResult(kcenon::common::error_info{
1205 -1,
"Database not initialized",
"job_repository"});
1208 static constexpr const char* sql = R
"(
1210 job_id, type, status, priority,
1211 source_node_id, destination_node_id,
1212 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1213 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1214 current_item, current_item_description,
1215 error_message, error_details, retry_count, max_retries,
1216 created_by, metadata_json,
1217 created_at, queued_at, started_at, completed_at
1218 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1219 ON CONFLICT(job_id) DO UPDATE SET
1220 status = excluded.status,
1221 priority = excluded.priority,
1222 total_items = excluded.total_items,
1223 completed_items = excluded.completed_items,
1224 failed_items = excluded.failed_items,
1225 skipped_items = excluded.skipped_items,
1226 bytes_transferred = excluded.bytes_transferred,
1227 current_item = excluded.current_item,
1228 current_item_description = excluded.current_item_description,
1229 error_message = excluded.error_message,
1230 error_details = excluded.error_details,
1231 retry_count = excluded.retry_count,
1232 queued_at = excluded.queued_at,
1233 started_at = excluded.started_at,
1234 completed_at = excluded.completed_at
1237 sqlite3_stmt* stmt = nullptr;
1238 if (sqlite3_prepare_v2(db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1239 return VoidResult(kcenon::common::error_info{
1241 "Failed to prepare statement: " + std::string(sqlite3_errmsg(db_)),
1246 sqlite3_bind_text(stmt, idx++, job.job_id.c_str(), -1, SQLITE_TRANSIENT);
1251 sqlite3_bind_int(stmt, idx++,
static_cast<int>(job.priority));
1253 sqlite3_bind_text(stmt, idx++, job.source_node_id.c_str(), -1,
1255 sqlite3_bind_text(stmt, idx++, job.destination_node_id.c_str(), -1,
1258 bind_optional_text(stmt, idx++, job.patient_id);
1259 bind_optional_text(stmt, idx++, job.study_uid);
1260 bind_optional_text(stmt, idx++, job.series_uid);
1261 bind_optional_text(stmt, idx++, job.sop_instance_uid);
1263 auto uids_json = serialize_instance_uids(job.instance_uids);
1264 sqlite3_bind_text(stmt, idx++, uids_json.c_str(), -1, SQLITE_TRANSIENT);
1266 sqlite3_bind_int64(stmt, idx++,
1267 static_cast<int64_t
>(job.progress.total_items));
1268 sqlite3_bind_int64(stmt, idx++,
1269 static_cast<int64_t
>(job.progress.completed_items));
1270 sqlite3_bind_int64(stmt, idx++,
1271 static_cast<int64_t
>(job.progress.failed_items));
1272 sqlite3_bind_int64(stmt, idx++,
1273 static_cast<int64_t
>(job.progress.skipped_items));
1274 sqlite3_bind_int64(stmt, idx++,
1275 static_cast<int64_t
>(job.progress.bytes_transferred));
1277 sqlite3_bind_text(stmt, idx++, job.progress.current_item.c_str(), -1,
1279 sqlite3_bind_text(stmt, idx++,
1280 job.progress.current_item_description.c_str(), -1,
1283 sqlite3_bind_text(stmt, idx++, job.error_message.c_str(), -1,
1285 sqlite3_bind_text(stmt, idx++, job.error_details.c_str(), -1,
1287 sqlite3_bind_int(stmt, idx++, job.retry_count);
1288 sqlite3_bind_int(stmt, idx++, job.max_retries);
1290 sqlite3_bind_text(stmt, idx++, job.created_by.c_str(), -1, SQLITE_TRANSIENT);
1292 auto metadata_json = serialize_metadata(job.metadata);
1293 sqlite3_bind_text(stmt, idx++, metadata_json.c_str(), -1, SQLITE_TRANSIENT);
1295 auto created_str = to_timestamp_string(job.created_at);
1296 sqlite3_bind_text(stmt, idx++, created_str.c_str(), -1, SQLITE_TRANSIENT);
1298 bind_optional_timestamp(stmt, idx++, job.queued_at);
1299 bind_optional_timestamp(stmt, idx++, job.started_at);
1300 bind_optional_timestamp(stmt, idx++, job.completed_at);
1302 auto rc = sqlite3_step(stmt);
1303 sqlite3_finalize(stmt);
1305 if (rc != SQLITE_DONE) {
1306 return VoidResult(kcenon::common::error_info{
1307 -1,
"Failed to save job: " + std::string(sqlite3_errmsg(db_)),
1311 return kcenon::common::ok();
1315 std::string_view job_id)
const {
1316 if (!
db_)
return std::nullopt;
1318 static constexpr const char* sql = R
"(
1319 SELECT pk, job_id, type, status, priority,
1320 source_node_id, destination_node_id,
1321 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1322 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1323 current_item, current_item_description,
1324 error_message, error_details, retry_count, max_retries,
1325 created_by, metadata_json,
1326 created_at, queued_at, started_at, completed_at
1327 FROM jobs WHERE job_id = ?
1330 sqlite3_stmt* stmt = nullptr;
1331 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1332 return std::nullopt;
1335 sqlite3_bind_text(stmt, 1, job_id.data(),
static_cast<int>(job_id.size()),
1338 std::optional<client::job_record> result;
1339 if (sqlite3_step(stmt) == SQLITE_ROW) {
1343 sqlite3_finalize(stmt);
1348 if (!
db_)
return std::nullopt;
1350 static constexpr const char* sql = R
"(
1351 SELECT pk, job_id, type, status, priority,
1352 source_node_id, destination_node_id,
1353 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1354 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1355 current_item, current_item_description,
1356 error_message, error_details, retry_count, max_retries,
1357 created_by, metadata_json,
1358 created_at, queued_at, started_at, completed_at
1359 FROM jobs WHERE pk = ?
1362 sqlite3_stmt* stmt = nullptr;
1363 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1364 return std::nullopt;
1367 sqlite3_bind_int64(stmt, 1, pk);
1369 std::optional<client::job_record> result;
1370 if (sqlite3_step(stmt) == SQLITE_ROW) {
1374 sqlite3_finalize(stmt);
1380 std::vector<client::job_record> result;
1381 if (!
db_)
return result;
1383 std::ostringstream sql;
1385 SELECT pk, job_id, type, status, priority,
1386 source_node_id, destination_node_id,
1387 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1388 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1389 current_item, current_item_description,
1390 error_message, error_details, retry_count, max_retries,
1391 created_by, metadata_json,
1392 created_at, queued_at, started_at, completed_at
1396 std::vector<std::pair<int, std::string>> bindings;
1399 if (options.status.has_value()) {
1400 sql <<
" AND status = ?";
1401 bindings.emplace_back(
1405 if (options.type.has_value()) {
1406 sql <<
" AND type = ?";
1407 bindings.emplace_back(param_idx++,
1411 if (options.node_id.has_value()) {
1412 sql <<
" AND (source_node_id = ? OR destination_node_id = ?)";
1413 bindings.emplace_back(param_idx++, options.node_id.value());
1414 bindings.emplace_back(param_idx++, options.node_id.value());
1417 if (options.created_by.has_value()) {
1418 sql <<
" AND created_by = ?";
1419 bindings.emplace_back(param_idx++, options.created_by.value());
1422 if (options.order_by_priority) {
1423 sql <<
" ORDER BY priority DESC, created_at ASC";
1425 sql <<
" ORDER BY created_at DESC";
1428 sql <<
" LIMIT " << options.limit <<
" OFFSET " << options.offset;
1430 sqlite3_stmt* stmt =
nullptr;
1431 auto sql_str = sql.str();
1432 if (sqlite3_prepare_v2(
db_, sql_str.c_str(), -1, &stmt,
nullptr) !=
1437 for (
const auto& [idx, value] : bindings) {
1438 sqlite3_bind_text(stmt, idx, value.c_str(), -1, SQLITE_TRANSIENT);
1441 while (sqlite3_step(stmt) == SQLITE_ROW) {
1445 sqlite3_finalize(stmt);
1453 options.limit = limit;
1458 size_t limit)
const {
1459 std::vector<client::job_record> result;
1460 if (!
db_)
return result;
1462 static constexpr const char* sql = R
"(
1463 SELECT pk, job_id, type, status, priority,
1464 source_node_id, destination_node_id,
1465 patient_id, study_uid, series_uid, sop_instance_uid, instance_uids_json,
1466 total_items, completed_items, failed_items, skipped_items, bytes_transferred,
1467 current_item, current_item_description,
1468 error_message, error_details, retry_count, max_retries,
1469 created_by, metadata_json,
1470 created_at, queued_at, started_at, completed_at
1472 WHERE status IN ('pending', 'queued')
1473 ORDER BY priority DESC, created_at ASC
1477 sqlite3_stmt* stmt = nullptr;
1478 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1482 sqlite3_bind_int64(stmt, 1,
static_cast<int64_t
>(limit));
1484 while (sqlite3_step(stmt) == SQLITE_ROW) {
1488 sqlite3_finalize(stmt);
1493 std::string_view node_id)
const {
1495 options.
node_id = std::string(node_id);
1501 return VoidResult(kcenon::common::error_info{
1502 -1,
"Database not initialized",
"job_repository"});
1505 static constexpr const char* sql =
"DELETE FROM jobs WHERE job_id = ?";
1507 sqlite3_stmt* stmt =
nullptr;
1508 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1509 return VoidResult(kcenon::common::error_info{
1511 "Failed to prepare statement: " + std::string(sqlite3_errmsg(
db_)),
1515 sqlite3_bind_text(stmt, 1, job_id.data(),
static_cast<int>(job_id.size()),
1518 auto rc = sqlite3_step(stmt);
1519 sqlite3_finalize(stmt);
1521 if (rc != SQLITE_DONE) {
1522 return VoidResult(kcenon::common::error_info{
1523 -1,
"Failed to delete job: " + std::string(sqlite3_errmsg(
db_)),
1527 return kcenon::common::ok();
1532 return kcenon::common::make_error<size_t>(
1533 -1,
"Database not initialized",
"job_repository");
1536 auto cutoff = std::chrono::system_clock::now() - max_age;
1537 auto cutoff_str = to_timestamp_string(cutoff);
1539 static constexpr const char* sql = R
"(
1541 WHERE status IN ('completed', 'failed', 'cancelled')
1542 AND completed_at < ?
1545 sqlite3_stmt* stmt = nullptr;
1546 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1547 return kcenon::common::make_error<size_t>(
1549 "Failed to prepare statement: " + std::string(sqlite3_errmsg(
db_)),
1553 sqlite3_bind_text(stmt, 1, cutoff_str.c_str(), -1, SQLITE_TRANSIENT);
1555 auto rc = sqlite3_step(stmt);
1556 sqlite3_finalize(stmt);
1558 if (rc != SQLITE_DONE) {
1559 return kcenon::common::make_error<size_t>(
1560 -1,
"Failed to cleanup jobs: " + std::string(sqlite3_errmsg(
db_)),
1564 return kcenon::common::ok(
static_cast<size_t>(sqlite3_changes(
db_)));
1568 if (!
db_)
return false;
1570 static constexpr const char* sql =
"SELECT 1 FROM jobs WHERE job_id = ?";
1572 sqlite3_stmt* stmt =
nullptr;
1573 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1577 sqlite3_bind_text(stmt, 1, job_id.data(),
static_cast<int>(job_id.size()),
1580 bool found = (sqlite3_step(stmt) == SQLITE_ROW);
1581 sqlite3_finalize(stmt);
1591 std::string_view error_message,
1592 std::string_view error_details) {
1594 return VoidResult(kcenon::common::error_info{
1595 -1,
"Database not initialized",
"job_repository"});
1598 const char* sql =
nullptr;
1608 sql =
"UPDATE jobs SET status = ? WHERE job_id = ?";
1611 sqlite3_stmt* stmt =
nullptr;
1612 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1613 return VoidResult(kcenon::common::error_info{
1615 "Failed to prepare statement: " + std::string(sqlite3_errmsg(
db_)),
1623 sqlite3_bind_text(stmt, idx++, error_message.data(),
1624 static_cast<int>(error_message.size()),
1626 sqlite3_bind_text(stmt, idx++, error_details.data(),
1627 static_cast<int>(error_details.size()),
1631 sqlite3_bind_text(stmt, idx++, job_id.data(),
1632 static_cast<int>(job_id.size()), SQLITE_TRANSIENT);
1634 auto rc = sqlite3_step(stmt);
1635 sqlite3_finalize(stmt);
1637 if (rc != SQLITE_DONE) {
1638 return VoidResult(kcenon::common::error_info{
1639 -1,
"Failed to update status: " + std::string(sqlite3_errmsg(
db_)),
1643 return kcenon::common::ok();
1649 return VoidResult(kcenon::common::error_info{
1650 -1,
"Database not initialized",
"job_repository"});
1653 static constexpr const char* sql = R
"(
1656 completed_items = ?,
1659 bytes_transferred = ?,
1661 current_item_description = ?
1665 sqlite3_stmt* stmt = nullptr;
1666 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1667 return VoidResult(kcenon::common::error_info{
1669 "Failed to prepare statement: " + std::string(sqlite3_errmsg(
db_)),
1674 sqlite3_bind_int64(stmt, idx++,
1676 sqlite3_bind_int64(stmt, idx++,
1678 sqlite3_bind_int64(stmt, idx++,
1680 sqlite3_bind_int64(stmt, idx++,
1682 sqlite3_bind_int64(stmt, idx++,
1684 sqlite3_bind_text(stmt, idx++, progress.
current_item.c_str(), -1,
1687 -1, SQLITE_TRANSIENT);
1688 sqlite3_bind_text(stmt, idx++, job_id.data(),
1689 static_cast<int>(job_id.size()), SQLITE_TRANSIENT);
1691 auto rc = sqlite3_step(stmt);
1692 sqlite3_finalize(stmt);
1694 if (rc != SQLITE_DONE) {
1695 return VoidResult(kcenon::common::error_info{
1696 -1,
"Failed to update progress: " + std::string(sqlite3_errmsg(
db_)),
1700 return kcenon::common::ok();
1705 return VoidResult(kcenon::common::error_info{
1706 -1,
"Database not initialized",
"job_repository"});
1709 static constexpr const char* sql = R
"(
1712 started_at = CURRENT_TIMESTAMP
1716 sqlite3_stmt* stmt = nullptr;
1717 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1718 return VoidResult(kcenon::common::error_info{
1720 "Failed to prepare statement: " + std::string(sqlite3_errmsg(
db_)),
1724 sqlite3_bind_text(stmt, 1, job_id.data(),
static_cast<int>(job_id.size()),
1727 auto rc = sqlite3_step(stmt);
1728 sqlite3_finalize(stmt);
1730 if (rc != SQLITE_DONE) {
1731 return VoidResult(kcenon::common::error_info{
1732 -1,
"Failed to mark started: " + std::string(sqlite3_errmsg(
db_)),
1736 return kcenon::common::ok();
1741 return VoidResult(kcenon::common::error_info{
1742 -1,
"Database not initialized",
"job_repository"});
1745 static constexpr const char* sql = R
"(
1747 status = 'completed',
1748 completed_at = CURRENT_TIMESTAMP
1752 sqlite3_stmt* stmt = nullptr;
1753 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1754 return VoidResult(kcenon::common::error_info{
1756 "Failed to prepare statement: " + std::string(sqlite3_errmsg(
db_)),
1760 sqlite3_bind_text(stmt, 1, job_id.data(),
static_cast<int>(job_id.size()),
1763 auto rc = sqlite3_step(stmt);
1764 sqlite3_finalize(stmt);
1766 if (rc != SQLITE_DONE) {
1767 return VoidResult(kcenon::common::error_info{
1768 -1,
"Failed to mark completed: " + std::string(sqlite3_errmsg(
db_)),
1772 return kcenon::common::ok();
1776 std::string_view error_message,
1777 std::string_view error_details) {
1779 return VoidResult(kcenon::common::error_info{
1780 -1,
"Database not initialized",
"job_repository"});
1783 static constexpr const char* sql = R
"(
1788 retry_count = retry_count + 1,
1789 completed_at = CURRENT_TIMESTAMP
1793 sqlite3_stmt* stmt = nullptr;
1794 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1795 return VoidResult(kcenon::common::error_info{
1797 "Failed to prepare statement: " + std::string(sqlite3_errmsg(
db_)),
1801 sqlite3_bind_text(stmt, 1, error_message.data(),
1802 static_cast<int>(error_message.size()), SQLITE_TRANSIENT);
1803 sqlite3_bind_text(stmt, 2, error_details.data(),
1804 static_cast<int>(error_details.size()), SQLITE_TRANSIENT);
1805 sqlite3_bind_text(stmt, 3, job_id.data(),
static_cast<int>(job_id.size()),
1808 auto rc = sqlite3_step(stmt);
1809 sqlite3_finalize(stmt);
1811 if (rc != SQLITE_DONE) {
1812 return VoidResult(kcenon::common::error_info{
1813 -1,
"Failed to mark failed: " + std::string(sqlite3_errmsg(
db_)),
1817 return kcenon::common::ok();
1822 return VoidResult(kcenon::common::error_info{
1823 -1,
"Database not initialized",
"job_repository"});
1826 static constexpr const char* sql = R
"(
1827 UPDATE jobs SET retry_count = retry_count + 1 WHERE job_id = ?
1830 sqlite3_stmt* stmt = nullptr;
1831 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1832 return VoidResult(kcenon::common::error_info{
1834 "Failed to prepare statement: " + std::string(sqlite3_errmsg(
db_)),
1838 sqlite3_bind_text(stmt, 1, job_id.data(),
static_cast<int>(job_id.size()),
1841 auto rc = sqlite3_step(stmt);
1842 sqlite3_finalize(stmt);
1844 if (rc != SQLITE_DONE) {
1845 return VoidResult(kcenon::common::error_info{
1846 -1,
"Failed to increment retry: " + std::string(sqlite3_errmsg(
db_)),
1850 return kcenon::common::ok();
1860 static constexpr const char* sql =
"SELECT COUNT(*) FROM jobs";
1862 sqlite3_stmt* stmt =
nullptr;
1863 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1868 if (sqlite3_step(stmt) == SQLITE_ROW) {
1869 result =
static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1872 sqlite3_finalize(stmt);
1879 static constexpr const char* sql =
1880 "SELECT COUNT(*) FROM jobs WHERE status = ?";
1882 sqlite3_stmt* stmt =
nullptr;
1883 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1890 if (sqlite3_step(stmt) == SQLITE_ROW) {
1891 result =
static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1894 sqlite3_finalize(stmt);
1901 static constexpr const char* sql = R
"(
1902 SELECT COUNT(*) FROM jobs
1903 WHERE status = 'completed'
1904 AND date(completed_at) = date('now')
1907 sqlite3_stmt* stmt = nullptr;
1908 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1913 if (sqlite3_step(stmt) == SQLITE_ROW) {
1914 result =
static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1917 sqlite3_finalize(stmt);
1924 static constexpr const char* sql = R
"(
1925 SELECT COUNT(*) FROM jobs
1926 WHERE status = 'failed'
1927 AND date(completed_at) = date('now')
1930 sqlite3_stmt* stmt = nullptr;
1931 if (sqlite3_prepare_v2(
db_, sql, -1, &stmt,
nullptr) != SQLITE_OK) {
1936 if (sqlite3_step(stmt) == SQLITE_ROW) {
1937 result =
static_cast<size_t>(sqlite3_column_int64(stmt, 0));
1940 sqlite3_finalize(stmt);
1955 auto* stmt =
static_cast<sqlite3_stmt*
>(stmt_ptr);
1959 job.pk = get_int64_column(stmt, col++);
1960 job.job_id = get_text_column(stmt, col++);
1965 job.source_node_id = get_text_column(stmt, col++);
1966 job.destination_node_id = get_text_column(stmt, col++);
1968 job.patient_id = get_optional_text(stmt, col++);
1969 job.study_uid = get_optional_text(stmt, col++);
1970 job.series_uid = get_optional_text(stmt, col++);
1971 job.sop_instance_uid = get_optional_text(stmt, col++);
1973 auto uids_json = get_text_column(stmt, col++);
1976 job.progress.total_items =
1977 static_cast<size_t>(get_int64_column(stmt, col++));
1978 job.progress.completed_items =
1979 static_cast<size_t>(get_int64_column(stmt, col++));
1980 job.progress.failed_items =
1981 static_cast<size_t>(get_int64_column(stmt, col++));
1982 job.progress.skipped_items =
1983 static_cast<size_t>(get_int64_column(stmt, col++));
1984 job.progress.bytes_transferred =
1985 static_cast<size_t>(get_int64_column(stmt, col++));
1986 job.progress.current_item = get_text_column(stmt, col++);
1987 job.progress.current_item_description = get_text_column(stmt, col++);
1988 job.progress.calculate_percent();
1990 job.error_message = get_text_column(stmt, col++);
1991 job.error_details = get_text_column(stmt, col++);
1992 job.retry_count = get_int_column(stmt, col++);
1993 job.max_retries = get_int_column(stmt, col++, 3);
1995 job.created_by = get_text_column(stmt, col++);
1997 auto metadata_json = get_text_column(stmt, col++);
2000 auto created_str = get_text_column(stmt, col++);
2001 job.created_at = from_timestamp_string(created_str.c_str());
2003 auto queued_str = get_text_column(stmt, col++);
2004 job.queued_at = from_optional_timestamp(queued_str.c_str());
2006 auto started_str = get_text_column(stmt, col++);
2007 job.started_at = from_optional_timestamp(started_str.c_str());
2009 auto completed_str = get_text_column(stmt, col++);
2010 job.completed_at = from_optional_timestamp(completed_str.c_str());
Repository for job persistence (legacy SQLite interface)
auto cleanup_old_jobs(std::chrono::hours max_age) -> Result< size_t >
job_repository(sqlite3 *db)
static auto deserialize_metadata(std::string_view json) -> std::unordered_map< std::string, std::string >
auto count_failed_today() const -> size_t
auto find_by_node(std::string_view node_id) const -> std::vector< client::job_record >
auto mark_completed(std::string_view job_id) -> VoidResult
static auto serialize_metadata(const std::unordered_map< std::string, std::string > &metadata) -> std::string
auto count_completed_today() const -> size_t
auto increment_retry(std::string_view job_id) -> VoidResult
auto remove(std::string_view job_id) -> VoidResult
auto update_status(std::string_view job_id, client::job_status status, std::string_view error_message="", std::string_view error_details="") -> VoidResult
auto mark_started(std::string_view job_id) -> VoidResult
auto find_by_status(client::job_status status, size_t limit=100) const -> std::vector< client::job_record >
auto count() const -> size_t
auto find_pending_jobs(size_t limit=10) const -> std::vector< client::job_record >
static auto deserialize_instance_uids(std::string_view json) -> std::vector< std::string >
auto find_by_id(std::string_view job_id) const -> std::optional< client::job_record >
auto is_valid() const noexcept -> bool
auto mark_failed(std::string_view job_id, std::string_view error_message, std::string_view error_details="") -> VoidResult
static auto serialize_instance_uids(const std::vector< std::string > &uids) -> std::string
auto find_jobs(const job_query_options &options={}) const -> std::vector< client::job_record >
auto exists(std::string_view job_id) const -> bool
auto count_by_status(client::job_status status) const -> size_t
auto parse_row(void *stmt) const -> client::job_record
auto update_progress(std::string_view job_id, const client::job_progress &progress) -> VoidResult
auto find_by_pk(int64_t pk) const -> std::optional< client::job_record >
Repository for job persistence using base_repository pattern.
job_priority job_priority_from_int(int value) noexcept
Parse job_priority from integer.
job_type job_type_from_string(std::string_view str) noexcept
Parse job_type from string.
job_status job_status_from_string(std::string_view str) noexcept
Parse job_status from string.
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
job_status
Current status of a job.
@ failed
Job failed with error.
@ move
C-MOVE move request/response.
Progress tracking for a job.
size_t failed_items
Failed items.
std::string current_item
Current SOP Instance UID being processed.
size_t skipped_items
Skipped items.
std::string current_item_description
Human-readable description.
size_t bytes_transferred
Total bytes transferred.
size_t completed_items
Successfully completed items.
size_t total_items
Total number of items to process.
Complete job record with all metadata.
Query options for listing jobs.
std::optional< std::string > node_id
Filter by source or destination node.
std::optional< client::job_status > status
Filter by status.