Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
thread_pool.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
27#pragma once
28
34 // end of thread_pools
35
50
51// Include unified feature flags from common_system if available
52#if __has_include(<kcenon/common/config/feature_flags.h>)
53#include <kcenon/common/config/feature_flags.h>
54#endif
55
56// Common system unified interfaces
57// KCENON_HAS_COMMON_EXECUTOR is defined by CMake when common_system is available
58// Fallback to 0 if not defined (standalone build without common_system)
59#ifndef KCENON_HAS_COMMON_EXECUTOR
60#define KCENON_HAS_COMMON_EXECUTOR 0
61#endif
62
63#if KCENON_HAS_COMMON_EXECUTOR
64#include <kcenon/common/interfaces/executor_interface.h>
65#endif
66
67#include "config.h"
68
69#include <tuple>
70#include <string>
71#include <memory>
72#include <mutex>
73#include <vector>
74#include <chrono>
75#include <optional>
76#include <future>
77#include <type_traits>
78
79// Forward declarations for thread_pool-specific types
80// Core forward declarations are in <kcenon/thread/forward.h>
82 class thread_pool_diagnostics;
83 struct thread_info;
84}
85
86namespace kcenon::thread {
87 // Type aliases for common policy_queue configurations
88 // Uses forward declarations from forward.h
93 >;
94
99 >;
100}
101
122namespace kcenon::thread
123{
124 // Support both old (namespace common) and new (namespace kcenon::common) versions
125 // When inside namespace kcenon::thread, 'common' resolves to kcenon::common
126#if KCENON_HAS_COMMON_EXECUTOR
127 namespace common_ns = common;
128#endif
129
// A thread pool for concurrent execution of jobs using multiple worker threads.
// Derives from std::enable_shared_from_this so the pool can hand out a
// std::shared_ptr to itself (see get_ptr()).
181 class thread_pool : public std::enable_shared_from_this<thread_pool>
182 {
183 // Allow numa_thread_pool to access protected/private members for NUMA-specific functionality
184 friend class numa_thread_pool;
185 // Allow autoscaler to access remove_workers_internal for scaling operations
186 friend class autoscaler;
187
188 public:
    // Constructs a new thread_pool instance.
    // thread_title: a title or name for this pool (defaults to "thread_pool").
    // context: thread context for this pool (defaults to a default-constructed thread_context).
198 thread_pool(const std::string& thread_title = "thread_pool",
199 const thread_context& context = thread_context());
200
    // Overload that additionally takes a caller-supplied shared job_queue (custom_queue).
    // NOTE(review): presumably this queue is used in place of an internally created one - confirm.
210 thread_pool(const std::string& thread_title,
211 std::shared_ptr<job_queue> custom_queue,
212 const thread_context& context = thread_context());
213
    // Overload that takes ownership of a queue adapter (pool_queue_adapter_interface),
    // the abstraction used for unified access to different queue types.
238 thread_pool(const std::string& thread_title,
239 std::unique_ptr<pool_queue_adapter_interface> queue_adapter,
240 const thread_context& context = thread_context());
241
    // Virtual destructor. Cleans up resources used by the thread pool.
248 virtual ~thread_pool(void);
249
    // Retrieves a std::shared_ptr to this thread_pool instance.
257 [[nodiscard]] auto get_ptr(void) -> std::shared_ptr<thread_pool>;
258
    // Starts the thread pool and all associated workers.
267 auto start(void) -> common::VoidResult;
268
    // Returns the shared job_queue used by this thread pool.
276 [[nodiscard]] auto get_job_queue(void) -> std::shared_ptr<job_queue>;
277
    // Access aggregated runtime metrics (read-only reference).
281 [[nodiscard]] const metrics::ThreadPoolMetrics& metrics() const noexcept;
282
    // Reset accumulated metrics.
286 void reset_metrics();
287
    // Enable or disable enhanced metrics collection.
296 void set_enhanced_metrics_enabled(bool enabled);
297
    // Check whether enhanced metrics collection is enabled.
302 [[nodiscard]] bool is_enhanced_metrics_enabled() const;
303
    // Access enhanced metrics (read-only reference).
309 [[nodiscard]] const metrics::EnhancedThreadPoolMetrics& enhanced_metrics() const;
310
    // Get enhanced metrics snapshot.
317 [[nodiscard]] metrics::EnhancedSnapshot enhanced_metrics_snapshot() const;
318
    // Enqueues a new job into the shared job_queue.
324 auto enqueue(std::unique_ptr<job>&& job) -> common::VoidResult;
325
    // Enqueues a batch of jobs into the shared job_queue.
331 auto enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs) -> common::VoidResult;
332
    // Overload taking a thread_worker instead of a job.
    // NOTE(review): presumably registers the worker with this pool - confirm against the implementation.
338 auto enqueue(std::unique_ptr<thread_worker>&& worker) -> common::VoidResult;
339
    // Batch counterpart of the thread_worker overload above.
    // NOTE(review): presumably registers all given workers with this pool - confirm.
345 auto enqueue_batch(std::vector<std::unique_ptr<thread_worker>>&& workers)
346 -> common::VoidResult;
347
    // Stops the thread pool and all worker threads.
    // immediately_stop defaults to false.
354 auto stop(const bool& immediately_stop = false) -> common::VoidResult;
355
    // Provides a string representation of this thread_pool.
362 [[nodiscard]] auto to_string(void) const -> std::string;
363
    // Get the pool instance id.
368 [[nodiscard]] std::uint32_t get_pool_instance_id() const;
369
    // Collect and report current thread pool metrics.
376 void report_metrics();
377
    // Get the number of idle workers.
382 [[nodiscard]] std::size_t get_idle_worker_count() const;
383
    // Gets the thread context for this pool.
388 [[nodiscard]] auto get_context(void) const -> const thread_context&;
389
390 // ============================================================================
391 // Unified Submit API
392 // ============================================================================
393 // These methods provide a unified interface for job submission with
394 // configurable behavior through submit_options.
395
    // Submits a single callable; returns a std::future<R>, where R is the
    // result type of invoking the decayed callable with no arguments.
423 template<typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
424 [[nodiscard]] auto submit(F&& callable, const submit_options& opts = {})
425 -> std::future<R>;
426
    // Batch overload: takes a vector of callables and returns a vector of futures.
460 template<typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
461 [[nodiscard]] auto submit(std::vector<F>&& callables, const submit_options& opts = {})
462 -> std::vector<std::future<R>>;
463
    // Submit a batch and wait for all results.
476 template<typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
477 [[nodiscard]] auto submit_wait_all(std::vector<F>&& callables,
478 const submit_options& opts = {})
479 -> std::vector<R>;
480
    // Submit a batch and return first completed result.
494 template<typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
495 [[nodiscard]] auto submit_wait_any(std::vector<F>&& callables,
496 const submit_options& opts = {})
497 -> common::Result<R>;
498
    // Check if the thread pool is currently running.
503 auto is_running() const -> bool;
504
    // Get the number of pending tasks in the queue.
509 auto get_pending_task_count() const -> std::size_t;
510
    // Check health of all worker threads and restart failed workers.
    // restart_failed defaults to true.
    // NOTE(review): the meaning of the returned count is not visible here - confirm.
540 auto check_worker_health(bool restart_failed = true) -> std::size_t;
541
    // Get the current number of active (running) workers.
549 auto get_active_worker_count() const -> std::size_t;
550
551 // =========================================================================
552 // Pool Policies
553 // =========================================================================
554
    // Add a policy to the pool (ownership is passed in via std::unique_ptr).
576 void add_policy(std::unique_ptr<pool_policy> policy);
577
    // Get all registered policies.
582 [[nodiscard]] auto get_policies() const -> const std::vector<std::unique_ptr<pool_policy>>&;
583
    // Find a policy by name; the result is returned as T* (T defaults to pool_policy).
597 template<typename T = pool_policy>
598 [[nodiscard]] auto find_policy(const std::string& name) -> T*;
599
    // Remove a policy by name.
605 auto remove_policy(const std::string& name) -> bool;
606
607 // =========================================================================
608 // Diagnostics
609 // =========================================================================
610
    // Get the diagnostics interface for this pool.
622 [[nodiscard]] auto diagnostics() -> diagnostics::thread_pool_diagnostics&;
623
    // Const overload of diagnostics(); returns a const reference.
628 [[nodiscard]] auto diagnostics() const -> const diagnostics::thread_pool_diagnostics&;
629
    // Collects diagnostics information from all workers.
638 [[nodiscard]] auto collect_worker_diagnostics() const
639 -> std::vector<diagnostics::thread_info>;
640
641 private:
642 // Allow diagnostics to access internal state
    // Static counter for generating unique pool instance IDs.
647 static std::atomic<std::uint32_t> next_pool_instance_id_;
648
653
    // Unique instance ID for this pool (for multi-pool scenarios).
657 std::uint32_t pool_instance_id_{0};
658
    // Indicates whether the pool is currently running.
665 std::atomic<bool> start_pool_;
666
    // The shared job queue where jobs (job objects) are enqueued.
676 std::shared_ptr<job_queue> job_queue_;
677
    // Queue adapter for unified access to different queue types.
689 std::unique_ptr<pool_queue_adapter_interface> queue_adapter_;
690
    // A collection of worker threads associated with this pool.
702 std::vector<std::unique_ptr<thread_worker>> workers_;
703
    // Mutex protecting concurrent access to the workers_ vector.
713 mutable std::mutex workers_mutex_;
714
722
740
    // Stops the thread pool without logging (for use during static destruction).
754 auto stop_unsafe() noexcept -> common::VoidResult;
755
    // Centralized metrics service for all pool and worker metrics.
768 std::shared_ptr<metrics::metrics_service> metrics_service_;
769
770
777
    // Once flag for thread-safe lazy initialization of diagnostics_.
781 mutable std::once_flag diagnostics_init_flag_;
782
    // Registered pool policies for extending thread pool behavior.
789 std::vector<std::unique_ptr<pool_policy>> policies_;
790
    // Mutex protecting concurrent access to the policies_ vector.
794 mutable std::mutex policies_mutex_;
795
    // Internal method to remove workers from the pool (used by autoscaler for scaling).
    // min_workers defaults to 1.
    // NOTE(review): presumably a floor on the remaining worker count - confirm.
805 [[nodiscard]] auto remove_workers_internal(std::size_t count, std::size_t min_workers = 1)
806 -> common::VoidResult;
807 };
808} // namespace kcenon::thread
809
810// ----------------------------------------------------------------------------
811// Template method implementations for thread_pool
812// ----------------------------------------------------------------------------
813// Separated into thread_pool_impl.h for improved compilation times.
814// See thread_pool_impl.h for the template definitions (submit, submit_wait_all, submit_wait_any).
815
816#include <kcenon/thread/core/thread_pool_impl.h>
817
818// ----------------------------------------------------------------------------
819// Formatter specializations for thread_pool
820// ----------------------------------------------------------------------------
821// Separated into thread_pool_fmt.h for improved compilation times.
822// See thread_pool_fmt.h for std::formatter specializations.
823// For unified formatter access, include <kcenon/thread/formatters.h> instead.
824
825#define KCENON_THREAD_INTERNAL_INCLUDE
827#undef KCENON_THREAD_INTERNAL_INCLUDE
Manages automatic scaling of thread pool workers based on load metrics.
Definition autoscaler.h:95
Provides a mechanism for cooperative cancellation of operations.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Lightweight metrics container shared between thread_pool and workers.
A NUMA-aware thread pool optimized for Non-Uniform Memory Access architectures.
Lock-free synchronization policy using Michael-Scott algorithm.
Synchronization policy using mutex and condition variable.
Policy that rejects new items when queue is full.
Policy that allows unlimited queue size.
Policy-based queue template.
Base interface for thread pool policies.
Definition pool_policy.h:81
Context object that provides access to optional services.
A thread pool for concurrent execution of jobs using multiple worker threads.
static std::atomic< std::uint32_t > next_pool_instance_id_
Static counter for generating unique pool instance IDs.
auto submit_wait_any(std::vector< F > &&callables, const submit_options &opts={}) -> common::Result< R >
Submit a batch and return first completed result.
bool is_enhanced_metrics_enabled() const
Check if enhanced metrics is enabled.
virtual ~thread_pool(void)
Virtual destructor. Cleans up resources used by the thread pool.
auto diagnostics() -> diagnostics::thread_pool_diagnostics &
Get the diagnostics interface for this pool.
auto get_pending_task_count() const -> std::size_t
Get the number of pending tasks in the queue.
metrics::EnhancedSnapshot enhanced_metrics_snapshot() const
Get enhanced metrics snapshot.
void report_metrics()
Collect and report current thread pool metrics.
std::size_t get_idle_worker_count() const
Get the number of idle workers.
thread_pool(const std::string &thread_title="thread_pool", const thread_context &context=thread_context())
Constructs a new thread_pool instance.
auto to_string(void) const -> std::string
Provides a string representation of this thread_pool.
std::unique_ptr< pool_queue_adapter_interface > queue_adapter_
Queue adapter for unified access to different queue types.
auto get_context(void) const -> const thread_context &
Gets the thread context for this pool.
auto get_active_worker_count() const -> std::size_t
Get the current number of active (running) workers.
std::shared_ptr< metrics::metrics_service > metrics_service_
Centralized metrics service for all pool and worker metrics.
friend class diagnostics::thread_pool_diagnostics
std::vector< std::unique_ptr< thread_worker > > workers_
A collection of worker threads associated with this pool.
void reset_metrics()
Reset accumulated metrics.
std::mutex policies_mutex_
Mutex protecting concurrent access to the policies_ vector.
const metrics::EnhancedThreadPoolMetrics & enhanced_metrics() const
Access enhanced metrics (read-only reference).
auto submit_wait_all(std::vector< F > &&callables, const submit_options &opts={}) -> std::vector< R >
Submit a batch and wait for all results.
auto enqueue(std::unique_ptr< job > &&job) -> common::VoidResult
Enqueues a new job into the shared job_queue.
std::once_flag diagnostics_init_flag_
Once flag for thread-safe lazy initialization of diagnostics_.
auto remove_policy(const std::string &name) -> bool
Remove a policy by name.
auto collect_worker_diagnostics() const -> std::vector< diagnostics::thread_info >
Collects diagnostics information from all workers.
void add_policy(std::unique_ptr< pool_policy > policy)
Add a policy to the pool.
auto is_running() const -> bool
Check if the thread pool is currently running.
auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the shared job_queue.
std::string thread_title_
A title or name for this thread pool, useful for identification and logging.
std::uint32_t get_pool_instance_id() const
Get the pool instance id.
auto find_policy(const std::string &name) -> T *
Find a policy by name.
std::unique_ptr< diagnostics::thread_pool_diagnostics > diagnostics_
Diagnostics interface for this pool.
auto stop_unsafe() noexcept -> common::VoidResult
Stops the thread pool without logging (for use during static destruction).
void set_enhanced_metrics_enabled(bool enabled)
Enable or disable enhanced metrics collection.
std::atomic< bool > start_pool_
Indicates whether the pool is currently running.
auto submit(F &&callable, const submit_options &opts={}) -> std::future< R >
cancellation_token pool_cancellation_token_
Pool-level cancellation token.
const metrics::ThreadPoolMetrics & metrics() const noexcept
Access aggregated runtime metrics (read-only reference).
auto get_policies() const -> const std::vector< std::unique_ptr< pool_policy > > &
Get all registered policies.
std::vector< std::unique_ptr< pool_policy > > policies_
Registered pool policies for extending thread pool behavior.
std::shared_ptr< job_queue > job_queue_
The shared job queue where jobs (job objects) are enqueued.
auto check_worker_health(bool restart_failed=true) -> std::size_t
Check health of all worker threads and restart failed workers.
auto remove_workers_internal(std::size_t count, std::size_t min_workers=1) -> common::VoidResult
Internal method to remove workers from the pool.
std::uint32_t pool_instance_id_
Unique instance ID for this pool (for multi-pool scenarios).
thread_context context_
The thread context providing access to logging and monitoring services.
auto get_job_queue(void) -> std::shared_ptr< job_queue >
Returns the shared job_queue used by this thread pool.
auto stop(const bool &immediately_stop=false) -> common::VoidResult
Stops the thread pool and all worker threads.
auto get_ptr(void) -> std::shared_ptr< thread_pool >
Retrieves a std::shared_ptr to this thread_pool instance.
auto start(void) -> common::VoidResult
Starts the thread pool and all associated workers.
std::mutex workers_mutex_
Mutex protecting concurrent access to the workers_ vector.
A specialized worker thread that processes jobs from a job_queue.
Central configuration for thread_pool module.
Enhanced metrics snapshot with latency percentiles and throughput.
Forward declarations for thread system types.
Implementation of a cancellation token for cooperative cancellation.
String encoding conversion, Base64 encoding/decoding utilities.
Generic formatter for enum types using user-provided converter functors.
Provides macros for generating std::formatter specializations.
Thread-safe FIFO job queue with optional bounded size.
Centralized metrics service for thread pool metrics management.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Base interface for extensible thread pool behavior policies.
Abstract interface for queue adapters used by thread_pool.
Options for submitting jobs to the thread pool.
Options struct for unified submit() API.
Context object providing access to optional thread system services.
std::formatter specializations for thread_pool
Lightweight metrics container shared between thread_pool and workers.
Specialized worker thread that processes jobs from a job_queue.