PACS System 0.1.0
PACS DICOM system library
kcenon::pacs::integration::thread_pool_executor_adapter Class Reference

IExecutor implementation using kcenon::thread::thread_pool.

#include <executor_adapter.h>


Classes

struct  delayed_task
 

Public Member Functions

 thread_pool_executor_adapter (std::shared_ptr< kcenon::thread::thread_pool > pool)
 Construct adapter with thread pool.
 
 thread_pool_executor_adapter (std::size_t worker_count)
 Construct adapter with worker count.
 
 ~thread_pool_executor_adapter () override
 Destructor - ensures graceful shutdown.
 
 thread_pool_executor_adapter (const thread_pool_executor_adapter &)=delete
 
thread_pool_executor_adapter & operator= (const thread_pool_executor_adapter &)=delete
 
 thread_pool_executor_adapter (thread_pool_executor_adapter &&)=delete
 
thread_pool_executor_adapter & operator= (thread_pool_executor_adapter &&)=delete
 
kcenon::common::Result< std::future< void > > execute (std::unique_ptr< kcenon::common::interfaces::IJob > &&job) override
 Execute a job.
 
kcenon::common::Result< std::future< void > > execute_delayed (std::unique_ptr< kcenon::common::interfaces::IJob > &&job, std::chrono::milliseconds delay) override
 Execute a job with delay.
 
std::size_t worker_count () const override
 Get the number of worker threads.
 
bool is_running () const override
 Check if the executor is running.
 
std::size_t pending_tasks () const override
 Get the number of pending tasks.
 
void shutdown (bool wait_for_completion=true) override
 Shutdown the executor.
 
template<typename F >
requires std::invocable<F>
auto submit (F &&func, std::string name="submitted_job") -> kcenon::common::Result< std::future< void > >
 Submit a void-returning callable directly.
 
auto get_underlying_pool () const -> std::shared_ptr< kcenon::thread::thread_pool >
 Get the underlying thread pool.
 

Private Attributes

std::shared_ptr< kcenon::thread::thread_pool > pool_
 
std::atomic< bool > running_ {true}
 
std::atomic< std::size_t > pending_count_ {0}
 
std::thread delay_thread_
 
std::mutex delay_mutex_
 
std::condition_variable delay_cv_
 
std::atomic< bool > shutdown_requested_ {false}
 
std::priority_queue< delayed_task, std::vector< delayed_task >, std::greater< delayed_task > > delayed_tasks_
 

Detailed Description

IExecutor implementation using kcenon::thread::thread_pool.

This class adapts kcenon::thread::thread_pool to the IExecutor interface, enabling standardized task execution across the pacs_system workflow modules.

Thread Safety: All public methods are thread-safe.

Definition at line 144 of file executor_adapter.h.

Constructor & Destructor Documentation

◆ thread_pool_executor_adapter() [1/4]

kcenon::pacs::integration::thread_pool_executor_adapter::thread_pool_executor_adapter ( std::shared_ptr< kcenon::thread::thread_pool > pool)
explicit

Construct adapter with thread pool.

Parameters
pool  Existing thread pool to use

Definition at line 30 of file executor_adapter.cpp.

thread_pool_executor_adapter::thread_pool_executor_adapter(
    std::shared_ptr<kcenon::thread::thread_pool> pool)
    : pool_(std::move(pool)) {
    if (!pool_) {
        throw std::invalid_argument("Thread pool cannot be null");
    }
}

References pool_.

◆ thread_pool_executor_adapter() [2/4]

kcenon::pacs::integration::thread_pool_executor_adapter::thread_pool_executor_adapter ( std::size_t worker_count)
explicit

Construct adapter with worker count.

Creates a new thread pool with the specified number of workers.

Parameters
worker_count  Number of worker threads

Definition at line 38 of file executor_adapter.cpp.

thread_pool_executor_adapter::thread_pool_executor_adapter(std::size_t worker_count) {
    kcenon::thread::thread_context context;
    pool_ = std::make_shared<kcenon::thread::thread_pool>("executor_pool", context);

    // Create and enqueue workers
    std::vector<std::unique_ptr<kcenon::thread::thread_worker>> workers;
    workers.reserve(worker_count);

    for (std::size_t i = 0; i < worker_count; ++i) {
        workers.push_back(std::make_unique<kcenon::thread::thread_worker>(false, context));
    }

    auto enqueue_result = pool_->enqueue_batch(std::move(workers));
    if (enqueue_result.is_err()) {
        throw std::runtime_error("Failed to enqueue workers to thread pool");
    }

    auto start_result = pool_->start();
    if (start_result.is_err()) {
        throw std::runtime_error("Failed to start thread pool");
    }
}

References pool_ and worker_count().


◆ ~thread_pool_executor_adapter()

kcenon::pacs::integration::thread_pool_executor_adapter::~thread_pool_executor_adapter ( )
override

Destructor - ensures graceful shutdown.

Definition at line 61 of file executor_adapter.cpp.

thread_pool_executor_adapter::~thread_pool_executor_adapter() {
    shutdown(true);
}

References shutdown().


◆ thread_pool_executor_adapter() [3/4]

kcenon::pacs::integration::thread_pool_executor_adapter::thread_pool_executor_adapter ( const thread_pool_executor_adapter & )
delete

◆ thread_pool_executor_adapter() [4/4]

kcenon::pacs::integration::thread_pool_executor_adapter::thread_pool_executor_adapter ( thread_pool_executor_adapter && )
delete

Member Function Documentation

◆ execute()

kcenon::common::Result< std::future< void > > kcenon::pacs::integration::thread_pool_executor_adapter::execute ( std::unique_ptr< kcenon::common::interfaces::IJob > && job)
nodiscard override

Execute a job.

Submits the job to the thread pool for asynchronous execution.

Parameters
job  The job to execute
Returns
Result containing future or error

Definition at line 65 of file executor_adapter.cpp.

kcenon::common::Result<std::future<void>> thread_pool_executor_adapter::execute(
    std::unique_ptr<kcenon::common::interfaces::IJob>&& job) {

    if (!running_.load()) {
        // [line omitted in listing]
            kcenon::common::error_info{-1, "Executor is not running", "executor"});
    }

    if (!job) {
        // [line omitted in listing]
            kcenon::common::error_info{-2, "Job cannot be null", "executor"});
    }

    auto promise = std::make_shared<std::promise<void>>();
    auto future = promise->get_future();

    // Capture job by shared_ptr for thread safety
    auto shared_job = std::shared_ptr<kcenon::common::interfaces::IJob>(std::move(job));

    // [line omitted in listing]

    try {
        auto job_obj = kcenon::thread::job_builder()
            .name("executor_task")
            .work([this, shared_job, promise]() -> kcenon::common::VoidResult {
                try {
                    auto result = shared_job->execute();
                    if (result.is_ok()) {
                        promise->set_value();
                    } else {
                        promise->set_exception(std::make_exception_ptr(
                            std::runtime_error(result.error().message)));
                    }
                } catch (...) {
                    promise->set_exception(std::current_exception());
                }
                // [line omitted in listing]
                return kcenon::common::ok();
            })
            .build();

        auto enqueue_result = pool_->enqueue(std::move(job_obj));
        if (enqueue_result.is_err()) {
            // [two lines omitted in listing]
                kcenon::common::error_info{-3, "Failed to enqueue task to thread pool", "executor"});
        }
    } catch (const std::exception&) {
        // [two lines omitted in listing]
            kcenon::common::error_info{-3, "Failed to enqueue task to thread pool", "executor"});
    }

    return kcenon::common::Result<std::future<void>>::ok(std::move(future));
}

References pending_count_, pool_, and running_.
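
The core of execute() is a bridge from a job's result to a std::future<void>: success fulfils the promise, while a failed result or a thrown exception is stored in it. The following is a minimal, self-contained sketch of that pattern only. It assumes a hypothetical job_result type in place of the library's result type and a plain std::thread in place of the thread pool; run_async and running_job are illustrative names, not part of the library.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a job result: an empty error string means success.
struct job_result {
    std::string error;
    bool is_ok() const { return error.empty(); }
};

// Pairs the future with the thread that fulfils it so the caller can join.
struct running_job {
    std::future<void> done;
    std::thread worker;
};

// Runs `work` on its own thread and reports the outcome through a future.
running_job run_async(std::function<job_result()> work) {
    // The promise is shared so it outlives this function and lives as long
    // as the worker lambda that holds a copy.
    auto promise = std::make_shared<std::promise<void>>();
    auto future = promise->get_future();

    std::thread worker([promise, work = std::move(work)]() {
        try {
            job_result r = work();
            if (r.is_ok()) {
                promise->set_value();
            } else {
                // A failed result becomes an exception stored in the future.
                promise->set_exception(
                    std::make_exception_ptr(std::runtime_error(r.error)));
            }
        } catch (...) {
            // An exception thrown by the job is forwarded unchanged.
            promise->set_exception(std::current_exception());
        }
    });

    return running_job{std::move(future), std::move(worker)};
}
```

A caller would invoke done.get(), which either returns normally or rethrows the stored exception, and then join worker. Holding the promise in a std::shared_ptr mirrors the listing above, where the lambda may run after the submitting function has returned.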

◆ execute_delayed()

kcenon::common::Result< std::future< void > > kcenon::pacs::integration::thread_pool_executor_adapter::execute_delayed ( std::unique_ptr< kcenon::common::interfaces::IJob > && job,
std::chrono::milliseconds delay )
nodiscard override

Execute a job with delay.

Parameters
job  The job to execute
delay  The delay before execution
Returns
Result containing future or error

Definition at line 121 of file executor_adapter.cpp.

kcenon::common::Result<std::future<void>> thread_pool_executor_adapter::execute_delayed(
    std::unique_ptr<kcenon::common::interfaces::IJob>&& job,
    std::chrono::milliseconds delay) {

    if (!running_.load()) {
        // [line omitted in listing]
            kcenon::common::error_info{-1, "Executor is not running", "executor"});
    }

    if (!job) {
        // [line omitted in listing]
            kcenon::common::error_info{-2, "Job cannot be null", "executor"});
    }

    auto promise = std::make_shared<std::promise<void>>();
    auto future = promise->get_future();

    // Capture job and schedule for later execution
    auto shared_job = std::shared_ptr<kcenon::common::interfaces::IJob>(std::move(job));
    auto execute_at = std::chrono::steady_clock::now() + delay;

    // [line omitted in listing]

    {
        std::lock_guard<std::mutex> lock(delay_mutex_);
        delayed_tasks_.push({execute_at,
            [this, shared_job, promise]() mutable {
                try {
                    auto result = shared_job->execute();
                    if (result.is_ok()) {
                        promise->set_value();
                    } else {
                        promise->set_exception(std::make_exception_ptr(
                            std::runtime_error(result.error().message)));
                    }
                } catch (...) {
                    promise->set_exception(std::current_exception());
                }
                // [line omitted in listing]
            }});
    }

    // Start delay thread if not already running
    if (!delay_thread_.joinable()) {
        delay_thread_ = std::thread([this]() {
            while (!shutdown_requested_.load()) {
                std::function<void()> task;

                {
                    std::unique_lock<std::mutex> lock(delay_mutex_);

                    if (delayed_tasks_.empty()) {
                        delay_cv_.wait_for(lock, std::chrono::milliseconds(100));
                        continue;
                    }

                    auto now = std::chrono::steady_clock::now();
                    const auto& top_task = delayed_tasks_.top();

                    if (top_task.execute_at > now) {
                        delay_cv_.wait_until(lock, top_task.execute_at);
                        continue;
                    }

                    task = top_task.task;
                    delayed_tasks_.pop();
                }

                if (task && pool_ && pool_->is_running()) {
                    try {
                        auto job_obj = kcenon::thread::job_builder()
                            .name("delayed_task")
                            .work([task = std::move(task)]() -> kcenon::common::VoidResult {
                                task();
                                return kcenon::common::ok();
                            })
                            .build();

                        (void)pool_->enqueue(std::move(job_obj));
                    } catch (...) {
                        // Ignore enqueue failures in delayed task scheduler
                    }
                }
            }
        });
    }

    delay_cv_.notify_one();

    return kcenon::common::Result<std::future<void>>::ok(std::move(future));
}

References delay_cv_, delay_mutex_, delay_thread_, delayed_tasks_, pending_count_, pool_, running_, and shutdown_requested_.
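
The scheduling idea in execute_delayed() is a min-heap keyed on the deadline plus a single thread that sleeps until the earliest deadline. The sketch below illustrates that idea in isolation, under stated assumptions: delayed_runner is a hypothetical class, tasks run directly on the scheduler thread instead of being handed to a thread pool, and the delayed_task shown here is a guess at a minimal shape (a deadline, a callable, and an operator> for std::greater), not the library's definition.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

using steady = std::chrono::steady_clock;

// Assumed minimal task record: a deadline plus the work to run.
struct delayed_task {
    steady::time_point execute_at;
    std::function<void()> task;
    // std::greater compares with operator>, which puts the earliest
    // deadline at the top of the priority queue.
    bool operator>(const delayed_task& other) const {
        return execute_at > other.execute_at;
    }
};

class delayed_runner {
public:
    delayed_runner() : worker_([this] { loop(); }) {}

    ~delayed_runner() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        worker_.join();
    }

    void schedule(std::function<void()> task, std::chrono::milliseconds delay) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push({steady::now() + delay, std::move(task)});
        }
        // Wake the scheduler: the new task may be due before the old top.
        cv_.notify_one();
    }

private:
    void loop() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (!stop_) {
            if (tasks_.empty()) {
                cv_.wait(lock);  // woken by schedule() or the destructor
                continue;
            }
            const auto due = tasks_.top().execute_at;
            if (due > steady::now()) {
                cv_.wait_until(lock, due);  // re-check everything after waking
                continue;
            }
            auto task = tasks_.top().task;
            tasks_.pop();
            lock.unlock();  // never run user code while holding the lock
            task();
            lock.lock();
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
    std::priority_queue<delayed_task, std::vector<delayed_task>,
                        std::greater<delayed_task>> tasks_;
    std::thread worker_;  // declared last so the other members exist first
};
```

Scheduling a 60 ms task followed by a 10 ms task runs the 10 ms task first, because the heap orders by deadline and not by insertion. Tasks still queued when the runner is destroyed are dropped without running in this sketch.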

◆ get_underlying_pool()

auto kcenon::pacs::integration::thread_pool_executor_adapter::get_underlying_pool ( ) const -> std::shared_ptr<kcenon::thread::thread_pool>
nodiscard

Get the underlying thread pool.

Definition at line 242 of file executor_adapter.cpp.

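auto thread_pool_executor_adapter::get_underlying_pool() const
    -> std::shared_ptr<kcenon::thread::thread_pool> {
    return pool_;
}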

References pool_.

◆ is_running()

bool kcenon::pacs::integration::thread_pool_executor_adapter::is_running ( ) const
nodiscard override

Check if the executor is running.

Definition at line 217 of file executor_adapter.cpp.

bool thread_pool_executor_adapter::is_running() const {
    return running_.load() && pool_ && pool_->is_running();
}

References pool_ and running_.

◆ operator=() [1/2]

thread_pool_executor_adapter & kcenon::pacs::integration::thread_pool_executor_adapter::operator= ( const thread_pool_executor_adapter & )
delete

◆ operator=() [2/2]

thread_pool_executor_adapter & kcenon::pacs::integration::thread_pool_executor_adapter::operator= ( thread_pool_executor_adapter && )
delete

◆ pending_tasks()

std::size_t kcenon::pacs::integration::thread_pool_executor_adapter::pending_tasks ( ) const
nodiscard override

Get the number of pending tasks.

Definition at line 221 of file executor_adapter.cpp.

std::size_t thread_pool_executor_adapter::pending_tasks() const {
    std::size_t pool_pending = pool_ ? pool_->get_pending_task_count() : 0;
    return pool_pending + pending_count_.load();
}

References pending_count_ and pool_.

◆ shutdown()

void kcenon::pacs::integration::thread_pool_executor_adapter::shutdown ( bool wait_for_completion = true)
override

Shutdown the executor.

Parameters
wait_for_completion  Wait for all pending tasks to complete

Definition at line 226 of file executor_adapter.cpp.

void thread_pool_executor_adapter::shutdown(bool wait_for_completion) {
    running_.store(false);
    shutdown_requested_.store(true);

    delay_cv_.notify_all();

    if (delay_thread_.joinable()) {
        delay_thread_.join();
    }

    if (pool_) {
        pool_->stop(!wait_for_completion);
    }
}

References delay_cv_, delay_thread_, pool_, running_, and shutdown_requested_.

Referenced by ~thread_pool_executor_adapter().
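
Because the destructor calls shutdown(), an explicit shutdown() followed by destruction runs the sequence twice, so the joinable() guard matters. The sketch below shows one common way to write such a stop-notify-join sequence so that repeated sequential calls are harmless. It is an illustration, not the library's code: background_worker is a hypothetical class, and it sets the stop flag while holding the mutex, which differs from the atomics used in the listing and is a general technique for ensuring a waiting thread cannot miss the notification.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker that sleeps until it is told to stop.
class background_worker {
public:
    background_worker() : thread_([this] { run(); }) {}
    ~background_worker() { shutdown(); }

    // Safe to call repeatedly from one thread; later calls find nothing
    // left to join. Concurrent calls from several threads are not covered.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;  // written under the mutex so the waiter sees it
        }
        cv_.notify_all();
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    bool stopped() const { return !thread_.joinable(); }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form re-checks stop_ after every wake-up.
        cv_.wait(lock, [this] { return stop_; });
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
    std::thread thread_;  // declared last so the other members exist first
};
```

Calling shutdown() twice on the same object leaves it stopped both times, and the destructor's own call then returns immediately.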


◆ submit()

template<typename F >
requires std::invocable<F>
auto kcenon::pacs::integration::thread_pool_executor_adapter::submit ( F && func,
std::string name = "submitted_job" ) -> kcenon::common::Result<std::future<void>>
inline nodiscard

Submit a void-returning callable directly.

This is a convenience method that wraps the callable in a lambda_job.

Template Parameters
F  Callable type
Parameters
func  The function to execute
name  Job name for logging
Returns
Result containing future or error

Definition at line 238 of file executor_adapter.h.

template <typename F>
    requires std::invocable<F>
[[nodiscard]] auto submit(F&& func, std::string name = "submitted_job")
    -> kcenon::common::Result<std::future<void>> {
    auto job = std::make_unique<lambda_job>(std::forward<F>(func), std::move(name));
    return execute(std::move(job));
}

References name.

◆ worker_count()

std::size_t kcenon::pacs::integration::thread_pool_executor_adapter::worker_count ( ) const
nodiscard override

Get the number of worker threads.

Definition at line 213 of file executor_adapter.cpp.

std::size_t thread_pool_executor_adapter::worker_count() const {
    return pool_ ? pool_->get_active_worker_count() : 0;
}

References pool_.

Referenced by thread_pool_executor_adapter().


Member Data Documentation

◆ delay_cv_

std::condition_variable kcenon::pacs::integration::thread_pool_executor_adapter::delay_cv_
private

Definition at line 258 of file executor_adapter.h.

Referenced by execute_delayed() and shutdown().

◆ delay_mutex_

std::mutex kcenon::pacs::integration::thread_pool_executor_adapter::delay_mutex_
private

Definition at line 257 of file executor_adapter.h.

Referenced by execute_delayed().

◆ delay_thread_

std::thread kcenon::pacs::integration::thread_pool_executor_adapter::delay_thread_
private

Definition at line 256 of file executor_adapter.h.

Referenced by execute_delayed() and shutdown().

◆ delayed_tasks_

std::priority_queue<delayed_task, std::vector<delayed_task>, std::greater<delayed_task> > kcenon::pacs::integration::thread_pool_executor_adapter::delayed_tasks_
private

Definition at line 270 of file executor_adapter.h.

Referenced by execute_delayed().

◆ pending_count_

std::atomic<std::size_t> kcenon::pacs::integration::thread_pool_executor_adapter::pending_count_ {0}
private

Definition at line 253 of file executor_adapter.h.


Referenced by execute(), execute_delayed(), and pending_tasks().

◆ pool_

std::shared_ptr<kcenon::thread::thread_pool> kcenon::pacs::integration::thread_pool_executor_adapter::pool_
private

◆ running_

std::atomic<bool> kcenon::pacs::integration::thread_pool_executor_adapter::running_ {true}
private

Definition at line 252 of file executor_adapter.h.


Referenced by execute(), execute_delayed(), is_running(), and shutdown().

◆ shutdown_requested_

std::atomic<bool> kcenon::pacs::integration::thread_pool_executor_adapter::shutdown_requested_ {false}
private

Definition at line 259 of file executor_adapter.h.


Referenced by execute_delayed() and shutdown().


The documentation for this class was generated from the following files: