Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::thread_worker Class Reference

A specialized worker thread that processes jobs from a job_queue. More...

#include <thread_worker.h>

Inheritance diagram for kcenon::thread::thread_worker (graph omitted).
Collaboration diagram for kcenon::thread::thread_worker (graph omitted).

Public Member Functions

 thread_worker (const bool &use_time_tag=true, const thread_context &context=thread_context())
 Constructs a new thread_worker.
 
virtual ~thread_worker (void)
 Virtual destructor. Ensures the worker thread is stopped before destruction.
 
auto set_job_queue (std::shared_ptr< job_queue > job_queue) -> void
 Sets the job_queue that this worker should process.
 
auto set_context (const thread_context &context) -> void
 Sets the thread context for this worker.
 
void set_metrics (std::shared_ptr< metrics::ThreadPoolMetrics > metrics)
 Provide shared metrics storage for this worker.
 
void set_diagnostics (diagnostics::thread_pool_diagnostics *diag)
 Set the diagnostics instance for event tracing.
 
void set_diagnostics_sample_rate (std::uint32_t rate)
 Set the diagnostics sampling rate.
 
void set_policy (const worker_policy &policy)
 Set the worker policy for this worker.
 
const worker_policy & get_policy () const
 Get the current worker policy.
 
lockfree::work_stealing_deque< job * > * get_local_deque () noexcept
 Get the local work-stealing deque for this worker.
 
void set_steal_function (std::function< job *(std::size_t)> steal_fn)
 Set the steal function for finding other workers' deques.
 
std::size_t get_worker_id () const
 Get the worker ID.
 
auto get_context (void) const -> const thread_context &
 Gets the thread context for this worker.
 
bool is_idle () const noexcept
 Checks if the worker is currently idle (not processing a job).
 
std::uint64_t get_jobs_completed () const noexcept
 Gets the total number of jobs successfully completed by this worker.
 
std::uint64_t get_jobs_failed () const noexcept
 Gets the total number of jobs that failed during execution.
 
std::chrono::nanoseconds get_total_busy_time () const noexcept
 Gets the total time spent executing jobs (busy time).
 
std::chrono::nanoseconds get_total_idle_time () const noexcept
 Gets the total time spent waiting for jobs (idle time).
 
std::chrono::steady_clock::time_point get_state_since () const noexcept
 Gets the time when the worker entered its current state.
 
std::optional< diagnostics::job_info > get_current_job_info () const noexcept
 Gets information about the currently executing job.
 
- Public Member Functions inherited from kcenon::thread::thread_base
 thread_base (const thread_base &)=delete
 
thread_base & operator= (const thread_base &)=delete
 
 thread_base (thread_base &&)=delete
 
thread_base & operator= (thread_base &&)=delete
 
 thread_base (const std::string &thread_title="thread_base")
 Constructs a new thread_base object.
 
virtual ~thread_base (void)
 Virtual destructor. Ensures proper cleanup of derived classes.
 
auto set_wake_interval (const std::optional< std::chrono::milliseconds > &wake_interval) -> void
 Sets the interval at which the worker thread should wake up (if any).
 
auto get_wake_interval () const -> std::optional< std::chrono::milliseconds >
 Gets the current wake interval setting.
 
auto start (void) -> common::VoidResult
 Starts the worker thread.
 
auto stop (void) -> common::VoidResult
 Requests the worker thread to stop and waits for it to finish.
 
auto get_thread_title () const -> std::string
 Returns the worker thread's title.
 
auto is_running () const -> bool
 Checks whether the worker thread is currently running.
 
auto get_thread_id () const -> std::thread::id
 Gets the native thread ID of the worker thread.
 
virtual auto to_string (void) const -> std::string
 Returns a string representation of this thread_base object.
 

Protected Member Functions

auto should_continue_work () const -> bool override
 Determines if there are jobs available in the queue to continue working on.
 
auto do_work () -> common::VoidResult override
 Processes one or more jobs from the queue.
 
auto on_stop_requested () -> void override
 Called when the worker is requested to stop.
 
- Protected Member Functions inherited from kcenon::thread::thread_base
virtual auto before_start (void) -> common::VoidResult
 Called just before the worker thread starts running.
 
virtual auto after_stop (void) -> common::VoidResult
 Called immediately after the worker thread has stopped.
 

Private Member Functions

std::unique_ptr< job > try_get_job ()
 Try to get a job from local deque first, then global queue.
 
std::unique_ptr< job > try_steal_work ()
 Try to steal work from other workers.
 

Private Attributes

std::size_t worker_id_ {0}
 Unique ID for this worker instance.
 
bool use_time_tag_
 Indicates whether to use time tags or timestamps for job processing.
 
std::shared_ptr< job_queue > job_queue_
 A shared pointer to the job queue from which this worker obtains jobs.
 
thread_context context_
 The thread context providing access to logging and monitoring services.
 
std::shared_ptr< metrics::ThreadPoolMetrics > metrics_
 Shared metrics aggregator provided by the owning thread pool.
 
diagnostics::thread_pool_diagnostics * diagnostics_ {nullptr}
 Pointer to the diagnostics instance for event tracing.
 
cancellation_token worker_cancellation_token_
 Cancellation token for this worker.
 
std::uint32_t diagnostics_sample_rate_ {1}
 Diagnostics sampling rate (record every Nth job).
 
std::uint64_t diagnostics_counter_ {0}
 Counter for diagnostics sampling.
 
std::atomic< job * > current_job_ {nullptr}
 Pointer to the currently executing job.
 
std::atomic< bool > is_idle_ {true}
 Indicates whether the worker is currently idle (not processing a job).
 
std::atomic< std::uint64_t > jobs_completed_ {0}
 Total number of jobs successfully completed by this worker.
 
std::atomic< std::uint64_t > jobs_failed_ {0}
 Total number of jobs that failed during execution.
 
std::atomic< std::uint64_t > total_busy_time_ns_ {0}
 Total time spent executing jobs (busy time) in nanoseconds.
 
std::atomic< std::uint64_t > total_idle_time_ns_ {0}
 Total time spent waiting for jobs (idle time) in nanoseconds.
 
std::atomic< std::chrono::steady_clock::time_point::rep > state_since_rep_
 Time point when the worker entered its current state.
 
std::chrono::steady_clock::time_point current_job_start_time_
 Time point when the current job started executing.
 
std::mutex queue_mutex_
 Mutex protecting job queue replacement.
 
std::condition_variable queue_cv_
 Condition variable for queue replacement synchronization.
 
bool queue_being_replaced_ {false}
 Indicates whether queue replacement is in progress.
 
worker_policy policy_
 Worker policy configuration.
 
std::unique_ptr< lockfree::work_stealing_deque< job * > > local_deque_
 Local work-stealing deque for this worker.
 
std::function< job *(std::size_t)> steal_function_
 Function to steal work from other workers.
 
std::size_t steal_victim_index_ {0}
 Counter for round-robin steal victim selection.
 

Static Private Attributes

static std::atomic< std::size_t > next_worker_id_ {0}
 Static counter for generating unique worker IDs.
 

Additional Inherited Members

- Protected Attributes inherited from kcenon::thread::thread_base
std::optional< std::chrono::milliseconds > wake_interval_
 Interval at which the thread is optionally awakened.
 

Detailed Description

A specialized worker thread that processes jobs from a job_queue.

The thread_worker class inherits from thread_base, leveraging its life-cycle control methods (start, stop, etc.) and provides an implementation for job processing using a shared job_queue. By overriding should_continue_work() and do_work(), it polls the queue for available jobs and executes them.

Typical Usage

auto my_queue = std::make_shared<job_queue>();
auto worker = std::make_unique<thread_worker>();
worker->set_job_queue(my_queue);
worker->start(); // Worker thread begins processing jobs
// Enqueue jobs into my_queue...
// Eventually...
worker->stop(); // Waits for current job to finish, then stops
Examples
crash_protection/main.cpp.

Definition at line 67 of file thread_worker.h.

Constructor & Destructor Documentation

◆ thread_worker()

kcenon::thread::thread_worker::thread_worker ( const bool & use_time_tag = true,
const thread_context & context = thread_context() )

Constructs a new thread_worker.

Constructs a worker thread with optional timing capabilities.

Parameters
  use_time_tag  If set to true (default), the worker may log or utilize timestamps/tags when processing jobs.
  context       Optional thread context for logging and monitoring (defaults to an empty context).

This flag can be used to measure job durations, implement logging with timestamps, or any other time-related features in your job processing. The context provides access to logging and monitoring services.

Implementation details:

  • Inherits from thread_base to get thread management functionality
  • Sets descriptive name "thread_worker" for debugging and logging
  • Initializes timing flag for optional performance measurement
  • Job queue is not set initially (must be set before starting work)
  • Stores thread context for logging and monitoring
  • Creates a cancellation token for job cancellation support

Performance Timing:

  • When enabled, measures execution time for each job
  • Uses high_resolution_clock for precise measurements
  • Minimal overhead when disabled (single boolean check)
Parameters
  use_time_tag  If true, enables timing measurements for job execution.
  context       Thread context providing logging and monitoring services.

Definition at line 70 of file thread_worker.cpp.

71 : thread_base("thread_worker"),
72 worker_id_(next_worker_id_.fetch_add(1)),
73 use_time_tag_(use_time_tag),
74 job_queue_(nullptr),
75 context_(context),
76 worker_cancellation_token_(cancellation_token::create()),
77 current_job_(nullptr)
78 {
79 }

◆ ~thread_worker()

kcenon::thread::thread_worker::~thread_worker ( void )
virtual

Virtual destructor. Ensures the worker thread is stopped before destruction.

Destroys the worker thread.

Implementation details:

  • Base class destructor handles thread shutdown
  • No manual cleanup required due to RAII design
  • Shared pointer to job queue is automatically released

Definition at line 89 of file thread_worker.cpp.

89{}

Member Function Documentation

◆ do_work()

auto kcenon::thread::thread_worker::do_work ( void ) -> common::VoidResult
override protected virtual

Processes one or more jobs from the queue.

Executes a single work cycle by processing one job from the queue.

Returns
common::VoidResult containing an error if the work fails, or success value otherwise.

This method fetches a job from the queue (if available), executes it, and may repeat depending on the implementation. If any job fails, an error is returned; otherwise, a success value is returned.

Implementation details:

  • Uses non-blocking try_dequeue() to avoid condition variable deadlock
  • Polls the queue with minimal CPU overhead via short sleep intervals
  • Validates job pointer before execution
  • Optionally measures execution timing for performance analysis
  • Associates job with queue for potential re-submission
  • Logs execution results with appropriate detail level

Job Processing Workflow:

  1. Validate job queue availability
  2. Attempt non-blocking dequeue of next job
  3. If queue is empty: sleep briefly and return (will be called again)
  4. Validate dequeued job pointer
  5. Optionally record start time for measurement
  6. Associate job with queue for context
  7. Execute job's do_work() method
  8. Handle execution errors with detailed logging
  9. Log successful completion with timing info if enabled

Hybrid Wait Strategy:

  • Phase 1: Bounded spin (16 iterations) with CPU pause hints for fast pickup
  • Phase 2: Blocking dequeue() on job_queue's condition_variable
  • Wakes immediately on enqueue (notify_one) or stop (notify_all)
  • should_continue_work() determines when to exit (on queue stop)

Performance Characteristics:

  • Sub-microsecond pickup when jobs arrive during spin phase
  • Near-zero idle CPU usage (CV blocking, not polling)
  • Immediate wake-up on enqueue (<100μs vs previous 10ms floor)
  • No busy-waiting overhead when idle

Error Handling:

  • Missing job queue: Returns resource allocation error
  • Empty queue: Returns success after brief sleep (normal polling)
  • Null job pointer: Returns job invalid error
  • Job execution failure: Returns execution failed error with details

Performance Measurements:

  • High-resolution timing when use_time_tag_ is enabled
  • Nanosecond precision for accurate profiling
  • Minimal overhead when timing is disabled

Logging Behavior:

  • Standard success message when timing is disabled
  • Timestamped success message when timing is enabled
  • Error details are propagated up the call stack
Returns
common::VoidResult indicating success or detailed error information

Reimplemented from kcenon::thread::thread_base.

Definition at line 383 of file thread_worker.cpp.

384 {
385 // Acquire lock to safely get queue pointer
386 std::unique_lock<std::mutex> lock(queue_mutex_);
387
388 // Validate that job queue is available for processing
389 if (job_queue_ == nullptr)
390 {
391 lock.unlock();
392 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "there is no job_queue", "thread_system"};
393 }
394
395 // Make a local copy of the queue pointer while holding the lock
396 // The shared_ptr keeps the queue alive even if set_job_queue() replaces it
397 // No need to wait for !queue_being_replaced_ - the local copy is safe to use
398 std::shared_ptr<job_queue> local_queue = job_queue_;
399
400 // Release lock before dequeuing to allow other operations
401 lock.unlock();
402
403 std::unique_ptr<job> current_job;
404
405 // Work-stealing path: try local deque first, then global queue, then steal
407 {
408 // Step 1: Try local deque (LIFO for cache locality)
409 if (local_deque_)
410 {
411 auto local_result = local_deque_->pop();
412 if (local_result.has_value())
413 {
414 current_job = std::unique_ptr<job>(*local_result);
415 }
416 }
417
418 // Step 2: Try global queue
419 if (!current_job)
420 {
421 auto dequeue_result = local_queue->try_dequeue();
422 if (dequeue_result.is_ok())
423 {
424 current_job = std::move(dequeue_result.value());
425 }
426 }
427
428 // Step 3: Try to steal from other workers
429 if (!current_job)
430 {
431 current_job = try_steal_work();
432 }
433
434 // No work found - mark as idle and sleep
435 if (!current_job)
436 {
437 is_idle_.store(true, std::memory_order_relaxed);
438 std::this_thread::sleep_for(policy_.idle_sleep_duration);
439 return common::ok();
440 }
441 }
442 else
443 {
444 // Original non-work-stealing path
445 // Hybrid wait strategy: short spin followed by blocking dequeue
446 // This approach provides:
447 // - Sub-ms pickup latency (via spin loop)
448 // - Near-zero idle CPU usage (via blocking dequeue)
449 // - No busy-waiting overhead
450
451 // Phase 1: Short bounded spin (16 iterations)
452 // Optimized for scenarios where jobs arrive quickly
453 constexpr int spin_count = 16;
454 for (int i = 0; i < spin_count; ++i)
455 {
456 auto dequeue_result = local_queue->try_dequeue();
457 if (dequeue_result.is_ok())
458 {
459 // Job found during spin - fast path
460 current_job = std::move(dequeue_result.value());
461 break;
462 }
463
464 // CPU pause hint to reduce contention and power consumption
465 // Different intrinsics per compiler and architecture
466 #if defined(_MSC_VER)
467 // MSVC: Use _mm_pause() for x86/x64, YieldProcessor() for ARM
468 #if defined(_M_IX86) || defined(_M_X64)
469 _mm_pause();
470 #elif defined(_M_ARM) || defined(_M_ARM64)
471 __yield();
472 #else
473 std::this_thread::yield();
474 #endif
475 #elif defined(__GNUC__) || defined(__clang__)
476 // GCC/Clang: Use builtin functions
477 #if defined(__x86_64__) || defined(__i386__)
478 __builtin_ia32_pause();
479 #elif defined(__aarch64__) || defined(__arm__)
480 __asm__ __volatile__("yield");
481 #else
482 std::this_thread::yield();
483 #endif
484 #else
485 std::this_thread::yield();
486 #endif
487 }
488
489 // Phase 2: Block on job_queue's condition variable if spin didn't find a job
490 // Uses the queue's blocking dequeue() which waits on its internal CV.
491 // This wakes immediately when a new job is enqueued (via notify_one)
492 // or when the queue is stopped (via notify_all in job_queue::stop).
493 if (!current_job)
494 {
495 is_idle_.store(true, std::memory_order_relaxed);
496
497 // Blocking dequeue: waits on job_queue's condition_variable
498 // Wakes on: enqueue() notify_one, or stop() notify_all
499 auto dequeue_result = local_queue->dequeue();
500 if (dequeue_result.is_ok())
501 {
502 current_job = std::move(dequeue_result.value());
503 }
504 else
505 {
506 // Queue is stopped or empty after wake — return to let
507 // should_continue_work() decide whether to exit
508 return common::ok();
509 }
510 }
511 }
512
513 // Validate job pointer
514 if (current_job == nullptr)
515 {
516 return common::error_info{static_cast<int>(error_code::job_invalid), "error executing job: nullptr", "thread_system"};
517 }
518
519 // Capture job info for event tracing (before any state changes)
520 const auto job_id = current_job->get_job_id();
521 const auto job_name_for_event = current_job->get_name();
522 const auto enqueue_time = current_job->get_enqueue_time();
523
524 // Determine whether this job should record diagnostics events.
525 // The sampling counter reduces clock-read overhead by skipping
526 // events for most jobs when sample_rate > 1.
527 const bool should_trace = diagnostics_
530
531 // Record dequeued event if tracing is enabled and sampled
532 if (should_trace)
533 {
534 diagnostics::job_execution_event dequeued_event;
535 dequeued_event.job_id = job_id;
536 dequeued_event.job_name = job_name_for_event;
537 dequeued_event.type = diagnostics::event_type::dequeued;
538 dequeued_event.timestamp = std::chrono::steady_clock::now();
539 dequeued_event.system_timestamp = std::chrono::system_clock::now();
540 dequeued_event.thread_id = std::this_thread::get_id();
541 dequeued_event.worker_id = worker_id_;
542 dequeued_event.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
543 dequeued_event.timestamp - enqueue_time);
544 diagnostics_->record_event(dequeued_event);
545 }
546
547 // Update idle time statistics before transitioning to busy state
548 auto now = std::chrono::steady_clock::now();
549 auto state_since = get_state_since();
550 if (is_idle_.load(std::memory_order_relaxed))
551 {
552 auto idle_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(now - state_since);
553 total_idle_time_ns_.fetch_add(
554 static_cast<std::uint64_t>(idle_duration.count()),
555 std::memory_order_relaxed
556 );
557 }
558
559 // Mark worker as busy and update state timestamp
560 is_idle_.store(false, std::memory_order_relaxed);
561 state_since_rep_.store(now.time_since_epoch().count(), std::memory_order_release);
563
564 // Initialize timing measurement if performance monitoring is enabled
565 std::optional<std::chrono::time_point<std::chrono::high_resolution_clock>>
566 started_time_point = std::nullopt;
567 if (use_time_tag_)
568 {
569 started_time_point = std::chrono::high_resolution_clock::now();
570 }
571
572 // Associate the job with its source queue for potential re-submission
573 // Use local_queue to avoid race with set_job_queue()
574 current_job->set_job_queue(local_queue);
575
576 // Set cancellation token on the job for cooperative cancellation
577 // This allows the job to check if it should cancel during execution
578 current_job->set_cancellation_token(worker_cancellation_token_);
579
580 // Track currently executing job atomically for on_stop_requested()
581 // Use release ordering to ensure job state is visible to cancellation thread
582 current_job_.store(current_job.get(), std::memory_order_release);
583
584 // Record started event if tracing is enabled and sampled
585 if (should_trace)
586 {
587 diagnostics::job_execution_event started_event;
588 started_event.job_id = job_id;
589 started_event.job_name = job_name_for_event;
590 started_event.type = diagnostics::event_type::started;
591 started_event.timestamp = std::chrono::steady_clock::now();
592 started_event.system_timestamp = std::chrono::system_clock::now();
593 started_event.thread_id = std::this_thread::get_id();
594 started_event.worker_id = worker_id_;
595 started_event.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
596 started_event.timestamp - enqueue_time);
597 diagnostics_->record_event(started_event);
598 }
599
600 // Execute the job's work method and capture the result
601 auto work_result = current_job->do_work();
602 std::uint64_t execution_duration_ns = 0;
603 if (started_time_point.has_value())
604 {
605 auto end_time = std::chrono::high_resolution_clock::now();
606 execution_duration_ns = static_cast<std::uint64_t>(
607 std::chrono::duration_cast<std::chrono::nanoseconds>(
608 end_time - started_time_point.value()).count());
609 }
610
611 // Capture job name before clearing (for logging after job destruction)
612 std::string job_name = current_job->get_name();
613
614 // Clear current job tracking and destroy job under mutex protection
615 // This prevents race condition with on_stop_requested() (Issue #225)
616 // The mutex ensures job is not accessed while being destroyed
617 {
618 std::lock_guard<std::mutex> notify_lock(queue_mutex_);
619 current_job_.store(nullptr, std::memory_order_release);
620
621 // Explicitly destroy the job while holding the mutex
622 // This is critical for thread safety - on_stop_requested() acquires
623 // the same mutex before accessing current_job_, ensuring it cannot
624 // access a job that is being destroyed
625 current_job.reset();
626
627 // Notify any waiting set_job_queue() that job has completed
628 // This allows queue replacement to proceed safely
629 // Lock is held to prevent lost wakeup between predicate check and wait
630 queue_cv_.notify_all();
631 }
632
633 // Update busy time and state transition for diagnostics
634 {
635 auto end_now = std::chrono::steady_clock::now();
636 auto busy_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(
638 );
639 total_busy_time_ns_.fetch_add(
640 static_cast<std::uint64_t>(busy_duration.count()),
641 std::memory_order_relaxed
642 );
643 // Transition back to idle state
644 is_idle_.store(true, std::memory_order_relaxed);
645 state_since_rep_.store(end_now.time_since_epoch().count(), std::memory_order_release);
646 }
647
648 if (work_result.is_err())
649 {
650 // Increment failed job counter
651 jobs_failed_.fetch_add(1, std::memory_order_relaxed);
652
653 // Record failed event if tracing is enabled and sampled
654 if (should_trace)
655 {
656 diagnostics::job_execution_event failed_event;
657 failed_event.job_id = job_id;
658 failed_event.job_name = job_name_for_event;
659 failed_event.type = diagnostics::event_type::failed;
660 failed_event.timestamp = std::chrono::steady_clock::now();
661 failed_event.system_timestamp = std::chrono::system_clock::now();
662 failed_event.thread_id = std::this_thread::get_id();
663 failed_event.worker_id = worker_id_;
664 failed_event.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
665 current_job_start_time_ - enqueue_time);
666 failed_event.execution_time = std::chrono::nanoseconds(execution_duration_ns);
667 failed_event.error_code = work_result.error().code;
668 failed_event.error_message = work_result.error().message;
669 diagnostics_->record_event(failed_event);
670 }
671
672 if (metrics_)
673 {
674 metrics_->record_execution(0, false);
675 }
676 return common::error_info{static_cast<int>(error_code::job_execution_failed),
677 formatter::format("error executing job: {}", work_result.error().message), "thread_system"};
678 }
679
680 // Increment completed job counter
681 jobs_completed_.fetch_add(1, std::memory_order_relaxed);
682
683 // Record completed event if tracing is enabled and sampled
684 if (should_trace)
685 {
686 diagnostics::job_execution_event completed_event;
687 completed_event.job_id = job_id;
688 completed_event.job_name = job_name_for_event;
689 completed_event.type = diagnostics::event_type::completed;
690 completed_event.timestamp = std::chrono::steady_clock::now();
691 completed_event.system_timestamp = std::chrono::system_clock::now();
692 completed_event.thread_id = std::this_thread::get_id();
693 completed_event.worker_id = worker_id_;
694 completed_event.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
695 current_job_start_time_ - enqueue_time);
696 completed_event.execution_time = std::chrono::nanoseconds(execution_duration_ns);
697 diagnostics_->record_event(completed_event);
698 }
699
700 // Log successful job completion based on timing configuration
701 // Note: Using captured job_name since job is already destroyed
702 if (!started_time_point.has_value())
703 {
704 // Standard logging without timing information
705 context_.log(common::interfaces::log_level::debug,
706 formatter::format("job executed successfully: {} on thread_worker",
707 job_name));
708 }
709 else
710 {
711 // Enhanced logging with execution timing information
712 context_.log(common::interfaces::log_level::debug,
713 formatter::format("job executed successfully: {} on thread_worker ({}ns)",
714 job_name, execution_duration_ns));
715
716 // Update worker metrics if monitoring is available
717 if (context_.monitoring())
718 {
719 common::interfaces::worker_metrics metrics(worker_id_);
720 metrics.jobs_processed.value = 1;
721 metrics.total_processing_time_ns.value = static_cast<double>(execution_duration_ns);
722 // Use proper worker ID instead of thread hash
723 context_.update_worker_metrics(worker_id_, metrics);
724 }
725 }
726
727 if (metrics_)
728 {
729 metrics_->record_execution(execution_duration_ns, true);
730 }
731
732 return common::ok();
733 }

References kcenon::thread::diagnostics::completed, kcenon::thread::diagnostics::dequeued, kcenon::thread::diagnostics::job_execution_event::error_code, kcenon::thread::diagnostics::job_execution_event::error_message, kcenon::thread::diagnostics::job_execution_event::execution_time, kcenon::thread::diagnostics::failed, kcenon::thread::utils::formatter::format(), kcenon::thread::job_execution_failed, kcenon::thread::diagnostics::job_execution_event::job_id, kcenon::thread::job_invalid, kcenon::thread::diagnostics::job_execution_event::job_name, kcenon::thread::resource_allocation_failed, kcenon::thread::diagnostics::started, kcenon::thread::diagnostics::job_execution_event::system_timestamp, kcenon::thread::diagnostics::job_execution_event::thread_id, kcenon::thread::diagnostics::job_execution_event::timestamp, kcenon::thread::diagnostics::job_execution_event::type, kcenon::thread::diagnostics::job_execution_event::wait_time, and kcenon::thread::diagnostics::job_execution_event::worker_id.


◆ get_context()

auto kcenon::thread::thread_worker::get_context ( void ) const -> const thread_context&
nodiscard

Gets the thread context for this worker.

Returns
The thread context providing access to logging and monitoring services.

Definition at line 269 of file thread_worker.cpp.

270 {
271 return context_;
272 }

◆ get_current_job_info()

std::optional< diagnostics::job_info > kcenon::thread::thread_worker::get_current_job_info ( ) const
nodiscard noexcept

Gets information about the currently executing job.

Returns
Optional job_info if a job is currently executing, std::nullopt otherwise.

Thread Safety:

  • Safe to call from any thread
  • Provides snapshot of current state

Definition at line 842 of file thread_worker.cpp.

843 {
844 std::lock_guard<std::mutex> lock(queue_mutex_);
845 auto* job_ptr = current_job_.load(std::memory_order_acquire);
846 if (job_ptr == nullptr)
847 {
848 return std::nullopt;
849 }
850
851 diagnostics::job_info info;
852 info.job_id = job_ptr->get_job_id();
853 info.job_name = job_ptr->get_name();
855 info.start_time = current_job_start_time_;
856 info.enqueue_time = job_ptr->get_enqueue_time();
857 info.executed_by = std::this_thread::get_id();
858
859 auto now = std::chrono::steady_clock::now();
860 info.execution_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
861 now - current_job_start_time_
862 );
863 // Calculate wait time from enqueue to start
864 info.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
865 current_job_start_time_ - job_ptr->get_enqueue_time()
866 );
867
868 return info;
869 }
@ running
Job is currently being executed.
std::shared_ptr< job_interface > job_ptr
Shared pointer type for job objects.
Definition job_types.h:84
@ info
Informational messages highlighting progress.

References current_job_, current_job_start_time_, kcenon::thread::info, kcenon::thread::diagnostics::job_info::job_id, queue_mutex_, and kcenon::thread::diagnostics::running.

◆ get_jobs_completed()

std::uint64_t kcenon::thread::thread_worker::get_jobs_completed ( ) const
nodiscard noexcept

Gets the total number of jobs successfully completed by this worker.

Returns
Count of successfully completed jobs.

Thread Safety:

  • Safe to call from any thread
  • Uses atomic load with relaxed memory ordering

Definition at line 814 of file thread_worker.cpp.

{
    return jobs_completed_.load(std::memory_order_relaxed);
}

References jobs_completed_.

◆ get_jobs_failed()

std::uint64_t kcenon::thread::thread_worker::get_jobs_failed ( ) const
nodiscard noexcept

Gets the total number of jobs that failed during execution.

Returns
Count of failed jobs.

Thread Safety:

  • Safe to call from any thread
  • Uses atomic load with relaxed memory ordering

Definition at line 819 of file thread_worker.cpp.

{
    return jobs_failed_.load(std::memory_order_relaxed);
}

References jobs_failed_.

◆ get_local_deque()

lockfree::work_stealing_deque< job * > * kcenon::thread::thread_worker::get_local_deque ( )
nodiscard noexcept

Get the local work-stealing deque for this worker.

Returns
Pointer to the local deque (nullptr if work-stealing disabled).

This deque is used for work-stealing: other workers can steal jobs from this worker's local deque when they are idle.

Definition at line 199 of file thread_worker.cpp.

{
    return local_deque_.get();
}

References local_deque_.

◆ get_policy()

const worker_policy & kcenon::thread::thread_worker::get_policy ( ) const
nodiscard

Get the current worker policy.

Returns
The worker policy configuration.

Definition at line 194 of file thread_worker.cpp.

{
    return policy_;
}

References policy_.

◆ get_state_since()

std::chrono::steady_clock::time_point kcenon::thread::thread_worker::get_state_since ( ) const
nodiscard noexcept

Gets the time when the worker entered its current state.

Returns
Time point when current state was entered.

Thread Safety:

  • Safe to call from any thread
  • Uses atomic load with acquire memory ordering

Definition at line 834 of file thread_worker.cpp.

{
    auto rep = state_since_rep_.load(std::memory_order_acquire);
    return std::chrono::steady_clock::time_point{
        std::chrono::steady_clock::duration{rep}
    };
}

References state_since_rep_.
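The pattern above works because std::atomic<std::chrono::steady_clock::time_point> is not guaranteed to be lock-free, so the worker stores the clock's integer duration representation instead. A minimal, self-contained sketch of the same round-trip (illustrative names, not the library's code):

```cpp
#include <atomic>
#include <chrono>

// Stand-in for the state_since_rep_ technique: store the time_point's
// integer rep atomically, rebuild the time_point on read.
class state_clock {
public:
    // Called on idle/busy transitions.
    void mark_transition() {
        since_rep_.store(
            std::chrono::steady_clock::now().time_since_epoch().count(),
            std::memory_order_release);
    }

    // Safe to call from any thread; mirrors get_state_since().
    std::chrono::steady_clock::time_point since() const {
        auto rep = since_rep_.load(std::memory_order_acquire);
        return std::chrono::steady_clock::time_point{
            std::chrono::steady_clock::duration{rep}};
    }

private:
    std::atomic<std::chrono::steady_clock::time_point::rep> since_rep_{
        std::chrono::steady_clock::now().time_since_epoch().count()};
};
```

Because only an integer is stored, loads and stores stay lock-free on common platforms while the caller still gets back a full time_point.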

◆ get_total_busy_time()

std::chrono::nanoseconds kcenon::thread::thread_worker::get_total_busy_time ( ) const
nodiscard noexcept

Gets the total time spent executing jobs (busy time).

Returns
Duration of busy time in nanoseconds.

Thread Safety:

  • Safe to call from any thread
  • Uses atomic load with relaxed memory ordering

Definition at line 824 of file thread_worker.cpp.

{
    return std::chrono::nanoseconds{total_busy_time_ns_.load(std::memory_order_relaxed)};
}

References total_busy_time_ns_.

◆ get_total_idle_time()

std::chrono::nanoseconds kcenon::thread::thread_worker::get_total_idle_time ( ) const
nodiscard noexcept

Gets the total time spent waiting for jobs (idle time).

Returns
Duration of idle time in nanoseconds.

Thread Safety:

  • Safe to call from any thread
  • Uses atomic load with relaxed memory ordering

Definition at line 829 of file thread_worker.cpp.

{
    return std::chrono::nanoseconds{total_idle_time_ns_.load(std::memory_order_relaxed)};
}

References total_idle_time_ns_.
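A common monitoring use of these two counters is a utilization ratio. The helper below is a hypothetical consumer (not part of the library) showing how get_total_busy_time() and get_total_idle_time() results combine:

```cpp
#include <chrono>

// Hypothetical monitoring helper: fraction of wall time spent executing
// jobs, computed from the worker's busy/idle accumulators.
double utilization(std::chrono::nanoseconds busy, std::chrono::nanoseconds idle) {
    auto total = busy + idle;
    if (total.count() == 0) {
        return 0.0;  // no samples yet
    }
    return static_cast<double>(busy.count()) / static_cast<double>(total.count());
}
```

Since both accumulators use relaxed loads, the two reads are not a consistent snapshot; the ratio is advisory, which is fine for dashboards and load-balancing heuristics.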

◆ get_worker_id()

std::size_t kcenon::thread::thread_worker::get_worker_id ( ) const
nodiscard

Get the worker ID.

Returns
The unique ID for this worker instance.

Definition at line 735 of file thread_worker.cpp.

{
    return worker_id_;
}

References worker_id_.

◆ is_idle()

bool kcenon::thread::thread_worker::is_idle ( ) const
nodiscard noexcept

Checks if the worker is currently idle (not processing a job).

Returns
true if the worker is idle (waiting for jobs), false if actively processing.

Thread Safety:

  • Safe to call from any thread
  • Uses atomic operations for lock-free access
  • Provides snapshot of current state (may change immediately after return)

Use Cases:

  • Thread pool statistics and monitoring
  • Load balancing decisions
  • Performance analysis

Implementation details:

  • Returns current state of is_idle_ flag
  • Relaxed memory ordering sufficient (advisory-only value)
  • No synchronization needed (snapshot of current state)

Definition at line 750 of file thread_worker.cpp.

{
    return is_idle_.load(std::memory_order_relaxed);
}

References is_idle_.

◆ on_stop_requested()

auto kcenon::thread::thread_worker::on_stop_requested ( void ) -> void
override protected virtual

Called when the worker is requested to stop.

Overrides the base class hook to propagate cancellation to the currently executing job (if any). This allows jobs to cooperatively cancel when the worker thread is stopped.

Thread Safety:

  • Called from thread requesting stop (not worker thread)
  • Safe concurrent access with do_work() via atomic operations

Implementation details:

  • Called from thread_base::stop() when worker shutdown is requested
  • First cancels the worker's cancellation token (affects all future jobs)
  • Then directly cancels the current job's token if a job is running
  • Uses mutex synchronization to prevent use-after-free race condition

Cancellation Propagation:

  1. Cancel worker_cancellation_token_ (prevents new jobs from starting)
  2. Acquire queue_mutex_ to safely access current job
  3. If a job is running, get its cancellation token and cancel it
  4. Job will detect cancellation on its next is_cancelled() check

Thread Safety (Issue #225 fix):

  • Called from stop() thread (not worker thread)
  • Uses queue_mutex_ to synchronize with do_work() job destruction
  • Prevents data race between job destructor and virtual method call
  • This fixes EXC_BAD_ACCESS on ARM64 caused by use-after-free

Race Condition Fixed:

  • Before: on_stop_requested() could access job while do_work() was destroying it
  • After: queue_mutex_ ensures job is not destroyed while being accessed
  • do_work() now destroys job while holding the mutex
Note
This implements cooperative cancellation - the job must check its cancellation token periodically to actually stop execution.

Reimplemented from kcenon::thread::thread_base.

Definition at line 784 of file thread_worker.cpp.

{
    // Cancel the worker's token first
    // This ensures any future jobs will see cancellation immediately
    // Jobs receive this token via set_cancellation_token() in do_work()
    worker_cancellation_token_.cancel();

    // Acquire mutex to safely access current job
    // This prevents race condition with do_work() job destruction (Issue #225)
    std::lock_guard<std::mutex> lock(queue_mutex_);

    // Load the currently executing job pointer atomically
    auto* job_ptr = current_job_.load(std::memory_order_acquire);

    // If a job is currently executing, cancel it directly
    if (job_ptr != nullptr)
    {
        // Get the job's cancellation token and cancel it
        // This provides redundancy in case the job cached its token before
        // we cancelled worker_cancellation_token_
        auto job_token = job_ptr->get_cancellation_token();
        job_token.cancel();

        // Log cancellation attempt for debugging
        context_.log(common::interfaces::log_level::debug,
                     formatter::format("Cancellation requested for job: {} on worker {}",
                                       job_ptr->get_name(), worker_id_));
    }
}

References kcenon::thread::utils::formatter::format().

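The cooperative-cancellation contract described above (a shared flag the worker sets and the job polls) can be sketched in a few lines. This is a toy stand-in, not the library's cancellation_token:

```cpp
#include <atomic>
#include <memory>

// Minimal cooperative-cancellation sketch: copies of a token share one
// atomic flag, so cancelling any copy is visible to all holders.
class toy_token {
public:
    toy_token() : state_(std::make_shared<std::atomic<bool>>(false)) {}
    void cancel() { state_->store(true, std::memory_order_release); }
    bool is_cancelled() const { return state_->load(std::memory_order_acquire); }
private:
    std::shared_ptr<std::atomic<bool>> state_;  // shared between copies
};

// A cooperative job checks its token between units of work; it only
// stops at the next check, exactly as the Note above describes.
int run_until_cancelled(toy_token token, int max_steps) {
    int steps = 0;
    while (steps < max_steps && !token.is_cancelled()) {
        ++steps;  // one unit of work
    }
    return steps;
}
```

A job that never calls is_cancelled() will run to completion regardless; cancellation here is a request, not preemption.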
◆ set_context()

auto kcenon::thread::thread_worker::set_context ( const thread_context & context) -> void

Sets the thread context for this worker.

Parameters
context: The thread context providing access to logging and monitoring services.

Implementation details:

  • Stores the context for use in logging and monitoring
  • Should be called before starting the worker thread
  • Context provides access to optional services

Definition at line 163 of file thread_worker.cpp.

{
    context_ = context;
}

◆ set_diagnostics()

void kcenon::thread::thread_worker::set_diagnostics ( diagnostics::thread_pool_diagnostics * diag)

Set the diagnostics instance for event tracing.

Parameters
diag: Pointer to the diagnostics instance.

When set, the worker will record execution events to the diagnostics instance if tracing is enabled. If nullptr, no events are recorded.

Definition at line 173 of file thread_worker.cpp.

{
    diagnostics_ = diag;
}

References diagnostics_.

◆ set_diagnostics_sample_rate()

void kcenon::thread::thread_worker::set_diagnostics_sample_rate ( std::uint32_t rate)

Set the diagnostics sampling rate.

Parameters
rate: Record diagnostics events every Nth job (1 = every job).

When rate > 1, only every Nth job records diagnostic events, reducing clock-read overhead while still providing representative data. The is_tracing_enabled() check remains the top-level gate.

Definition at line 178 of file thread_worker.cpp.

{
    diagnostics_sample_rate_ = (rate > 0) ? rate : 1;
}

References diagnostics_sample_rate_.
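The sampling gate this rate feeds is a simple modulo check on a per-job counter. A self-contained sketch with illustrative names (the worker's actual members are diagnostics_counter_ and diagnostics_sample_rate_):

```cpp
#include <cstdint>

// Sketch of the diagnostics sampling gate: record an event only when the
// job counter is a multiple of the configured rate.
struct sampler {
    std::uint32_t rate{1};     // 1 = record every job (backward-compatible default)
    std::uint64_t counter{0};

    // Mirrors set_diagnostics_sample_rate(): 0 is clamped to 1.
    void set_rate(std::uint32_t r) { rate = (r > 0) ? r : 1; }

    // Called once per executed job; true means record this job's events.
    bool should_record() { return (counter++ % rate) == 0; }
};
```

With rate = N, roughly 1/N of jobs pay the clock-read cost while the recorded events stay representative of the workload.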

◆ set_job_queue()

auto kcenon::thread::thread_worker::set_job_queue ( std::shared_ptr< job_queue > job_queue) -> void

Sets the job_queue that this worker should process.

Associates this worker with a job queue for processing.

Parameters
job_queue: A shared pointer to the queue containing jobs.

Once the queue is set and start() is called, the worker will repeatedly poll the queue for new jobs and process them.

Implementation details:

  • Stores shared pointer to enable job dequeuing
  • Thread-safe queue replacement with proper synchronization
  • Waits for current job completion before replacing queue
  • Multiple workers can share the same job queue

Queue Replacement Synchronization:

  • Acquires mutex to prevent concurrent do_work() access
  • Sets replacement flag to prevent new job processing
  • Waits for current job to complete (current_job_ == nullptr)
  • Replaces queue pointer atomically
  • Notifies worker thread to resume

Thread Safety:

  • Safe to call from any thread
  • Coordinates with do_work() via mutex and condition variable
  • Prevents use-after-free during queue replacement

Definition at line 114 of file thread_worker.cpp.

{
    std::shared_ptr<kcenon::thread::job_queue> old_queue;

    {
        std::unique_lock<std::mutex> lock(queue_mutex_);

        // Signal that queue replacement is in progress
        queue_being_replaced_ = true;

        // Wait for current job to complete
        // Predicate ensures we don't proceed while a job is executing
        queue_cv_.wait(lock, [this] {
            return current_job_.load(std::memory_order_acquire) == nullptr;
        });

        // Save old queue so we can wake any blocked dequeue() calls
        old_queue = job_queue_;

        // Replace the queue pointer
        job_queue_ = std::move(job_queue);

        // Clear replacement flag
        queue_being_replaced_ = false;

        // Notify worker thread that replacement is complete
        queue_cv_.notify_all();
    }

    // Stop the old queue outside the lock to wake any worker thread
    // blocked on old_queue->dequeue(). Without this, the worker's
    // do_work() would block indefinitely on the old queue's CV after
    // the queue pointer has been replaced.
    if (old_queue)
    {
        old_queue->stop();
    }
}

◆ set_metrics()

void kcenon::thread::thread_worker::set_metrics ( std::shared_ptr< metrics::ThreadPoolMetrics > metrics)

Provide shared metrics storage for this worker.

Definition at line 168 of file thread_worker.cpp.

{
    metrics_ = std::move(metrics);
}

References metrics_.

◆ set_policy()

void kcenon::thread::thread_worker::set_policy ( const worker_policy & policy)

Set the worker policy for this worker.

Parameters
policy: The worker policy configuration.

Definition at line 183 of file thread_worker.cpp.

{
    policy_ = policy;

    // Initialize local deque if work-stealing is enabled
    if (policy_.enable_work_stealing)
    {
        local_deque_ = std::make_unique<lockfree::work_stealing_deque<job*>>();
    }
}

References kcenon::thread::worker_policy::enable_work_stealing, local_deque_, and policy_.

◆ set_steal_function()

void kcenon::thread::thread_worker::set_steal_function ( std::function< job *(std::size_t)> steal_fn)

Set the steal function for finding other workers' deques.

Parameters
steal_fn: Function that returns a job to steal, or nullptr.

The steal function is called when this worker's local deque and the global queue are both empty. It should try to steal work from other workers.

Definition at line 204 of file thread_worker.cpp.

{
    steal_function_ = std::move(steal_fn);
}

References steal_function_.

◆ should_continue_work()

auto kcenon::thread::thread_worker::should_continue_work ( void ) const -> bool
nodiscard override protected virtual

Determines whether the worker should continue processing jobs.

Returns
true while a job queue is set and the queue has not been stopped; false otherwise.

Called in the thread's main loop (defined by thread_base) to decide if do_work() should be invoked. Returns true while the job queue is set and not stopped; an empty queue does not by itself cause the worker to exit.

Implementation details:

  • Used by thread_base to control the work loop
  • Returns false if no job queue is set (prevents infinite loop)
  • Returns true while queue is not stopped (even if empty)
  • Actual job waiting is handled by non-blocking polling in do_work()
  • Thread-safe operation (job_queue methods are thread-safe)

Work Loop Control:

  • Worker continues until queue is explicitly stopped
  • Empty queue does NOT cause worker exit - do_work() will poll for jobs
  • This prevents premature worker termination before jobs arrive
  • Worker exits gracefully only when queue is stopped

Design Rationale - Solving the Two-Level Condition Variable Problem:

  • Thread_base waits on its own condition variable (worker_condition_)
  • Job_queue notifies its own condition variable (different object!)
  • If should_continue_work() returns false on empty queue:
    1. thread_base waits on worker_condition_
    2. job enqueue notifies job_queue's condition variable
    3. Worker never wakes up - deadlock situation
  • By returning true until stopped, worker enters do_work() immediately
  • do_work() uses non-blocking try_dequeue() with polling
  • This completely avoids the two-level CV problem
  • CPU overhead is minimal due to sleep between polling attempts

Shutdown Safety:

  • thread_pool::stop() can call operations in any order without race conditions
  • Queue stop sets is_stopped() = true (atomic operation)
  • Worker sees stopped flag and exits cleanly
  • No dependency on operation ordering

Thread Safety:

  • Synchronizes access to job_queue_ with queue_mutex_
  • Prevents race conditions with set_job_queue() and do_work()
  • Uses lock_guard for RAII-based exception safety
  • Mutex marked mutable to allow locking in const method

Reimplemented from kcenon::thread::thread_base.

Definition at line 317 of file thread_worker.cpp.

{
    // Synchronize access to job_queue_ with set_job_queue() and do_work()
    std::lock_guard<std::mutex> lock(queue_mutex_);

    if (job_queue_ == nullptr)
    {
        return false;
    }

    // Continue while queue is not stopped - do_work() handles polling for jobs
    return !job_queue_->is_stopped();
}

References job_queue_, and queue_mutex_.
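The two-level condition variable rationale above boils down to one pattern: loop until the queue is stopped, poll with a non-blocking dequeue, and sleep briefly between empty polls. A self-contained toy version (not the library's code; the queue and job types are stand-ins):

```cpp
#include <atomic>
#include <chrono>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>

// Stand-in queue with a non-blocking try_dequeue(), as the design requires.
struct toy_queue {
    std::mutex m;
    std::deque<int> items;
    std::atomic<bool> stopped{false};

    std::optional<int> try_dequeue() {
        std::lock_guard<std::mutex> g(m);
        if (items.empty()) return std::nullopt;
        int v = items.front();
        items.pop_front();
        return v;
    }
};

int run_worker(toy_queue& q) {
    int processed = 0;
    // should_continue_work(): keep going until the queue is stopped,
    // even while it is momentarily empty.
    while (!q.stopped.load()) {
        if (auto job = q.try_dequeue()) {
            ++processed;  // do_work(): execute the job
        } else {
            // Empty poll: brief sleep instead of waiting on the queue's CV,
            // which avoids the two-level condition variable deadlock.
            std::this_thread::sleep_for(std::chrono::microseconds{100});
        }
    }
    // Drain remaining items after stop, mirroring graceful shutdown.
    while (auto job = q.try_dequeue()) ++processed;
    return processed;
}
```

Because the worker never blocks on the queue's internal condition variable, stop ordering cannot strand it; the cost is a bounded polling sleep while idle.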

◆ try_get_job()

std::unique_ptr< job > kcenon::thread::thread_worker::try_get_job ( )
nodiscard private

Try to get a job from local deque first, then global queue.

Returns
A unique_ptr to the job, or nullptr if no work available.

Definition at line 209 of file thread_worker.cpp.

{
    // If work-stealing is enabled, try local deque first (LIFO for cache locality)
    if (policy_.enable_work_stealing && local_deque_)
    {
        auto local_result = local_deque_->pop();
        if (local_result.has_value())
        {
            return std::unique_ptr<job>(*local_result);
        }
    }

    // Fallback to global queue
    std::lock_guard<std::mutex> lock(queue_mutex_);
    if (job_queue_ == nullptr)
    {
        return nullptr;
    }

    auto dequeue_result = job_queue_->try_dequeue();
    if (dequeue_result.is_ok())
    {
        return std::move(dequeue_result.value());
    }

    return nullptr;
}

References kcenon::thread::worker_policy::enable_work_stealing, job_queue_, local_deque_, policy_, and queue_mutex_.

◆ try_steal_work()

std::unique_ptr< job > kcenon::thread::thread_worker::try_steal_work ( )
nodiscard private

Try to steal work from other workers.

Returns
A unique_ptr to the stolen job, or nullptr if stealing failed.

Definition at line 237 of file thread_worker.cpp.

{
    if (!policy_.enable_work_stealing || !steal_function_)
    {
        return nullptr;
    }

    // Apply exponential backoff between steal attempts
    for (std::size_t attempt = 0; attempt < policy_.max_steal_attempts; ++attempt)
    {
        job* stolen = steal_function_(worker_id_);
        if (stolen != nullptr)
        {
            return std::unique_ptr<job>(stolen);
        }

        // Exponential backoff
        if (attempt > 0)
        {
            auto backoff = policy_.steal_backoff * (1 << (attempt - 1));
            std::this_thread::sleep_for(backoff);
        }
    }

    return nullptr;
}

References kcenon::thread::worker_policy::enable_work_stealing, kcenon::thread::worker_policy::max_steal_attempts, policy_, kcenon::thread::worker_policy::steal_backoff, steal_function_, and worker_id_.
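The backoff schedule used between steal attempts doubles from the configured base: attempt 0 retries immediately, and attempt k (k >= 1) sleeps for steal_backoff * 2^(k-1). A minimal sketch of just that computation, with an illustrative function name:

```cpp
#include <chrono>
#include <cstddef>

// Sleep duration before retrying steal attempt `attempt`, given the
// policy's base backoff. Mirrors the loop in try_steal_work().
std::chrono::microseconds steal_backoff_for(std::chrono::microseconds base,
                                            std::size_t attempt) {
    if (attempt == 0) {
        return std::chrono::microseconds{0};  // first attempt: no wait
    }
    return base * (1 << (attempt - 1));       // 1x, 2x, 4x, ...
}
```

With a 10 microsecond base and max_steal_attempts = 4, the worker sleeps 10, 20, then 40 microseconds between its four probes, keeping contention low without giving up on stealable work too quickly.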

Member Data Documentation

◆ context_

thread_context kcenon::thread::thread_worker::context_
private

The thread context providing access to logging and monitoring services.

This context enables the worker to log messages and report metrics through the configured services.

Definition at line 314 of file thread_worker.h.

◆ current_job_

std::atomic<job*> kcenon::thread::thread_worker::current_job_ {nullptr}
private

Pointer to the currently executing job.

This is set atomically at the start of job execution and cleared when the job completes. Used by on_stop_requested() to cancel the running job.

Memory Ordering:

  • Release when storing (ensures job state visible to cancellation thread)
  • Acquire when loading (ensures we see the correct job state)
Note
Raw pointer used because we don't own the job (it's owned by unique_ptr in do_work()). Safe because job lifetime is guaranteed during execution.

Definition at line 366 of file thread_worker.h.

Referenced by get_current_job_info().

◆ current_job_start_time_

std::chrono::steady_clock::time_point kcenon::thread::thread_worker::current_job_start_time_
private

Time point when the current job started executing.

Used to track job execution time for diagnostics. Only valid when a job is currently executing.

Definition at line 426 of file thread_worker.h.

Referenced by get_current_job_info().

◆ diagnostics_

diagnostics::thread_pool_diagnostics* kcenon::thread::thread_worker::diagnostics_ {nullptr}
private

Pointer to the diagnostics instance for event tracing.

When set, the worker records execution events if tracing is enabled. Raw pointer because the diagnostics outlives the worker.

Definition at line 327 of file thread_worker.h.

Referenced by set_diagnostics().

◆ diagnostics_counter_

std::uint64_t kcenon::thread::thread_worker::diagnostics_counter_ {0}
private

Counter for diagnostics sampling.

Incremented on each job execution. Diagnostics events are recorded when (diagnostics_counter_ % diagnostics_sample_rate_ == 0).

Definition at line 351 of file thread_worker.h.

◆ diagnostics_sample_rate_

std::uint32_t kcenon::thread::thread_worker::diagnostics_sample_rate_ {1}
private

Diagnostics sampling rate (record every Nth job).

Default is 1 (every job) for backward compatibility.

Definition at line 343 of file thread_worker.h.

Referenced by set_diagnostics_sample_rate().

◆ is_idle_

std::atomic<bool> kcenon::thread::thread_worker::is_idle_ {true}
private

Indicates whether the worker is currently idle (not processing a job).

This flag is set to true when the worker is waiting for jobs and false when actively processing a job. Updated atomically for thread-safe access.

Memory Ordering:

  • Relaxed ordering sufficient (no synchronization dependencies)
  • Value is advisory only (race conditions between check and state change are acceptable)
Note
Used by thread pool for statistics and monitoring purposes.

Definition at line 380 of file thread_worker.h.

Referenced by is_idle().

◆ job_queue_

std::shared_ptr<job_queue> kcenon::thread::thread_worker::job_queue_
private

A shared pointer to the job queue from which this worker obtains jobs.

Multiple workers can share the same queue, enabling concurrent processing of queued jobs.

Definition at line 306 of file thread_worker.h.

Referenced by should_continue_work(), and try_get_job().

◆ jobs_completed_

std::atomic<std::uint64_t> kcenon::thread::thread_worker::jobs_completed_ {0}
private

Total number of jobs successfully completed by this worker.

Incremented atomically after each successful job execution.

Definition at line 387 of file thread_worker.h.

Referenced by get_jobs_completed().

◆ jobs_failed_

std::atomic<std::uint64_t> kcenon::thread::thread_worker::jobs_failed_ {0}
private

Total number of jobs that failed during execution.

Incremented atomically when a job's do_work() returns an error.

Definition at line 394 of file thread_worker.h.

Referenced by get_jobs_failed().

◆ local_deque_

std::unique_ptr<lockfree::work_stealing_deque<job*> > kcenon::thread::thread_worker::local_deque_
private

Local work-stealing deque for this worker.

When work-stealing is enabled, jobs submitted to this worker are stored in this deque. The owner (this worker) can push/pop from the bottom (LIFO), while other workers can steal from the top (FIFO).

Definition at line 469 of file thread_worker.h.

Referenced by get_local_deque(), set_policy(), and try_get_job().
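The owner-LIFO / thief-FIFO access pattern described above can be demonstrated with a deliberately simplified, mutex-based stand-in. The real work_stealing_deque is lock-free (Chase-Lev style); this toy only shows which end each party touches:

```cpp
#include <deque>
#include <mutex>
#include <optional>

// Toy work-stealing deque: the owning worker pushes and pops at the back
// (LIFO, favoring recently-pushed, cache-warm jobs), while other workers
// steal from the front (FIFO, taking the oldest work).
template <typename T>
class toy_ws_deque {
public:
    void push(T item) {                      // owner only
        std::lock_guard<std::mutex> g(m_);
        d_.push_back(std::move(item));
    }
    std::optional<T> pop() {                 // owner only: newest item
        std::lock_guard<std::mutex> g(m_);
        if (d_.empty()) return std::nullopt;
        T item = std::move(d_.back());
        d_.pop_back();
        return item;
    }
    std::optional<T> steal() {               // any thief: oldest item
        std::lock_guard<std::mutex> g(m_);
        if (d_.empty()) return std::nullopt;
        T item = std::move(d_.front());
        d_.pop_front();
        return item;
    }
private:
    std::mutex m_;
    std::deque<T> d_;
};
```

Taking opposite ends means the owner and thieves rarely contend for the same element, which is what makes the lock-free version's single-compare-and-swap steal path practical.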

◆ metrics_

std::shared_ptr<metrics::ThreadPoolMetrics> kcenon::thread::thread_worker::metrics_
private

Shared metrics aggregator provided by the owning thread pool.

Definition at line 319 of file thread_worker.h.

Referenced by set_metrics().

◆ next_worker_id_

std::atomic< std::size_t > kcenon::thread::thread_worker::next_worker_id_ {0}
static private

Static counter for generating unique worker IDs.

Definition at line 284 of file thread_worker.h.

◆ policy_

worker_policy kcenon::thread::thread_worker::policy_
private

Worker policy configuration.

Controls worker behavior including work-stealing settings.

Definition at line 459 of file thread_worker.h.

Referenced by get_policy(), set_policy(), try_get_job(), and try_steal_work().

◆ queue_being_replaced_

bool kcenon::thread::thread_worker::queue_being_replaced_ {false}
private

Indicates whether queue replacement is in progress.

When true, the worker thread should wait before accessing the queue. Protected by queue_mutex_.

Definition at line 452 of file thread_worker.h.

◆ queue_cv_

std::condition_variable kcenon::thread::thread_worker::queue_cv_
private

Condition variable for queue replacement synchronization.

Used to wait for current job completion before replacing the queue.

Definition at line 444 of file thread_worker.h.

◆ queue_mutex_

std::mutex kcenon::thread::thread_worker::queue_mutex_
mutable private

Mutex protecting job queue replacement.

This mutex synchronizes access to job_queue_ during replacement operations to prevent race conditions between do_work(), set_job_queue(), and should_continue_work().

Note
Marked mutable to allow locking in const methods like should_continue_work(). The const qualifier applies to the logical state, not the mutex itself.

Definition at line 437 of file thread_worker.h.

Referenced by get_current_job_info(), should_continue_work(), and try_get_job().

◆ state_since_rep_

std::atomic<std::chrono::steady_clock::time_point::rep> kcenon::thread::thread_worker::state_since_rep_
private
Initial value:
{
std::chrono::steady_clock::now().time_since_epoch().count()
}

Time point when the worker entered its current state.

Updated when transitioning between idle and busy states. Used to calculate current state duration.

Definition at line 416 of file thread_worker.h.

Referenced by get_state_since().

◆ steal_function_

std::function<job*(std::size_t)> kcenon::thread::thread_worker::steal_function_
private

Function to steal work from other workers.

This function is provided by the thread pool and returns a stolen job from another worker's deque, or nullptr if no work available.

Definition at line 477 of file thread_worker.h.

Referenced by set_steal_function(), and try_steal_work().

◆ steal_victim_index_

std::size_t kcenon::thread::thread_worker::steal_victim_index_ {0}
private

Counter for round-robin steal victim selection.

Definition at line 482 of file thread_worker.h.

◆ total_busy_time_ns_

std::atomic<std::uint64_t> kcenon::thread::thread_worker::total_busy_time_ns_ {0}
private

Total time spent executing jobs (busy time) in nanoseconds.

Accumulated after each job execution with the job's execution duration.

Definition at line 401 of file thread_worker.h.

Referenced by get_total_busy_time().

◆ total_idle_time_ns_

std::atomic<std::uint64_t> kcenon::thread::thread_worker::total_idle_time_ns_ {0}
private

Total time spent waiting for jobs (idle time) in nanoseconds.

Accumulated when transitioning from idle to busy state.

Definition at line 408 of file thread_worker.h.

Referenced by get_total_idle_time().

◆ use_time_tag_

bool kcenon::thread::thread_worker::use_time_tag_
private

Indicates whether to use time tags or timestamps for job processing.

When true, the worker might record timestamps (e.g., job start/end times) or log them for debugging/monitoring. The exact usage depends on the job and override details in derived classes.

Definition at line 298 of file thread_worker.h.

◆ worker_cancellation_token_

cancellation_token kcenon::thread::thread_worker::worker_cancellation_token_
private

Cancellation token for this worker.

This token is propagated to jobs during execution, allowing them to cooperatively cancel when the worker is stopped. The token is cancelled in on_stop_requested().

Definition at line 336 of file thread_worker.h.

◆ worker_id_

std::size_t kcenon::thread::thread_worker::worker_id_ {0}
private

Unique ID for this worker instance.

Definition at line 289 of file thread_worker.h.

Referenced by get_worker_id(), and try_steal_work().


The documentation for this class was generated from the following files: