33 #include <condition_variable>
39 class thread_pool_diagnostics;
106 void set_metrics(std::shared_ptr<metrics::ThreadPoolMetrics> metrics);
184 [[nodiscard]] bool is_idle() const noexcept;
234 [[nodiscard]] std::chrono::steady_clock::time_point
get_state_since()
const noexcept;
265 auto do_work() -> common::VoidResult
override;
319 std::shared_ptr<metrics::ThreadPoolMetrics>
metrics_;
417 std::chrono::steady_clock::now().time_since_epoch().count()
514 struct std::formatter<kcenon::thread::thread_worker> : std::formatter<std::string_view>
523 template <
typename FormatContext>
526 return std::formatter<std::string_view>::format(item.to_string(), ctx);
536 struct std::formatter<kcenon::thread::thread_worker, wchar_t>
537 : std::formatter<std::wstring_view, wchar_t>
546 template <
typename FormatContext>
551 return std::formatter<std::wstring_view, wchar_t>::format(wstr, ctx);
Provides a mechanism for cooperative cancellation of operations.
Comprehensive diagnostics API for thread pool monitoring.
A thread-safe job queue for managing and dispatching work items.
Represents a unit of work (task) to be executed, typically by a job queue.
Lock-free work-stealing deque based on Chase-Lev algorithm.
A foundational class for implementing custom worker threads.
virtual auto to_string(void) const -> std::string
Returns a string representation of this thread_base object.
Context object that provides access to optional services.
A specialized worker thread that processes jobs from a job_queue.
diagnostics::thread_pool_diagnostics * diagnostics_
Pointer to the diagnostics instance for event tracing.
std::atomic< std::uint64_t > total_idle_time_ns_
Total time spent waiting for jobs (idle time) in nanoseconds.
std::function< job *(std::size_t)> steal_function_
Function to steal work from other workers.
cancellation_token worker_cancellation_token_
Cancellation token for this worker.
std::size_t get_worker_id() const
Get the worker ID.
std::size_t worker_id_
Unique ID for this worker instance.
std::atomic< std::uint64_t > total_busy_time_ns_
Total time spent executing jobs (busy time) in nanoseconds.
std::unique_ptr< job > try_steal_work()
Try to steal work from other workers.
std::uint64_t get_jobs_failed() const noexcept
Gets the total number of jobs that failed during execution.
void set_diagnostics(diagnostics::thread_pool_diagnostics *diag)
Set the diagnostics instance for event tracing.
void set_steal_function(std::function< job *(std::size_t)> steal_fn)
Set the steal function for finding other workers' deques.
const worker_policy & get_policy() const
Get the current worker policy.
std::chrono::nanoseconds get_total_busy_time() const noexcept
Gets the total time spent executing jobs (busy time).
static std::atomic< std::size_t > next_worker_id_
Static counter for generating unique worker IDs.
auto get_context(void) const -> const thread_context &
Gets the thread context for this worker.
void set_metrics(std::shared_ptr< metrics::ThreadPoolMetrics > metrics)
Provide shared metrics storage for this worker.
std::uint32_t diagnostics_sample_rate_
Diagnostics sampling rate (record every Nth job).
bool is_idle() const noexcept
Checks if the worker is currently idle (not processing a job).
std::shared_ptr< metrics::ThreadPoolMetrics > metrics_
Shared metrics aggregator provided by the owning thread pool.
std::unique_ptr< lockfree::work_stealing_deque< job * > > local_deque_
Local work-stealing deque for this worker.
void set_diagnostics_sample_rate(std::uint32_t rate)
Set the diagnostics sampling rate.
thread_context context_
The thread context providing access to logging and monitoring services.
auto should_continue_work() const -> bool override
Determines if there are jobs available in the queue to continue working on.
std::uint64_t get_jobs_completed() const noexcept
Gets the total number of jobs successfully completed by this worker.
auto set_context(const thread_context &context) -> void
Sets the thread context for this worker.
std::atomic< std::chrono::steady_clock::time_point::rep > state_since_rep_
Time point when the worker entered its current state.
bool queue_being_replaced_
Indicates whether queue replacement is in progress.
std::chrono::steady_clock::time_point current_job_start_time_
Time point when the current job started executing.
void set_policy(const worker_policy &policy)
Set the worker policy for this worker.
std::condition_variable queue_cv_
Condition variable for queue replacement synchronization.
std::size_t steal_victim_index_
Counter for round-robin steal victim selection.
std::chrono::nanoseconds get_total_idle_time() const noexcept
Gets the total time spent waiting for jobs (idle time).
auto on_stop_requested() -> void override
Called when the worker is requested to stop.
worker_policy policy_
Worker policy configuration.
std::unique_ptr< job > try_get_job()
Try to get a job from local deque first, then global queue.
lockfree::work_stealing_deque< job * > * get_local_deque() noexcept
Get the local work-stealing deque for this worker.
bool use_time_tag_
Indicates whether to use time tags or timestamps for job processing.
std::atomic< bool > is_idle_
Indicates whether the worker is currently idle (not processing a job).
std::atomic< std::uint64_t > jobs_completed_
Total number of jobs successfully completed by this worker.
virtual ~thread_worker(void)
Virtual destructor. Ensures the worker thread is stopped before destruction.
std::shared_ptr< job_queue > job_queue_
A shared pointer to the job queue from which this worker obtains jobs.
thread_worker(const bool &use_time_tag=true, const thread_context &context=thread_context())
Constructs a new thread_worker.
auto do_work() -> common::VoidResult override
Processes one or more jobs from the queue.
auto set_job_queue(std::shared_ptr< job_queue > job_queue) -> void
Sets the job_queue that this worker should process.
std::atomic< job * > current_job_
Pointer to the currently executing job.
std::mutex queue_mutex_
Mutex protecting job queue replacement.
std::atomic< std::uint64_t > jobs_failed_
Total number of jobs that failed during execution.
std::optional< diagnostics::job_info > get_current_job_info() const noexcept
Gets information about the currently executing job.
std::uint64_t diagnostics_counter_
Counter for diagnostics sampling.
std::chrono::steady_clock::time_point get_state_since() const noexcept
Gets the time when the worker entered its current state.
static auto to_wstring(const std::string &value) -> std::tuple< std::optional< std::wstring >, std::optional< std::string > >
Converts a std::string (system-encoded) to a std::wstring.
Job execution event types and listener interface for tracing.
Forward declarations for thread system types.
Implementation of a cancellation token for cooperative cancellation.
String encoding conversion and Base64 encoding/decoding utilities.
Job information snapshot for diagnostics and monitoring.
Thread-safe FIFO job queue with optional bounded size.
Core threading foundation of the thread system library.
Worker behavior policy configuration.
Foundational worker thread class with lifecycle management.
Context object providing access to optional thread system services.
Lightweight metrics container shared between thread_pool and workers.
Dynamic circular array work-stealing deque for lock-free task distribution.
Worker behavior policies and configuration.