PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
executor_adapter.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
15
16#include <kcenon/thread/core/thread_pool.h>
17#include <kcenon/thread/core/thread_worker.h>
18#include <kcenon/thread/core/job_builder.h>
19#include <kcenon/thread/interfaces/thread_context.h>
20
21#include <algorithm>
22#include <stdexcept>
23
25
26// =============================================================================
27// thread_pool_executor_adapter Implementation
28// =============================================================================
29
31 std::shared_ptr<kcenon::thread::thread_pool> pool)
32 : pool_(std::move(pool)) {
33 if (!pool_) {
34 throw std::invalid_argument("Thread pool cannot be null");
35 }
36}
37
39 kcenon::thread::thread_context context;
40 pool_ = std::make_shared<kcenon::thread::thread_pool>("executor_pool", context);
41
42 // Create and enqueue workers
43 std::vector<std::unique_ptr<kcenon::thread::thread_worker>> workers;
44 workers.reserve(worker_count);
45
46 for (std::size_t i = 0; i < worker_count; ++i) {
47 workers.push_back(std::make_unique<kcenon::thread::thread_worker>(false, context));
48 }
49
50 auto enqueue_result = pool_->enqueue_batch(std::move(workers));
51 if (enqueue_result.is_err()) {
52 throw std::runtime_error("Failed to enqueue workers to thread pool");
53 }
54
55 auto start_result = pool_->start();
56 if (start_result.is_err()) {
57 throw std::runtime_error("Failed to start thread pool");
58 }
59}
60
64
66 std::unique_ptr<kcenon::common::interfaces::IJob>&& job) {
67
68 if (!running_.load()) {
70 kcenon::common::error_info{-1, "Executor is not running", "executor"});
71 }
72
73 if (!job) {
75 kcenon::common::error_info{-2, "Job cannot be null", "executor"});
76 }
77
78 auto promise = std::make_shared<std::promise<void>>();
79 auto future = promise->get_future();
80
81 // Capture job by shared_ptr for thread safety
82 auto shared_job = std::shared_ptr<kcenon::common::interfaces::IJob>(std::move(job));
83
85
86 try {
87 auto job_obj = kcenon::thread::job_builder()
88 .name("executor_task")
89 .work([this, shared_job, promise]() -> kcenon::common::VoidResult {
90 try {
91 auto result = shared_job->execute();
92 if (result.is_ok()) {
93 promise->set_value();
94 } else {
95 promise->set_exception(std::make_exception_ptr(
96 std::runtime_error(result.error().message)));
97 }
98 } catch (...) {
99 promise->set_exception(std::current_exception());
100 }
102 return kcenon::common::ok();
103 })
104 .build();
105
106 auto enqueue_result = pool_->enqueue(std::move(job_obj));
107 if (enqueue_result.is_err()) {
110 kcenon::common::error_info{-3, "Failed to enqueue task to thread pool", "executor"});
111 }
112 } catch (const std::exception&) {
115 kcenon::common::error_info{-3, "Failed to enqueue task to thread pool", "executor"});
116 }
117
118 return kcenon::common::Result<std::future<void>>::ok(std::move(future));
119}
120
122 std::unique_ptr<kcenon::common::interfaces::IJob>&& job,
123 std::chrono::milliseconds delay) {
124
125 if (!running_.load()) {
127 kcenon::common::error_info{-1, "Executor is not running", "executor"});
128 }
129
130 if (!job) {
132 kcenon::common::error_info{-2, "Job cannot be null", "executor"});
133 }
134
135 auto promise = std::make_shared<std::promise<void>>();
136 auto future = promise->get_future();
137
138 // Capture job and schedule for later execution
139 auto shared_job = std::shared_ptr<kcenon::common::interfaces::IJob>(std::move(job));
140 auto execute_at = std::chrono::steady_clock::now() + delay;
141
143
144 {
145 std::lock_guard<std::mutex> lock(delay_mutex_);
146 delayed_tasks_.push({execute_at,
147 [this, shared_job, promise]() mutable {
148 try {
149 auto result = shared_job->execute();
150 if (result.is_ok()) {
151 promise->set_value();
152 } else {
153 promise->set_exception(std::make_exception_ptr(
154 std::runtime_error(result.error().message)));
155 }
156 } catch (...) {
157 promise->set_exception(std::current_exception());
158 }
160 }});
161 }
162
163 // Start delay thread if not already running
164 if (!delay_thread_.joinable()) {
165 delay_thread_ = std::thread([this]() {
166 while (!shutdown_requested_.load()) {
167 std::function<void()> task;
168
169 {
170 std::unique_lock<std::mutex> lock(delay_mutex_);
171
172 if (delayed_tasks_.empty()) {
173 delay_cv_.wait_for(lock, std::chrono::milliseconds(100));
174 continue;
175 }
176
177 auto now = std::chrono::steady_clock::now();
178 const auto& top_task = delayed_tasks_.top();
179
180 if (top_task.execute_at > now) {
181 delay_cv_.wait_until(lock, top_task.execute_at);
182 continue;
183 }
184
185 task = top_task.task;
186 delayed_tasks_.pop();
187 }
188
189 if (task && pool_ && pool_->is_running()) {
190 try {
191 auto job_obj = kcenon::thread::job_builder()
192 .name("delayed_task")
193 .work([task = std::move(task)]() -> kcenon::common::VoidResult {
194 task();
195 return kcenon::common::ok();
196 })
197 .build();
198
199 (void)pool_->enqueue(std::move(job_obj));
200 } catch (...) {
201 // Ignore enqueue failures in delayed task scheduler
202 }
203 }
204 }
205 });
206 }
207
208 delay_cv_.notify_one();
209
210 return kcenon::common::Result<std::future<void>>::ok(std::move(future));
211}
212
214 return pool_ ? pool_->get_active_worker_count() : 0;
215}
216
218 return running_.load() && pool_ && pool_->is_running();
219}
220
222 std::size_t pool_pending = pool_ ? pool_->get_pending_task_count() : 0;
223 return pool_pending + pending_count_.load();
224}
225
226void thread_pool_executor_adapter::shutdown(bool wait_for_completion) {
227 running_.store(false);
228 shutdown_requested_.store(true);
229
230 delay_cv_.notify_all();
231
232 if (delay_thread_.joinable()) {
233 delay_thread_.join();
234 }
235
236 if (pool_) {
237 pool_->stop(!wait_for_completion);
238 }
239}
240
241std::shared_ptr<kcenon::thread::thread_pool>
245
246// =============================================================================
247// Factory Function
248// =============================================================================
249
250std::shared_ptr<kcenon::common::interfaces::IExecutor>
251make_executor(std::shared_ptr<thread_pool_interface> pool_interface) {
252 if (!pool_interface) {
253 throw std::invalid_argument("Pool interface cannot be null");
254 }
255
256 // Create an executor that uses the pool interface
257 // This requires creating a custom adapter that works with the interface
258
259 class interface_executor_adapter : public kcenon::common::interfaces::IExecutor {
260 public:
261 explicit interface_executor_adapter(std::shared_ptr<thread_pool_interface> pool)
262 : pool_(std::move(pool)) {}
263
265 std::unique_ptr<kcenon::common::interfaces::IJob>&& job) override {
266
267 if (!pool_ || !pool_->is_running()) {
269 kcenon::common::error_info{-1, "Pool is not running", "executor"});
270 }
271
272 auto shared_job = std::shared_ptr<kcenon::common::interfaces::IJob>(std::move(job));
273
274 auto future = pool_->submit([shared_job]() {
275 auto result = shared_job->execute();
276 if (result.is_err()) {
277 throw std::runtime_error(result.error().message);
278 }
279 });
280
281 return kcenon::common::Result<std::future<void>>::ok(std::move(future));
282 }
283
285 std::unique_ptr<kcenon::common::interfaces::IJob>&& /*job*/,
286 std::chrono::milliseconds /*delay*/) override {
287 // Not supported through interface
289 kcenon::common::error_info{-4, "Delayed execution not supported through pool interface", "executor"});
290 }
291
292 std::size_t worker_count() const override {
293 return pool_ ? pool_->get_thread_count() : 0;
294 }
295
296 bool is_running() const override {
297 return pool_ && pool_->is_running();
298 }
299
300 std::size_t pending_tasks() const override {
301 return pool_ ? pool_->get_pending_task_count() : 0;
302 }
303
304 void shutdown(bool wait_for_completion) override {
305 if (pool_) {
306 pool_->shutdown(wait_for_completion);
307 }
308 }
309
310 private:
311 std::shared_ptr<thread_pool_interface> pool_;
312 };
313
314 return std::make_shared<interface_executor_adapter>(std::move(pool_interface));
315}
316
317} // namespace kcenon::pacs::integration
std::shared_ptr< kcenon::thread::thread_pool > pool_
std::size_t pending_tasks() const override
Get the number of pending tasks.
std::size_t worker_count() const override
Get the number of worker threads.
kcenon::common::Result< std::future< void > > execute(std::unique_ptr< kcenon::common::interfaces::IJob > &&job) override
Execute a job.
auto get_underlying_pool() const -> std::shared_ptr< kcenon::thread::thread_pool >
Get the underlying thread pool.
~thread_pool_executor_adapter() override
Destructor - ensures graceful shutdown.
bool is_running() const override
Check if the executor is running.
thread_pool_executor_adapter(std::shared_ptr< kcenon::thread::thread_pool > pool)
Construct adapter with thread pool.
std::priority_queue< delayed_task, std::vector< delayed_task >, std::greater< delayed_task > > delayed_tasks_
void shutdown(bool wait_for_completion=true) override
Shutdown the executor.
kcenon::common::Result< std::future< void > > execute_delayed(std::unique_ptr< kcenon::common::interfaces::IJob > &&job, std::chrono::milliseconds delay) override
Execute a job with delay.
Adapter for integrating common_system's IExecutor interface.
Adapter for DICOM audit logging using logger_system.
std::shared_ptr< kcenon::common::interfaces::IExecutor > make_executor(std::shared_ptr< thread_pool_interface > pool_interface)
Create an IExecutor from a thread_pool_interface.
Abstract interface for thread pool operations.