PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
task_scheduler.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
16
17#include <kcenon/common/interfaces/executor_interface.h>
18
19#include <algorithm>
20#include <chrono>
21#include <fstream>
22#include <future>
23#include <iomanip>
24#include <random>
25#include <ranges>
26#include <sstream>
27
28namespace kcenon::pacs::workflow {
29
30// =============================================================================
31// Cron Schedule Implementation
32// =============================================================================
33
34auto cron_schedule::parse(const std::string& expr) -> cron_schedule {
35 cron_schedule result;
36 std::istringstream iss(expr);
37 std::vector<std::string> parts;
38 std::string part;
39
40 while (iss >> part) {
41 parts.push_back(part);
42 }
43
44 if (parts.size() >= 1) result.minute = parts[0];
45 if (parts.size() >= 2) result.hour = parts[1];
46 if (parts.size() >= 3) result.day_of_month = parts[2];
47 if (parts.size() >= 4) result.month = parts[3];
48 if (parts.size() >= 5) result.day_of_week = parts[4];
49
50 return result;
51}
52
53auto cron_schedule::to_string() const -> std::string {
54 return minute + " " + hour + " " + day_of_month + " " + month + " " +
56}
57
58auto cron_schedule::is_valid() const noexcept -> bool {
59 // Basic validation - check that fields are not empty
60 return !minute.empty() && !hour.empty() && !day_of_month.empty() &&
61 !month.empty() && !day_of_week.empty();
62}
63
64// =============================================================================
65// Construction
66// =============================================================================
67
70 const task_scheduler_config& config)
71 : database_(database)
72 , config_(config) {
73 // Schedule built-in tasks from config
74 if (config_.cleanup) {
75 schedule_cleanup(*config_.cleanup);
76 }
78 schedule_archive(*config_.archive);
79 }
81 schedule_verification(*config_.verification);
82 }
83
85 start();
86 }
87}
88
91 storage::file_storage& file_storage,
92 std::shared_ptr<kcenon::thread::thread_pool> thread_pool,
93 const task_scheduler_config& config)
94 : database_(database)
95 , file_storage_(&file_storage)
96 , thread_pool_(std::move(thread_pool))
97 , config_(config) {
98
99 // Load persisted tasks if configured
101 load_tasks();
102 }
103
104 // Schedule built-in tasks from config
105 if (config_.cleanup) {
106 schedule_cleanup(*config_.cleanup);
107 }
108 if (config_.archive) {
109 schedule_archive(*config_.archive);
110 }
112 schedule_verification(*config_.verification);
113 }
114
116 start();
117 }
118}
119
121 storage::index_database& database,
122 storage::file_storage& file_storage,
123 std::shared_ptr<kcenon::common::interfaces::IExecutor> executor,
124 const task_scheduler_config& config)
125 : database_(database)
126 , file_storage_(&file_storage)
127 , executor_(std::move(executor))
128 , config_(config) {
129
130 // Load persisted tasks if configured
132 load_tasks();
133 }
134
135 // Schedule built-in tasks from config
136 if (config_.cleanup) {
137 schedule_cleanup(*config_.cleanup);
138 }
139 if (config_.archive) {
140 schedule_archive(*config_.archive);
141 }
143 schedule_verification(*config_.verification);
144 }
145
147 start();
148 }
149}
150
152 stop(true);
153
154 // Persist tasks before destruction
155 if (!config_.persistence_path.empty()) {
156 save_tasks();
157 }
158}
159
160// =============================================================================
161// Lifecycle Management
162// =============================================================================
163
165 if (running_.exchange(true)) {
166 return; // Already running
167 }
168
169 stop_requested_.store(false);
170 start_time_ = std::chrono::steady_clock::now();
171
172 scheduler_thread_ = std::thread([this]() {
173 run_loop();
174 });
175
177 "Task scheduler started check_interval_sec={} max_concurrent={}",
178 config_.check_interval.count(),
180}
181
182void task_scheduler::stop(bool wait_for_completion) {
183 if (!running_.exchange(false)) {
184 return; // Already stopped
185 }
186
187 stop_requested_.store(true);
188
189 // Wake up the scheduler thread
190 cv_.notify_all();
191
192 if (wait_for_completion && scheduler_thread_.joinable()) {
193 scheduler_thread_.join();
194 } else if (scheduler_thread_.joinable()) {
195 scheduler_thread_.detach();
196 }
197
198 integration::logger_adapter::info("Task scheduler stopped");
199}
200
201auto task_scheduler::is_running() const noexcept -> bool {
202 return running_.load();
203}
204
205// =============================================================================
206// Task Scheduling - Cleanup
207// =============================================================================
208
210 scheduled_task task;
211 task.id = "cleanup_task";
212 task.name = "Storage Cleanup";
213 task.description = "Removes old studies based on retention policy";
215 task.task_schedule = config.cleanup_schedule;
216 task.enabled = true;
217 task.priority = 10;
218 task.tags = {"maintenance", "storage"};
219 task.callback = create_cleanup_callback(config);
220 task.created_at = std::chrono::system_clock::now();
221 task.updated_at = task.created_at;
222
223 return schedule(std::move(task));
224}
225
226// =============================================================================
227// Task Scheduling - Archive
228// =============================================================================
229
231 scheduled_task task;
232 task.id = "archive_task";
233 task.name = "Study Archival";
234 task.description = "Archives studies to secondary storage";
236 task.task_schedule = config.archive_schedule;
237 task.enabled = true;
238 task.priority = 5;
239 task.tags = {"maintenance", "archive"};
240 task.callback = create_archive_callback(config);
241 task.created_at = std::chrono::system_clock::now();
242 task.updated_at = task.created_at;
243
244 return schedule(std::move(task));
245}
246
247// =============================================================================
248// Task Scheduling - Verification
249// =============================================================================
250
252 -> task_id {
253 scheduled_task task;
254 task.id = "verification_task";
255 task.name = "Data Verification";
256 task.description = "Verifies data integrity and consistency";
258 task.task_schedule = config.verification_schedule;
259 task.enabled = true;
260 task.priority = 8;
261 task.tags = {"maintenance", "integrity"};
262 task.callback = create_verification_callback(config);
263 task.created_at = std::chrono::system_clock::now();
264 task.updated_at = task.created_at;
265
266 return schedule(std::move(task));
267}
268
269// =============================================================================
270// Task Scheduling - Custom
271// =============================================================================
272
274 const std::string& name,
275 const std::string& description,
276 std::chrono::seconds interval,
278
279 scheduled_task task;
280 task.id = generate_task_id();
281 task.name = name;
282 task.description = description;
283 task.type = task_type::custom;
284 task.task_schedule = interval_schedule{interval, std::nullopt};
285 task.enabled = true;
286 task.callback = std::move(callback);
287 task.created_at = std::chrono::system_clock::now();
288 task.updated_at = task.created_at;
289
290 return schedule(std::move(task));
291}
292
294 const std::string& name,
295 const std::string& description,
296 const cron_schedule& cron_expr,
298
299 scheduled_task task;
300 task.id = generate_task_id();
301 task.name = name;
302 task.description = description;
303 task.type = task_type::custom;
304 task.task_schedule = cron_expr;
305 task.enabled = true;
306 task.callback = std::move(callback);
307 task.created_at = std::chrono::system_clock::now();
308 task.updated_at = task.created_at;
309
310 return schedule(std::move(task));
311}
312
314 const std::string& name,
315 const std::string& description,
316 std::chrono::system_clock::time_point execute_at,
318
319 scheduled_task task;
320 task.id = generate_task_id();
321 task.name = name;
322 task.description = description;
323 task.type = task_type::custom;
324 task.task_schedule = one_time_schedule{execute_at};
325 task.enabled = true;
326 task.callback = std::move(callback);
327 task.created_at = std::chrono::system_clock::now();
328 task.updated_at = task.created_at;
329 task.next_run_at = execute_at;
330
331 return schedule(std::move(task));
332}
333
335 std::lock_guard<std::mutex> lock(tasks_mutex_);
336
337 // Calculate next run time if not set
338 if (!task.next_run_at) {
339 task.next_run_at = calculate_next_run(task.task_schedule);
340 }
341
342 auto id = task.id;
343
344 // Check if task already exists
345 auto it = tasks_.find(id);
346 if (it != tasks_.end()) {
347 // Update existing task
348 it->second = std::move(task);
350 "Updated scheduled task task_id={} name={}",
351 id, it->second.name);
352 } else {
353 // Add new task
354 tasks_.emplace(id, std::move(task));
356 "Added scheduled task task_id={} name={}",
357 id, tasks_[id].name);
358 }
359
360 // Update stats
361 {
362 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
363 stats_.scheduled_tasks = tasks_.size();
364 }
365
366 return id;
367}
368
369// =============================================================================
370// Task Management
371// =============================================================================
372
373auto task_scheduler::list_tasks() const -> std::vector<scheduled_task> {
374 std::lock_guard<std::mutex> lock(tasks_mutex_);
375
376 std::vector<scheduled_task> result;
377 result.reserve(tasks_.size());
378
379 for (const auto& [id, task] : tasks_) {
380 result.push_back(task);
381 }
382
383 return result;
384}
385
387 -> std::vector<scheduled_task> {
388 std::lock_guard<std::mutex> lock(tasks_mutex_);
389
390 std::vector<scheduled_task> result;
391
392 for (const auto& [id, task] : tasks_) {
393 if (task.type == type) {
394 result.push_back(task);
395 }
396 }
397
398 return result;
399}
400
402 -> std::vector<scheduled_task> {
403 std::lock_guard<std::mutex> lock(tasks_mutex_);
404
405 std::vector<scheduled_task> result;
406
407 for (const auto& [id, task] : tasks_) {
408 if (task.state == state) {
409 result.push_back(task);
410 }
411 }
412
413 return result;
414}
415
417 -> std::optional<scheduled_task> {
418 std::lock_guard<std::mutex> lock(tasks_mutex_);
419
420 auto it = tasks_.find(id);
421 if (it == tasks_.end()) {
422 return std::nullopt;
423 }
424
425 return it->second;
426}
427
428auto task_scheduler::cancel_task(const task_id& id) -> bool {
429 std::lock_guard<std::mutex> lock(tasks_mutex_);
430
431 auto it = tasks_.find(id);
432 if (it == tasks_.end()) {
433 return false;
434 }
435
436 it->second.state = task_state::cancelled;
437 it->second.enabled = false;
438 it->second.updated_at = std::chrono::system_clock::now();
439
441 "Cancelled scheduled task task_id={} name={}",
442 id, it->second.name);
443
444 return true;
445}
446
447auto task_scheduler::pause_task(const task_id& id) -> bool {
448 std::lock_guard<std::mutex> lock(tasks_mutex_);
449
450 auto it = tasks_.find(id);
451 if (it == tasks_.end()) {
452 return false;
453 }
454
455 if (it->second.state == task_state::running) {
456 return false; // Cannot pause a running task
457 }
458
459 it->second.state = task_state::paused;
460 it->second.updated_at = std::chrono::system_clock::now();
461
463 "Paused scheduled task task_id={} name={}",
464 id, it->second.name);
465
466 return true;
467}
468
469auto task_scheduler::resume_task(const task_id& id) -> bool {
470 std::lock_guard<std::mutex> lock(tasks_mutex_);
471
472 auto it = tasks_.find(id);
473 if (it == tasks_.end()) {
474 return false;
475 }
476
477 if (it->second.state != task_state::paused) {
478 return false; // Not paused
479 }
480
481 it->second.state = task_state::pending;
482 it->second.next_run_at = calculate_next_run(it->second.task_schedule);
483 it->second.updated_at = std::chrono::system_clock::now();
484
486 "Resumed scheduled task task_id={} name={}",
487 id, it->second.name);
488
489 return true;
490}
491
492auto task_scheduler::trigger_task(const task_id& id) -> bool {
493 std::lock_guard<std::mutex> lock(tasks_mutex_);
494
495 auto it = tasks_.find(id);
496 if (it == tasks_.end()) {
497 return false;
498 }
499
500 if (!it->second.enabled || it->second.state == task_state::running) {
501 return false;
502 }
503
504 // Set next run to now to trigger immediate execution
505 it->second.next_run_at = std::chrono::system_clock::now();
506
507 // Wake up scheduler
508 cv_.notify_one();
509
511 "Triggered immediate execution task_id={} name={}",
512 id, it->second.name);
513
514 return true;
515}
516
518 -> bool {
519 std::lock_guard<std::mutex> lock(tasks_mutex_);
520
521 auto it = tasks_.find(id);
522 if (it == tasks_.end()) {
523 return false;
524 }
525
526 it->second.task_schedule = new_schedule;
527 it->second.next_run_at = calculate_next_run(new_schedule);
528 it->second.updated_at = std::chrono::system_clock::now();
529
531 "Updated schedule for task task_id={} name={}",
532 id, it->second.name);
533
534 return true;
535}
536
537// =============================================================================
538// Execution History
539// =============================================================================
540
542 const task_id& id,
543 std::size_t limit) const -> std::vector<task_execution_record> {
544 std::lock_guard<std::mutex> lock(history_mutex_);
545
546 auto it = execution_history_.find(id);
547 if (it == execution_history_.end()) {
548 return {};
549 }
550
551 const auto& records = it->second;
552 if (records.size() <= limit) {
553 return records;
554 }
555
556 // Return most recent records
557 return std::vector<task_execution_record>(
558 records.end() - static_cast<std::ptrdiff_t>(limit),
559 records.end());
560}
561
562auto task_scheduler::get_recent_executions(std::size_t limit) const
563 -> std::vector<task_execution_record> {
564 std::lock_guard<std::mutex> lock(history_mutex_);
565
566 std::vector<task_execution_record> all_records;
567
568 for (const auto& [id, records] : execution_history_) {
569 all_records.insert(all_records.end(), records.begin(), records.end());
570 }
571
572 // Sort by start time (most recent first)
573 std::ranges::sort(all_records, [](const auto& a, const auto& b) {
574 return a.started_at > b.started_at;
575 });
576
577 if (all_records.size() <= limit) {
578 return all_records;
579 }
580
581 all_records.resize(limit);
582 return all_records;
583}
584
585void task_scheduler::clear_history(const task_id& id, std::size_t keep_last) {
586 std::lock_guard<std::mutex> lock(history_mutex_);
587
588 auto it = execution_history_.find(id);
589 if (it == execution_history_.end()) {
590 return;
591 }
592
593 auto& records = it->second;
594 if (records.size() <= keep_last) {
595 return;
596 }
597
598 // Keep only the last N records
599 records.erase(
600 records.begin(),
601 records.begin() + static_cast<std::ptrdiff_t>(records.size() - keep_last));
602}
603
604// =============================================================================
605// Statistics and Monitoring
606// =============================================================================
607
609 std::lock_guard<std::mutex> lock(stats_mutex_);
610
611 scheduler_stats result = stats_;
612
613 // Calculate uptime
614 if (running_.load()) {
615 auto now = std::chrono::steady_clock::now();
616 result.uptime = std::chrono::duration_cast<std::chrono::seconds>(
617 now - start_time_);
618 }
619
620 result.running_tasks = running_count_.load();
621
622 {
623 std::lock_guard<std::mutex> tasks_lock(tasks_mutex_);
624 result.scheduled_tasks = tasks_.size();
625 }
626
627 return result;
628}
629
630auto task_scheduler::pending_count() const noexcept -> std::size_t {
631 std::lock_guard<std::mutex> lock(tasks_mutex_);
632
633 std::size_t count = 0;
634 for (const auto& [id, task] : tasks_) {
635 if (task.enabled && task.state == task_state::pending) {
636 ++count;
637 }
638 }
639 return count;
640}
641
642auto task_scheduler::running_count() const noexcept -> std::size_t {
643 return running_count_.load();
644}
645
646// =============================================================================
647// Persistence
648// =============================================================================
649
650auto task_scheduler::save_tasks() const -> bool {
651 if (config_.persistence_path.empty()) {
652 return false;
653 }
654
655 try {
656 std::string json = serialize_tasks();
657
658 std::ofstream file(config_.persistence_path);
659 if (!file) {
661 "Failed to open persistence file path={}",
663 return false;
664 }
665
666 file << json;
667 file.close();
668
670 "Saved tasks to persistence path={}",
672
673 return true;
674 } catch (const std::exception& e) {
676 "Failed to save tasks error={}", e.what());
677 return false;
678 }
679}
680
681auto task_scheduler::load_tasks() -> std::size_t {
682 if (config_.persistence_path.empty()) {
683 return 0;
684 }
685
686 try {
687 std::ifstream file(config_.persistence_path);
688 if (!file) {
690 "No persistence file found path={}",
691 config_.persistence_path);
692 return 0;
693 }
694
695 std::string json((std::istreambuf_iterator<char>(file)),
696 std::istreambuf_iterator<char>());
697 file.close();
698
699 std::size_t count = deserialize_tasks(json);
700
702 "Loaded tasks from persistence path={} count={}",
703 config_.persistence_path, count);
704
705 return count;
706 } catch (const std::exception& e) {
708 "Failed to load tasks error={}", e.what());
709 return 0;
710 }
711}
712
713// =============================================================================
714// Configuration
715// =============================================================================
716
719 std::lock_guard<std::mutex> lock(mutex_);
720 config_.on_task_complete = std::move(callback);
721}
722
725 std::lock_guard<std::mutex> lock(mutex_);
726 config_.on_task_error = std::move(callback);
727}
728
729// =============================================================================
730// Internal Methods
731// =============================================================================
732
734 integration::logger_adapter::debug("Task scheduler thread started");
735
736 while (!stop_requested_.load()) {
737 std::unique_lock<std::mutex> lock(mutex_);
738
739 // Wait for check interval or until woken up
740 cv_.wait_for(lock, config_.check_interval, [this]() {
741 return stop_requested_.load();
742 });
743
744 if (stop_requested_.load()) {
745 break;
746 }
747
748 lock.unlock();
750 }
751
752 integration::logger_adapter::debug("Task scheduler thread stopped");
753}
754
756 auto now = std::chrono::system_clock::now();
757 std::vector<task_id> due_tasks;
758
759 // Find due tasks
760 {
761 std::lock_guard<std::mutex> lock(tasks_mutex_);
762
763 for (auto& [id, task] : tasks_) {
764 if (!task.enabled || task.state != task_state::pending) {
765 continue;
766 }
767
768 if (task.next_run_at && *task.next_run_at <= now) {
769 due_tasks.push_back(id);
770 }
771 }
772 }
773
774 if (due_tasks.empty()) {
775 return;
776 }
777
778 // Sort by priority (higher first)
779 {
780 std::lock_guard<std::mutex> lock(tasks_mutex_);
781 std::ranges::sort(due_tasks, [this](const auto& a, const auto& b) {
782 return tasks_[a].priority > tasks_[b].priority;
783 });
784 }
785
786 // Execute tasks (respecting max concurrent limit)
787 std::size_t executed = 0;
788 std::size_t succeeded = 0;
789 std::size_t failed = 0;
790
791 for (const auto& id : due_tasks) {
793 break; // Max concurrent reached
794 }
795
796 std::lock_guard<std::mutex> lock(tasks_mutex_);
797 auto it = tasks_.find(id);
798 if (it == tasks_.end()) {
799 continue;
800 }
801
802 auto& task = it->second;
803
804 // Mark as running
805 task.state = task_state::running;
807
808 // Execute task
809 auto record = execute_task(task);
810
811 // Update task state
812 task.state = (record.state == task_state::completed)
814 : record.state;
815 task.last_run_at = record.started_at;
816 task.last_execution = record;
817 ++task.execution_count;
818
819 if (record.state == task_state::completed) {
820 ++task.success_count;
821 ++succeeded;
822 } else {
823 ++task.failure_count;
824 ++failed;
825 }
826
827 // Calculate next run time
828 if (task.enabled && task.state == task_state::pending) {
829 task.next_run_at = calculate_next_run(
830 task.task_schedule,
831 std::chrono::system_clock::now());
832
833 // For one-time tasks, disable after execution
834 if (std::holds_alternative<one_time_schedule>(task.task_schedule)) {
835 task.enabled = false;
836 }
837 }
838
840 ++executed;
841
842 // Record execution history
843 record_execution(id, record);
844 update_stats(record);
845
846 // Invoke callbacks
848 config_.on_task_complete(id, record);
849 }
850 if (record.state == task_state::failed && config_.on_task_error) {
851 config_.on_task_error(id, record.error_message.value_or("Unknown error"));
852 }
853 }
854
855 // Log cycle summary
856 if (executed > 0) {
858 "Scheduler cycle completed executed={} succeeded={} failed={}",
859 executed, succeeded, failed);
860
861 // Record metrics
863 "scheduler_tasks_executed", static_cast<int64_t>(executed));
865 "scheduler_tasks_succeeded", static_cast<int64_t>(succeeded));
867 "scheduler_tasks_failed", static_cast<int64_t>(failed));
868
869 // Update stats
870 {
871 std::lock_guard<std::mutex> lock(stats_mutex_);
872 stats_.last_cycle_at = std::chrono::system_clock::now();
873 }
874
875 // Invoke cycle callback
877 config_.on_cycle_complete(executed, succeeded, failed);
878 }
879 }
880}
881
885 record.execution_id = generate_execution_id();
886 record.task_id = task.id;
887 record.started_at = std::chrono::system_clock::now();
888 record.state = task_state::running;
889
891 "Executing task task_id={} name={}",
892 task.id, task.name);
893
894 auto start_time = std::chrono::steady_clock::now();
895
896 // Retry logic: attempt up to (max_retries + 1) times
897 std::size_t attempt = 0;
898 const std::size_t max_attempts = task.max_retries + 1;
899
900 while (attempt < max_attempts) {
901 attempt++;
902
903 if (attempt > 1) {
905 "Retrying task task_id={} attempt={}/{}",
906 task.id, attempt, max_attempts);
907
908 // Wait before retry
909 std::this_thread::sleep_for(task.retry_delay);
910 }
911
912 try {
913 if (!task.callback) {
914 record.state = task_state::failed;
915 record.error_message = "No callback defined";
916 break; // No point retrying without callback
917 }
918
919 // Execute with timeout if configured
920 if (task.timeout.count() > 0) {
921 // Run callback in async task with timeout
922 auto future = std::async(std::launch::async, task.callback);
923
924 auto status = future.wait_for(task.timeout);
925 if (status == std::future_status::timeout) {
926 record.state = task_state::failed;
927 record.error_message = "Task execution timed out after " +
928 std::to_string(task.timeout.count()) + " seconds";
930 "Task timed out task_id={} timeout_seconds={}",
931 task.id, task.timeout.count());
932 // Continue to retry if attempts remain
933 continue;
934 }
935
936 // Get result (may throw)
937 auto result = future.get();
938 if (result.has_value()) {
939 record.state = task_state::failed;
940 record.error_message = *result;
941 } else {
942 record.state = task_state::completed;
943 break; // Success, no need to retry
944 }
945 } else {
946 // No timeout, execute directly
947 auto result = task.callback();
948
949 if (result.has_value()) {
950 record.state = task_state::failed;
951 record.error_message = *result;
952 } else {
953 record.state = task_state::completed;
954 break; // Success, no need to retry
955 }
956 }
957 } catch (const std::exception& e) {
958 record.state = task_state::failed;
959 record.error_message = e.what();
961 "Task execution failed task_id={} attempt={} error={}",
962 task.id, attempt, e.what());
963 }
964
965 // If we've exhausted all retries and still failed
966 if (attempt >= max_attempts && record.state == task_state::failed) {
968 "Task failed after {} attempts task_id={}",
969 max_attempts, task.id);
970 }
971 }
972
973 record.ended_at = std::chrono::system_clock::now();
974
975 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
976 std::chrono::steady_clock::now() - start_time);
977
979 "Task execution completed task_id={} name={} state={} duration_ms={} attempts={}",
980 task.id, task.name, to_string(record.state), duration.count(), attempt);
981
982 // Record metrics
984 "scheduler_task_duration_ms",
985 static_cast<double>(duration.count()));
986
987 if (attempt > 1) {
989 "scheduler_task_retries_total");
990 }
991
992 return record;
993}
994
997 std::chrono::system_clock::time_point from) const
998 -> std::optional<std::chrono::system_clock::time_point> {
999
1000 return std::visit([this, from](const auto& s) ->
1001 std::optional<std::chrono::system_clock::time_point> {
1002 using T = std::decay_t<decltype(s)>;
1003
1004 if constexpr (std::is_same_v<T, interval_schedule>) {
1005 if (s.start_at && *s.start_at > from) {
1006 return s.start_at;
1007 }
1008 return from + s.interval;
1009 } else if constexpr (std::is_same_v<T, cron_schedule>) {
1010 return calculate_next_cron_run(s, from);
1011 } else if constexpr (std::is_same_v<T, one_time_schedule>) {
1012 if (s.execute_at > from) {
1013 return s.execute_at;
1014 }
1015 return std::nullopt; // Already passed
1016 }
1017
1018 return std::nullopt;
1019 }, sched);
1020}
1021
1023 const cron_schedule& cron,
1024 std::chrono::system_clock::time_point from) const
1025 -> std::optional<std::chrono::system_clock::time_point> {
1026
1027 // Simple cron calculation - advance minute by minute until match
1028 // This is a simplified implementation; production would use more
1029 // efficient algorithms
1030
1031 auto time = from + std::chrono::minutes{1}; // Start from next minute
1032 auto end_time = from + std::chrono::hours{24 * 365}; // Max 1 year ahead
1033
// Helper deciding whether one cron field admits `value`.
//
// A field is either a single item or a comma-separated list of items, and
// matches when any item matches. Supported item forms:
//   *       every value
//   */n     values divisible by n
//   a-b     inclusive range
//   a-b/n   every n-th value of the range, counted from a
//   n       exactly n
// A step that is zero or negative never matches (this also avoids an integer
// modulo by zero). Numbers are parsed with std::stoi, so a malformed number
// propagates std::invalid_argument / std::out_of_range to the caller.
// `max` is accepted for call-site symmetry but is not consulted.
auto matches_field = [](const std::string& field, int value, [[maybe_unused]] int max) -> bool {
    // Matches one comma-free item against `value`.
    const auto item_matches = [value](const std::string& item) -> bool {
        if (item == "*") return true;

        // Split off an optional "/step" suffix.
        std::string base = item;
        int step = 1;
        const auto slash_pos = item.find('/');
        if (slash_pos != std::string::npos) {
            base = item.substr(0, slash_pos);
            step = std::stoi(item.substr(slash_pos + 1));
            if (step <= 0) {
                return false;  // Invalid step; would otherwise divide by zero
            }
        }

        // Step over the whole domain (*/n)
        if (base == "*") {
            return (value % step) == 0;
        }

        // Range (a-b), optionally stepped (a-b/n)
        const auto dash_pos = base.find('-');
        if (dash_pos != std::string::npos) {
            const int start = std::stoi(base.substr(0, dash_pos));
            const int end = std::stoi(base.substr(dash_pos + 1));
            return value >= start && value <= end &&
                   ((value - start) % step) == 0;
        }

        // Single value
        return std::stoi(base) == value;
    };

    // No list separator: the field is a single item.
    if (field.find(',') == std::string::npos) {
        return item_matches(field);
    }

    // List: split on commas first so that every element may itself be a
    // range or a step expression (e.g. "1-5,10").
    std::istringstream items(field);
    std::string item;
    while (std::getline(items, item, ',')) {
        if (item_matches(item)) {
            return true;
        }
    }
    return false;
};
1067
1068 while (time < end_time) {
1069 auto time_t = std::chrono::system_clock::to_time_t(time);
1070 std::tm tm = *std::localtime(&time_t);
1071
1072 bool matches =
1073 matches_field(cron.minute, tm.tm_min, 59) &&
1074 matches_field(cron.hour, tm.tm_hour, 23) &&
1075 matches_field(cron.day_of_month, tm.tm_mday, 31) &&
1076 matches_field(cron.month, tm.tm_mon + 1, 12) &&
1077 matches_field(cron.day_of_week, tm.tm_wday, 6);
1078
1079 if (matches) {
1080 // Round to start of minute
1081 auto duration = time.time_since_epoch();
1082 auto minutes = std::chrono::duration_cast<std::chrono::minutes>(duration);
1083 return std::chrono::system_clock::time_point(minutes);
1084 }
1085
1086 time += std::chrono::minutes{1};
1087 }
1088
1089 return std::nullopt;
1090}
1091
1093 auto id = next_task_id_.fetch_add(1);
1094 return "task_" + std::to_string(id);
1095}
1096
1097auto task_scheduler::generate_execution_id() const -> std::string {
1098 auto id = next_execution_id_.fetch_add(1);
1099
1100 // Add timestamp for uniqueness
1101 auto now = std::chrono::system_clock::now();
1102 auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
1103 now.time_since_epoch()).count();
1104
1105 return "exec_" + std::to_string(millis) + "_" + std::to_string(id);
1106}
1107
1110 return [this, config]() -> std::optional<std::string> {
1112 "Running cleanup task retention_days={}",
1113 config.default_retention.count());
1114
1115 try {
1116 // Calculate cutoff date
1117 auto now = std::chrono::system_clock::now();
1118 auto cutoff = now - config.default_retention;
1119
1120 // Convert to YYYYMMDD format for database query
1121 auto cutoff_time_t = std::chrono::system_clock::to_time_t(cutoff);
1122 std::tm tm = *std::localtime(&cutoff_time_t);
1123 std::ostringstream date_oss;
1124 date_oss << std::put_time(&tm, "%Y%m%d");
1125 std::string cutoff_date = date_oss.str();
1126
1127 // Query for studies older than cutoff
1129 query.study_date_to = cutoff_date;
1130 query.limit = config.max_deletions_per_cycle;
1131
1132 auto studies_result = database_.search_studies(query);
1133 if (!studies_result.is_ok()) {
1135 "Failed to query studies error={}",
1136 studies_result.error().message);
1137 return "Cleanup failed: " + studies_result.error().message;
1138 }
1139
1140 std::size_t deleted_count = 0;
1141 std::size_t skipped_count = 0;
1142
1143 for (const auto& study : studies_result.value()) {
1144 // Check modality-specific retention
1145 auto modality_retention = config.retention_for(study.modalities_in_study);
1146 auto modality_cutoff = now - modality_retention;
1147 auto modality_cutoff_t = std::chrono::system_clock::to_time_t(modality_cutoff);
1148 std::tm mod_tm = *std::localtime(&modality_cutoff_t);
1149 std::ostringstream mod_oss;
1150 mod_oss << std::put_time(&mod_tm, "%Y%m%d");
1151 std::string modality_cutoff_date = mod_oss.str();
1152
1153 // Skip if study is newer than modality-specific retention
1154 if (study.study_date > modality_cutoff_date) {
1155 ++skipped_count;
1156 continue;
1157 }
1158
1159 // Check exclusion patterns
1160 bool excluded = false;
1161 for (const auto& pattern : config.exclude_patterns) {
1162 if (study.study_description.find(pattern) != std::string::npos) {
1163 excluded = true;
1164 break;
1165 }
1166 }
1167 if (excluded) {
1168 ++skipped_count;
1169 continue;
1170 }
1171
1172 if (config.dry_run) {
1174 "Dry-run: would delete study study_uid={} study_date={} modality={}",
1175 study.study_uid, study.study_date, study.modalities_in_study);
1176 ++deleted_count;
1177 continue;
1178 }
1179
1180 // Delete files from storage if not database_only
1181 if (!config.database_only && file_storage_ != nullptr) {
1182 auto file_paths_result = database_.get_study_files(study.study_uid);
1183 if (file_paths_result.is_ok()) {
1184 for (const auto& file_path : file_paths_result.value()) {
1185 // Extract SOP UID from file path for removal
1186 std::filesystem::path p(file_path);
1187 std::string sop_uid = p.stem().string();
1188 auto remove_result = file_storage_->remove(sop_uid);
1189 if (remove_result.is_err()) {
1191 "Failed to remove file file_path={} error={}",
1192 file_path, remove_result.error().message);
1193 }
1194 }
1195 }
1196 }
1197
1198 // Delete from database
1199 auto delete_result = database_.delete_study(study.study_uid);
1200 if (delete_result.is_err()) {
1202 "Failed to delete study study_uid={} error={}",
1203 study.study_uid, delete_result.error().message);
1204 continue;
1205 }
1206
1207 ++deleted_count;
1209 "Deleted study study_uid={} study_date={}",
1210 study.study_uid, study.study_date);
1211 }
1212
1214 "Cleanup task completed deleted={} skipped={} dry_run={}",
1215 deleted_count, skipped_count, config.dry_run);
1216
1217 return std::nullopt; // Success
1218 } catch (const std::exception& e) {
1219 return std::string("Cleanup failed: ") + e.what();
1220 }
1221 };
1222}
1223
// Body of the archive-callback factory (presumably create_archive_callback;
// its signature line is not present in this listing - TODO confirm).
// The returned callable captures `this` and a copy of `config`. It yields
// std::nullopt on success and an error message string otherwise.
1226 return [this, config]() -> std::optional<std::string> {
// NOTE(review): the call that consumes this format string is not visible here.
1228 "Running archive task archive_after_days={} destination={}",
1229 config.archive_after.count(),
1230 config.destination);
1231
1232 try {
1233 // Calculate cutoff date
1234 auto now = std::chrono::system_clock::now();
1235 auto cutoff = now - config.archive_after;
1236
1237 // Convert to YYYYMMDD format
1238 auto cutoff_time_t = std::chrono::system_clock::to_time_t(cutoff);
// NOTE(review): std::localtime returns a pointer to shared static storage;
// confirm no other thread calls it concurrently. The result is copied at once.
1239 std::tm tm = *std::localtime(&cutoff_time_t);
1240 std::ostringstream date_oss;
1241 date_oss << std::put_time(&tm, "%Y%m%d");
1242 std::string cutoff_date = date_oss.str();
1243
1244 // Query for studies older than cutoff
// NOTE(review): the declaration of `query` is not visible in this listing.
// study_date_to is presumably an upper bound on the study date - confirm
// whether it is inclusive.
1246 query.study_date_to = cutoff_date;
1247 query.limit = config.max_archives_per_cycle;
1248
1249 auto studies_result = database_.search_studies(query);
1250 if (!studies_result.is_ok()) {
1252 "Failed to query studies error={}",
1253 studies_result.error().message);
// A failed query aborts the whole run with an error message.
1254 return "Archive failed: " + studies_result.error().message;
1255 }
1256
1257 std::size_t archived_count = 0;
1258 std::size_t failed_count = 0;
1259
1260 // Create destination directory if needed
1261 std::filesystem::path dest_path(config.destination);
1262 if (!std::filesystem::exists(dest_path)) {
1263 std::filesystem::create_directories(dest_path);
1264 }
1265
1266 for (const auto& study : studies_result.value()) {
1267 // Get all files for this study
1268 auto file_paths_result = database_.get_study_files(study.study_uid);
// A failed lookup or an empty file list skips the study without touching
// either archived_count or failed_count.
1269 if (!file_paths_result.is_ok() || file_paths_result.value().empty()) {
1270 continue;
1271 }
1272 const auto& file_paths = file_paths_result.value();
1273
1274 // Create study archive directory
// Each study gets its own sub-directory named after its study UID.
1275 std::filesystem::path study_dest = dest_path / study.study_uid;
1276 if (!std::filesystem::exists(study_dest)) {
1277 std::filesystem::create_directories(study_dest);
1278 }
1279
1280 bool archive_success = true;
1281
1282 for (const auto& src_file : file_paths) {
1283 std::filesystem::path src_path(src_file);
// A source file missing on disk is skipped and does NOT mark the study as
// failed, so the study can still be counted as archived.
1284 if (!std::filesystem::exists(src_path)) {
1286 "Source file not found file_path={}", src_file);
1287 continue;
1288 }
1289
1290 // Determine destination filename
1291 std::filesystem::path dest_file = study_dest / src_path.filename();
1292
1293 try {
1294 // Copy file to archive location
1295 std::filesystem::copy_file(
1296 src_path, dest_file,
1297 std::filesystem::copy_options::overwrite_existing);
1298
1299 // Verify copy if configured
// Verification compares file sizes only; file contents are not compared.
1300 if (config.verify_after_archive) {
1301 auto src_size = std::filesystem::file_size(src_path);
1302 auto dest_size = std::filesystem::file_size(dest_file);
1303 if (src_size != dest_size) {
1305 "Archive verification failed: size mismatch "
1306 "src={} dest={}", src_file, dest_file.string());
// The first mismatch abandons the rest of this study's files.
1307 archive_success = false;
1308 break;
1309 }
1310 }
1311 } catch (const std::filesystem::filesystem_error& e) {
1313 "Failed to archive file src={} dest={} error={}",
1314 src_file, dest_file.string(), e.what());
// A filesystem error on one file fails the study but not the whole run.
1315 archive_success = false;
1316 break;
1317 }
1318 }
1319
1320 if (archive_success) {
1321 ++archived_count;
1322
1323 // Delete originals if configured
// Originals are removed only when a file storage backend is attached.
1324 if (config.delete_after_archive && file_storage_ != nullptr) {
1325 for (const auto& file_path : file_paths) {
// The file name without its extension is used as the SOP UID key.
1326 std::filesystem::path p(file_path);
1327 std::string sop_uid = p.stem().string();
// Best effort: the results of remove() and delete_study() are discarded.
1328 (void)file_storage_->remove(sop_uid);
1329 }
1330 (void)database_.delete_study(study.study_uid);
1331 }
1332
1334 "Archived study study_uid={} files={}",
1335 study.study_uid, file_paths.size());
1336 } else {
1337 ++failed_count;
1338 }
1339 }
1340
1342 "Archive task completed archived={} failed={}",
1343 archived_count, failed_count);
1344
// Any per-study failure turns the overall result into an error message.
1345 if (failed_count > 0) {
1346 return "Archive completed with " + std::to_string(failed_count) + " failures";
1347 }
1348
1349 return std::nullopt; // Success
1350 } catch (const std::exception& e) {
// Exceptions not handled per file (e.g. from create_directories) land here.
1351 return std::string("Archive failed: ") + e.what();
1352 }
1353 };
1354}
1355
// Body of the verification-callback factory (presumably
// create_verification_callback; its signature line is not present in this
// listing - TODO confirm).
// The returned callable captures `this` and a copy of `config`. It yields
// std::nullopt when nothing was flagged, otherwise a message string.
1358 return [this, config]() -> std::optional<std::string> {
// NOTE(review): the call that consumes this format string is not visible here.
1360 "Running verification task check_checksums={} check_db={}",
1361 config.check_checksums,
1362 config.check_db_consistency);
1363
1364 try {
// verified: files that exist on disk (including ones flagged below).
// errors: integrity failures, file-listing failures, empty/unreadable files.
// missing_files: paths recorded in the database but absent on disk.
1365 std::size_t verified = 0;
1366 std::size_t errors = 0;
1367 std::size_t missing_files = 0;
1368
1369 // Database integrity check
1370 if (config.check_db_consistency) {
1371 auto db_result = database_.verify_integrity();
1372 if (db_result.is_err()) {
1374 "Database integrity check failed error={}",
1375 db_result.error().message);
// Counted as one error; the per-file checks below still run.
1376 ++errors;
1377 } else {
1378 integration::logger_adapter::debug("Database integrity check passed");
1379 }
1380 }
1381
1382 // Get studies for verification
// NOTE(review): the declaration of `query` is not visible in this listing.
1384 query.limit = config.max_verifications_per_cycle;
1385 auto studies_result = database_.search_studies(query);
1386 if (!studies_result.is_ok()) {
1388 "Failed to query studies error={}",
1389 studies_result.error().message);
// A failed query aborts the run; counters gathered so far are not reported.
1390 return "Verification failed: " + studies_result.error().message;
1391 }
1392
1393 for (const auto& study : studies_result.value()) {
1394 // Get all files for this study
1395 auto file_paths_result = database_.get_study_files(study.study_uid);
1396 if (!file_paths_result.is_ok()) {
1398 "Failed to get study files study_uid={} error={}",
1399 study.study_uid, file_paths_result.error().message);
1400 ++errors;
1401 continue;
1402 }
1403
1404 for (const auto& file_path : file_paths_result.value()) {
1405 std::filesystem::path path(file_path);
1406
1407 // Check file existence
1408 if (!std::filesystem::exists(path)) {
1409 ++missing_files;
1411 "Missing file detected file_path={} study_uid={}",
1412 file_path, study.study_uid);
1413
1414 if (config.repair_on_failure) {
1415 // Extract SOP UID and remove orphaned database record
// The SOP UID is taken from the file name without its extension; the result
// of delete_instance() is discarded.
1416 std::string sop_uid = path.stem().string();
1417 (void)database_.delete_instance(sop_uid);
1419 "Removed orphaned database record sop_uid={}", sop_uid);
1420 }
// Missing files are never counted in `verified`.
1421 continue;
1422 }
1423
1424 // Checksum verification
1425 if (config.check_checksums) {
1426 // Get file size as basic verification
1427 // Full checksum verification would require storing checksums in DB
1428 try {
1429 auto file_size = std::filesystem::file_size(path);
1430 if (file_size == 0) {
1431 ++errors;
1433 "Empty file detected file_path={}", file_path);
1434 }
1435 } catch (const std::filesystem::filesystem_error& e) {
1436 ++errors;
1438 "Cannot read file file_path={} error={}",
1439 file_path, e.what());
1440 }
1441 }
1442
// Reached for every existing file, so an empty or unreadable file is counted
// in both `errors` and `verified`.
1443 ++verified;
1444 }
1445 }
1446
1447 // File storage integrity check
1448 if (file_storage_ != nullptr) {
1449 auto storage_result = file_storage_->verify_integrity();
// A storage integrity failure does not increment `errors`, so it does not
// change the callback's return value.
1450 if (storage_result.is_err()) {
1452 "Storage integrity check reported issues error={}",
1453 storage_result.error().message);
1454 }
1455 }
1456
1458 "Verification task completed verified={} errors={} missing_files={}",
1459 verified, errors, missing_files);
1460
// Any error or missing file makes the callback return a summary message.
1461 if (errors > 0 || missing_files > 0) {
1462 std::ostringstream oss;
1463 oss << "Verification found " << errors << " errors and "
1464 << missing_files << " missing files";
1465 return oss.str();
1466 }
1467
1468 return std::nullopt; // Success
1469 } catch (const std::exception& e) {
1470 return std::string("Verification failed: ") + e.what();
1471 }
1472 };
1473}
1474
// Appends `record` to the execution history kept for `task_id`, then trims
// that history to its newest 1000 entries. The first line of the signature is
// not present in this listing. The whole body runs under history_mutex_.
1476 const task_execution_record& record) {
1477 std::lock_guard<std::mutex> lock(history_mutex_);
1478
// operator[] creates an empty history the first time a task id is seen.
1479 auto& history = execution_history_[task_id];
1480 history.push_back(record);
1481
1482 // Limit history size (keep last 1000 records per task)
1483 constexpr std::size_t max_history = 1000;
1484 if (history.size() > max_history) {
// Erase from the front: records are appended, so the oldest are dropped.
1485 history.erase(
1486 history.begin(),
1487 history.begin() + static_cast<std::ptrdiff_t>(history.size() - max_history));
1488 }
1489}
1490
// Body of the statistics update (presumably update_stats(record); the
// signature line is not present in this listing - TODO confirm).
// The whole update runs under stats_mutex_.
1492 std::lock_guard<std::mutex> lock(stats_mutex_);
1493
// NOTE(review): the statement originally here is missing from this listing;
// presumably it increments stats_.total_executions, which the averaging code
// below relies on - confirm against the real source.
1495
// NOTE(review): the bodies of these three branches are also missing here;
// presumably each bumps the matching per-state counter - confirm.
1496 if (record.state == task_state::completed) {
1498 } else if (record.state == task_state::failed) {
1500 } else if (record.state == task_state::cancelled) {
1502 }
1503
1504 // Update average and max execution time
// Timing is only updated when the record reports a duration.
1505 if (auto dur = record.duration()) {
1506 auto ms = dur->count();
1507
1508 // Update max
1509 if (ms > stats_.max_execution_time.count()) {
1510 stats_.max_execution_time = std::chrono::milliseconds(ms);
1511 }
1512
1513 // Update running average
// The first execution seeds the average directly.
1514 if (stats_.total_executions == 1) {
1515 stats_.avg_execution_time = std::chrono::milliseconds(ms);
1516 } else {
// Incremental mean: avg += (sample - avg) / n.
// NOTE(review): with integral tick counts the division truncates, so the
// stored average can drift from the exact mean - confirm this is acceptable.
1517 auto current_avg = stats_.avg_execution_time.count();
1518 auto new_avg = current_avg +
1519 (ms - current_avg) /
1520 static_cast<int64_t>(stats_.total_executions);
1521 stats_.avg_execution_time = std::chrono::milliseconds(new_avg);
1522 }
1523 }
1524}
1525
1526auto task_scheduler::serialize_tasks() const -> std::string {
1527 // Simple JSON serialization
1528 // In production, use a proper JSON library
1529
1530 std::lock_guard<std::mutex> lock(tasks_mutex_);
1531
1532 std::ostringstream oss;
1533 oss << "{\n \"tasks\": [\n";
1534
1535 bool first = true;
1536 for (const auto& [id, task] : tasks_) {
1537 if (!first) oss << ",\n";
1538 first = false;
1539
1540 oss << " {\n";
1541 oss << " \"id\": \"" << task.id << "\",\n";
1542 oss << " \"name\": \"" << task.name << "\",\n";
1543 oss << " \"type\": \"" << to_string(task.type) << "\",\n";
1544 oss << " \"enabled\": " << (task.enabled ? "true" : "false") << ",\n";
1545 oss << " \"priority\": " << task.priority << "\n";
1546 oss << " }";
1547 }
1548
1549 oss << "\n ]\n}";
1550
1551 return oss.str();
1552}
1553
// Placeholder for restoring tasks from their JSON form.
//
// @param json  Serialized task document; only checked for emptiness here.
// @return      Always 0: no task is ever restored by this implementation.
1554auto task_scheduler::deserialize_tasks(const std::string& json) -> std::size_t {
1555 // Note: Full deserialization is not supported without a proper JSON library.
1556 //
1557 // Limitations:
1558 // 1. Callbacks cannot be serialized/deserialized (function pointers)
1559 // 2. Schedule details are not currently serialized
1560 // 3. Tags and other complex fields are not serialized
1561 //
1562 // For production use, consider:
1563 // 1. Using nlohmann/json or similar library for proper JSON parsing
1564 // 2. Storing task configurations separately and re-creating tasks on startup
1565 // 3. Using a database-backed persistence layer
1566 //
1567 // Current implementation returns 0 (no tasks loaded) to indicate
1568 // that manual task registration is required after restart.
1569
// Empty input returns early, before the message below is reached.
1570 if (json.empty()) {
1571 return 0;
1572 }
1573
// NOTE(review): the call that consumes this message is not visible in this
// listing; presumably a logger call - confirm against the real source.
1575 "deserialize_tasks called but full deserialization is not implemented. "
1576 "Tasks must be re-registered programmatically after scheduler restart.");
1577
// Non-empty input is ignored as well; nothing is parsed.
1578 return 0;
1579}
1580
1581} // namespace kcenon::pacs::workflow
if(!color.empty()) style.color
static void debug(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log a debug-level message.
static void info(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log an info-level message.
static void warn(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log a warning-level message.
static void error(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log an error-level message.
static void record_histogram(std::string_view name, double value)
Record a histogram sample.
static void increment_counter(std::string_view name, std::int64_t value=1)
Increment a counter metric.
std::atomic< uint64_t > next_task_id_
Next task ID counter.
void execute_cycle()
Execute a single scheduler cycle.
auto generate_execution_id() const -> std::string
Generate unique execution ID.
std::map< task_id, scheduled_task > tasks_
All scheduled tasks (id -> task)
void update_stats(const task_execution_record &record)
Update statistics after execution.
auto deserialize_tasks(const std::string &json) -> std::size_t
Deserialize tasks from JSON.
auto execute_task(scheduled_task &task) -> task_execution_record
Execute a single task.
std::condition_variable cv_
Condition variable for sleep/wake.
auto pause_task(const task_id &id) -> bool
Pause a scheduled task.
std::atomic< bool > running_
Flag indicating scheduler is running.
auto cancel_task(const task_id &id) -> bool
Cancel a scheduled task.
auto create_verification_callback(const verification_config &config) -> task_callback_with_result
Create verification task callback.
auto save_tasks() const -> bool
Save all tasks to persistence storage.
std::mutex mutex_
Mutex for thread synchronization.
std::mutex stats_mutex_
Mutex for statistics.
task_scheduler(storage::index_database &database, const task_scheduler_config &config={})
Construct task scheduler.
task_scheduler_config config_
Service configuration.
auto running_count() const noexcept -> std::size_t
Get number of running tasks.
auto calculate_next_cron_run(const cron_schedule &cron, std::chrono::system_clock::time_point from) const -> std::optional< std::chrono::system_clock::time_point >
Calculate next run time for cron schedule.
auto create_archive_callback(const archive_config &config) -> task_callback_with_result
Create archive task callback.
auto serialize_tasks() const -> std::string
Serialize tasks to JSON.
void set_task_complete_callback(task_scheduler_config::task_complete_callback callback)
Set the task complete callback.
void set_error_callback(task_scheduler_config::task_error_callback callback)
Set the error callback.
auto is_running() const noexcept -> bool
Check if the scheduler is running.
std::thread scheduler_thread_
Background scheduler thread.
auto update_schedule(const task_id &id, const kcenon::pacs::workflow::schedule &new_schedule) -> bool
Update task schedule.
std::mutex tasks_mutex_
Mutex for tasks map.
auto schedule_verification(const verification_config &config) -> task_id
Schedule verification task.
auto get_stats() const -> scheduler_stats
Get scheduler statistics.
auto list_tasks() const -> std::vector< scheduled_task >
List all scheduled tasks.
void clear_history(const task_id &id, std::size_t keep_last=0)
Clear execution history for a task.
~task_scheduler()
Destructor - ensures graceful shutdown.
auto get_execution_history(const task_id &id, std::size_t limit=100) const -> std::vector< task_execution_record >
Get execution history for a task.
auto schedule_archive(const archive_config &config) -> task_id
Schedule archive task.
void run_loop()
Background thread main loop.
auto generate_task_id() const -> task_id
Generate unique task ID.
auto calculate_next_run(const kcenon::pacs::workflow::schedule &sched, std::chrono::system_clock::time_point from=std::chrono::system_clock::now()) const -> std::optional< std::chrono::system_clock::time_point >
Calculate next run time for a schedule.
void record_execution(const task_id &task_id, const task_execution_record &record)
Record task execution.
auto resume_task(const task_id &id) -> bool
Resume a paused task.
auto get_recent_executions(std::size_t limit=100) const -> std::vector< task_execution_record >
Get all recent executions.
auto load_tasks() -> std::size_t
Load tasks from persistence storage.
std::mutex history_mutex_
Mutex for execution history.
std::map< task_id, std::vector< task_execution_record > > execution_history_
Execution history (task_id -> records)
auto get_task(const task_id &id) const -> std::optional< scheduled_task >
Get a specific task by ID.
auto schedule_cleanup(const cleanup_config &config) -> task_id
Schedule cleanup task.
std::atomic< std::size_t > running_count_
Running task count.
auto pending_count() const noexcept -> std::size_t
Get number of pending tasks.
scheduler_stats stats_
Scheduler statistics.
auto schedule(const std::string &name, const std::string &description, std::chrono::seconds interval, task_callback_with_result callback) -> task_id
Schedule a custom task with interval.
std::atomic< uint64_t > next_execution_id_
Next execution ID counter.
auto schedule_once(const std::string &name, const std::string &description, std::chrono::system_clock::time_point execute_at, task_callback_with_result callback) -> task_id
Schedule a one-time task.
void stop(bool wait_for_completion=true)
Stop the scheduler service.
auto trigger_task(const task_id &id) -> bool
Trigger immediate execution of a task.
std::chrono::steady_clock::time_point start_time_
Start time for uptime calculation.
auto create_cleanup_callback(const cleanup_config &config) -> task_callback_with_result
Create cleanup task callback.
void start()
Start the scheduler service.
std::atomic< bool > stop_requested_
Flag to signal shutdown.
Adapter for integrating common_system's IExecutor interface.
Filesystem-based DICOM storage with hierarchical organization.
PACS index database for metadata storage and retrieval.
Adapter for DICOM audit logging using logger_system.
Adapter for PACS performance metrics and distributed tracing.
task_state
Task execution state.
@ running
Currently executing.
@ pending
Waiting for scheduled time.
@ completed
Completed successfully.
std::function< std::optional< std::string >()> task_callback_with_result
Task callback with result details.
task_type
Task type enumeration.
@ archive
Study archival task.
@ cleanup
Storage cleanup task.
@ verification
Data integrity verification.
std::string task_id
Unique task identifier.
auto to_string(lock_type type) -> std::string
Convert lock_type to string.
std::variant< interval_schedule, cron_schedule, one_time_schedule > schedule
Combined schedule type.
Configuration for archive scheduling.
Configuration for cleanup scheduling.
static auto parse(const std::string &expr) -> cron_schedule
Parse a cron expression string.
std::string minute
Minute (0-59, or "*")
std::string month
Month (1-12, or "*")
std::string day_of_month
Day of month (1-31, or "*")
auto to_string() const -> std::string
Convert to cron expression string.
std::string hour
Hour (0-23, or "*")
std::string day_of_week
Day of week (0-6, Sunday=0, or "*")
auto is_valid() const noexcept -> bool
Check if the schedule is valid.
One-time execution at specific time.
std::chrono::system_clock::time_point created_at
Creation time.
std::optional< std::chrono::system_clock::time_point > next_run_at
Next scheduled execution time.
std::set< std::string > tags
Tags for categorization.
std::chrono::system_clock::time_point updated_at
Last modification time.
task_callback_with_result callback
Task callback.
std::string name
Human-readable task name.
int priority
Task priority (higher = more important)
schedule task_schedule
Schedule for execution.
Statistics for task scheduler operations.
std::size_t cancelled_executions
Cancelled executions.
std::size_t successful_executions
Successful executions.
std::chrono::milliseconds max_execution_time
Maximum execution time observed.
std::size_t scheduled_tasks
Number of tasks currently scheduled.
std::optional< std::chrono::system_clock::time_point > last_cycle_at
Last cycle time.
std::size_t failed_executions
Failed executions.
std::size_t running_tasks
Number of tasks currently running.
std::chrono::seconds uptime
Scheduler uptime.
std::size_t total_executions
Total tasks executed.
std::chrono::milliseconds avg_execution_time
Average execution time.
Configuration for the task scheduler service.
std::size_t max_concurrent_tasks
Maximum concurrent task executions.
bool auto_start
Whether to start automatically on construction.
std::function< void(const task_id &id, const std::string &error)> task_error_callback
Callback when any task fails.
std::optional< verification_config > verification
Verification configuration.
std::function< void(const task_id &id, const task_execution_record &record)> task_complete_callback
Callback when any task completes.
bool restore_on_startup
Restore tasks from persistence on startup.
std::string persistence_path
Path to persist scheduled tasks (empty = no persistence)
std::chrono::seconds check_interval
Scheduler check interval (how often to check for due tasks)
bool enabled
Enable/disable the scheduler service.
std::optional< cleanup_config > cleanup
Cleanup configuration.
std::optional< archive_config > archive
Archive configuration.
Configuration for verification scheduling.
Task scheduler service for automated PACS operations.
std::string_view name