13#if defined(_MSC_VER) && (defined(_M_IX86) || defined(_M_X64))
14 #include <emmintrin.h>
72 worker_id_(next_worker_id_.fetch_add(1)),
73 use_time_tag_(use_time_tag),
116 std::shared_ptr<kcenon::thread::job_queue> old_queue;
119 std::unique_lock<std::mutex> lock(queue_mutex_);
122 queue_being_replaced_ =
true;
126 queue_cv_.wait(lock, [
this] {
127 return current_job_.load(std::memory_order_acquire) ==
nullptr;
131 old_queue = job_queue_;
137 queue_being_replaced_ =
false;
140 queue_cv_.notify_all();
190 local_deque_ = std::make_unique<lockfree::work_stealing_deque<job*>>();
215 if (local_result.has_value())
217 return std::unique_ptr<job>(*local_result);
228 auto dequeue_result =
job_queue_->try_dequeue();
229 if (dequeue_result.is_ok())
231 return std::move(dequeue_result.value());
248 if (stolen !=
nullptr)
250 return std::unique_ptr<job>(stolen);
257 std::this_thread::sleep_for(backoff);
386 std::unique_lock<std::mutex> lock(queue_mutex_);
389 if (job_queue_ ==
nullptr)
398 std::shared_ptr<job_queue> local_queue = job_queue_;
403 std::unique_ptr<job> current_job;
406 if (policy_.enable_work_stealing)
411 auto local_result = local_deque_->pop();
412 if (local_result.has_value())
414 current_job = std::unique_ptr<job>(*local_result);
421 auto dequeue_result = local_queue->try_dequeue();
422 if (dequeue_result.is_ok())
424 current_job = std::move(dequeue_result.value());
431 current_job = try_steal_work();
437 is_idle_.store(
true, std::memory_order_relaxed);
438 std::this_thread::sleep_for(policy_.idle_sleep_duration);
453 constexpr int spin_count = 16;
454 for (
int i = 0; i < spin_count; ++i)
456 auto dequeue_result = local_queue->try_dequeue();
457 if (dequeue_result.is_ok())
460 current_job = std::move(dequeue_result.value());
466 #if defined(_MSC_VER)
468 #if defined(_M_IX86) || defined(_M_X64)
470 #elif defined(_M_ARM) || defined(_M_ARM64)
473 std::this_thread::yield();
475 #elif defined(__GNUC__) || defined(__clang__)
477 #if defined(__x86_64__) || defined(__i386__)
478 __builtin_ia32_pause();
479 #elif defined(__aarch64__) || defined(__arm__)
480 __asm__ __volatile__(
"yield");
482 std::this_thread::yield();
485 std::this_thread::yield();
495 is_idle_.store(
true, std::memory_order_relaxed);
499 auto dequeue_result = local_queue->dequeue();
500 if (dequeue_result.is_ok())
502 current_job = std::move(dequeue_result.value());
514 if (current_job ==
nullptr)
516 return common::error_info{
static_cast<int>(
error_code::job_invalid),
"error executing job: nullptr",
"thread_system"};
520 const auto job_id = current_job->get_job_id();
521 const auto job_name_for_event = current_job->get_name();
522 const auto enqueue_time = current_job->get_enqueue_time();
527 const bool should_trace = diagnostics_
528 && diagnostics_->is_tracing_enabled()
529 && (++diagnostics_counter_ % diagnostics_sample_rate_ == 0);
536 dequeued_event.
job_name = job_name_for_event;
538 dequeued_event.
timestamp = std::chrono::steady_clock::now();
540 dequeued_event.
thread_id = std::this_thread::get_id();
542 dequeued_event.
wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
543 dequeued_event.
timestamp - enqueue_time);
544 diagnostics_->record_event(dequeued_event);
548 auto now = std::chrono::steady_clock::now();
549 auto state_since = get_state_since();
550 if (is_idle_.load(std::memory_order_relaxed))
552 auto idle_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(now - state_since);
553 total_idle_time_ns_.fetch_add(
554 static_cast<std::uint64_t
>(idle_duration.count()),
555 std::memory_order_relaxed
560 is_idle_.store(
false, std::memory_order_relaxed);
561 state_since_rep_.store(now.time_since_epoch().count(), std::memory_order_release);
562 current_job_start_time_ = now;
565 std::optional<std::chrono::time_point<std::chrono::high_resolution_clock>>
566 started_time_point = std::nullopt;
569 started_time_point = std::chrono::high_resolution_clock::now();
574 current_job->set_job_queue(local_queue);
578 current_job->set_cancellation_token(worker_cancellation_token_);
582 current_job_.store(current_job.get(), std::memory_order_release);
589 started_event.
job_name = job_name_for_event;
591 started_event.
timestamp = std::chrono::steady_clock::now();
593 started_event.
thread_id = std::this_thread::get_id();
595 started_event.
wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
597 diagnostics_->record_event(started_event);
601 auto work_result = current_job->do_work();
602 std::uint64_t execution_duration_ns = 0;
603 if (started_time_point.has_value())
605 auto end_time = std::chrono::high_resolution_clock::now();
606 execution_duration_ns =
static_cast<std::uint64_t
>(
607 std::chrono::duration_cast<std::chrono::nanoseconds>(
608 end_time - started_time_point.value()).count());
612 std::string job_name = current_job->get_name();
618 std::lock_guard<std::mutex> notify_lock(queue_mutex_);
619 current_job_.store(
nullptr, std::memory_order_release);
630 queue_cv_.notify_all();
635 auto end_now = std::chrono::steady_clock::now();
636 auto busy_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(
637 end_now - current_job_start_time_
639 total_busy_time_ns_.fetch_add(
640 static_cast<std::uint64_t
>(busy_duration.count()),
641 std::memory_order_relaxed
644 is_idle_.store(
true, std::memory_order_relaxed);
645 state_since_rep_.store(end_now.time_since_epoch().count(), std::memory_order_release);
648 if (work_result.is_err())
651 jobs_failed_.fetch_add(1, std::memory_order_relaxed);
658 failed_event.
job_name = job_name_for_event;
660 failed_event.
timestamp = std::chrono::steady_clock::now();
662 failed_event.
thread_id = std::this_thread::get_id();
664 failed_event.
wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
665 current_job_start_time_ - enqueue_time);
666 failed_event.
execution_time = std::chrono::nanoseconds(execution_duration_ns);
667 failed_event.
error_code = work_result.error().code;
669 diagnostics_->record_event(failed_event);
674 metrics_->record_execution(0,
false);
677 formatter::format(
"error executing job: {}", work_result.error().message),
"thread_system"};
681 jobs_completed_.fetch_add(1, std::memory_order_relaxed);
688 completed_event.
job_name = job_name_for_event;
690 completed_event.
timestamp = std::chrono::steady_clock::now();
692 completed_event.
thread_id = std::this_thread::get_id();
694 completed_event.
wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
695 current_job_start_time_ - enqueue_time);
696 completed_event.
execution_time = std::chrono::nanoseconds(execution_duration_ns);
697 diagnostics_->record_event(completed_event);
702 if (!started_time_point.has_value())
705 context_.log(common::interfaces::log_level::debug,
712 context_.log(common::interfaces::log_level::debug,
714 job_name, execution_duration_ns));
717 if (context_.monitoring())
719 common::interfaces::worker_metrics metrics(worker_id_);
720 metrics.jobs_processed.value = 1;
721 metrics.total_processing_time_ns.value =
static_cast<double>(execution_duration_ns);
723 context_.update_worker_metrics(worker_id_, metrics);
729 metrics_->record_execution(execution_duration_ns,
true);
752 return is_idle_.load(std::memory_order_relaxed);
789 worker_cancellation_token_.cancel();
793 std::lock_guard<std::mutex> lock(queue_mutex_);
796 auto* job_ptr = current_job_.load(std::memory_order_acquire);
799 if (job_ptr !=
nullptr)
804 auto job_token = job_ptr->get_cancellation_token();
808 context_.log(common::interfaces::log_level::debug,
810 job_ptr->get_name(), worker_id_));
837 return std::chrono::steady_clock::time_point{
838 std::chrono::steady_clock::duration{rep}
845 auto* job_ptr =
current_job_.load(std::memory_order_acquire);
846 if (job_ptr ==
nullptr)
853 info.job_name = job_ptr->get_name();
856 info.enqueue_time = job_ptr->get_enqueue_time();
857 info.executed_by = std::this_thread::get_id();
859 auto now = std::chrono::steady_clock::now();
860 info.execution_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
864 info.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
Provides a mechanism for cooperative cancellation of operations.
Comprehensive diagnostics API for thread pool monitoring.
A thread-safe job queue for managing and dispatching work items.
Represents a unit of work (task) to be executed, typically by a job queue.
Lock-free work-stealing deque based on Chase-Lev algorithm.
A foundational class for implementing custom worker threads.
Context object that provides access to optional services.
diagnostics::thread_pool_diagnostics * diagnostics_
Pointer to the diagnostics instance for event tracing.
std::atomic< std::uint64_t > total_idle_time_ns_
Total time spent waiting for jobs (idle time) in nanoseconds.
std::function< job *(std::size_t)> steal_function_
Function to steal work from other workers.
std::size_t get_worker_id() const
Get the worker ID.
std::size_t worker_id_
Unique ID for this worker instance.
std::atomic< std::uint64_t > total_busy_time_ns_
Total time spent executing jobs (busy time) in nanoseconds.
std::unique_ptr< job > try_steal_work()
Try to steal work from other workers.
std::uint64_t get_jobs_failed() const noexcept
Gets the total number of jobs that failed during execution.
void set_diagnostics(diagnostics::thread_pool_diagnostics *diag)
Set the diagnostics instance for event tracing.
void set_steal_function(std::function< job *(std::size_t)> steal_fn)
Set the steal function for finding other workers' deques.
const worker_policy & get_policy() const
Get the current worker policy.
std::chrono::nanoseconds get_total_busy_time() const noexcept
Gets the total time spent executing jobs (busy time).
static std::atomic< std::size_t > next_worker_id_
Static counter for generating unique worker IDs.
auto get_context(void) const -> const thread_context &
Gets the thread context for this worker.
void set_metrics(std::shared_ptr< metrics::ThreadPoolMetrics > metrics)
Provide shared metrics storage for this worker.
std::uint32_t diagnostics_sample_rate_
Diagnostics sampling rate (record every Nth job).
bool is_idle() const noexcept
Checks if the worker is currently idle (not processing a job).
std::shared_ptr< metrics::ThreadPoolMetrics > metrics_
Shared metrics aggregator provided by the owning thread pool.
std::unique_ptr< lockfree::work_stealing_deque< job * > > local_deque_
Local work-stealing deque for this worker.
void set_diagnostics_sample_rate(std::uint32_t rate)
Set the diagnostics sampling rate.
auto should_continue_work() const -> bool override
Determines if there are jobs available in the queue to continue working on.
std::uint64_t get_jobs_completed() const noexcept
Gets the total number of jobs successfully completed by this worker.
auto set_context(const thread_context &context) -> void
Sets the thread context for this worker.
std::atomic< std::chrono::steady_clock::time_point::rep > state_since_rep_
Time point when the worker entered its current state.
std::chrono::steady_clock::time_point current_job_start_time_
Time point when the current job started executing.
void set_policy(const worker_policy &policy)
Set the worker policy for this worker.
std::chrono::nanoseconds get_total_idle_time() const noexcept
Gets the total time spent waiting for jobs (idle time).
auto on_stop_requested() -> void override
Called when the worker is requested to stop.
worker_policy policy_
Worker policy configuration.
std::unique_ptr< job > try_get_job()
Try to get a job from local deque first, then global queue.
lockfree::work_stealing_deque< job * > * get_local_deque() noexcept
Get the local work-stealing deque for this worker.
std::atomic< bool > is_idle_
Indicates whether the worker is currently idle (not processing a job).
std::atomic< std::uint64_t > jobs_completed_
Total number of jobs successfully completed by this worker.
virtual ~thread_worker(void)
Virtual destructor. Ensures the worker thread is stopped before destruction.
std::shared_ptr< job_queue > job_queue_
A shared pointer to the job queue from which this worker obtains jobs.
thread_worker(const bool &use_time_tag=true, const thread_context &context=thread_context())
Constructs a new thread_worker.
auto do_work() -> common::VoidResult override
Processes one or more jobs from the queue.
auto set_job_queue(std::shared_ptr< job_queue > job_queue) -> void
Sets the job_queue that this worker should process.
std::atomic< job * > current_job_
Pointer to the currently executing job.
std::mutex queue_mutex_
Mutex protecting job queue replacement.
std::atomic< std::uint64_t > jobs_failed_
Total number of jobs that failed during execution.
std::optional< diagnostics::job_info > get_current_job_info() const noexcept
Gets information about the currently executing job.
std::chrono::steady_clock::time_point get_state_since() const noexcept
Gets the time when the worker entered its current state.
@ running
Job is currently being executed.
@ dequeued
Job was taken from queue by a worker.
@ failed
Job failed with an error.
@ started
Job execution started.
@ completed
Job completed successfully.
Core threading foundation of the thread system library.
@ resource_allocation_failed
std::uint64_t job_id
Unique job identifier for DAG scheduler.
@ info
Informational messages highlighting progress.
Event data for job execution tracing.
std::string job_name
Human-readable name of the job.
std::chrono::nanoseconds wait_time
Time spent waiting in queue before dequeue.
std::optional< std::string > error_message
Error message if the job failed.
std::chrono::nanoseconds execution_time
Time spent executing the job.
std::uint64_t job_id
ID of the job this event relates to.
event_type type
Type of event that occurred.
std::optional< int > error_code
Error code if the job failed.
std::size_t worker_id
Worker ID that processed this job.
std::thread::id thread_id
ID of the thread that processed this event.
std::chrono::system_clock::time_point system_timestamp
System time when the event occurred.
std::chrono::steady_clock::time_point timestamp
Time when the event occurred.
Information about a job in the thread pool.
std::uint64_t job_id
Unique identifier for this job.
Worker behavior policy configuration.
std::chrono::microseconds steal_backoff
Backoff duration to sleep between unsuccessful work-steal attempts.
size_t max_steal_attempts
Maximum number of steal attempts before the worker backs off.
bool enable_work_stealing
Whether work stealing between workers is enabled.
Runtime diagnostics, health monitoring, and execution tracing for thread pools.
Specialized worker thread that processes jobs from a job_queue.