PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
kcenon::pacs::client::job_manager Class Reference

#include <job_manager.h>

Collaboration diagram for kcenon::pacs::client::job_manager:
Collaboration graph

Classes

struct  impl
 

Public Member Functions

 job_manager (std::shared_ptr< storage::job_repository > repo, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< di::ILogger > logger=nullptr)
 Construct a job manager with default configuration.
 
 job_manager (const job_manager_config &config, std::shared_ptr< storage::job_repository > repo, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< di::ILogger > logger=nullptr)
 Construct a job manager with custom configuration.
 
 ~job_manager ()
 Destructor - stops workers if running.
 
 job_manager (const job_manager &)=delete
 
auto operator= (const job_manager &) -> job_manager &=delete
 
 job_manager (job_manager &&)=delete
 
auto operator= (job_manager &&) -> job_manager &=delete
 
auto create_retrieve_job (std::string_view source_node_id, std::string_view study_uid, std::optional< std::string_view > series_uid=std::nullopt, job_priority priority=job_priority::normal) -> std::string
 Create a retrieve job (C-MOVE/C-GET)
 
auto create_store_job (std::string_view destination_node_id, const std::vector< std::string > &instance_uids, job_priority priority=job_priority::normal) -> std::string
 Create a store job (C-STORE)
 
auto create_query_job (std::string_view node_id, std::string_view query_level, const std::unordered_map< std::string, std::string > &query_keys, job_priority priority=job_priority::normal) -> std::string
 Create a query job (C-FIND)
 
auto create_sync_job (std::string_view source_node_id, std::optional< std::string_view > patient_id=std::nullopt, job_priority priority=job_priority::low) -> std::string
 Create a sync job.
 
auto create_prefetch_job (std::string_view source_node_id, std::string_view patient_id, job_priority priority=job_priority::low) -> std::string
 Create a prefetch job.
 
auto start_job (std::string_view job_id) -> kcenon::pacs::VoidResult
 Start a pending job.
 
auto pause_job (std::string_view job_id) -> kcenon::pacs::VoidResult
 Pause a running or queued job.
 
auto resume_job (std::string_view job_id) -> kcenon::pacs::VoidResult
 Resume a paused job.
 
auto cancel_job (std::string_view job_id) -> kcenon::pacs::VoidResult
 Cancel a job.
 
auto retry_job (std::string_view job_id) -> kcenon::pacs::VoidResult
 Retry a failed job.
 
auto delete_job (std::string_view job_id) -> kcenon::pacs::VoidResult
 Delete a job.
 
auto get_job (std::string_view job_id) const -> std::optional< job_record >
 Get a job by ID.
 
auto list_jobs (std::optional< job_status > status=std::nullopt, std::optional< job_type > type=std::nullopt, size_t limit=100, size_t offset=0) const -> std::vector< job_record >
 List jobs with optional filters.
 
auto list_jobs_by_node (std::string_view node_id) const -> std::vector< job_record >
 List jobs by node ID.
 
auto get_progress (std::string_view job_id) const -> job_progress
 Get current progress for a job.
 
void set_progress_callback (job_progress_callback callback)
 Set the progress callback.
 
void set_completion_callback (job_completion_callback callback)
 Set the completion callback.
 
auto wait_for_completion (std::string_view job_id) -> std::future< job_record >
 Wait for a job to complete.
 
void start_workers ()
 Start the worker threads.
 
void stop_workers ()
 Stop the worker threads.
 
auto is_running () const noexcept -> bool
 Check if workers are running.
 
auto active_jobs () const -> size_t
 Get number of active (running) jobs.
 
auto pending_jobs () const -> size_t
 Get number of pending jobs.
 
auto completed_jobs_today () const -> size_t
 Get number of jobs completed today.
 
auto failed_jobs_today () const -> size_t
 Get number of jobs failed today.
 
auto config () const noexcept -> const job_manager_config &
 Get current configuration.
 

Private Attributes

std::unique_ptr< impl > impl_
 

Detailed Description

Definition at line 93 of file job_manager.h.

Constructor & Destructor Documentation

◆ job_manager() [1/4]

kcenon::pacs::client::job_manager::job_manager ( std::shared_ptr< storage::job_repository > repo,
std::shared_ptr< remote_node_manager > node_manager,
std::shared_ptr< di::ILogger > logger = nullptr )
explicit

Construct a job manager with default configuration.

Parameters
repo: Job repository for persistence (required)
node_manager: Remote node manager for DICOM operations (required)
logger: Logger instance (optional, defaults to NullLogger)
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1129 of file job_manager.cpp.

1133 : job_manager(job_manager_config{}, std::move(repo), std::move(node_manager), std::move(logger)) {
1134}
job_manager(std::shared_ptr< storage::job_repository > repo, std::shared_ptr< remote_node_manager > node_manager, std::shared_ptr< di::ILogger > logger=nullptr)
Construct a job manager with default configuration.

◆ job_manager() [2/4]

kcenon::pacs::client::job_manager::job_manager ( const job_manager_config & config,
std::shared_ptr< storage::job_repository > repo,
std::shared_ptr< remote_node_manager > node_manager,
std::shared_ptr< di::ILogger > logger = nullptr )
explicit

Construct a job manager with custom configuration.

Parameters
config: Manager configuration
repo: Job repository for persistence (required)
node_manager: Remote node manager for DICOM operations (required)
logger: Logger instance (optional, defaults to NullLogger)

Definition at line 1136 of file job_manager.cpp.

1141 : impl_(std::make_unique<impl>()) {
1142
1143 impl_->config = config;
1144 impl_->repo = std::move(repo);
1145 impl_->node_manager = std::move(node_manager);
1146 impl_->logger = logger ? std::move(logger) : di::null_logger();
1147
1148 // Load any pending jobs from repository
1149 impl_->load_pending_jobs_from_repo();
1150}
auto config() const noexcept -> const job_manager_config &
Get current configuration.
std::unique_ptr< impl > impl_
std::shared_ptr< di::ILogger > logger
std::shared_ptr< storage::job_repository > repo
std::shared_ptr< remote_node_manager > node_manager

References config(), kcenon::pacs::client::job_manager::impl::config, impl_, kcenon::pacs::client::job_manager::impl::load_pending_jobs_from_repo(), kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::job_manager::impl::node_manager, kcenon::pacs::di::null_logger(), and kcenon::pacs::client::job_manager::impl::repo.

Here is the call graph for this function:

◆ ~job_manager()

kcenon::pacs::client::job_manager::~job_manager ( )

Destructor - stops workers if running.

Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1152 of file job_manager.cpp.

1152 {
1153 stop_workers();
1154}
void stop_workers()
Stop the worker threads.

References stop_workers().

Here is the call graph for this function:

◆ job_manager() [3/4]

kcenon::pacs::client::job_manager::job_manager ( const job_manager & )
delete

◆ job_manager() [4/4]

kcenon::pacs::client::job_manager::job_manager ( job_manager && )
delete

Member Function Documentation

◆ active_jobs()

size_t kcenon::pacs::client::job_manager::active_jobs ( ) const -> size_t
nodiscard

Get number of active (running) jobs.

Returns
Number of jobs currently executing
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1622 of file job_manager.cpp.

1622 {
1623 std::lock_guard lock(impl_->active_mutex);
1624 return impl_->active_job_ids.size();
1625}
std::unordered_set< std::string > active_job_ids

References kcenon::pacs::client::job_manager::impl::active_job_ids, kcenon::pacs::client::job_manager::impl::active_mutex, and impl_.

◆ cancel_job()

kcenon::pacs::VoidResult kcenon::pacs::client::job_manager::cancel_job ( std::string_view job_id) -> kcenon::pacs::VoidResult
nodiscard

Cancel a job.

Cancels a running, queued, or pending job.

Parameters
job_id: The job ID to cancel
Returns
VoidResult indicating success or error
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1353 of file job_manager.cpp.

1353 {
1354 auto job_opt = impl_->get_job_from_cache(job_id);
1355 if (!job_opt) {
1356 return kcenon::pacs::pacs_void_error(
1357 kcenon::pacs::error_codes::not_found,
1358 "Job not found: " + std::string(job_id));
1359 }
1360
1361 if (!job_opt->can_cancel()) {
1362 return kcenon::pacs::pacs_void_error(
1363 kcenon::pacs::error_codes::invalid_argument,
1364 "Job cannot be cancelled in current state: " + std::string(to_string(job_opt->status)));
1365 }
1366
1367 {
1368 std::lock_guard lock(impl_->cancelled_mutex);
1369 impl_->cancelled_job_ids.insert(std::string(job_id));
1370 }
1371
1372 // If not actively running, update status immediately
1373 {
1374 std::lock_guard lock(impl_->active_mutex);
1375 if (impl_->active_job_ids.count(std::string(job_id)) == 0) {
1376 impl_->update_job_status(std::string(job_id), job_status::cancelled);
1377 auto final_job = impl_->get_job_from_cache(job_id);
1378 if (final_job) {
1379 impl_->notify_completion(std::string(job_id), *final_job);
1380 }
1381 }
1382 }
1383
1384 impl_->logger->info_fmt("Cancelled job {}", job_id);
1385 return kcenon::pacs::ok();
1386}
constexpr const char * to_string(job_type type) noexcept
Convert job_type to string representation.
Definition job_types.h:54
@ cancelled
Job was cancelled by user.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Definition result.h:249
void update_job_status(const std::string &job_id, job_status status, const std::string &error_msg="", const std::string &error_details="")
std::unordered_set< std::string > cancelled_job_ids
void notify_completion(const std::string &job_id, const job_record &record)
std::optional< job_record > get_job_from_cache(std::string_view job_id) const

References kcenon::pacs::client::job_manager::impl::active_job_ids, kcenon::pacs::client::job_manager::impl::active_mutex, kcenon::pacs::client::cancelled, kcenon::pacs::client::job_manager::impl::cancelled_job_ids, kcenon::pacs::client::job_manager::impl::cancelled_mutex, kcenon::pacs::client::job_manager::impl::get_job_from_cache(), impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::job_manager::impl::notify_completion(), kcenon::pacs::pacs_void_error(), kcenon::pacs::client::to_string(), and kcenon::pacs::client::job_manager::impl::update_job_status().

Referenced by delete_job().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ completed_jobs_today()

size_t kcenon::pacs::client::job_manager::completed_jobs_today ( ) const -> size_t
nodiscard

Get number of jobs completed today.

Returns
Number of successful completions today
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1632 of file job_manager.cpp.

1632 {
1633 if (impl_->repo) {
1634#ifdef PACS_WITH_DATABASE_SYSTEM
1635 auto result = impl_->repo->count_completed_today();
1636 if (result.is_ok()) {
1637 return result.value();
1638 }
1639 return 0;
1640#else
1641 return impl_->repo->count_completed_today();
1642#endif
1643 }
1644 return 0;
1645}

References impl_, and kcenon::pacs::client::job_manager::impl::repo.

◆ config()

const job_manager_config & kcenon::pacs::client::job_manager::config ( ) const -> const job_manager_config&
nodiscard noexcept

Get current configuration.

Returns
Current manager configuration
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1666 of file job_manager.cpp.

1666 {
1667 return impl_->config;
1668}

References kcenon::pacs::client::job_manager::impl::config, and impl_.

Referenced by job_manager().

Here is the caller graph for this function:

◆ create_prefetch_job()

std::string kcenon::pacs::client::job_manager::create_prefetch_job ( std::string_view source_node_id,
std::string_view patient_id,
job_priority priority = job_priority::low ) -> std::string
nodiscard

Create a prefetch job.

Creates a job to prefetch prior studies for a patient.

Parameters
source_node_id: ID of the source remote node
patient_id: Patient ID to prefetch studies for
priority: Job priority (default: low for background prefetch)
Returns
Unique job ID
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1252 of file job_manager.cpp.

1255 {
1256
1257 job_record job;
1258 job.job_id = generate_uuid();
1259 job.type = job_type::prefetch;
1260 job.status = job_status::pending;
1261 job.priority = priority;
1262 job.source_node_id = std::string(source_node_id);
1263 job.patient_id = std::string(patient_id);
1264 job.created_at = std::chrono::system_clock::now();
1265
1266 impl_->save_job(job);
1267 impl_->logger->info_fmt("Created prefetch job {}: patient={}", job.job_id, patient_id);
1268
1269 return job.job_id;
1270}
@ prefetch
Prefetch prior studies.
@ pending
Job created but not yet queued.
constexpr dicom_tag priority
Priority.
void save_job(const job_record &job)

References impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::pending, kcenon::pacs::client::prefetch, and kcenon::pacs::client::job_manager::impl::save_job().

Here is the call graph for this function:

◆ create_query_job()

std::string kcenon::pacs::client::job_manager::create_query_job ( std::string_view node_id,
std::string_view query_level,
const std::unordered_map< std::string, std::string > & query_keys,
job_priority priority = job_priority::normal ) -> std::string
nodiscard

Create a query job (C-FIND)

Creates a job to query a remote node.

Parameters
node_id: ID of the remote node to query
query_level: Query retrieve level (PATIENT, STUDY, SERIES, IMAGE)
query_keys: Query keys as key-value pairs
priority: Job priority (default: normal)
Returns
Unique job ID
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1206 of file job_manager.cpp.

1210 {
1211
1212 job_record job;
1213 job.job_id = generate_uuid();
1214 job.type = job_type::query;
1215 job.status = job_status::pending;
1216 job.priority = priority;
1217 job.source_node_id = std::string(node_id);
1218 job.metadata["query_level"] = std::string(query_level);
1219 for (const auto& [key, value] : query_keys) {
1220 job.metadata["query_" + key] = value;
1221 }
1222 job.created_at = std::chrono::system_clock::now();
1223
1224 impl_->save_job(job);
1225 impl_->logger->info_fmt("Created query job {}: level={}", job.job_id, query_level);
1226
1227 return job.job_id;
1228}

References impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::pending, kcenon::pacs::client::query, and kcenon::pacs::client::job_manager::impl::save_job().

Here is the call graph for this function:

◆ create_retrieve_job()

std::string kcenon::pacs::client::job_manager::create_retrieve_job ( std::string_view source_node_id,
std::string_view study_uid,
std::optional< std::string_view > series_uid = std::nullopt,
job_priority priority = job_priority::normal ) -> std::string
nodiscard

Create a retrieve job (C-MOVE/C-GET)

Creates a job to retrieve DICOM objects from a remote node.

Parameters
source_node_id: ID of the source remote node
study_uid: Study Instance UID to retrieve
series_uid: Optional Series Instance UID (retrieve all if not specified)
priority: Job priority (default: normal)
Returns
Unique job ID
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1160 of file job_manager.cpp.

1164 {
1165
1166 job_record job;
1167 job.job_id = generate_uuid();
1168 job.type = job_type::retrieve;
1169 job.status = job_status::pending;
1170 job.priority = priority;
1171 job.source_node_id = std::string(source_node_id);
1172 job.study_uid = std::string(study_uid);
1173 if (series_uid) {
1174 job.series_uid = std::string(*series_uid);
1175 }
1176 job.created_at = std::chrono::system_clock::now();
1177
1178 impl_->save_job(job);
1179 impl_->logger->info_fmt("Created retrieve job {}: study={}", job.job_id, study_uid);
1180
1181 return job.job_id;
1182}
@ retrieve
C-MOVE/C-GET operation.

References impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::pending, kcenon::pacs::client::retrieve, and kcenon::pacs::client::job_manager::impl::save_job().

Here is the call graph for this function:

◆ create_store_job()

std::string kcenon::pacs::client::job_manager::create_store_job ( std::string_view destination_node_id,
const std::vector< std::string > & instance_uids,
job_priority priority = job_priority::normal ) -> std::string
nodiscard

Create a store job (C-STORE)

Creates a job to store DICOM objects to a remote node.

Parameters
destination_node_id: ID of the destination remote node
instance_uids: SOP Instance UIDs to store
priority: Job priority (default: normal)
Returns
Unique job ID
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1184 of file job_manager.cpp.

1187 {
1188
1189 job_record job;
1190 job.job_id = generate_uuid();
1191 job.type = job_type::store;
1192 job.status = job_status::pending;
1193 job.priority = priority;
1194 job.destination_node_id = std::string(destination_node_id);
1195 job.instance_uids = instance_uids;
1196 job.progress.total_items = instance_uids.size();
1197 job.created_at = std::chrono::system_clock::now();
1198
1199 impl_->save_job(job);
1200 impl_->logger->info_fmt("Created store job {}: {} instances to {}",
1201 job.job_id, instance_uids.size(), destination_node_id);
1202
1203 return job.job_id;
1204}
@ store
C-STORE operation.

References impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::pending, kcenon::pacs::client::job_manager::impl::save_job(), and kcenon::pacs::client::store.

Here is the call graph for this function:

◆ create_sync_job()

std::string kcenon::pacs::client::job_manager::create_sync_job ( std::string_view source_node_id,
std::optional< std::string_view > patient_id = std::nullopt,
job_priority priority = job_priority::low ) -> std::string
nodiscard

Create a sync job.

Creates a job to synchronize data with a remote node.

Parameters
source_node_id: ID of the source remote node
patient_id: Optional patient ID filter
priority: Job priority (default: low for background sync)
Returns
Unique job ID
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1230 of file job_manager.cpp.

1233 {
1234
1235 job_record job;
1236 job.job_id = generate_uuid();
1237 job.type = job_type::sync;
1238 job.status = job_status::pending;
1239 job.priority = priority;
1240 job.source_node_id = std::string(source_node_id);
1241 if (patient_id) {
1242 job.patient_id = std::string(*patient_id);
1243 }
1244 job.created_at = std::chrono::system_clock::now();
1245
1246 impl_->save_job(job);
1247 impl_->logger->info_fmt("Created sync job {}: from {}", job.job_id, source_node_id);
1248
1249 return job.job_id;
1250}

References impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::pending, kcenon::pacs::client::job_manager::impl::save_job(), and kcenon::pacs::client::sync.

Here is the call graph for this function:

◆ delete_job()

kcenon::pacs::VoidResult kcenon::pacs::client::job_manager::delete_job ( std::string_view job_id) -> kcenon::pacs::VoidResult
nodiscard

Delete a job.

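Removes a job from the system. Any job that has not yet finished (not only running jobs) is cancelled first; if that cancellation fails, the error is returned and the job is not deleted.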

Parameters
job_id: The job ID to delete
Returns
VoidResult indicating success or error
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1426 of file job_manager.cpp.

1426 {
1427 auto job_opt = impl_->get_job_from_cache(job_id);
1428 if (!job_opt) {
1429 return kcenon::pacs::pacs_void_error(
1430 kcenon::pacs::error_codes::not_found,
1431 "Job not found: " + std::string(job_id));
1432 }
1433
1434 // Cancel if not terminal
1435 if (!job_opt->is_finished()) {
1436 auto cancel_result = cancel_job(job_id);
1437 if (cancel_result.is_err()) {
1438 return cancel_result;
1439 }
1440 }
1441
1442 // Remove from cache
1443 {
1444 std::unique_lock lock(impl_->cache_mutex);
1445 impl_->job_cache.erase(std::string(job_id));
1446 }
1447
1448 // Remove from repository
1449 if (impl_->repo) {
1450 [[maybe_unused]] auto result = impl_->repo->remove(std::string(job_id));
1451 }
1452
1453 impl_->logger->info_fmt("Deleted job {}", job_id);
1454 return kcenon::pacs::ok();
1455}
auto cancel_job(std::string_view job_id) -> kcenon::pacs::VoidResult
Cancel a job.
std::unordered_map< std::string, job_record > job_cache

References kcenon::pacs::client::job_manager::impl::cache_mutex, cancel_job(), kcenon::pacs::client::job_manager::impl::get_job_from_cache(), impl_, kcenon::pacs::client::job_manager::impl::job_cache, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::pacs_void_error(), and kcenon::pacs::client::job_manager::impl::repo.

Here is the call graph for this function:

◆ failed_jobs_today()

size_t kcenon::pacs::client::job_manager::failed_jobs_today ( ) const -> size_t
nodiscard

Get number of jobs failed today.

Returns
Number of failures today
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1647 of file job_manager.cpp.

1647 {
1648 if (impl_->repo) {
1649#ifdef PACS_WITH_DATABASE_SYSTEM
1650 auto result = impl_->repo->count_failed_today();
1651 if (result.is_ok()) {
1652 return result.value();
1653 }
1654 return 0;
1655#else
1656 return impl_->repo->count_failed_today();
1657#endif
1658 }
1659 return 0;
1660}

References impl_, and kcenon::pacs::client::job_manager::impl::repo.

◆ get_job()

std::optional< job_record > kcenon::pacs::client::job_manager::get_job ( std::string_view job_id) const -> std::optional<job_record>
nodiscard

Get a job by ID.

Parameters
job_id: The job ID to retrieve
Returns
Optional containing the job if found
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1461 of file job_manager.cpp.

1461 {
1462 return impl_->get_job_from_cache(job_id);
1463}

References kcenon::pacs::client::job_manager::impl::get_job_from_cache(), and impl_.

Here is the call graph for this function:

◆ get_progress()

job_progress kcenon::pacs::client::job_manager::get_progress ( std::string_view job_id) const -> job_progress
nodiscard

Get current progress for a job.

Parameters
job_id: The job ID
Returns
Current progress information

Definition at line 1533 of file job_manager.cpp.

1533 {
1534 auto job_opt = impl_->get_job_from_cache(job_id);
1535 if (job_opt) {
1536 return job_opt->progress;
1537 }
1538 return {};
1539}

References kcenon::pacs::client::job_manager::impl::get_job_from_cache(), and impl_.

Here is the call graph for this function:

◆ is_running()

bool kcenon::pacs::client::job_manager::is_running ( ) const -> bool
nodiscard noexcept

Check if workers are running.

Returns
true if worker threads are active
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1614 of file job_manager.cpp.

1614 {
1615 return impl_->running.load();
1616}

References impl_, and kcenon::pacs::client::job_manager::impl::running.

◆ list_jobs()

std::vector< job_record > kcenon::pacs::client::job_manager::list_jobs ( std::optional< job_status > status = std::nullopt,
std::optional< job_type > type = std::nullopt,
size_t limit = 100,
size_t offset = 0 ) const -> std::vector<job_record>
nodiscard

List jobs with optional filters.

Parameters
status: Optional status filter
type: Optional type filter
limit: Maximum results (default: 100)
offset: Result offset for pagination
Returns
Vector of matching jobs
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1465 of file job_manager.cpp.

1469 {
1470
1471 if (impl_->repo) {
1472 storage::job_query_options options;
1473 options.status = status;
1474 options.type = type;
1475 options.limit = limit;
1476 options.offset = offset;
1477#ifdef PACS_WITH_DATABASE_SYSTEM
1478 auto result = impl_->repo->find_jobs(options);
1479 if (result.is_ok()) {
1480 return result.value();
1481 }
1482 return {};
1483#else
1484 return impl_->repo->find_jobs(options);
1485#endif
1486 }
1487
1488 // Fallback to cache
1489 std::vector<job_record> result;
1490 std::shared_lock lock(impl_->cache_mutex);
1491 for (const auto& [_, job] : impl_->job_cache) {
1492 if (status && job.status != *status) continue;
1493 if (type && job.type != *type) continue;
1494 if (offset > 0) {
1495 --offset;
1496 continue;
1497 }
1498 result.push_back(job);
1499 if (result.size() >= limit) break;
1500 }
1501 return result;
1502}
constexpr dicom_tag status
Status.

References kcenon::pacs::client::job_manager::impl::cache_mutex, impl_, kcenon::pacs::client::job_manager::impl::job_cache, kcenon::pacs::client::job_manager::impl::repo, and kcenon::pacs::storage::job_query_options::status.

◆ list_jobs_by_node()

std::vector< job_record > kcenon::pacs::client::job_manager::list_jobs_by_node ( std::string_view node_id) const -> std::vector<job_record>
nodiscard

List jobs by node ID.

Returns jobs that involve the specified node as source or destination.

Parameters
node_id: The node ID to filter by
Returns
Vector of jobs involving the node
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1504 of file job_manager.cpp.

1504 {
1505 if (impl_->repo) {
1506#ifdef PACS_WITH_DATABASE_SYSTEM
1507 auto result = impl_->repo->find_by_node(node_id);
1508 if (result.is_ok()) {
1509 return result.value();
1510 }
1511 return {};
1512#else
1513 return impl_->repo->find_by_node(node_id);
1514#endif
1515 }
1516
1517 // Fallback to cache
1518 std::vector<job_record> result;
1519 std::shared_lock lock(impl_->cache_mutex);
1520 std::string node_str(node_id);
1521 for (const auto& [_, job] : impl_->job_cache) {
1522 if (job.source_node_id == node_str || job.destination_node_id == node_str) {
1523 result.push_back(job);
1524 }
1525 }
1526 return result;
1527}

References kcenon::pacs::client::job_manager::impl::cache_mutex, impl_, kcenon::pacs::client::job_manager::impl::job_cache, and kcenon::pacs::client::job_manager::impl::repo.

◆ operator=() [1/2]

auto kcenon::pacs::client::job_manager::operator= ( const job_manager & ) -> job_manager &=delete
delete

◆ operator=() [2/2]

auto kcenon::pacs::client::job_manager::operator= ( job_manager && ) -> job_manager &=delete
delete

◆ pause_job()

kcenon::pacs::VoidResult kcenon::pacs::client::job_manager::pause_job ( std::string_view job_id) -> kcenon::pacs::VoidResult
nodiscard

Pause a running or queued job.

Parameters
job_id: The job ID to pause
Returns
VoidResult indicating success or error
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1305 of file job_manager.cpp.

1305 {
1306 auto job_opt = impl_->get_job_from_cache(job_id);
1307 if (!job_opt) {
1308 return kcenon::pacs::pacs_void_error(
1309 kcenon::pacs::error_codes::not_found,
1310 "Job not found: " + std::string(job_id));
1311 }
1312
1313 if (!job_opt->can_pause()) {
1314 return kcenon::pacs::pacs_void_error(
1315 kcenon::pacs::error_codes::invalid_argument,
1316 "Job cannot be paused in current state: " + std::string(to_string(job_opt->status)));
1317 }
1318
1319 {
1320 std::lock_guard lock(impl_->paused_mutex);
1321 impl_->paused_job_ids.insert(std::string(job_id));
1322 }
1323 impl_->update_job_status(std::string(job_id), job_status::paused);
1324
1325 impl_->logger->info_fmt("Paused job {}", job_id);
1326 return kcenon::pacs::ok();
1327}
std::unordered_set< std::string > paused_job_ids

References kcenon::pacs::client::job_manager::impl::get_job_from_cache(), impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::pacs_void_error(), kcenon::pacs::client::paused, kcenon::pacs::client::job_manager::impl::paused_job_ids, kcenon::pacs::client::job_manager::impl::paused_mutex, kcenon::pacs::client::to_string(), and kcenon::pacs::client::job_manager::impl::update_job_status().

Here is the call graph for this function:

◆ pending_jobs()

size_t kcenon::pacs::client::job_manager::pending_jobs ( ) const -> size_t
nodiscard

Get number of pending jobs.

Returns
Number of jobs waiting in queue
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1627 of file job_manager.cpp.

1627 {
1628 std::lock_guard lock(impl_->queue_mutex);
1629 return impl_->job_queue.size();
1630}
std::priority_queue< queue_entry > job_queue

References impl_, kcenon::pacs::client::job_manager::impl::job_queue, and kcenon::pacs::client::job_manager::impl::queue_mutex.

◆ resume_job()

kcenon::pacs::VoidResult kcenon::pacs::client::job_manager::resume_job ( std::string_view job_id) -> kcenon::pacs::VoidResult
nodiscard

Resume a paused job.

Parameters
job_id: The job ID to resume
Returns
VoidResult indicating success or error
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1329 of file job_manager.cpp.

1329 {
1330 auto job_opt = impl_->get_job_from_cache(job_id);
1331 if (!job_opt) {
1332 return kcenon::pacs::pacs_void_error(
1333 kcenon::pacs::error_codes::not_found,
1334 "Job not found: " + std::string(job_id));
1335 }
1336
1337 if (job_opt->status != job_status::paused) {
1338 return kcenon::pacs::pacs_void_error(
1339 kcenon::pacs::error_codes::invalid_argument,
1340 "Job is not paused: " + std::string(to_string(job_opt->status)));
1341 }
1342
1343 {
1344 std::lock_guard lock(impl_->paused_mutex);
1345 impl_->paused_job_ids.erase(std::string(job_id));
1346 }
1347 impl_->update_job_status(std::string(job_id), job_status::queued);
1348
1349 impl_->logger->info_fmt("Resumed job {}", job_id);
1350 return kcenon::pacs::ok();
1351}
@ queued
Job is in the execution queue.

References kcenon::pacs::client::job_manager::impl::get_job_from_cache(), impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::pacs_void_error(), kcenon::pacs::client::paused, kcenon::pacs::client::job_manager::impl::paused_job_ids, kcenon::pacs::client::job_manager::impl::paused_mutex, kcenon::pacs::client::queued, kcenon::pacs::client::to_string(), and kcenon::pacs::client::job_manager::impl::update_job_status().

Here is the call graph for this function:

◆ retry_job()

kcenon::pacs::VoidResult kcenon::pacs::client::job_manager::retry_job ( std::string_view job_id) -> kcenon::pacs::VoidResult
nodiscard

Retry a failed job.

Resets a failed job and queues it for re-execution.

Parameters
job_id: The job ID to retry
Returns
VoidResult indicating success or error
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1388 of file job_manager.cpp.

1388 {
1389 auto job_opt = impl_->get_job_from_cache(job_id);
1390 if (!job_opt) {
1391 return kcenon::pacs::pacs_void_error(
1392 kcenon::pacs::error_codes::not_found,
1393 "Job not found: " + std::string(job_id));
1394 }
1395
1396 if (!job_opt->can_retry()) {
1397 return kcenon::pacs::pacs_void_error(
1398 kcenon::pacs::error_codes::invalid_argument,
1399 "Job cannot be retried: " + std::string(to_string(job_opt->status)));
1400 }
1401
1402 // Reset job state
1403 {
1404 std::unique_lock lock(impl_->cache_mutex);
1405 auto it = impl_->job_cache.find(std::string(job_id));
1406 if (it != impl_->job_cache.end()) {
1407 it->second.status = job_status::pending;
1408 it->second.retry_count++;
1409 it->second.error_message.clear();
1410 it->second.error_details.clear();
1411 it->second.started_at = std::nullopt;
1412 it->second.completed_at = std::nullopt;
1413 it->second.progress = job_progress{};
1414 }
1415 }
1416
1417 if (impl_->repo) {
1418 [[maybe_unused]] auto result = impl_->repo->increment_retry(job_id);
1419 [[maybe_unused]] auto status_result = impl_->repo->update_status(job_id, job_status::pending);
1420 }
1421
1422 impl_->logger->info_fmt("Retrying job {} (attempt {})", job_id, job_opt->retry_count + 1);
1423 return kcenon::pacs::ok();
1424}

References kcenon::pacs::client::job_manager::impl::cache_mutex, kcenon::pacs::client::job_manager::impl::get_job_from_cache(), impl_, kcenon::pacs::client::job_manager::impl::job_cache, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::pacs_void_error(), kcenon::pacs::client::pending, kcenon::pacs::client::job_manager::impl::repo, and kcenon::pacs::client::to_string().

Here is the call graph for this function:

◆ set_completion_callback()

void kcenon::pacs::client::job_manager::set_completion_callback ( job_completion_callback callback)

Set the completion callback.

The callback is invoked when a job completes (success, failure, or cancel). Only one callback can be set; setting a new one replaces the old.

Parameters
callback: Function to call on job completion
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1546 of file job_manager.cpp.

1546 {
1547 std::unique_lock lock(impl_->callbacks_mutex);
1548 impl_->completion_callback = std::move(callback);
1549}
job_completion_callback completion_callback

References kcenon::pacs::client::job_manager::impl::callbacks_mutex, kcenon::pacs::client::job_manager::impl::completion_callback, and impl_.

◆ set_progress_callback()

void kcenon::pacs::client::job_manager::set_progress_callback ( job_progress_callback callback)

Set the progress callback.

The callback is invoked whenever job progress is updated. Only one callback can be set; setting a new one replaces the old.

Parameters
callback: Function to call on progress updates
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1541 of file job_manager.cpp.

1541 {
1542 std::unique_lock lock(impl_->callbacks_mutex);
1543 impl_->progress_callback = std::move(callback);
1544}

References kcenon::pacs::client::job_manager::impl::callbacks_mutex, impl_, and kcenon::pacs::client::job_manager::impl::progress_callback.

◆ start_job()

kcenon::pacs::VoidResult kcenon::pacs::client::job_manager::start_job ( std::string_view job_id) -> kcenon::pacs::VoidResult
nodiscard

Start a pending job.

Moves a pending job to the execution queue.

Parameters
job_id: The job ID to start
Returns
VoidResult indicating success or error
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1276 of file job_manager.cpp.

1276 {
1277 auto job_opt = impl_->get_job_from_cache(job_id);
1278 if (!job_opt) {
1279 return kcenon::pacs::pacs_void_error(
1280 kcenon::pacs::error_codes::not_found,
1281 "Job not found: " + std::string(job_id));
1282 }
1283
1284 if (!job_opt->can_start()) {
1285 return kcenon::pacs::pacs_void_error(
1286 kcenon::pacs::error_codes::invalid_argument,
1287 "Job cannot be started in current state: " + std::string(to_string(job_opt->status)));
1288 }
1289
1290 // Update status and enqueue
1291 impl_->update_job_status(std::string(job_id), job_status::queued);
1292 {
1293 std::unique_lock lock(impl_->cache_mutex);
1294 auto it = impl_->job_cache.find(std::string(job_id));
1295 if (it != impl_->job_cache.end()) {
1296 it->second.queued_at = std::chrono::system_clock::now();
1297 }
1298 }
1299 impl_->enqueue_job(std::string(job_id), job_opt->priority);
1300
1301 impl_->logger->info_fmt("Started job {}", job_id);
1302 return kcenon::pacs::ok();
1303}
void enqueue_job(const std::string &job_id, job_priority priority)

References kcenon::pacs::client::job_manager::impl::cache_mutex, kcenon::pacs::client::job_manager::impl::enqueue_job(), kcenon::pacs::client::job_manager::impl::get_job_from_cache(), impl_, kcenon::pacs::client::job_manager::impl::job_cache, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::pacs_void_error(), kcenon::pacs::client::queued, kcenon::pacs::client::to_string(), and kcenon::pacs::client::job_manager::impl::update_job_status().

Here is the call graph for this function:

◆ start_workers()

void kcenon::pacs::client::job_manager::start_workers ( )

Start the worker threads.

Starts the configured number of worker threads. Does nothing if already running.

Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1579 of file job_manager.cpp.

1579 {
1580 if (impl_->running.load()) {
1581 return;
1582 }
1583
1584 impl_->running.store(true);
1586
1587 for (size_t i = 0; i < impl_->config.worker_count; ++i) {
1588 impl_->workers.emplace_back([this]() {
1589 impl_->worker_loop();
1590 });
1591 }
1592
1593 impl_->logger->info_fmt("Started {} worker threads", impl_->config.worker_count);
1594}
std::vector< std::thread > workers
size_t worker_count
Number of worker threads.
Definition job_types.h:407

References kcenon::pacs::client::job_manager::impl::config, impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::job_manager::impl::running, kcenon::pacs::client::job_manager_config::worker_count, kcenon::pacs::client::job_manager::impl::worker_loop(), and kcenon::pacs::client::job_manager::impl::workers.

Here is the call graph for this function:

◆ stop_workers()

void kcenon::pacs::client::job_manager::stop_workers ( )

Stop the worker threads.

Gracefully stops all worker threads. Running jobs are allowed to complete or timeout.

Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1596 of file job_manager.cpp.

1596 {
1597 if (!impl_->running.load()) {
1598 return;
1599 }
1600
1601 impl_->running.store(false);
1602 impl_->queue_cv.notify_all();
1603
1604 for (auto& worker : impl_->workers) {
1605 if (worker.joinable()) {
1606 worker.join();
1607 }
1608 }
1609 impl_->workers.clear();
1610
1611 impl_->logger->info("Stopped worker threads");
1612}

References impl_, kcenon::pacs::client::job_manager::impl::logger, kcenon::pacs::client::job_manager::impl::queue_cv, kcenon::pacs::client::job_manager::impl::running, and kcenon::pacs::client::job_manager::impl::workers.

Referenced by ~job_manager().

Here is the caller graph for this function:

◆ wait_for_completion()

std::future< job_record > kcenon::pacs::client::job_manager::wait_for_completion ( std::string_view job_id) -> std::future<job_record>
nodiscard

Wait for a job to complete.

Returns a future that completes when the job finishes.

Parameters
job_id: The job ID to wait for
Returns
Future containing the final job record
Examples
/home/runner/work/pacs_system/pacs_system/include/kcenon/pacs/client/job_manager.h.

Definition at line 1555 of file job_manager.cpp.

1555 {
1556 auto promise = std::make_shared<std::promise<job_record>>();
1557 auto future = promise->get_future();
1558
1559 // Check if already completed
1560 auto job_opt = impl_->get_job_from_cache(job_id);
1561 if (job_opt && job_opt->is_finished()) {
1562 promise->set_value(*job_opt);
1563 return future;
1564 }
1565
1566 // Store promise for later fulfillment
1567 {
1568 std::lock_guard lock(impl_->promises_mutex);
1569 impl_->completion_promises[std::string(job_id)] = promise;
1570 }
1571
1572 return future;
1573}
std::unordered_map< std::string, std::shared_ptr< std::promise< job_record > > > completion_promises

References kcenon::pacs::client::job_manager::impl::completion_promises, kcenon::pacs::client::job_manager::impl::get_job_from_cache(), impl_, and kcenon::pacs::client::job_manager::impl::promises_mutex.

Here is the call graph for this function:

Member Data Documentation

◆ impl_


The documentation for this class was generated from the following files: