Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
A specialized worker thread that processes jobs from a job_queue.
#include <thread_worker.h>


Public Member Functions | |
| thread_worker (const bool &use_time_tag=true, const thread_context &context=thread_context()) | |
Constructs a new thread_worker. | |
| virtual | ~thread_worker (void) |
| Virtual destructor. Ensures the worker thread is stopped before destruction. | |
| auto | set_job_queue (std::shared_ptr< job_queue > job_queue) -> void |
Sets the job_queue that this worker should process. | |
| auto | set_context (const thread_context &context) -> void |
| Sets the thread context for this worker. | |
| void | set_metrics (std::shared_ptr< metrics::ThreadPoolMetrics > metrics) |
| Provide shared metrics storage for this worker. | |
| void | set_diagnostics (diagnostics::thread_pool_diagnostics *diag) |
| Set the diagnostics instance for event tracing. | |
| void | set_diagnostics_sample_rate (std::uint32_t rate) |
| Set the diagnostics sampling rate. | |
| void | set_policy (const worker_policy &policy) |
| Set the worker policy for this worker. | |
| const worker_policy & | get_policy () const |
| Get the current worker policy. | |
| lockfree::work_stealing_deque< job * > * | get_local_deque () noexcept |
| Get the local work-stealing deque for this worker. | |
| void | set_steal_function (std::function< job *(std::size_t)> steal_fn) |
| Set the steal function for finding other workers' deques. | |
| std::size_t | get_worker_id () const |
| Get the worker ID. | |
| auto | get_context (void) const -> const thread_context & |
| Gets the thread context for this worker. | |
| bool | is_idle () const noexcept |
| Checks if the worker is currently idle (not processing a job). | |
| std::uint64_t | get_jobs_completed () const noexcept |
| Gets the total number of jobs successfully completed by this worker. | |
| std::uint64_t | get_jobs_failed () const noexcept |
| Gets the total number of jobs that failed during execution. | |
| std::chrono::nanoseconds | get_total_busy_time () const noexcept |
| Gets the total time spent executing jobs (busy time). | |
| std::chrono::nanoseconds | get_total_idle_time () const noexcept |
| Gets the total time spent waiting for jobs (idle time). | |
| std::chrono::steady_clock::time_point | get_state_since () const noexcept |
| Gets the time when the worker entered its current state. | |
| std::optional< diagnostics::job_info > | get_current_job_info () const noexcept |
| Gets information about the currently executing job. | |
Public Member Functions inherited from kcenon::thread::thread_base | |
| thread_base (const thread_base &)=delete | |
| thread_base & | operator= (const thread_base &)=delete |
| thread_base (thread_base &&)=delete | |
| thread_base & | operator= (thread_base &&)=delete |
| thread_base (const std::string &thread_title="thread_base") | |
Constructs a new thread_base object. | |
| virtual | ~thread_base (void) |
| Virtual destructor. Ensures proper cleanup of derived classes. | |
| auto | set_wake_interval (const std::optional< std::chrono::milliseconds > &wake_interval) -> void |
| Sets the interval at which the worker thread should wake up (if any). | |
| auto | get_wake_interval () const -> std::optional< std::chrono::milliseconds > |
| Gets the current wake interval setting. | |
| auto | start (void) -> common::VoidResult |
| Starts the worker thread. | |
| auto | stop (void) -> common::VoidResult |
| Requests the worker thread to stop and waits for it to finish. | |
| auto | get_thread_title () const -> std::string |
| Returns the worker thread's title. | |
| auto | is_running () const -> bool |
| Checks whether the worker thread is currently running. | |
| auto | get_thread_id () const -> std::thread::id |
| Gets the native thread ID of the worker thread. | |
| virtual auto | to_string (void) const -> std::string |
Returns a string representation of this thread_base object. | |
Protected Member Functions | |
| auto | should_continue_work () const -> bool override |
| Determines if there are jobs available in the queue to continue working on. | |
| auto | do_work () -> common::VoidResult override |
| Processes one or more jobs from the queue. | |
| auto | on_stop_requested () -> void override |
| Called when the worker is requested to stop. | |
Protected Member Functions inherited from kcenon::thread::thread_base | |
| virtual auto | before_start (void) -> common::VoidResult |
| Called just before the worker thread starts running. | |
| virtual auto | after_stop (void) -> common::VoidResult |
| Called immediately after the worker thread has stopped. | |
Private Member Functions | |
| std::unique_ptr< job > | try_get_job () |
| Try to get a job from local deque first, then global queue. | |
| std::unique_ptr< job > | try_steal_work () |
| Try to steal work from other workers. | |
Private Attributes | |
| std::size_t | worker_id_ {0} |
| Unique ID for this worker instance. | |
| bool | use_time_tag_ |
| Indicates whether to use time tags or timestamps for job processing. | |
| std::shared_ptr< job_queue > | job_queue_ |
| A shared pointer to the job queue from which this worker obtains jobs. | |
| thread_context | context_ |
| The thread context providing access to logging and monitoring services. | |
| std::shared_ptr< metrics::ThreadPoolMetrics > | metrics_ |
| Shared metrics aggregator provided by the owning thread pool. | |
| diagnostics::thread_pool_diagnostics * | diagnostics_ {nullptr} |
| Pointer to the diagnostics instance for event tracing. | |
| cancellation_token | worker_cancellation_token_ |
| Cancellation token for this worker. | |
| std::uint32_t | diagnostics_sample_rate_ {1} |
| Diagnostics sampling rate (record every Nth job). | |
| std::uint64_t | diagnostics_counter_ {0} |
| Counter for diagnostics sampling. | |
| std::atomic< job * > | current_job_ {nullptr} |
| Pointer to the currently executing job. | |
| std::atomic< bool > | is_idle_ {true} |
| Indicates whether the worker is currently idle (not processing a job). | |
| std::atomic< std::uint64_t > | jobs_completed_ {0} |
| Total number of jobs successfully completed by this worker. | |
| std::atomic< std::uint64_t > | jobs_failed_ {0} |
| Total number of jobs that failed during execution. | |
| std::atomic< std::uint64_t > | total_busy_time_ns_ {0} |
| Total time spent executing jobs (busy time) in nanoseconds. | |
| std::atomic< std::uint64_t > | total_idle_time_ns_ {0} |
| Total time spent waiting for jobs (idle time) in nanoseconds. | |
| std::atomic< std::chrono::steady_clock::time_point::rep > | state_since_rep_ |
| Time point when the worker entered its current state. | |
| std::chrono::steady_clock::time_point | current_job_start_time_ |
| Time point when the current job started executing. | |
| std::mutex | queue_mutex_ |
| Mutex protecting job queue replacement. | |
| std::condition_variable | queue_cv_ |
| Condition variable for queue replacement synchronization. | |
| bool | queue_being_replaced_ {false} |
| Indicates whether queue replacement is in progress. | |
| worker_policy | policy_ |
| Worker policy configuration. | |
| std::unique_ptr< lockfree::work_stealing_deque< job * > > | local_deque_ |
| Local work-stealing deque for this worker. | |
| std::function< job *(std::size_t)> | steal_function_ |
| Function to steal work from other workers. | |
| std::size_t | steal_victim_index_ {0} |
| Counter for round-robin steal victim selection. | |
Static Private Attributes | |
| static std::atomic< std::size_t > | next_worker_id_ {0} |
| Static counter for generating unique worker IDs. | |
Additional Inherited Members | |
Protected Attributes inherited from kcenon::thread::thread_base | |
| std::optional< std::chrono::milliseconds > | wake_interval_ |
| Interval at which the thread is optionally awakened. | |
A specialized worker thread that processes jobs from a job_queue.
The thread_worker class inherits from thread_base, leveraging its life-cycle control methods (start, stop, etc.) and provides an implementation for job processing using a shared job_queue. By overriding should_continue_work() and do_work(), it polls the queue for available jobs and executes them.
Definition at line 67 of file thread_worker.h.
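The relationship between the base-class loop and the two overridden hooks can be sketched as follows. This is a minimal, single-threaded illustration only: `loop_base` and `queue_worker` are hypothetical stand-ins, not the library's thread_base or thread_worker, and the real classes run the loop on a dedicated thread and return common::VoidResult.

```cpp
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for thread_base: drives the two hooks in a loop.
class loop_base {
public:
    virtual ~loop_base() = default;

    // Calls do_work() while the derived class reports pending work.
    // Returns the number of work cycles that were executed.
    int run() {
        int cycles = 0;
        while (should_continue_work()) {
            do_work();
            ++cycles;
        }
        return cycles;
    }

protected:
    virtual bool should_continue_work() const = 0;
    virtual void do_work() = 0;
};

// Hypothetical stand-in for thread_worker: one job per do_work() call.
class queue_worker : public loop_base {
public:
    void push(std::function<void()> job) { jobs_.push_back(std::move(job)); }

protected:
    // There is work as long as the queue is not empty.
    bool should_continue_work() const override { return !jobs_.empty(); }

    // Takes the oldest job from the queue and executes it.
    void do_work() override {
        auto job = std::move(jobs_.front());
        jobs_.pop_front();
        job();
    }

private:
    std::deque<std::function<void()>> jobs_;
};
```

Keeping the loop in the base class means a derived worker only has to answer two questions: is there work, and how is one unit of work processed.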
| kcenon::thread::thread_worker::thread_worker | ( | const bool & | use_time_tag = true, |
| const thread_context & | context = thread_context() ) |
Constructs a new thread_worker.
Constructs a worker thread with optional timing capabilities.
| use_time_tag | If true (default), enables timing measurements for job execution; the worker may log or otherwise use timestamps when processing jobs. |
| context | Optional thread context providing logging and monitoring services (defaults to an empty context). |
The use_time_tag flag can be used to measure job durations, implement logging with timestamps, or support other time-related features in job processing.
Definition at line 70 of file thread_worker.cpp.
|
virtual |
Virtual destructor. Ensures the worker thread is stopped before destruction.
Destroys the worker thread.
Definition at line 89 of file thread_worker.cpp.
|
override protected virtual |
Processes one or more jobs from the queue.
Executes a single work cycle by processing one job from the queue.
Returns a common::VoidResult containing an error if the work fails, or a success value otherwise.
This method fetches a job from the queue (if available), executes it, and may repeat depending on the implementation. If any job fails, an error is returned; otherwise, a success value is returned.
Reimplemented from kcenon::thread::thread_base.
Definition at line 383 of file thread_worker.cpp.
References kcenon::thread::diagnostics::completed, kcenon::thread::diagnostics::dequeued, kcenon::thread::diagnostics::job_execution_event::error_code, kcenon::thread::diagnostics::job_execution_event::error_message, kcenon::thread::diagnostics::job_execution_event::execution_time, kcenon::thread::diagnostics::failed, kcenon::thread::utils::formatter::format(), kcenon::thread::job_execution_failed, kcenon::thread::diagnostics::job_execution_event::job_id, kcenon::thread::job_invalid, kcenon::thread::diagnostics::job_execution_event::job_name, kcenon::thread::resource_allocation_failed, kcenon::thread::diagnostics::started, kcenon::thread::diagnostics::job_execution_event::system_timestamp, kcenon::thread::diagnostics::job_execution_event::thread_id, kcenon::thread::diagnostics::job_execution_event::timestamp, kcenon::thread::diagnostics::job_execution_event::type, kcenon::thread::diagnostics::job_execution_event::wait_time, and kcenon::thread::diagnostics::job_execution_event::worker_id.

|
nodiscard |
Gets the thread context for this worker.
Definition at line 269 of file thread_worker.cpp.
|
nodiscard noexcept |
Gets information about the currently executing job.
Definition at line 842 of file thread_worker.cpp.
References current_job_, current_job_start_time_, kcenon::thread::info, kcenon::thread::diagnostics::job_info::job_id, queue_mutex_, and kcenon::thread::diagnostics::running.
|
nodiscard noexcept |
Gets the total number of jobs successfully completed by this worker.
Definition at line 814 of file thread_worker.cpp.
References jobs_completed_.
|
nodiscard noexcept |
Gets the total number of jobs that failed during execution.
Definition at line 819 of file thread_worker.cpp.
References jobs_failed_.
|
nodiscard noexcept |
Get the local work-stealing deque for this worker.
This deque is used for work-stealing: other workers can steal jobs from this worker's local deque when they are idle.
Definition at line 199 of file thread_worker.cpp.
References local_deque_.
|
nodiscard |
Get the current worker policy.
Definition at line 194 of file thread_worker.cpp.
References policy_.
|
nodiscard noexcept |
Gets the time when the worker entered its current state.
Definition at line 834 of file thread_worker.cpp.
References state_since_rep_.
|
nodiscard noexcept |
Gets the total time spent executing jobs (busy time).
Definition at line 824 of file thread_worker.cpp.
References total_busy_time_ns_.
|
nodiscard noexcept |
Gets the total time spent waiting for jobs (idle time).
Definition at line 829 of file thread_worker.cpp.
References total_idle_time_ns_.
|
nodiscard |
Get the worker ID.
Definition at line 735 of file thread_worker.cpp.
References worker_id_.
|
nodiscard noexcept |
Checks if the worker is currently idle (not processing a job).
Returns true if the worker is idle (waiting for jobs), false if it is actively processing a job.
Definition at line 750 of file thread_worker.cpp.
References is_idle_.
|
override protected virtual |
Called when the worker is requested to stop.
Propagates cancellation signal to the currently executing job.
Overrides the base class hook to propagate cancellation to the currently executing job (if any). This allows jobs to cooperatively cancel when the worker thread is stopped.
Thread Safety (Issue #225 fix):
Reimplemented from kcenon::thread::thread_base.
Definition at line 784 of file thread_worker.cpp.
References kcenon::thread::utils::formatter::format().

| auto kcenon::thread::thread_worker::set_context | ( | const thread_context & | context | ) | -> void |
Sets the thread context for this worker.
| context | The thread context providing access to logging and monitoring services. |
Definition at line 163 of file thread_worker.cpp.
| void kcenon::thread::thread_worker::set_diagnostics | ( | diagnostics::thread_pool_diagnostics * | diag | ) |
Set the diagnostics instance for event tracing.
| diag | Pointer to the diagnostics instance. |
When set, the worker will record execution events to the diagnostics instance if tracing is enabled. If nullptr, no events are recorded.
Definition at line 173 of file thread_worker.cpp.
References diagnostics_.
| void kcenon::thread::thread_worker::set_diagnostics_sample_rate | ( | std::uint32_t | rate | ) |
Set the diagnostics sampling rate.
| rate | Record diagnostics events every Nth job (1 = every job). |
When rate > 1, only every Nth job records diagnostic events, reducing clock-read overhead while still providing representative data. The is_tracing_enabled() check remains the top-level gate.
Definition at line 178 of file thread_worker.cpp.
References diagnostics_sample_rate_.
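The every-Nth-job sampling described above can be sketched with a counter and a modulo test, following the condition given for diagnostics_counter_ (`diagnostics_counter_ % diagnostics_sample_rate_ == 0`). The `sampler` type is hypothetical; whether the real worker checks before or after incrementing, and how it treats a rate of 0, are assumptions made here for illustration.

```cpp
#include <cstdint>

// Hypothetical helper mirroring the documented sampling condition.
struct sampler {
    std::uint32_t rate{1};     // record every Nth job (1 = every job)
    std::uint64_t counter{0};  // incremented once per job

    // Returns true when the current job should record diagnostics events.
    // Assumption: the check happens before the increment, so the first job
    // is always sampled. Assumption: a rate of 0 is treated as 1 to avoid
    // a division by zero.
    bool should_record() {
        const std::uint32_t effective_rate = (rate == 0) ? 1 : rate;
        return (counter++ % effective_rate) == 0;
    }
};
```

With a rate of 4, jobs 0, 4, 8, and so on are recorded, so clock reads and event construction happen for roughly a quarter of the jobs.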
| auto kcenon::thread::thread_worker::set_job_queue | ( | std::shared_ptr< job_queue > | job_queue | ) | -> void |
Sets the job_queue that this worker should process.
Associates this worker with a job queue for processing.
| job_queue | A shared pointer to the queue containing jobs. |
Once the queue is set and start() is called, the worker will repeatedly poll the queue for new jobs and process them.
Definition at line 114 of file thread_worker.cpp.
| void kcenon::thread::thread_worker::set_metrics | ( | std::shared_ptr< metrics::ThreadPoolMetrics > | metrics | ) |
Provide shared metrics storage for this worker.
Definition at line 168 of file thread_worker.cpp.
References metrics_.
| void kcenon::thread::thread_worker::set_policy | ( | const worker_policy & | policy | ) |
Set the worker policy for this worker.
| policy | The worker policy configuration. |
Definition at line 183 of file thread_worker.cpp.
References kcenon::thread::worker_policy::enable_work_stealing, local_deque_, and policy_.
| void kcenon::thread::thread_worker::set_steal_function | ( | std::function< job *(std::size_t)> | steal_fn | ) |
Set the steal function for finding other workers' deques.
| steal_fn | Function that returns a job to steal, or nullptr. |
The steal function is called when this worker's local deque and the global queue are both empty. It should try to steal work from other workers.
Definition at line 204 of file thread_worker.cpp.
References steal_function_.
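The lookup order implied by try_get_job() and the steal function (local deque first, then the global queue, then stealing) might look like the sketch below. The `acquire` function, the `job_id` alias, and `no_job` are hypothetical; integer IDs stand in for `job *`, and passing the requesting worker's ID as the `std::size_t` argument is an assumption, since the page does not document that parameter.

```cpp
#include <cstddef>
#include <deque>
#include <functional>

using job_id = int;             // hypothetical stand-in for job*
constexpr job_id no_job = -1;   // hypothetical stand-in for nullptr

// Tries each job source in order and returns the first job found.
job_id acquire(std::deque<job_id>& local,
               std::deque<job_id>& global,
               const std::function<job_id(std::size_t)>& steal_fn,
               std::size_t worker_id) {
    // 1. Own local deque: the owner takes from the bottom (most recent).
    if (!local.empty()) {
        const job_id j = local.back();
        local.pop_back();
        return j;
    }
    // 2. Shared global queue: oldest job first.
    if (!global.empty()) {
        const job_id j = global.front();
        global.pop_front();
        return j;
    }
    // 3. Both are empty: ask the pool-provided function to steal a job.
    if (steal_fn) {
        return steal_fn(worker_id);
    }
    return no_job;
}
```

Stealing comes last because it touches another worker's deque, which is the most contended and least cache-friendly source of work.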
|
nodiscard override protected virtual |
Determines if there are jobs available in the queue to continue working on.
Determines if the worker should continue processing jobs.
Returns true if there is work in the queue (the job queue is not empty), false otherwise.
Called in the thread's main loop (defined by thread_base) to decide whether do_work() should be invoked.
Reimplemented from kcenon::thread::thread_base.
Definition at line 317 of file thread_worker.cpp.
References job_queue_, and queue_mutex_.
|
nodiscard private |
Try to get a job from local deque first, then global queue.
Definition at line 209 of file thread_worker.cpp.
References kcenon::thread::worker_policy::enable_work_stealing, job_queue_, local_deque_, policy_, and queue_mutex_.
|
nodiscard private |
Try to steal work from other workers.
Definition at line 237 of file thread_worker.cpp.
References kcenon::thread::worker_policy::enable_work_stealing, kcenon::thread::worker_policy::max_steal_attempts, policy_, kcenon::thread::worker_policy::steal_backoff, steal_function_, and worker_id_.
|
private |
The thread context providing access to logging and monitoring services.
This context enables the worker to log messages and report metrics through the configured services.
Definition at line 314 of file thread_worker.h.
|
private |
Pointer to the currently executing job.
This is set atomically at the start of job execution and cleared when the job completes. Used by on_stop_requested() to cancel the running job.
Definition at line 366 of file thread_worker.h.
Referenced by get_current_job_info().
|
private |
Time point when the current job started executing.
Used to track job execution time for diagnostics. Only valid when a job is currently executing.
Definition at line 426 of file thread_worker.h.
Referenced by get_current_job_info().
|
private |
Pointer to the diagnostics instance for event tracing.
When set, the worker records execution events if tracing is enabled. Raw pointer because the diagnostics outlives the worker.
Definition at line 327 of file thread_worker.h.
Referenced by set_diagnostics().
|
private |
Counter for diagnostics sampling.
Incremented on each job execution. Diagnostics events are recorded when (diagnostics_counter_ % diagnostics_sample_rate_ == 0).
Definition at line 351 of file thread_worker.h.
|
private |
Diagnostics sampling rate (record every Nth job).
Default is 1 (every job) for backward compatibility.
Definition at line 343 of file thread_worker.h.
Referenced by set_diagnostics_sample_rate().
|
private |
Indicates whether the worker is currently idle (not processing a job).
This flag is set to true when the worker is waiting for jobs and false when actively processing a job. Updated atomically for thread-safe access.
Definition at line 380 of file thread_worker.h.
Referenced by is_idle().
|
private |
A shared pointer to the job queue from which this worker obtains jobs.
Multiple workers can share the same queue, enabling concurrent processing of queued jobs.
Definition at line 306 of file thread_worker.h.
Referenced by should_continue_work(), and try_get_job().
|
private |
Total number of jobs successfully completed by this worker.
Incremented atomically after each successful job execution.
Definition at line 387 of file thread_worker.h.
Referenced by get_jobs_completed().
|
private |
Total number of jobs that failed during execution.
Incremented atomically when a job's do_work() returns an error.
Definition at line 394 of file thread_worker.h.
Referenced by get_jobs_failed().
|
private |
Local work-stealing deque for this worker.
When work-stealing is enabled, jobs submitted to this worker are stored in this deque. The owner (this worker) can push/pop from the bottom (LIFO), while other workers can steal from the top (FIFO).
Definition at line 469 of file thread_worker.h.
Referenced by get_local_deque(), set_policy(), and try_get_job().
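The owner/thief access pattern described for the local deque can be illustrated with a deliberately simple, mutex-protected container. This is not the library's lockfree::work_stealing_deque: `simple_stealing_deque` is a hypothetical class that only demonstrates the LIFO-for-owner and FIFO-for-thief semantics, without any of the lock-free machinery.

```cpp
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical, lock-based illustration of work-stealing deque semantics.
template <typename T>
class simple_stealing_deque {
public:
    // Owner: push a new item onto the bottom.
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(std::move(value));
    }

    // Owner: pop the most recently pushed item from the bottom (LIFO).
    std::optional<T> pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return std::nullopt;
        T value = std::move(items_.back());
        items_.pop_back();
        return value;
    }

    // Thief: take the oldest item from the top (FIFO).
    std::optional<T> steal() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return std::nullopt;
        T value = std::move(items_.front());
        items_.pop_front();
        return value;
    }

private:
    std::mutex mutex_;
    std::deque<T> items_;
};
```

The owner working at one end and thieves at the other keeps them apart most of the time; the owner also benefits from cache locality by running its most recently pushed job first.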
|
private |
Shared metrics aggregator provided by the owning thread pool.
Definition at line 319 of file thread_worker.h.
Referenced by set_metrics().
|
static private |
Static counter for generating unique worker IDs.
Definition at line 284 of file thread_worker.h.
|
private |
Worker policy configuration.
Controls worker behavior including work-stealing settings.
Definition at line 459 of file thread_worker.h.
Referenced by get_policy(), set_policy(), try_get_job(), and try_steal_work().
|
private |
Indicates whether queue replacement is in progress.
When true, the worker thread should wait before accessing the queue. Protected by queue_mutex_.
Definition at line 452 of file thread_worker.h.
|
private |
Condition variable for queue replacement synchronization.
Used to wait for current job completion before replacing the queue.
Definition at line 444 of file thread_worker.h.
|
mutable private |
Mutex protecting job queue replacement.
This mutex synchronizes access to job_queue_ during replacement operations to prevent race conditions between do_work(), set_job_queue(), and should_continue_work().
Definition at line 437 of file thread_worker.h.
Referenced by get_current_job_info(), should_continue_work(), and try_get_job().
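One way the mutex, condition variable, and replacement flag could cooperate is sketched below. The page does not show the actual implementation, so this is an assumption-laden illustration: `replaceable_queue_holder`, its `busy_` flag, and the use of `std::deque<int>` in place of job_queue are all hypothetical.

```cpp
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

using queue_t = std::deque<int>;  // hypothetical stand-in for job_queue

class replaceable_queue_holder {
public:
    // Replaces the queue, waiting until no job is in flight.
    void set_queue(std::shared_ptr<queue_t> new_queue) {
        std::unique_lock<std::mutex> lock(mutex_);
        replacing_ = true;                            // tell the worker to hold off
        cv_.wait(lock, [this] { return !busy_; });    // wait for the current job
        queue_ = std::move(new_queue);
        replacing_ = false;
        lock.unlock();
        cv_.notify_all();                             // release a waiting worker
    }

    // Worker side: takes one item and "processes" it.
    // Returns false if there was nothing to take.
    bool process_one(int& out) {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this] { return !replacing_; });
            if (!queue_ || queue_->empty()) return false;
            out = queue_->front();
            queue_->pop_front();
            busy_ = true;
        }
        // The job itself would run here, outside the lock.
        {
            std::lock_guard<std::mutex> lock(mutex_);
            busy_ = false;
        }
        cv_.notify_all();  // wake a pending set_queue() call
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::shared_ptr<queue_t> queue_;
    bool replacing_{false};  // protected by mutex_
    bool busy_{false};       // protected by mutex_
};
```

Running the job outside the lock keeps set_queue() from blocking other readers for the full job duration; it only waits for the in-flight job to finish.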
|
private |
Time point when the worker entered its current state.
Updated when transitioning between idle and busy states. Used to calculate current state duration.
Definition at line 416 of file thread_worker.h.
Referenced by get_state_since().
|
private |
Function to steal work from other workers.
This function is provided by the thread pool and returns a job stolen from another worker's deque, or nullptr if no work is available.
Definition at line 477 of file thread_worker.h.
Referenced by set_steal_function(), and try_steal_work().
|
private |
Counter for round-robin steal victim selection.
Definition at line 482 of file thread_worker.h.
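Round-robin victim selection with a persistent counter could be sketched as follows. The page only states that the counter exists, so `next_victim`, its parameters, and the rule of skipping the worker's own index are assumptions made for illustration.

```cpp
#include <cstddef>
#include <optional>

// Hypothetical helper: picks the next victim in round-robin order,
// skipping the calling worker itself. `cursor` persists between calls.
std::optional<std::size_t> next_victim(std::size_t& cursor,
                                       std::size_t self,
                                       std::size_t worker_count) {
    if (worker_count < 2) {
        return std::nullopt;  // nobody else to steal from
    }
    std::size_t candidate = cursor % worker_count;
    if (candidate == self) {
        candidate = (candidate + 1) % worker_count;  // never steal from self
    }
    cursor = candidate + 1;  // the next call starts after this victim
    return candidate;
}
```

Rotating through victims spreads steal attempts evenly, so no single busy worker is targeted by every idle one.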
|
private |
Total time spent executing jobs (busy time) in nanoseconds.
Accumulated after each job execution with the job's execution duration.
Definition at line 401 of file thread_worker.h.
Referenced by get_total_busy_time().
|
private |
Total time spent waiting for jobs (idle time) in nanoseconds.
Accumulated when transitioning from idle to busy state.
Definition at line 408 of file thread_worker.h.
Referenced by get_total_idle_time().
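The busy/idle accounting described by these members can be sketched as two transition hooks that charge the elapsed interval to the state being left. The `time_accounting` class is hypothetical, and the time points are injected as parameters (rather than read from the clock) purely to keep the example deterministic; the memory orderings are also assumptions.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>

class time_accounting {
public:
    using clock = std::chrono::steady_clock;

    explicit time_accounting(clock::time_point now)
        : state_since_rep_(now.time_since_epoch().count()) {}

    // Idle -> busy: the interval since the last transition was idle time.
    void on_job_start(clock::time_point now) {
        total_idle_ns_.fetch_add(elapsed_ns(now), std::memory_order_relaxed);
        enter_state(now, false);
    }

    // Busy -> idle: the interval since the last transition was busy time.
    void on_job_end(clock::time_point now) {
        total_busy_ns_.fetch_add(elapsed_ns(now), std::memory_order_relaxed);
        enter_state(now, true);
    }

    std::chrono::nanoseconds busy() const {
        return std::chrono::nanoseconds(
            static_cast<std::int64_t>(total_busy_ns_.load(std::memory_order_relaxed)));
    }
    std::chrono::nanoseconds idle() const {
        return std::chrono::nanoseconds(
            static_cast<std::int64_t>(total_idle_ns_.load(std::memory_order_relaxed)));
    }
    bool is_idle() const { return is_idle_.load(std::memory_order_acquire); }

private:
    // Nanoseconds elapsed since the worker entered its current state.
    std::uint64_t elapsed_ns(clock::time_point now) const {
        const clock::time_point since{
            clock::duration{state_since_rep_.load(std::memory_order_relaxed)}};
        return static_cast<std::uint64_t>(
            std::chrono::duration_cast<std::chrono::nanoseconds>(now - since).count());
    }

    void enter_state(clock::time_point now, bool idle) {
        state_since_rep_.store(now.time_since_epoch().count(),
                               std::memory_order_relaxed);
        is_idle_.store(idle, std::memory_order_release);
    }

    // time_point is not guaranteed to be usable in std::atomic directly,
    // so only its integer representation is stored.
    std::atomic<clock::rep> state_since_rep_;
    std::atomic<bool> is_idle_{true};
    std::atomic<std::uint64_t> total_busy_ns_{0};
    std::atomic<std::uint64_t> total_idle_ns_{0};
};
```

A monitoring thread can then derive utilization as busy / (busy + idle) from the two getters without taking any lock.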
|
private |
Indicates whether to use time tags or timestamps for job processing.
When true, the worker might record timestamps (e.g., job start/end times) or log them for debugging/monitoring. The exact usage depends on the job and override details in derived classes.
Definition at line 298 of file thread_worker.h.
|
private |
Cancellation token for this worker.
This token is propagated to jobs during execution, allowing them to cooperatively cancel when the worker is stopped. The token is cancelled in on_stop_requested().
Definition at line 336 of file thread_worker.h.
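Cooperative cancellation through a shared token can be sketched as below. The `simple_token` class and `run_until_cancelled` function are hypothetical and do not reproduce the library's cancellation_token API; the sketch only shows the idea that copies of a token observe the same flag, so cancelling the worker's token is visible to the job that received a copy.

```cpp
#include <atomic>
#include <memory>

// Hypothetical token: copies share one flag, so cancelling any copy
// is observed by all of them.
class simple_token {
public:
    simple_token() : flag_(std::make_shared<std::atomic<bool>>(false)) {}

    void cancel() { flag_->store(true, std::memory_order_release); }
    bool is_cancelled() const { return flag_->load(std::memory_order_acquire); }

private:
    std::shared_ptr<std::atomic<bool>> flag_;
};

// A cooperative job: performs up to max_steps units of work and checks
// the token before each one. Returns the number of completed steps.
template <typename Step>
int run_until_cancelled(const simple_token& token, int max_steps, Step&& step) {
    int done = 0;
    while (done < max_steps && !token.is_cancelled()) {
        step(done);
        ++done;
    }
    return done;
}
```

Cancellation here is a request, not a forced stop: a job that never checks its token runs to completion regardless.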
|
private |
Unique ID for this worker instance.
Definition at line 289 of file thread_worker.h.
Referenced by get_worker_id(), and try_steal_work().