PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
kcenon::pacs::workflow::task_scheduler Class Reference

#include <task_scheduler.h>

Collaboration diagram for kcenon::pacs::workflow::task_scheduler:
Collaboration graph

Public Member Functions

 task_scheduler (storage::index_database &database, const task_scheduler_config &config={})
 Construct task scheduler.
 
 task_scheduler (storage::index_database &database, storage::file_storage &file_storage, std::shared_ptr< kcenon::thread::thread_pool > thread_pool, const task_scheduler_config &config={})
 Construct task scheduler with storage and thread pool.
 
 task_scheduler (storage::index_database &database, storage::file_storage &file_storage, std::shared_ptr< kcenon::common::interfaces::IExecutor > executor, const task_scheduler_config &config={})
 Construct task scheduler with IExecutor (recommended)
 
 ~task_scheduler ()
 Destructor - ensures graceful shutdown.
 
 task_scheduler (const task_scheduler &)=delete
 Non-copyable.
 
task_scheduleroperator= (const task_scheduler &)=delete
 
 task_scheduler (task_scheduler &&)=delete
 Non-movable.
 
task_scheduleroperator= (task_scheduler &&)=delete
 
void start ()
 Start the scheduler service.
 
void stop (bool wait_for_completion=true)
 Stop the scheduler service.
 
auto is_running () const noexcept -> bool
 Check if the scheduler is running.
 
auto schedule_cleanup (const cleanup_config &config) -> task_id
 Schedule cleanup task.
 
auto schedule_archive (const archive_config &config) -> task_id
 Schedule archive task.
 
auto schedule_verification (const verification_config &config) -> task_id
 Schedule verification task.
 
auto schedule (const std::string &name, const std::string &description, std::chrono::seconds interval, task_callback_with_result callback) -> task_id
 Schedule a custom task with interval.
 
auto schedule (const std::string &name, const std::string &description, const cron_schedule &cron_expr, task_callback_with_result callback) -> task_id
 Schedule a custom task with cron schedule.
 
auto schedule_once (const std::string &name, const std::string &description, std::chrono::system_clock::time_point execute_at, task_callback_with_result callback) -> task_id
 Schedule a one-time task.
 
auto schedule (scheduled_task task) -> task_id
 Schedule a task with full definition.
 
auto list_tasks () const -> std::vector< scheduled_task >
 List all scheduled tasks.
 
auto list_tasks (task_type type) const -> std::vector< scheduled_task >
 List tasks by type.
 
auto list_tasks (task_state state) const -> std::vector< scheduled_task >
 List tasks by state.
 
auto get_task (const task_id &id) const -> std::optional< scheduled_task >
 Get a specific task by ID.
 
auto cancel_task (const task_id &id) -> bool
 Cancel a scheduled task.
 
auto pause_task (const task_id &id) -> bool
 Pause a scheduled task.
 
auto resume_task (const task_id &id) -> bool
 Resume a paused task.
 
auto trigger_task (const task_id &id) -> bool
 Trigger immediate execution of a task.
 
auto update_schedule (const task_id &id, const kcenon::pacs::workflow::schedule &new_schedule) -> bool
 Update task schedule.
 
auto get_execution_history (const task_id &id, std::size_t limit=100) const -> std::vector< task_execution_record >
 Get execution history for a task.
 
auto get_recent_executions (std::size_t limit=100) const -> std::vector< task_execution_record >
 Get all recent executions.
 
void clear_history (const task_id &id, std::size_t keep_last=0)
 Clear execution history for a task.
 
auto get_stats () const -> scheduler_stats
 Get scheduler statistics.
 
auto pending_count () const noexcept -> std::size_t
 Get number of pending tasks.
 
auto running_count () const noexcept -> std::size_t
 Get number of running tasks.
 
auto save_tasks () const -> bool
 Save all tasks to persistence storage.
 
auto load_tasks () -> std::size_t
 Load tasks from persistence storage.
 
void set_task_complete_callback (task_scheduler_config::task_complete_callback callback)
 Set the task complete callback.
 
void set_error_callback (task_scheduler_config::task_error_callback callback)
 Set the error callback.
 

Private Member Functions

void run_loop ()
 Background thread main loop.
 
void execute_cycle ()
 Execute a single scheduler cycle.
 
auto execute_task (scheduled_task &task) -> task_execution_record
 Execute a single task.
 
auto calculate_next_run (const kcenon::pacs::workflow::schedule &sched, std::chrono::system_clock::time_point from=std::chrono::system_clock::now()) const -> std::optional< std::chrono::system_clock::time_point >
 Calculate next run time for a schedule.
 
auto calculate_next_cron_run (const cron_schedule &cron, std::chrono::system_clock::time_point from) const -> std::optional< std::chrono::system_clock::time_point >
 Calculate next run time for cron schedule.
 
auto generate_task_id () const -> task_id
 Generate unique task ID.
 
auto generate_execution_id () const -> std::string
 Generate unique execution ID.
 
auto create_cleanup_callback (const cleanup_config &config) -> task_callback_with_result
 Create cleanup task callback.
 
auto create_archive_callback (const archive_config &config) -> task_callback_with_result
 Create archive task callback.
 
auto create_verification_callback (const verification_config &config) -> task_callback_with_result
 Create verification task callback.
 
void record_execution (const task_id &task_id, const task_execution_record &record)
 Record task execution.
 
void update_stats (const task_execution_record &record)
 Update statistics after execution.
 
auto serialize_tasks () const -> std::string
 Serialize tasks to JSON.
 
auto deserialize_tasks (const std::string &json) -> std::size_t
 Deserialize tasks from JSON.
 

Private Attributes

storage::index_databasedatabase_
 Reference to PACS index database.
 
storage::file_storagefile_storage_ {nullptr}
 Optional reference to file storage.
 
std::shared_ptr< kcenon::thread::thread_pool > thread_pool_
 Thread pool for parallel task execution (legacy)
 
std::shared_ptr< kcenon::common::interfaces::IExecutor > executor_
 IExecutor for task execution (recommended, Issue #487)
 
task_scheduler_config config_
 Service configuration.
 
std::thread scheduler_thread_
 Background scheduler thread.
 
std::mutex mutex_
 Mutex for thread synchronization.
 
std::condition_variable cv_
 Condition variable for sleep/wake.
 
std::atomic< bool > stop_requested_ {false}
 Flag to signal shutdown.
 
std::atomic< bool > running_ {false}
 Flag indicating scheduler is running.
 
std::map< task_id, scheduled_tasktasks_
 All scheduled tasks (id -> task)
 
std::mutex tasks_mutex_
 Mutex for tasks map.
 
std::map< task_id, std::vector< task_execution_record > > execution_history_
 Execution history (task_id -> records)
 
std::mutex history_mutex_
 Mutex for execution history.
 
std::atomic< std::size_t > running_count_ {0}
 Running task count.
 
scheduler_stats stats_
 Scheduler statistics.
 
std::mutex stats_mutex_
 Mutex for statistics.
 
std::chrono::steady_clock::time_point start_time_
 Start time for uptime calculation.
 
std::atomic< uint64_t > next_task_id_ {1}
 Next task ID counter.
 
std::atomic< uint64_t > next_execution_id_ {1}
 Next execution ID counter.
 

Detailed Description

Definition at line 162 of file task_scheduler.h.

Constructor & Destructor Documentation

◆ task_scheduler() [1/5]

kcenon::pacs::workflow::task_scheduler::task_scheduler ( storage::index_database & database,
const task_scheduler_config & config = {} )
explicit

Construct task scheduler.

Parameters
databaseReference to the PACS index database
configService configuration

Definition at line 68 of file task_scheduler.cpp.

71 : database_(database)
72 , config_(config) {
73 // Schedule built-in tasks from config
74 if (config_.cleanup) {
76 }
77 if (config_.archive) {
79 }
82 }
83
85 start();
86 }
87}
storage::index_database & database_
Reference to PACS index database.
task_scheduler_config config_
Service configuration.
auto schedule_verification(const verification_config &config) -> task_id
Schedule verification task.
auto schedule_archive(const archive_config &config) -> task_id
Schedule archive task.
auto schedule_cleanup(const cleanup_config &config) -> task_id
Schedule cleanup task.
void start()
Start the scheduler service.
bool auto_start
Whether to start automatically on construction.
std::optional< verification_config > verification
Verification configuration.
bool enabled
Enable/disable the scheduler service.
std::optional< cleanup_config > cleanup
Cleanup configuration.
std::optional< archive_config > archive
Archive configuration.

References kcenon::pacs::workflow::task_scheduler_config::archive, kcenon::pacs::workflow::task_scheduler_config::auto_start, kcenon::pacs::workflow::task_scheduler_config::cleanup, config_, kcenon::pacs::workflow::task_scheduler_config::enabled, if(), and kcenon::pacs::workflow::task_scheduler_config::verification.

Here is the call graph for this function:

◆ task_scheduler() [2/5]

kcenon::pacs::workflow::task_scheduler::task_scheduler ( storage::index_database & database,
storage::file_storage & file_storage,
std::shared_ptr< kcenon::thread::thread_pool > thread_pool,
const task_scheduler_config & config = {} )

Construct task scheduler with storage and thread pool.

Parameters
databaseReference to the PACS index database
file_storageReference to file storage for cleanup/archive
thread_poolThread pool for parallel task execution
configService configuration

Definition at line 89 of file task_scheduler.cpp.

94 : database_(database)
95 , file_storage_(&file_storage)
96 , thread_pool_(std::move(thread_pool))
97 , config_(config) {
98
99 // Load persisted tasks if configured
101 load_tasks();
102 }
103
104 // Schedule built-in tasks from config
105 if (config_.cleanup) {
107 }
108 if (config_.archive) {
110 }
111 if (config_.verification) {
113 }
114
116 start();
117 }
118}
storage::file_storage * file_storage_
Optional reference to file storage.
auto load_tasks() -> std::size_t
Load tasks from persistence storage.
std::shared_ptr< kcenon::thread::thread_pool > thread_pool_
Thread pool for parallel task execution (legacy)
bool restore_on_startup
Restore tasks from persistence on startup.
std::string persistence_path
Path to persist scheduled tasks (empty = no persistence)

References kcenon::pacs::workflow::task_scheduler_config::archive, kcenon::pacs::workflow::task_scheduler_config::auto_start, kcenon::pacs::workflow::task_scheduler_config::cleanup, config_, kcenon::pacs::workflow::task_scheduler_config::enabled, if(), kcenon::pacs::workflow::task_scheduler_config::persistence_path, kcenon::pacs::workflow::task_scheduler_config::restore_on_startup, and kcenon::pacs::workflow::task_scheduler_config::verification.

Here is the call graph for this function:

◆ task_scheduler() [3/5]

kcenon::pacs::workflow::task_scheduler::task_scheduler ( storage::index_database & database,
storage::file_storage & file_storage,
std::shared_ptr< kcenon::common::interfaces::IExecutor > executor,
const task_scheduler_config & config = {} )

Construct task scheduler with IExecutor (recommended)

This constructor accepts the standardized IExecutor interface from common_system, enabling better testability and decoupling from specific thread pool implementations.

Parameters
databaseReference to the PACS index database
file_storageReference to file storage for cleanup/archive
executorIExecutor for task execution (from common_system)
configService configuration
See also
Issue #487 - IExecutor integration

Definition at line 120 of file task_scheduler.cpp.

125 : database_(database)
126 , file_storage_(&file_storage)
127 , executor_(std::move(executor))
128 , config_(config) {
129
130 // Load persisted tasks if configured
132 load_tasks();
133 }
134
135 // Schedule built-in tasks from config
136 if (config_.cleanup) {
138 }
139 if (config_.archive) {
141 }
142 if (config_.verification) {
144 }
145
147 start();
148 }
149}
std::shared_ptr< kcenon::common::interfaces::IExecutor > executor_
IExecutor for task execution (recommended, Issue #487)

References kcenon::pacs::workflow::task_scheduler_config::archive, kcenon::pacs::workflow::task_scheduler_config::auto_start, kcenon::pacs::workflow::task_scheduler_config::cleanup, config_, kcenon::pacs::workflow::task_scheduler_config::enabled, if(), kcenon::pacs::workflow::task_scheduler_config::persistence_path, kcenon::pacs::workflow::task_scheduler_config::restore_on_startup, and kcenon::pacs::workflow::task_scheduler_config::verification.

Here is the call graph for this function:

◆ ~task_scheduler()

kcenon::pacs::workflow::task_scheduler::~task_scheduler ( )

Destructor - ensures graceful shutdown.

Definition at line 151 of file task_scheduler.cpp.

151 {
152 stop(true);
153
154 // Persist tasks before destruction
155 if (!config_.persistence_path.empty()) {
156 save_tasks();
157 }
158}
auto save_tasks() const -> bool
Save all tasks to persistence storage.
void stop(bool wait_for_completion=true)
Stop the scheduler service.

References config_, kcenon::pacs::workflow::task_scheduler_config::persistence_path, save_tasks(), and stop().

Here is the call graph for this function:

◆ task_scheduler() [4/5]

kcenon::pacs::workflow::task_scheduler::task_scheduler ( const task_scheduler & )
delete

Non-copyable.

◆ task_scheduler() [5/5]

kcenon::pacs::workflow::task_scheduler::task_scheduler ( task_scheduler && )
delete

Non-movable.

Member Function Documentation

◆ calculate_next_cron_run()

auto kcenon::pacs::workflow::task_scheduler::calculate_next_cron_run ( const cron_schedule & cron,
std::chrono::system_clock::time_point from ) const -> std::optional<std::chrono::system_clock::time_point>
nodiscardprivate

Calculate next run time for cron schedule.

Parameters
cronCron schedule
fromStarting time point
Returns
Next run time

Definition at line 1022 of file task_scheduler.cpp.

1025 {
1026
1027 // Simple cron calculation - advance minute by minute until match
1028 // This is a simplified implementation; production would use more
1029 // efficient algorithms
1030
1031 auto time = from + std::chrono::minutes{1}; // Start from next minute
1032 auto end_time = from + std::chrono::hours{24 * 365}; // Max 1 year ahead
1033
1034 // Helper to parse cron field
1035 auto matches_field = [](const std::string& field, int value, [[maybe_unused]] int max) -> bool {
1036 if (field == "*") return true;
1037
1038 // Check for step values (*/n)
1039 if (field.starts_with("*/")) {
1040 int step = std::stoi(field.substr(2));
1041 return (value % step) == 0;
1042 }
1043
1044 // Check for range (n-m)
1045 auto dash_pos = field.find('-');
1046 if (dash_pos != std::string::npos) {
1047 int start = std::stoi(field.substr(0, dash_pos));
1048 int end = std::stoi(field.substr(dash_pos + 1));
1049 return value >= start && value <= end;
1050 }
1051
1052 // Check for list (n,m,o)
1053 if (field.find(',') != std::string::npos) {
1054 std::istringstream iss(field);
1055 std::string item;
1056 while (std::getline(iss, item, ',')) {
1057 if (std::stoi(item) == value) {
1058 return true;
1059 }
1060 }
1061 return false;
1062 }
1063
1064 // Single value
1065 return std::stoi(field) == value;
1066 };
1067
1068 while (time < end_time) {
1069 auto time_t = std::chrono::system_clock::to_time_t(time);
1070 std::tm tm = *std::localtime(&time_t);
1071
1072 bool matches =
1073 matches_field(cron.minute, tm.tm_min, 59) &&
1074 matches_field(cron.hour, tm.tm_hour, 23) &&
1075 matches_field(cron.day_of_month, tm.tm_mday, 31) &&
1076 matches_field(cron.month, tm.tm_mon + 1, 12) &&
1077 matches_field(cron.day_of_week, tm.tm_wday, 6);
1078
1079 if (matches) {
1080 // Round to start of minute
1081 auto duration = time.time_since_epoch();
1082 auto minutes = std::chrono::duration_cast<std::chrono::minutes>(duration);
1083 return std::chrono::system_clock::time_point(minutes);
1084 }
1085
1086 time += std::chrono::minutes{1};
1087 }
1088
1089 return std::nullopt;
1090}
constexpr dicom_tag item
Item.

◆ calculate_next_run()

auto kcenon::pacs::workflow::task_scheduler::calculate_next_run ( const kcenon::pacs::workflow::schedule & sched,
std::chrono::system_clock::time_point from = std::chrono::system_clock::now() ) const -> std::optional<std::chrono::system_clock::time_point>
nodiscardprivate

Calculate next run time for a schedule.

Parameters
schedSchedule definition
fromStarting time point
Returns
Next run time

Definition at line 995 of file task_scheduler.cpp.

998 {
999
1000 return std::visit([this, from](const auto& s) ->
1001 std::optional<std::chrono::system_clock::time_point> {
1002 using T = std::decay_t<decltype(s)>;
1003
1004 if constexpr (std::is_same_v<T, interval_schedule>) {
1005 if (s.start_at && *s.start_at > from) {
1006 return s.start_at;
1007 }
1008 return from + s.interval;
1009 } else if constexpr (std::is_same_v<T, cron_schedule>) {
1010 return calculate_next_cron_run(s, from);
1011 } else if constexpr (std::is_same_v<T, one_time_schedule>) {
1012 if (s.execute_at > from) {
1013 return s.execute_at;
1014 }
1015 return std::nullopt; // Already passed
1016 }
1017
1018 return std::nullopt;
1019 }, sched);
1020}
auto calculate_next_cron_run(const cron_schedule &cron, std::chrono::system_clock::time_point from) const -> std::optional< std::chrono::system_clock::time_point >
Calculate next run time for cron schedule.

Referenced by execute_cycle().

Here is the caller graph for this function:

◆ cancel_task()

auto kcenon::pacs::workflow::task_scheduler::cancel_task ( const task_id & id) -> bool

Cancel a scheduled task.

Removes the task from the schedule. If the task is currently running, it will complete but not be rescheduled.

Parameters
idTask ID to cancel
Returns
true if task was found and cancelled

Definition at line 428 of file task_scheduler.cpp.

428 {
429 std::lock_guard<std::mutex> lock(tasks_mutex_);
430
431 auto it = tasks_.find(id);
432 if (it == tasks_.end()) {
433 return false;
434 }
435
436 it->second.state = task_state::cancelled;
437 it->second.enabled = false;
438 it->second.updated_at = std::chrono::system_clock::now();
439
441 "Cancelled scheduled task task_id={} name={}",
442 id, it->second.name);
443
444 return true;
445}
static void info(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log an info-level message.
std::map< task_id, scheduled_task > tasks_
All scheduled tasks (id -> task)
std::mutex tasks_mutex_
Mutex for tasks map.

References kcenon::pacs::workflow::cancelled, and kcenon::pacs::integration::logger_adapter::info().

Here is the call graph for this function:

◆ clear_history()

void kcenon::pacs::workflow::task_scheduler::clear_history ( const task_id & id,
std::size_t keep_last = 0 )

Clear execution history for a task.

Parameters
idTask ID
keep_lastNumber of records to keep

Definition at line 585 of file task_scheduler.cpp.

585 {
586 std::lock_guard<std::mutex> lock(history_mutex_);
587
588 auto it = execution_history_.find(id);
589 if (it == execution_history_.end()) {
590 return;
591 }
592
593 auto& records = it->second;
594 if (records.size() <= keep_last) {
595 return;
596 }
597
598 // Keep only the last N records
599 records.erase(
600 records.begin(),
601 records.begin() + static_cast<std::ptrdiff_t>(records.size() - keep_last));
602}
std::mutex history_mutex_
Mutex for execution history.
std::map< task_id, std::vector< task_execution_record > > execution_history_
Execution history (task_id -> records)

References execution_history_, and history_mutex_.

◆ create_archive_callback()

auto kcenon::pacs::workflow::task_scheduler::create_archive_callback ( const archive_config & config) -> task_callback_with_result
nodiscardprivate

Create archive task callback.

Parameters
configArchive configuration
Returns
Task callback

Definition at line 1224 of file task_scheduler.cpp.

1225 {
1226 return [this, config]() -> std::optional<std::string> {
1228 "Running archive task archive_after_days={} destination={}",
1229 config.archive_after.count(),
1230 config.destination);
1231
1232 try {
1233 // Calculate cutoff date
1234 auto now = std::chrono::system_clock::now();
1235 auto cutoff = now - config.archive_after;
1236
1237 // Convert to YYYYMMDD format
1238 auto cutoff_time_t = std::chrono::system_clock::to_time_t(cutoff);
1239 std::tm tm = *std::localtime(&cutoff_time_t);
1240 std::ostringstream date_oss;
1241 date_oss << std::put_time(&tm, "%Y%m%d");
1242 std::string cutoff_date = date_oss.str();
1243
1244 // Query for studies older than cutoff
1245 storage::study_query query;
1246 query.study_date_to = cutoff_date;
1247 query.limit = config.max_archives_per_cycle;
1248
1249 auto studies_result = database_.search_studies(query);
1250 if (!studies_result.is_ok()) {
1252 "Failed to query studies error={}",
1253 studies_result.error().message);
1254 return "Archive failed: " + studies_result.error().message;
1255 }
1256
1257 std::size_t archived_count = 0;
1258 std::size_t failed_count = 0;
1259
1260 // Create destination directory if needed
1261 std::filesystem::path dest_path(config.destination);
1262 if (!std::filesystem::exists(dest_path)) {
1263 std::filesystem::create_directories(dest_path);
1264 }
1265
1266 for (const auto& study : studies_result.value()) {
1267 // Get all files for this study
1268 auto file_paths_result = database_.get_study_files(study.study_uid);
1269 if (!file_paths_result.is_ok() || file_paths_result.value().empty()) {
1270 continue;
1271 }
1272 const auto& file_paths = file_paths_result.value();
1273
1274 // Create study archive directory
1275 std::filesystem::path study_dest = dest_path / study.study_uid;
1276 if (!std::filesystem::exists(study_dest)) {
1277 std::filesystem::create_directories(study_dest);
1278 }
1279
1280 bool archive_success = true;
1281
1282 for (const auto& src_file : file_paths) {
1283 std::filesystem::path src_path(src_file);
1284 if (!std::filesystem::exists(src_path)) {
1286 "Source file not found file_path={}", src_file);
1287 continue;
1288 }
1289
1290 // Determine destination filename
1291 std::filesystem::path dest_file = study_dest / src_path.filename();
1292
1293 try {
1294 // Copy file to archive location
1295 std::filesystem::copy_file(
1296 src_path, dest_file,
1297 std::filesystem::copy_options::overwrite_existing);
1298
1299 // Verify copy if configured
1300 if (config.verify_after_archive) {
1301 auto src_size = std::filesystem::file_size(src_path);
1302 auto dest_size = std::filesystem::file_size(dest_file);
1303 if (src_size != dest_size) {
1305 "Archive verification failed: size mismatch "
1306 "src={} dest={}", src_file, dest_file.string());
1307 archive_success = false;
1308 break;
1309 }
1310 }
1311 } catch (const std::filesystem::filesystem_error& e) {
1313 "Failed to archive file src={} dest={} error={}",
1314 src_file, dest_file.string(), e.what());
1315 archive_success = false;
1316 break;
1317 }
1318 }
1319
1320 if (archive_success) {
1321 ++archived_count;
1322
1323 // Delete originals if configured
1324 if (config.delete_after_archive && file_storage_ != nullptr) {
1325 for (const auto& file_path : file_paths) {
1326 std::filesystem::path p(file_path);
1327 std::string sop_uid = p.stem().string();
1328 (void)file_storage_->remove(sop_uid);
1329 }
1330 (void)database_.delete_study(study.study_uid);
1331 }
1332
1334 "Archived study study_uid={} files={}",
1335 study.study_uid, file_paths.size());
1336 } else {
1337 ++failed_count;
1338 }
1339 }
1340
1342 "Archive task completed archived={} failed={}",
1343 archived_count, failed_count);
1344
1345 if (failed_count > 0) {
1346 return "Archive completed with " + std::to_string(failed_count) + " failures";
1347 }
1348
1349 return std::nullopt; // Success
1350 } catch (const std::exception& e) {
1351 return std::string("Archive failed: ") + e.what();
1352 }
1353 };
1354}
static void debug(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log a debug-level message.
static void warn(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log a warning-level message.
static void error(kcenon::pacs::compat::format_string< Args... > fmt, Args &&... args)
Log an error-level message.
auto remove(std::string_view sop_instance_uid) -> VoidResult override
Remove a DICOM file by SOP Instance UID.
auto search_studies(const study_query &query) const -> Result< std::vector< study_record > >
Search studies with query criteria.
auto get_study_files(std::string_view study_instance_uid) const -> Result< std::vector< std::string > >
Get all file paths for a study.
auto delete_study(std::string_view study_uid) -> VoidResult
Delete a study by Study Instance UID.
const atna_coded_value query
Query (110112)

References kcenon::pacs::integration::logger_adapter::debug(), kcenon::pacs::integration::logger_adapter::error(), kcenon::pacs::integration::logger_adapter::info(), and kcenon::pacs::integration::logger_adapter::warn().

Here is the call graph for this function:

◆ create_cleanup_callback()

auto kcenon::pacs::workflow::task_scheduler::create_cleanup_callback ( const cleanup_config & config) -> task_callback_with_result
nodiscardprivate

Create cleanup task callback.

Parameters
configCleanup configuration
Returns
Task callback

Definition at line 1108 of file task_scheduler.cpp.

1109 {
1110 return [this, config]() -> std::optional<std::string> {
1112 "Running cleanup task retention_days={}",
1113 config.default_retention.count());
1114
1115 try {
1116 // Calculate cutoff date
1117 auto now = std::chrono::system_clock::now();
1118 auto cutoff = now - config.default_retention;
1119
1120 // Convert to YYYYMMDD format for database query
1121 auto cutoff_time_t = std::chrono::system_clock::to_time_t(cutoff);
1122 std::tm tm = *std::localtime(&cutoff_time_t);
1123 std::ostringstream date_oss;
1124 date_oss << std::put_time(&tm, "%Y%m%d");
1125 std::string cutoff_date = date_oss.str();
1126
1127 // Query for studies older than cutoff
1128 storage::study_query query;
1129 query.study_date_to = cutoff_date;
1130 query.limit = config.max_deletions_per_cycle;
1131
1132 auto studies_result = database_.search_studies(query);
1133 if (!studies_result.is_ok()) {
1135 "Failed to query studies error={}",
1136 studies_result.error().message);
1137 return "Cleanup failed: " + studies_result.error().message;
1138 }
1139
1140 std::size_t deleted_count = 0;
1141 std::size_t skipped_count = 0;
1142
1143 for (const auto& study : studies_result.value()) {
1144 // Check modality-specific retention
1145 auto modality_retention = config.retention_for(study.modalities_in_study);
1146 auto modality_cutoff = now - modality_retention;
1147 auto modality_cutoff_t = std::chrono::system_clock::to_time_t(modality_cutoff);
1148 std::tm mod_tm = *std::localtime(&modality_cutoff_t);
1149 std::ostringstream mod_oss;
1150 mod_oss << std::put_time(&mod_tm, "%Y%m%d");
1151 std::string modality_cutoff_date = mod_oss.str();
1152
1153 // Skip if study is newer than modality-specific retention
1154 if (study.study_date > modality_cutoff_date) {
1155 ++skipped_count;
1156 continue;
1157 }
1158
1159 // Check exclusion patterns
1160 bool excluded = false;
1161 for (const auto& pattern : config.exclude_patterns) {
1162 if (study.study_description.find(pattern) != std::string::npos) {
1163 excluded = true;
1164 break;
1165 }
1166 }
1167 if (excluded) {
1168 ++skipped_count;
1169 continue;
1170 }
1171
1172 if (config.dry_run) {
1174 "Dry-run: would delete study study_uid={} study_date={} modality={}",
1175 study.study_uid, study.study_date, study.modalities_in_study);
1176 ++deleted_count;
1177 continue;
1178 }
1179
1180 // Delete files from storage if not database_only
1181 if (!config.database_only && file_storage_ != nullptr) {
1182 auto file_paths_result = database_.get_study_files(study.study_uid);
1183 if (file_paths_result.is_ok()) {
1184 for (const auto& file_path : file_paths_result.value()) {
1185 // Extract SOP UID from file path for removal
1186 std::filesystem::path p(file_path);
1187 std::string sop_uid = p.stem().string();
1188 auto remove_result = file_storage_->remove(sop_uid);
1189 if (remove_result.is_err()) {
1191 "Failed to remove file file_path={} error={}",
1192 file_path, remove_result.error().message);
1193 }
1194 }
1195 }
1196 }
1197
1198 // Delete from database
1199 auto delete_result = database_.delete_study(study.study_uid);
1200 if (delete_result.is_err()) {
1202 "Failed to delete study study_uid={} error={}",
1203 study.study_uid, delete_result.error().message);
1204 continue;
1205 }
1206
1207 ++deleted_count;
1209 "Deleted study study_uid={} study_date={}",
1210 study.study_uid, study.study_date);
1211 }
1212
1214 "Cleanup task completed deleted={} skipped={} dry_run={}",
1215 deleted_count, skipped_count, config.dry_run);
1216
1217 return std::nullopt; // Success
1218 } catch (const std::exception& e) {
1219 return std::string("Cleanup failed: ") + e.what();
1220 }
1221 };
1222}

References kcenon::pacs::integration::logger_adapter::debug(), kcenon::pacs::integration::logger_adapter::error(), kcenon::pacs::integration::logger_adapter::info(), and kcenon::pacs::integration::logger_adapter::warn().

Here is the call graph for this function:

◆ create_verification_callback()

auto kcenon::pacs::workflow::task_scheduler::create_verification_callback ( const verification_config & config) -> task_callback_with_result
nodiscardprivate

Create verification task callback.

Parameters
configVerification configuration
Returns
Task callback

Definition at line 1356 of file task_scheduler.cpp.

1357 {
1358 return [this, config]() -> std::optional<std::string> {
1360 "Running verification task check_checksums={} check_db={}",
1361 config.check_checksums,
1362 config.check_db_consistency);
1363
1364 try {
1365 std::size_t verified = 0;
1366 std::size_t errors = 0;
1367 std::size_t missing_files = 0;
1368
1369 // Database integrity check
1370 if (config.check_db_consistency) {
1371 auto db_result = database_.verify_integrity();
1372 if (db_result.is_err()) {
1374 "Database integrity check failed error={}",
1375 db_result.error().message);
1376 ++errors;
1377 } else {
1378 integration::logger_adapter::debug("Database integrity check passed");
1379 }
1380 }
1381
1382 // Get studies for verification
1383 storage::study_query query;
1384 query.limit = config.max_verifications_per_cycle;
1385 auto studies_result = database_.search_studies(query);
1386 if (!studies_result.is_ok()) {
1388 "Failed to query studies error={}",
1389 studies_result.error().message);
1390 return "Verification failed: " + studies_result.error().message;
1391 }
1392
1393 for (const auto& study : studies_result.value()) {
1394 // Get all files for this study
1395 auto file_paths_result = database_.get_study_files(study.study_uid);
1396 if (!file_paths_result.is_ok()) {
1398 "Failed to get study files study_uid={} error={}",
1399 study.study_uid, file_paths_result.error().message);
1400 ++errors;
1401 continue;
1402 }
1403
1404 for (const auto& file_path : file_paths_result.value()) {
1405 std::filesystem::path path(file_path);
1406
1407 // Check file existence
1408 if (!std::filesystem::exists(path)) {
1409 ++missing_files;
1411 "Missing file detected file_path={} study_uid={}",
1412 file_path, study.study_uid);
1413
1414 if (config.repair_on_failure) {
1415 // Extract SOP UID and remove orphaned database record
1416 std::string sop_uid = path.stem().string();
1417 (void)database_.delete_instance(sop_uid);
1419 "Removed orphaned database record sop_uid={}", sop_uid);
1420 }
1421 continue;
1422 }
1423
1424 // Checksum verification
1425 if (config.check_checksums) {
1426 // Get file size as basic verification
1427 // Full checksum verification would require storing checksums in DB
1428 try {
1429 auto file_size = std::filesystem::file_size(path);
1430 if (file_size == 0) {
1431 ++errors;
1433 "Empty file detected file_path={}", file_path);
1434 }
1435 } catch (const std::filesystem::filesystem_error& e) {
1436 ++errors;
1438 "Cannot read file file_path={} error={}",
1439 file_path, e.what());
1440 }
1441 }
1442
1443 ++verified;
1444 }
1445 }
1446
1447 // File storage integrity check
1448 if (file_storage_ != nullptr) {
1449 auto storage_result = file_storage_->verify_integrity();
1450 if (storage_result.is_err()) {
1452 "Storage integrity check reported issues error={}",
1453 storage_result.error().message);
1454 }
1455 }
1456
1458 "Verification task completed verified={} errors={} missing_files={}",
1459 verified, errors, missing_files);
1460
1461 if (errors > 0 || missing_files > 0) {
1462 std::ostringstream oss;
1463 oss << "Verification found " << errors << " errors and "
1464 << missing_files << " missing files";
1465 return oss.str();
1466 }
1467
1468 return std::nullopt; // Success
1469 } catch (const std::exception& e) {
1470 return std::string("Verification failed: ") + e.what();
1471 }
1472 };
1473}
auto verify_integrity() -> VoidResult override
Verify storage integrity.
auto verify_integrity() const -> VoidResult
Verify database integrity.
auto delete_instance(std::string_view sop_uid) -> VoidResult
Delete an instance by SOP Instance UID.
@ verified
VERIFIED - Verified by authorized person.

References kcenon::pacs::integration::logger_adapter::debug(), kcenon::pacs::integration::logger_adapter::error(), kcenon::pacs::integration::logger_adapter::info(), and kcenon::pacs::integration::logger_adapter::warn().

Here is the call graph for this function:

◆ deserialize_tasks()

auto kcenon::pacs::workflow::task_scheduler::deserialize_tasks ( const std::string & json) -> std::size_t
private

Deserialize tasks from JSON.

Parameters
jsonJSON string
Returns
Number of tasks loaded

Definition at line 1554 of file task_scheduler.cpp.

1554 {
1555 // Note: Full deserialization is not supported without a proper JSON library.
1556 //
1557 // Limitations:
1558 // 1. Callbacks cannot be serialized/deserialized (function pointers)
1559 // 2. Schedule details are not currently serialized
1560 // 3. Tags and other complex fields are not serialized
1561 //
1562 // For production use, consider:
1563 // 1. Using nlohmann/json or similar library for proper JSON parsing
1564 // 2. Storing task configurations separately and re-creating tasks on startup
1565 // 3. Using a database-backed persistence layer
1566 //
1567 // Current implementation returns 0 (no tasks loaded) to indicate
1568 // that manual task registration is required after restart.
1569
1570 if (json.empty()) {
1571 return 0;
1572 }
1573
1575 "deserialize_tasks called but full deserialization is not implemented. "
1576 "Tasks must be re-registered programmatically after scheduler restart.");
1577
1578 return 0;
1579}

References kcenon::pacs::integration::logger_adapter::warn().

Here is the call graph for this function:

◆ execute_cycle()

void kcenon::pacs::workflow::task_scheduler::execute_cycle ( )
private

Execute a single scheduler cycle.

Checks for due tasks and executes them.

Definition at line 755 of file task_scheduler.cpp.

755 {
756 auto now = std::chrono::system_clock::now();
757 std::vector<task_id> due_tasks;
758
759 // Find due tasks
760 {
761 std::lock_guard<std::mutex> lock(tasks_mutex_);
762
763 for (auto& [id, task] : tasks_) {
764 if (!task.enabled || task.state != task_state::pending) {
765 continue;
766 }
767
768 if (task.next_run_at && *task.next_run_at <= now) {
769 due_tasks.push_back(id);
770 }
771 }
772 }
773
774 if (due_tasks.empty()) {
775 return;
776 }
777
778 // Sort by priority (higher first)
779 {
780 std::lock_guard<std::mutex> lock(tasks_mutex_);
781 std::ranges::sort(due_tasks, [this](const auto& a, const auto& b) {
782 return tasks_[a].priority > tasks_[b].priority;
783 });
784 }
785
786 // Execute tasks (respecting max concurrent limit)
787 std::size_t executed = 0;
788 std::size_t succeeded = 0;
789 std::size_t failed = 0;
790
791 for (const auto& id : due_tasks) {
793 break; // Max concurrent reached
794 }
795
796 std::lock_guard<std::mutex> lock(tasks_mutex_);
797 auto it = tasks_.find(id);
798 if (it == tasks_.end()) {
799 continue;
800 }
801
802 auto& task = it->second;
803
804 // Mark as running
805 task.state = task_state::running;
807
808 // Execute task
809 auto record = execute_task(task);
810
811 // Update task state
812 task.state = (record.state == task_state::completed)
814 : record.state;
815 task.last_run_at = record.started_at;
816 task.last_execution = record;
817 ++task.execution_count;
818
819 if (record.state == task_state::completed) {
820 ++task.success_count;
821 ++succeeded;
822 } else {
823 ++task.failure_count;
824 ++failed;
825 }
826
827 // Calculate next run time
828 if (task.enabled && task.state == task_state::pending) {
829 task.next_run_at = calculate_next_run(
830 task.task_schedule,
831 std::chrono::system_clock::now());
832
833 // For one-time tasks, disable after execution
834 if (std::holds_alternative<one_time_schedule>(task.task_schedule)) {
835 task.enabled = false;
836 }
837 }
838
840 ++executed;
841
842 // Record execution history
843 record_execution(id, record);
844 update_stats(record);
845
846 // Invoke callbacks
848 config_.on_task_complete(id, record);
849 }
851 config_.on_task_error(id, record.error_message.value_or("Unknown error"));
852 }
853 }
854
855 // Log cycle summary
856 if (executed > 0) {
858 "Scheduler cycle completed executed={} succeeded={} failed={}",
859 executed, succeeded, failed);
860
861 // Record metrics
863 "scheduler_tasks_executed", static_cast<int64_t>(executed));
865 "scheduler_tasks_succeeded", static_cast<int64_t>(succeeded));
867 "scheduler_tasks_failed", static_cast<int64_t>(failed));
868
869 // Update stats
870 {
871 std::lock_guard<std::mutex> lock(stats_mutex_);
872 stats_.last_cycle_at = std::chrono::system_clock::now();
873 }
874
875 // Invoke cycle callback
877 config_.on_cycle_complete(executed, succeeded, failed);
878 }
879 }
880}
static void increment_counter(std::string_view name, std::int64_t value=1)
Increment a counter metric.
void update_stats(const task_execution_record &record)
Update statistics after execution.
auto execute_task(scheduled_task &task) -> task_execution_record
Execute a single task.
std::mutex stats_mutex_
Mutex for statistics.
auto calculate_next_run(const kcenon::pacs::workflow::schedule &sched, std::chrono::system_clock::time_point from=std::chrono::system_clock::now()) const -> std::optional< std::chrono::system_clock::time_point >
Calculate next run time for a schedule.
void record_execution(const task_id &task_id, const task_execution_record &record)
Record task execution.
std::atomic< std::size_t > running_count_
Running task count.
scheduler_stats stats_
Scheduler statistics.
@ running
Currently executing.
@ pending
Waiting for scheduled time.
@ completed
Completed successfully.
std::optional< std::chrono::system_clock::time_point > last_cycle_at
Last cycle time.
std::size_t max_concurrent_tasks
Maximum concurrent task executions.

References calculate_next_run(), kcenon::pacs::workflow::completed, config_, execute_task(), kcenon::pacs::workflow::failed, kcenon::pacs::integration::monitoring_adapter::increment_counter(), kcenon::pacs::integration::logger_adapter::info(), kcenon::pacs::workflow::scheduler_stats::last_cycle_at, kcenon::pacs::workflow::task_scheduler_config::max_concurrent_tasks, kcenon::pacs::workflow::task_scheduler_config::on_cycle_complete, kcenon::pacs::workflow::task_scheduler_config::on_task_complete, kcenon::pacs::workflow::task_scheduler_config::on_task_error, kcenon::pacs::workflow::pending, record_execution(), kcenon::pacs::workflow::running, running_count_, stats_, stats_mutex_, tasks_, tasks_mutex_, and update_stats().

Referenced by run_loop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ execute_task()

auto kcenon::pacs::workflow::task_scheduler::execute_task ( scheduled_task & task) -> task_execution_record
private

Execute a single task.

Parameters
taskTask to execute
Returns
Execution record

Definition at line 882 of file task_scheduler.cpp.

883 {
884 task_execution_record record;
885 record.execution_id = generate_execution_id();
886 record.task_id = task.id;
887 record.started_at = std::chrono::system_clock::now();
889
891 "Executing task task_id={} name={}",
892 task.id, task.name);
893
894 auto start_time = std::chrono::steady_clock::now();
895
896 // Retry logic: attempt up to (max_retries + 1) times
897 std::size_t attempt = 0;
898 const std::size_t max_attempts = task.max_retries + 1;
899
900 while (attempt < max_attempts) {
901 attempt++;
902
903 if (attempt > 1) {
905 "Retrying task task_id={} attempt={}/{}",
906 task.id, attempt, max_attempts);
907
908 // Wait before retry
909 std::this_thread::sleep_for(task.retry_delay);
910 }
911
912 try {
913 if (!task.callback) {
915 record.error_message = "No callback defined";
916 break; // No point retrying without callback
917 }
918
919 // Execute with timeout if configured
920 if (task.timeout.count() > 0) {
921 // Run callback in async task with timeout
922 auto future = std::async(std::launch::async, task.callback);
923
924 auto status = future.wait_for(task.timeout);
925 if (status == std::future_status::timeout) {
927 record.error_message = "Task execution timed out after " +
928 std::to_string(task.timeout.count()) + " seconds";
930 "Task timed out task_id={} timeout_seconds={}",
931 task.id, task.timeout.count());
932 // Continue to retry if attempts remain
933 continue;
934 }
935
936 // Get result (may throw)
937 auto result = future.get();
938 if (result.has_value()) {
940 record.error_message = *result;
941 } else {
943 break; // Success, no need to retry
944 }
945 } else {
946 // No timeout, execute directly
947 auto result = task.callback();
948
949 if (result.has_value()) {
951 record.error_message = *result;
952 } else {
954 break; // Success, no need to retry
955 }
956 }
957 } catch (const std::exception& e) {
959 record.error_message = e.what();
961 "Task execution failed task_id={} attempt={} error={}",
962 task.id, attempt, e.what());
963 }
964
965 // If we've exhausted all retries and still failed
966 if (attempt >= max_attempts && record.state == task_state::failed) {
968 "Task failed after {} attempts task_id={}",
969 max_attempts, task.id);
970 }
971 }
972
973 record.ended_at = std::chrono::system_clock::now();
974
975 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
976 std::chrono::steady_clock::now() - start_time);
977
979 "Task execution completed task_id={} name={} state={} duration_ms={} attempts={}",
980 task.id, task.name, to_string(record.state), duration.count(), attempt);
981
982 // Record metrics
984 "scheduler_task_duration_ms",
985 static_cast<double>(duration.count()));
986
987 if (attempt > 1) {
989 "scheduler_task_retries_total");
990 }
991
992 return record;
993}
static void record_histogram(std::string_view name, double value)
Record a histogram sample.
auto generate_execution_id() const -> std::string
Generate unique execution ID.
constexpr dicom_tag status
Status.

References kcenon::pacs::workflow::completed, kcenon::pacs::integration::logger_adapter::debug(), kcenon::pacs::integration::logger_adapter::error(), kcenon::pacs::workflow::failed, kcenon::pacs::integration::monitoring_adapter::increment_counter(), kcenon::pacs::integration::logger_adapter::info(), kcenon::pacs::integration::monitoring_adapter::record_histogram(), kcenon::pacs::workflow::running, and kcenon::pacs::integration::logger_adapter::warn().

Referenced by execute_cycle().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ generate_execution_id()

auto kcenon::pacs::workflow::task_scheduler::generate_execution_id ( ) const -> std::string
nodiscardprivate

Generate unique execution ID.

Returns
New unique execution ID

Definition at line 1097 of file task_scheduler.cpp.

1097 {
1098 auto id = next_execution_id_.fetch_add(1);
1099
1100 // Add timestamp for uniqueness
1101 auto now = std::chrono::system_clock::now();
1102 auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
1103 now.time_since_epoch()).count();
1104
1105 return "exec_" + std::to_string(millis) + "_" + std::to_string(id);
1106}
std::atomic< uint64_t > next_execution_id_
Next execution ID counter.

References next_execution_id_.

◆ generate_task_id()

auto kcenon::pacs::workflow::task_scheduler::generate_task_id ( ) const -> task_id
nodiscardprivate

Generate unique task ID.

Returns
New unique task ID

Definition at line 1092 of file task_scheduler.cpp.

1092 {
1093 auto id = next_task_id_.fetch_add(1);
1094 return "task_" + std::to_string(id);
1095}
std::atomic< uint64_t > next_task_id_
Next task ID counter.

References next_task_id_.

◆ get_execution_history()

auto kcenon::pacs::workflow::task_scheduler::get_execution_history ( const task_id & id,
std::size_t limit = 100 ) const -> std::vector<task_execution_record>
nodiscard

Get execution history for a task.

Parameters
idTask ID
limitMaximum number of records to return
Returns
Vector of execution records (most recent first)

Definition at line 541 of file task_scheduler.cpp.

543 {
544 std::lock_guard<std::mutex> lock(history_mutex_);
545
546 auto it = execution_history_.find(id);
547 if (it == execution_history_.end()) {
548 return {};
549 }
550
551 const auto& records = it->second;
552 if (records.size() <= limit) {
553 return records;
554 }
555
556 // Return most recent records
557 return std::vector<task_execution_record>(
558 records.end() - static_cast<std::ptrdiff_t>(limit),
559 records.end());
560}

◆ get_recent_executions()

auto kcenon::pacs::workflow::task_scheduler::get_recent_executions ( std::size_t limit = 100) const -> std::vector<task_execution_record>
nodiscard

Get all recent executions.

Parameters
limitMaximum number of records to return
Returns
Vector of execution records (most recent first)

Definition at line 562 of file task_scheduler.cpp.

563 {
564 std::lock_guard<std::mutex> lock(history_mutex_);
565
566 std::vector<task_execution_record> all_records;
567
568 for (const auto& [id, records] : execution_history_) {
569 all_records.insert(all_records.end(), records.begin(), records.end());
570 }
571
572 // Sort by start time (most recent first)
573 std::ranges::sort(all_records, [](const auto& a, const auto& b) {
574 return a.started_at > b.started_at;
575 });
576
577 if (all_records.size() <= limit) {
578 return all_records;
579 }
580
581 all_records.resize(limit);
582 return all_records;
583}

◆ get_stats()

auto kcenon::pacs::workflow::task_scheduler::get_stats ( ) const -> scheduler_stats
nodiscard

Get scheduler statistics.

Returns
Current scheduler statistics

Definition at line 608 of file task_scheduler.cpp.

608 {
609 std::lock_guard<std::mutex> lock(stats_mutex_);
610
611 scheduler_stats result = stats_;
612
613 // Calculate uptime
614 if (running_.load()) {
615 auto now = std::chrono::steady_clock::now();
616 result.uptime = std::chrono::duration_cast<std::chrono::seconds>(
617 now - start_time_);
618 }
619
620 result.running_tasks = running_count_.load();
621
622 {
623 std::lock_guard<std::mutex> tasks_lock(tasks_mutex_);
624 result.scheduled_tasks = tasks_.size();
625 }
626
627 return result;
628}
std::atomic< bool > running_
Flag indicating scheduler is running.
std::chrono::steady_clock::time_point start_time_
Start time for uptime calculation.
std::size_t running_tasks
Number of tasks currently running.

References running_, running_count_, kcenon::pacs::workflow::scheduler_stats::running_tasks, kcenon::pacs::workflow::scheduler_stats::scheduled_tasks, start_time_, stats_, stats_mutex_, tasks_, tasks_mutex_, and kcenon::pacs::workflow::scheduler_stats::uptime.

◆ get_task()

auto kcenon::pacs::workflow::task_scheduler::get_task ( const task_id & id) const -> std::optional<scheduled_task>
nodiscard

Get a specific task by ID.

Parameters
idTask ID
Returns
Task if found, nullopt otherwise

Definition at line 416 of file task_scheduler.cpp.

417 {
418 std::lock_guard<std::mutex> lock(tasks_mutex_);
419
420 auto it = tasks_.find(id);
421 if (it == tasks_.end()) {
422 return std::nullopt;
423 }
424
425 return it->second;
426}

◆ is_running()

auto kcenon::pacs::workflow::task_scheduler::is_running ( ) const -> bool
nodiscardnoexcept

Check if the scheduler is running.

Returns
true if the scheduler is actively running

Definition at line 201 of file task_scheduler.cpp.

201 {
202 return running_.load();
203}

References running_.

◆ list_tasks() [1/3]

auto kcenon::pacs::workflow::task_scheduler::list_tasks ( ) const -> std::vector<scheduled_task>
nodiscard

List all scheduled tasks.

Returns
Vector of all scheduled tasks

Definition at line 373 of file task_scheduler.cpp.

373 {
374 std::lock_guard<std::mutex> lock(tasks_mutex_);
375
376 std::vector<scheduled_task> result;
377 result.reserve(tasks_.size());
378
379 for (const auto& [id, task] : tasks_) {
380 result.push_back(task);
381 }
382
383 return result;
384}

References tasks_, and tasks_mutex_.

◆ list_tasks() [2/3]

auto kcenon::pacs::workflow::task_scheduler::list_tasks ( task_state state) const -> std::vector<scheduled_task>
nodiscard

List tasks by state.

Parameters
stateTask state to filter by
Returns
Vector of matching tasks

Definition at line 401 of file task_scheduler.cpp.

402 {
403 std::lock_guard<std::mutex> lock(tasks_mutex_);
404
405 std::vector<scheduled_task> result;
406
407 for (const auto& [id, task] : tasks_) {
408 if (task.state == state) {
409 result.push_back(task);
410 }
411 }
412
413 return result;
414}

◆ list_tasks() [3/3]

auto kcenon::pacs::workflow::task_scheduler::list_tasks ( task_type type) const -> std::vector<scheduled_task>
nodiscard

List tasks by type.

Parameters
typeTask type to filter by
Returns
Vector of matching tasks

Definition at line 386 of file task_scheduler.cpp.

387 {
388 std::lock_guard<std::mutex> lock(tasks_mutex_);
389
390 std::vector<scheduled_task> result;
391
392 for (const auto& [id, task] : tasks_) {
393 if (task.type == type) {
394 result.push_back(task);
395 }
396 }
397
398 return result;
399}

◆ load_tasks()

auto kcenon::pacs::workflow::task_scheduler::load_tasks ( ) -> std::size_t

Load tasks from persistence storage.

Returns
Number of tasks loaded

Definition at line 681 of file task_scheduler.cpp.

681 {
682 if (config_.persistence_path.empty()) {
683 return 0;
684 }
685
686 try {
687 std::ifstream file(config_.persistence_path);
688 if (!file) {
690 "No persistence file found path={}",
692 return 0;
693 }
694
695 std::string json((std::istreambuf_iterator<char>(file)),
696 std::istreambuf_iterator<char>());
697 file.close();
698
699 std::size_t count = deserialize_tasks(json);
700
702 "Loaded tasks from persistence path={} count={}",
704
705 return count;
706 } catch (const std::exception& e) {
708 "Failed to load tasks error={}", e.what());
709 return 0;
710 }
711}
auto deserialize_tasks(const std::string &json) -> std::size_t
Deserialize tasks from JSON.

References kcenon::pacs::integration::logger_adapter::debug(), kcenon::pacs::integration::logger_adapter::error(), and kcenon::pacs::integration::logger_adapter::info().

Here is the call graph for this function:

◆ operator=() [1/2]

task_scheduler & kcenon::pacs::workflow::task_scheduler::operator= ( const task_scheduler & )
delete

◆ operator=() [2/2]

task_scheduler & kcenon::pacs::workflow::task_scheduler::operator= ( task_scheduler && )
delete

◆ pause_task()

auto kcenon::pacs::workflow::task_scheduler::pause_task ( const task_id & id) -> bool

Pause a scheduled task.

Temporarily suspends task execution. The task remains scheduled but will not execute until resumed.

Parameters
idTask ID to pause
Returns
true if task was found and paused

Definition at line 447 of file task_scheduler.cpp.

447 {
448 std::lock_guard<std::mutex> lock(tasks_mutex_);
449
450 auto it = tasks_.find(id);
451 if (it == tasks_.end()) {
452 return false;
453 }
454
455 if (it->second.state == task_state::running) {
456 return false; // Cannot pause a running task
457 }
458
459 it->second.state = task_state::paused;
460 it->second.updated_at = std::chrono::system_clock::now();
461
463 "Paused scheduled task task_id={} name={}",
464 id, it->second.name);
465
466 return true;
467}

References kcenon::pacs::integration::logger_adapter::info(), kcenon::pacs::workflow::paused, and kcenon::pacs::workflow::running.

Here is the call graph for this function:

◆ pending_count()

auto kcenon::pacs::workflow::task_scheduler::pending_count ( ) const -> std::size_t
nodiscardnoexcept

Get number of pending tasks.

Returns
Number of tasks waiting to execute

Definition at line 630 of file task_scheduler.cpp.

630 {
631 std::lock_guard<std::mutex> lock(tasks_mutex_);
632
633 std::size_t count = 0;
634 for (const auto& [id, task] : tasks_) {
635 if (task.enabled && task.state == task_state::pending) {
636 ++count;
637 }
638 }
639 return count;
640}

References kcenon::pacs::workflow::pending, tasks_, and tasks_mutex_.

◆ record_execution()

void kcenon::pacs::workflow::task_scheduler::record_execution ( const task_id & task_id,
const task_execution_record & record )
private

Record task execution.

Parameters
task_idTask ID
recordExecution record

Definition at line 1475 of file task_scheduler.cpp.

1476 {
1477 std::lock_guard<std::mutex> lock(history_mutex_);
1478
1479 auto& history = execution_history_[task_id];
1480 history.push_back(record);
1481
1482 // Limit history size (keep last 1000 records per task)
1483 constexpr std::size_t max_history = 1000;
1484 if (history.size() > max_history) {
1485 history.erase(
1486 history.begin(),
1487 history.begin() + static_cast<std::ptrdiff_t>(history.size() - max_history));
1488 }
1489}
std::string task_id
Unique task identifier.

References execution_history_, and history_mutex_.

Referenced by execute_cycle().

Here is the caller graph for this function:

◆ resume_task()

auto kcenon::pacs::workflow::task_scheduler::resume_task ( const task_id & id) -> bool

Resume a paused task.

Parameters
idTask ID to resume
Returns
true if task was found and resumed

Definition at line 469 of file task_scheduler.cpp.

469 {
470 std::lock_guard<std::mutex> lock(tasks_mutex_);
471
472 auto it = tasks_.find(id);
473 if (it == tasks_.end()) {
474 return false;
475 }
476
477 if (it->second.state != task_state::paused) {
478 return false; // Not paused
479 }
480
481 it->second.state = task_state::pending;
482 it->second.next_run_at = calculate_next_run(it->second.task_schedule);
483 it->second.updated_at = std::chrono::system_clock::now();
484
486 "Resumed scheduled task task_id={} name={}",
487 id, it->second.name);
488
489 return true;
490}

References kcenon::pacs::integration::logger_adapter::info(), kcenon::pacs::workflow::paused, and kcenon::pacs::workflow::pending.

Here is the call graph for this function:

◆ run_loop()

void kcenon::pacs::workflow::task_scheduler::run_loop ( )
private

Background thread main loop.

Definition at line 733 of file task_scheduler.cpp.

733 {
734 integration::logger_adapter::debug("Task scheduler thread started");
735
736 while (!stop_requested_.load()) {
737 std::unique_lock<std::mutex> lock(mutex_);
738
739 // Wait for check interval or until woken up
740 cv_.wait_for(lock, config_.check_interval, [this]() {
741 return stop_requested_.load();
742 });
743
744 if (stop_requested_.load()) {
745 break;
746 }
747
748 lock.unlock();
750 }
751
752 integration::logger_adapter::debug("Task scheduler thread stopped");
753}
void execute_cycle()
Execute a single scheduler cycle.
std::condition_variable cv_
Condition variable for sleep/wake.
std::mutex mutex_
Mutex for thread synchronization.
std::atomic< bool > stop_requested_
Flag to signal shutdown.
std::chrono::seconds check_interval
Scheduler check interval (how often to check for due tasks)

References kcenon::pacs::workflow::task_scheduler_config::check_interval, config_, cv_, kcenon::pacs::integration::logger_adapter::debug(), execute_cycle(), mutex_, and stop_requested_.

Referenced by start().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ running_count()

auto kcenon::pacs::workflow::task_scheduler::running_count ( ) const -> std::size_t
nodiscardnoexcept

Get number of running tasks.

Returns
Number of tasks currently executing

Definition at line 642 of file task_scheduler.cpp.

642 {
643 return running_count_.load();
644}

References running_count_.

◆ save_tasks()

auto kcenon::pacs::workflow::task_scheduler::save_tasks ( ) const -> bool

Save all tasks to persistence storage.

Returns
true if saved successfully

Definition at line 650 of file task_scheduler.cpp.

650 {
651 if (config_.persistence_path.empty()) {
652 return false;
653 }
654
655 try {
656 std::string json = serialize_tasks();
657
658 std::ofstream file(config_.persistence_path);
659 if (!file) {
661 "Failed to open persistence file path={}",
663 return false;
664 }
665
666 file << json;
667 file.close();
668
670 "Saved tasks to persistence path={}",
672
673 return true;
674 } catch (const std::exception& e) {
676 "Failed to save tasks error={}", e.what());
677 return false;
678 }
679}
auto serialize_tasks() const -> std::string
Serialize tasks to JSON.

References config_, kcenon::pacs::integration::logger_adapter::debug(), kcenon::pacs::integration::logger_adapter::error(), kcenon::pacs::workflow::task_scheduler_config::persistence_path, and serialize_tasks().

Referenced by ~task_scheduler().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ schedule() [1/3]

auto kcenon::pacs::workflow::task_scheduler::schedule ( const std::string & name,
const std::string & description,
const cron_schedule & cron_expr,
task_callback_with_result callback ) -> task_id

Schedule a custom task with cron schedule.

Parameters
nameTask name
descriptionTask description
cron_exprCron schedule expression
callbackTask callback function
Returns
Task ID

Definition at line 293 of file task_scheduler.cpp.

297 {
298
299 scheduled_task task;
300 task.id = generate_task_id();
301 task.name = name;
302 task.description = description;
303 task.type = task_type::custom;
304 task.task_schedule = cron_expr;
305 task.enabled = true;
306 task.callback = std::move(callback);
307 task.created_at = std::chrono::system_clock::now();
308 task.updated_at = task.created_at;
309
310 return schedule(std::move(task));
311}
auto generate_task_id() const -> task_id
Generate unique task ID.
std::variant< interval_schedule, cron_schedule, one_time_schedule > schedule
Combined schedule type.
std::string_view name

References kcenon::pacs::workflow::scheduled_task::callback, kcenon::pacs::workflow::scheduled_task::created_at, kcenon::pacs::workflow::custom, kcenon::pacs::workflow::scheduled_task::description, kcenon::pacs::workflow::scheduled_task::enabled, kcenon::pacs::workflow::scheduled_task::id, kcenon::pacs::workflow::scheduled_task::name, name, kcenon::pacs::workflow::scheduled_task::task_schedule, kcenon::pacs::workflow::scheduled_task::type, and kcenon::pacs::workflow::scheduled_task::updated_at.

◆ schedule() [2/3]

auto kcenon::pacs::workflow::task_scheduler::schedule ( const std::string & name,
const std::string & description,
std::chrono::seconds interval,
task_callback_with_result callback ) -> task_id

Schedule a custom task with interval.

Parameters
nameTask name
descriptionTask description
intervalInterval between executions
callbackTask callback function
Returns
Task ID

Definition at line 273 of file task_scheduler.cpp.

277 {
278
279 scheduled_task task;
280 task.id = generate_task_id();
281 task.name = name;
282 task.description = description;
283 task.type = task_type::custom;
284 task.task_schedule = interval_schedule{interval, std::nullopt};
285 task.enabled = true;
286 task.callback = std::move(callback);
287 task.created_at = std::chrono::system_clock::now();
288 task.updated_at = task.created_at;
289
290 return schedule(std::move(task));
291}

References kcenon::pacs::workflow::scheduled_task::callback, kcenon::pacs::workflow::scheduled_task::created_at, kcenon::pacs::workflow::custom, kcenon::pacs::workflow::scheduled_task::description, kcenon::pacs::workflow::scheduled_task::enabled, kcenon::pacs::workflow::scheduled_task::id, kcenon::pacs::workflow::scheduled_task::name, name, kcenon::pacs::workflow::scheduled_task::task_schedule, kcenon::pacs::workflow::scheduled_task::type, and kcenon::pacs::workflow::scheduled_task::updated_at.

◆ schedule() [3/3]

auto kcenon::pacs::workflow::task_scheduler::schedule ( scheduled_task task) -> task_id

Schedule a task with full definition.

Parameters
taskComplete task definition
Returns
Task ID

Definition at line 334 of file task_scheduler.cpp.

334 {
335 std::lock_guard<std::mutex> lock(tasks_mutex_);
336
337 // Calculate next run time if not set
338 if (!task.next_run_at) {
339 task.next_run_at = calculate_next_run(task.task_schedule);
340 }
341
342 auto id = task.id;
343
344 // Check if task already exists
345 auto it = tasks_.find(id);
346 if (it != tasks_.end()) {
347 // Update existing task
348 it->second = std::move(task);
350 "Updated scheduled task task_id={} name={}",
351 id, it->second.name);
352 } else {
353 // Add new task
354 tasks_.emplace(id, std::move(task));
356 "Added scheduled task task_id={} name={}",
357 id, tasks_[id].name);
358 }
359
360 // Update stats
361 {
362 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
364 }
365
366 return id;
367}
@ id
Implant Displaced (alternate code)
std::size_t scheduled_tasks
Number of tasks currently scheduled.

References kcenon::pacs::integration::logger_adapter::info(), and name.

Here is the call graph for this function:

◆ schedule_archive()

auto kcenon::pacs::workflow::task_scheduler::schedule_archive ( const archive_config & config) -> task_id

Schedule archive task.

Creates an archive task with the specified configuration. If an archive task already exists, updates its configuration.

Parameters
configArchive configuration
Returns
Task ID for the archive task

Definition at line 230 of file task_scheduler.cpp.

230 {
231 scheduled_task task;
232 task.id = "archive_task";
233 task.name = "Study Archival";
234 task.description = "Archives studies to secondary storage";
235 task.type = task_type::archive;
236 task.task_schedule = config.archive_schedule;
237 task.enabled = true;
238 task.priority = 5;
239 task.tags = {"maintenance", "archive"};
240 task.callback = create_archive_callback(config);
241 task.created_at = std::chrono::system_clock::now();
242 task.updated_at = task.created_at;
243
244 return schedule(std::move(task));
245}
auto create_archive_callback(const archive_config &config) -> task_callback_with_result
Create archive task callback.
@ archive
Study archival task.

References kcenon::pacs::workflow::archive, kcenon::pacs::workflow::scheduled_task::callback, kcenon::pacs::workflow::scheduled_task::created_at, kcenon::pacs::workflow::scheduled_task::description, kcenon::pacs::workflow::scheduled_task::enabled, kcenon::pacs::workflow::scheduled_task::id, kcenon::pacs::workflow::scheduled_task::name, kcenon::pacs::workflow::scheduled_task::priority, kcenon::pacs::workflow::scheduled_task::tags, kcenon::pacs::workflow::scheduled_task::task_schedule, kcenon::pacs::workflow::scheduled_task::type, and kcenon::pacs::workflow::scheduled_task::updated_at.

◆ schedule_cleanup()

auto kcenon::pacs::workflow::task_scheduler::schedule_cleanup ( const cleanup_config & config) -> task_id

Schedule cleanup task.

Creates a cleanup task with the specified configuration. If a cleanup task already exists, updates its configuration.

Parameters
configCleanup configuration
Returns
Task ID for the cleanup task

Definition at line 209 of file task_scheduler.cpp.

209 {
210 scheduled_task task;
211 task.id = "cleanup_task";
212 task.name = "Storage Cleanup";
213 task.description = "Removes old studies based on retention policy";
214 task.type = task_type::cleanup;
215 task.task_schedule = config.cleanup_schedule;
216 task.enabled = true;
217 task.priority = 10;
218 task.tags = {"maintenance", "storage"};
219 task.callback = create_cleanup_callback(config);
220 task.created_at = std::chrono::system_clock::now();
221 task.updated_at = task.created_at;
222
223 return schedule(std::move(task));
224}
auto create_cleanup_callback(const cleanup_config &config) -> task_callback_with_result
Create cleanup task callback.
@ cleanup
Storage cleanup task.

References kcenon::pacs::workflow::scheduled_task::callback, kcenon::pacs::workflow::cleanup, kcenon::pacs::workflow::scheduled_task::created_at, kcenon::pacs::workflow::scheduled_task::description, kcenon::pacs::workflow::scheduled_task::enabled, kcenon::pacs::workflow::scheduled_task::id, kcenon::pacs::workflow::scheduled_task::name, kcenon::pacs::workflow::scheduled_task::priority, kcenon::pacs::workflow::scheduled_task::tags, kcenon::pacs::workflow::scheduled_task::task_schedule, kcenon::pacs::workflow::scheduled_task::type, and kcenon::pacs::workflow::scheduled_task::updated_at.

◆ schedule_once()

auto kcenon::pacs::workflow::task_scheduler::schedule_once ( const std::string & name,
const std::string & description,
std::chrono::system_clock::time_point execute_at,
task_callback_with_result callback ) -> task_id

Schedule a one-time task.

Parameters
nameTask name
descriptionTask description
execute_atExecution time
callbackTask callback function
Returns
Task ID

Definition at line 313 of file task_scheduler.cpp.

317 {
318
319 scheduled_task task;
320 task.id = generate_task_id();
321 task.name = name;
322 task.description = description;
323 task.type = task_type::custom;
324 task.task_schedule = one_time_schedule{execute_at};
325 task.enabled = true;
326 task.callback = std::move(callback);
327 task.created_at = std::chrono::system_clock::now();
328 task.updated_at = task.created_at;
329 task.next_run_at = execute_at;
330
331 return schedule(std::move(task));
332}

References kcenon::pacs::workflow::scheduled_task::callback, kcenon::pacs::workflow::scheduled_task::created_at, kcenon::pacs::workflow::custom, kcenon::pacs::workflow::scheduled_task::description, kcenon::pacs::workflow::scheduled_task::enabled, kcenon::pacs::workflow::scheduled_task::id, kcenon::pacs::workflow::scheduled_task::name, name, kcenon::pacs::workflow::scheduled_task::next_run_at, kcenon::pacs::workflow::scheduled_task::task_schedule, kcenon::pacs::workflow::scheduled_task::type, and kcenon::pacs::workflow::scheduled_task::updated_at.

◆ schedule_verification()

auto kcenon::pacs::workflow::task_scheduler::schedule_verification ( const verification_config & config) -> task_id

Schedule verification task.

Creates a verification task with the specified configuration. If a verification task already exists, updates its configuration.

Parameters
configVerification configuration
Returns
Task ID for the verification task

Definition at line 251 of file task_scheduler.cpp.

252 {
253 scheduled_task task;
254 task.id = "verification_task";
255 task.name = "Data Verification";
256 task.description = "Verifies data integrity and consistency";
257 task.type = task_type::verification;
258 task.task_schedule = config.verification_schedule;
259 task.enabled = true;
260 task.priority = 8;
261 task.tags = {"maintenance", "integrity"};
262 task.callback = create_verification_callback(config);
263 task.created_at = std::chrono::system_clock::now();
264 task.updated_at = task.created_at;
265
266 return schedule(std::move(task));
267}
auto create_verification_callback(const verification_config &config) -> task_callback_with_result
Create verification task callback.
@ verification
Data integrity verification.

References kcenon::pacs::workflow::scheduled_task::callback, kcenon::pacs::workflow::scheduled_task::created_at, kcenon::pacs::workflow::scheduled_task::description, kcenon::pacs::workflow::scheduled_task::enabled, kcenon::pacs::workflow::scheduled_task::id, kcenon::pacs::workflow::scheduled_task::name, kcenon::pacs::workflow::scheduled_task::priority, kcenon::pacs::workflow::scheduled_task::tags, kcenon::pacs::workflow::scheduled_task::task_schedule, kcenon::pacs::workflow::scheduled_task::type, kcenon::pacs::workflow::scheduled_task::updated_at, and kcenon::pacs::workflow::verification.

◆ serialize_tasks()

auto kcenon::pacs::workflow::task_scheduler::serialize_tasks ( ) const -> std::string
nodiscardprivate

Serialize tasks to JSON.

Returns
JSON string representation

Definition at line 1526 of file task_scheduler.cpp.

1526 {
1527 // Simple JSON serialization
1528 // In production, use a proper JSON library
1529
1530 std::lock_guard<std::mutex> lock(tasks_mutex_);
1531
1532 std::ostringstream oss;
1533 oss << "{\n \"tasks\": [\n";
1534
1535 bool first = true;
1536 for (const auto& [id, task] : tasks_) {
1537 if (!first) oss << ",\n";
1538 first = false;
1539
1540 oss << " {\n";
1541 oss << " \"id\": \"" << task.id << "\",\n";
1542 oss << " \"name\": \"" << task.name << "\",\n";
1543 oss << " \"type\": \"" << to_string(task.type) << "\",\n";
1544 oss << " \"enabled\": " << (task.enabled ? "true" : "false") << ",\n";
1545 oss << " \"priority\": " << task.priority << "\n";
1546 oss << " }";
1547 }
1548
1549 oss << "\n ]\n}";
1550
1551 return oss.str();
1552}
auto to_string(lock_type type) -> std::string
Convert lock_type to string.

References tasks_, tasks_mutex_, and kcenon::pacs::workflow::to_string().

Referenced by save_tasks().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ set_error_callback()

void kcenon::pacs::workflow::task_scheduler::set_error_callback ( task_scheduler_config::task_error_callback callback)

Set the error callback.

Parameters
callbackFunction called when a task fails

Definition at line 723 of file task_scheduler.cpp.

724 {
725 std::lock_guard<std::mutex> lock(mutex_);
726 config_.on_task_error = std::move(callback);
727}

References config_, mutex_, and kcenon::pacs::workflow::task_scheduler_config::on_task_error.

◆ set_task_complete_callback()

void kcenon::pacs::workflow::task_scheduler::set_task_complete_callback ( task_scheduler_config::task_complete_callback callback)

Set the task complete callback.

Parameters
callbackFunction called after each task execution

Definition at line 717 of file task_scheduler.cpp.

718 {
719 std::lock_guard<std::mutex> lock(mutex_);
720 config_.on_task_complete = std::move(callback);
721}

References config_, mutex_, and kcenon::pacs::workflow::task_scheduler_config::on_task_complete.

◆ start()

void kcenon::pacs::workflow::task_scheduler::start ( )

Start the scheduler service.

Starts the background scheduler thread and begins monitoring for tasks to execute.

Definition at line 164 of file task_scheduler.cpp.

164 {
165 if (running_.exchange(true)) {
166 return; // Already running
167 }
168
169 stop_requested_.store(false);
170 start_time_ = std::chrono::steady_clock::now();
171
172 scheduler_thread_ = std::thread([this]() {
173 run_loop();
174 });
175
177 "Task scheduler started check_interval_sec={} max_concurrent={}",
178 config_.check_interval.count(),
180}
std::thread scheduler_thread_
Background scheduler thread.
void run_loop()
Background thread main loop.

References kcenon::pacs::workflow::task_scheduler_config::check_interval, config_, kcenon::pacs::integration::logger_adapter::info(), kcenon::pacs::workflow::task_scheduler_config::max_concurrent_tasks, run_loop(), running_, scheduler_thread_, start_time_, and stop_requested_.

Here is the call graph for this function:

◆ stop()

void kcenon::pacs::workflow::task_scheduler::stop ( bool wait_for_completion = true)

Stop the scheduler service.

Gracefully stops the service, waiting for any in-progress tasks to complete.

Parameters
wait_for_completionIf true, waits for running tasks

Definition at line 182 of file task_scheduler.cpp.

182 {
183 if (!running_.exchange(false)) {
184 return; // Already stopped
185 }
186
187 stop_requested_.store(true);
188
189 // Wake up the scheduler thread
190 cv_.notify_all();
191
192 if (wait_for_completion && scheduler_thread_.joinable()) {
193 scheduler_thread_.join();
194 } else if (scheduler_thread_.joinable()) {
195 scheduler_thread_.detach();
196 }
197
198 integration::logger_adapter::info("Task scheduler stopped");
199}

References cv_, kcenon::pacs::integration::logger_adapter::info(), running_, scheduler_thread_, and stop_requested_.

Referenced by ~task_scheduler().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ trigger_task()

auto kcenon::pacs::workflow::task_scheduler::trigger_task ( const task_id & id) -> bool

Trigger immediate execution of a task.

Executes the task immediately, regardless of its schedule.

Parameters
idTask ID to execute
Returns
true if task was found and triggered

Definition at line 492 of file task_scheduler.cpp.

492 {
493 std::lock_guard<std::mutex> lock(tasks_mutex_);
494
495 auto it = tasks_.find(id);
496 if (it == tasks_.end()) {
497 return false;
498 }
499
500 if (!it->second.enabled || it->second.state == task_state::running) {
501 return false;
502 }
503
504 // Set next run to now to trigger immediate execution
505 it->second.next_run_at = std::chrono::system_clock::now();
506
507 // Wake up scheduler
508 cv_.notify_one();
509
511 "Triggered immediate execution task_id={} name={}",
512 id, it->second.name);
513
514 return true;
515}

References kcenon::pacs::integration::logger_adapter::info(), and kcenon::pacs::workflow::running.

Here is the call graph for this function:

◆ update_schedule()

auto kcenon::pacs::workflow::task_scheduler::update_schedule ( const task_id & id,
const kcenon::pacs::workflow::schedule & new_schedule ) -> bool

Update task schedule.

Parameters
idTask ID
new_scheduleNew schedule
Returns
true if task was found and updated

Definition at line 517 of file task_scheduler.cpp.

518 {
519 std::lock_guard<std::mutex> lock(tasks_mutex_);
520
521 auto it = tasks_.find(id);
522 if (it == tasks_.end()) {
523 return false;
524 }
525
526 it->second.task_schedule = new_schedule;
527 it->second.next_run_at = calculate_next_run(new_schedule);
528 it->second.updated_at = std::chrono::system_clock::now();
529
531 "Updated schedule for task task_id={} name={}",
532 id, it->second.name);
533
534 return true;
535}

References kcenon::pacs::integration::logger_adapter::info().

Here is the call graph for this function:

◆ update_stats()

void kcenon::pacs::workflow::task_scheduler::update_stats ( const task_execution_record & record)
private

Update statistics after execution.

Parameters
recordExecution record

Definition at line 1491 of file task_scheduler.cpp.

1491 {
1492 std::lock_guard<std::mutex> lock(stats_mutex_);
1493
1495
1496 if (record.state == task_state::completed) {
1498 } else if (record.state == task_state::failed) {
1500 } else if (record.state == task_state::cancelled) {
1502 }
1503
1504 // Update average and max execution time
1505 if (auto dur = record.duration()) {
1506 auto ms = dur->count();
1507
1508 // Update max
1509 if (ms > stats_.max_execution_time.count()) {
1510 stats_.max_execution_time = std::chrono::milliseconds(ms);
1511 }
1512
1513 // Update running average
1514 if (stats_.total_executions == 1) {
1515 stats_.avg_execution_time = std::chrono::milliseconds(ms);
1516 } else {
1517 auto current_avg = stats_.avg_execution_time.count();
1518 auto new_avg = current_avg +
1519 (ms - current_avg) /
1520 static_cast<int64_t>(stats_.total_executions);
1521 stats_.avg_execution_time = std::chrono::milliseconds(new_avg);
1522 }
1523 }
1524}
std::size_t cancelled_executions
Cancelled executions.
std::size_t successful_executions
Successful executions.
std::chrono::milliseconds max_execution_time
Maximum execution time observed.
std::size_t failed_executions
Failed executions.
std::size_t total_executions
Total tasks executed.
std::chrono::milliseconds avg_execution_time
Average execution time.

References kcenon::pacs::workflow::scheduler_stats::avg_execution_time, kcenon::pacs::workflow::cancelled, kcenon::pacs::workflow::scheduler_stats::cancelled_executions, kcenon::pacs::workflow::completed, kcenon::pacs::workflow::failed, kcenon::pacs::workflow::scheduler_stats::failed_executions, kcenon::pacs::workflow::scheduler_stats::max_execution_time, stats_, stats_mutex_, kcenon::pacs::workflow::scheduler_stats::successful_executions, and kcenon::pacs::workflow::scheduler_stats::total_executions.

Referenced by execute_cycle().

Here is the caller graph for this function:

Member Data Documentation

◆ config_

task_scheduler_config kcenon::pacs::workflow::task_scheduler::config_
private

◆ cv_

std::condition_variable kcenon::pacs::workflow::task_scheduler::cv_
private

Condition variable for sleep/wake.

Definition at line 686 of file task_scheduler.h.

Referenced by run_loop(), and stop().

◆ database_

storage::index_database& kcenon::pacs::workflow::task_scheduler::database_
private

Reference to PACS index database.

Definition at line 665 of file task_scheduler.h.

◆ execution_history_

std::map<task_id, std::vector<task_execution_record> > kcenon::pacs::workflow::task_scheduler::execution_history_
private

Execution history (task_id -> records)

Definition at line 701 of file task_scheduler.h.

Referenced by clear_history(), and record_execution().

◆ executor_

std::shared_ptr<kcenon::common::interfaces::IExecutor> kcenon::pacs::workflow::task_scheduler::executor_
private

IExecutor for task execution (recommended, Issue #487)

Definition at line 674 of file task_scheduler.h.

◆ file_storage_

storage::file_storage* kcenon::pacs::workflow::task_scheduler::file_storage_ {nullptr}
private

Optional reference to file storage.

Definition at line 668 of file task_scheduler.h.

668{nullptr};

◆ history_mutex_

std::mutex kcenon::pacs::workflow::task_scheduler::history_mutex_
mutableprivate

Mutex for execution history.

Definition at line 704 of file task_scheduler.h.

Referenced by clear_history(), and record_execution().

◆ mutex_

std::mutex kcenon::pacs::workflow::task_scheduler::mutex_
mutableprivate

Mutex for thread synchronization.

Definition at line 683 of file task_scheduler.h.

Referenced by run_loop(), set_error_callback(), and set_task_complete_callback().

◆ next_execution_id_

std::atomic<uint64_t> kcenon::pacs::workflow::task_scheduler::next_execution_id_ {1}
mutableprivate

Next execution ID counter.

Definition at line 722 of file task_scheduler.h.

722{1};

Referenced by generate_execution_id().

◆ next_task_id_

std::atomic<uint64_t> kcenon::pacs::workflow::task_scheduler::next_task_id_ {1}
mutableprivate

Next task ID counter.

Definition at line 719 of file task_scheduler.h.

719{1};

Referenced by generate_task_id().

◆ running_

std::atomic<bool> kcenon::pacs::workflow::task_scheduler::running_ {false}
private

Flag indicating scheduler is running.

Definition at line 692 of file task_scheduler.h.

692{false};

Referenced by get_stats(), is_running(), start(), and stop().

◆ running_count_

std::atomic<std::size_t> kcenon::pacs::workflow::task_scheduler::running_count_ {0}
private

Running task count.

Definition at line 707 of file task_scheduler.h.

707{0};

Referenced by execute_cycle(), get_stats(), and running_count().

◆ scheduler_thread_

std::thread kcenon::pacs::workflow::task_scheduler::scheduler_thread_
private

Background scheduler thread.

Definition at line 680 of file task_scheduler.h.

Referenced by start(), and stop().

◆ start_time_

std::chrono::steady_clock::time_point kcenon::pacs::workflow::task_scheduler::start_time_
private

Start time for uptime calculation.

Definition at line 716 of file task_scheduler.h.

Referenced by get_stats(), and start().

◆ stats_

scheduler_stats kcenon::pacs::workflow::task_scheduler::stats_
private

Scheduler statistics.

Definition at line 710 of file task_scheduler.h.

Referenced by execute_cycle(), get_stats(), and update_stats().

◆ stats_mutex_

std::mutex kcenon::pacs::workflow::task_scheduler::stats_mutex_
mutableprivate

Mutex for statistics.

Definition at line 713 of file task_scheduler.h.

Referenced by execute_cycle(), get_stats(), and update_stats().

◆ stop_requested_

std::atomic<bool> kcenon::pacs::workflow::task_scheduler::stop_requested_ {false}
private

Flag to signal shutdown.

Definition at line 689 of file task_scheduler.h.

689{false};

Referenced by run_loop(), start(), and stop().

◆ tasks_

std::map<task_id, scheduled_task> kcenon::pacs::workflow::task_scheduler::tasks_
private

All scheduled tasks (id -> task)

Definition at line 695 of file task_scheduler.h.

Referenced by execute_cycle(), get_stats(), list_tasks(), pending_count(), and serialize_tasks().

◆ tasks_mutex_

std::mutex kcenon::pacs::workflow::task_scheduler::tasks_mutex_
mutableprivate

Mutex for tasks map.

Definition at line 698 of file task_scheduler.h.

Referenced by execute_cycle(), get_stats(), list_tasks(), pending_count(), and serialize_tasks().

◆ thread_pool_

std::shared_ptr<kcenon::thread::thread_pool> kcenon::pacs::workflow::task_scheduler::thread_pool_
private

Thread pool for parallel task execution (legacy)

Definition at line 671 of file task_scheduler.h.


The documentation for this class was generated from the following files: