PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
thread_pool_adapter.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13
14#include <kcenon/thread/core/thread_pool.h>
15#include <kcenon/thread/core/thread_worker.h>
16#include <kcenon/thread/core/job_builder.h>
17#include <kcenon/thread/interfaces/thread_context.h>
18
19#include <stdexcept>
20#include <vector>
21
23
24// =============================================================================
25// Constructors & Destructor
26// =============================================================================
27
29 : config_(config) {
30 // Validate configuration
31 if (config_.min_threads == 0) {
32 config_.min_threads = 1;
33 }
35 config_.max_threads = config_.min_threads;
36 }
37
38 // Create the pool
39 kcenon::thread::thread_context context;
40 pool_ = std::make_shared<kcenon::thread::thread_pool>(config_.pool_name, context);
41}
42
44 std::shared_ptr<kcenon::thread::thread_pool> pool)
45 : pool_(std::move(pool)), initialized_(true) {
46 if (!pool_) {
47 throw std::invalid_argument("Thread pool cannot be null");
48 }
49}
50
54
55// =============================================================================
56// Lifecycle Management
57// =============================================================================
58
60 std::lock_guard<std::mutex> lock(mutex_);
61
62 if (initialized_ && pool_ && pool_->is_running()) {
63 return true; // Already running
64 }
65
66 // Create pool if not exists
67 if (!pool_) {
68 kcenon::thread::thread_context context;
69 pool_ = std::make_shared<kcenon::thread::thread_pool>(config_.pool_name, context);
70 }
71
72 // Create worker threads
73 std::vector<std::unique_ptr<kcenon::thread::thread_worker>> workers;
74 workers.reserve(config_.min_threads);
75
76 kcenon::thread::thread_context context;
77 for (std::size_t i = 0; i < config_.min_threads; ++i) {
78 workers.push_back(std::make_unique<kcenon::thread::thread_worker>(false, context));
79 }
80
81 // Enqueue workers
82 auto enqueue_result = pool_->enqueue_batch(std::move(workers));
83 if (enqueue_result.is_err()) {
84 return false;
85 }
86
87 // Start the pool
88 auto start_result = pool_->start();
89 if (start_result.is_err()) {
90 return false;
91 }
92
93 initialized_ = true;
94 return true;
95}
96
97auto thread_pool_adapter::is_running() const noexcept -> bool {
98 std::lock_guard<std::mutex> lock(mutex_);
99 return pool_ && pool_->is_running();
100}
101
102void thread_pool_adapter::shutdown(bool wait_for_completion) {
103 std::lock_guard<std::mutex> lock(mutex_);
104
105 if (pool_) {
106 pool_->stop(!wait_for_completion);
107 }
108
109 initialized_ = false;
110}
111
112// =============================================================================
113// Task Submission
114// =============================================================================
115
116auto thread_pool_adapter::submit(std::function<void()> task)
117 -> std::future<void> {
118 auto promise = std::make_shared<std::promise<void>>();
119 auto future = promise->get_future();
120
121 submit_internal(
122 [task = std::move(task), promise]() mutable {
123 try {
124 task();
125 promise->set_value();
126 } catch (...) {
127 promise->set_exception(std::current_exception());
128 }
129 },
131
132 return future;
133}
134
136 job_priority priority,
137 std::function<void()> task) -> std::future<void> {
138 auto promise = std::make_shared<std::promise<void>>();
139 auto future = promise->get_future();
140
141 submit_internal(
142 [task = std::move(task), promise]() mutable {
143 try {
144 task();
145 promise->set_value();
146 } catch (...) {
147 promise->set_exception(std::current_exception());
148 }
149 },
150 priority);
151
152 return future;
153}
154
155void thread_pool_adapter::submit_fire_and_forget(std::function<void()> task) {
156 submit_internal(std::move(task), job_priority::low);
157}
158
160 std::function<void()> task,
161 job_priority /*priority*/) {
162 // Ensure pool is started
163 if (!is_running()) {
164 if (!start()) {
165 throw std::runtime_error("Failed to start thread pool");
166 }
167 }
168
169 std::lock_guard<std::mutex> lock(mutex_);
170 if (pool_) {
171 // Use job_builder for modern API (replaces deprecated future_job)
172 // Note: Priority is not currently used as the base thread_pool
173 // doesn't support it. For future implementation, consider using
174 // typed_thread_pool or a priority queue wrapper.
175 try {
176 auto job = kcenon::thread::job_builder()
177 .name("pacs_pool_task")
178 .work([task = std::move(task)]() -> kcenon::common::VoidResult {
179 task();
180 return kcenon::common::ok();
181 })
182 .build();
183
184 auto result = pool_->enqueue(std::move(job));
185 if (result.is_err()) {
186 throw std::runtime_error("Failed to enqueue task to thread pool");
187 }
188 } catch (const std::exception&) {
189 throw std::runtime_error("Failed to enqueue task to thread pool");
190 }
191 }
192}
193
194// =============================================================================
195// Statistics
196// =============================================================================
197
198auto thread_pool_adapter::get_thread_count() const -> std::size_t {
199 std::lock_guard<std::mutex> lock(mutex_);
200 // Return configured thread count if pool is initialized
201 // This reflects the expected worker count rather than transient active state
202 return initialized_ ? config_.min_threads : 0;
203}
204
206 std::lock_guard<std::mutex> lock(mutex_);
207 return pool_ ? pool_->get_pending_task_count() : 0;
208}
209
211 std::lock_guard<std::mutex> lock(mutex_);
212 return pool_ ? pool_->get_idle_worker_count() : 0;
213}
214
215// =============================================================================
216// Adapter-specific Methods
217// =============================================================================
218
220 -> std::shared_ptr<kcenon::thread::thread_pool> {
221 std::lock_guard<std::mutex> lock(mutex_);
222 return pool_;
223}
224
226 return config_;
227}
228
229} // namespace kcenon::pacs::integration
if(!color.empty()) style.color
void submit_internal(std::function< void()> task, job_priority priority)
Internal task submission with priority.
thread_pool_adapter(const thread_pool_config &config)
Construct adapter with configuration.
auto submit(std::function< void()> task) -> std::future< void > override
Submit a task for execution.
auto get_idle_worker_count() const -> std::size_t override
Get the number of idle workers.
auto get_underlying_pool() const -> std::shared_ptr< kcenon::thread::thread_pool >
Get the underlying thread pool.
auto get_thread_count() const -> std::size_t override
Get the current number of worker threads.
auto get_config() const noexcept -> const thread_pool_config &
Get the current configuration.
auto get_pending_task_count() const -> std::size_t override
Get the number of pending tasks in the queue.
void submit_fire_and_forget(std::function< void()> task) override
Submit a task without waiting for completion.
auto submit_with_priority(job_priority priority, std::function< void()> task) -> std::future< void > override
Submit a task with a specific priority level.
void shutdown(bool wait_for_completion=true) override
Shutdown the thread pool.
auto start() -> bool override
Start the thread pool.
std::shared_ptr< kcenon::thread::thread_pool > pool_
auto is_running() const noexcept -> bool override
Check if the thread pool is running.
job_priority
Priority levels for job scheduling.
@ low
Background tasks (cleanup, maintenance)
Configuration options for the thread pool.
std::string pool_name
Thread pool name for logging.
std::size_t min_threads
Minimum number of worker threads.
std::size_t max_threads
Maximum number of worker threads.
Concrete implementation of thread_pool_interface using kcenon::thread.