Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
thread_pool_diagnostics.cpp
// BSD 3-Clause License
// Copyright (c) 2024, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

// Project headers (include paths assumed)
#include "thread_pool_diagnostics.h"
#include "thread_pool.h"
#include "job_queue.h"
#include "thread_worker.h"

#include <algorithm>
#include <chrono>
#include <cmath>
#include <ctime>
#include <format>
#include <iomanip>
#include <sstream>

namespace kcenon::thread::diagnostics
{
    // =========================================================================
    // Constructor / Destructor
    // =========================================================================

    thread_pool_diagnostics::thread_pool_diagnostics(thread_pool& pool,
                                                     const diagnostics_config& config)
        : pool_(pool)
        , config_(config)
        , tracing_enabled_(config.enable_tracing)
        , start_time_(std::chrono::steady_clock::now())
    {
    }

    thread_pool_diagnostics::~thread_pool_diagnostics() = default;

    // =========================================================================
    // Thread Dump
    // =========================================================================

    auto thread_pool_diagnostics::dump_thread_states() const -> std::vector<thread_info>
    {
        // Delegate to thread_pool's collect_worker_diagnostics for the actual worker info
        return pool_.collect_worker_diagnostics();
    }

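    // Example of the per-worker block emitted by format_thread_dump() below
    // (illustrative values):
    //
    //   Worker-0 [tid:140213] active (2.3s)
    //     Current Job: encode_frame#1042 (running 15ms)
    //     Jobs: 128 completed, 1 failed
    //     Utilization: 87.5%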
    auto thread_pool_diagnostics::format_thread_dump() const -> std::string
    {
        std::ostringstream oss;

        auto threads = dump_thread_states();
        auto now = std::chrono::system_clock::now();
        auto time_t = std::chrono::system_clock::to_time_t(now);

        std::size_t worker_count;
        {
            std::scoped_lock<std::mutex> lock(pool_.workers_mutex_);
            worker_count = pool_.workers_.size();
        }
        auto active_count = pool_.get_active_worker_count();
        auto idle_count = pool_.get_idle_worker_count();

        // Header
        oss << "=== Thread Pool Dump: " << pool_.to_string() << " ===\n";
        oss << "Time: " << std::put_time(std::gmtime(&time_t), "%Y-%m-%dT%H:%M:%SZ") << "\n";
        oss << "Workers: " << worker_count << ", Active: " << active_count
            << ", Idle: " << idle_count << "\n\n";

        // Worker details
        for (const auto& t : threads)
        {
            auto state_duration = t.state_duration();
            auto duration_sec = std::chrono::duration<double>(state_duration).count();

            oss << t.thread_name << " [tid:" << t.thread_id << "] "
                << worker_state_to_string(t.state)
                << " (" << std::fixed << std::setprecision(1) << duration_sec << "s)\n";

            if (t.current_job.has_value())
            {
                const auto& job = t.current_job.value();
                auto exec_time_ms = std::chrono::duration<double, std::milli>(
                    job.execution_time).count();
                oss << "  Current Job: " << job.job_name << "#" << job.job_id
                    << " (running " << std::fixed << std::setprecision(0)
                    << exec_time_ms << "ms)\n";
            }

            oss << "  Jobs: " << t.jobs_completed << " completed, "
                << t.jobs_failed << " failed\n";
            oss << "  Utilization: " << std::fixed << std::setprecision(1)
                << (t.utilization * 100.0) << "%\n\n";
        }

        return oss.str();
    }

    // =========================================================================
    // Job Inspection
    // =========================================================================

    auto thread_pool_diagnostics::get_active_jobs() const -> std::vector<job_info>
    {
        std::vector<job_info> result;

        // Get thread states, which include current job info
        auto threads = dump_thread_states();

        for (const auto& thread : threads)
        {
            if (thread.current_job.has_value())
            {
                result.push_back(thread.current_job.value());
            }
        }

        return result;
    }

    auto thread_pool_diagnostics::get_pending_jobs(std::size_t limit) const
        -> std::vector<job_info>
    {
        // Delegate to job_queue's inspect_pending_jobs
        auto queue = pool_.get_job_queue();
        if (!queue)
        {
            return {};
        }

        return queue->inspect_pending_jobs(limit);
    }

    auto thread_pool_diagnostics::get_recent_jobs(std::size_t limit) const
        -> std::vector<job_info>
    {
        std::lock_guard<std::mutex> lock(jobs_mutex_);

        std::vector<job_info> result;
        auto count = std::min(limit, recent_jobs_.size());
        result.reserve(count);

        auto it = recent_jobs_.rbegin();
        for (std::size_t i = 0; i < count && it != recent_jobs_.rend(); ++i, ++it)
        {
            result.push_back(*it);
        }

        return result;
    }

    void thread_pool_diagnostics::record_job_completion(const job_info& info)
    {
        std::lock_guard<std::mutex> lock(jobs_mutex_);

        recent_jobs_.push_back(info);
        if (recent_jobs_.size() > config_.recent_jobs_capacity)
        {
            recent_jobs_.pop_front();
        }
    }

    // =========================================================================
    // Bottleneck Detection
    // =========================================================================

    auto thread_pool_diagnostics::detect_bottlenecks() const -> bottleneck_report
    {
        bottleneck_report report;

        // Gather metrics
        auto metrics_snap = pool_.metrics().snapshot();
        std::size_t worker_count;
        {
            std::scoped_lock<std::mutex> lock(pool_.workers_mutex_);
            worker_count = pool_.workers_.size();
        }
        auto active_count = pool_.get_active_worker_count();
        auto idle_count = pool_.get_idle_worker_count();
        auto queue_depth = pool_.get_pending_task_count();

        report.queue_depth = queue_depth;
        report.idle_workers = idle_count;
        report.total_workers = worker_count;

        // Calculate queue saturation
        auto queue = pool_.get_job_queue();
        if (queue)
        {
            auto max_size = queue->get_max_size();
            if (max_size.has_value() && max_size.value() > 0)
            {
                report.queue_saturation = static_cast<double>(queue_depth) /
                                          static_cast<double>(max_size.value());
            }
            else if (queue_depth > 0)
            {
                // For unbounded queues, use a heuristic: saturation based on queue depth vs workers.
                // A high queue depth relative to the worker count indicates potential saturation.
                report.queue_saturation = std::min(1.0,
                    static_cast<double>(queue_depth) / static_cast<double>(worker_count * 10));
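                // Example: with 4 workers and 20 queued jobs this yields
                // 20 / (4 * 10) = 0.5, i.e. about 50% estimated saturation (illustrative numbers).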
            }
        }

        // Calculate worker utilization (instantaneous)
        if (worker_count > 0)
        {
            report.worker_utilization = static_cast<double>(active_count) /
                                        static_cast<double>(worker_count);
        }

        // Get per-worker utilization for variance calculation
        auto thread_states = pool_.collect_worker_diagnostics();
        if (!thread_states.empty())
        {
            // Calculate mean utilization from worker stats
            double sum_utilization = 0.0;
            for (const auto& t : thread_states)
            {
                sum_utilization += t.utilization;
            }
            double mean_utilization = sum_utilization / static_cast<double>(thread_states.size());

            // Calculate variance
            double variance_sum = 0.0;
            for (const auto& t : thread_states)
            {
                double diff = t.utilization - mean_utilization;
                variance_sum += diff * diff;
            }
            report.utilization_variance = variance_sum / static_cast<double>(thread_states.size());

            // Use mean utilization from actual worker stats if available
            if (mean_utilization > 0.0)
            {
                report.worker_utilization = mean_utilization;
            }
        }

        // Calculate average wait time from metrics
        auto total_jobs = metrics_snap.tasks_executed + metrics_snap.tasks_failed;
        if (total_jobs > 0)
        {
            // Estimate wait time from idle time (approximation)
            auto avg_idle_ns = metrics_snap.total_idle_time_ns / total_jobs;
            report.avg_wait_time_ms = static_cast<double>(avg_idle_ns) / 1e6;

            // Calculate estimated backlog time
            // Average execution time per job
            double avg_exec_time_ms = 0.0;
            if (metrics_snap.total_busy_time_ns > 0 && total_jobs > 0)
            {
                avg_exec_time_ms = static_cast<double>(metrics_snap.total_busy_time_ns) /
                                   static_cast<double>(total_jobs) / 1e6;
            }

            // Estimated time to clear backlog = (queue_depth * avg_exec_time) / active_workers
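            // Worked example: 200 queued jobs * 5 ms average execution time / 4 active workers
            // ≈ 250 ms of estimated backlog (illustrative numbers).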
            if (active_count > 0 && avg_exec_time_ms > 0)
            {
                report.estimated_backlog_time_ms = static_cast<std::size_t>(
                    (static_cast<double>(queue_depth) * avg_exec_time_ms) /
                    static_cast<double>(active_count));
            }
            else if (worker_count > 0 && avg_exec_time_ms > 0)
            {
                report.estimated_backlog_time_ms = static_cast<std::size_t>(
                    (static_cast<double>(queue_depth) * avg_exec_time_ms) /
                    static_cast<double>(worker_count));
            }
        }

        // Jobs-rejected tracking is not available in basic metrics
        report.jobs_rejected = 0;

        // Detect bottleneck type (ordered by severity)
        // 1. Queue full - most critical
        if (report.queue_saturation > 0.95 || report.jobs_rejected > 0)
        {
            report.has_bottleneck = true;
            report.type = bottleneck_type::queue_full;  // enumerator name assumed
            report.description = "Queue is at or near capacity, jobs are being rejected";
        }
        // 2. Worker starvation - high utilization with growing backlog
        else if (report.worker_utilization > 0.95 && queue_depth > worker_count * 2)
        {
            report.has_bottleneck = true;
            report.type = bottleneck_type::worker_starvation;
            report.description = "Not enough workers to handle the workload";
        }
        // 3. Slow consumer - high wait time with high utilization
        else if (report.avg_wait_time_ms > config_.wait_time_threshold_ms &&
                 report.worker_utilization > config_.utilization_high_threshold)
        {
            report.has_bottleneck = true;
            report.type = bottleneck_type::slow_consumer;
            report.description = "Workers cannot keep up with job submission rate";
        }
        // 4. Uneven distribution - high variance in worker utilization
        else if (report.utilization_variance > 0.1 && worker_count > 1)
        {
            // Variance > 0.1 means standard deviation > ~0.32, which is significant
            report.has_bottleneck = true;
            report.type = bottleneck_type::uneven_distribution;
            report.description = "Work is not evenly distributed across workers";
        }
        // 5. Lock contention - high wait time but low utilization (workers waiting on locks)
        else if (report.avg_wait_time_ms > config_.wait_time_threshold_ms * 2 &&
                 report.worker_utilization < 0.5 && active_count > 0)
        {
            report.has_bottleneck = true;
            report.type = bottleneck_type::lock_contention;
            report.description = "High wait times with low utilization suggest lock contention";
        }
        // 6. Memory pressure - check queue memory usage
        else if (queue)
        {
            auto mem_stats = queue->get_memory_stats();
            // Consider it memory pressure if the queue uses more than 100 MB
            constexpr std::size_t memory_threshold = 100 * 1024 * 1024;
            if (mem_stats.queue_size_bytes > memory_threshold)
            {
                report.has_bottleneck = true;
                report.type = bottleneck_type::memory_pressure;
                report.description = "Excessive memory usage in job queue";
            }
        }

        // Generate recommendations if a bottleneck was detected
        if (report.has_bottleneck)
        {
            generate_recommendations(report);
        }

        return report;
    }

    void thread_pool_diagnostics::generate_recommendations(bottleneck_report& report) const
    {
        // Case labels assumed to mirror the detection order above
        switch (report.type)
        {
        case bottleneck_type::queue_full:
            report.recommendations.push_back("Consider increasing queue capacity");
            report.recommendations.push_back("Enable backpressure with adaptive policy");
            report.recommendations.push_back("Add more worker threads if CPU permits");
            break;

        case bottleneck_type::slow_consumer:
            report.recommendations.push_back("Add more worker threads");
            report.recommendations.push_back("Optimize job execution time");
            report.recommendations.push_back("Consider job batching for small tasks");
            break;

        case bottleneck_type::worker_starvation:
            report.recommendations.push_back("Increase worker thread count");
            report.recommendations.push_back("Consider scaling based on hardware cores");
            report.recommendations.push_back("Enable autoscaling for dynamic adjustment");
            break;

        case bottleneck_type::uneven_distribution:
            report.recommendations.push_back("Enable work stealing if not already");
            report.recommendations.push_back("Review job distribution patterns");
            report.recommendations.push_back("Consider using priority-based scheduling");
            break;

        case bottleneck_type::lock_contention:
            report.recommendations.push_back("Review shared resource access patterns");
            report.recommendations.push_back("Consider using lock-free data structures");
            report.recommendations.push_back("Reduce critical section scope");
            report.recommendations.push_back("Use finer-grained locking strategies");
            break;

        case bottleneck_type::memory_pressure:
            report.recommendations.push_back("Reduce queue capacity or enable backpressure");
            report.recommendations.push_back("Optimize job object size");
            report.recommendations.push_back("Add more workers to process jobs faster");
            report.recommendations.push_back("Consider job prioritization to clear backlog");
            break;

        default:
            break;
        }
    }

    // =========================================================================
    // Health Checks
    // =========================================================================

    auto thread_pool_diagnostics::health_check() const -> health_status
    {
        health_status status;
        status.check_time = std::chrono::steady_clock::now();

        // Calculate uptime
        auto uptime = status.check_time - start_time_;
        status.uptime_seconds = std::chrono::duration<double>(uptime).count();

        // Get metrics
        auto metrics_snap = pool_.metrics().snapshot();
        status.total_jobs_processed = metrics_snap.tasks_executed +
                                      metrics_snap.tasks_failed;

        if (status.total_jobs_processed > 0)
        {
            status.success_rate = static_cast<double>(metrics_snap.tasks_executed) /
                                  static_cast<double>(status.total_jobs_processed);

            // Calculate average latency (total execution time / total jobs);
            // busy_time represents total execution time across all workers
            double total_exec_time_ms = static_cast<double>(metrics_snap.total_busy_time_ns) / 1e6;
            status.avg_latency_ms = total_exec_time_ms /
                                    static_cast<double>(status.total_jobs_processed);
        }

        // Worker stats
        {
            std::scoped_lock<std::mutex> lock(pool_.workers_mutex_);
            status.total_workers = pool_.workers_.size();
        }
        status.active_workers = pool_.get_active_worker_count();
        status.queue_depth = pool_.get_pending_task_count();

        // Get queue capacity
        auto queue = pool_.get_job_queue();
        if (queue)
        {
            auto max_size = queue->get_max_size();
            if (max_size.has_value())
            {
                status.queue_capacity = max_size.value();
            }
        }

        // Check components
        status.components.push_back(check_worker_health());
        status.components.push_back(check_queue_health());
        status.components.push_back(check_metrics_health(status.avg_latency_ms,
                                                         status.success_rate));

        // Calculate overall status from the component states
        status.calculate_overall_status();

        return status;
    }

    auto thread_pool_diagnostics::is_healthy() const -> bool
    {
        std::size_t worker_count;
        {
            std::scoped_lock<std::mutex> lock(pool_.workers_mutex_);
            worker_count = pool_.workers_.size();
        }
        return pool_.is_running() && worker_count > 0;
    }

    auto thread_pool_diagnostics::check_worker_health() const -> component_health
    {
        component_health health;
        health.name = "workers";

        std::size_t total;
        {
            std::scoped_lock<std::mutex> lock(pool_.workers_mutex_);
            total = pool_.workers_.size();
        }
        auto active = pool_.get_active_worker_count();
        auto idle = pool_.get_idle_worker_count();

        health.details["total"] = std::to_string(total);
        health.details["active"] = std::to_string(active);
        health.details["idle"] = std::to_string(idle);

        if (!pool_.is_running())
        {
            health.state = health_state::unhealthy;
            health.message = "Thread pool is not running";
        }
        else if (total == 0)
        {
            health.state = health_state::unhealthy;
            health.message = "No workers available";
        }
        else if (active == total)
        {
            health.state = health_state::degraded;
            health.message = "All workers are busy";
        }
        else
        {
            health.state = health_state::healthy;
            health.message = std::to_string(idle) + " workers available";
        }

        return health;
    }

    auto thread_pool_diagnostics::check_queue_health() const -> component_health
    {
        component_health health;
        health.name = "queue";

        auto depth = pool_.get_pending_task_count();
        health.details["depth"] = std::to_string(depth);

        // Get queue capacity and calculate saturation
        auto queue = pool_.get_job_queue();
        double saturation = 0.0;
        if (queue)
        {
            auto max_size = queue->get_max_size();
            if (max_size.has_value() && max_size.value() > 0)
            {
                health.details["capacity"] = std::to_string(max_size.value());
                saturation = static_cast<double>(depth) / static_cast<double>(max_size.value());
                health.details["saturation"] = std::format("{:.2f}", saturation);
            }
        }

        // Note: job rejection tracking requires a backpressure queue;
        // for the basic queue, assume no rejections
        std::uint64_t rejected = 0;
        health.details["rejected"] = std::to_string(rejected);

        const auto& thresholds = config_.health_thresholds_config;

        if (saturation >= thresholds.queue_saturation_critical)
        {
            health.state = health_state::unhealthy;
            health.message = "Queue at critical capacity";
        }
        else if (saturation >= thresholds.queue_saturation_warning || rejected > 0)
        {
            health.state = health_state::degraded;
            if (rejected > 0)
            {
                health.message = std::to_string(rejected) + " jobs rejected due to backpressure";
            }
            else
            {
                health.message = "Queue saturation above warning threshold";
            }
        }
        else
        {
            health.state = health_state::healthy;
            health.message = "Queue operational";
        }

        return health;
    }

    auto thread_pool_diagnostics::check_metrics_health(double avg_latency_ms,
                                                       double success_rate) const -> component_health
    {
        component_health health;
        health.name = "metrics";

        health.details["avg_latency_ms"] = std::format("{:.3f}", avg_latency_ms);
        health.details["success_rate"] = std::format("{:.4f}", success_rate);

        const auto& thresholds = config_.health_thresholds_config;

        // Check success rate first (more critical)
        if (success_rate < thresholds.unhealthy_success_rate)
        {
            health.state = health_state::unhealthy;
            health.message = "Success rate critically low: " +
                             std::format("{:.1f}%", success_rate * 100.0);
        }
        else if (success_rate < thresholds.min_success_rate)
        {
            health.state = health_state::degraded;
            health.message = "Success rate below threshold: " +
                             std::format("{:.1f}%", success_rate * 100.0);
        }
        // Check latency
        else if (avg_latency_ms > thresholds.degraded_latency_ms)
        {
            health.state = health_state::degraded;
            health.message = "High average latency: " +
                             std::format("{:.2f}ms", avg_latency_ms);
        }
        else if (avg_latency_ms > thresholds.max_healthy_latency_ms)
        {
            health.state = health_state::degraded;
            health.message = "Elevated latency: " +
                             std::format("{:.2f}ms", avg_latency_ms);
        }
        else
        {
            health.state = health_state::healthy;
            health.message = "Performance metrics within normal range";
        }

        return health;
    }

    // =========================================================================
    // Event Tracing
    // =========================================================================

    void thread_pool_diagnostics::enable_tracing(bool enable, std::size_t history_size)
    {
        tracing_enabled_.store(enable, std::memory_order_relaxed);

        if (enable)
        {
            std::lock_guard<std::mutex> lock(events_mutex_);
            // Trim the history down to the new size if needed
            while (event_history_.size() > history_size)
            {
                event_history_.pop_front();
            }
        }

        // Update config
        config_.event_history_size = history_size;
        config_.enable_tracing = enable;
    }

    auto thread_pool_diagnostics::is_tracing_enabled() const -> bool
    {
        return tracing_enabled_.load(std::memory_order_relaxed);
    }

    void thread_pool_diagnostics::add_event_listener(
        std::shared_ptr<execution_event_listener> listener)
    {
        if (!listener) return;

        std::lock_guard<std::mutex> lock(listeners_mutex_);
        listeners_.push_back(std::move(listener));
    }

    void thread_pool_diagnostics::remove_event_listener(
        std::shared_ptr<execution_event_listener> listener)
    {
        if (!listener) return;

        std::lock_guard<std::mutex> lock(listeners_mutex_);
        auto it = std::find(listeners_.begin(), listeners_.end(), listener);
        if (it != listeners_.end())
        {
            listeners_.erase(it);
        }
    }

    void thread_pool_diagnostics::record_event(const job_execution_event& event)
    {
        if (!tracing_enabled_.load(std::memory_order_relaxed))
        {
            return;
        }

        // Store in history
        {
            std::lock_guard<std::mutex> lock(events_mutex_);
            event_history_.push_back(event);
            if (event_history_.size() > config_.event_history_size)
            {
                event_history_.pop_front();
            }
        }

        // Notify listeners
        notify_listeners(event);
    }

    void thread_pool_diagnostics::notify_listeners(const job_execution_event& event)
    {
        std::vector<std::shared_ptr<execution_event_listener>> listeners_copy;
        {
            std::lock_guard<std::mutex> lock(listeners_mutex_);
            listeners_copy = listeners_;
        }

        for (const auto& listener : listeners_copy)
        {
            if (listener)
            {
                listener->on_event(event);
            }
        }
    }

    auto thread_pool_diagnostics::get_recent_events(std::size_t limit) const
        -> std::vector<job_execution_event>
    {
        std::lock_guard<std::mutex> lock(events_mutex_);

        std::vector<job_execution_event> result;
        auto count = std::min(limit, event_history_.size());
        result.reserve(count);

        auto it = event_history_.rbegin();
        for (std::size_t i = 0; i < count && it != event_history_.rend(); ++i, ++it)
        {
            result.push_back(*it);
        }

        return result;
    }

    // =========================================================================
    // Export
    // =========================================================================

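    // to_json() below always emits this fixed shape (field values illustrative):
    //
    //   {
    //     "health": {
    //       "status": "healthy",
    //       "message": "...",
    //       "uptime_seconds": 12.34,
    //       "total_jobs_processed": 1024,
    //       "success_rate": 0.9981
    //     },
    //     "workers": { "total": 4, "active": 2, "idle": 2 },
    //     "queue": { "depth": 17 },
    //     "bottleneck": { "detected": false, "type": "...", "severity": "..." }
    //   }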
    auto thread_pool_diagnostics::to_json() const -> std::string
    {
        std::ostringstream oss;
        oss << "{\n";

        // Health status
        auto health = health_check();
        oss << "  \"health\": {\n";
        oss << "    \"status\": \"" << health_state_to_string(health.overall_status) << "\",\n";
        oss << "    \"message\": \"" << health.status_message << "\",\n";
        oss << "    \"uptime_seconds\": " << std::fixed << std::setprecision(2)
            << health.uptime_seconds << ",\n";
        oss << "    \"total_jobs_processed\": " << health.total_jobs_processed << ",\n";
        oss << "    \"success_rate\": " << std::fixed << std::setprecision(4)
            << health.success_rate << "\n";
        oss << "  },\n";

        // Workers
        oss << "  \"workers\": {\n";
        oss << "    \"total\": " << health.total_workers << ",\n";
        oss << "    \"active\": " << health.active_workers << ",\n";
        oss << "    \"idle\": " << (health.total_workers - health.active_workers) << "\n";
        oss << "  },\n";

        // Queue
        oss << "  \"queue\": {\n";
        oss << "    \"depth\": " << health.queue_depth << "\n";
        oss << "  },\n";

        // Bottleneck
        auto bottleneck = detect_bottlenecks();
        oss << "  \"bottleneck\": {\n";
        oss << "    \"detected\": " << (bottleneck.has_bottleneck ? "true" : "false") << ",\n";
        oss << "    \"type\": \"" << bottleneck_type_to_string(bottleneck.type) << "\",\n";
        oss << "    \"severity\": \"" << bottleneck.severity_string() << "\"\n";
        oss << "  }\n";

        oss << "}";
        return oss.str();
    }

    auto thread_pool_diagnostics::to_string() const -> std::string
    {
        return format_thread_dump();
    }

    auto thread_pool_diagnostics::to_prometheus() const -> std::string
    {
        auto health = health_check();
        return health.to_prometheus(pool_.to_string());
    }

    // =========================================================================
    // Configuration
    // =========================================================================

    auto thread_pool_diagnostics::get_config() const -> diagnostics_config
    {
        return config_;
    }

    void thread_pool_diagnostics::set_config(const diagnostics_config& config)
    {
        config_ = config;
        tracing_enabled_.store(config.enable_tracing, std::memory_order_relaxed);
    }

    auto thread_pool_diagnostics::get_worker_info(const thread_worker& worker,
                                                  std::size_t index) const -> thread_info
    {
        thread_info info;
        info.worker_id = worker.get_worker_id();
        info.thread_name = "Worker-" + std::to_string(index);
        info.state = worker.is_idle() ? worker_state::idle : worker_state::active;
        info.state_since = std::chrono::steady_clock::now();
        return info;
    }

} // namespace kcenon::thread::diagnostics
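The listing above is implementation only; the short sketch below shows how the diagnostics object is typically wired to a pool: construct it over an existing thread_pool, enable tracing, attach a listener, and pull a thread dump, a health report, and a bottleneck analysis. It is a minimal sketch under stated assumptions, not part of the file: the include paths, the thread_pool worker-count constructor, and the idea that execution_event_listener::on_event is the single virtual hook to override are assumptions to check against the real headers.

#include <cstddef>
#include <iostream>
#include <memory>

#include "thread_pool.h"              // include path assumed
#include "thread_pool_diagnostics.h"  // include path assumed

using namespace kcenon::thread;

// Listener that reacts to every traced job execution event
// (on_event assumed to be the virtual hook of execution_event_listener).
class logging_listener : public diagnostics::execution_event_listener
{
public:
    void on_event(const diagnostics::job_execution_event& event) override
    {
        std::cout << "job execution event recorded\n";  // inspect `event` fields as needed
    }
};

int main()
{
    thread_pool pool(/*worker_count=*/4);  // constructor signature assumed

    diagnostics::diagnostics_config config;
    config.enable_tracing = true;
    config.event_history_size = 512;

    diagnostics::thread_pool_diagnostics diag(pool, config);
    diag.add_event_listener(std::make_shared<logging_listener>());

    // ... submit jobs to the pool here ...

    // Human-readable snapshot of every worker
    std::cout << diag.format_thread_dump();

    // Structured health and bottleneck analysis
    auto health = diag.health_check();
    std::cout << "uptime: " << health.uptime_seconds << "s, "
              << "success rate: " << health.success_rate << "\n";

    auto report = diag.detect_bottlenecks();
    if (report.has_bottleneck)
    {
        std::cout << "bottleneck: " << report.description << "\n";
        for (const auto& rec : report.recommendations)
        {
            std::cout << "  - " << rec << "\n";
        }
    }

    // Machine-readable export
    std::cout << diag.to_json() << "\n";

    return 0;
}
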
auto check_queue_health() const -> component_health
Checks queue component health.
void generate_recommendations(bottleneck_report &report) const
Generates recommendations for a bottleneck.
auto get_active_jobs() const -> std::vector< job_info >
Gets currently executing jobs.
void add_event_listener(std::shared_ptr< execution_event_listener > listener)
Adds an event listener.
auto get_worker_info(const thread_worker &worker, std::size_t index) const -> thread_info
Gets thread info for a single worker.
auto to_json() const -> std::string
Exports diagnostics as JSON.
auto get_recent_jobs(std::size_t limit=100) const -> std::vector< job_info >
Gets recent completed/failed jobs.
diagnostics_config config_
Configuration for diagnostics.
auto to_prometheus() const -> std::string
Exports diagnostics as Prometheus-compatible metrics.
std::vector< std::shared_ptr< execution_event_listener > > listeners_
Event listeners.
void record_job_completion(const job_info &info)
Records a job completion for history tracking.
thread_pool & pool_
Reference to the monitored thread pool.
auto detect_bottlenecks() const -> bottleneck_report
Analyzes for bottlenecks.
auto is_tracing_enabled() const -> bool
Checks if tracing is enabled.
auto check_metrics_health(double avg_latency_ms, double success_rate) const -> component_health
Checks metrics component health.
auto get_pending_jobs(std::size_t limit=100) const -> std::vector< job_info >
Gets pending jobs in queue.
auto get_config() const -> diagnostics_config
Gets the current configuration.
auto get_recent_events(std::size_t limit=100) const -> std::vector< job_execution_event >
Gets recent execution events.
std::chrono::steady_clock::time_point start_time_
Time when the pool was started.
auto format_thread_dump() const -> std::string
Gets formatted thread dump (human-readable).
thread_pool_diagnostics(thread_pool &pool, const diagnostics_config &config={})
Constructs diagnostics for a thread pool.
void notify_listeners(const job_execution_event &event)
Notifies all event listeners.
auto to_string() const -> std::string
Exports diagnostics as formatted string.
void remove_event_listener(std::shared_ptr< execution_event_listener > listener)
Removes an event listener.
void enable_tracing(bool enable, std::size_t history_size=1000)
Enables or disables job execution tracing.
auto health_check() const -> health_status
Performs comprehensive health check.
std::atomic< bool > tracing_enabled_
Whether event tracing is enabled.
void record_event(const job_execution_event &event)
Records a job execution event.
auto is_healthy() const -> bool
Quick check if pool is healthy.
std::deque< job_execution_event > event_history_
Ring buffer for event history.
std::mutex events_mutex_
Mutex for event history access.
auto dump_thread_states() const -> std::vector< thread_info >
Gets current state of all worker threads.
void set_config(const diagnostics_config &config)
Updates the configuration.
auto check_worker_health() const -> component_health
Checks worker component health.
std::deque< job_info > recent_jobs_
Ring buffer for recent job completions.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Snapshot snapshot() const
Get a snapshot of all metrics.
A template class representing either a value or an error.
A thread pool for concurrent execution of jobs using multiple worker threads.
auto get_pending_task_count() const -> std::size_t
Get the number of pending tasks in the queue.
std::size_t get_idle_worker_count() const
Get the number of idle workers.
auto to_string(void) const -> std::string
Provides a string representation of this thread_pool.
auto get_active_worker_count() const -> std::size_t
Get the current number of active (running) workers.
std::vector< std::unique_ptr< thread_worker > > workers_
A collection of worker threads associated with this pool.
auto collect_worker_diagnostics() const -> std::vector< diagnostics::thread_info >
Collects diagnostics information from all workers.
auto is_running() const -> bool
Check if the thread pool is currently running.
const metrics::ThreadPoolMetrics & metrics() const noexcept
Access aggregated runtime metrics (read-only reference).
auto get_job_queue(void) -> std::shared_ptr< job_queue >
Returns the shared job_queue used by this thread pool.
std::mutex workers_mutex_
Mutex protecting concurrent access to the workers_ vector.
A specialized worker thread that processes jobs from a job_queue.
Core thread pool implementation with work stealing and auto-scaling.
worker_state::active
Worker is executing a job.
worker_state::idle
Worker is waiting for jobs.
health_state::healthy
Component is fully operational.
health_state::degraded
Component is operational but with reduced capacity/performance.
health_state::unhealthy
Component is not operational or failing.
bottleneck_type::slow_consumer
Workers can't keep up with job submission rate.
bottleneck_type::lock_contention
High mutex wait times affecting throughput.
bottleneck_type::worker_starvation
Not enough workers for the workload.
bottleneck_type::memory_pressure
Excessive memory allocations causing slowdown.
bottleneck_type::uneven_distribution
Work is not evenly distributed (work stealing needed).
auto bottleneck_type_to_string(bottleneck_type type) -> std::string
Converts bottleneck_type to human-readable string.
auto worker_state_to_string(worker_state state) -> std::string
Converts worker_state to human-readable string.
Definition thread_info.h:47
auto health_state_to_string(health_state state) -> std::string
Converts health_state to human-readable string.
info
Informational messages highlighting progress.
queue_depth
Queue depth threshold exceeded.
STL namespace.
Analysis report of bottlenecks in the thread pool.
bottleneck_type type
Type of bottleneck detected.
double avg_wait_time_ms
Average wait time in milliseconds.
std::vector< std::string > recommendations
Actionable recommendations to resolve the bottleneck.
std::string description
Human-readable description of the bottleneck.
double utilization_variance
Variance in worker utilization.
bool has_bottleneck
Whether a bottleneck was detected.
std::uint64_t jobs_rejected
Jobs rejected due to queue full.
std::size_t estimated_backlog_time_ms
Estimated time to process the current backlog.
std::size_t idle_workers
Number of idle workers.
std::size_t total_workers
Total number of workers.
double worker_utilization
Average worker utilization.
Health status of a single component.
std::string name
Name of the component (e.g., "workers", "queue", "metrics").
health_state state
Current health state of this component.
std::string message
Human-readable message describing the current state.
std::map< std::string, std::string > details
Additional details about this component's health.
Configuration options for thread pool diagnostics.
health_thresholds health_thresholds_config
Configurable thresholds for health status determination.
std::size_t recent_jobs_capacity
Maximum number of recent jobs to track.
double wait_time_threshold_ms
Wait time threshold (ms) for slow consumer detection.
bool enable_tracing
Enable automatic event tracing.
std::size_t event_history_size
Maximum number of events to retain in history.
double utilization_high_threshold
Worker utilization threshold for bottleneck detection.
Comprehensive health status of the thread pool.
double success_rate
Job success rate (0.0 to 1.0).
std::size_t active_workers
Number of active workers.
std::vector< component_health > components
Health status of individual components.
std::size_t total_workers
Total number of workers.
std::uint64_t total_jobs_processed
Total number of jobs processed since startup.
std::size_t queue_capacity
Queue capacity (if bounded).
double uptime_seconds
Time since the thread pool was started (seconds).
auto calculate_overall_status() -> void
Calculates overall status from component states.
std::chrono::steady_clock::time_point check_time
Time when this health check was performed.
std::size_t queue_depth
Current queue depth.
double avg_latency_ms
Average job latency in milliseconds.
Event data for job execution tracing.
Information about a job in the thread pool.
Definition job_info.h:90
Information about a worker thread in the pool.
Definition thread_info.h:88
std::size_t worker_id
Worker ID within the pool.
Runtime diagnostics, health monitoring, and execution tracing for thread pools.
Specialized worker thread that processes jobs from a job_queue.