Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
thread_worker.h
Go to the documentation of this file.
1
9#pragma once
10
11// BSD 3-Clause License
12// Copyright (c) 2024, 🍀☀🌕🌥 🌊
13// See the LICENSE file in the project root for full license information.
14
22#include "worker_policy.h"
27
28#include <memory>
29#include <optional>
30#include <vector>
31#include <atomic>
32#include <mutex>
33#include <condition_variable>
34#include <functional>
35
36// Forward declarations
38{
39 class thread_pool_diagnostics;
40}
41
42namespace kcenon::thread
43{
68 {
69 public:
80 thread_worker(const bool& use_time_tag = true,
81 const thread_context& context = thread_context());
82
86 virtual ~thread_worker(void);
87
95 auto set_job_queue(std::shared_ptr<job_queue> job_queue) -> void;
96
101 auto set_context(const thread_context& context) -> void;
102
106 void set_metrics(std::shared_ptr<metrics::ThreadPoolMetrics> metrics);
107
116
125 void set_diagnostics_sample_rate(std::uint32_t rate);
126
131 void set_policy(const worker_policy& policy);
132
137 [[nodiscard]] const worker_policy& get_policy() const;
138
147
156 void set_steal_function(std::function<job*(std::size_t)> steal_fn);
157
162 [[nodiscard]] std::size_t get_worker_id() const;
163
168 [[nodiscard]] auto get_context(void) const -> const thread_context&;
169
184 [[nodiscard]] bool is_idle() const noexcept;
185
194 [[nodiscard]] std::uint64_t get_jobs_completed() const noexcept;
195
204 [[nodiscard]] std::uint64_t get_jobs_failed() const noexcept;
205
214 [[nodiscard]] std::chrono::nanoseconds get_total_busy_time() const noexcept;
215
224 [[nodiscard]] std::chrono::nanoseconds get_total_idle_time() const noexcept;
225
234 [[nodiscard]] std::chrono::steady_clock::time_point get_state_since() const noexcept;
235
244 [[nodiscard]] std::optional<diagnostics::job_info> get_current_job_info() const noexcept;
245
246 protected:
255 [[nodiscard]] auto should_continue_work() const -> bool override;
256
265 auto do_work() -> common::VoidResult override;
266
278 auto on_stop_requested() -> void override;
279
280 private:
284 static std::atomic<std::size_t> next_worker_id_;
285
289 std::size_t worker_id_{0};
290
299
306 std::shared_ptr<job_queue> job_queue_;
307
315
319 std::shared_ptr<metrics::ThreadPoolMetrics> metrics_;
320
328
337
343 std::uint32_t diagnostics_sample_rate_{1};
344
351 std::uint64_t diagnostics_counter_{0};
352
366 std::atomic<job*> current_job_{nullptr};
367
380 std::atomic<bool> is_idle_{true};
381
387 std::atomic<std::uint64_t> jobs_completed_{0};
388
394 std::atomic<std::uint64_t> jobs_failed_{0};
395
401 std::atomic<std::uint64_t> total_busy_time_ns_{0};
402
408 std::atomic<std::uint64_t> total_idle_time_ns_{0};
409
416 std::atomic<std::chrono::steady_clock::time_point::rep> state_since_rep_{
417 std::chrono::steady_clock::now().time_since_epoch().count()
418 };
419
426 std::chrono::steady_clock::time_point current_job_start_time_;
427
437 mutable std::mutex queue_mutex_;
438
444 std::condition_variable queue_cv_;
445
453
460
469 std::unique_ptr<lockfree::work_stealing_deque<job*>> local_deque_;
470
477 std::function<job*(std::size_t)> steal_function_;
478
482 std::size_t steal_victim_index_{0};
483
488 [[nodiscard]] std::unique_ptr<job> try_get_job();
489
494 [[nodiscard]] std::unique_ptr<job> try_steal_work();
495 };
496} // namespace kcenon::thread
497
498// ----------------------------------------------------------------------------
499// Formatter specializations for thread_worker
500// ----------------------------------------------------------------------------
501
513template <>
514struct std::formatter<kcenon::thread::thread_worker> : std::formatter<std::string_view>
515{
523 template <typename FormatContext>
524 auto format(const kcenon::thread::thread_worker& item, FormatContext& ctx) const
525 {
526 return std::formatter<std::string_view>::format(item.to_string(), ctx);
527 }
528};
529
535template <>
536struct std::formatter<kcenon::thread::thread_worker, wchar_t>
537 : std::formatter<std::wstring_view, wchar_t>
538{
546 template <typename FormatContext>
547 auto format(const kcenon::thread::thread_worker& item, FormatContext& ctx) const
548 {
549 auto str = item.to_string();
551 return std::formatter<std::wstring_view, wchar_t>::format(wstr, ctx);
552 }
553};
Provides a mechanism for cooperative cancellation of operations.
Comprehensive diagnostics API for thread pool monitoring.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Lock-free work-stealing deque based on Chase-Lev algorithm.
A foundational class for implementing custom worker threads.
virtual auto to_string(void) const -> std::string
Returns a string representation of this thread_base object.
Context object that provides access to optional services.
A specialized worker thread that processes jobs from a job_queue.
diagnostics::thread_pool_diagnostics * diagnostics_
Pointer to the diagnostics instance for event tracing.
std::atomic< std::uint64_t > total_idle_time_ns_
Total time spent waiting for jobs (idle time) in nanoseconds.
std::function< job *(std::size_t)> steal_function_
Function to steal work from other workers.
cancellation_token worker_cancellation_token_
Cancellation token for this worker.
std::size_t get_worker_id() const
Get the worker ID.
std::size_t worker_id_
Unique ID for this worker instance.
std::atomic< std::uint64_t > total_busy_time_ns_
Total time spent executing jobs (busy time) in nanoseconds.
std::unique_ptr< job > try_steal_work()
Try to steal work from other workers.
std::uint64_t get_jobs_failed() const noexcept
Gets the total number of jobs that failed during execution.
void set_diagnostics(diagnostics::thread_pool_diagnostics *diag)
Set the diagnostics instance for event tracing.
void set_steal_function(std::function< job *(std::size_t)> steal_fn)
Set the steal function for finding other workers' deques.
const worker_policy & get_policy() const
Get the current worker policy.
std::chrono::nanoseconds get_total_busy_time() const noexcept
Gets the total time spent executing jobs (busy time).
static std::atomic< std::size_t > next_worker_id_
Static counter for generating unique worker IDs.
auto get_context(void) const -> const thread_context &
Gets the thread context for this worker.
void set_metrics(std::shared_ptr< metrics::ThreadPoolMetrics > metrics)
Provide shared metrics storage for this worker.
std::uint32_t diagnostics_sample_rate_
Diagnostics sampling rate (record every Nth job).
bool is_idle() const noexcept
Checks if the worker is currently idle (not processing a job).
std::shared_ptr< metrics::ThreadPoolMetrics > metrics_
Shared metrics aggregator provided by the owning thread pool.
std::unique_ptr< lockfree::work_stealing_deque< job * > > local_deque_
Local work-stealing deque for this worker.
void set_diagnostics_sample_rate(std::uint32_t rate)
Set the diagnostics sampling rate.
thread_context context_
The thread context providing access to logging and monitoring services.
auto should_continue_work() const -> bool override
Determines if there are jobs available in the queue to continue working on.
std::uint64_t get_jobs_completed() const noexcept
Gets the total number of jobs successfully completed by this worker.
auto set_context(const thread_context &context) -> void
Sets the thread context for this worker.
std::atomic< std::chrono::steady_clock::time_point::rep > state_since_rep_
Time point when the worker entered its current state.
bool queue_being_replaced_
Indicates whether queue replacement is in progress.
std::chrono::steady_clock::time_point current_job_start_time_
Time point when the current job started executing.
void set_policy(const worker_policy &policy)
Set the worker policy for this worker.
std::condition_variable queue_cv_
Condition variable for queue replacement synchronization.
std::size_t steal_victim_index_
Counter for round-robin steal victim selection.
std::chrono::nanoseconds get_total_idle_time() const noexcept
Gets the total time spent waiting for jobs (idle time).
auto on_stop_requested() -> void override
Called when the worker is requested to stop.
worker_policy policy_
Worker policy configuration.
std::unique_ptr< job > try_get_job()
Try to get a job from local deque first, then global queue.
lockfree::work_stealing_deque< job * > * get_local_deque() noexcept
Get the local work-stealing deque for this worker.
bool use_time_tag_
Indicates whether to use time tags or timestamps for job processing.
std::atomic< bool > is_idle_
Indicates whether the worker is currently idle (not processing a job).
std::atomic< std::uint64_t > jobs_completed_
Total number of jobs successfully completed by this worker.
virtual ~thread_worker(void)
Virtual destructor. Ensures the worker thread is stopped before destruction.
std::shared_ptr< job_queue > job_queue_
A shared pointer to the job queue from which this worker obtains jobs.
thread_worker(const bool &use_time_tag=true, const thread_context &context=thread_context())
Constructs a new thread_worker.
auto do_work() -> common::VoidResult override
Processes one or more jobs from the queue.
auto set_job_queue(std::shared_ptr< job_queue > job_queue) -> void
Sets the job_queue that this worker should process.
std::atomic< job * > current_job_
Pointer to the currently executing job.
std::mutex queue_mutex_
Mutex protecting job queue replacement.
std::atomic< std::uint64_t > jobs_failed_
Total number of jobs that failed during execution.
std::optional< diagnostics::job_info > get_current_job_info() const noexcept
Gets information about the currently executing job.
std::uint64_t diagnostics_counter_
Counter for diagnostics sampling.
std::chrono::steady_clock::time_point get_state_since() const noexcept
Gets the time when the worker entered its current state.
static auto to_wstring(const std::string &value) -> std::tuple< std::optional< std::wstring >, std::optional< std::string > >
Converts a std::string (system-encoded) to a std::wstring.
Job execution event types and listener interface for tracing.
Forward declarations for thread system types.
Implementation of a cancellation token for cooperative cancellation.
String encoding conversion, Base64 encoding/decoding utilities.
Generic formatter for enum types using user-provided converter functors.
Job information snapshot for diagnostics and monitoring.
Thread-safe FIFO job queue with optional bounded size.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Worker behavior policy configuration.
auto format(const kcenon::thread::thread_worker &item, FormatContext &ctx) const
Formats a thread_worker object as a wide string.
auto format(const kcenon::thread::thread_worker &item, FormatContext &ctx) const
Formats a thread_worker object as a string.
Foundational worker thread class with lifecycle management.
Context object providing access to optional thread system services.
Lightweight metrics container shared between thread_pool and workers.
Dynamic circular array work-stealing deque for lock-free task distribution.
Worker behavior policies and configuration.