Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
kcenon::thread::thread_pool Class Reference

A thread pool for concurrent execution of jobs using multiple worker threads. More...

#include <thread_pool.h>

Inheritance diagram for kcenon::thread::thread_pool:
Inheritance graph
Collaboration diagram for kcenon::thread::thread_pool:
Collaboration graph

Public Member Functions

 thread_pool (const std::string &thread_title="thread_pool", const thread_context &context=thread_context())
 Constructs a new thread_pool instance.
 
 thread_pool (const std::string &thread_title, std::shared_ptr< job_queue > custom_queue, const thread_context &context=thread_context())
 Constructs a new thread_pool instance with a custom job queue.
 
 thread_pool (const std::string &thread_title, std::unique_ptr< pool_queue_adapter_interface > queue_adapter, const thread_context &context=thread_context())
 Constructs a new thread_pool instance with a policy_queue adapter.
 
virtual ~thread_pool (void)
 Virtual destructor. Cleans up resources used by the thread pool.
 
auto get_ptr (void) -> std::shared_ptr< thread_pool >
 Retrieves a std::shared_ptr to this thread_pool instance.
 
auto start (void) -> common::VoidResult
 Starts the thread pool and all associated workers.
 
auto get_job_queue (void) -> std::shared_ptr< job_queue >
 Returns the shared job_queue used by this thread pool.
 
const metrics::ThreadPoolMetricsmetrics () const noexcept
 Access aggregated runtime metrics (read-only reference).
 
void reset_metrics ()
 Reset accumulated metrics.
 
void set_enhanced_metrics_enabled (bool enabled)
 Enable or disable enhanced metrics collection.
 
bool is_enhanced_metrics_enabled () const
 Check if enhanced metrics is enabled.
 
const metrics::EnhancedThreadPoolMetricsenhanced_metrics () const
 Access enhanced metrics (read-only reference).
 
metrics::EnhancedSnapshot enhanced_metrics_snapshot () const
 Get enhanced metrics snapshot.
 
auto enqueue (std::unique_ptr< job > &&job) -> common::VoidResult
 Enqueues a new job into the shared job_queue.
 
auto enqueue_batch (std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
 Enqueues a batch of jobs into the shared job_queue.
 
auto enqueue (std::unique_ptr< thread_worker > &&worker) -> common::VoidResult
 Adds a thread_worker to the thread pool for specialized or additional processing.
 
auto enqueue_batch (std::vector< std::unique_ptr< thread_worker > > &&workers) -> common::VoidResult
 Adds a batch of thread_worker objects to the thread pool.
 
auto stop (const bool &immediately_stop=false) -> common::VoidResult
 Stops the thread pool and all worker threads.
 
auto to_string (void) const -> std::string
 Provides a string representation of this thread_pool.
 
std::uint32_t get_pool_instance_id () const
 Get the pool instance id.
 
void report_metrics ()
 Collect and report current thread pool metrics.
 
std::size_t get_idle_worker_count () const
 Get the number of idle workers.
 
auto get_context (void) const -> const thread_context &
 Gets the thread context for this pool.
 
template<typename F , typename R = std::invoke_result_t<std::decay_t<F>>>
auto submit (F &&callable, const submit_options &opts={}) -> std::future< R >
 
template<typename F , typename R = std::invoke_result_t<std::decay_t<F>>>
auto submit (std::vector< F > &&callables, const submit_options &opts={}) -> std::vector< std::future< R > >
 
template<typename F , typename R = std::invoke_result_t<std::decay_t<F>>>
auto submit_wait_all (std::vector< F > &&callables, const submit_options &opts={}) -> std::vector< R >
 Submit a batch and wait for all results.
 
template<typename F , typename R = std::invoke_result_t<std::decay_t<F>>>
auto submit_wait_any (std::vector< F > &&callables, const submit_options &opts={}) -> common::Result< R >
 Submit a batch and return first completed result.
 
auto is_running () const -> bool
 Check if the thread pool is currently running.
 
auto get_pending_task_count () const -> std::size_t
 Get the number of pending tasks in the queue.
 
auto check_worker_health (bool restart_failed=true) -> std::size_t
 Check health of all worker threads and restart failed workers.
 
auto get_active_worker_count () const -> std::size_t
 Get the current number of active (running) workers.
 
void add_policy (std::unique_ptr< pool_policy > policy)
 Add a policy to the pool.
 
auto get_policies () const -> const std::vector< std::unique_ptr< pool_policy > > &
 Get all registered policies.
 
template<typename T = pool_policy>
auto find_policy (const std::string &name) -> T *
 Find a policy by name.
 
auto remove_policy (const std::string &name) -> bool
 Remove a policy by name.
 
auto diagnostics () -> diagnostics::thread_pool_diagnostics &
 Get the diagnostics interface for this pool.
 
auto diagnostics () const -> const diagnostics::thread_pool_diagnostics &
 Get the diagnostics interface for this pool (const version).
 
auto collect_worker_diagnostics () const -> std::vector< diagnostics::thread_info >
 Collects diagnostics information from all workers.
 

Private Member Functions

auto stop_unsafe () noexcept -> common::VoidResult
 Stops the thread pool without logging (for use during static destruction).
 
auto remove_workers_internal (std::size_t count, std::size_t min_workers=1) -> common::VoidResult
 Internal method to remove workers from the pool.
 

Private Attributes

std::string thread_title_
 A title or name for this thread pool, useful for identification and logging.
 
std::uint32_t pool_instance_id_ {0}
 Unique instance ID for this pool (for multi-pool scenarios).
 
std::atomic< bool > start_pool_
 Indicates whether the pool is currently running.
 
std::shared_ptr< job_queuejob_queue_
 The shared job queue where jobs (job objects) are enqueued.
 
std::unique_ptr< pool_queue_adapter_interfacequeue_adapter_
 Queue adapter for unified access to different queue types.
 
std::vector< std::unique_ptr< thread_worker > > workers_
 A collection of worker threads associated with this pool.
 
std::mutex workers_mutex_
 Mutex protecting concurrent access to the workers_ vector.
 
thread_context context_
 The thread context providing access to logging and monitoring services.
 
cancellation_token pool_cancellation_token_
 Pool-level cancellation token.
 
std::shared_ptr< metrics::metrics_servicemetrics_service_
 Centralized metrics service for all pool and worker metrics.
 
std::unique_ptr< diagnostics::thread_pool_diagnosticsdiagnostics_
 Diagnostics interface for this pool.
 
std::once_flag diagnostics_init_flag_
 Once flag for thread-safe lazy initialization of diagnostics_.
 
std::vector< std::unique_ptr< pool_policy > > policies_
 Registered pool policies for extending thread pool behavior.
 
std::mutex policies_mutex_
 Mutex protecting concurrent access to the policies_ vector.
 

Static Private Attributes

static std::atomic< std::uint32_t > next_pool_instance_id_ {0}
 Static counter for generating unique pool instance IDs.
 

Friends

class numa_thread_pool
 
class autoscaler
 
class diagnostics::thread_pool_diagnostics
 

Detailed Description

A thread pool for concurrent execution of jobs using multiple worker threads.

The thread_pool class manages a group of worker threads that process jobs from a shared job_queue. This implementation provides:

  • Efficient reuse of threads to reduce thread creation/destruction overhead
  • Controlled concurrency through a fixed or dynamic thread count
  • A simple interface for submitting jobs of various types
  • Graceful handling of thread startup, execution, and shutdown

The thread pool is designed for scenarios where many short-lived tasks need to be executed asynchronously without creating a new thread for each task.

Design Principles

  • Worker Thread Model: Each worker runs in its own thread, processing jobs from the shared queue.
  • Shared Job Queue: A single, thread-safe queue holds all pending jobs.
  • Job-Based Work Units: Jobs encapsulate work to be executed.
  • Non-Blocking Submission: Adding jobs to the pool never blocks the caller thread.
  • Cooperative Shutdown: Workers can complete current jobs before stopping.

Thread Safety

All public methods of this class are thread-safe and can be called from any thread. The underlying job_queue is also thread-safe, allowing multiple workers to dequeue jobs concurrently.

Performance Considerations

  • The number of worker threads should typically be close to the number of available CPU cores for CPU-bound tasks.
  • For I/O-bound tasks, more threads may be beneficial to maximize throughput while some threads are blocked on I/O.
  • Very large thread pools (significantly more threads than cores) may degrade performance due to context switching overhead.

IExecutor Interface

For IExecutor compatibility, use thread_pool_executor_adapter:

auto pool = std::make_shared<thread_pool>("my_pool");
auto executor = std::make_shared<adapters::thread_pool_executor_adapter>(pool);
Adapter to bridge thread_system pools with common IExecutor interface.
See also
thread_worker The worker thread class used by the pool
job_queue The shared queue for storing pending jobs
adapters::thread_pool_executor_adapter For IExecutor compatibility
typed_kcenon::thread::typed_thread_pool For a priority-based version
Examples
crash_protection/main.cpp, thread_pool_sample.cpp, typed_thread_pool_sample.cpp, and typed_thread_pool_sample_2.cpp.

Definition at line 181 of file thread_pool.h.

Constructor & Destructor Documentation

◆ thread_pool() [1/3]

kcenon::thread::thread_pool::thread_pool ( const std::string & thread_title = "thread_pool",
const thread_context & context = thread_context() )

Constructs a new thread_pool instance.

Constructs a thread pool with adaptive job queue.

Parameters
thread_titleAn optional title or identifier for the thread pool (defaults to "thread_pool").
contextOptional thread context for logging and monitoring (defaults to empty context).

This title can be used for logging or debugging purposes. The context provides access to logging and monitoring services.

Implementation details:

  • Initializes with provided thread title for identification
  • Creates adaptive job queue that automatically optimizes based on contention
  • Pool starts in stopped state (start_pool_ = false)
  • No workers are initially assigned (workers_ is empty)
  • Stores thread context for logging and monitoring
  • Creates pool-level cancellation token for hierarchical cancellation

Adaptive Queue Strategy:

  • ADAPTIVE mode automatically switches between mutex and lock-free implementations
  • Provides optimal performance across different contention levels
  • Eliminates need for manual queue strategy selection
Parameters
thread_titleDescriptive name for this thread pool instance
contextThread context providing logging and monitoring services
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 51 of file thread_pool.cpp.

52 : thread_title_(thread_title)
54 , start_pool_(false)
55 , job_queue_(std::make_shared<kcenon::thread::job_queue>())
56 , context_(context)
58 , metrics_service_(std::make_shared<metrics::metrics_service>()) {
59 // Report initial pool registration if monitoring is available
60 if (context_.monitoring()) {
61 common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
62 initial_metrics.worker_threads.value = 0;
64 }
65}
static cancellation_token create()
Creates a new cancellation token.
std::shared_ptr< IMonitor > monitoring() const
Get the monitoring service.
void update_thread_pool_metrics(const common::interfaces::thread_pool_metrics &metrics) const
Update thread pool metrics if monitoring is available.
static std::atomic< std::uint32_t > next_pool_instance_id_
Static counter for generating unique pool instance IDs.
std::shared_ptr< metrics::metrics_service > metrics_service_
Centralized metrics service for all pool and worker metrics.
std::string thread_title_
A title or name for this thread pool, useful for identification and logging.
std::atomic< bool > start_pool_
Indicates whether the pool is currently running.
cancellation_token pool_cancellation_token_
Pool-level cancellation token.
std::shared_ptr< job_queue > job_queue_
The shared job queue where jobs (job objects) are enqueued.
std::uint32_t pool_instance_id_
Unique instance ID for this pool (for multi-pool scenarios).
thread_context context_
The thread context providing access to logging and monitoring services.

References context_, and kcenon::thread::thread_context::monitoring().

Here is the call graph for this function:

◆ thread_pool() [2/3]

kcenon::thread::thread_pool::thread_pool ( const std::string & thread_title,
std::shared_ptr< job_queue > custom_queue,
const thread_context & context = thread_context() )

Constructs a new thread_pool instance with a custom job queue.

Constructs a thread pool with a custom job queue.

Parameters
thread_titleA title or identifier for the thread pool.
custom_queueA custom job queue implementation (e.g., backpressure_job_queue).
contextOptional thread context for logging and monitoring (defaults to empty context).

This constructor allows injecting a specialized job queue such as backpressure_job_queue for rate limiting and flow control.

Implementation details:

  • Initializes with provided thread title for identification
  • Uses the provided custom job queue (e.g., backpressure_job_queue)
  • Pool starts in stopped state (start_pool_ = false)
  • No workers are initially assigned (workers_ is empty)
  • Stores thread context for logging and monitoring
  • Creates pool-level cancellation token for hierarchical cancellation
Parameters
thread_titleDescriptive name for this thread pool instance
custom_queueCustom job queue implementation
contextThread context providing logging and monitoring services

Definition at line 82 of file thread_pool.cpp.

85 : thread_title_(thread_title)
87 , start_pool_(false)
88 , job_queue_(std::move(custom_queue))
89 , context_(context)
91 , metrics_service_(std::make_shared<metrics::metrics_service>()) {
92 // Report initial pool registration if monitoring is available
93 if (context_.monitoring()) {
94 common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
95 initial_metrics.worker_threads.value = 0;
97 }
98}

References context_, and kcenon::thread::thread_context::monitoring().

Here is the call graph for this function:

◆ thread_pool() [3/3]

kcenon::thread::thread_pool::thread_pool ( const std::string & thread_title,
std::unique_ptr< pool_queue_adapter_interface > queue_adapter,
const thread_context & context = thread_context() )

Constructs a new thread_pool instance with a policy_queue adapter.

Constructs a thread pool with a policy_queue adapter.

Parameters
thread_titleA title or identifier for the thread pool.
queue_adapterA queue adapter wrapping a policy_queue.
contextOptional thread context for logging and monitoring (defaults to empty context).

This constructor allows using the new policy-based queue system with thread_pool. Use adapters from <kcenon/thread/adapters/policy_queue_adapter.h>.

Example

// Create thread_pool with standard_queue (mutex-based, unbounded)
auto pool = std::make_shared<thread_pool>(
"my_pool",
// Or with lock-free queue
auto fast_pool = std::make_shared<thread_pool>(
"fast_pool",
auto make_lockfree_queue_adapter() -> std::unique_ptr< pool_queue_adapter_interface >
Create a lock-free queue adapter.
auto make_standard_queue_adapter() -> std::unique_ptr< pool_queue_adapter_interface >
Create a standard_queue adapter.
Adapter bridging policy_queue to pool_queue_adapter_interface.

Implementation details:

  • Initializes with provided thread title for identification
  • Uses the provided queue adapter for queue operations
  • If adapter wraps a job_queue, extracts it for backward compatibility
  • Pool starts in stopped state (start_pool_ = false)
  • No workers are initially assigned (workers_ is empty)
  • Stores thread context for logging and monitoring
  • Creates pool-level cancellation token for hierarchical cancellation
Parameters
thread_titleDescriptive name for this thread pool instance
queue_adapterQueue adapter wrapping job_queue or policy_queue
contextThread context providing logging and monitoring services

Definition at line 116 of file thread_pool.cpp.

119 : thread_title_(thread_title)
121 , start_pool_(false)
122 , job_queue_(queue_adapter ? queue_adapter->get_job_queue() : nullptr)
123 , queue_adapter_(std::move(queue_adapter))
124 , context_(context)
126 , metrics_service_(std::make_shared<metrics::metrics_service>()) {
127 // Report initial pool registration if monitoring is available
128 if (context_.monitoring()) {
129 common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
130 initial_metrics.worker_threads.value = 0;
132 }
133}
std::unique_ptr< pool_queue_adapter_interface > queue_adapter_
Queue adapter for unified access to different queue types.

References context_, and kcenon::thread::thread_context::monitoring().

Here is the call graph for this function:

◆ ~thread_pool()

kcenon::thread::thread_pool::~thread_pool ( void )
virtual

Virtual destructor. Cleans up resources used by the thread pool.

Destroys the thread pool, ensuring all workers are stopped.

If the pool is still running, this typically calls stop() internally to ensure all worker threads are properly shut down.

Implementation details:

  • Checks if static destruction is in progress to avoid SDOF
  • Uses stop_unsafe() during static destruction (no logging)
  • Uses regular stop() during normal destruction
  • Workers will complete current jobs before terminating
  • Prevents resource leaks from running threads
See also
stop_unsafe() for the logging-free shutdown variant
thread_logger::is_shutting_down() for shutdown detection
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 148 of file thread_pool.cpp.

148 {
149 // Check if we're in static destruction phase
150 // During static destruction, logger/monitoring singletons may already be destroyed
152 // Minimal cleanup without logging to avoid SDOF
153 stop_unsafe();
154 } else {
155 stop();
156 }
157}
static bool is_shutting_down()
Check if shutdown is in progress.
auto stop_unsafe() noexcept -> common::VoidResult
Stops the thread pool without logging (for use during static destruction).
auto stop(const bool &immediately_stop=false) -> common::VoidResult
Stops the thread pool and all worker threads.

References kcenon::thread::thread_logger::is_shutting_down(), stop(), and stop_unsafe().

Here is the call graph for this function:

Member Function Documentation

◆ add_policy()

void kcenon::thread::thread_pool::add_policy ( std::unique_ptr< pool_policy > policy)

Add a policy to the pool.

Parameters
policyUnique pointer to the policy to add.

Policies provide a way to extend thread pool behavior without modifying the thread_pool class. Multiple policies can be added and they will be called in order of addition.

See also
pool_policy
circuit_breaker_policy

Example

auto pool = std::make_shared<thread_pool>("my_pool");
// Add circuit breaker protection
pool->add_policy(std::make_unique<circuit_breaker_policy>(config));
pool->start();
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 834 of file thread_pool.cpp.

834 {
835 if (!policy) {
836 return;
837 }
838
839 std::scoped_lock<std::mutex> lock(policies_mutex_);
840 policies_.push_back(std::move(policy));
841}
std::mutex policies_mutex_
Mutex protecting concurrent access to the policies_ vector.
std::vector< std::unique_ptr< pool_policy > > policies_
Registered pool policies for extending thread pool behavior.

◆ check_worker_health()

auto kcenon::thread::thread_pool::check_worker_health ( bool restart_failed = true) -> std::size_t

Check health of all worker threads and restart failed workers.

This method performs health monitoring on all worker threads:

  • Detects workers that have stopped unexpectedly (consecutive failures)
  • Removes dead workers from the pool
  • Optionally restarts failed workers to maintain pool capacity

Use Cases:

  • Periodic health checks (e.g., from a watchdog thread)
  • Recovery from worker failures in long-running processes
  • Maintaining consistent thread pool capacity

Thread Safety:

  • Thread-safe: can be called from any thread
  • Acquires workers_mutex_ for safe access to workers vector
Parameters
restart_failedIf true, creates new workers to replace failed ones
Returns
Number of workers that were detected as failed/unhealthy

Example:

// Periodic health check with auto-restart
auto failed_count = pool.check_worker_health(true);
if (failed_count > 0) {
LOG_WARNING("Restarted {} failed workers", failed_count);
}
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 706 of file thread_pool.cpp.

706 {
707 std::scoped_lock<std::mutex> lock(workers_mutex_);
708
709 std::size_t failed_count = 0;
710
711 // Remove dead workers using erase-remove idiom
712 auto remove_iter =
713 std::remove_if(workers_.begin(), workers_.end(),
714 [&failed_count](const std::unique_ptr<thread_worker>& worker) {
715 if (!worker || !worker->is_running()) {
716 ++failed_count;
717 return true; // Remove this worker
718 }
719 return false; // Keep this worker
720 });
721
722 workers_.erase(remove_iter, workers_.end());
723
724 // Restart workers if requested and pool is running
725 if (restart_failed && failed_count > 0 && is_running()) {
726 // Get diagnostics pointer and config for new workers
727 auto* diag = &diagnostics();
728 const auto sample_rate = diag->get_config().event_sample_rate;
729
730 // Create new workers to replace failed ones
731 for (std::size_t i = 0; i < failed_count; ++i) {
732 // Create worker with default settings and context
733 auto worker = std::make_unique<thread_worker>(true, context_);
734
735 // Set job queue and diagnostics
736 worker->set_job_queue(job_queue_);
737 worker->set_metrics(metrics_service_->get_basic_metrics());
738 worker->set_diagnostics(diag);
739 worker->set_diagnostics_sample_rate(sample_rate);
740
741 // Start the new worker
742 auto start_result = worker->start();
743 if (start_result.is_err()) {
744 // Failed to start, skip this worker
745 continue;
746 }
747
748 workers_.push_back(std::move(worker));
749 }
750 }
751
752 return failed_count;
753}
auto diagnostics() -> diagnostics::thread_pool_diagnostics &
Get the diagnostics interface for this pool.
std::vector< std::unique_ptr< thread_worker > > workers_
A collection of worker threads associated with this pool.
auto is_running() const -> bool
Check if the thread pool is currently running.
std::mutex workers_mutex_
Mutex protecting concurrent access to the workers_ vector.

◆ collect_worker_diagnostics()

auto kcenon::thread::thread_pool::collect_worker_diagnostics ( ) const -> std::vector<diagnostics::thread_info>
nodiscard

Collects diagnostics information from all workers.

Returns
Vector of thread_info for each worker.

Thread Safety:

  • Acquires workers_mutex_ for safe access
  • Returns snapshot of current worker states
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 783 of file thread_pool.cpp.

784 {
785 std::scoped_lock<std::mutex> lock(workers_mutex_);
786
787 std::vector<diagnostics::thread_info> result;
788 result.reserve(workers_.size());
789
790 for (std::size_t i = 0; i < workers_.size(); ++i) {
791 const auto& worker = workers_[i];
792 if (!worker) {
793 continue;
794 }
795
796 diagnostics::thread_info info;
797 info.thread_id = worker->get_thread_id();
798 info.thread_name = "Worker-" + std::to_string(i);
799 info.worker_id = worker->get_worker_id();
800
801 // Determine worker state
802 if (!worker->is_running()) {
804 } else if (worker->is_idle()) {
806 } else {
808 }
809
810 info.state_since = worker->get_state_since();
811
812 // Get current job info if active
813 info.current_job = worker->get_current_job_info();
814
815 // Get statistics
816 info.jobs_completed = worker->get_jobs_completed();
817 info.jobs_failed = worker->get_jobs_failed();
818 info.total_busy_time = worker->get_total_busy_time();
819 info.total_idle_time = worker->get_total_idle_time();
820
821 // Calculate utilization
822 info.update_utilization();
823
824 result.push_back(std::move(info));
825 }
826
827 return result;
828}
@ active
Worker is executing a job.
@ idle
Worker is waiting for jobs.
@ info
Informational messages highlighting progress.

References kcenon::thread::info.

Referenced by kcenon::thread::diagnostics::thread_pool_diagnostics::detect_bottlenecks(), and kcenon::thread::diagnostics::thread_pool_diagnostics::dump_thread_states().

Here is the caller graph for this function:

◆ diagnostics() [1/2]

auto kcenon::thread::thread_pool::diagnostics ( ) -> diagnostics::thread_pool_diagnostics&
nodiscard

Get the diagnostics interface for this pool.

Returns
Reference to the diagnostics object.

The diagnostics interface provides:

  • Thread dump capabilities
  • Job inspection
  • Bottleneck detection
  • Health check integration
  • Event tracing
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 768 of file thread_pool.cpp.

768 {
769 std::call_once(diagnostics_init_flag_, [this]() {
770 diagnostics_ = std::make_unique<diagnostics::thread_pool_diagnostics>(*this);
771 });
772 return *diagnostics_;
773}
std::once_flag diagnostics_init_flag_
Once flag for thread-safe lazy initialization of diagnostics_.
std::unique_ptr< diagnostics::thread_pool_diagnostics > diagnostics_
Diagnostics interface for this pool.

◆ diagnostics() [2/2]

auto kcenon::thread::thread_pool::diagnostics ( ) const -> const diagnostics::thread_pool_diagnostics&
nodiscard

Get the diagnostics interface for this pool (const version).

Returns
Const reference to the diagnostics object.

Definition at line 775 of file thread_pool.cpp.

775 {
776 std::call_once(diagnostics_init_flag_, [this]() {
777 diagnostics_ = std::make_unique<diagnostics::thread_pool_diagnostics>(
778 const_cast<thread_pool&>(*this));
779 });
780 return *diagnostics_;
781}
thread_pool(const std::string &thread_title="thread_pool", const thread_context &context=thread_context())
Constructs a new thread_pool instance.

◆ enhanced_metrics()

const metrics::EnhancedThreadPoolMetrics & kcenon::thread::thread_pool::enhanced_metrics ( ) const
nodiscard

Access enhanced metrics (read-only reference).

Returns
Reference to enhanced metrics with histograms and percentiles.
Exceptions
std::runtime_errorif enhanced metrics is not enabled.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 294 of file thread_pool.cpp.

294 {
295 return metrics_service_->enhanced_metrics();
296}

References metrics_service_.

◆ enhanced_metrics_snapshot()

metrics::EnhancedSnapshot kcenon::thread::thread_pool::enhanced_metrics_snapshot ( ) const
nodiscard

Get enhanced metrics snapshot.

Returns
EnhancedSnapshot with all current metric values.

Returns empty snapshot if enhanced metrics is not enabled.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 298 of file thread_pool.cpp.

298 {
299 return metrics_service_->enhanced_snapshot();
300}

References metrics_service_.

Referenced by kcenon::thread::autoscaler::collect_metrics().

Here is the caller graph for this function:

◆ enqueue() [1/2]

auto kcenon::thread::thread_pool::enqueue ( std::unique_ptr< job > && job) -> common::VoidResult

Enqueues a new job into the shared job_queue.

Adds a single job to the thread pool for processing.

Parameters
jobA std::unique_ptr<job> representing the work to be done.
Returns
common::VoidResult containing an error on failure, or success value on success.

Implementation details:

  • Validates job pointer before submission
  • Validates queue availability
  • Delegates to adaptive job queue for optimal scheduling
  • Job will be processed by next available worker thread

Queue Behavior:

  • Adaptive queue automatically selects best strategy (mutex/lock-free)
  • Jobs are processed in FIFO order within the selected strategy
  • Workers are notified when jobs become available

Thread Safety:

  • Safe to call from multiple threads simultaneously
  • Adaptive queue handles contention efficiently
Parameters
jobUnique pointer to job (ownership transferred)
Returns
std::nullopt on success, error message on failure
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h, and crash_protection/main.cpp.

Definition at line 323 of file thread_pool.cpp.

323 {
324 // Validate inputs
325 if (job == nullptr) {
326 return common::error_info{static_cast<int>(error_code::invalid_argument), "job is null", "thread_system"};
327 }
328
329 // Check queue availability and stopped state
330 // Supports both queue_adapter_ (policy_queue) and job_queue_ (legacy)
331 if (queue_adapter_) {
332 if (queue_adapter_->is_stopped()) {
333 return common::error_info{static_cast<int>(error_code::queue_stopped), "thread pool is stopped", "thread_system"};
334 }
335 } else if (job_queue_ == nullptr) {
336 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "job queue is null", "thread_system"};
337 } else if (job_queue_->is_stopped()) {
338 return common::error_info{static_cast<int>(error_code::queue_stopped), "thread pool is stopped", "thread_system"};
339 }
340
341 // Record metrics and enqueue job
342 metrics_service_->record_submission();
343
344 auto start_time = std::chrono::steady_clock::now();
345
346 if (queue_adapter_) {
347 auto enqueue_result = queue_adapter_->enqueue(std::move(job));
348 if (enqueue_result.is_err()) {
349 return enqueue_result.error();
350 }
351 } else {
352 auto enqueue_result = job_queue_->enqueue(std::move(job));
353 if (enqueue_result.is_err()) {
354 return enqueue_result.error();
355 }
356 }
357
358 auto end_time = std::chrono::steady_clock::now();
359 auto latency = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
360 metrics_service_->record_enqueue_with_latency(latency);
361
362 auto queue_size = queue_adapter_ ? queue_adapter_->size() : job_queue_->size();
363 metrics_service_->record_queue_depth(queue_size);
364
365 return common::ok();
366}
@ latency
Latency threshold exceeded.

References kcenon::thread::invalid_argument, kcenon::thread::latency, kcenon::thread::queue_stopped, and kcenon::thread::resource_allocation_failed.

Referenced by main().

Here is the caller graph for this function:

◆ enqueue() [2/2]

auto kcenon::thread::thread_pool::enqueue ( std::unique_ptr< thread_worker > && worker) -> common::VoidResult

Adds a thread_worker to the thread pool for specialized or additional processing.

Parameters
workerA std::unique_ptr<thread_worker> object.
Returns
common::VoidResult containing an error on failure, or success value on success.

Definition at line 412 of file thread_pool.cpp.

412 {
413 if (worker == nullptr) {
414 return common::error_info{static_cast<int>(error_code::invalid_argument), "worker is null", "thread_system"};
415 }
416
417 // Get job_queue from adapter if available, otherwise use direct job_queue_
418 std::shared_ptr<job_queue> worker_queue;
419 if (queue_adapter_) {
420 worker_queue = queue_adapter_->get_job_queue();
421 if (!worker_queue) {
422 // policy_queue adapter without job_queue backend
423 // Workers currently require job_queue; this limitation may be lifted in future versions
424 return common::error_info{
426 "policy_queue adapter without job_queue backend not yet supported for workers",
427 "thread_system"};
428 }
429 } else if (job_queue_ == nullptr) {
430 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "job queue is null", "thread_system"};
431 } else {
432 worker_queue = job_queue_;
433 }
434
435 worker->set_job_queue(worker_queue);
436 worker->set_context(context_);
437 worker->set_metrics(metrics_service_->get_basic_metrics());
438 worker->set_diagnostics(&diagnostics());
439 worker->set_diagnostics_sample_rate(diagnostics().get_config().event_sample_rate);
440
441 // Acquire lock before checking start_pool_ and adding worker
442 // This prevents race condition with stop():
443 // - stop() acquires workers_mutex_ after atomically setting start_pool_ to false
444 // - If we check start_pool_ while holding the lock, we ensure consistent state
445 std::scoped_lock<std::mutex> lock(workers_mutex_);
446
447 // Use memory_order_acquire to ensure we see all previous modifications
448 // made by the thread that set start_pool_ to true (in start())
449 bool is_running = start_pool_.load(std::memory_order_acquire);
450
451 // Add worker to vector first, before starting
452 // This ensures stop() will see and stop this worker if called concurrently
453 workers_.emplace_back(std::move(worker));
454
455 // Only start the worker if pool is running
456 // Since we hold workers_mutex_, stop() cannot proceed until we release it
457 if (is_running) {
458 auto start_result = workers_.back()->start();
459 if (start_result.is_err()) {
460 // Remove the worker we just added since it failed to start
461 workers_.pop_back();
462 return start_result.error();
463 }
464 }
465
466 return common::ok();
467}

References kcenon::thread::invalid_argument, and kcenon::thread::resource_allocation_failed.

◆ enqueue_batch() [1/2]

auto kcenon::thread::thread_pool::enqueue_batch ( std::vector< std::unique_ptr< job > > && jobs) -> common::VoidResult

Enqueues a batch of jobs into the shared job_queue.

Parameters
jobsA vector of std::unique_ptr<job> objects to be added.
Returns
common::VoidResult containing an error on failure, or success value on success.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h, crash_protection/main.cpp, thread_pool_sample.cpp, typed_thread_pool_sample.cpp, and typed_thread_pool_sample_2.cpp.

Definition at line 368 of file thread_pool.cpp.

368 {
369 if (jobs.empty()) {
370 return common::error_info{static_cast<int>(error_code::invalid_argument), "jobs are empty", "thread_system"};
371 }
372
373 // Check queue availability and stopped state
374 // Supports both queue_adapter_ (policy_queue) and job_queue_ (legacy)
375 if (queue_adapter_) {
376 if (queue_adapter_->is_stopped()) {
377 return common::error_info{static_cast<int>(error_code::queue_stopped), "thread pool is stopped", "thread_system"};
378 }
379 } else if (job_queue_ == nullptr) {
380 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "job queue is null", "thread_system"};
381 } else if (job_queue_->is_stopped()) {
382 return common::error_info{static_cast<int>(error_code::queue_stopped), "thread pool is stopped", "thread_system"};
383 }
384
385 const auto batch_size = jobs.size();
386 metrics_service_->record_submission(batch_size);
387
388 auto start_time = std::chrono::steady_clock::now();
389
390 if (queue_adapter_) {
391 auto enqueue_result = queue_adapter_->enqueue_batch(std::move(jobs));
392 if (enqueue_result.is_err()) {
393 return enqueue_result.error();
394 }
395 } else {
396 auto enqueue_result = job_queue_->enqueue_batch(std::move(jobs));
397 if (enqueue_result.is_err()) {
398 return enqueue_result.error();
399 }
400 }
401
402 auto end_time = std::chrono::steady_clock::now();
403 auto latency = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
404 metrics_service_->record_enqueue_with_latency(latency, batch_size);
405
406 auto queue_size = queue_adapter_ ? queue_adapter_->size() : job_queue_->size();
407 metrics_service_->record_queue_depth(queue_size);
408
409 return common::ok();
410}

References kcenon::thread::invalid_argument, kcenon::thread::latency, kcenon::thread::queue_stopped, and kcenon::thread::resource_allocation_failed.

Referenced by main(), store_job(), store_job(), and store_job().

Here is the caller graph for this function:

◆ enqueue_batch() [2/2]

auto kcenon::thread::thread_pool::enqueue_batch ( std::vector< std::unique_ptr< thread_worker > > && workers) -> common::VoidResult

Adds a batch of thread_worker objects to the thread pool.

Parameters
workersA vector of std::unique_ptr<thread_worker> objects.
Returns
common::VoidResult containing an error on failure, or success value on success.

Definition at line 469 of file thread_pool.cpp.

470 {
471 if (workers.empty()) {
472 return common::error_info{static_cast<int>(error_code::invalid_argument), "workers are empty", "thread_system"};
473 }
474
475 if (job_queue_ == nullptr) {
476 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "job queue is null", "thread_system"};
477 }
478
479 // Acquire lock before processing workers
480 // This ensures atomic check-and-add operation with respect to stop()
481 std::scoped_lock<std::mutex> lock(workers_mutex_);
482
483 // Check pool running state once with acquire semantics
484 bool is_running = start_pool_.load(std::memory_order_acquire);
485
486 // Track the starting index for rollback in case of error
487 std::size_t start_index = workers_.size();
488
489 // Get diagnostics pointer and config once outside the loop for efficiency
490 auto* diag = &diagnostics();
491 const auto sample_rate = diag->get_config().event_sample_rate;
492
493 for (auto& worker : workers) {
494 worker->set_job_queue(job_queue_);
495 worker->set_context(context_);
496 worker->set_metrics(metrics_service_->get_basic_metrics());
497 worker->set_diagnostics(diag);
498 worker->set_diagnostics_sample_rate(sample_rate);
499
500 // Add worker to vector first
501 workers_.emplace_back(std::move(worker));
502
503 // Only start if pool is running
504 if (is_running) {
505 auto start_result = workers_.back()->start();
506 if (start_result.is_err()) {
507 // Rollback: remove all workers added in this batch
508 workers_.erase(workers_.begin() + static_cast<std::ptrdiff_t>(start_index),
509 workers_.end());
510 return start_result.error();
511 }
512 }
513 }
514
515 return common::ok();
516}

References kcenon::thread::invalid_argument, and kcenon::thread::resource_allocation_failed.

◆ find_policy()

template<typename T >
auto kcenon::thread::thread_pool::find_policy ( const std::string & name) -> T*
nodiscard

Find a policy by name.

Parameters
nameThe policy name to search for.
Returns
Pointer to the policy, or nullptr if not found.

Example

auto* cb = pool->find_policy<circuit_breaker_policy>("circuit_breaker_policy");
if (cb) {
std::cout << "Circuit state: " << to_string(cb->get_state()) << std::endl;
}
Pool policy that implements circuit breaker pattern for failure protection.
auto to_string(void) const -> std::string
Provides a string representation of this thread_pool.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 118 of file thread_pool_impl.h.

119{
120 std::scoped_lock<std::mutex> lock(policies_mutex_);
121
122 for (auto& policy : policies_) {
123 if (policy && policy->get_name() == name) {
124 return dynamic_cast<T*>(policy.get());
125 }
126 }
127
128 return nullptr;
129}

◆ get_active_worker_count()

auto kcenon::thread::thread_pool::get_active_worker_count ( ) const -> std::size_t

Get the current number of active (running) workers.

Returns
Number of workers currently in running state

This differs from the total worker count as it only counts workers that are actively running, not stopped or stopping.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 755 of file thread_pool.cpp.

755 {
756 std::scoped_lock<std::mutex> lock(workers_mutex_);
757
758 return static_cast<std::size_t>(std::count_if(workers_.begin(), workers_.end(),
759 [](const std::unique_ptr<thread_worker>& worker) {
760 return worker && worker->is_running();
761 }));
762}

Referenced by kcenon::thread::autoscaler::autoscaler(), kcenon::thread::autoscaler::can_scale_down(), kcenon::thread::autoscaler::can_scale_up(), kcenon::thread::diagnostics::thread_pool_diagnostics::check_worker_health(), kcenon::thread::autoscaler::collect_metrics(), kcenon::thread::diagnostics::thread_pool_diagnostics::detect_bottlenecks(), kcenon::thread::diagnostics::thread_pool_diagnostics::format_thread_dump(), kcenon::thread::diagnostics::thread_pool_diagnostics::health_check(), and kcenon::thread::adapters::thread_pool_executor_adapter::worker_count().

Here is the caller graph for this function:

◆ get_context()

auto kcenon::thread::thread_pool::get_context ( void ) const -> const thread_context&
nodiscard

Gets the thread context for this pool.

Returns
The thread context providing access to logging and monitoring services.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 645 of file thread_pool.cpp.

645 {
646 return context_;
647}

◆ get_idle_worker_count()

std::size_t kcenon::thread::thread_pool::get_idle_worker_count ( ) const
nodiscard

Get the number of idle workers.

Returns
Number of workers currently not processing jobs.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 679 of file thread_pool.cpp.

679 {
680 // Count idle workers by checking each worker's idle state
681 // Thread safety: workers_mutex_ protects access to workers_ vector
682 std::scoped_lock<std::mutex> lock(workers_mutex_);
683
684 return static_cast<std::size_t>(std::count_if(
685 workers_.begin(), workers_.end(),
686 [](const std::unique_ptr<thread_worker>& worker) { return worker && worker->is_idle(); }));
687}

References workers_, and workers_mutex_.

Referenced by kcenon::thread::diagnostics::thread_pool_diagnostics::check_worker_health(), kcenon::thread::autoscaler::collect_metrics(), kcenon::thread::diagnostics::thread_pool_diagnostics::detect_bottlenecks(), kcenon::thread::diagnostics::thread_pool_diagnostics::format_thread_dump(), and report_metrics().

Here is the caller graph for this function:

◆ get_job_queue()

auto kcenon::thread::thread_pool::get_job_queue ( void ) -> std::shared_ptr<job_queue>
nodiscard

Returns the shared job_queue used by this thread pool.

Returns the shared job queue used by all workers.

Returns
A std::shared_ptr<job_queue> that stores the queued jobs.

The returned queue is shared among all worker threads in the pool, which can concurrently dequeue and process jobs.

Implementation details:

  • Provides access to the adaptive job queue for external job submission
  • Queue is shared among all workers for load balancing
  • Adaptive queue automatically optimizes based on contention patterns
Returns
Shared pointer to the job queue
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 269 of file thread_pool.cpp.

269 {
270 return job_queue_;
271}

Referenced by kcenon::thread::diagnostics::thread_pool_diagnostics::check_queue_health(), kcenon::thread::diagnostics::thread_pool_diagnostics::detect_bottlenecks(), and kcenon::thread::diagnostics::thread_pool_diagnostics::health_check().

Here is the caller graph for this function:

◆ get_pending_task_count()

auto kcenon::thread::thread_pool::get_pending_task_count ( ) const -> std::size_t

Get the number of pending tasks in the queue.

Returns
Number of tasks waiting to be processed
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 695 of file thread_pool.cpp.

695 {
696 // Supports both queue_adapter_ and job_queue_
697 if (queue_adapter_) {
698 return queue_adapter_->size();
699 }
700 if (job_queue_) {
701 return job_queue_->size();
702 }
703 return 0;
704}

References job_queue_, and queue_adapter_.

Referenced by kcenon::thread::diagnostics::thread_pool_diagnostics::check_queue_health(), kcenon::thread::autoscaler::collect_metrics(), kcenon::thread::diagnostics::thread_pool_diagnostics::detect_bottlenecks(), kcenon::thread::diagnostics::thread_pool_diagnostics::health_check(), and kcenon::thread::adapters::thread_pool_executor_adapter::pending_tasks().

Here is the caller graph for this function:

◆ get_policies()

auto kcenon::thread::thread_pool::get_policies ( ) const -> const std::vector<std::unique_ptr<pool_policy>>&
nodiscard

Get all registered policies.

Returns
Const reference to the vector of policies.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 843 of file thread_pool.cpp.

843 {
844 return policies_;
845}

◆ get_pool_instance_id()

std::uint32_t kcenon::thread::thread_pool::get_pool_instance_id ( ) const
nodiscard

Get the pool instance id.

Returns
Returns the unique instance id for this pool.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 649 of file thread_pool.cpp.

649 {
650 return pool_instance_id_;
651}

References pool_instance_id_.

◆ get_ptr()

auto kcenon::thread::thread_pool::get_ptr ( void ) -> std::shared_ptr<thread_pool>
nodiscard

Retrieves a std::shared_ptr to this thread_pool instance.

Returns a shared pointer to this thread pool instance.

Returns
A shared pointer to the current thread_pool object.

By inheriting from std::enable_shared_from_this, you can call get_ptr() within member functions to avoid storing a separate shared pointer.

Implementation details:

  • Uses std::enable_shared_from_this for safe shared_ptr creation
  • Required for passing pool reference to workers or other components
  • Ensures proper lifetime management in multi-threaded environment
Returns
Shared pointer to this thread_pool
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 169 of file thread_pool.cpp.

169 {
170 return this->shared_from_this();
171}

◆ is_enhanced_metrics_enabled()

bool kcenon::thread::thread_pool::is_enhanced_metrics_enabled ( ) const
nodiscard

Check if enhanced metrics is enabled.

Returns
True if enhanced metrics collection is enabled.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 290 of file thread_pool.cpp.

290 {
291 return metrics_service_->is_enhanced_metrics_enabled();
292}

References metrics_service_.

Referenced by kcenon::thread::autoscaler::collect_metrics().

Here is the caller graph for this function:

◆ is_running()

auto kcenon::thread::thread_pool::is_running ( ) const -> bool

Check if the thread pool is currently running.

Returns
true if the pool is active, false otherwise
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 689 of file thread_pool.cpp.

689 {
690 // Use acquire to ensure we see the latest pool state
691 // This is important for callers making decisions based on running state
692 return start_pool_.load(std::memory_order_acquire);
693}

References start_pool_.

Referenced by kcenon::thread::diagnostics::thread_pool_diagnostics::check_worker_health(), kcenon::thread::diagnostics::thread_pool_diagnostics::is_healthy(), and kcenon::thread::adapters::thread_pool_executor_adapter::is_running().

Here is the caller graph for this function:

◆ metrics()

const metrics::ThreadPoolMetrics & kcenon::thread::thread_pool::metrics ( ) const
nodiscardnoexcept

◆ remove_policy()

auto kcenon::thread::thread_pool::remove_policy ( const std::string & name) -> bool

Remove a policy by name.

Parameters
nameThe policy name to remove.
Returns
True if policy was found and removed.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 847 of file thread_pool.cpp.

847 {
848 std::scoped_lock<std::mutex> lock(policies_mutex_);
849
850 auto it = std::remove_if(policies_.begin(), policies_.end(),
851 [&name](const std::unique_ptr<pool_policy>& policy) {
852 return policy && policy->get_name() == name;
853 });
854
855 if (it != policies_.end()) {
856 policies_.erase(it, policies_.end());
857 return true;
858 }
859
860 return false;
861}

◆ remove_workers_internal()

auto kcenon::thread::thread_pool::remove_workers_internal ( std::size_t count,
std::size_t min_workers = 1 ) -> common::VoidResult
nodiscardprivate

Internal method to remove workers from the pool.

Parameters
countNumber of workers to remove.
min_workersMinimum workers to keep.
Returns
Error if operation fails.

Used internally by autoscaler for scale-down operations. Gracefully stops and removes idle workers.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 867 of file thread_pool.cpp.

869{
870 if (count == 0) {
871 return common::ok();
872 }
873
874 std::scoped_lock<std::mutex> lock(workers_mutex_);
875
876 if (workers_.size() <= min_workers) {
877 return common::error_info{
878 static_cast<int>(error_code::invalid_argument),
879 "Cannot remove workers: already at minimum",
880 "thread_system"
881 };
882 }
883
884 // Calculate how many we can actually remove
885 std::size_t max_removable = workers_.size() - min_workers;
886 count = std::min(count, max_removable);
887
888 std::size_t removed = 0;
889
890 // First pass: remove idle workers
891 auto it = workers_.begin();
892 while (it != workers_.end() && removed < count) {
893 if (*it && (*it)->is_idle()) {
894 // Stop the worker
895 (*it)->stop();
896 it = workers_.erase(it);
897 ++removed;
898 } else {
899 ++it;
900 }
901 }
902
903 // If we still need to remove more, wait briefly for workers to become idle
904 if (removed < count) {
905 // Just return success with what we removed
906 // Remaining workers will be removed on subsequent calls
907 context_.log(common::interfaces::log_level::info,
908 formatter::format("Removed {} of {} requested workers (remaining are busy)",
909 removed, count));
910 }
911
912 return common::ok();
913}
void log(common::interfaces::log_level level, const std::string &message) const
Log a message if logger is available.
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132
constexpr size_t min_workers
Definition config.h:27

References kcenon::thread::utils::formatter::format().

Here is the call graph for this function:

◆ report_metrics()

void kcenon::thread::thread_pool::report_metrics ( )

Collect and report current thread pool metrics.

This method gathers current metrics from the pool and reports them through the monitoring interface if available.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 653 of file thread_pool.cpp.

653 {
654 if (!context_.monitoring()) {
655 return;
656 }
657
658 common::interfaces::thread_pool_metrics metrics(thread_title_, pool_instance_id_);
659
660 // Protect workers_ access with lock
661 {
662 std::scoped_lock<std::mutex> lock(workers_mutex_);
663 metrics.worker_threads.value = static_cast<double>(workers_.size());
664 }
665
666 metrics.idle_threads.value = static_cast<double>(get_idle_worker_count());
667
668 // Get pending jobs count (supports both queue_adapter_ and job_queue_)
669 if (queue_adapter_) {
670 metrics.jobs_pending.value = static_cast<double>(queue_adapter_->size());
671 } else if (job_queue_) {
672 metrics.jobs_pending.value = static_cast<double>(job_queue_->size());
673 }
674
675 // Report metrics with pool identification
677}
std::size_t get_idle_worker_count() const
Get the number of idle workers.
const metrics::ThreadPoolMetrics & metrics() const noexcept
Access aggregated runtime metrics (read-only reference).

References context_, get_idle_worker_count(), job_queue_, metrics(), kcenon::thread::thread_context::monitoring(), pool_instance_id_, queue_adapter_, thread_title_, kcenon::thread::thread_context::update_thread_pool_metrics(), workers_, and workers_mutex_.

Here is the call graph for this function:

◆ reset_metrics()

void kcenon::thread::thread_pool::reset_metrics ( )

Reset accumulated metrics.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 277 of file thread_pool.cpp.

277 {
278 metrics_service_->reset();
279}

References metrics_service_, and kcenon::thread::metrics::ThreadPoolMetrics::reset().

Here is the call graph for this function:

◆ set_enhanced_metrics_enabled()

void kcenon::thread::thread_pool::set_enhanced_metrics_enabled ( bool enabled)

Enable or disable enhanced metrics collection.

Parameters
enabledTrue to enable enhanced metrics (histograms, percentiles).

When enabled, additional metrics like latency histograms and throughput counters are collected. This has minimal overhead (< 100ns per operation) but can be disabled for maximum performance.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 281 of file thread_pool.cpp.

281 {
282 std::size_t worker_count = 0;
283 {
284 std::scoped_lock<std::mutex> lock(workers_mutex_);
285 worker_count = workers_.size();
286 }
287 metrics_service_->set_enhanced_metrics_enabled(enabled, worker_count);
288}

References metrics_service_, workers_, and workers_mutex_.

◆ start()

auto kcenon::thread::thread_pool::start ( void ) -> common::VoidResult

Starts the thread pool and all associated workers.

Starts all worker threads in the pool.

Returns
common::VoidResult containing an error on failure, or success value on success.

If the pool is already running, a subsequent call to start() may return an error. On success, each thread_worker in workers_ is started, enabling them to process jobs from the job_queue_.

Implementation details:

  • Validates that workers have been added to the pool
  • Starts each worker thread individually
  • If any worker fails to start, stops all workers and returns error
  • Sets start_pool_ flag to true on successful startup
  • Workers begin processing jobs from the shared queue immediately

Startup Sequence:

  1. Check that workers exist
  2. Start each worker sequentially
  3. On failure: stop all workers and return error message
  4. On success: mark pool as started

Error Handling:

  • All-or-nothing startup (if one fails, all stop)
  • Provides detailed error message from failed worker
  • Ensures consistent pool state (either all running or all stopped)
Returns
std::nullopt on success, error message on failure
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h, crash_protection/main.cpp, thread_pool_sample.cpp, typed_thread_pool_sample.cpp, and typed_thread_pool_sample_2.cpp.

Definition at line 196 of file thread_pool.cpp.

196 {
197 // Acquire lock to check workers_ safely
198 std::scoped_lock<std::mutex> lock(workers_mutex_);
199
200 // Check if pool is already running
201 // Use acquire to ensure we see all previous modifications to pool state
202 if (start_pool_.load(std::memory_order_acquire)) {
203 return common::error_info{static_cast<int>(error_code::thread_already_running), "thread pool is already running", "thread_system"};
204 }
205
206 // Validate that workers have been added
207 if (workers_.empty()) {
208 return common::error_info{static_cast<int>(error_code::invalid_argument), "no workers to start", "thread_system"};
209 }
210
211 // Handle queue initialization for restart scenarios
212 // The approach differs based on whether we're using job_queue or queue_adapter
213 if (queue_adapter_) {
214 // Using queue_adapter (policy_queue or wrapped job_queue)
215 if (queue_adapter_->is_stopped()) {
216 // For policy_queue adapters, we can't easily recreate,
217 // so we return an error suggesting pool recreation
218 return common::error_info{
219 static_cast<int>(error_code::queue_stopped),
220 "queue is stopped; create a new thread_pool instance for restart",
221 "thread_system"};
222 }
223 // Update job_queue_ reference if adapter wraps a job_queue
224 job_queue_ = queue_adapter_->get_job_queue();
225 } else if (job_queue_ == nullptr || job_queue_->is_stopped()) {
226 // Create fresh job queue for restart scenarios
227 // Stopped queues cannot accept new jobs, so we must create a new instance
228 job_queue_ = std::make_shared<kcenon::thread::job_queue>();
229
230 // Update all workers with the new queue reference
231 for (auto& worker : workers_) {
232 worker->set_job_queue(job_queue_);
233 }
234 }
235
236 // Create fresh pool cancellation token for restart scenarios
237 // This ensures workers start with a non-cancelled token
239 metrics_service_->reset();
240
241 // Attempt to start each worker
242 for (auto& worker : workers_) {
243 auto start_result = worker->start();
244 if (start_result.is_err()) {
245 // If any worker fails, stop all and return error
246 stop();
247 return start_result.error();
248 }
249 }
250
251 // Mark pool as successfully started
252 // Use release to ensure all previous modifications (worker starts, queue setup)
253 // are visible to other threads before they see start_pool_ == true
254 start_pool_.store(true, std::memory_order_release);
255
256 return common::ok();
257}

References kcenon::thread::cancellation_token::create(), kcenon::thread::invalid_argument, kcenon::thread::queue_stopped, and kcenon::thread::thread_already_running.

Referenced by main(), and main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ stop()

auto kcenon::thread::thread_pool::stop ( const bool & immediately_stop = false) -> common::VoidResult

Stops the thread pool and all worker threads.

Parameters
immediately_stopIf true, any ongoing jobs may be interrupted; if false (default), each worker attempts to finish its current job before stopping.
Returns
common::VoidResult containing an error on failure, or success value on success.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h, crash_protection/main.cpp, thread_pool_sample.cpp, typed_thread_pool_sample.cpp, and typed_thread_pool_sample_2.cpp.

Definition at line 518 of file thread_pool.cpp.

518 {
519 // Use compare_exchange_strong to atomically check and set state
520 // This prevents TOCTOU (Time-Of-Check-Time-Of-Use) race conditions
521 // where multiple threads might call stop() simultaneously
522 bool expected = true;
523 if (!start_pool_.compare_exchange_strong(expected, false, std::memory_order_acq_rel,
524 std::memory_order_acquire)) {
525 // Pool is already stopped or being stopped by another thread
526 return common::ok();
527 }
528
529 // At this point, we've atomically transitioned from running to stopped
530 // and only this thread will execute the shutdown sequence
531
532 // Cancel pool-level token to propagate cancellation to all workers and jobs
533 // This triggers hierarchical cancellation:
534 // 1. Pool token cancelled → linked worker tokens cancelled
535 // 2. Worker tokens cancelled → running jobs receive cancellation signal
537
538 // Stop the queue (supports both queue_adapter_ and job_queue_)
539 if (queue_adapter_) {
540 queue_adapter_->stop();
541 if (immediately_stop) {
542 queue_adapter_->clear();
543 }
544 } else if (job_queue_ != nullptr) {
545 job_queue_->stop();
546 if (immediately_stop) {
547 job_queue_->clear();
548 }
549 }
550
551 // Stop workers while holding lock to ensure consistent iteration
552 // This is safe because worker->stop() only signals and joins threads,
553 // it does not call back into thread_pool methods
554 std::scoped_lock<std::mutex> lock(workers_mutex_);
555 for (auto& worker : workers_) {
556 auto stop_result = worker->stop();
557 if (stop_result.is_err()) {
558 context_.log(common::interfaces::log_level::error, formatter::format("error stopping worker: {}",
559 stop_result.error().message));
560 }
561 }
562
563 return common::ok();
564}
void cancel()
Cancels the operation.

References kcenon::thread::utils::formatter::format().

Referenced by main(), main(), kcenon::thread::adapters::thread_pool_executor_adapter::shutdown(), and ~thread_pool().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ stop_unsafe()

auto kcenon::thread::thread_pool::stop_unsafe ( ) -> common::VoidResult
privatenoexcept

Stops the thread pool without logging (for use during static destruction).

This method performs the same shutdown sequence as stop() but without any logging operations. It is specifically designed to be called from the destructor when static destruction is in progress, preventing Static Destruction Order Fiasco (SDOF) by avoiding access to potentially destroyed logger/monitoring singletons.

Returns
common::VoidResult containing an error on failure, or success value on success.
Note
This method should only be called from the destructor during static destruction.

This method performs the same shutdown sequence as stop() but without any logging operations. It is specifically designed to be called from the destructor when static destruction is in progress, preventing Static Destruction Order Fiasco (SDOF) by avoiding access to potentially destroyed logger/monitoring singletons.

Implementation details:

  • Same atomic state transition as stop()
  • Cancels pool-level token for hierarchical cancellation
  • Stops job queue without clearing (graceful shutdown)
  • Stops all workers without logging errors
  • noexcept guarantee for safe destructor use
Returns
result_void containing an error on failure, or success value on success.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 584 of file thread_pool.cpp.

584 {
585 // Use compare_exchange_strong to atomically check and set state
586 // Same atomic transition as stop() to prevent race conditions
587 bool expected = true;
588 if (!start_pool_.compare_exchange_strong(expected, false, std::memory_order_acq_rel,
589 std::memory_order_acquire)) {
590 // Pool is already stopped or being stopped by another thread
591 return common::ok();
592 }
593
594 // Cancel pool-level token to propagate cancellation to all workers and jobs
596
597 // Stop the queue (supports both queue_adapter_ and job_queue_)
598 if (queue_adapter_) {
599 queue_adapter_->stop();
600 } else if (job_queue_ != nullptr) {
601 job_queue_->stop();
602 }
603
604 // Stop workers while holding lock to ensure consistent iteration
605 // No logging to avoid accessing potentially destroyed singletons
606 std::scoped_lock<std::mutex> lock(workers_mutex_);
607 for (auto& worker : workers_) {
608 // Stop worker without checking result to avoid any potential exceptions
609 // during static destruction
610 worker->stop();
611 }
612
613 return common::ok();
614}

References kcenon::thread::cancellation_token::cancel(), job_queue_, pool_cancellation_token_, queue_adapter_, start_pool_, workers_, and workers_mutex_.

Referenced by ~thread_pool().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ submit() [1/2]

template<typename F , typename R >
auto kcenon::thread::thread_pool::submit ( F && callable,
const submit_options & opts = {} ) -> std::future<R>
nodiscard
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 35 of file thread_pool_impl.h.

37{
38 auto job_ptr = std::make_unique<future_job<R>>(
39 std::forward<F>(callable),
40 opts.name.empty() ? "async_job" : opts.name
41 );
42
43 auto future = job_ptr->get_future();
44
45 auto result = enqueue(std::move(job_ptr));
46 if (result.is_err()) {
47 std::promise<R> error_promise;
48 error_promise.set_exception(
49 std::make_exception_ptr(
50 std::runtime_error(result.error().message)
51 )
52 );
53 return error_promise.get_future();
54 }
55
56 return future;
57}
auto enqueue(std::unique_ptr< job > &&job) -> common::VoidResult
Enqueues a new job into the shared job_queue.
std::shared_ptr< job_interface > job_ptr
Shared pointer type for job objects.
Definition job_types.h:84

◆ submit() [2/2]

template<typename F , typename R >
auto kcenon::thread::thread_pool::submit ( std::vector< F > && callables,
const submit_options & opts = {} ) -> std::vector<std::future<R>>
nodiscard

Definition at line 60 of file thread_pool_impl.h.

62{
63 return detail::batch_apply(std::move(callables), [this, &opts](auto&& callable) {
64 submit_options single_opts;
65 single_opts.name = opts.name;
66 return submit<F, R>(std::move(callable), single_opts);
67 });
68}
auto submit(F &&callable, const submit_options &opts={}) -> std::future< R >
auto batch_apply(Container &&items, Operation &&op)
Apply an operation to each item in a collection, returning results.

References kcenon::thread::detail::batch_apply(), and kcenon::thread::submit_options::name.

Here is the call graph for this function:

◆ submit_wait_all()

template<typename F , typename R >
auto kcenon::thread::thread_pool::submit_wait_all ( std::vector< F > && callables,
const submit_options & opts = {} ) -> std::vector<R>
nodiscard

Submit a batch and wait for all results.

Convenience overload that blocks until all tasks complete. Equivalent to submit(callables, submit_options::all()).

Template Parameters
FCallable type
RReturn type (automatically deduced)
Parameters
callablesVector of functions to execute
optsSubmit options (wait_all is implicitly true)
Returns
Vector of results
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 71 of file thread_pool_impl.h.

73{
74 auto futures = submit<F, R>(std::move(callables), opts);
75 return detail::collect_all(futures);
76}
auto collect_all(std::vector< std::future< T > > &futures) -> std::vector< T >
Collect all results from a vector of futures.

References kcenon::thread::detail::collect_all().

Here is the call graph for this function:

◆ submit_wait_any()

template<typename F , typename R >
auto kcenon::thread::thread_pool::submit_wait_any ( std::vector< F > && callables,
const submit_options & opts = {} ) -> common::Result<R>
nodiscard

Submit a batch and return first completed result.

Convenience overload that returns when any task completes. Equivalent to submit(callables, submit_options::any()).

Template Parameters
FCallable type
RReturn type (automatically deduced)
Parameters
callablesVector of functions to execute
optsSubmit options (wait_any is implicitly true)
Returns
common::Result<R> containing the first completed result, or an error if callables is empty or execution failed
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 79 of file thread_pool_impl.h.

81{
82 if (callables.empty()) {
83 return make_error_result<R>(error_code::invalid_argument, "Empty callables vector");
84 }
85
86 auto futures = submit<F, R>(std::move(callables), opts);
87 auto completed = std::make_shared<std::atomic<bool>>(false);
88 auto result_promise = std::make_shared<std::promise<R>>();
89 auto result_future = result_promise->get_future();
90
91 for (std::size_t i = 0; i < futures.size(); ++i) {
92 std::thread([completed, result_promise, fut = std::move(futures[i])]() mutable {
93 try {
94 R result = fut.get();
95 bool expected = false;
96 if (completed->compare_exchange_strong(expected, true)) {
97 result_promise->set_value(std::move(result));
98 }
99 } catch (...) {
100 bool expected = false;
101 if (completed->compare_exchange_strong(expected, true)) {
102 result_promise->set_exception(std::current_exception());
103 }
104 }
105 }).detach();
106 }
107
108 try {
109 return common::Result<R>::ok(result_future.get());
110 } catch (const std::exception& e) {
112 } catch (...) {
113 return make_error_result<R>(error_code::unknown_error, "Unknown exception in submit_wait_any");
114 }
115}
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
@ completed
Successfully completed.

References kcenon::thread::completed, kcenon::thread::invalid_argument, kcenon::thread::job_execution_failed, kcenon::thread::make_error_result(), and kcenon::thread::unknown_error.

Here is the call graph for this function:

◆ to_string()

auto kcenon::thread::thread_pool::to_string ( void ) const -> std::string
nodiscard

Provides a string representation of this thread_pool.

Returns
A string describing the pool, including its title and other optional details.

Derived classes may override this to include more diagnostic or state-related info.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h, thread_pool_sample.cpp, typed_thread_pool_sample.cpp, and typed_thread_pool_sample_2.cpp.

Definition at line 616 of file thread_pool.cpp.

616 {
617 std::string format_string;
618
619 // Use relaxed memory order for diagnostic/logging purposes
620 // Exact state ordering is not critical for debug output
621 formatter::format_to(std::back_inserter(format_string), "{} is {},\n", thread_title_,
622 start_pool_.load(std::memory_order_relaxed) ? "running" : "stopped");
623
624 // Get queue string representation (supports both queue_adapter_ and job_queue_)
625 std::string queue_str;
626 if (queue_adapter_) {
627 queue_str = queue_adapter_->to_string();
628 } else if (job_queue_ != nullptr) {
629 queue_str = job_queue_->to_string();
630 } else {
631 queue_str = "nullptr";
632 }
633 formatter::format_to(std::back_inserter(format_string), "\tjob_queue: {}\n\n", queue_str);
634
635 // Protect workers_ access with lock
636 std::scoped_lock<std::mutex> lock(workers_mutex_);
637 formatter::format_to(std::back_inserter(format_string), "\tworkers: {}\n", workers_.size());
638 for (const auto& worker : workers_) {
639 formatter::format_to(std::back_inserter(format_string), "\t{}\n", worker->to_string());
640 }
641
642 return format_string;
643}
static auto format_to(OutputIt out, const char *formats, const FormatArgs &... args) -> OutputIt
Formats a narrow-character string directly to an output iterator.
Definition formatter.h:162

References kcenon::thread::utils::formatter::format_to().

Referenced by std::formatter< kcenon::thread::thread_pool >::format(), std::formatter< kcenon::thread::thread_pool, wchar_t >::format(), kcenon::thread::diagnostics::thread_pool_diagnostics::format_thread_dump(), main(), and kcenon::thread::diagnostics::thread_pool_diagnostics::to_prometheus().

Here is the call graph for this function:
Here is the caller graph for this function:

Friends And Related Symbol Documentation

◆ autoscaler

◆ diagnostics::thread_pool_diagnostics

◆ numa_thread_pool

Member Data Documentation

◆ context_

thread_context kcenon::thread::thread_pool::context_
private

The thread context providing access to logging and monitoring services.

This context is shared with all worker threads created by this pool, enabling consistent logging and monitoring throughout the pool.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 721 of file thread_pool.h.

Referenced by report_metrics(), thread_pool(), thread_pool(), and thread_pool().

◆ diagnostics_

std::unique_ptr<diagnostics::thread_pool_diagnostics> kcenon::thread::thread_pool::diagnostics_
mutableprivate

Diagnostics interface for this pool.

Lazily initialized on first access to diagnostics().

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 776 of file thread_pool.h.

◆ diagnostics_init_flag_

std::once_flag kcenon::thread::thread_pool::diagnostics_init_flag_
mutableprivate

Once flag for thread-safe lazy initialization of diagnostics_.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 781 of file thread_pool.h.

◆ job_queue_

std::shared_ptr<job_queue> kcenon::thread::thread_pool::job_queue_
private

The shared job queue where jobs (job objects) are enqueued.

Worker threads dequeue jobs from this queue to perform tasks. The queue persists for the lifetime of the pool or until no more references exist.

Note
When queue_adapter_ is set, this may be nullptr for policy_queue adapters. Use get_queue_adapter() for unified access.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 676 of file thread_pool.h.

Referenced by get_pending_task_count(), report_metrics(), and stop_unsafe().

◆ metrics_service_

std::shared_ptr<metrics::metrics_service> kcenon::thread::thread_pool::metrics_service_
private

Centralized metrics service for all pool and worker metrics.

Consolidates metrics management that was previously spread across thread_pool and thread_worker. Provides:

  • Basic metrics (ThreadPoolMetrics)
  • Enhanced metrics with histograms and percentiles (lazily initialized)
  • Unified recording interface
  • Clear ownership semantics (pool owns, workers borrow)
See also
metrics::metrics_service
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 768 of file thread_pool.h.

Referenced by enhanced_metrics(), enhanced_metrics_snapshot(), is_enhanced_metrics_enabled(), metrics(), reset_metrics(), and set_enhanced_metrics_enabled().

◆ next_pool_instance_id_

std::atomic< std::uint32_t > kcenon::thread::thread_pool::next_pool_instance_id_ {0}
staticprivate

Static counter for generating unique pool instance IDs.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 647 of file thread_pool.h.

◆ policies_

std::vector<std::unique_ptr<pool_policy> > kcenon::thread::thread_pool::policies_
private

Registered pool policies for extending thread pool behavior.

Policies are called in order of addition for each lifecycle event. Protected by policies_mutex_ for thread-safe access.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 789 of file thread_pool.h.

◆ policies_mutex_

std::mutex kcenon::thread::thread_pool::policies_mutex_
mutableprivate

Mutex protecting concurrent access to the policies_ vector.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 794 of file thread_pool.h.

◆ pool_cancellation_token_

cancellation_token kcenon::thread::thread_pool::pool_cancellation_token_
private

Pool-level cancellation token.

This token is used to propagate cancellation to all workers and jobs when the pool is stopped. Each worker receives a linked token that combines this pool token with its own worker token, creating a hierarchical cancellation structure.

Cancellation Hierarchy:

  • Pool stop() → cancels pool_cancellation_token_
  • Pool token cancellation → propagates to all linked worker tokens
  • Worker token cancellation → propagates to running jobs
Note
This token is reset when the pool is restarted to allow fresh job execution without stale cancellation state.
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 739 of file thread_pool.h.

Referenced by stop_unsafe().

◆ pool_instance_id_

std::uint32_t kcenon::thread::thread_pool::pool_instance_id_ {0}
private

Unique instance ID for this pool (for multi-pool scenarios).

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 657 of file thread_pool.h.

657{0};

Referenced by get_pool_instance_id(), and report_metrics().

◆ queue_adapter_

std::unique_ptr<pool_queue_adapter_interface> kcenon::thread::thread_pool::queue_adapter_
private

Queue adapter for unified access to different queue types.

When set, provides a unified interface for both job_queue and policy_queue. This enables thread_pool to work with the new policy-based queue system while maintaining backward compatibility.

The adapter wraps either:

  • job_queue: Legacy queue with full feature support
  • policy_queue: New policy-based queue with compile-time configuration
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 689 of file thread_pool.h.

Referenced by get_pending_task_count(), report_metrics(), and stop_unsafe().

◆ start_pool_

std::atomic<bool> kcenon::thread::thread_pool::start_pool_
private

Indicates whether the pool is currently running.

Set to true after a successful call to start(), and reset to false after stop(). Used internally to prevent multiple active starts or erroneous state transitions.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 665 of file thread_pool.h.

Referenced by is_running(), and stop_unsafe().

◆ thread_title_

std::string kcenon::thread::thread_pool::thread_title_
private

A title or name for this thread pool, useful for identification and logging.

Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 652 of file thread_pool.h.

Referenced by report_metrics().

◆ workers_

std::vector<std::unique_ptr<thread_worker> > kcenon::thread::thread_pool::workers_
private

A collection of worker threads associated with this pool.

Each thread_worker typically runs in its own thread context, processing jobs from job_queue_ or performing specialized logic. They are started together when thread_pool::start() is called.

Thread Safety:

  • Protected by workers_mutex_ to prevent concurrent modification
  • Accessed during enqueue_worker() and stop() operations
Examples
/home/runner/work/thread_system/thread_system/include/kcenon/thread/core/thread_pool.h.

Definition at line 702 of file thread_pool.h.

Referenced by kcenon::thread::diagnostics::thread_pool_diagnostics::check_worker_health(), kcenon::thread::diagnostics::thread_pool_diagnostics::detect_bottlenecks(), kcenon::thread::diagnostics::thread_pool_diagnostics::format_thread_dump(), get_idle_worker_count(), kcenon::thread::diagnostics::thread_pool_diagnostics::health_check(), kcenon::thread::diagnostics::thread_pool_diagnostics::is_healthy(), report_metrics(), set_enhanced_metrics_enabled(), and stop_unsafe().

◆ workers_mutex_

std::mutex kcenon::thread::thread_pool::workers_mutex_
mutableprivate

The documentation for this class was generated from the following files: