PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
jobs_endpoints.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
17// IMPORTANT: Include Crow FIRST before any PACS headers to avoid forward
18// declaration conflicts
19#include "crow.h"
20
21// Workaround for Windows: DELETE is defined as a macro in <winnt.h>
22// which conflicts with crow::HTTPMethod::DELETE
23#ifdef DELETE
24#undef DELETE
25#endif
26
33
34#include <chrono>
35#include <mutex>
36#include <shared_mutex>
37#include <sstream>
38#include <unordered_map>
39#include <unordered_set>
40
42
43namespace {
44
45// =============================================================================
46// WebSocket Subscriber Management (Issue #560)
47// =============================================================================
48
55struct ws_subscriber_state {
57 std::unordered_map<std::string, std::unordered_set<crow::websocket::connection*>>
59
61 std::unordered_set<crow::websocket::connection*> all_jobs_subscribers;
62
64 mutable std::shared_mutex mutex;
65
67 static ws_subscriber_state& instance() {
68 static ws_subscriber_state state;
69 return state;
70 }
71
75 void add_job_subscriber(const std::string& job_id,
76 crow::websocket::connection* conn) {
77 std::unique_lock lock(mutex);
78 job_subscribers[job_id].insert(conn);
79 }
80
84 void remove_job_subscriber(const std::string& job_id,
85 crow::websocket::connection* conn) {
86 std::unique_lock lock(mutex);
87 auto it = job_subscribers.find(job_id);
88 if (it != job_subscribers.end()) {
89 it->second.erase(conn);
90 if (it->second.empty()) {
91 job_subscribers.erase(it);
92 }
93 }
94 }
95
99 void add_all_jobs_subscriber(crow::websocket::connection* conn) {
100 std::unique_lock lock(mutex);
101 all_jobs_subscribers.insert(conn);
102 }
103
107 void remove_all_jobs_subscriber(crow::websocket::connection* conn) {
108 std::unique_lock lock(mutex);
109 all_jobs_subscribers.erase(conn);
110 }
111
115 void remove_connection(crow::websocket::connection* conn) {
116 std::unique_lock lock(mutex);
117 all_jobs_subscribers.erase(conn);
118 for (auto& [job_id, subscribers] : job_subscribers) {
119 subscribers.erase(conn);
120 }
121 }
122
126 void broadcast_progress(const std::string& job_id, const std::string& message) {
127 std::shared_lock lock(mutex);
128
129 // Send to specific job subscribers
130 auto it = job_subscribers.find(job_id);
131 if (it != job_subscribers.end()) {
132 for (auto* conn : it->second) {
133 conn->send_text(message);
134 }
135 }
136
137 // Send to all-jobs subscribers
138 for (auto* conn : all_jobs_subscribers) {
139 conn->send_text(message);
140 }
141 }
142};
143
147std::string make_progress_message(const std::string& job_id,
148 const client::job_progress& progress) {
149 std::ostringstream oss;
150 oss << R"({"type":"progress","job_id":")" << json_escape(job_id)
151 << R"(","progress":{)"
152 << R"("total_items":)" << progress.total_items
153 << R"(,"completed_items":)" << progress.completed_items
154 << R"(,"failed_items":)" << progress.failed_items
155 << R"(,"skipped_items":)" << progress.skipped_items
156 << R"(,"bytes_transferred":)" << progress.bytes_transferred
157 << R"(,"percent_complete":)" << progress.percent_complete;
158
159 if (!progress.current_item.empty()) {
160 oss << R"(,"current_item":")" << json_escape(progress.current_item) << '"';
161 }
162
163 if (!progress.current_item_description.empty()) {
164 oss << R"(,"current_item_description":")"
165 << json_escape(progress.current_item_description) << '"';
166 }
167
168 oss << R"(,"elapsed_ms":)" << progress.elapsed.count()
169 << R"(,"estimated_remaining_ms":)" << progress.estimated_remaining.count()
170 << R"(}})";
171 return oss.str();
172}
173
177std::string make_status_message(const std::string& job_id,
178 client::job_status old_status,
179 client::job_status new_status) {
180 std::ostringstream oss;
181 oss << R"({"type":"status_change","job_id":")" << json_escape(job_id)
182 << R"(","old_status":")" << client::to_string(old_status)
183 << R"(","new_status":")" << client::to_string(new_status)
184 << R"("})";
185 return oss.str();
186}
187
191std::string make_completion_message(const std::string& job_id,
192 const client::job_record& record) {
193 std::ostringstream oss;
194 oss << R"({"type":"completed","job_id":")" << json_escape(job_id)
195 << R"(","status":")" << client::to_string(record.status)
196 << R"(","progress":{)"
197 << R"("total_items":)" << record.progress.total_items
198 << R"(,"completed_items":)" << record.progress.completed_items
199 << R"(,"failed_items":)" << record.progress.failed_items
200 << R"(,"percent_complete":)" << record.progress.percent_complete
201 << R"(}})";
202 return oss.str();
203}
204
// Set once the job_manager broadcast callbacks have been installed, so they
// are only registered a single time.
std::atomic<bool> g_callbacks_registered{false};
207
211void add_cors_headers(crow::response& res, const rest_server_context& ctx) {
212 if (ctx.config && !ctx.config->cors_allowed_origins.empty()) {
213 res.add_header("Access-Control-Allow-Origin",
214 ctx.config->cors_allowed_origins);
215 }
216}
217
/**
 * @brief Formats a time point as a UTC ISO-8601 string (YYYY-MM-DDTHH:MM:SSZ).
 * @param tp Time point to format.
 * @return The formatted timestamp, or an empty string when @p tp is the
 *         default-constructed time point or the conversion fails.
 */
std::string format_timestamp(std::chrono::system_clock::time_point tp) {
    // A default-constructed time point is treated as "not set".
    if (tp == std::chrono::system_clock::time_point{}) {
        return "";
    }

    const std::time_t seconds = std::chrono::system_clock::to_time_t(tp);
    std::tm utc{};
#ifdef _WIN32
    if (gmtime_s(&utc, &seconds) != 0) {
        return "";
    }
#else
    if (gmtime_r(&seconds, &utc) == nullptr) {
        return "";
    }
#endif

    char buf[32];
    // strftime returns 0 when the result does not fit, in which case the
    // buffer contents are unspecified and must not be read.
    const std::size_t written =
        std::strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%SZ", &utc);
    if (written == 0) {
        return "";
    }
    return std::string(buf, written);
}
236
240std::string progress_to_json(const client::job_progress& progress) {
241 std::ostringstream oss;
242 oss << R"({"total_items":)" << progress.total_items
243 << R"(,"completed_items":)" << progress.completed_items
244 << R"(,"failed_items":)" << progress.failed_items
245 << R"(,"skipped_items":)" << progress.skipped_items
246 << R"(,"bytes_transferred":)" << progress.bytes_transferred
247 << R"(,"percent_complete":)" << progress.percent_complete;
248
249 if (!progress.current_item.empty()) {
250 oss << R"(,"current_item":")" << json_escape(progress.current_item) << '"';
251 }
252
253 if (!progress.current_item_description.empty()) {
254 oss << R"(,"current_item_description":")"
255 << json_escape(progress.current_item_description) << '"';
256 }
257
258 oss << R"(,"elapsed_ms":)" << progress.elapsed.count()
259 << R"(,"estimated_remaining_ms":)" << progress.estimated_remaining.count()
260 << '}';
261 return oss.str();
262}
263
267std::string job_to_json(const client::job_record& job) {
268 std::ostringstream oss;
269 oss << R"({"job_id":")" << json_escape(job.job_id)
270 << R"(","type":")" << client::to_string(job.type)
271 << R"(","status":")" << client::to_string(job.status)
272 << R"(","priority":")" << client::to_string(job.priority) << '"';
273
274 if (!job.source_node_id.empty()) {
275 oss << R"(,"source_node_id":")" << json_escape(job.source_node_id) << '"';
276 }
277
278 if (!job.destination_node_id.empty()) {
279 oss << R"(,"destination_node_id":")" << json_escape(job.destination_node_id) << '"';
280 }
281
282 if (job.patient_id.has_value()) {
283 oss << R"(,"patient_id":")" << json_escape(*job.patient_id) << '"';
284 }
285
286 if (job.study_uid.has_value()) {
287 oss << R"(,"study_uid":")" << json_escape(*job.study_uid) << '"';
288 }
289
290 if (job.series_uid.has_value()) {
291 oss << R"(,"series_uid":")" << json_escape(*job.series_uid) << '"';
292 }
293
294 // Progress
295 oss << R"(,"progress":)" << progress_to_json(job.progress);
296
297 // Error info
298 if (!job.error_message.empty()) {
299 oss << R"(,"error_message":")" << json_escape(job.error_message) << '"';
300 }
301
302 if (!job.error_details.empty()) {
303 oss << R"(,"error_details":")" << json_escape(job.error_details) << '"';
304 }
305
306 oss << R"(,"retry_count":)" << job.retry_count
307 << R"(,"max_retries":)" << job.max_retries;
308
309 // Timestamps
310 auto created_at = format_timestamp(job.created_at);
311 if (!created_at.empty()) {
312 oss << R"(,"created_at":")" << created_at << '"';
313 }
314
315 if (job.queued_at.has_value()) {
316 auto queued_at = format_timestamp(*job.queued_at);
317 if (!queued_at.empty()) {
318 oss << R"(,"queued_at":")" << queued_at << '"';
319 }
320 }
321
322 if (job.started_at.has_value()) {
323 auto started_at = format_timestamp(*job.started_at);
324 if (!started_at.empty()) {
325 oss << R"(,"started_at":")" << started_at << '"';
326 }
327 }
328
329 if (job.completed_at.has_value()) {
330 auto completed_at = format_timestamp(*job.completed_at);
331 if (!completed_at.empty()) {
332 oss << R"(,"completed_at":")" << completed_at << '"';
333 }
334 }
335
336 if (!job.created_by.empty()) {
337 oss << R"(,"created_by":")" << json_escape(job.created_by) << '"';
338 }
339
340 oss << '}';
341 return oss.str();
342}
343
347std::string jobs_to_json(const std::vector<client::job_record>& jobs,
348 size_t total_count) {
349 std::ostringstream oss;
350 oss << R"({"jobs":[)";
351 for (size_t i = 0; i < jobs.size(); ++i) {
352 if (i > 0) {
353 oss << ',';
354 }
355 oss << job_to_json(jobs[i]);
356 }
357 oss << R"(],"total":)" << total_count << '}';
358 return oss.str();
359}
360
364std::pair<size_t, size_t> parse_pagination(const crow::request& req) {
365 size_t limit = 20;
366 size_t offset = 0;
367
368 auto limit_param = req.url_params.get("limit");
369 if (limit_param) {
370 try {
371 limit = std::stoul(limit_param);
372 if (limit > 100) {
373 limit = 100;
374 }
375 } catch (...) {
376 // Use default
377 }
378 }
379
380 auto offset_param = req.url_params.get("offset");
381 if (offset_param) {
382 try {
383 offset = std::stoul(offset_param);
384 } catch (...) {
385 // Use default
386 }
387 }
388
389 return {limit, offset};
390}
391
/**
 * @brief Extracts the string value of @p key from a JSON text.
 *
 * This is a lightweight scanner, not a JSON parser: it looks for the first
 * occurrence of "key" that is followed by a colon and a string value.
 * Whitespace around the colon is tolerated and backslash escapes inside the
 * value are honoured, so `"k": "a\"b"` yields `a"b`. \uXXXX sequences are
 * kept verbatim.
 *
 * @param body JSON text to search.
 * @param key  Member name to look for (without quotes).
 * @return The decoded value, or an empty string if the key is absent, its
 *         value is not a string, or the string is unterminated.
 */
std::string get_json_string_value(const std::string& body, const std::string& key) {
    const std::string quoted_key = "\"" + key + "\"";

    const auto skip_ws = [&body](std::size_t i) {
        while (i < body.size() &&
               (body[i] == ' ' || body[i] == '\t' || body[i] == '\n' || body[i] == '\r')) {
            ++i;
        }
        return i;
    };

    for (std::size_t pos = body.find(quoted_key); pos != std::string::npos;
         pos = body.find(quoted_key, pos + 1)) {
        // Expect  "key" <ws> : <ws> "  — otherwise this occurrence is not the
        // member we want (e.g. the key text appearing as a value).
        std::size_t cur = skip_ws(pos + quoted_key.size());
        if (cur >= body.size() || body[cur] != ':') {
            continue;
        }
        cur = skip_ws(cur + 1);
        if (cur >= body.size() || body[cur] != '"') {
            continue;
        }
        ++cur;

        std::string value;
        while (cur < body.size()) {
            const char c = body[cur++];
            if (c == '"') {
                return value;
            }
            if (c == '\\' && cur < body.size()) {
                const char esc = body[cur++];
                switch (esc) {
                    case 'n': value.push_back('\n'); break;
                    case 't': value.push_back('\t'); break;
                    case 'r': value.push_back('\r'); break;
                    case 'b': value.push_back('\b'); break;
                    case 'f': value.push_back('\f'); break;
                    case 'u': value.append("\\u"); break;  // left undecoded
                    default:  value.push_back(esc); break; // \" \\ \/
                }
                continue;
            }
            value.push_back(c);
        }
        return "";  // unterminated string
    }
    return "";
}
408
/**
 * @brief Extracts the string elements of the array stored under @p key.
 *
 * This is a lightweight scanner, not a JSON parser. Whitespace around the
 * colon is tolerated, and string elements are scanned with escape handling
 * so that a `]` or an escaped quote inside an element no longer cuts the
 * array short. Non-string elements are skipped. \uXXXX sequences are kept
 * verbatim.
 *
 * @param body JSON text to search.
 * @param key  Member name to look for (without quotes).
 * @return The decoded string elements in order; empty if the key is absent,
 *         its value is not an array, or the array/strings are unterminated.
 */
std::vector<std::string> get_json_string_array(const std::string& body,
                                               const std::string& key) {
    std::vector<std::string> result;
    const std::string quoted_key = "\"" + key + "\"";

    const auto skip_ws = [&body](std::size_t i) {
        while (i < body.size() &&
               (body[i] == ' ' || body[i] == '\t' || body[i] == '\n' || body[i] == '\r')) {
            ++i;
        }
        return i;
    };

    for (std::size_t pos = body.find(quoted_key); pos != std::string::npos;
         pos = body.find(quoted_key, pos + 1)) {
        // Expect  "key" <ws> : <ws> [
        std::size_t cur = skip_ws(pos + quoted_key.size());
        if (cur >= body.size() || body[cur] != ':') {
            continue;
        }
        cur = skip_ws(cur + 1);
        if (cur >= body.size() || body[cur] != '[') {
            continue;
        }
        ++cur;

        while (cur < body.size()) {
            const char c = body[cur];
            if (c == ']') {
                return result;
            }
            if (c != '"') {
                ++cur;  // separators, whitespace, non-string elements
                continue;
            }

            // Scan one string element.
            ++cur;
            std::string element;
            bool closed = false;
            while (cur < body.size()) {
                const char ch = body[cur++];
                if (ch == '"') {
                    closed = true;
                    break;
                }
                if (ch == '\\' && cur < body.size()) {
                    const char esc = body[cur++];
                    switch (esc) {
                        case 'n': element.push_back('\n'); break;
                        case 't': element.push_back('\t'); break;
                        case 'r': element.push_back('\r'); break;
                        case 'b': element.push_back('\b'); break;
                        case 'f': element.push_back('\f'); break;
                        case 'u': element.append("\\u"); break;  // left undecoded
                        default:  element.push_back(esc); break; // \" \\ \/
                    }
                    continue;
                }
                element.push_back(ch);
            }
            if (!closed) {
                return {};  // unterminated string
            }
            result.push_back(element);
        }
        return {};  // array never closed
    }

    return result;
}
440
441} // namespace
442
443// Internal implementation function called from rest_server.cpp
444void register_jobs_endpoints_impl(crow::SimpleApp& app,
445 std::shared_ptr<rest_server_context> ctx) {
446 // GET /api/v1/jobs - List jobs (paginated with filters)
447 CROW_ROUTE(app, "/api/v1/jobs")
448 .methods(crow::HTTPMethod::GET)([ctx](const crow::request& req) {
449 crow::response res;
450 res.add_header("Content-Type", "application/json");
451 add_cors_headers(res, *ctx);
452
453 if (!ctx->job_manager) {
454 res.code = 503;
455 res.body = make_error_json("SERVICE_UNAVAILABLE",
456 "Job manager not configured");
457 return res;
458 }
459
460 // Parse pagination
461 auto [limit, offset] = parse_pagination(req);
462
463 // Parse filters
464 std::optional<client::job_status> status_filter;
465 std::optional<client::job_type> type_filter;
466
467 auto status_param = req.url_params.get("status");
468 if (status_param) {
469 status_filter = client::job_status_from_string(status_param);
470 }
471
472 auto type_param = req.url_params.get("type");
473 if (type_param) {
474 type_filter = client::job_type_from_string(type_param);
475 }
476
477 // Get jobs with filters
478 auto jobs = ctx->job_manager->list_jobs(status_filter, type_filter,
479 limit, offset);
480
481 // Get total count (without pagination for now, we use jobs.size())
482 // TODO: Add count method to job_manager for accurate total
483 size_t total_count = jobs.size();
484
485 res.code = 200;
486 res.body = jobs_to_json(jobs, total_count);
487 return res;
488 });
489
490 // POST /api/v1/jobs - Create a new job
// The request body is read with get_json_string_value()/get_json_string_array(),
// i.e. by searching the raw body text for each key rather than parsing JSON.
// "type" is mandatory; "priority" is optional. Each job type then requires its
// own fields (checked below) and answers 400 INVALID_REQUEST when one is missing.
// On success the handler answers 201 with the created job.
491 CROW_ROUTE(app, "/api/v1/jobs")
492 .methods(crow::HTTPMethod::POST)([ctx](const crow::request& req) {
493 crow::response res;
494 res.add_header("Content-Type", "application/json");
495 add_cors_headers(res, *ctx);
496
// Without a job manager nothing can be created: answer 503.
497 if (!ctx->job_manager) {
498 res.code = 503;
499 res.body = make_error_json("SERVICE_UNAVAILABLE",
500 "Job manager not configured");
501 return res;
502 }
503
504 // Parse job type
505 auto type_str = get_json_string_value(req.body, "type");
506 if (type_str.empty()) {
507 res.code = 400;
508 res.body = make_error_json("INVALID_REQUEST", "type is required");
509 return res;
510 }
511
512 auto type = client::job_type_from_string(type_str);
513
514 // Parse priority (optional, defaults to normal)
515 auto priority_str = get_json_string_value(req.body, "priority");
516 auto priority = priority_str.empty()
518 : client::job_priority_from_string(priority_str);
519
// Filled in by whichever create_*_job call matches the requested type.
520 std::string job_id;
521
522 // Create job based on type
523 switch (type) {
// Retrieve job: requires source_node_id and study_uid; series_uid is optional.
525 auto source_node_id = get_json_string_value(req.body, "source_node_id");
526 auto study_uid = get_json_string_value(req.body, "study_uid");
527
528 if (source_node_id.empty()) {
529 res.code = 400;
530 res.body = make_error_json("INVALID_REQUEST",
531 "source_node_id is required for retrieve job");
532 return res;
533 }
534
535 if (study_uid.empty()) {
536 res.code = 400;
537 res.body = make_error_json("INVALID_REQUEST",
538 "study_uid is required for retrieve job");
539 return res;
540 }
541
// series_opt is a view into the local series_uid string; it stays unset
// when no series_uid was supplied.
542 auto series_uid = get_json_string_value(req.body, "series_uid");
543 std::optional<std::string_view> series_opt;
544 if (!series_uid.empty()) {
545 series_opt = series_uid;
546 }
547
548 job_id = ctx->job_manager->create_retrieve_job(
549 source_node_id, study_uid, series_opt, priority);
550 break;
551 }
552
// Store job: requires destination_node_id and a non-empty instance_uids array.
554 auto destination_node_id = get_json_string_value(req.body, "destination_node_id");
555 auto instance_uids = get_json_string_array(req.body, "instance_uids");
556
557 if (destination_node_id.empty()) {
558 res.code = 400;
559 res.body = make_error_json("INVALID_REQUEST",
560 "destination_node_id is required for store job");
561 return res;
562 }
563
564 if (instance_uids.empty()) {
565 res.code = 400;
566 res.body = make_error_json("INVALID_REQUEST",
567 "instance_uids is required for store job");
568 return res;
569 }
570
571 job_id = ctx->job_manager->create_store_job(
572 destination_node_id, instance_uids, priority);
573 break;
574 }
575
// Query job: requires node_id; query_level falls back to "STUDY".
577 auto node_id = get_json_string_value(req.body, "node_id");
578 auto query_level = get_json_string_value(req.body, "query_level");
579
580 if (node_id.empty()) {
581 res.code = 400;
582 res.body = make_error_json("INVALID_REQUEST",
583 "node_id is required for query job");
584 return res;
585 }
586
587 if (query_level.empty()) {
588 query_level = "STUDY"; // Default to study level
589 }
590
591 // Parse query keys (simplified - in production use proper JSON parser)
// Only patient_id and patient_name are forwarded, as the "PatientID" and
// "PatientName" query keys.
592 std::unordered_map<std::string, std::string> query_keys;
593 auto patient_id = get_json_string_value(req.body, "patient_id");
594 if (!patient_id.empty()) {
595 query_keys["PatientID"] = patient_id;
596 }
597 auto patient_name = get_json_string_value(req.body, "patient_name");
598 if (!patient_name.empty()) {
599 query_keys["PatientName"] = patient_name;
600 }
601
602 job_id = ctx->job_manager->create_query_job(
603 node_id, query_level, query_keys, priority);
604 break;
605 }
606
// Sync job: requires source_node_id; patient_id is optional.
608 auto source_node_id = get_json_string_value(req.body, "source_node_id");
609
610 if (source_node_id.empty()) {
611 res.code = 400;
612 res.body = make_error_json("INVALID_REQUEST",
613 "source_node_id is required for sync job");
614 return res;
615 }
616
// patient_opt is a view into the local patient_id string; it stays unset
// when no patient_id was supplied.
617 auto patient_id = get_json_string_value(req.body, "patient_id");
618 std::optional<std::string_view> patient_opt;
619 if (!patient_id.empty()) {
620 patient_opt = patient_id;
621 }
622
623 job_id = ctx->job_manager->create_sync_job(
624 source_node_id, patient_opt, priority);
625 break;
626 }
627
// Prefetch job: requires both source_node_id and patient_id.
629 auto source_node_id = get_json_string_value(req.body, "source_node_id");
630 auto patient_id = get_json_string_value(req.body, "patient_id");
631
632 if (source_node_id.empty()) {
633 res.code = 400;
634 res.body = make_error_json("INVALID_REQUEST",
635 "source_node_id is required for prefetch job");
636 return res;
637 }
638
639 if (patient_id.empty()) {
640 res.code = 400;
641 res.body = make_error_json("INVALID_REQUEST",
642 "patient_id is required for prefetch job");
643 return res;
644 }
645
646 job_id = ctx->job_manager->create_prefetch_job(
647 source_node_id, patient_id, priority);
648 break;
649 }
650
// Any other job type is rejected, echoing the requested type string.
651 default: {
652 res.code = 400;
653 res.body = make_error_json("INVALID_REQUEST",
654 "Unsupported job type: " + type_str);
655 return res;
656 }
657 }
658
659 // Retrieve and return the created job
// If the job cannot be read back, still answer 201 with a minimal body
// containing only the job id and a "pending" status.
660 auto created_job = ctx->job_manager->get_job(job_id);
661 if (!created_job) {
662 res.code = 201;
663 res.body = R"({"job_id":")" + json_escape(job_id) + R"(","status":"pending"})";
664 return res;
665 }
666
667 res.code = 201;
668 res.body = job_to_json(*created_job);
669 return res;
670 });
671
672 // GET /api/v1/jobs/<jobId> - Get a specific job
673 CROW_ROUTE(app, "/api/v1/jobs/<string>")
674 .methods(crow::HTTPMethod::GET)(
675 [ctx](const crow::request& /*req*/, const std::string& job_id) {
676 crow::response res;
677 res.add_header("Content-Type", "application/json");
678 add_cors_headers(res, *ctx);
679
680 if (!ctx->job_manager) {
681 res.code = 503;
682 res.body = make_error_json("SERVICE_UNAVAILABLE",
683 "Job manager not configured");
684 return res;
685 }
686
687 auto job = ctx->job_manager->get_job(job_id);
688 if (!job) {
689 res.code = 404;
690 res.body = make_error_json("NOT_FOUND", "Job not found");
691 return res;
692 }
693
694 res.code = 200;
695 res.body = job_to_json(*job);
696 return res;
697 });
698
699 // DELETE /api/v1/jobs/<jobId> - Delete a job
700 CROW_ROUTE(app, "/api/v1/jobs/<string>")
701 .methods(crow::HTTPMethod::DELETE)(
702 [ctx](const crow::request& /*req*/, const std::string& job_id) {
703 crow::response res;
704 add_cors_headers(res, *ctx);
705
706 if (!ctx->job_manager) {
707 res.add_header("Content-Type", "application/json");
708 res.code = 503;
709 res.body = make_error_json("SERVICE_UNAVAILABLE",
710 "Job manager not configured");
711 return res;
712 }
713
714 // Check if job exists
715 auto job = ctx->job_manager->get_job(job_id);
716 if (!job) {
717 res.add_header("Content-Type", "application/json");
718 res.code = 404;
719 res.body = make_error_json("NOT_FOUND", "Job not found");
720 return res;
721 }
722
723 auto result = ctx->job_manager->delete_job(job_id);
724 if (!result.is_ok()) {
725 res.add_header("Content-Type", "application/json");
726 res.code = 500;
727 res.body = make_error_json("DELETE_FAILED", result.error().message);
728 return res;
729 }
730
731 res.code = 204;
732 return res;
733 });
734
735 // GET /api/v1/jobs/<jobId>/progress - Get job progress
736 CROW_ROUTE(app, "/api/v1/jobs/<string>/progress")
737 .methods(crow::HTTPMethod::GET)(
738 [ctx](const crow::request& /*req*/, const std::string& job_id) {
739 crow::response res;
740 res.add_header("Content-Type", "application/json");
741 add_cors_headers(res, *ctx);
742
743 if (!ctx->job_manager) {
744 res.code = 503;
745 res.body = make_error_json("SERVICE_UNAVAILABLE",
746 "Job manager not configured");
747 return res;
748 }
749
750 // Check if job exists
751 auto job = ctx->job_manager->get_job(job_id);
752 if (!job) {
753 res.code = 404;
754 res.body = make_error_json("NOT_FOUND", "Job not found");
755 return res;
756 }
757
758 auto progress = ctx->job_manager->get_progress(job_id);
759
760 res.code = 200;
761 res.body = progress_to_json(progress);
762 return res;
763 });
764
765 // =========================================================================
766 // Job Control Endpoints (Issue #559)
767 // =========================================================================
768
769 // POST /api/v1/jobs/<jobId>/start - Start a pending job
770 CROW_ROUTE(app, "/api/v1/jobs/<string>/start")
771 .methods(crow::HTTPMethod::POST)(
772 [ctx](const crow::request& /*req*/, const std::string& job_id) {
773 crow::response res;
774 res.add_header("Content-Type", "application/json");
775 add_cors_headers(res, *ctx);
776
777 if (!ctx->job_manager) {
778 res.code = 503;
779 res.body = make_error_json("SERVICE_UNAVAILABLE",
780 "Job manager not configured");
781 return res;
782 }
783
784 // Check if job exists
785 auto job = ctx->job_manager->get_job(job_id);
786 if (!job) {
787 res.code = 404;
788 res.body = make_error_json("NOT_FOUND", "Job not found");
789 return res;
790 }
791
792 auto result = ctx->job_manager->start_job(job_id);
793 if (!result.is_ok()) {
794 res.code = 409;
795 res.body = make_error_json("INVALID_STATE_TRANSITION",
796 result.error().message);
797 return res;
798 }
799
800 // Return updated job
801 auto updated_job = ctx->job_manager->get_job(job_id);
802 if (updated_job) {
803 res.code = 200;
804 res.body = job_to_json(*updated_job);
805 } else {
806 res.code = 200;
807 res.body = R"({"job_id":")" + json_escape(job_id) +
808 R"(","message":"Job started"})";
809 }
810 return res;
811 });
812
813 // POST /api/v1/jobs/<jobId>/pause - Pause a running job
814 CROW_ROUTE(app, "/api/v1/jobs/<string>/pause")
815 .methods(crow::HTTPMethod::POST)(
816 [ctx](const crow::request& /*req*/, const std::string& job_id) {
817 crow::response res;
818 res.add_header("Content-Type", "application/json");
819 add_cors_headers(res, *ctx);
820
821 if (!ctx->job_manager) {
822 res.code = 503;
823 res.body = make_error_json("SERVICE_UNAVAILABLE",
824 "Job manager not configured");
825 return res;
826 }
827
828 // Check if job exists
829 auto job = ctx->job_manager->get_job(job_id);
830 if (!job) {
831 res.code = 404;
832 res.body = make_error_json("NOT_FOUND", "Job not found");
833 return res;
834 }
835
836 auto result = ctx->job_manager->pause_job(job_id);
837 if (!result.is_ok()) {
838 res.code = 409;
839 res.body = make_error_json("INVALID_STATE_TRANSITION",
840 result.error().message);
841 return res;
842 }
843
844 // Return updated job
845 auto updated_job = ctx->job_manager->get_job(job_id);
846 if (updated_job) {
847 res.code = 200;
848 res.body = job_to_json(*updated_job);
849 } else {
850 res.code = 200;
851 res.body = R"({"job_id":")" + json_escape(job_id) +
852 R"(","message":"Job paused"})";
853 }
854 return res;
855 });
856
857 // POST /api/v1/jobs/<jobId>/resume - Resume a paused job
858 CROW_ROUTE(app, "/api/v1/jobs/<string>/resume")
859 .methods(crow::HTTPMethod::POST)(
860 [ctx](const crow::request& /*req*/, const std::string& job_id) {
861 crow::response res;
862 res.add_header("Content-Type", "application/json");
863 add_cors_headers(res, *ctx);
864
865 if (!ctx->job_manager) {
866 res.code = 503;
867 res.body = make_error_json("SERVICE_UNAVAILABLE",
868 "Job manager not configured");
869 return res;
870 }
871
872 // Check if job exists
873 auto job = ctx->job_manager->get_job(job_id);
874 if (!job) {
875 res.code = 404;
876 res.body = make_error_json("NOT_FOUND", "Job not found");
877 return res;
878 }
879
880 auto result = ctx->job_manager->resume_job(job_id);
881 if (!result.is_ok()) {
882 res.code = 409;
883 res.body = make_error_json("INVALID_STATE_TRANSITION",
884 result.error().message);
885 return res;
886 }
887
888 // Return updated job
889 auto updated_job = ctx->job_manager->get_job(job_id);
890 if (updated_job) {
891 res.code = 200;
892 res.body = job_to_json(*updated_job);
893 } else {
894 res.code = 200;
895 res.body = R"({"job_id":")" + json_escape(job_id) +
896 R"(","message":"Job resumed"})";
897 }
898 return res;
899 });
900
901 // POST /api/v1/jobs/<jobId>/cancel - Cancel a job
902 CROW_ROUTE(app, "/api/v1/jobs/<string>/cancel")
903 .methods(crow::HTTPMethod::POST)(
904 [ctx](const crow::request& /*req*/, const std::string& job_id) {
905 crow::response res;
906 res.add_header("Content-Type", "application/json");
907 add_cors_headers(res, *ctx);
908
909 if (!ctx->job_manager) {
910 res.code = 503;
911 res.body = make_error_json("SERVICE_UNAVAILABLE",
912 "Job manager not configured");
913 return res;
914 }
915
916 // Check if job exists
917 auto job = ctx->job_manager->get_job(job_id);
918 if (!job) {
919 res.code = 404;
920 res.body = make_error_json("NOT_FOUND", "Job not found");
921 return res;
922 }
923
924 auto result = ctx->job_manager->cancel_job(job_id);
925 if (!result.is_ok()) {
926 res.code = 409;
927 res.body = make_error_json("INVALID_STATE_TRANSITION",
928 result.error().message);
929 return res;
930 }
931
932 // Return updated job
933 auto updated_job = ctx->job_manager->get_job(job_id);
934 if (updated_job) {
935 res.code = 200;
936 res.body = job_to_json(*updated_job);
937 } else {
938 res.code = 200;
939 res.body = R"({"job_id":")" + json_escape(job_id) +
940 R"(","message":"Job cancelled"})";
941 }
942 return res;
943 });
944
945 // POST /api/v1/jobs/<jobId>/retry - Retry a failed job
946 CROW_ROUTE(app, "/api/v1/jobs/<string>/retry")
947 .methods(crow::HTTPMethod::POST)(
948 [ctx](const crow::request& /*req*/, const std::string& job_id) {
949 crow::response res;
950 res.add_header("Content-Type", "application/json");
951 add_cors_headers(res, *ctx);
952
953 if (!ctx->job_manager) {
954 res.code = 503;
955 res.body = make_error_json("SERVICE_UNAVAILABLE",
956 "Job manager not configured");
957 return res;
958 }
959
960 // Check if job exists
961 auto job = ctx->job_manager->get_job(job_id);
962 if (!job) {
963 res.code = 404;
964 res.body = make_error_json("NOT_FOUND", "Job not found");
965 return res;
966 }
967
968 auto result = ctx->job_manager->retry_job(job_id);
969 if (!result.is_ok()) {
970 res.code = 409;
971 res.body = make_error_json("INVALID_STATE_TRANSITION",
972 result.error().message);
973 return res;
974 }
975
976 // Return updated job
977 auto updated_job = ctx->job_manager->get_job(job_id);
978 if (updated_job) {
979 res.code = 200;
980 res.body = job_to_json(*updated_job);
981 } else {
982 res.code = 200;
983 res.body = R"({"job_id":")" + json_escape(job_id) +
984 R"(","message":"Job retry queued"})";
985 }
986 return res;
987 });
988
989 // =========================================================================
990 // WebSocket Endpoints (Issue #560)
991 // =========================================================================
992
993 // Register job_manager callbacks for broadcasting (only once)
994 if (ctx->job_manager && !g_callbacks_registered.exchange(true)) {
995 // Progress callback - broadcasts to all subscribers
996 ctx->job_manager->set_progress_callback(
997 [](const std::string& job_id, const client::job_progress& progress) {
998 auto message = make_progress_message(job_id, progress);
999 ws_subscriber_state::instance().broadcast_progress(job_id, message);
1000 });
1001
1002 // Completion callback - broadcasts final status
1003 ctx->job_manager->set_completion_callback(
1004 [](const std::string& job_id, const client::job_record& record) {
1005 auto message = make_completion_message(job_id, record);
1006 ws_subscriber_state::instance().broadcast_progress(job_id, message);
1007 });
1008 }
1009
1010 // WS /api/v1/jobs/<jobId>/progress/stream - Stream progress for specific job
1011 // Note: URL parameters in WebSocket routes require using onaccept to extract
1012 // the job_id from the request URL and store it in userdata
1013 CROW_WEBSOCKET_ROUTE(app, "/api/v1/jobs/<string>/progress/stream")
1014 .onaccept([ctx](const crow::request& req, void** userdata) -> bool {
1015 // Extract job_id from URL: /api/v1/jobs/{job_id}/progress/stream
1016 std::string url = req.url;
1017 // Find the job_id between /jobs/ and /progress
1018 const std::string prefix = "/api/v1/jobs/";
1019 const std::string suffix = "/progress/stream";
1020
1021 if (url.find(prefix) != 0) {
1022 return false;
1023 }
1024
1025 auto suffix_pos = url.rfind(suffix);
1026 if (suffix_pos == std::string::npos) {
1027 return false;
1028 }
1029
1030 std::string job_id = url.substr(prefix.length(),
1031 suffix_pos - prefix.length());
1032
1033 if (job_id.empty()) {
1034 return false;
1035 }
1036
1037 // Verify job manager is available
1038 if (!ctx->job_manager) {
1039 return false;
1040 }
1041
1042 // Verify job exists
1043 auto job = ctx->job_manager->get_job(job_id);
1044 if (!job) {
1045 return false;
1046 }
1047
1048 // Store job_id in userdata (allocate on heap)
1049 *userdata = new std::string(job_id);
1050 return true;
1051 })
1052 .onopen([ctx](crow::websocket::connection& conn) {
1053 auto* job_id_ptr = static_cast<std::string*>(conn.userdata());
1054 if (!job_id_ptr || !ctx->job_manager) {
1055 conn.send_text(R"({"error":"Invalid connection state"})");
1056 conn.close("Invalid state");
1057 return;
1058 }
1059
1060 const std::string& job_id = *job_id_ptr;
1061
1062 // Subscribe to job updates
1063 ws_subscriber_state::instance().add_job_subscriber(job_id, &conn);
1064
1065 // Send initial progress
1066 auto progress = ctx->job_manager->get_progress(job_id);
1067 conn.send_text(make_progress_message(job_id, progress));
1068 })
1069 .onclose([](crow::websocket::connection& conn,
1070 const std::string& /*reason*/, uint16_t /*code*/) {
1071 auto* job_id_ptr = static_cast<std::string*>(conn.userdata());
1072 if (job_id_ptr) {
1073 ws_subscriber_state::instance().remove_job_subscriber(*job_id_ptr, &conn);
1074 delete job_id_ptr;
1075 conn.userdata(nullptr);
1076 }
1077 })
1078 .onmessage([](crow::websocket::connection& /*conn*/,
1079 const std::string& /*data*/, bool /*is_binary*/) {
1080 // Client messages not expected, but handle gracefully
1081 });
1082
1083 // WS /api/v1/jobs/stream - Stream all job updates
1084 CROW_WEBSOCKET_ROUTE(app, "/api/v1/jobs/stream")
1085 .onaccept([ctx](const crow::request& /*req*/, void** /*userdata*/) -> bool {
1086 // Only accept if job_manager is available
1087 return ctx->job_manager != nullptr;
1088 })
1089 .onopen([](crow::websocket::connection& conn) {
1090 // Subscribe to all job updates
1091 ws_subscriber_state::instance().add_all_jobs_subscriber(&conn);
1092
1093 // Send initial connected message
1094 conn.send_text(R"({"type":"connected","message":"Subscribed to all job updates"})");
1095 })
1096 .onclose([](crow::websocket::connection& conn,
1097 const std::string& /*reason*/, uint16_t /*code*/) {
1098 ws_subscriber_state::instance().remove_all_jobs_subscriber(&conn);
1099 })
1100 .onmessage([](crow::websocket::connection& /*conn*/,
1101 const std::string& /*data*/, bool /*is_binary*/) {
1102 // Client messages not expected, but handle gracefully
1103 });
1104}
1105
1106} // namespace kcenon::pacs::web::endpoints
Job manager for asynchronous DICOM operations.
Job types and structures for asynchronous DICOM operations.
std::unordered_map< std::string, std::unordered_set< crow::websocket::connection * > > job_subscribers
Connections subscribed to specific job progress.
std::shared_mutex mutex
Mutex for thread-safe access.
std::unordered_set< crow::websocket::connection * > all_jobs_subscribers
Connections subscribed to all job updates.
Job management REST API and WebSocket endpoints.
job_type job_type_from_string(std::string_view str) noexcept
Parse job_type from string.
Definition job_types.h:72
@ prefetch
Prefetch prior studies.
@ store
C-STORE operation.
@ retrieve
C-MOVE/C-GET operation.
job_status job_status_from_string(std::string_view str) noexcept
Parse job_status from string.
Definition job_types.h:123
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
Definition job_types.h:54
job_status
Current status of a job.
Definition job_types.h:90
job_priority job_priority_from_string(std::string_view str) noexcept
Parse job_priority from string.
Definition job_types.h:181
void register_jobs_endpoints_impl(crow::SimpleApp &app, std::shared_ptr< rest_server_context > ctx)
std::string make_error_json(std::string_view code, std::string_view message)
Create JSON error response body with details.
Definition rest_types.h:79
std::string json_escape(std::string_view s)
Escape a string for JSON.
Definition rest_types.h:101
Configuration for REST API server.
Common types and utilities for REST API.
Progress tracking for a job.
Definition job_types.h:211
Complete job record with all metadata.
Definition job_types.h:255
System API endpoints for REST server.