#if KCENON_HAS_COMMON_EXECUTOR
namespace common_ns = common;
// thread_pool(thread_title, context) constructor, fragment: member initialization plus
// registration of an initial metrics snapshot with zero worker threads.
    : thread_title_(thread_title)
    , pool_instance_id_(next_pool_instance_id_.fetch_add(1))
    , metrics_service_(std::make_shared<metrics::metrics_service>()) {
    common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
    initial_metrics.worker_threads.value = 0;
    context_.update_thread_pool_metrics(thread_title_, pool_instance_id_, initial_metrics);
// Constructor fragment: the pool adopts a caller-supplied job_queue.
                         std::shared_ptr<job_queue> custom_queue,
    : thread_title_(thread_title)
    , pool_instance_id_(next_pool_instance_id_.fetch_add(1))
    , job_queue_(std::move(custom_queue))
    , metrics_service_(std::make_shared<metrics::metrics_service>()) {
    common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
    initial_metrics.worker_threads.value = 0;
    context_.update_thread_pool_metrics(thread_title_, pool_instance_id_, initial_metrics);
// Constructor fragment: the pool is built around a pool_queue_adapter_interface; the
// adapter's backing job_queue (if any) is cached in job_queue_.
                         std::unique_ptr<pool_queue_adapter_interface> queue_adapter,
    : thread_title_(thread_title)
    , pool_instance_id_(next_pool_instance_id_.fetch_add(1))
    , job_queue_(queue_adapter ? queue_adapter->get_job_queue() : nullptr)
    , queue_adapter_(std::move(queue_adapter))
    , metrics_service_(std::make_shared<metrics::metrics_service>()) {
    common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
    initial_metrics.worker_threads.value = 0;
    context_.update_thread_pool_metrics(thread_title_, pool_instance_id_, initial_metrics);
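// --- Usage sketch (not part of this file) ---
// Illustrates the first two construction forms shown above. Assumptions: these sketches
// sit in the same namespace scope as the definitions, the trailing thread_context
// parameter of the custom_queue constructor is defaulted like in the first constructor,
// and std::make_shared is appropriate because get_ptr() relies on shared_from_this().
static std::shared_ptr<thread_pool> make_pool_sketch() {
    // Pool that creates (or recreates) its own shared queue lazily in start().
    auto pool_with_own_queue = std::make_shared<thread_pool>("worker_pool");

    // Pool bound to a caller-supplied queue, matching the custom_queue fragment above.
    auto shared_queue = std::make_shared<kcenon::thread::job_queue>();
    auto pool_with_custom_queue =
        std::make_shared<thread_pool>("worker_pool", shared_queue);

    (void)pool_with_custom_queue;
    return pool_with_own_queue;
}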
// thread_pool::get_ptr() fragment.
    return this->shared_from_this();
// thread_pool::start() fragment: rejects a second start, refuses to restart a stopped
// adapter-backed queue, recreates a stopped shared job_queue, rebinds workers to it,
// resets metrics, starts every worker, then publishes the running flag.
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    if (start_pool_.load(std::memory_order_acquire)) {
    if (workers_.empty()) {
    if (queue_adapter_) {
        if (queue_adapter_->is_stopped()) {
            return common::error_info{
                "queue is stopped; create a new thread_pool instance for restart",
        job_queue_ = queue_adapter_->get_job_queue();
    } else if (job_queue_ == nullptr || job_queue_->is_stopped()) {
        job_queue_ = std::make_shared<kcenon::thread::job_queue>();
    for (auto& worker : workers_) {
        worker->set_job_queue(job_queue_);
    metrics_service_->reset();
    for (auto& worker : workers_) {
        auto start_result = worker->start();
        if (start_result.is_err()) {
            return start_result.error();
    start_pool_.store(true, std::memory_order_release);
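// --- Usage sketch (relies only on members shown in this listing) ---
// Starting the pool and branching on the VoidResult error channel the same way the
// fragments do with is_err()/error(); is_running() reads the start_pool_ flag that
// start() publishes with release semantics.
static bool start_pool_sketch(const std::shared_ptr<thread_pool>& pool) {
    auto result = pool->start();
    if (result.is_err()) {
        // error().message is the field the stop() fragment below passes to the logger.
        return false;
    }
    return pool->is_running();
}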
    std::size_t worker_count = 0;
// thread_pool::enqueue(std::unique_ptr<job>&&) fragment: validates the job and the
// queue state, records the submission, enqueues through the adapter or the shared
// job_queue, then records enqueue latency and current queue depth.
    if (job == nullptr) {
    if (queue_adapter_) {
        if (queue_adapter_->is_stopped()) {
    } else if (job_queue_ == nullptr) {
    } else if (job_queue_->is_stopped()) {
    metrics_service_->record_submission();
    auto start_time = std::chrono::steady_clock::now();
    if (queue_adapter_) {
        auto enqueue_result = queue_adapter_->enqueue(std::move(job));
        if (enqueue_result.is_err()) {
            return enqueue_result.error();
        auto enqueue_result = job_queue_->enqueue(std::move(job));
        if (enqueue_result.is_err()) {
            return enqueue_result.error();
    auto end_time = std::chrono::steady_clock::now();
    auto latency = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
    metrics_service_->record_enqueue_with_latency(latency);
    auto queue_size = queue_adapter_ ? queue_adapter_->size() : job_queue_->size();
    metrics_service_->record_queue_depth(queue_size);
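// --- Usage sketch ---
// Submitting one already-constructed job, mirroring enqueue() above. The concrete job
// subclass is left to the caller because this listing does not show job's interface.
static bool submit_one_sketch(thread_pool& pool, std::unique_ptr<job> work) {
    auto result = pool.enqueue(std::move(work));
    return !result.is_err();
}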
// thread_pool::enqueue_batch(std::vector<std::unique_ptr<job>>&&) fragment: same
// validation and metrics as enqueue(), but the whole batch is handed to the queue in a
// single call and the latency sample is attributed to batch_size submissions.
    if (queue_adapter_) {
        if (queue_adapter_->is_stopped()) {
    } else if (job_queue_ == nullptr) {
    } else if (job_queue_->is_stopped()) {
    const auto batch_size = jobs.size();
    metrics_service_->record_submission(batch_size);
    auto start_time = std::chrono::steady_clock::now();
    if (queue_adapter_) {
        auto enqueue_result = queue_adapter_->enqueue_batch(std::move(jobs));
        if (enqueue_result.is_err()) {
            return enqueue_result.error();
        auto enqueue_result = job_queue_->enqueue_batch(std::move(jobs));
        if (enqueue_result.is_err()) {
            return enqueue_result.error();
    auto end_time = std::chrono::steady_clock::now();
    auto latency = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
    metrics_service_->record_enqueue_with_latency(latency, batch_size);
    auto queue_size = queue_adapter_ ? queue_adapter_->size() : job_queue_->size();
    metrics_service_->record_queue_depth(queue_size);
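// --- Usage sketch ---
// Batch submission: per the fragment above, one enqueue_batch() call records a single
// latency sample for the whole batch, so it amortizes the metrics and queue work
// relative to calling enqueue() in a loop.
static bool submit_many_sketch(thread_pool& pool, std::vector<std::unique_ptr<job>> jobs) {
    auto result = pool.enqueue_batch(std::move(jobs));
    return !result.is_err();
}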
// Worker-registration fragment: validates the worker, resolves which queue it should
// drain (an adapter without a job_queue backend is rejected), wires context, metrics,
// and diagnostics into the worker, appends it to workers_, and starts it.
    if (worker == nullptr) {
    std::shared_ptr<job_queue> worker_queue;
    if (queue_adapter_) {
        worker_queue = queue_adapter_->get_job_queue();
            return common::error_info{
                "policy_queue adapter without job_queue backend not yet supported for workers",
    } else if (job_queue_ == nullptr) {
        worker_queue = job_queue_;
    worker->set_job_queue(worker_queue);
    worker->set_context(context_);
    worker->set_metrics(metrics_service_->get_basic_metrics());
    worker->set_diagnostics(&diagnostics());
    worker->set_diagnostics_sample_rate(diagnostics().get_config().event_sample_rate);
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    bool is_running = start_pool_.load(std::memory_order_acquire);
    workers_.emplace_back(std::move(worker));
    auto start_result = workers_.back()->start();
    if (start_result.is_err()) {
        return start_result.error();
// Batch worker-registration fragment: rejects an empty batch and a missing job_queue,
// wires each worker, appends them starting at start_index, and on a failed start()
// rolls back the workers added by this call.
    -> common::VoidResult {
    if (workers.empty()) {
    if (job_queue_ == nullptr) {
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    bool is_running = start_pool_.load(std::memory_order_acquire);
    std::size_t start_index = workers_.size();
    auto* diag = &diagnostics();
    const auto sample_rate = diag->get_config().event_sample_rate;
    for (auto& worker : workers) {
        worker->set_job_queue(job_queue_);
        worker->set_context(context_);
        worker->set_metrics(metrics_service_->get_basic_metrics());
        worker->set_diagnostics(diag);
        worker->set_diagnostics_sample_rate(sample_rate);
        workers_.emplace_back(std::move(worker));
        auto start_result = workers_.back()->start();
        if (start_result.is_err()) {
            workers_.erase(workers_.begin() + static_cast<std::ptrdiff_t>(start_index),
            return start_result.error();
// thread_pool::stop(immediately_stop) fragment: flips start_pool_ from true to false
// exactly once, cancels the pool-level cancellation token, stops (and on an immediate
// stop clears) the queue, then stops every worker, logging any failure.
    bool expected = true;
    if (!start_pool_.compare_exchange_strong(expected, false, std::memory_order_acq_rel,
                                             std::memory_order_acquire)) {
    pool_cancellation_token_.cancel();
    if (queue_adapter_) {
        queue_adapter_->stop();
        if (immediately_stop) {
            queue_adapter_->clear();
    } else if (job_queue_ != nullptr) {
        if (immediately_stop) {
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    for (auto& worker : workers_) {
        auto stop_result = worker->stop();
        if (stop_result.is_err()) {
            context_.log(common::interfaces::log_level::error,
                         formatter::format("error stopping worker: {}",
                                           stop_result.error().message));
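// --- Usage sketch ---
// Graceful versus immediate shutdown as implemented by the stop() fragment above: an
// immediate stop additionally clears the queue, so still-pending jobs are dropped.
static void shutdown_sketch(thread_pool& pool, bool drop_pending) {
    auto result = pool.stop(drop_pending);  // the defaulted parameter makes stop() graceful
    (void)result;  // per-worker stop failures are already logged by the fragment above
}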
// thread_pool::stop_unsafe() fragment: same once-only transition of start_pool_, used
// during static destruction where logging must be avoided.
    bool expected = true;
    if (!start_pool_.compare_exchange_strong(expected, false, std::memory_order_acq_rel,
                                             std::memory_order_acquire)) {
// thread_pool::to_string() fragment: reports the running/stopped state, a textual
// description of the queue (or "nullptr"), and each worker's description.
    std::string format_string;
                  start_pool_.load(std::memory_order_relaxed) ? "running" : "stopped");
    std::string queue_str;
    if (queue_adapter_) {
        queue_str = queue_adapter_->to_string();
    } else if (job_queue_ != nullptr) {
        queue_str = job_queue_->to_string();
        queue_str = "nullptr";
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    for (const auto& worker : workers_) {
    return format_string;
// thread_pool::get_idle_worker_count() / is_running() fragments.
    return static_cast<std::size_t>(std::count_if(
        workers_.begin(), workers_.end(),
        [](const std::unique_ptr<thread_worker>& worker) { return worker && worker->is_idle(); }));

    return start_pool_.load(std::memory_order_acquire);
// thread_pool::check_worker_health(restart_failed) fragment: removes workers that are
// missing or no longer running, counts them as failed, and, if requested while the
// pool is still running, creates and starts a replacement worker for each failure.
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    std::size_t failed_count = 0;
    auto remove_iter =
        std::remove_if(workers_.begin(), workers_.end(),
                       [&failed_count](const std::unique_ptr<thread_worker>& worker) {
                           if (!worker || !worker->is_running()) {
    workers_.erase(remove_iter, workers_.end());
    if (restart_failed && failed_count > 0 && is_running()) {
        auto* diag = &diagnostics();
        const auto sample_rate = diag->get_config().event_sample_rate;
        for (std::size_t i = 0; i < failed_count; ++i) {
            auto worker = std::make_unique<thread_worker>(true, context_);
            worker->set_job_queue(job_queue_);
            worker->set_metrics(metrics_service_->get_basic_metrics());
            worker->set_diagnostics(diag);
            worker->set_diagnostics_sample_rate(sample_rate);
            auto start_result = worker->start();
            if (start_result.is_err()) {
            workers_.push_back(std::move(worker));
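// --- Usage sketch (assumes <atomic>, <chrono>, and <thread> are available) ---
// A periodic supervisor loop around check_worker_health(), which per the fragment above
// prunes dead workers and, when restart_failed is true and the pool is running, starts
// one replacement per failure. The one-second poll interval is arbitrary.
static void supervise_sketch(thread_pool& pool, std::atomic<bool>& keep_going) {
    while (keep_going.load(std::memory_order_relaxed)) {
        const std::size_t failed = pool.check_worker_health(/*restart_failed=*/true);
        (void)failed;  // could instead be forwarded to the pool's context/logging
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}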
auto thread_pool::get_active_worker_count() const -> std::size_t {
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    return static_cast<std::size_t>(std::count_if(workers_.begin(), workers_.end(),
        [](const std::unique_ptr<thread_worker>& worker) {
            return worker && worker->is_running();
// thread_pool::diagnostics() fragments: both the mutable and the const overload lazily
// construct the diagnostics object exactly once via std::call_once.
    std::call_once(diagnostics_init_flag_, [this]() {
        diagnostics_ = std::make_unique<diagnostics::thread_pool_diagnostics>(*this);
    return *diagnostics_;

auto thread_pool::diagnostics() const -> const diagnostics::thread_pool_diagnostics& {
    std::call_once(diagnostics_init_flag_, [this]() {
        diagnostics_ = std::make_unique<diagnostics::thread_pool_diagnostics>(
    return *diagnostics_;
auto thread_pool::collect_worker_diagnostics() const
    -> std::vector<diagnostics::thread_info> {
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    std::vector<diagnostics::thread_info> result;
    result.reserve(workers_.size());
    // Fragment: builds one thread_info per worker with identity, derived state, current
    // job, counters, accumulated busy/idle time, and utilization.
    for (std::size_t i = 0; i < workers_.size(); ++i) {
        const auto& worker = workers_[i];
        diagnostics::thread_info info;
        info.thread_id = worker->get_thread_id();
        info.thread_name = "Worker-" + std::to_string(i);
        info.worker_id = worker->get_worker_id();
        if (!worker->is_running()) {
            info.state = diagnostics::worker_state::stopped;
        } else if (worker->is_idle()) {
            info.state = diagnostics::worker_state::idle;
            info.state = diagnostics::worker_state::active;
        info.state_since = worker->get_state_since();
        info.current_job = worker->get_current_job_info();
        info.jobs_completed = worker->get_jobs_completed();
        info.jobs_failed = worker->get_jobs_failed();
        info.total_busy_time = worker->get_total_busy_time();
        info.total_idle_time = worker->get_total_idle_time();
        info.update_utilization();
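// --- Usage sketch (assumes <cstdio> and that collect_worker_diagnostics() is reachable
// from the caller; it may instead be exposed only through the diagnostics() facade) ---
// Turns the per-worker snapshot built above into a minimal report. The field names are
// the ones assigned in the fragment; their exact integer types are not shown here,
// hence the explicit casts.
static void report_sketch(const thread_pool& pool) {
    for (const auto& info : pool.collect_worker_diagnostics()) {
        std::printf("%s: completed=%llu failed=%llu\n",
                    info.thread_name.c_str(),
                    static_cast<unsigned long long>(info.jobs_completed),
                    static_cast<unsigned long long>(info.jobs_failed));
    }
}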
void thread_pool::add_policy(std::unique_ptr<pool_policy> policy) {
    std::scoped_lock<std::mutex> lock(policies_mutex_);
    policies_.push_back(std::move(policy));
auto thread_pool::remove_policy(const std::string& name) -> bool {
    std::scoped_lock<std::mutex> lock(policies_mutex_);
    auto it = std::remove_if(policies_.begin(), policies_.end(),
                             [&name](const std::unique_ptr<pool_policy>& policy) {
                                 return policy && policy->get_name() == name;
    if (it != policies_.end()) {
        policies_.erase(it, policies_.end());
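// --- Usage sketch ---
// Policies are matched and removed purely by get_name(), as remove_policy() above shows.
// Adding one would require a concrete pool_policy subclass whose virtual interface is
// not visible in this listing, so only removal by name is sketched.
static bool drop_policy_sketch(thread_pool& pool, const std::string& name) {
    return pool.remove_policy(name);
}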
auto thread_pool::remove_workers_internal(std::size_t count, std::size_t min_workers)
    -> common::VoidResult
    std::scoped_lock<std::mutex> lock(workers_mutex_);
    if (workers_.size() <= min_workers) {
        return common::error_info{
            static_cast<int>(error_code::invalid_argument),
            "Cannot remove workers: already at minimum",
    // Fragment: caps the request at the number of removable workers, erases idle
    // workers first, and logs an informational message if fewer than requested could
    // be removed.
    std::size_t max_removable = workers_.size() - min_workers;
    count = std::min(count, max_removable);
    std::size_t removed = 0;
    auto it = workers_.begin();
    while (it != workers_.end() && removed < count) {
        if (*it && (*it)->is_idle()) {
            it = workers_.erase(it);
    if (removed < count) {
        context_.log(common::interfaces::log_level::info,