Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
thread_worker.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7
9
10#include <thread>
11
12// Platform-specific CPU pause intrinsics
13#if defined(_MSC_VER) && (defined(_M_IX86) || defined(_M_X64))
14 #include <emmintrin.h> // For _mm_pause()
15#endif
16
17using namespace utility_module;
18
47namespace kcenon::thread
48{
    // Initialize static member
    // Process-wide monotonic counter: each thread_worker constructor performs a
    // fetch_add(1) on this to obtain a unique worker_id_. Zero-initialized
    // before any worker is constructed.
    std::atomic<std::size_t> thread_worker::next_worker_id_{0};
    /**
     * @brief Constructs a new thread_worker with a process-unique worker id.
     *
     * @param use_time_tag When true, each job execution is timed with
     *                     high_resolution_clock (see do_work()).
     * @param context      Service context (logging/monitoring) copied into
     *                     this worker.
     *
     * The job queue starts null; set_job_queue() must be called before the
     * worker can process any jobs.
     */
    thread_worker::thread_worker(const bool& use_time_tag, const thread_context& context)
        : thread_base("thread_worker"),
          worker_id_(next_worker_id_.fetch_add(1)),   // unique id from the static counter
          use_time_tag_(use_time_tag),
          job_queue_(nullptr),                         // assigned later via set_job_queue()
          context_(context),
          worker_cancellation_token_(cancellation_token::create()),
          current_job_(nullptr)                        // no job executing yet
    {
    }
80
90
    /**
     * @brief Atomically replaces the worker's job queue.
     *
     * @param job_queue The new queue (may be null to detach the worker).
     *
     * Waits for any currently executing job to finish before swapping the
     * pointer, then stops the old queue outside the lock so that a worker
     * thread blocked in the old queue's dequeue() wakes up and observes the
     * replacement. Safe to call while the worker thread is running.
     */
    auto thread_worker::set_job_queue(std::shared_ptr<job_queue> job_queue) -> void
    {
        std::shared_ptr<kcenon::thread::job_queue> old_queue;

        {
            std::unique_lock<std::mutex> lock(queue_mutex_);

            // Signal that queue replacement is in progress
            queue_being_replaced_ = true;

            // Wait for current job to complete
            // Predicate ensures we don't proceed while a job is executing
            // (do_work() clears current_job_ and notifies under queue_mutex_,
            // so there is no lost-wakeup window here).
            queue_cv_.wait(lock, [this] {
                return current_job_.load(std::memory_order_acquire) == nullptr;
            });

            // Save old queue so we can wake any blocked dequeue() calls
            old_queue = job_queue_;

            // Replace the queue pointer
            job_queue_ = std::move(job_queue);

            // Clear replacement flag
            queue_being_replaced_ = false;

            // Notify worker thread that replacement is complete
            queue_cv_.notify_all();
        }

        // Stop the old queue outside the lock to wake any worker thread
        // blocked on old_queue->dequeue(). Without this, the worker's
        // do_work() would block indefinitely on the old queue's CV after
        // the queue pointer has been replaced.
        if (old_queue)
        {
            old_queue->stop();
        }
    }
152
/**
 * @brief Sets the thread context (logging/monitoring services) for this worker.
 *
 * @param context Context to copy into the worker.
 *
 * NOTE(review): assignment is not synchronized with concurrent context_ reads
 * in do_work()/on_stop_requested() — confirm callers only invoke this before
 * the worker starts.
 */
auto thread_worker::set_context(const thread_context& context) -> void
{
    context_ = context;
}
167
/**
 * @brief Provides the shared metrics aggregator owned by the thread pool.
 *
 * @param metrics Shared metrics storage; do_work() records per-job execution
 *                results into it when non-null.
 */
void thread_worker::set_metrics(std::shared_ptr<metrics::ThreadPoolMetrics> metrics)
{
    metrics_ = std::move(metrics);
}
172
177
179{
180 diagnostics_sample_rate_ = (rate > 0) ? rate : 1;
181}
182
184{
185 policy_ = policy;
186
187 // Initialize local deque if work-stealing is enabled
189 {
190 local_deque_ = std::make_unique<lockfree::work_stealing_deque<job*>>();
191 }
192}
193
195{
196 return policy_;
197}
198
203
/**
 * @brief Installs the callback used to steal work from other workers' deques.
 *
 * @param steal_fn Callable invoked with this worker's id; returns a raw job
 *                 pointer (ownership transfers to the caller) or nullptr when
 *                 nothing could be stolen. Used by try_steal_work().
 */
void thread_worker::set_steal_function(std::function<job*(std::size_t)> steal_fn)
{
    steal_function_ = std::move(steal_fn);
}
208
209std::unique_ptr<job> thread_worker::try_get_job()
210{
211 // If work-stealing is enabled, try local deque first (LIFO for cache locality)
213 {
214 auto local_result = local_deque_->pop();
215 if (local_result.has_value())
216 {
217 return std::unique_ptr<job>(*local_result);
218 }
219 }
220
221 // Fallback to global queue
222 std::lock_guard<std::mutex> lock(queue_mutex_);
223 if (job_queue_ == nullptr)
224 {
225 return nullptr;
226 }
227
228 auto dequeue_result = job_queue_->try_dequeue();
229 if (dequeue_result.is_ok())
230 {
231 return std::move(dequeue_result.value());
232 }
233
234 return nullptr;
235}
236
237std::unique_ptr<job> thread_worker::try_steal_work()
238{
240 {
241 return nullptr;
242 }
243
244 // Apply exponential backoff between steal attempts
245 for (std::size_t attempt = 0; attempt < policy_.max_steal_attempts; ++attempt)
246 {
247 job* stolen = steal_function_(worker_id_);
248 if (stolen != nullptr)
249 {
250 return std::unique_ptr<job>(stolen);
251 }
252
253 // Exponential backoff
254 if (attempt > 0)
255 {
256 auto backoff = policy_.steal_backoff * (1 << (attempt - 1));
257 std::this_thread::sleep_for(backoff);
258 }
259 }
260
261 return nullptr;
262}
263
270 {
271 return context_;
272 }
273
318 {
319 // Synchronize access to job_queue_ with set_job_queue() and do_work()
320 std::lock_guard<std::mutex> lock(queue_mutex_);
321
322 if (job_queue_ == nullptr)
323 {
324 return false;
325 }
326
327 // Continue while queue is not stopped - do_work() handles polling for jobs
328 return !job_queue_->is_stopped();
329 }
330
383 auto thread_worker::do_work() -> common::VoidResult
384 {
385 // Acquire lock to safely get queue pointer
386 std::unique_lock<std::mutex> lock(queue_mutex_);
387
388 // Validate that job queue is available for processing
389 if (job_queue_ == nullptr)
390 {
391 lock.unlock();
392 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "there is no job_queue", "thread_system"};
393 }
394
395 // Make a local copy of the queue pointer while holding the lock
396 // The shared_ptr keeps the queue alive even if set_job_queue() replaces it
397 // No need to wait for !queue_being_replaced_ - the local copy is safe to use
398 std::shared_ptr<job_queue> local_queue = job_queue_;
399
400 // Release lock before dequeuing to allow other operations
401 lock.unlock();
402
403 std::unique_ptr<job> current_job;
404
405 // Work-stealing path: try local deque first, then global queue, then steal
406 if (policy_.enable_work_stealing)
407 {
408 // Step 1: Try local deque (LIFO for cache locality)
409 if (local_deque_)
410 {
411 auto local_result = local_deque_->pop();
412 if (local_result.has_value())
413 {
414 current_job = std::unique_ptr<job>(*local_result);
415 }
416 }
417
418 // Step 2: Try global queue
419 if (!current_job)
420 {
421 auto dequeue_result = local_queue->try_dequeue();
422 if (dequeue_result.is_ok())
423 {
424 current_job = std::move(dequeue_result.value());
425 }
426 }
427
428 // Step 3: Try to steal from other workers
429 if (!current_job)
430 {
431 current_job = try_steal_work();
432 }
433
434 // No work found - mark as idle and sleep
435 if (!current_job)
436 {
437 is_idle_.store(true, std::memory_order_relaxed);
438 std::this_thread::sleep_for(policy_.idle_sleep_duration);
439 return common::ok();
440 }
441 }
442 else
443 {
444 // Original non-work-stealing path
445 // Hybrid wait strategy: short spin followed by blocking dequeue
446 // This approach provides:
447 // - Sub-ms pickup latency (via spin loop)
448 // - Near-zero idle CPU usage (via blocking dequeue)
449 // - No busy-waiting overhead
450
451 // Phase 1: Short bounded spin (16 iterations)
452 // Optimized for scenarios where jobs arrive quickly
453 constexpr int spin_count = 16;
454 for (int i = 0; i < spin_count; ++i)
455 {
456 auto dequeue_result = local_queue->try_dequeue();
457 if (dequeue_result.is_ok())
458 {
459 // Job found during spin - fast path
460 current_job = std::move(dequeue_result.value());
461 break;
462 }
463
464 // CPU pause hint to reduce contention and power consumption
465 // Different intrinsics per compiler and architecture
466 #if defined(_MSC_VER)
467 // MSVC: Use _mm_pause() for x86/x64, YieldProcessor() for ARM
468 #if defined(_M_IX86) || defined(_M_X64)
469 _mm_pause();
470 #elif defined(_M_ARM) || defined(_M_ARM64)
471 __yield();
472 #else
473 std::this_thread::yield();
474 #endif
475 #elif defined(__GNUC__) || defined(__clang__)
476 // GCC/Clang: Use builtin functions
477 #if defined(__x86_64__) || defined(__i386__)
478 __builtin_ia32_pause();
479 #elif defined(__aarch64__) || defined(__arm__)
480 __asm__ __volatile__("yield");
481 #else
482 std::this_thread::yield();
483 #endif
484 #else
485 std::this_thread::yield();
486 #endif
487 }
488
489 // Phase 2: Block on job_queue's condition variable if spin didn't find a job
490 // Uses the queue's blocking dequeue() which waits on its internal CV.
491 // This wakes immediately when a new job is enqueued (via notify_one)
492 // or when the queue is stopped (via notify_all in job_queue::stop).
493 if (!current_job)
494 {
495 is_idle_.store(true, std::memory_order_relaxed);
496
497 // Blocking dequeue: waits on job_queue's condition_variable
498 // Wakes on: enqueue() notify_one, or stop() notify_all
499 auto dequeue_result = local_queue->dequeue();
500 if (dequeue_result.is_ok())
501 {
502 current_job = std::move(dequeue_result.value());
503 }
504 else
505 {
506 // Queue is stopped or empty after wake — return to let
507 // should_continue_work() decide whether to exit
508 return common::ok();
509 }
510 }
511 }
512
513 // Validate job pointer
514 if (current_job == nullptr)
515 {
516 return common::error_info{static_cast<int>(error_code::job_invalid), "error executing job: nullptr", "thread_system"};
517 }
518
519 // Capture job info for event tracing (before any state changes)
520 const auto job_id = current_job->get_job_id();
521 const auto job_name_for_event = current_job->get_name();
522 const auto enqueue_time = current_job->get_enqueue_time();
523
524 // Determine whether this job should record diagnostics events.
525 // The sampling counter reduces clock-read overhead by skipping
526 // events for most jobs when sample_rate > 1.
527 const bool should_trace = diagnostics_
528 && diagnostics_->is_tracing_enabled()
529 && (++diagnostics_counter_ % diagnostics_sample_rate_ == 0);
530
531 // Record dequeued event if tracing is enabled and sampled
532 if (should_trace)
533 {
535 dequeued_event.job_id = job_id;
536 dequeued_event.job_name = job_name_for_event;
538 dequeued_event.timestamp = std::chrono::steady_clock::now();
539 dequeued_event.system_timestamp = std::chrono::system_clock::now();
540 dequeued_event.thread_id = std::this_thread::get_id();
541 dequeued_event.worker_id = worker_id_;
542 dequeued_event.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
543 dequeued_event.timestamp - enqueue_time);
544 diagnostics_->record_event(dequeued_event);
545 }
546
547 // Update idle time statistics before transitioning to busy state
548 auto now = std::chrono::steady_clock::now();
549 auto state_since = get_state_since();
550 if (is_idle_.load(std::memory_order_relaxed))
551 {
552 auto idle_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(now - state_since);
553 total_idle_time_ns_.fetch_add(
554 static_cast<std::uint64_t>(idle_duration.count()),
555 std::memory_order_relaxed
556 );
557 }
558
559 // Mark worker as busy and update state timestamp
560 is_idle_.store(false, std::memory_order_relaxed);
561 state_since_rep_.store(now.time_since_epoch().count(), std::memory_order_release);
562 current_job_start_time_ = now;
563
564 // Initialize timing measurement if performance monitoring is enabled
565 std::optional<std::chrono::time_point<std::chrono::high_resolution_clock>>
566 started_time_point = std::nullopt;
567 if (use_time_tag_)
568 {
569 started_time_point = std::chrono::high_resolution_clock::now();
570 }
571
572 // Associate the job with its source queue for potential re-submission
573 // Use local_queue to avoid race with set_job_queue()
574 current_job->set_job_queue(local_queue);
575
576 // Set cancellation token on the job for cooperative cancellation
577 // This allows the job to check if it should cancel during execution
578 current_job->set_cancellation_token(worker_cancellation_token_);
579
580 // Track currently executing job atomically for on_stop_requested()
581 // Use release ordering to ensure job state is visible to cancellation thread
582 current_job_.store(current_job.get(), std::memory_order_release);
583
584 // Record started event if tracing is enabled and sampled
585 if (should_trace)
586 {
588 started_event.job_id = job_id;
589 started_event.job_name = job_name_for_event;
591 started_event.timestamp = std::chrono::steady_clock::now();
592 started_event.system_timestamp = std::chrono::system_clock::now();
593 started_event.thread_id = std::this_thread::get_id();
594 started_event.worker_id = worker_id_;
595 started_event.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
596 started_event.timestamp - enqueue_time);
597 diagnostics_->record_event(started_event);
598 }
599
600 // Execute the job's work method and capture the result
601 auto work_result = current_job->do_work();
602 std::uint64_t execution_duration_ns = 0;
603 if (started_time_point.has_value())
604 {
605 auto end_time = std::chrono::high_resolution_clock::now();
606 execution_duration_ns = static_cast<std::uint64_t>(
607 std::chrono::duration_cast<std::chrono::nanoseconds>(
608 end_time - started_time_point.value()).count());
609 }
610
611 // Capture job name before clearing (for logging after job destruction)
612 std::string job_name = current_job->get_name();
613
614 // Clear current job tracking and destroy job under mutex protection
615 // This prevents race condition with on_stop_requested() (Issue #225)
616 // The mutex ensures job is not accessed while being destroyed
617 {
618 std::lock_guard<std::mutex> notify_lock(queue_mutex_);
619 current_job_.store(nullptr, std::memory_order_release);
620
621 // Explicitly destroy the job while holding the mutex
622 // This is critical for thread safety - on_stop_requested() acquires
623 // the same mutex before accessing current_job_, ensuring it cannot
624 // access a job that is being destroyed
625 current_job.reset();
626
627 // Notify any waiting set_job_queue() that job has completed
628 // This allows queue replacement to proceed safely
629 // Lock is held to prevent lost wakeup between predicate check and wait
630 queue_cv_.notify_all();
631 }
632
633 // Update busy time and state transition for diagnostics
634 {
635 auto end_now = std::chrono::steady_clock::now();
636 auto busy_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(
637 end_now - current_job_start_time_
638 );
639 total_busy_time_ns_.fetch_add(
640 static_cast<std::uint64_t>(busy_duration.count()),
641 std::memory_order_relaxed
642 );
643 // Transition back to idle state
644 is_idle_.store(true, std::memory_order_relaxed);
645 state_since_rep_.store(end_now.time_since_epoch().count(), std::memory_order_release);
646 }
647
648 if (work_result.is_err())
649 {
650 // Increment failed job counter
651 jobs_failed_.fetch_add(1, std::memory_order_relaxed);
652
653 // Record failed event if tracing is enabled and sampled
654 if (should_trace)
655 {
657 failed_event.job_id = job_id;
658 failed_event.job_name = job_name_for_event;
660 failed_event.timestamp = std::chrono::steady_clock::now();
661 failed_event.system_timestamp = std::chrono::system_clock::now();
662 failed_event.thread_id = std::this_thread::get_id();
663 failed_event.worker_id = worker_id_;
664 failed_event.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
665 current_job_start_time_ - enqueue_time);
666 failed_event.execution_time = std::chrono::nanoseconds(execution_duration_ns);
667 failed_event.error_code = work_result.error().code;
668 failed_event.error_message = work_result.error().message;
669 diagnostics_->record_event(failed_event);
670 }
671
672 if (metrics_)
673 {
674 metrics_->record_execution(0, false);
675 }
676 return common::error_info{static_cast<int>(error_code::job_execution_failed),
677 formatter::format("error executing job: {}", work_result.error().message), "thread_system"};
678 }
679
680 // Increment completed job counter
681 jobs_completed_.fetch_add(1, std::memory_order_relaxed);
682
683 // Record completed event if tracing is enabled and sampled
684 if (should_trace)
685 {
686 diagnostics::job_execution_event completed_event;
687 completed_event.job_id = job_id;
688 completed_event.job_name = job_name_for_event;
690 completed_event.timestamp = std::chrono::steady_clock::now();
691 completed_event.system_timestamp = std::chrono::system_clock::now();
692 completed_event.thread_id = std::this_thread::get_id();
693 completed_event.worker_id = worker_id_;
694 completed_event.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
695 current_job_start_time_ - enqueue_time);
696 completed_event.execution_time = std::chrono::nanoseconds(execution_duration_ns);
697 diagnostics_->record_event(completed_event);
698 }
699
700 // Log successful job completion based on timing configuration
701 // Note: Using captured job_name since job is already destroyed
702 if (!started_time_point.has_value())
703 {
704 // Standard logging without timing information
705 context_.log(common::interfaces::log_level::debug,
706 formatter::format("job executed successfully: {} on thread_worker",
707 job_name));
708 }
709 else
710 {
711 // Enhanced logging with execution timing information
712 context_.log(common::interfaces::log_level::debug,
713 formatter::format("job executed successfully: {} on thread_worker ({}ns)",
714 job_name, execution_duration_ns));
715
716 // Update worker metrics if monitoring is available
717 if (context_.monitoring())
718 {
719 common::interfaces::worker_metrics metrics(worker_id_);
720 metrics.jobs_processed.value = 1;
721 metrics.total_processing_time_ns.value = static_cast<double>(execution_duration_ns);
722 // Use proper worker ID instead of thread hash
723 context_.update_worker_metrics(worker_id_, metrics);
724 }
725 }
726
727 if (metrics_)
728 {
729 metrics_->record_execution(execution_duration_ns, true);
730 }
731
732 return common::ok();
733 }
734
736 {
737 return worker_id_;
738 }
739
    /**
     * @brief Checks whether the worker is currently idle (not executing a job).
     *
     * @return true when idle; relaxed load — the value is advisory and may be
     *         momentarily stale relative to other threads.
     */
    bool thread_worker::is_idle() const noexcept
    {
        return is_idle_.load(std::memory_order_relaxed);
    }
754
785 {
786 // Cancel the worker's token first
787 // This ensures any future jobs will see cancellation immediately
788 // Jobs receive this token via set_cancellation_token() in do_work()
789 worker_cancellation_token_.cancel();
790
791 // Acquire mutex to safely access current job
792 // This prevents race condition with do_work() job destruction (Issue #225)
793 std::lock_guard<std::mutex> lock(queue_mutex_);
794
795 // Load the currently executing job pointer atomically
796 auto* job_ptr = current_job_.load(std::memory_order_acquire);
797
798 // If a job is currently executing, cancel it directly
799 if (job_ptr != nullptr)
800 {
801 // Get the job's cancellation token and cancel it
802 // This provides redundancy in case the job cached its token before
803 // we cancelled worker_cancellation_token_
804 auto job_token = job_ptr->get_cancellation_token();
805 job_token.cancel();
806
807 // Log cancellation attempt for debugging
808 context_.log(common::interfaces::log_level::debug,
809 formatter::format("Cancellation requested for job: {} on worker {}",
810 job_ptr->get_name(), worker_id_));
811 }
812 }
813
    /**
     * @brief Gets the total number of jobs successfully completed by this worker.
     *
     * @return Monotonic counter incremented by do_work() on success (relaxed).
     */
    std::uint64_t thread_worker::get_jobs_completed() const noexcept
    {
        return jobs_completed_.load(std::memory_order_relaxed);
    }
818
    /**
     * @brief Gets the total number of jobs that failed during execution.
     *
     * @return Monotonic counter incremented by do_work() on job error (relaxed).
     */
    std::uint64_t thread_worker::get_jobs_failed() const noexcept
    {
        return jobs_failed_.load(std::memory_order_relaxed);
    }
823
    /**
     * @brief Gets the total time this worker has spent executing jobs.
     *
     * @return Accumulated busy time; updated by do_work() after each job.
     */
    std::chrono::nanoseconds thread_worker::get_total_busy_time() const noexcept
    {
        return std::chrono::nanoseconds{total_busy_time_ns_.load(std::memory_order_relaxed)};
    }
828
    /**
     * @brief Gets the total time this worker has spent waiting for jobs.
     *
     * @return Accumulated idle time; updated by do_work() before each job starts.
     */
    std::chrono::nanoseconds thread_worker::get_total_idle_time() const noexcept
    {
        return std::chrono::nanoseconds{total_idle_time_ns_.load(std::memory_order_relaxed)};
    }
833
834 std::chrono::steady_clock::time_point thread_worker::get_state_since() const noexcept
835 {
836 auto rep = state_since_rep_.load(std::memory_order_acquire);
837 return std::chrono::steady_clock::time_point{
838 std::chrono::steady_clock::duration{rep}
839 };
840 }
841
842 std::optional<diagnostics::job_info> thread_worker::get_current_job_info() const noexcept
843 {
844 std::lock_guard<std::mutex> lock(queue_mutex_);
845 auto* job_ptr = current_job_.load(std::memory_order_acquire);
846 if (job_ptr == nullptr)
847 {
848 return std::nullopt;
849 }
850
852 info.job_id = job_ptr->get_job_id();
853 info.job_name = job_ptr->get_name();
855 info.start_time = current_job_start_time_;
856 info.enqueue_time = job_ptr->get_enqueue_time();
857 info.executed_by = std::this_thread::get_id();
858
859 auto now = std::chrono::steady_clock::now();
860 info.execution_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
862 );
863 // Calculate wait time from enqueue to start
864 info.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
865 current_job_start_time_ - job_ptr->get_enqueue_time()
866 );
867
868 return info;
869 }
870} // namespace kcenon::thread
Provides a mechanism for cooperative cancellation of operations.
Comprehensive diagnostics API for thread pool monitoring.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Lock-free work-stealing deque based on Chase-Lev algorithm.
A foundational class for implementing custom worker threads.
Context object that provides access to optional services.
diagnostics::thread_pool_diagnostics * diagnostics_
Pointer to the diagnostics instance for event tracing.
std::atomic< std::uint64_t > total_idle_time_ns_
Total time spent waiting for jobs (idle time) in nanoseconds.
std::function< job *(std::size_t)> steal_function_
Function to steal work from other workers.
std::size_t get_worker_id() const
Get the worker ID.
std::size_t worker_id_
Unique ID for this worker instance.
std::atomic< std::uint64_t > total_busy_time_ns_
Total time spent executing jobs (busy time) in nanoseconds.
std::unique_ptr< job > try_steal_work()
Try to steal work from other workers.
std::uint64_t get_jobs_failed() const noexcept
Gets the total number of jobs that failed during execution.
void set_diagnostics(diagnostics::thread_pool_diagnostics *diag)
Set the diagnostics instance for event tracing.
void set_steal_function(std::function< job *(std::size_t)> steal_fn)
Set the steal function for finding other workers' deques.
const worker_policy & get_policy() const
Get the current worker policy.
std::chrono::nanoseconds get_total_busy_time() const noexcept
Gets the total time spent executing jobs (busy time).
static std::atomic< std::size_t > next_worker_id_
Static counter for generating unique worker IDs.
auto get_context(void) const -> const thread_context &
Gets the thread context for this worker.
void set_metrics(std::shared_ptr< metrics::ThreadPoolMetrics > metrics)
Provide shared metrics storage for this worker.
std::uint32_t diagnostics_sample_rate_
Diagnostics sampling rate (record every Nth job).
bool is_idle() const noexcept
Checks if the worker is currently idle (not processing a job).
std::shared_ptr< metrics::ThreadPoolMetrics > metrics_
Shared metrics aggregator provided by the owning thread pool.
std::unique_ptr< lockfree::work_stealing_deque< job * > > local_deque_
Local work-stealing deque for this worker.
void set_diagnostics_sample_rate(std::uint32_t rate)
Set the diagnostics sampling rate.
auto should_continue_work() const -> bool override
Determines if there are jobs available in the queue to continue working on.
std::uint64_t get_jobs_completed() const noexcept
Gets the total number of jobs successfully completed by this worker.
auto set_context(const thread_context &context) -> void
Sets the thread context for this worker.
std::atomic< std::chrono::steady_clock::time_point::rep > state_since_rep_
Time point when the worker entered its current state.
std::chrono::steady_clock::time_point current_job_start_time_
Time point when the current job started executing.
void set_policy(const worker_policy &policy)
Set the worker policy for this worker.
std::chrono::nanoseconds get_total_idle_time() const noexcept
Gets the total time spent waiting for jobs (idle time).
auto on_stop_requested() -> void override
Called when the worker is requested to stop.
worker_policy policy_
Worker policy configuration.
std::unique_ptr< job > try_get_job()
Try to get a job from local deque first, then global queue.
lockfree::work_stealing_deque< job * > * get_local_deque() noexcept
Get the local work-stealing deque for this worker.
std::atomic< bool > is_idle_
Indicates whether the worker is currently idle (not processing a job).
std::atomic< std::uint64_t > jobs_completed_
Total number of jobs successfully completed by this worker.
virtual ~thread_worker(void)
Virtual destructor. Ensures the worker thread is stopped before destruction.
std::shared_ptr< job_queue > job_queue_
A shared pointer to the job queue from which this worker obtains jobs.
thread_worker(const bool &use_time_tag=true, const thread_context &context=thread_context())
Constructs a new thread_worker.
auto do_work() -> common::VoidResult override
Processes one or more jobs from the queue.
auto set_job_queue(std::shared_ptr< job_queue > job_queue) -> void
Sets the job_queue that this worker should process.
std::atomic< job * > current_job_
Pointer to the currently executing job.
std::mutex queue_mutex_
Mutex protecting job queue replacement.
std::atomic< std::uint64_t > jobs_failed_
Total number of jobs that failed during execution.
std::optional< diagnostics::job_info > get_current_job_info() const noexcept
Gets information about the currently executing job.
std::chrono::steady_clock::time_point get_state_since() const noexcept
Gets the time when the worker entered its current state.
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132
@ running
Job is currently being executed.
@ dequeued
Job was taken from queue by a worker.
@ failed
Job failed with an error.
@ completed
Job completed successfully.
Generic formatter for enum types using user-provided converter functors.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
std::uint64_t job_id
Unique job identifier for DAG scheduler.
Definition dag_job.h:33
@ info
Informational messages highlighting progress.
Event data for job execution tracing.
std::string job_name
Human-readable name of the job.
std::chrono::nanoseconds wait_time
Time spent waiting in queue before dequeue.
std::optional< std::string > error_message
Error message if the job failed.
std::chrono::nanoseconds execution_time
Time spent executing the job.
std::uint64_t job_id
ID of the job this event relates to.
event_type type
Type of event that occurred.
std::optional< int > error_code
Error code if the job failed.
std::size_t worker_id
Worker ID that processed this job.
std::thread::id thread_id
ID of the thread that processed this event.
std::chrono::system_clock::time_point system_timestamp
System time when the event occurred.
std::chrono::steady_clock::time_point timestamp
Time when the event occurred.
Information about a job in the thread pool.
Definition job_info.h:90
std::uint64_t job_id
Unique identifier for this job.
Definition job_info.h:97
Worker behavior policy configuration.
std::chrono::microseconds steal_backoff
Runtime diagnostics, health monitoring, and execution tracing for thread pools.
Specialized worker thread that processes jobs from a job_queue.