16#include <kcenon/thread/core/thread_pool.h>
17#include <kcenon/thread/core/thread_worker.h>
18#include <kcenon/thread/core/job_builder.h>
19#include <kcenon/thread/interfaces/thread_context.h>
// Constructor fragment: adopts an existing thread pool supplied by the caller.
// NOTE(review): the leading numbers (31, 32, 34) look like listing line numbers
// fused into the text by extraction; the lines between them are not visible.
31 std::shared_ptr<kcenon::thread::thread_pool> pool)
// The incoming handle is moved into the pool_ member.
32 : pool_(std::move(pool)) {
// A null pool is rejected with std::invalid_argument. The guarding condition
// (listing line 33) is not visible here - presumably `!pool_`; confirm.
34 throw std::invalid_argument(
"Thread pool cannot be null");
// Constructor-body fragment: builds a pool owned by this adapter, fills it with
// workers and starts it. The function header and the loop around the
// push_back call are not visible in this listing.
// A default-constructed context is shared by the pool and every worker.
39 kcenon::thread::thread_context context;
40 pool_ = std::make_shared<kcenon::thread::thread_pool>(
"executor_pool", context);
// Workers are collected first and handed to the pool in a single batch.
43 std::vector<std::unique_ptr<kcenon::thread::thread_worker>> workers;
// NOTE(review): the meaning of the `false` constructor argument is defined by
// thread_worker and is not visible here - confirm.
47 workers.push_back(std::make_unique<kcenon::thread::thread_worker>(
false, context));
// Ownership of all workers moves into the pool.
50 auto enqueue_result =
pool_->enqueue_batch(std::move(workers));
// A failed batch enqueue aborts construction with std::runtime_error.
51 if (enqueue_result.is_err()) {
52 throw std::runtime_error(
"Failed to enqueue workers to thread pool");
// Start the pool; a failed start also aborts construction.
55 auto start_result =
pool_->start();
56 if (start_result.is_err()) {
57 throw std::runtime_error(
"Failed to start thread pool");
// execute() fragment: wraps an IJob in a pool job and ties its outcome to a
// std::promise<void>/std::future<void> pair.
// NOTE(review): leading numbers are listing line numbers fused into the text;
// several lines between them (guards, branch conditions) are not visible.
66 std::unique_ptr<kcenon::common::interfaces::IJob>&& job) {
// Error -1: executor not running (guarding condition not visible).
70 kcenon::common::error_info{-1,
"Executor is not running",
"executor"});
// Error -2: null job (guarding condition not visible).
75 kcenon::common::error_info{-2,
"Job cannot be null",
"executor"});
// The promise is held by shared_ptr so the work lambda can capture it by copy.
78 auto promise = std::make_shared<std::promise<void>>();
79 auto future = promise->get_future();
// Ownership moves from the unique_ptr into a shared_ptr, again so the lambda
// can capture the job by copy.
82 auto shared_job = std::shared_ptr<kcenon::common::interfaces::IJob>(std::move(job));
87 auto job_obj = kcenon::thread::job_builder()
88 .name(
"executor_task")
// NOTE(review): `this` is captured, but its use inside the lambda is not
// visible here; if it is dereferenced, the adapter must outlive the queued
// job - confirm.
89 .work([
this, shared_job, promise]() -> kcenon::common::VoidResult {
// Run the wrapped job.
91 auto result = shared_job->execute();
// The job's error message is forwarded to the future as std::runtime_error
// (branch condition not visible - presumably the error case of `result`).
95 promise->set_exception(std::make_exception_ptr(
96 std::runtime_error(result.error().message)));
// The currently handled exception is forwarded to the future unchanged
// (the enclosing handler is not visible here).
99 promise->set_exception(std::current_exception());
// The work lambda returns ok() here; the job's own outcome is delivered
// through the promise.
102 return kcenon::common::ok();
// Hand the built job to the pool.
106 auto enqueue_result =
pool_->enqueue(std::move(job_obj));
// Error -3: the pool rejected the job.
107 if (enqueue_result.is_err()) {
110 kcenon::common::error_info{-3,
"Failed to enqueue task to thread pool",
"executor"});
112 }
// A std::exception caught here is reported with the same -3 error; the
// exception's own message is not used in the visible lines.
catch (
const std::exception&) {
115 kcenon::common::error_info{-3,
"Failed to enqueue task to thread pool",
"executor"});
// execute_delayed() fragment: prepares a job to run no earlier than `delay`
// from now and ties its outcome to a promise/future pair.
// NOTE(review): leading numbers are listing line numbers fused into the text;
// the lines that store the task for later dispatch are not visible here.
122 std::unique_ptr<kcenon::common::interfaces::IJob>&& job,
123 std::chrono::milliseconds delay) {
// Error -1: executor not running (guarding condition not visible).
127 kcenon::common::error_info{-1,
"Executor is not running",
"executor"});
// Error -2: null job (guarding condition not visible).
132 kcenon::common::error_info{-2,
"Job cannot be null",
"executor"});
// The promise is held by shared_ptr so the task lambda can capture it by copy.
135 auto promise = std::make_shared<std::promise<void>>();
136 auto future = promise->get_future();
// Ownership moves into a shared_ptr so the lambda can capture the job by copy.
139 auto shared_job = std::shared_ptr<kcenon::common::interfaces::IJob>(std::move(job));
// Earliest execution time, measured on the monotonic steady clock.
140 auto execute_at = std::chrono::steady_clock::now() + delay;
// Task body to run once the delay has elapsed.
// NOTE(review): `this` is captured, but its use inside the lambda is not
// visible here - confirm the adapter outlives pending delayed tasks.
147 [
this, shared_job, promise]()
mutable {
149 auto result = shared_job->execute();
// Success fulfils the future.
150 if (result.is_ok()) {
151 promise->set_value();
// Otherwise the job's error message is forwarded as std::runtime_error.
153 promise->set_exception(std::make_exception_ptr(
154 std::runtime_error(result.error().message)));
// The currently handled exception is forwarded to the future unchanged
// (the enclosing handler is not visible here).
157 promise->set_exception(std::current_exception());
// Delayed-task dispatch fragment: waits until the earliest delayed task is due
// and then submits it to the pool. The enclosing function header, the loop and
// the lock acquisition are not visible in this listing.
// Holds the callable picked for dispatch.
167 std::function<void()> task;
// Bounded wait of 100 ms on delay_cv_ (the condition leading here is not
// visible - presumably the queue is empty; confirm).
173 delay_cv_.wait_for(lock, std::chrono::milliseconds(100));
177 auto now = std::chrono::steady_clock::now();
// If the front task is not due yet, wait until its deadline; delay_cv_ can
// be notified earlier than that.
180 if (top_task.execute_at > now) {
181 delay_cv_.wait_until(lock, top_task.execute_at);
// The callable is copied out of the queue entry.
185 task = top_task.task;
// Wrap the callable in a pool job; the callable is moved into the job lambda.
191 auto job_obj = kcenon::thread::job_builder()
192 .name(
"delayed_task")
193 .work([task = std::move(task)]() -> kcenon::common::VoidResult {
195 return kcenon::common::ok();
// NOTE(review): the enqueue result is explicitly discarded, so a rejected
// delayed task is not reported from this point.
199 (void)
pool_->enqueue(std::move(job_obj));
// Returns the pool's active worker count, or 0 when pool_ is null.
214 return pool_ ?
pool_->get_active_worker_count() : 0;
// Pending-task count reported by the pool, or 0 when pool_ is null. How this
// value is combined with other counters is not visible in this listing.
222 std::size_t pool_pending =
pool_ ?
pool_->get_pending_task_count() : 0;
// Stops the pool, passing the negation of wait_for_completion.
// NOTE(review): presumably stop()'s argument means "stop immediately" - confirm
// against thread_pool::stop.
237 pool_->stop(!wait_for_completion);
241std::shared_ptr<kcenon::thread::thread_pool>
// make_executor() fragment: wraps a thread_pool_interface in an IExecutor by
// means of a function-local adapter class.
// NOTE(review): leading numbers are listing line numbers fused into the text;
// the function header, several braces and some guards are not visible.
250std::shared_ptr<kcenon::common::interfaces::IExecutor>
// A null pool interface is rejected with std::invalid_argument.
252 if (!pool_interface) {
253 throw std::invalid_argument(
"Pool interface cannot be null");
// Local adapter that forwards IExecutor calls to the wrapped pool interface.
259 class interface_executor_adapter :
public kcenon::common::interfaces::IExecutor {
261 explicit interface_executor_adapter(std::shared_ptr<thread_pool_interface> pool)
262 : pool_(std::move(pool)) {}
// execute(): submits the job to the pool.
265 std::unique_ptr<kcenon::common::interfaces::IJob>&& job)
override {
// Error -1: the pool is missing or not running.
267 if (!pool_ || !pool_->is_running()) {
269 kcenon::common::error_info{-1,
"Pool is not running",
"executor"});
// NOTE(review): no null check on `job` is visible before it is dereferenced in
// the lambda below, although a "Job cannot be null" error (-2) exists
// elsewhere in this file - confirm.
272 auto shared_job = std::shared_ptr<kcenon::common::interfaces::IJob>(std::move(job));
// The submitted callable turns a job error into a thrown std::runtime_error;
// how submit() surfaces that exception is defined by thread_pool_interface.
274 auto future = pool_->submit([shared_job]() {
275 auto result = shared_job->execute();
276 if (result.is_err()) {
277 throw std::runtime_error(result.error().message);
// execute_delayed(): not supported here; both parameters are unnamed and the
// call reports error -4.
285 std::unique_ptr<kcenon::common::interfaces::IJob>&& ,
286 std::chrono::milliseconds )
override {
289 kcenon::common::error_info{-4,
"Delayed execution not supported through pool interface",
"executor"});
// Thread count of the wrapped pool, or 0 when pool_ is null.
292 std::size_t worker_count()
const override {
293 return pool_ ? pool_->get_thread_count() : 0;
// True only when a pool is present and reports itself as running.
296 bool is_running()
const override {
297 return pool_ && pool_->is_running();
// Pending-task count of the wrapped pool, or 0 when pool_ is null.
300 std::size_t pending_tasks()
const override {
301 return pool_ ? pool_->get_pending_task_count() : 0;
// Forwards the shutdown request to the pool (a null guard, if any, is on a
// line not visible here).
304 void shutdown(
bool wait_for_completion)
override {
306 pool_->shutdown(wait_for_completion);
// Wrapped pool interface, shared with the caller of make_executor().
311 std::shared_ptr<thread_pool_interface> pool_;
// The adapter takes over the caller's handle and is returned as an IExecutor.
314 return std::make_shared<interface_executor_adapter>(std::move(pool_interface));
std::shared_ptr< kcenon::thread::thread_pool > pool_
std::thread delay_thread_
std::size_t pending_tasks() const override
Get the number of pending tasks.
std::size_t worker_count() const override
Get the number of worker threads.
kcenon::common::Result< std::future< void > > execute(std::unique_ptr< kcenon::common::interfaces::IJob > &&job) override
Execute a job.
auto get_underlying_pool() const -> std::shared_ptr< kcenon::thread::thread_pool >
Get the underlying thread pool.
~thread_pool_executor_adapter() override
Destructor - ensures graceful shutdown.
std::atomic< std::size_t > pending_count_
std::condition_variable delay_cv_
bool is_running() const override
Check if the executor is running.
std::atomic< bool > shutdown_requested_
thread_pool_executor_adapter(std::shared_ptr< kcenon::thread::thread_pool > pool)
Construct adapter with thread pool.
std::atomic< bool > running_
std::priority_queue< delayed_task, std::vector< delayed_task >, std::greater< delayed_task > > delayed_tasks_
void shutdown(bool wait_for_completion=true) override
Shutdown the executor.
kcenon::common::Result< std::future< void > > execute_delayed(std::unique_ptr< kcenon::common::interfaces::IJob > &&job, std::chrono::milliseconds delay) override
Execute a job with delay.
Adapter for integrating common_system's IExecutor interface.
Adapter for DICOM audit logging using logger_system.
std::shared_ptr< kcenon::common::interfaces::IExecutor > make_executor(std::shared_ptr< thread_pool_interface > pool_interface)
Create an IExecutor from a thread_pool_interface.
Abstract interface for thread pool operations.