Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
thread_pool.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
11
12using namespace utility_module;
13
namespace kcenon::thread {
// Support both old (namespace common) and new (namespace kcenon::common) versions
// When inside namespace kcenon::thread, 'common' resolves to kcenon::common
#if KCENON_HAS_COMMON_EXECUTOR
namespace common_ns = common;
#endif

// Initialize static member.
// Monotonic counter handing out a unique id per pool instance; each
// constructor takes the next value via fetch_add, so ids are unique even
// when pools are constructed concurrently from multiple threads.
std::atomic<std::uint32_t> thread_pool::next_pool_instance_id_{0};
51thread_pool::thread_pool(const std::string& thread_title, const thread_context& context)
52 : thread_title_(thread_title)
53 , pool_instance_id_(next_pool_instance_id_.fetch_add(1))
54 , start_pool_(false)
55 , job_queue_(std::make_shared<kcenon::thread::job_queue>())
56 , context_(context)
57 , pool_cancellation_token_(cancellation_token::create())
58 , metrics_service_(std::make_shared<metrics::metrics_service>()) {
59 // Report initial pool registration if monitoring is available
60 if (context_.monitoring()) {
61 common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
62 initial_metrics.worker_threads.value = 0;
63 context_.update_thread_pool_metrics(thread_title_, pool_instance_id_, initial_metrics);
64 }
65}
66
82thread_pool::thread_pool(const std::string& thread_title,
83 std::shared_ptr<job_queue> custom_queue,
84 const thread_context& context)
85 : thread_title_(thread_title)
86 , pool_instance_id_(next_pool_instance_id_.fetch_add(1))
87 , start_pool_(false)
88 , job_queue_(std::move(custom_queue))
89 , context_(context)
90 , pool_cancellation_token_(cancellation_token::create())
91 , metrics_service_(std::make_shared<metrics::metrics_service>()) {
92 // Report initial pool registration if monitoring is available
93 if (context_.monitoring()) {
94 common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
95 initial_metrics.worker_threads.value = 0;
96 context_.update_thread_pool_metrics(thread_title_, pool_instance_id_, initial_metrics);
97 }
98}
99
116thread_pool::thread_pool(const std::string& thread_title,
117 std::unique_ptr<pool_queue_adapter_interface> queue_adapter,
118 const thread_context& context)
119 : thread_title_(thread_title)
120 , pool_instance_id_(next_pool_instance_id_.fetch_add(1))
121 , start_pool_(false)
122 , job_queue_(queue_adapter ? queue_adapter->get_job_queue() : nullptr)
123 , queue_adapter_(std::move(queue_adapter))
124 , context_(context)
125 , pool_cancellation_token_(cancellation_token::create())
126 , metrics_service_(std::make_shared<metrics::metrics_service>()) {
127 // Report initial pool registration if monitoring is available
128 if (context_.monitoring()) {
129 common::interfaces::thread_pool_metrics initial_metrics(thread_title_, pool_instance_id_);
130 initial_metrics.worker_threads.value = 0;
131 context_.update_thread_pool_metrics(thread_title_, pool_instance_id_, initial_metrics);
132 }
133}
134
    // NOTE(review): incomplete fragment — the destructor's signature line and
    // the opening `if (...)` condition are missing from this extract. The
    // surviving comments suggest the condition checks an "is shutting down" /
    // static-destruction flag — TODO confirm against the repository source.
    // Check if we're in static destruction phase
    // During static destruction, logger/monitoring singletons may already be destroyed
        // Minimal cleanup without logging to avoid SDOF (static destruction order fiasco)
        stop_unsafe();
    } else {
        // Normal lifetime: full shutdown with logging enabled.
        stop();
    }
}
158
169auto thread_pool::get_ptr(void) -> std::shared_ptr<thread_pool> {
170 return this->shared_from_this();
171}
172
196auto thread_pool::start(void) -> common::VoidResult {
197 // Acquire lock to check workers_ safely
198 std::scoped_lock<std::mutex> lock(workers_mutex_);
199
200 // Check if pool is already running
201 // Use acquire to ensure we see all previous modifications to pool state
202 if (start_pool_.load(std::memory_order_acquire)) {
203 return common::error_info{static_cast<int>(error_code::thread_already_running), "thread pool is already running", "thread_system"};
204 }
205
206 // Validate that workers have been added
207 if (workers_.empty()) {
208 return common::error_info{static_cast<int>(error_code::invalid_argument), "no workers to start", "thread_system"};
209 }
210
211 // Handle queue initialization for restart scenarios
212 // The approach differs based on whether we're using job_queue or queue_adapter
213 if (queue_adapter_) {
214 // Using queue_adapter (policy_queue or wrapped job_queue)
215 if (queue_adapter_->is_stopped()) {
216 // For policy_queue adapters, we can't easily recreate,
217 // so we return an error suggesting pool recreation
218 return common::error_info{
219 static_cast<int>(error_code::queue_stopped),
220 "queue is stopped; create a new thread_pool instance for restart",
221 "thread_system"};
222 }
223 // Update job_queue_ reference if adapter wraps a job_queue
224 job_queue_ = queue_adapter_->get_job_queue();
225 } else if (job_queue_ == nullptr || job_queue_->is_stopped()) {
226 // Create fresh job queue for restart scenarios
227 // Stopped queues cannot accept new jobs, so we must create a new instance
228 job_queue_ = std::make_shared<kcenon::thread::job_queue>();
229
230 // Update all workers with the new queue reference
231 for (auto& worker : workers_) {
232 worker->set_job_queue(job_queue_);
233 }
234 }
235
236 // Create fresh pool cancellation token for restart scenarios
237 // This ensures workers start with a non-cancelled token
238 pool_cancellation_token_ = cancellation_token::create();
239 metrics_service_->reset();
240
241 // Attempt to start each worker
242 for (auto& worker : workers_) {
243 auto start_result = worker->start();
244 if (start_result.is_err()) {
245 // If any worker fails, stop all and return error
246 stop();
247 return start_result.error();
248 }
249 }
250
251 // Mark pool as successfully started
252 // Use release to ensure all previous modifications (worker starts, queue setup)
253 // are visible to other threads before they see start_pool_ == true
254 start_pool_.store(true, std::memory_order_release);
255
256 return common::ok();
257}
258
269auto thread_pool::get_job_queue(void) -> std::shared_ptr<job_queue> {
270 return job_queue_;
271}
272
274 return metrics_service_->basic_metrics();
275}
276
280
282 std::size_t worker_count = 0;
283 {
284 std::scoped_lock<std::mutex> lock(workers_mutex_);
285 worker_count = workers_.size();
286 }
287 metrics_service_->set_enhanced_metrics_enabled(enabled, worker_count);
288}
289
291 return metrics_service_->is_enhanced_metrics_enabled();
292}
293
295 return metrics_service_->enhanced_metrics();
296}
297
301
323auto thread_pool::enqueue(std::unique_ptr<job>&& job) -> common::VoidResult {
324 // Validate inputs
325 if (job == nullptr) {
326 return common::error_info{static_cast<int>(error_code::invalid_argument), "job is null", "thread_system"};
327 }
328
329 // Check queue availability and stopped state
330 // Supports both queue_adapter_ (policy_queue) and job_queue_ (legacy)
331 if (queue_adapter_) {
332 if (queue_adapter_->is_stopped()) {
333 return common::error_info{static_cast<int>(error_code::queue_stopped), "thread pool is stopped", "thread_system"};
334 }
335 } else if (job_queue_ == nullptr) {
336 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "job queue is null", "thread_system"};
337 } else if (job_queue_->is_stopped()) {
338 return common::error_info{static_cast<int>(error_code::queue_stopped), "thread pool is stopped", "thread_system"};
339 }
340
341 // Record metrics and enqueue job
342 metrics_service_->record_submission();
343
344 auto start_time = std::chrono::steady_clock::now();
345
346 if (queue_adapter_) {
347 auto enqueue_result = queue_adapter_->enqueue(std::move(job));
348 if (enqueue_result.is_err()) {
349 return enqueue_result.error();
350 }
351 } else {
352 auto enqueue_result = job_queue_->enqueue(std::move(job));
353 if (enqueue_result.is_err()) {
354 return enqueue_result.error();
355 }
356 }
357
358 auto end_time = std::chrono::steady_clock::now();
359 auto latency = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
360 metrics_service_->record_enqueue_with_latency(latency);
361
362 auto queue_size = queue_adapter_ ? queue_adapter_->size() : job_queue_->size();
363 metrics_service_->record_queue_depth(queue_size);
364
365 return common::ok();
366}
367
368auto thread_pool::enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs) -> common::VoidResult {
369 if (jobs.empty()) {
370 return common::error_info{static_cast<int>(error_code::invalid_argument), "jobs are empty", "thread_system"};
371 }
372
373 // Check queue availability and stopped state
374 // Supports both queue_adapter_ (policy_queue) and job_queue_ (legacy)
375 if (queue_adapter_) {
376 if (queue_adapter_->is_stopped()) {
377 return common::error_info{static_cast<int>(error_code::queue_stopped), "thread pool is stopped", "thread_system"};
378 }
379 } else if (job_queue_ == nullptr) {
380 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "job queue is null", "thread_system"};
381 } else if (job_queue_->is_stopped()) {
382 return common::error_info{static_cast<int>(error_code::queue_stopped), "thread pool is stopped", "thread_system"};
383 }
384
385 const auto batch_size = jobs.size();
386 metrics_service_->record_submission(batch_size);
387
388 auto start_time = std::chrono::steady_clock::now();
389
390 if (queue_adapter_) {
391 auto enqueue_result = queue_adapter_->enqueue_batch(std::move(jobs));
392 if (enqueue_result.is_err()) {
393 return enqueue_result.error();
394 }
395 } else {
396 auto enqueue_result = job_queue_->enqueue_batch(std::move(jobs));
397 if (enqueue_result.is_err()) {
398 return enqueue_result.error();
399 }
400 }
401
402 auto end_time = std::chrono::steady_clock::now();
403 auto latency = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time);
404 metrics_service_->record_enqueue_with_latency(latency, batch_size);
405
406 auto queue_size = queue_adapter_ ? queue_adapter_->size() : job_queue_->size();
407 metrics_service_->record_queue_depth(queue_size);
408
409 return common::ok();
410}
411
// Registers one worker with the pool; if the pool is already running the
// worker is started immediately. The worker is wired to the pool's queue,
// context, metrics, and diagnostics before being added.
auto thread_pool::enqueue(std::unique_ptr<thread_worker>&& worker) -> common::VoidResult {
    if (worker == nullptr) {
        return common::error_info{static_cast<int>(error_code::invalid_argument), "worker is null", "thread_system"};
    }

    // Get job_queue from adapter if available, otherwise use direct job_queue_
    std::shared_ptr<job_queue> worker_queue;
    if (queue_adapter_) {
        worker_queue = queue_adapter_->get_job_queue();
        if (!worker_queue) {
            // policy_queue adapter without job_queue backend
            // Workers currently require job_queue; this limitation may be lifted in future versions
            return common::error_info{
                // NOTE(review): the error-code argument (internal line 425) is
                // missing from this extract and could not be recovered —
                // confirm the code used here against the repository source.
                "policy_queue adapter without job_queue backend not yet supported for workers",
                "thread_system"};
        }
    } else if (job_queue_ == nullptr) {
        return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "job queue is null", "thread_system"};
    } else {
        worker_queue = job_queue_;
    }

    // Wire the worker into the pool's services before it can run.
    worker->set_job_queue(worker_queue);
    worker->set_context(context_);
    worker->set_metrics(metrics_service_->get_basic_metrics());
    worker->set_diagnostics(&diagnostics());
    worker->set_diagnostics_sample_rate(diagnostics().get_config().event_sample_rate);

    // Acquire lock before checking start_pool_ and adding worker
    // This prevents race condition with stop():
    // - stop() acquires workers_mutex_ after atomically setting start_pool_ to false
    // - If we check start_pool_ while holding the lock, we ensure consistent state
    std::scoped_lock<std::mutex> lock(workers_mutex_);

    // Use memory_order_acquire to ensure we see all previous modifications
    // made by the thread that set start_pool_ to true (in start())
    bool is_running = start_pool_.load(std::memory_order_acquire);

    // Add worker to vector first, before starting
    // This ensures stop() will see and stop this worker if called concurrently
    workers_.emplace_back(std::move(worker));

    // Only start the worker if pool is running
    // Since we hold workers_mutex_, stop() cannot proceed until we release it
    if (is_running) {
        auto start_result = workers_.back()->start();
        if (start_result.is_err()) {
            // Remove the worker we just added since it failed to start
            workers_.pop_back();
            return start_result.error();
        }
    }

    return common::ok();
}
468
469auto thread_pool::enqueue_batch(std::vector<std::unique_ptr<thread_worker>>&& workers)
470 -> common::VoidResult {
471 if (workers.empty()) {
472 return common::error_info{static_cast<int>(error_code::invalid_argument), "workers are empty", "thread_system"};
473 }
474
475 if (job_queue_ == nullptr) {
476 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), "job queue is null", "thread_system"};
477 }
478
479 // Acquire lock before processing workers
480 // This ensures atomic check-and-add operation with respect to stop()
481 std::scoped_lock<std::mutex> lock(workers_mutex_);
482
483 // Check pool running state once with acquire semantics
484 bool is_running = start_pool_.load(std::memory_order_acquire);
485
486 // Track the starting index for rollback in case of error
487 std::size_t start_index = workers_.size();
488
489 // Get diagnostics pointer and config once outside the loop for efficiency
490 auto* diag = &diagnostics();
491 const auto sample_rate = diag->get_config().event_sample_rate;
492
493 for (auto& worker : workers) {
494 worker->set_job_queue(job_queue_);
495 worker->set_context(context_);
496 worker->set_metrics(metrics_service_->get_basic_metrics());
497 worker->set_diagnostics(diag);
498 worker->set_diagnostics_sample_rate(sample_rate);
499
500 // Add worker to vector first
501 workers_.emplace_back(std::move(worker));
502
503 // Only start if pool is running
504 if (is_running) {
505 auto start_result = workers_.back()->start();
506 if (start_result.is_err()) {
507 // Rollback: remove all workers added in this batch
508 workers_.erase(workers_.begin() + static_cast<std::ptrdiff_t>(start_index),
509 workers_.end());
510 return start_result.error();
511 }
512 }
513 }
514
515 return common::ok();
516}
517
518auto thread_pool::stop(const bool& immediately_stop) -> common::VoidResult {
519 // Use compare_exchange_strong to atomically check and set state
520 // This prevents TOCTOU (Time-Of-Check-Time-Of-Use) race conditions
521 // where multiple threads might call stop() simultaneously
522 bool expected = true;
523 if (!start_pool_.compare_exchange_strong(expected, false, std::memory_order_acq_rel,
524 std::memory_order_acquire)) {
525 // Pool is already stopped or being stopped by another thread
526 return common::ok();
527 }
528
529 // At this point, we've atomically transitioned from running to stopped
530 // and only this thread will execute the shutdown sequence
531
532 // Cancel pool-level token to propagate cancellation to all workers and jobs
533 // This triggers hierarchical cancellation:
534 // 1. Pool token cancelled → linked worker tokens cancelled
535 // 2. Worker tokens cancelled → running jobs receive cancellation signal
536 pool_cancellation_token_.cancel();
537
538 // Stop the queue (supports both queue_adapter_ and job_queue_)
539 if (queue_adapter_) {
540 queue_adapter_->stop();
541 if (immediately_stop) {
542 queue_adapter_->clear();
543 }
544 } else if (job_queue_ != nullptr) {
545 job_queue_->stop();
546 if (immediately_stop) {
547 job_queue_->clear();
548 }
549 }
550
551 // Stop workers while holding lock to ensure consistent iteration
552 // This is safe because worker->stop() only signals and joins threads,
553 // it does not call back into thread_pool methods
554 std::scoped_lock<std::mutex> lock(workers_mutex_);
555 for (auto& worker : workers_) {
556 auto stop_result = worker->stop();
557 if (stop_result.is_err()) {
558 context_.log(common::interfaces::log_level::error, formatter::format("error stopping worker: {}",
559 stop_result.error().message));
560 }
561 }
562
563 return common::ok();
564}
565
584auto thread_pool::stop_unsafe() noexcept -> common::VoidResult {
585 // Use compare_exchange_strong to atomically check and set state
586 // Same atomic transition as stop() to prevent race conditions
587 bool expected = true;
588 if (!start_pool_.compare_exchange_strong(expected, false, std::memory_order_acq_rel,
589 std::memory_order_acquire)) {
590 // Pool is already stopped or being stopped by another thread
591 return common::ok();
592 }
593
594 // Cancel pool-level token to propagate cancellation to all workers and jobs
596
597 // Stop the queue (supports both queue_adapter_ and job_queue_)
598 if (queue_adapter_) {
599 queue_adapter_->stop();
600 } else if (job_queue_ != nullptr) {
601 job_queue_->stop();
602 }
603
604 // Stop workers while holding lock to ensure consistent iteration
605 // No logging to avoid accessing potentially destroyed singletons
606 std::scoped_lock<std::mutex> lock(workers_mutex_);
607 for (auto& worker : workers_) {
608 // Stop worker without checking result to avoid any potential exceptions
609 // during static destruction
610 worker->stop();
611 }
612
613 return common::ok();
614}
615
616auto thread_pool::to_string(void) const -> std::string {
617 std::string format_string;
618
619 // Use relaxed memory order for diagnostic/logging purposes
620 // Exact state ordering is not critical for debug output
621 formatter::format_to(std::back_inserter(format_string), "{} is {},\n", thread_title_,
622 start_pool_.load(std::memory_order_relaxed) ? "running" : "stopped");
623
624 // Get queue string representation (supports both queue_adapter_ and job_queue_)
625 std::string queue_str;
626 if (queue_adapter_) {
627 queue_str = queue_adapter_->to_string();
628 } else if (job_queue_ != nullptr) {
629 queue_str = job_queue_->to_string();
630 } else {
631 queue_str = "nullptr";
632 }
633 formatter::format_to(std::back_inserter(format_string), "\tjob_queue: {}\n\n", queue_str);
634
635 // Protect workers_ access with lock
636 std::scoped_lock<std::mutex> lock(workers_mutex_);
637 formatter::format_to(std::back_inserter(format_string), "\tworkers: {}\n", workers_.size());
638 for (const auto& worker : workers_) {
639 formatter::format_to(std::back_inserter(format_string), "\t{}\n", worker->to_string());
640 }
641
642 return format_string;
643}
644
645auto thread_pool::get_context(void) const -> const thread_context& {
646 return context_;
647}
648
650 return pool_instance_id_;
651}
652
654 if (!context_.monitoring()) {
655 return;
656 }
657
658 common::interfaces::thread_pool_metrics metrics(thread_title_, pool_instance_id_);
659
660 // Protect workers_ access with lock
661 {
662 std::scoped_lock<std::mutex> lock(workers_mutex_);
663 metrics.worker_threads.value = static_cast<double>(workers_.size());
664 }
665
666 metrics.idle_threads.value = static_cast<double>(get_idle_worker_count());
667
668 // Get pending jobs count (supports both queue_adapter_ and job_queue_)
669 if (queue_adapter_) {
670 metrics.jobs_pending.value = static_cast<double>(queue_adapter_->size());
671 } else if (job_queue_) {
672 metrics.jobs_pending.value = static_cast<double>(job_queue_->size());
673 }
674
675 // Report metrics with pool identification
677}
678
680 // Count idle workers by checking each worker's idle state
681 // Thread safety: workers_mutex_ protects access to workers_ vector
682 std::scoped_lock<std::mutex> lock(workers_mutex_);
683
684 return static_cast<std::size_t>(std::count_if(
685 workers_.begin(), workers_.end(),
686 [](const std::unique_ptr<thread_worker>& worker) { return worker && worker->is_idle(); }));
687}
688
689auto thread_pool::is_running() const -> bool {
690 // Use acquire to ensure we see the latest pool state
691 // This is important for callers making decisions based on running state
692 return start_pool_.load(std::memory_order_acquire);
693}
694
696 // Supports both queue_adapter_ and job_queue_
697 if (queue_adapter_) {
698 return queue_adapter_->size();
699 }
700 if (job_queue_) {
701 return job_queue_->size();
702 }
703 return 0;
704}
705
706auto thread_pool::check_worker_health(bool restart_failed) -> std::size_t {
707 std::scoped_lock<std::mutex> lock(workers_mutex_);
708
709 std::size_t failed_count = 0;
710
711 // Remove dead workers using erase-remove idiom
712 auto remove_iter =
713 std::remove_if(workers_.begin(), workers_.end(),
714 [&failed_count](const std::unique_ptr<thread_worker>& worker) {
715 if (!worker || !worker->is_running()) {
716 ++failed_count;
717 return true; // Remove this worker
718 }
719 return false; // Keep this worker
720 });
721
722 workers_.erase(remove_iter, workers_.end());
723
724 // Restart workers if requested and pool is running
725 if (restart_failed && failed_count > 0 && is_running()) {
726 // Get diagnostics pointer and config for new workers
727 auto* diag = &diagnostics();
728 const auto sample_rate = diag->get_config().event_sample_rate;
729
730 // Create new workers to replace failed ones
731 for (std::size_t i = 0; i < failed_count; ++i) {
732 // Create worker with default settings and context
733 auto worker = std::make_unique<thread_worker>(true, context_);
734
735 // Set job queue and diagnostics
736 worker->set_job_queue(job_queue_);
737 worker->set_metrics(metrics_service_->get_basic_metrics());
738 worker->set_diagnostics(diag);
739 worker->set_diagnostics_sample_rate(sample_rate);
740
741 // Start the new worker
742 auto start_result = worker->start();
743 if (start_result.is_err()) {
744 // Failed to start, skip this worker
745 continue;
746 }
747
748 workers_.push_back(std::move(worker));
749 }
750 }
751
752 return failed_count;
753}
754
755auto thread_pool::get_active_worker_count() const -> std::size_t {
756 std::scoped_lock<std::mutex> lock(workers_mutex_);
757
758 return static_cast<std::size_t>(std::count_if(workers_.begin(), workers_.end(),
759 [](const std::unique_ptr<thread_worker>& worker) {
760 return worker && worker->is_running();
761 }));
762}
763
764// ============================================================================
765// Diagnostics
766// ============================================================================
767
768auto thread_pool::diagnostics() -> diagnostics::thread_pool_diagnostics& {
769 std::call_once(diagnostics_init_flag_, [this]() {
770 diagnostics_ = std::make_unique<diagnostics::thread_pool_diagnostics>(*this);
771 });
772 return *diagnostics_;
773}
774
775auto thread_pool::diagnostics() const -> const diagnostics::thread_pool_diagnostics& {
776 std::call_once(diagnostics_init_flag_, [this]() {
777 diagnostics_ = std::make_unique<diagnostics::thread_pool_diagnostics>(
778 const_cast<thread_pool&>(*this));
779 });
780 return *diagnostics_;
781}
782
783auto thread_pool::collect_worker_diagnostics() const
784 -> std::vector<diagnostics::thread_info> {
785 std::scoped_lock<std::mutex> lock(workers_mutex_);
786
787 std::vector<diagnostics::thread_info> result;
788 result.reserve(workers_.size());
789
790 for (std::size_t i = 0; i < workers_.size(); ++i) {
791 const auto& worker = workers_[i];
792 if (!worker) {
793 continue;
794 }
795
797 info.thread_id = worker->get_thread_id();
798 info.thread_name = "Worker-" + std::to_string(i);
799 info.worker_id = worker->get_worker_id();
800
801 // Determine worker state
802 if (!worker->is_running()) {
803 info.state = diagnostics::worker_state::stopped;
804 } else if (worker->is_idle()) {
805 info.state = diagnostics::worker_state::idle;
806 } else {
807 info.state = diagnostics::worker_state::active;
808 }
809
810 info.state_since = worker->get_state_since();
811
812 // Get current job info if active
813 info.current_job = worker->get_current_job_info();
814
815 // Get statistics
816 info.jobs_completed = worker->get_jobs_completed();
817 info.jobs_failed = worker->get_jobs_failed();
818 info.total_busy_time = worker->get_total_busy_time();
819 info.total_idle_time = worker->get_total_idle_time();
820
821 // Calculate utilization
822 info.update_utilization();
823
824 result.push_back(std::move(info));
825 }
826
827 return result;
828}
829
830// ============================================================================
831// Pool Policies
832// ============================================================================
833
834void thread_pool::add_policy(std::unique_ptr<pool_policy> policy) {
835 if (!policy) {
836 return;
837 }
838
839 std::scoped_lock<std::mutex> lock(policies_mutex_);
840 policies_.push_back(std::move(policy));
841}
842
843auto thread_pool::get_policies() const -> const std::vector<std::unique_ptr<pool_policy>>& {
844 return policies_;
845}
846
847auto thread_pool::remove_policy(const std::string& name) -> bool {
848 std::scoped_lock<std::mutex> lock(policies_mutex_);
849
850 auto it = std::remove_if(policies_.begin(), policies_.end(),
851 [&name](const std::unique_ptr<pool_policy>& policy) {
852 return policy && policy->get_name() == name;
853 });
854
855 if (it != policies_.end()) {
856 policies_.erase(it, policies_.end());
857 return true;
858 }
859
860 return false;
861}
862
863// ============================================================================
864// Internal Methods (for friend classes)
865// ============================================================================
866
867auto thread_pool::remove_workers_internal(std::size_t count, std::size_t min_workers)
868 -> common::VoidResult
869{
870 if (count == 0) {
871 return common::ok();
872 }
873
874 std::scoped_lock<std::mutex> lock(workers_mutex_);
875
876 if (workers_.size() <= min_workers) {
877 return common::error_info{
878 static_cast<int>(error_code::invalid_argument),
879 "Cannot remove workers: already at minimum",
880 "thread_system"
881 };
882 }
883
884 // Calculate how many we can actually remove
885 std::size_t max_removable = workers_.size() - min_workers;
886 count = std::min(count, max_removable);
887
888 std::size_t removed = 0;
889
890 // First pass: remove idle workers
891 auto it = workers_.begin();
892 while (it != workers_.end() && removed < count) {
893 if (*it && (*it)->is_idle()) {
894 // Stop the worker
895 (*it)->stop();
896 it = workers_.erase(it);
897 ++removed;
898 } else {
899 ++it;
900 }
901 }
902
903 // If we still need to remove more, wait briefly for workers to become idle
904 if (removed < count) {
905 // Just return success with what we removed
906 // Remaining workers will be removed on subsequent calls
907 context_.log(common::interfaces::log_level::info,
908 formatter::format("Removed {} of {} requested workers (remaining are busy)",
909 removed, count));
910 }
911
912 return common::ok();
913}
914
915} // namespace kcenon::thread
Provides a mechanism for cooperative cancellation of operations.
void cancel()
Cancels the operation.
static cancellation_token create()
Creates a new cancellation token.
Comprehensive diagnostics API for thread pool monitoring.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Enhanced thread pool metrics with histograms and percentiles.
Lightweight metrics container shared between thread_pool and workers.
void reset() override
Reset all metrics to their initial state.
Base interface for thread pool policies.
Definition pool_policy.h:81
A template class representing either a value or an error.
Context object that provides access to optional services.
std::shared_ptr< IMonitor > monitoring() const
Get the monitoring service.
void update_thread_pool_metrics(const common::interfaces::thread_pool_metrics &metrics) const
Update thread pool metrics if monitoring is available.
static bool is_shutting_down()
Check if shutdown is in progress.
A thread pool for concurrent execution of jobs using multiple worker threads.
static std::atomic< std::uint32_t > next_pool_instance_id_
Static counter for generating unique pool instance IDs.
bool is_enhanced_metrics_enabled() const
Check if enhanced metrics is enabled.
virtual ~thread_pool(void)
Virtual destructor. Cleans up resources used by the thread pool.
auto get_pending_task_count() const -> std::size_t
Get the number of pending tasks in the queue.
metrics::EnhancedSnapshot enhanced_metrics_snapshot() const
Get enhanced metrics snapshot.
void report_metrics()
Collect and report current thread pool metrics.
std::size_t get_idle_worker_count() const
Get the number of idle workers.
thread_pool(const std::string &thread_title="thread_pool", const thread_context &context=thread_context())
Constructs a new thread_pool instance.
auto to_string(void) const -> std::string
Provides a string representation of this thread_pool.
std::unique_ptr< pool_queue_adapter_interface > queue_adapter_
Queue adapter for unified access to different queue types.
auto get_context(void) const -> const thread_context &
Gets the thread context for this pool.
std::shared_ptr< metrics::metrics_service > metrics_service_
Centralized metrics service for all pool and worker metrics.
std::vector< std::unique_ptr< thread_worker > > workers_
A collection of worker threads associated with this pool.
void reset_metrics()
Reset accumulated metrics.
const metrics::EnhancedThreadPoolMetrics & enhanced_metrics() const
Access enhanced metrics (read-only reference).
auto enqueue(std::unique_ptr< job > &&job) -> common::VoidResult
Enqueues a new job into the shared job_queue.
auto is_running() const -> bool
Check if the thread pool is currently running.
auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the shared job_queue.
std::string thread_title_
A title or name for this thread pool, useful for identification and logging.
std::uint32_t get_pool_instance_id() const
Get the pool instance id.
auto stop_unsafe() noexcept -> common::VoidResult
Stops the thread pool without logging (for use during static destruction).
void set_enhanced_metrics_enabled(bool enabled)
Enable or disable enhanced metrics collection.
std::atomic< bool > start_pool_
Indicates whether the pool is currently running.
cancellation_token pool_cancellation_token_
Pool-level cancellation token.
const metrics::ThreadPoolMetrics & metrics() const noexcept
Access aggregated runtime metrics (read-only reference).
std::shared_ptr< job_queue > job_queue_
The shared job queue where jobs (job objects) are enqueued.
auto check_worker_health(bool restart_failed=true) -> std::size_t
Check health of all worker threads and restart failed workers.
std::uint32_t pool_instance_id_
Unique instance ID for this pool (for multi-pool scenarios).
thread_context context_
The thread context providing access to logging and monitoring services.
auto get_job_queue(void) -> std::shared_ptr< job_queue >
Returns the shared job_queue used by this thread pool.
auto stop(const bool &immediately_stop=false) -> common::VoidResult
Stops the thread pool and all worker threads.
auto get_ptr(void) -> std::shared_ptr< thread_pool >
Retrieves a std::shared_ptr to this thread_pool instance.
auto start(void) -> common::VoidResult
Starts the thread pool and all associated workers.
std::mutex workers_mutex_
Mutex protecting concurrent access to the workers_ vector.
static auto format_to(OutputIt out, const char *formats, const FormatArgs &... args) -> OutputIt
Formats a narrow-character string directly to an output iterator.
Definition formatter.h:162
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132
Core thread pool implementation with work stealing and auto-scaling.
Generic formatter for enum types using user-provided converter functors.
Thread-safe FIFO job queue with optional bounded size.
Adapter bridging job_queue to pool_queue_adapter_interface.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
@ info
Informational messages highlighting progress.
@ latency
Latency threshold exceeded.
STL namespace.
Information about a worker thread in the pool.
Definition thread_info.h:88
Enhanced snapshot with latency percentiles and throughput.
Internal logging interface for the thread system.
Runtime diagnostics, health monitoring, and execution tracing for thread pools.