17#include <kcenon/common/interfaces/executor_interface.h>
36 std::istringstream iss(expr);
37 std::vector<std::string> parts;
41 parts.push_back(part);
44 if (parts.size() >= 1) result.
minute = parts[0];
45 if (parts.size() >= 2) result.
hour = parts[1];
47 if (parts.size() >= 4) result.
month = parts[3];
48 if (parts.size() >= 5) result.
day_of_week = parts[4];
75 schedule_cleanup(*config_.cleanup);
78 schedule_archive(*config_.archive);
81 schedule_verification(*config_.verification);
92 std::shared_ptr<kcenon::thread::thread_pool> thread_pool,
95 , file_storage_(&file_storage)
96 , thread_pool_(std::move(thread_pool))
106 schedule_cleanup(*config_.cleanup);
109 schedule_archive(*config_.archive);
112 schedule_verification(*config_.verification);
123 std::shared_ptr<kcenon::common::interfaces::IExecutor> executor,
125 : database_(database)
126 , file_storage_(&file_storage)
127 , executor_(std::move(executor))
137 schedule_cleanup(*config_.cleanup);
140 schedule_archive(*config_.archive);
143 schedule_verification(*config_.verification);
177 "Task scheduler started check_interval_sec={} max_concurrent={}",
211 task.
id =
"cleanup_task";
212 task.
name =
"Storage Cleanup";
213 task.
description =
"Removes old studies based on retention policy";
218 task.
tags = {
"maintenance",
"storage"};
219 task.
callback = create_cleanup_callback(config);
220 task.
created_at = std::chrono::system_clock::now();
232 task.
id =
"archive_task";
233 task.
name =
"Study Archival";
234 task.
description =
"Archives studies to secondary storage";
239 task.
tags = {
"maintenance",
"archive"};
240 task.
callback = create_archive_callback(config);
241 task.
created_at = std::chrono::system_clock::now();
254 task.
id =
"verification_task";
255 task.
name =
"Data Verification";
256 task.
description =
"Verifies data integrity and consistency";
261 task.
tags = {
"maintenance",
"integrity"};
262 task.
callback = create_verification_callback(config);
263 task.
created_at = std::chrono::system_clock::now();
274 const std::string&
name,
275 const std::string& description,
276 std::chrono::seconds interval,
280 task.
id = generate_task_id();
286 task.
callback = std::move(callback);
287 task.
created_at = std::chrono::system_clock::now();
294 const std::string&
name,
295 const std::string& description,
300 task.
id = generate_task_id();
306 task.
callback = std::move(callback);
307 task.
created_at = std::chrono::system_clock::now();
314 const std::string&
name,
315 const std::string& description,
316 std::chrono::system_clock::time_point execute_at,
320 task.
id = generate_task_id();
326 task.
callback = std::move(callback);
327 task.
created_at = std::chrono::system_clock::now();
335 std::lock_guard<std::mutex> lock(tasks_mutex_);
338 if (!task.next_run_at) {
339 task.next_run_at = calculate_next_run(task.task_schedule);
345 auto it = tasks_.find(
id);
346 if (it != tasks_.end()) {
348 it->second = std::move(task);
350 "Updated scheduled task task_id={} name={}",
351 id, it->second.name);
354 tasks_.emplace(
id, std::move(task));
356 "Added scheduled task task_id={} name={}",
357 id, tasks_[
id].
name);
362 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
363 stats_.scheduled_tasks = tasks_.size();
376 std::vector<scheduled_task> result;
377 result.reserve(
tasks_.size());
379 for (
const auto& [
id, task] :
tasks_) {
380 result.push_back(task);
387 -> std::vector<scheduled_task> {
388 std::lock_guard<std::mutex> lock(tasks_mutex_);
390 std::vector<scheduled_task> result;
392 for (
const auto& [
id, task] : tasks_) {
393 if (task.type == type) {
394 result.push_back(task);
402 -> std::vector<scheduled_task> {
403 std::lock_guard<std::mutex> lock(tasks_mutex_);
405 std::vector<scheduled_task> result;
407 for (
const auto& [
id, task] : tasks_) {
408 if (task.state == state) {
409 result.push_back(task);
417 -> std::optional<scheduled_task> {
418 std::lock_guard<std::mutex> lock(tasks_mutex_);
420 auto it = tasks_.find(
id);
421 if (it == tasks_.end()) {
429 std::lock_guard<std::mutex> lock(tasks_mutex_);
431 auto it = tasks_.find(
id);
432 if (it == tasks_.end()) {
437 it->second.enabled =
false;
438 it->second.updated_at = std::chrono::system_clock::now();
441 "Cancelled scheduled task task_id={} name={}",
442 id, it->second.name);
448 std::lock_guard<std::mutex> lock(tasks_mutex_);
450 auto it = tasks_.find(
id);
451 if (it == tasks_.end()) {
460 it->second.updated_at = std::chrono::system_clock::now();
463 "Paused scheduled task task_id={} name={}",
464 id, it->second.name);
470 std::lock_guard<std::mutex> lock(tasks_mutex_);
472 auto it = tasks_.find(
id);
473 if (it == tasks_.end()) {
482 it->second.next_run_at = calculate_next_run(it->second.task_schedule);
483 it->second.updated_at = std::chrono::system_clock::now();
486 "Resumed scheduled task task_id={} name={}",
487 id, it->second.name);
493 std::lock_guard<std::mutex> lock(tasks_mutex_);
495 auto it = tasks_.find(
id);
496 if (it == tasks_.end()) {
505 it->second.next_run_at = std::chrono::system_clock::now();
511 "Triggered immediate execution task_id={} name={}",
512 id, it->second.name);
519 std::lock_guard<std::mutex> lock(tasks_mutex_);
521 auto it = tasks_.find(
id);
522 if (it == tasks_.end()) {
526 it->second.task_schedule = new_schedule;
527 it->second.next_run_at = calculate_next_run(new_schedule);
528 it->second.updated_at = std::chrono::system_clock::now();
531 "Updated schedule for task task_id={} name={}",
532 id, it->second.name);
543 std::size_t limit)
const -> std::vector<task_execution_record> {
544 std::lock_guard<std::mutex> lock(history_mutex_);
546 auto it = execution_history_.find(
id);
547 if (it == execution_history_.end()) {
551 const auto& records = it->second;
552 if (records.size() <= limit) {
557 return std::vector<task_execution_record>(
558 records.end() -
static_cast<std::ptrdiff_t
>(limit),
563 -> std::vector<task_execution_record> {
564 std::lock_guard<std::mutex> lock(history_mutex_);
566 std::vector<task_execution_record> all_records;
568 for (
const auto& [
id, records] : execution_history_) {
569 all_records.insert(all_records.end(), records.begin(), records.end());
573 std::ranges::sort(all_records, [](
const auto& a,
const auto& b) {
574 return a.started_at > b.started_at;
577 if (all_records.size() <= limit) {
581 all_records.resize(limit);
593 auto& records = it->second;
594 if (records.size() <= keep_last) {
601 records.begin() +
static_cast<std::ptrdiff_t
>(records.size() - keep_last));
615 auto now = std::chrono::steady_clock::now();
616 result.
uptime = std::chrono::duration_cast<std::chrono::seconds>(
633 std::size_t count = 0;
634 for (
const auto& [
id, task] :
tasks_) {
661 "Failed to open persistence file path={}",
670 "Saved tasks to persistence path={}",
674 }
catch (
const std::exception& e) {
676 "Failed to save tasks error={}", e.what());
682 if (config_.persistence_path.empty()) {
687 std::ifstream file(config_.persistence_path);
690 "No persistence file found path={}",
691 config_.persistence_path);
695 std::string json((std::istreambuf_iterator<char>(file)),
696 std::istreambuf_iterator<char>());
699 std::size_t count = deserialize_tasks(json);
702 "Loaded tasks from persistence path={} count={}",
703 config_.persistence_path, count);
706 }
catch (
const std::exception& e) {
708 "Failed to load tasks error={}", e.what());
719 std::lock_guard<std::mutex> lock(
mutex_);
725 std::lock_guard<std::mutex> lock(
mutex_);
737 std::unique_lock<std::mutex> lock(
mutex_);
741 return stop_requested_.load();
756 auto now = std::chrono::system_clock::now();
757 std::vector<task_id> due_tasks;
763 for (
auto& [
id, task] :
tasks_) {
768 if (task.next_run_at && *task.next_run_at <= now) {
769 due_tasks.push_back(
id);
774 if (due_tasks.empty()) {
781 std::ranges::sort(due_tasks, [
this](
const auto& a,
const auto& b) {
787 std::size_t executed = 0;
788 std::size_t succeeded = 0;
791 for (
const auto&
id : due_tasks) {
797 auto it =
tasks_.find(
id);
802 auto& task = it->second;
815 task.last_run_at = record.started_at;
816 task.last_execution = record;
817 ++task.execution_count;
820 ++task.success_count;
823 ++task.failure_count;
831 std::chrono::system_clock::now());
834 if (std::holds_alternative<one_time_schedule>(task.task_schedule)) {
835 task.enabled =
false;
858 "Scheduler cycle completed executed={} succeeded={} failed={}",
859 executed, succeeded,
failed);
863 "scheduler_tasks_executed",
static_cast<int64_t
>(executed));
865 "scheduler_tasks_succeeded",
static_cast<int64_t
>(succeeded));
867 "scheduler_tasks_failed",
static_cast<int64_t
>(
failed));
885 record.execution_id = generate_execution_id();
886 record.task_id = task.id;
887 record.started_at = std::chrono::system_clock::now();
891 "Executing task task_id={} name={}",
894 auto start_time = std::chrono::steady_clock::now();
897 std::size_t attempt = 0;
898 const std::size_t max_attempts = task.max_retries + 1;
900 while (attempt < max_attempts) {
905 "Retrying task task_id={} attempt={}/{}",
906 task.id, attempt, max_attempts);
909 std::this_thread::sleep_for(task.retry_delay);
913 if (!task.callback) {
915 record.error_message =
"No callback defined";
920 if (task.timeout.count() > 0) {
922 auto future = std::async(std::launch::async, task.callback);
924 auto status = future.wait_for(task.timeout);
925 if (status == std::future_status::timeout) {
927 record.error_message =
"Task execution timed out after " +
928 std::to_string(task.timeout.count()) +
" seconds";
930 "Task timed out task_id={} timeout_seconds={}",
931 task.id, task.timeout.count());
937 auto result = future.get();
938 if (result.has_value()) {
940 record.error_message = *result;
947 auto result = task.callback();
949 if (result.has_value()) {
951 record.error_message = *result;
957 }
catch (
const std::exception& e) {
959 record.error_message = e.what();
961 "Task execution failed task_id={} attempt={} error={}",
962 task.id, attempt, e.what());
968 "Task failed after {} attempts task_id={}",
969 max_attempts, task.id);
973 record.ended_at = std::chrono::system_clock::now();
975 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
976 std::chrono::steady_clock::now() - start_time);
979 "Task execution completed task_id={} name={} state={} duration_ms={} attempts={}",
980 task.id, task.name, to_string(record.state), duration.count(), attempt);
984 "scheduler_task_duration_ms",
985 static_cast<double>(duration.count()));
989 "scheduler_task_retries_total");
997 std::chrono::system_clock::time_point from)
const
998 -> std::optional<std::chrono::system_clock::time_point> {
1000 return std::visit([
this, from](
const auto& s) ->
1001 std::optional<std::chrono::system_clock::time_point> {
1002 using T = std::decay_t<
decltype(s)>;
1004 if constexpr (std::is_same_v<T, interval_schedule>) {
1005 if (s.start_at && *s.start_at > from) {
1008 return from + s.interval;
1009 }
else if constexpr (std::is_same_v<T, cron_schedule>) {
1010 return calculate_next_cron_run(s, from);
1011 }
else if constexpr (std::is_same_v<T, one_time_schedule>) {
1012 if (s.execute_at > from) {
1013 return s.execute_at;
1015 return std::nullopt;
1018 return std::nullopt;
1024 std::chrono::system_clock::time_point from)
const
1025 -> std::optional<std::chrono::system_clock::time_point> {
1031 auto time = from + std::chrono::minutes{1};
1032 auto end_time = from + std::chrono::hours{24 * 365};
1035 auto matches_field = [](
const std::string& field,
int value, [[maybe_unused]]
int max) ->
bool {
1036 if (field ==
"*")
return true;
1039 if (field.starts_with(
"*/")) {
1040 int step = std::stoi(field.substr(2));
1041 return (value % step) == 0;
1045 auto dash_pos = field.find(
'-');
1046 if (dash_pos != std::string::npos) {
1047 int start = std::stoi(field.substr(0, dash_pos));
1048 int end = std::stoi(field.substr(dash_pos + 1));
1049 return value >= start && value <= end;
1053 if (field.find(
',') != std::string::npos) {
1054 std::istringstream iss(field);
1056 while (std::getline(iss, item,
',')) {
1057 if (std::stoi(item) == value) {
1065 return std::stoi(field) == value;
1068 while (time < end_time) {
1069 auto time_t = std::chrono::system_clock::to_time_t(time);
1070 std::tm tm = *std::localtime(&time_t);
1073 matches_field(cron.minute, tm.tm_min, 59) &&
1074 matches_field(cron.hour, tm.tm_hour, 23) &&
1075 matches_field(cron.day_of_month, tm.tm_mday, 31) &&
1076 matches_field(cron.month, tm.tm_mon + 1, 12) &&
1077 matches_field(cron.day_of_week, tm.tm_wday, 6);
1081 auto duration = time.time_since_epoch();
1082 auto minutes = std::chrono::duration_cast<std::chrono::minutes>(duration);
1083 return std::chrono::system_clock::time_point(minutes);
1086 time += std::chrono::minutes{1};
1089 return std::nullopt;
1094 return "task_" + std::to_string(
id);
1101 auto now = std::chrono::system_clock::now();
1102 auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
1103 now.time_since_epoch()).count();
1105 return "exec_" + std::to_string(millis) +
"_" + std::to_string(
id);
1110 return [
this, config]() -> std::optional<std::string> {
1112 "Running cleanup task retention_days={}",
1113 config.default_retention.count());
1117 auto now = std::chrono::system_clock::now();
1118 auto cutoff = now - config.default_retention;
1121 auto cutoff_time_t = std::chrono::system_clock::to_time_t(cutoff);
1122 std::tm tm = *std::localtime(&cutoff_time_t);
1123 std::ostringstream date_oss;
1124 date_oss << std::put_time(&tm,
"%Y%m%d");
1125 std::string cutoff_date = date_oss.str();
1129 query.study_date_to = cutoff_date;
1130 query.limit = config.max_deletions_per_cycle;
1132 auto studies_result = database_.search_studies(query);
1133 if (!studies_result.is_ok()) {
1135 "Failed to query studies error={}",
1136 studies_result.error().message);
1137 return "Cleanup failed: " + studies_result.error().message;
1140 std::size_t deleted_count = 0;
1141 std::size_t skipped_count = 0;
1143 for (
const auto& study : studies_result.value()) {
1145 auto modality_retention = config.retention_for(study.modalities_in_study);
1146 auto modality_cutoff = now - modality_retention;
1147 auto modality_cutoff_t = std::chrono::system_clock::to_time_t(modality_cutoff);
1148 std::tm mod_tm = *std::localtime(&modality_cutoff_t);
1149 std::ostringstream mod_oss;
1150 mod_oss << std::put_time(&mod_tm,
"%Y%m%d");
1151 std::string modality_cutoff_date = mod_oss.str();
1154 if (study.study_date > modality_cutoff_date) {
1160 bool excluded =
false;
1161 for (
const auto& pattern : config.exclude_patterns) {
1162 if (study.study_description.find(pattern) != std::string::npos) {
1172 if (config.dry_run) {
1174 "Dry-run: would delete study study_uid={} study_date={} modality={}",
1175 study.study_uid, study.study_date, study.modalities_in_study);
1181 if (!config.database_only && file_storage_ !=
nullptr) {
1182 auto file_paths_result = database_.get_study_files(study.study_uid);
1183 if (file_paths_result.is_ok()) {
1184 for (
const auto& file_path : file_paths_result.value()) {
1186 std::filesystem::path p(file_path);
1187 std::string sop_uid = p.stem().string();
1188 auto remove_result = file_storage_->remove(sop_uid);
1189 if (remove_result.is_err()) {
1191 "Failed to remove file file_path={} error={}",
1192 file_path, remove_result.error().message);
1199 auto delete_result = database_.delete_study(study.study_uid);
1200 if (delete_result.is_err()) {
1202 "Failed to delete study study_uid={} error={}",
1203 study.study_uid, delete_result.error().message);
1209 "Deleted study study_uid={} study_date={}",
1210 study.study_uid, study.study_date);
1214 "Cleanup task completed deleted={} skipped={} dry_run={}",
1215 deleted_count, skipped_count, config.dry_run);
1217 return std::nullopt;
1218 }
catch (
const std::exception& e) {
1219 return std::string(
"Cleanup failed: ") + e.what();
1226 return [
this, config]() -> std::optional<std::string> {
1228 "Running archive task archive_after_days={} destination={}",
1229 config.archive_after.count(),
1230 config.destination);
1234 auto now = std::chrono::system_clock::now();
1235 auto cutoff = now - config.archive_after;
1238 auto cutoff_time_t = std::chrono::system_clock::to_time_t(cutoff);
1239 std::tm tm = *std::localtime(&cutoff_time_t);
1240 std::ostringstream date_oss;
1241 date_oss << std::put_time(&tm,
"%Y%m%d");
1242 std::string cutoff_date = date_oss.str();
1246 query.study_date_to = cutoff_date;
1247 query.limit = config.max_archives_per_cycle;
1249 auto studies_result = database_.search_studies(query);
1250 if (!studies_result.is_ok()) {
1252 "Failed to query studies error={}",
1253 studies_result.error().message);
1254 return "Archive failed: " + studies_result.error().message;
1257 std::size_t archived_count = 0;
1258 std::size_t failed_count = 0;
1261 std::filesystem::path dest_path(config.destination);
1262 if (!std::filesystem::exists(dest_path)) {
1263 std::filesystem::create_directories(dest_path);
1266 for (
const auto& study : studies_result.value()) {
1268 auto file_paths_result = database_.get_study_files(study.study_uid);
1269 if (!file_paths_result.is_ok() || file_paths_result.value().empty()) {
1272 const auto& file_paths = file_paths_result.value();
1275 std::filesystem::path study_dest = dest_path / study.study_uid;
1276 if (!std::filesystem::exists(study_dest)) {
1277 std::filesystem::create_directories(study_dest);
1280 bool archive_success =
true;
1282 for (
const auto& src_file : file_paths) {
1283 std::filesystem::path src_path(src_file);
1284 if (!std::filesystem::exists(src_path)) {
1286 "Source file not found file_path={}", src_file);
1291 std::filesystem::path dest_file = study_dest / src_path.filename();
1295 std::filesystem::copy_file(
1296 src_path, dest_file,
1297 std::filesystem::copy_options::overwrite_existing);
1300 if (config.verify_after_archive) {
1301 auto src_size = std::filesystem::file_size(src_path);
1302 auto dest_size = std::filesystem::file_size(dest_file);
1303 if (src_size != dest_size) {
1305 "Archive verification failed: size mismatch "
1306 "src={} dest={}", src_file, dest_file.string());
1307 archive_success =
false;
1311 }
catch (
const std::filesystem::filesystem_error& e) {
1313 "Failed to archive file src={} dest={} error={}",
1314 src_file, dest_file.string(), e.what());
1315 archive_success =
false;
1320 if (archive_success) {
1324 if (config.delete_after_archive && file_storage_ !=
nullptr) {
1325 for (
const auto& file_path : file_paths) {
1326 std::filesystem::path p(file_path);
1327 std::string sop_uid = p.stem().string();
1328 (void)file_storage_->remove(sop_uid);
1330 (void)database_.delete_study(study.study_uid);
1334 "Archived study study_uid={} files={}",
1335 study.study_uid, file_paths.size());
1342 "Archive task completed archived={} failed={}",
1343 archived_count, failed_count);
1345 if (failed_count > 0) {
1346 return "Archive completed with " + std::to_string(failed_count) +
" failures";
1349 return std::nullopt;
1350 }
catch (
const std::exception& e) {
1351 return std::string(
"Archive failed: ") + e.what();
1358 return [
this, config]() -> std::optional<std::string> {
1360 "Running verification task check_checksums={} check_db={}",
1361 config.check_checksums,
1362 config.check_db_consistency);
1365 std::size_t verified = 0;
1366 std::size_t errors = 0;
1367 std::size_t missing_files = 0;
1370 if (config.check_db_consistency) {
1371 auto db_result = database_.verify_integrity();
1372 if (db_result.is_err()) {
1374 "Database integrity check failed error={}",
1375 db_result.error().message);
1384 query.limit = config.max_verifications_per_cycle;
1385 auto studies_result = database_.search_studies(query);
1386 if (!studies_result.is_ok()) {
1388 "Failed to query studies error={}",
1389 studies_result.error().message);
1390 return "Verification failed: " + studies_result.error().message;
1393 for (
const auto& study : studies_result.value()) {
1395 auto file_paths_result = database_.get_study_files(study.study_uid);
1396 if (!file_paths_result.is_ok()) {
1398 "Failed to get study files study_uid={} error={}",
1399 study.study_uid, file_paths_result.error().message);
1404 for (
const auto& file_path : file_paths_result.value()) {
1405 std::filesystem::path path(file_path);
1408 if (!std::filesystem::exists(path)) {
1411 "Missing file detected file_path={} study_uid={}",
1412 file_path, study.study_uid);
1414 if (config.repair_on_failure) {
1416 std::string sop_uid = path.stem().string();
1417 (void)database_.delete_instance(sop_uid);
1419 "Removed orphaned database record sop_uid={}", sop_uid);
1425 if (config.check_checksums) {
1429 auto file_size = std::filesystem::file_size(path);
1430 if (file_size == 0) {
1433 "Empty file detected file_path={}", file_path);
1435 }
catch (
const std::filesystem::filesystem_error& e) {
1438 "Cannot read file file_path={} error={}",
1439 file_path, e.what());
1448 if (file_storage_ !=
nullptr) {
1449 auto storage_result = file_storage_->verify_integrity();
1450 if (storage_result.is_err()) {
1452 "Storage integrity check reported issues error={}",
1453 storage_result.error().message);
1458 "Verification task completed verified={} errors={} missing_files={}",
1459 verified, errors, missing_files);
1461 if (errors > 0 || missing_files > 0) {
1462 std::ostringstream oss;
1463 oss <<
"Verification found " << errors <<
" errors and "
1464 << missing_files <<
" missing files";
1468 return std::nullopt;
1469 }
catch (
const std::exception& e) {
1470 return std::string(
"Verification failed: ") + e.what();
1480 history.push_back(record);
1483 constexpr std::size_t max_history = 1000;
1484 if (history.size() > max_history) {
1487 history.begin() +
static_cast<std::ptrdiff_t
>(history.size() - max_history));
1505 if (
auto dur = record.duration()) {
1506 auto ms = dur->count();
1518 auto new_avg = current_avg +
1519 (ms - current_avg) /
1532 std::ostringstream oss;
1533 oss <<
"{\n \"tasks\": [\n";
1536 for (
const auto& [
id, task] :
tasks_) {
1537 if (!first) oss <<
",\n";
1541 oss <<
" \"id\": \"" << task.id <<
"\",\n";
1542 oss <<
" \"name\": \"" << task.name <<
"\",\n";
1543 oss <<
" \"type\": \"" <<
to_string(task.type) <<
"\",\n";
1544 oss <<
" \"enabled\": " << (task.enabled ?
"true" :
"false") <<
",\n";
1545 oss <<
" \"priority\": " << task.priority <<
"\n";
1575 "deserialize_tasks called but full deserialization is not implemented. "
1576 "Tasks must be re-registered programmatically after scheduler restart.");
if(!color.empty()) style.color
static void debug(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log a debug-level message.
static void info(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log an info-level message.
static void warn(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log a warning-level message.
static void error(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log an error-level message.
static void record_histogram(std::string_view name, double value)
Record a histogram sample.
static void increment_counter(std::string_view name, std::int64_t value=1)
Increment a counter metric.
std::atomic< uint64_t > next_task_id_
Next task ID counter.
void execute_cycle()
Execute a single scheduler cycle.
auto generate_execution_id() const -> std::string
Generate unique execution ID.
std::map< task_id, scheduled_task > tasks_
All scheduled tasks (id -> task)
void update_stats(const task_execution_record &record)
Update statistics after execution.
auto deserialize_tasks(const std::string &json) -> std::size_t
Deserialize tasks from JSON.
auto execute_task(scheduled_task &task) -> task_execution_record
Execute a single task.
std::condition_variable cv_
Condition variable for sleep/wake.
auto pause_task(const task_id &id) -> bool
Pause a scheduled task.
std::atomic< bool > running_
Flag indicating scheduler is running.
auto cancel_task(const task_id &id) -> bool
Cancel a scheduled task.
auto create_verification_callback(const verification_config &config) -> task_callback_with_result
Create verification task callback.
auto save_tasks() const -> bool
Save all tasks to persistence storage.
std::mutex mutex_
Mutex for thread synchronization.
std::mutex stats_mutex_
Mutex for statistics.
task_scheduler(storage::index_database &database, const task_scheduler_config &config={})
Construct task scheduler.
task_scheduler_config config_
Service configuration.
auto running_count() const noexcept -> std::size_t
Get number of running tasks.
auto calculate_next_cron_run(const cron_schedule &cron, std::chrono::system_clock::time_point from) const -> std::optional< std::chrono::system_clock::time_point >
Calculate next run time for cron schedule.
auto create_archive_callback(const archive_config &config) -> task_callback_with_result
Create archive task callback.
auto serialize_tasks() const -> std::string
Serialize tasks to JSON.
void set_task_complete_callback(task_scheduler_config::task_complete_callback callback)
Set the task complete callback.
void set_error_callback(task_scheduler_config::task_error_callback callback)
Set the error callback.
auto is_running() const noexcept -> bool
Check if the scheduler is running.
std::thread scheduler_thread_
Background scheduler thread.
auto update_schedule(const task_id &id, const kcenon::pacs::workflow::schedule &new_schedule) -> bool
Update task schedule.
std::mutex tasks_mutex_
Mutex for tasks map.
auto schedule_verification(const verification_config &config) -> task_id
Schedule verification task.
auto get_stats() const -> scheduler_stats
Get scheduler statistics.
auto list_tasks() const -> std::vector< scheduled_task >
List all scheduled tasks.
void clear_history(const task_id &id, std::size_t keep_last=0)
Clear execution history for a task.
~task_scheduler()
Destructor - ensures graceful shutdown.
auto get_execution_history(const task_id &id, std::size_t limit=100) const -> std::vector< task_execution_record >
Get execution history for a task.
auto schedule_archive(const archive_config &config) -> task_id
Schedule archive task.
void run_loop()
Background thread main loop.
auto generate_task_id() const -> task_id
Generate unique task ID.
auto calculate_next_run(const kcenon::pacs::workflow::schedule &sched, std::chrono::system_clock::time_point from=std::chrono::system_clock::now()) const -> std::optional< std::chrono::system_clock::time_point >
Calculate next run time for a schedule.
void record_execution(const task_id &task_id, const task_execution_record &record)
Record task execution.
auto resume_task(const task_id &id) -> bool
Resume a paused task.
auto get_recent_executions(std::size_t limit=100) const -> std::vector< task_execution_record >
Get all recent executions.
auto load_tasks() -> std::size_t
Load tasks from persistence storage.
std::mutex history_mutex_
Mutex for execution history.
std::map< task_id, std::vector< task_execution_record > > execution_history_
Execution history (task_id -> records)
auto get_task(const task_id &id) const -> std::optional< scheduled_task >
Get a specific task by ID.
auto schedule_cleanup(const cleanup_config &config) -> task_id
Schedule cleanup task.
std::atomic< std::size_t > running_count_
Running task count.
auto pending_count() const noexcept -> std::size_t
Get number of pending tasks.
scheduler_stats stats_
Scheduler statistics.
auto schedule(const std::string &name, const std::string &description, std::chrono::seconds interval, task_callback_with_result callback) -> task_id
Schedule a custom task with interval.
std::atomic< uint64_t > next_execution_id_
Next execution ID counter.
auto schedule_once(const std::string &name, const std::string &description, std::chrono::system_clock::time_point execute_at, task_callback_with_result callback) -> task_id
Schedule a one-time task.
void stop(bool wait_for_completion=true)
Stop the scheduler service.
auto trigger_task(const task_id &id) -> bool
Trigger immediate execution of a task.
std::chrono::steady_clock::time_point start_time_
Start time for uptime calculation.
auto create_cleanup_callback(const cleanup_config &config) -> task_callback_with_result
Create cleanup task callback.
void start()
Start the scheduler service.
std::atomic< bool > stop_requested_
Flag to signal shutdown.
Adapter for integrating common_system's IExecutor interface.
Filesystem-based DICOM storage with hierarchical organization.
PACS index database for metadata storage and retrieval.
Adapter for DICOM audit logging using logger_system.
Adapter for PACS performance metrics and distributed tracing.
task_state
Task execution state.
@ failed
Execution failed.
@ cancelled
Cancelled by user.
@ running
Currently executing.
@ pending
Waiting for scheduled time.
@ completed
Completed successfully.
@ paused
Temporarily paused.
std::function< std::optional< std::string >()> task_callback_with_result
Task callback with result details.
task_type
Task type enumeration.
@ archive
Study archival task.
@ custom
User-defined task.
@ cleanup
Storage cleanup task.
@ verification
Data integrity verification.
std::string task_id
Unique task identifier.
auto to_string(lock_type type) -> std::string
Convert lock_type to string.
std::variant< interval_schedule, cron_schedule, one_time_schedule > schedule
Combined schedule type.
Configuration for archive scheduling.
Configuration for cleanup scheduling.
Cron-like schedule expression.
static auto parse(const std::string &expr) -> cron_schedule
Parse a cron expression string.
std::string minute
Minute (0-59, or "*")
std::string month
Month (1-12, or "*")
std::string day_of_month
Day of month (1-31, or "*")
auto to_string() const -> std::string
Convert to cron expression string.
std::string hour
Hour (0-23, or "*")
std::string day_of_week
Day of week (0-6, Sunday=0, or "*")
auto is_valid() const noexcept -> bool
Check if the schedule is valid.
Simple interval-based schedule.
One-time execution at specific time.
Scheduled task definition.
std::string description
Task description.
std::chrono::system_clock::time_point created_at
Creation time.
task_id id
Unique task ID.
std::optional< std::chrono::system_clock::time_point > next_run_at
Next scheduled execution time.
std::set< std::string > tags
Tags for categorization.
bool enabled
Whether task is enabled.
std::chrono::system_clock::time_point updated_at
Last modification time.
task_callback_with_result callback
Task callback.
std::string name
Human-readable task name.
int priority
Task priority (higher = more important)
schedule task_schedule
Schedule for execution.
Statistics for task scheduler operations.
std::size_t cancelled_executions
Cancelled executions.
std::size_t successful_executions
Successful executions.
std::chrono::milliseconds max_execution_time
Maximum execution time observed.
std::size_t scheduled_tasks
Number of tasks currently scheduled.
std::optional< std::chrono::system_clock::time_point > last_cycle_at
Last cycle time.
std::size_t failed_executions
Failed executions.
std::size_t running_tasks
Number of tasks currently running.
std::chrono::seconds uptime
Scheduler uptime.
std::size_t total_executions
Total tasks executed.
std::chrono::milliseconds avg_execution_time
Average execution time.
Record of a single task execution.
Configuration for the task scheduler service.
std::size_t max_concurrent_tasks
Maximum concurrent task executions.
bool auto_start
Whether to start automatically on construction.
std::function< void(const task_id &id, const std::string &error)> task_error_callback
Callback when any task fails.
std::optional< verification_config > verification
Verification configuration.
std::function< void(const task_id &id, const task_execution_record &record)> task_complete_callback
Callback when any task completes.
bool restore_on_startup
Restore tasks from persistence on startup.
cycle_complete_callback on_cycle_complete
task_complete_callback on_task_complete
std::string persistence_path
Path to persist scheduled tasks (empty = no persistence)
std::chrono::seconds check_interval
Scheduler check interval (how often to check for due tasks)
bool enabled
Enable/disable the scheduler service.
std::optional< cleanup_config > cleanup
Cleanup configuration.
std::optional< archive_config > archive
Archive configuration.
task_error_callback on_task_error
Configuration for verification scheduling.
Task scheduler service for automated PACS operations.