25#include <condition_variable>
27#if KCENON_WITH_THREAD_SYSTEM
28#include <kcenon/thread/core/thread_pool.h>
29#include <kcenon/thread/core/thread_worker.h>
34#if KCENON_WITH_THREAD_SYSTEM
37class basic_thread_pool::impl {
40 if (num_threads == 0) {
41 num_threads = std::thread::hardware_concurrency();
42 if (num_threads == 0) num_threads = 2;
49 auto* pool =
new kcenon::thread::thread_pool(
"network_basic_pool");
50 pool_ = std::shared_ptr<kcenon::thread::thread_pool>(
52 [](kcenon::thread::thread_pool*) { }
56 for (
size_t i = 0; i < num_threads; ++i) {
57 pool_->enqueue(std::make_unique<kcenon::thread::thread_worker>());
67 std::future<void>
submit(std::function<
void()> task) {
68 auto promise = std::make_shared<std::promise<void>>();
69 auto future = promise->get_future();
71 if (!pool_ || !pool_->is_running()) {
72 promise->set_exception(
73 std::make_exception_ptr(
74 std::runtime_error(
"Thread pool is not running")
86 pool_->submit([task = std::move(task), promise, completed_ptr]()
mutable {
90 completed_ptr->fetch_add(1, std::memory_order_relaxed);
92 promise->set_exception(std::current_exception());
95 }
catch (
const std::exception& e) {
96 promise->set_exception(
97 std::make_exception_ptr(
99 std::string(
"Failed to submit task to thread pool: ") + e.what()
109 std::function<
void()> task,
110 std::chrono::milliseconds delay
113#if defined(THREAD_HAS_COMMON_EXECUTOR)
114 return pool_->submit_delayed(std::move(task), delay);
118 auto promise = std::make_shared<std::promise<void>>();
119 auto future = promise->get_future();
121 if (!pool_ || !pool_->is_running()) {
122 promise->set_exception(
123 std::make_exception_ptr(
124 std::runtime_error(
"Thread pool is not running")
136 pool_->submit([task = std::move(task), delay, promise, completed_ptr]()
mutable {
138 std::this_thread::sleep_for(delay);
140 promise->set_value();
141 completed_ptr->fetch_add(1, std::memory_order_relaxed);
143 promise->set_exception(std::current_exception());
146 }
catch (
const std::exception& e) {
147 promise->set_exception(
148 std::make_exception_ptr(
150 std::string(
"Failed to submit delayed task to thread pool: ") + e.what()
161 return pool_ ? pool_->get_active_worker_count() : 0;
165 return pool_ && pool_->is_running();
169 return pool_ ? pool_->get_pending_task_count() : 0;
172 void stop(
bool wait_for_tasks) {
174 pool_->stop(!wait_for_tasks);
183 std::shared_ptr<kcenon::thread::thread_pool> pool_;
205 if (num_threads == 0) {
206 num_threads = std::thread::hardware_concurrency();
207 if (num_threads == 0) num_threads = 2;
211 for (
size_t i = 0; i < num_threads; ++i) {
222 std::future<void>
submit(std::function<
void()> task) {
223 auto promise = std::make_shared<std::promise<void>>();
224 auto future = promise->get_future();
229 promise->set_exception(
230 std::make_exception_ptr(
231 std::runtime_error(
"Thread pool is not running")
237 tasks_.emplace([task, promise]() {
240 promise->set_value();
242 promise->set_exception(std::current_exception());
252 std::function<
void()> task,
253 std::chrono::milliseconds delay
255 auto promise = std::make_shared<std::promise<void>>();
256 auto future = promise->get_future();
261 promise->set_exception(
262 std::make_exception_ptr(
263 std::runtime_error(
"Thread pool is not running")
269 auto execute_time = std::chrono::steady_clock::now() + delay;
272 submit([task, promise]() {
275 promise->set_value();
277 promise->set_exception(std::current_exception());
300 void stop(
bool wait_for_tasks) {
303 if (!wait_for_tasks) {
319 if (worker.joinable()) {
337 std::function<void()> task;
350 task = std::move(
tasks_.front());
371 auto now = std::chrono::steady_clock::now();
374 if (now >= next_task.execute_at) {
375 auto task = next_task.task;
382 return !running_ || delayed_tasks_.empty() || delayed_tasks_.top().execute_at < next_task.execute_at;
392 std::queue<std::function<void()>>
tasks_;
396 std::priority_queue<DelayedTask, std::vector<DelayedTask>, std::greater<DelayedTask>>
delayed_tasks_;
411 : pimpl_(new
impl(num_threads), [](
impl*) { }) {
417 return pimpl_->submit(task);
421 std::function<
void()> task,
422 std::chrono::milliseconds delay
424 return pimpl_->submit_delayed(task, delay);
428 return pimpl_->worker_count();
432 return pimpl_->is_running();
436 return pimpl_->pending_tasks();
440 pimpl_->stop(wait_for_tasks);
444 return pimpl_->get_completed_tasks();
453 std::unique_lock<std::mutex> lock(
mutex_);
458 std::unique_lock<std::mutex> lock(
mutex_);
473 std::function<
void()> task,
474 std::chrono::milliseconds delay
480 std::unique_lock<std::mutex> lock(
mutex_);
511 : pimpl_(new
impl(), [](
impl*) { }) {
517 std::shared_ptr<thread_pool_interface> pool
519 pimpl_->set_thread_pool(pool);
523 return pimpl_->get_thread_pool();
527 std::function<
void()> task
529 return pimpl_->submit_task(task);
533 std::function<
void()> task,
534 std::chrono::milliseconds delay
536 return pimpl_->submit_delayed_task(task, delay);
540 return pimpl_->get_metrics();
size_t worker_count() const
void stop(bool wait_for_tasks)
std::atomic< size_t > completed_tasks_
std::future< void > submit_delayed(std::function< void()> task, std::chrono::milliseconds delay)
std::vector< std::thread > workers_
std::condition_variable scheduler_condition_
std::thread scheduler_thread_
std::priority_queue< DelayedTask, std::vector< DelayedTask >, std::greater< DelayedTask > > delayed_tasks_
size_t pending_tasks() const
std::mutex scheduler_mutex_
std::condition_variable condition_
std::queue< std::function< void()> > tasks_
std::future< void > submit(std::function< void()> task)
std::atomic< bool > running_
size_t get_completed_tasks() const
Basic thread pool implementation for standalone use.
size_t worker_count() const override
Get the number of worker threads.
void stop(bool wait_for_tasks=true)
Stop the thread pool.
bool is_running() const override
Check if the thread pool is running.
size_t completed_tasks() const
Get completed tasks count.
basic_thread_pool(size_t num_threads=0)
Construct with specified number of threads.
std::shared_ptr< impl > pimpl_
PIMPL pointer with intentional leak pattern.
std::future< void > submit_delayed(std::function< void()> task, std::chrono::milliseconds delay) override
Submit a task with delay.
size_t pending_tasks() const override
Get pending task count.
std::future< void > submit(std::function< void()> task) override
Submit a task to the thread pool.
std::future< void > submit_task(std::function< void()> task)
void set_thread_pool(std::shared_ptr< thread_pool_interface > pool)
std::future< void > submit_delayed_task(std::function< void()> task, std::chrono::milliseconds delay)
std::shared_ptr< thread_pool_interface > thread_pool_
std::shared_ptr< thread_pool_interface > get_thread_pool()
thread_integration_manager::metrics get_metrics() const
Manager for thread system integration.
void set_thread_pool(std::shared_ptr< thread_pool_interface > pool)
Set the thread pool implementation.
std::future< void > submit_delayed_task(std::function< void()> task, std::chrono::milliseconds delay)
Submit a task with delay.
~thread_integration_manager()
static thread_integration_manager & instance()
Get the singleton instance.
std::shared_ptr< impl > pimpl_
PIMPL pointer with intentional leak pattern.
metrics get_metrics() const
Get current metrics.
std::future< void > submit_task(std::function< void()> task)
Submit a task to the thread pool.
std::shared_ptr< thread_pool_interface > get_thread_pool()
Get the current thread pool.
thread_integration_manager()
Feature flags for network_system.
std::function< void()> task
bool operator>(const DelayedTask &other) const
std::chrono::steady_clock::time_point execute_at
Thread system integration interface for network_system.