445 std::shared_ptr<rest_server_context> ctx) {
// Route: GET /api/v1/jobs - returns the job list serialized by jobs_to_json.
447 CROW_ROUTE(app,
"/api/v1/jobs")
448 .methods(crow::HTTPMethod::GET)([ctx](
const crow::request& req) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
450 res.add_header(
"Content-Type",
"application/json");
451 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below
// (the status code and early return are not visible in this view).
453 if (!ctx->job_manager) {
456 "Job manager not configured");
// Pagination values are taken from the request by parse_pagination.
461 auto [limit, offset] = parse_pagination(req);
// Optional filters, left empty unless set from the query parameters read below.
// NOTE(review): the code converting the raw parameters into these optionals is not visible here.
464 std::optional<client::job_status> status_filter;
465 std::optional<client::job_type> type_filter;
467 auto status_param = req.url_params.get(
"status");
472 auto type_param = req.url_params.get(
"type");
// Ask the job manager for the jobs matching both filters (remaining arguments are not visible here).
478 auto jobs = ctx->job_manager->list_jobs(status_filter, type_filter,
// NOTE(review): total_count is the size of the list returned by list_jobs. If list_jobs
// already applies limit and offset, this is the page size rather than the overall total - confirm.
483 size_t total_count = jobs.size();
486 res.body = jobs_to_json(jobs, total_count);
// Route: POST /api/v1/jobs - creates a job from fields read out of the JSON request body.
// The type field selects which create_*_job call on the job manager is used.
491 CROW_ROUTE(app,
"/api/v1/jobs")
492 .methods(crow::HTTPMethod::POST)([ctx](
const crow::request& req) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
494 res.add_header(
"Content-Type",
"application/json");
495 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below.
497 if (!ctx->job_manager) {
500 "Job manager not configured");
// The type field is read first; an empty value is handled as a separate case
// (the handling itself is not visible in this view).
505 auto type_str = get_json_string_value(req.body,
"type");
506 if (type_str.empty()) {
// The priority field is optional: the expression below branches on whether it is empty.
// NOTE(review): the fallback value and the parsing of a non-empty value are not visible here.
515 auto priority_str = get_json_string_value(req.body,
"priority");
516 auto priority = priority_str.empty()
// Retrieve job (per the error messages below): source_node_id and study_uid are required.
525 auto source_node_id = get_json_string_value(req.body,
"source_node_id");
526 auto study_uid = get_json_string_value(req.body,
"study_uid");
528 if (source_node_id.empty()) {
531 "source_node_id is required for retrieve job");
535 if (study_uid.empty()) {
538 "study_uid is required for retrieve job");
// series_uid is optional: the optional stays empty unless the field is non-empty.
// The string_view refers to the local series_uid string, which is still alive at the call below.
542 auto series_uid = get_json_string_value(req.body,
"series_uid");
543 std::optional<std::string_view> series_opt;
544 if (!series_uid.empty()) {
545 series_opt = series_uid;
548 job_id = ctx->job_manager->create_retrieve_job(
549 source_node_id, study_uid, series_opt, priority);
// Store job (per the error messages below): destination_node_id and a non-empty
// instance_uids array are required.
554 auto destination_node_id = get_json_string_value(req.body,
"destination_node_id");
555 auto instance_uids = get_json_string_array(req.body,
"instance_uids");
557 if (destination_node_id.empty()) {
560 "destination_node_id is required for store job");
564 if (instance_uids.empty()) {
567 "instance_uids is required for store job");
571 job_id = ctx->job_manager->create_store_job(
572 destination_node_id, instance_uids, priority);
// Query job (per the error message below): node_id is required.
577 auto node_id = get_json_string_value(req.body,
"node_id");
578 auto query_level = get_json_string_value(req.body,
"query_level");
580 if (node_id.empty()) {
583 "node_id is required for query job");
// A missing query_level falls back to STUDY.
587 if (query_level.empty()) {
588 query_level =
"STUDY";
// Query keys are built only from the fields that are present:
// patient_id is stored under PatientID and patient_name under PatientName.
592 std::unordered_map<std::string, std::string> query_keys;
593 auto patient_id = get_json_string_value(req.body,
"patient_id");
594 if (!patient_id.empty()) {
595 query_keys[
"PatientID"] = patient_id;
597 auto patient_name = get_json_string_value(req.body,
"patient_name");
598 if (!patient_name.empty()) {
599 query_keys[
"PatientName"] = patient_name;
602 job_id = ctx->job_manager->create_query_job(
603 node_id, query_level, query_keys, priority);
// Sync job (per the error message below): source_node_id is required, patient_id is optional.
608 auto source_node_id = get_json_string_value(req.body,
"source_node_id");
610 if (source_node_id.empty()) {
613 "source_node_id is required for sync job");
// The optional stays empty unless patient_id is non-empty; the view refers to the local string.
617 auto patient_id = get_json_string_value(req.body,
"patient_id");
618 std::optional<std::string_view> patient_opt;
619 if (!patient_id.empty()) {
620 patient_opt = patient_id;
623 job_id = ctx->job_manager->create_sync_job(
624 source_node_id, patient_opt, priority);
// Prefetch job (per the error messages below): source_node_id and patient_id are both required.
629 auto source_node_id = get_json_string_value(req.body,
"source_node_id");
630 auto patient_id = get_json_string_value(req.body,
"patient_id");
632 if (source_node_id.empty()) {
635 "source_node_id is required for prefetch job");
639 if (patient_id.empty()) {
642 "patient_id is required for prefetch job");
646 job_id = ctx->job_manager->create_prefetch_job(
647 source_node_id, patient_id, priority);
// Any other type value is reported back with the offending value appended to the message.
654 "Unsupported job type: " + type_str);
// Read the newly created job back so the response can carry its full record.
660 auto created_job = ctx->job_manager->get_job(job_id);
// Minimal body holding only the escaped job id and a pending status.
// NOTE(review): presumably used when get_job returned nothing; the condition is not visible here.
663 res.body = R
"({"job_id":")" + json_escape(job_id) + R"(","status":"pending"})";
// Full job record serialized by job_to_json.
668 res.body = job_to_json(*created_job);
// Route: GET /api/v1/jobs/<string> - returns a single job, identified by the path segment, as JSON.
673 CROW_ROUTE(app,
"/api/v1/jobs/<string>")
674 .methods(crow::HTTPMethod::GET)(
675 [ctx](
const crow::request& ,
const std::string& job_id) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
677 res.add_header(
"Content-Type",
"application/json");
678 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below.
680 if (!ctx->job_manager) {
683 "Job manager not configured");
// Look the job up by id; the result is dereferenced below, so a missing job must be
// handled in between (that handling is not visible in this view).
687 auto job = ctx->job_manager->get_job(job_id);
695 res.body = job_to_json(*job);
// Route: DELETE /api/v1/jobs/<string> - deletes the job identified by the path segment.
// Unlike the other handlers, Content-Type is not set up front; it is added only next to
// the error paths visible below.
700 CROW_ROUTE(app,
"/api/v1/jobs/<string>")
701 .methods(crow::HTTPMethod::DELETE)(
702 [ctx](
const crow::request& ,
const std::string& job_id) {
704 add_cors_headers(res, *ctx);
// Error path: no job manager configured.
706 if (!ctx->job_manager) {
707 res.add_header(
"Content-Type",
"application/json");
710 "Job manager not configured");
// Look the job up before deleting it.
// NOTE(review): the header below presumably belongs to the job-not-found branch;
// the condition is not visible in this view.
715 auto job = ctx->job_manager->get_job(job_id);
717 res.add_header(
"Content-Type",
"application/json");
// Perform the deletion; a failed result gets a JSON error response.
723 auto result = ctx->job_manager->delete_job(job_id);
724 if (!result.is_ok()) {
725 res.add_header(
"Content-Type",
"application/json");
// Route: GET /api/v1/jobs/<string>/progress - returns the progress of one job as JSON.
736 CROW_ROUTE(app,
"/api/v1/jobs/<string>/progress")
737 .methods(crow::HTTPMethod::GET)(
738 [ctx](
const crow::request& ,
const std::string& job_id) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
740 res.add_header(
"Content-Type",
"application/json");
741 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below.
743 if (!ctx->job_manager) {
746 "Job manager not configured");
// The job is looked up first (handling of the lookup result is not visible in this view),
// then its progress is fetched and serialized by progress_to_json.
751 auto job = ctx->job_manager->get_job(job_id);
758 auto progress = ctx->job_manager->get_progress(job_id);
761 res.body = progress_to_json(progress);
// Route: POST /api/v1/jobs/<string>/start - asks the job manager to start the given job.
770 CROW_ROUTE(app,
"/api/v1/jobs/<string>/start")
771 .methods(crow::HTTPMethod::POST)(
772 [ctx](
const crow::request& ,
const std::string& job_id) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
774 res.add_header(
"Content-Type",
"application/json");
775 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below.
777 if (!ctx->job_manager) {
780 "Job manager not configured");
// Look the job up first (handling of the lookup result is not visible in this view).
785 auto job = ctx->job_manager->get_job(job_id);
// Start the job; on failure the message of the returned error is reported to the client.
792 auto result = ctx->job_manager->start_job(job_id);
793 if (!result.is_ok()) {
796 result.error().message);
// Re-read the job so the response can carry its record after the state change.
801 auto updated_job = ctx->job_manager->get_job(job_id);
804 res.body = job_to_json(*updated_job);
// Minimal body with the escaped job id and a fixed message.
// NOTE(review): presumably used when the job could not be re-read; the condition is not visible here.
807 res.body = R
"({"job_id":")" + json_escape(job_id) +
808 R"(","message":"Job started"})";
// Route: POST /api/v1/jobs/<string>/pause - asks the job manager to pause the given job.
814 CROW_ROUTE(app,
"/api/v1/jobs/<string>/pause")
815 .methods(crow::HTTPMethod::POST)(
816 [ctx](
const crow::request& ,
const std::string& job_id) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
818 res.add_header(
"Content-Type",
"application/json");
819 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below.
821 if (!ctx->job_manager) {
824 "Job manager not configured");
// Look the job up first (handling of the lookup result is not visible in this view).
829 auto job = ctx->job_manager->get_job(job_id);
// Pause the job; on failure the message of the returned error is reported to the client.
836 auto result = ctx->job_manager->pause_job(job_id);
837 if (!result.is_ok()) {
840 result.error().message);
// Re-read the job so the response can carry its record after the state change.
845 auto updated_job = ctx->job_manager->get_job(job_id);
848 res.body = job_to_json(*updated_job);
// Minimal body with the escaped job id and a fixed message.
// NOTE(review): presumably used when the job could not be re-read; the condition is not visible here.
851 res.body = R
"({"job_id":")" + json_escape(job_id) +
852 R"(","message":"Job paused"})";
// Route: POST /api/v1/jobs/<string>/resume - asks the job manager to resume the given job.
858 CROW_ROUTE(app,
"/api/v1/jobs/<string>/resume")
859 .methods(crow::HTTPMethod::POST)(
860 [ctx](
const crow::request& ,
const std::string& job_id) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
862 res.add_header(
"Content-Type",
"application/json");
863 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below.
865 if (!ctx->job_manager) {
868 "Job manager not configured");
// Look the job up first (handling of the lookup result is not visible in this view).
873 auto job = ctx->job_manager->get_job(job_id);
// Resume the job; on failure the message of the returned error is reported to the client.
880 auto result = ctx->job_manager->resume_job(job_id);
881 if (!result.is_ok()) {
884 result.error().message);
// Re-read the job so the response can carry its record after the state change.
889 auto updated_job = ctx->job_manager->get_job(job_id);
892 res.body = job_to_json(*updated_job);
// Minimal body with the escaped job id and a fixed message.
// NOTE(review): presumably used when the job could not be re-read; the condition is not visible here.
895 res.body = R
"({"job_id":")" + json_escape(job_id) +
896 R"(","message":"Job resumed"})";
// Route: POST /api/v1/jobs/<string>/cancel - asks the job manager to cancel the given job.
902 CROW_ROUTE(app,
"/api/v1/jobs/<string>/cancel")
903 .methods(crow::HTTPMethod::POST)(
904 [ctx](
const crow::request& ,
const std::string& job_id) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
906 res.add_header(
"Content-Type",
"application/json");
907 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below.
909 if (!ctx->job_manager) {
912 "Job manager not configured");
// Look the job up first (handling of the lookup result is not visible in this view).
917 auto job = ctx->job_manager->get_job(job_id);
// Cancel the job; on failure the message of the returned error is reported to the client.
924 auto result = ctx->job_manager->cancel_job(job_id);
925 if (!result.is_ok()) {
928 result.error().message);
// Re-read the job so the response can carry its record after the state change.
933 auto updated_job = ctx->job_manager->get_job(job_id);
936 res.body = job_to_json(*updated_job);
// Minimal body with the escaped job id and a fixed message.
// NOTE(review): presumably used when the job could not be re-read; the condition is not visible here.
939 res.body = R
"({"job_id":")" + json_escape(job_id) +
940 R"(","message":"Job cancelled"})";
// Route: POST /api/v1/jobs/<string>/retry - asks the job manager to retry the given job.
946 CROW_ROUTE(app,
"/api/v1/jobs/<string>/retry")
947 .methods(crow::HTTPMethod::POST)(
948 [ctx](
const crow::request& ,
const std::string& job_id) {
// The response is declared as JSON and add_cors_headers is applied with the shared context.
950 res.add_header(
"Content-Type",
"application/json");
951 add_cors_headers(res, *ctx);
// Without a job manager the handler reports the error message below.
953 if (!ctx->job_manager) {
956 "Job manager not configured");
// Look the job up first (handling of the lookup result is not visible in this view).
961 auto job = ctx->job_manager->get_job(job_id);
// Retry the job; on failure the message of the returned error is reported to the client.
968 auto result = ctx->job_manager->retry_job(job_id);
969 if (!result.is_ok()) {
972 result.error().message);
// Re-read the job so the response can carry its record after the state change.
977 auto updated_job = ctx->job_manager->get_job(job_id);
980 res.body = job_to_json(*updated_job);
// Minimal body with the escaped job id and a fixed message.
// NOTE(review): presumably used when the job could not be re-read; the condition is not visible here.
983 res.body = R
"({"job_id":")" + json_escape(job_id) +
984 R"(","message":"Job retry queued"})";
// Hook the job manager into the WebSocket subscriber registry. The branch runs only when a
// job manager exists and the flag was not set before: exchange(true) sets the flag and yields
// its previous value (assuming g_callbacks_registered has std::atomic<bool> semantics - confirm).
994 if (ctx->job_manager && !g_callbacks_registered.exchange(
true)) {
// Progress updates are turned into a message and broadcast for the affected job.
996 ctx->job_manager->set_progress_callback(
998 auto message = make_progress_message(job_id, progress);
999 ws_subscriber_state::instance().broadcast_progress(job_id, message);
// Completion events use their own message builder but go through the same broadcast_progress call.
1003 ctx->job_manager->set_completion_callback(
1005 auto message = make_completion_message(job_id, record);
1006 ws_subscriber_state::instance().broadcast_progress(job_id, message);
// WebSocket route: /api/v1/jobs/<string>/progress/stream - streams progress for a single job.
1013 CROW_WEBSOCKET_ROUTE(app,
"/api/v1/jobs/<string>/progress/stream")
// onaccept: derives the job id from the request URL and hands it to the connection via userdata.
// The outcome of each check below (presumably rejecting the connection) is not visible in this view.
1014 .onaccept([ctx](
const crow::request& req,
void** userdata) ->
bool {
1016 std::string url = req.url;
1018 const std::string prefix =
"/api/v1/jobs/";
1019 const std::string suffix =
"/progress/stream";
// The URL must begin with the prefix (find returning 0 means a match at the start).
1021 if (url.find(prefix) != 0) {
// The suffix is located by searching from the end of the URL.
1025 auto suffix_pos = url.rfind(suffix);
1026 if (suffix_pos == std::string::npos) {
// The job id is the text between the prefix and the suffix.
// NOTE(review): if suffix_pos were smaller than prefix.length() the unsigned subtraction
// would wrap around - confirm that the route pattern makes this impossible.
1030 std::string job_id = url.substr(prefix.length(),
1031 suffix_pos - prefix.length());
1033 if (job_id.empty()) {
1038 if (!ctx->job_manager) {
// The job is looked up before the connection is set up (handling of the result is not visible here).
1043 auto job = ctx->job_manager->get_job(job_id);
// A heap-allocated copy of the job id is stored as connection userdata.
// NOTE(review): this raw allocation has to be released by a later handler; no delete is
// visible in this view - confirm one exists in onclose.
1049 *userdata =
new std::string(job_id);
// onopen: registers the connection as a subscriber of its job and sends the current progress.
1052 .onopen([ctx](crow::websocket::connection& conn) {
1053 auto* job_id_ptr =
static_cast<std::string*
>(conn.userdata());
// Missing userdata or a missing job manager: send an error message and close the connection.
1054 if (!job_id_ptr || !ctx->job_manager) {
1055 conn.send_text(R
"({"error":"Invalid connection state"})");
1056 conn.close("Invalid state");
1060 const std::string& job_id = *job_id_ptr;
// Subscribe before sending the first snapshot.
1063 ws_subscriber_state::instance().add_job_subscriber(job_id, &conn);
1066 auto progress = ctx->job_manager->get_progress(job_id);
1067 conn.send_text(make_progress_message(job_id, progress));
// onclose: unsubscribes the connection and clears its userdata pointer.
1069 .onclose([](crow::websocket::connection& conn,
1070 const std::string& , uint16_t ) {
1071 auto* job_id_ptr =
static_cast<std::string*
>(conn.userdata());
// NOTE(review): job_id_ptr is dereferenced here; a null check is not visible in this view - confirm.
1073 ws_subscriber_state::instance().remove_job_subscriber(*job_id_ptr, &conn);
1075 conn.userdata(
nullptr);
// onmessage: all parameters are unnamed, so the handler cannot read incoming messages.
1078 .onmessage([](crow::websocket::connection& ,
1079 const std::string& ,
bool ) {
// WebSocket route: /api/v1/jobs/stream - subscribes a client to updates for all jobs.
1084 CROW_WEBSOCKET_ROUTE(app,
"/api/v1/jobs/stream")
// onaccept: the connection is accepted only when a job manager is configured.
1085 .onaccept([ctx](
const crow::request& ,
void** ) ->
bool {
1087 return ctx->job_manager !=
nullptr;
// onopen: registers the connection as an all-jobs subscriber and sends a confirmation message.
1089 .onopen([](crow::websocket::connection& conn) {
1091 ws_subscriber_state::instance().add_all_jobs_subscriber(&conn);
1094 conn.send_text(R
"({"type":"connected","message":"Subscribed to all job updates"})");
// onclose: removes the connection from the all-jobs subscriber set.
1096 .onclose([](crow::websocket::connection& conn,
1097 const std::string& , uint16_t ) {
1098 ws_subscriber_state::instance().remove_all_jobs_subscriber(&conn);
// onmessage: all parameters are unnamed, so the handler cannot read incoming messages.
1100 .onmessage([](crow::websocket::connection& ,
1101 const std::string& ,
bool ) {