25 , tracing_enabled_(config.enable_tracing)
26 , start_time_(
std::chrono::steady_clock::now())
44 std::ostringstream oss;
47 auto now = std::chrono::system_clock::now();
48 auto time_t = std::chrono::system_clock::to_time_t(now);
50 std::size_t worker_count;
60 oss <<
"Time: " << std::put_time(std::gmtime(&time_t),
"%Y-%m-%dT%H:%M:%SZ") <<
"\n";
61 oss <<
"Workers: " << worker_count <<
", Active: " << active_count
62 <<
", Idle: " << idle_count <<
"\n\n";
65 for (
const auto& t : threads)
67 auto state_duration = t.state_duration();
68 auto duration_sec = std::chrono::duration<double>(state_duration).count();
70 oss << t.thread_name <<
" [tid:" << t.thread_id <<
"] "
72 <<
" (" << std::fixed << std::setprecision(1) << duration_sec <<
"s)\n";
74 if (t.current_job.has_value())
76 const auto&
job = t.current_job.value();
77 auto exec_time_ms = std::chrono::duration<double, std::milli>(
78 job.execution_time).count();
79 oss <<
" Current Job: " <<
job.job_name <<
"#" <<
job.job_id
80 <<
" (running " << std::fixed << std::setprecision(0)
81 << exec_time_ms <<
"ms)\n";
84 oss <<
" Jobs: " << t.jobs_completed <<
" completed, "
85 << t.jobs_failed <<
" failed\n";
86 oss <<
" Utilization: " << std::fixed << std::setprecision(1)
87 << (t.utilization * 100.0) <<
"%\n\n";
99 std::vector<job_info>
result;
104 for (
const auto& thread : threads)
106 if (thread.current_job.has_value())
108 result.push_back(thread.current_job.value());
116 -> std::vector<job_info>
119 auto queue = pool_.get_job_queue();
125 return queue->inspect_pending_jobs(limit);
129 -> std::vector<job_info>
131 std::lock_guard<std::mutex> lock(jobs_mutex_);
133 std::vector<job_info>
result;
134 auto count = std::min(limit, recent_jobs_.size());
137 auto it = recent_jobs_.rbegin();
138 for (std::size_t i = 0; i < count && it != recent_jobs_.rend(); ++i, ++it)
167 std::size_t worker_count;
184 auto max_size = queue->get_max_size();
185 if (max_size.has_value() && max_size.value() > 0)
188 static_cast<double>(max_size.value());
195 static_cast<double>(
queue_depth) /
static_cast<double>(worker_count * 10));
200 if (worker_count > 0)
203 static_cast<double>(worker_count);
208 if (!thread_states.empty())
211 double sum_utilization = 0.0;
212 for (
const auto& t : thread_states)
214 sum_utilization += t.utilization;
216 double mean_utilization = sum_utilization /
static_cast<double>(thread_states.size());
219 double variance_sum = 0.0;
220 for (
const auto& t : thread_states)
222 double diff = t.utilization - mean_utilization;
223 variance_sum += diff * diff;
228 if (mean_utilization > 0.0)
235 auto total_jobs = metrics_snap.tasks_executed + metrics_snap.tasks_failed;
239 auto avg_idle_ns = metrics_snap.total_idle_time_ns / total_jobs;
244 double avg_exec_time_ms = 0.0;
245 if (metrics_snap.total_busy_time_ns > 0 && total_jobs > 0)
247 avg_exec_time_ms =
static_cast<double>(metrics_snap.total_busy_time_ns) /
248 static_cast<double>(total_jobs) / 1e6;
252 if (active_count > 0 && avg_exec_time_ms > 0)
255 (
static_cast<double>(
queue_depth) * avg_exec_time_ms) /
256 static_cast<double>(active_count));
258 else if (worker_count > 0 && avg_exec_time_ms > 0)
261 (
static_cast<double>(
queue_depth) * avg_exec_time_ms) /
262 static_cast<double>(worker_count));
275 report.
description =
"Queue is at or near capacity, jobs are being rejected";
282 report.
description =
"Not enough workers to handle the workload";
290 report.
description =
"Workers cannot keep up with job submission rate";
298 report.
description =
"Work is not evenly distributed across workers";
306 report.
description =
"High wait times with low utilization suggests lock contention";
311 auto mem_stats = queue->get_memory_stats();
313 constexpr std::size_t memory_threshold = 100 * 1024 * 1024;
314 if (mem_stats.queue_size_bytes > memory_threshold)
318 report.
description =
"Excessive memory usage in job queue";
336 report.
recommendations.push_back(
"Consider increasing queue capacity");
337 report.
recommendations.push_back(
"Enable backpressure with adaptive policy");
338 report.
recommendations.push_back(
"Add more worker threads if CPU permits");
344 report.
recommendations.push_back(
"Consider job batching for small tasks");
349 report.
recommendations.push_back(
"Consider scaling based on hardware cores");
350 report.
recommendations.push_back(
"Enable autoscaling for dynamic adjustment");
354 report.
recommendations.push_back(
"Enable work stealing if not already");
356 report.
recommendations.push_back(
"Consider using priority-based scheduling");
360 report.
recommendations.push_back(
"Review shared resource access patterns");
361 report.
recommendations.push_back(
"Consider using lock-free data structures");
363 report.
recommendations.push_back(
"Use finer-grained locking strategies");
367 report.
recommendations.push_back(
"Reduce queue capacity or enable backpressure");
369 report.
recommendations.push_back(
"Add more workers to process jobs faster");
370 report.
recommendations.push_back(
"Consider job prioritization to clear backlog");
386 status.
check_time = std::chrono::steady_clock::now();
390 status.
uptime_seconds = std::chrono::duration<double>(uptime).count();
395 metrics_snap.tasks_failed;
399 status.
success_rate =
static_cast<double>(metrics_snap.tasks_executed) /
404 double total_exec_time_ms =
static_cast<double>(metrics_snap.total_busy_time_ns) / 1e6;
421 auto max_size = queue->get_max_size();
422 if (max_size.has_value())
442 std::size_t worker_count;
453 health.
name =
"workers";
463 health.
details[
"total"] = std::to_string(total);
470 health.
message =
"Thread pool is not running";
475 health.
message =
"No workers available";
480 health.
message =
"All workers are busy";
485 health.
message = std::to_string(
idle) +
" workers available";
494 health.
name =
"queue";
497 health.
details[
"depth"] = std::to_string(depth);
501 double saturation = 0.0;
504 auto max_size = queue->get_max_size();
505 if (max_size.has_value() && max_size.value() > 0)
507 health.
details[
"capacity"] = std::to_string(max_size.value());
508 saturation =
static_cast<double>(depth) /
static_cast<double>(max_size.value());
509 health.
details[
"saturation"] = std::format(
"{:.2f}", saturation);
515 std::uint64_t rejected = 0;
516 health.
details[
"rejected"] = std::to_string(rejected);
520 if (saturation >= thresholds.queue_saturation_critical)
523 health.
message =
"Queue at critical capacity";
525 else if (saturation >= thresholds.queue_saturation_warning || rejected > 0)
530 health.
message = std::to_string(rejected) +
" jobs rejected due to backpressure";
534 health.
message =
"Queue saturation above warning threshold";
540 health.
message =
"Queue operational";
550 health.
name =
"metrics";
552 health.
details[
"avg_latency_ms"] = std::format(
"{:.3f}", avg_latency_ms);
553 health.
details[
"success_rate"] = std::format(
"{:.4f}", success_rate);
555 const auto& thresholds = config_.health_thresholds_config;
558 if (success_rate < thresholds.unhealthy_success_rate)
561 health.
message =
"Success rate critically low: " +
562 std::format(
"{:.1f}%", success_rate * 100.0);
564 else if (success_rate < thresholds.min_success_rate)
567 health.
message =
"Success rate below threshold: " +
568 std::format(
"{:.1f}%", success_rate * 100.0);
571 else if (avg_latency_ms > thresholds.degraded_latency_ms)
574 health.
message =
"High average latency: " +
575 std::format(
"{:.2f}ms", avg_latency_ms);
577 else if (avg_latency_ms > thresholds.max_healthy_latency_ms)
580 health.
message =
"Elevated latency: " +
581 std::format(
"{:.2f}ms", avg_latency_ms);
586 health.
message =
"Performance metrics within normal range";
621 std::shared_ptr<execution_event_listener> listener)
623 if (!listener)
return;
630 std::shared_ptr<execution_event_listener> listener)
632 if (!listener)
return;
665 std::vector<std::shared_ptr<execution_event_listener>> listeners_copy;
671 for (
const auto& listener : listeners_copy)
675 listener->on_event(event);
681 -> std::vector<job_execution_event>
683 std::lock_guard<std::mutex> lock(events_mutex_);
685 std::vector<job_execution_event>
result;
686 auto count = std::min(limit, event_history_.size());
689 auto it = event_history_.rbegin();
690 for (std::size_t i = 0; i < count && it != event_history_.rend(); ++i, ++it)
704 std::ostringstream oss;
709 oss <<
" \"health\": {\n";
711 oss <<
" \"message\": \"" << health.status_message <<
"\",\n";
712 oss <<
" \"uptime_seconds\": " << std::fixed << std::setprecision(2)
713 << health.uptime_seconds <<
",\n";
714 oss <<
" \"total_jobs_processed\": " << health.total_jobs_processed <<
",\n";
715 oss <<
" \"success_rate\": " << std::fixed << std::setprecision(4)
716 << health.success_rate <<
"\n";
720 oss <<
" \"workers\": {\n";
721 oss <<
" \"total\": " << health.total_workers <<
",\n";
722 oss <<
" \"active\": " << health.active_workers <<
",\n";
723 oss <<
" \"idle\": " << (health.total_workers - health.active_workers) <<
"\n";
727 oss <<
" \"queue\": {\n";
728 oss <<
" \"depth\": " << health.queue_depth <<
"\n";
733 oss <<
" \"bottleneck\": {\n";
734 oss <<
" \"detected\": " << (bottleneck.has_bottleneck ?
"true" :
"false") <<
",\n";
736 oss <<
" \"severity\": \"" << bottleneck.severity_string() <<
"\"\n";
774 info.thread_name =
"Worker-" + std::to_string(index);
776 info.state_since = std::chrono::steady_clock::now();
auto check_queue_health() const -> component_health
Checks queue component health.
void generate_recommendations(bottleneck_report &report) const
Generates recommendations for a bottleneck.
auto get_active_jobs() const -> std::vector< job_info >
Gets currently executing jobs.
void add_event_listener(std::shared_ptr< execution_event_listener > listener)
Adds an event listener.
auto get_worker_info(const thread_worker &worker, std::size_t index) const -> thread_info
Gets thread info for a single worker.
auto to_json() const -> std::string
Exports diagnostics as JSON.
auto get_recent_jobs(std::size_t limit=100) const -> std::vector< job_info >
Gets recent completed/failed jobs.
diagnostics_config config_
Configuration for diagnostics.
~thread_pool_diagnostics()
Destructor.
auto to_prometheus() const -> std::string
Exports diagnostics as Prometheus-compatible metrics.
std::mutex listeners_mutex_
Mutex for event listeners.
std::vector< std::shared_ptr< execution_event_listener > > listeners_
Event listeners.
void record_job_completion(const job_info &info)
Records a job completion for history tracking.
thread_pool & pool_
Reference to the monitored thread pool.
auto detect_bottlenecks() const -> bottleneck_report
Analyzes for bottlenecks.
auto is_tracing_enabled() const -> bool
Checks if tracing is enabled.
auto check_metrics_health(double avg_latency_ms, double success_rate) const -> component_health
Checks metrics component health.
auto get_pending_jobs(std::size_t limit=100) const -> std::vector< job_info >
Gets pending jobs in queue.
auto get_config() const -> diagnostics_config
Gets the current configuration.
auto get_recent_events(std::size_t limit=100) const -> std::vector< job_execution_event >
Gets recent execution events.
std::mutex jobs_mutex_
Mutex for recent jobs access.
std::chrono::steady_clock::time_point start_time_
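Time when this diagnostics object was constructed (captured from steady_clock::now() in the constructor); serves as an approximation of the pool start time.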
auto format_thread_dump() const -> std::string
Gets formatted thread dump (human-readable).
thread_pool_diagnostics(thread_pool &pool, const diagnostics_config &config={})
Constructs diagnostics for a thread pool.
void notify_listeners(const job_execution_event &event)
Notifies all event listeners.
auto to_string() const -> std::string
Exports diagnostics as formatted string.
void remove_event_listener(std::shared_ptr< execution_event_listener > listener)
Removes an event listener.
void enable_tracing(bool enable, std::size_t history_size=1000)
Enables or disables job execution tracing.
auto health_check() const -> health_status
Performs comprehensive health check.
std::atomic< bool > tracing_enabled_
Whether event tracing is enabled.
void record_event(const job_execution_event &event)
Records a job execution event.
auto is_healthy() const -> bool
Quick check if pool is healthy.
std::deque< job_execution_event > event_history_
Ring buffer for event history.
std::mutex events_mutex_
Mutex for event history access.
auto dump_thread_states() const -> std::vector< thread_info >
Gets current state of all worker threads.
void set_config(const diagnostics_config &config)
Updates the configuration.
auto check_worker_health() const -> component_health
Checks worker component health.
std::deque< job_info > recent_jobs_
Ring buffer for recent job completions.
Represents a unit of work (task) to be executed, typically by a job queue.
Snapshot snapshot() const
Get a snapshot of all metrics.
A template class representing either a value or an error.
A thread pool for concurrent execution of jobs using multiple worker threads.
auto get_pending_task_count() const -> std::size_t
Get the number of pending tasks in the queue.
std::size_t get_idle_worker_count() const
Get the number of idle workers.
auto to_string(void) const -> std::string
Provides a string representation of this thread_pool.
auto get_active_worker_count() const -> std::size_t
Get the current number of active (running) workers.
std::vector< std::unique_ptr< thread_worker > > workers_
A collection of worker threads associated with this pool.
auto collect_worker_diagnostics() const -> std::vector< diagnostics::thread_info >
Collects diagnostics information from all workers.
auto is_running() const -> bool
Check if the thread pool is currently running.
const metrics::ThreadPoolMetrics & metrics() const noexcept
Access aggregated runtime metrics (read-only reference).
auto get_job_queue(void) -> std::shared_ptr< job_queue >
Returns the shared job_queue used by this thread pool.
std::mutex workers_mutex_
Mutex protecting concurrent access to the workers_ vector.
A specialized worker thread that processes jobs from a job_queue.
Core thread pool implementation with work stealing and auto-scaling.
@ active
Worker is executing a job.
@ idle
Worker is waiting for jobs.
@ healthy
Component is fully operational.
@ degraded
Component is operational but with reduced capacity/performance.
@ unhealthy
Component is not operational or failing.
@ slow_consumer
Workers can't keep up with job submission rate.
@ lock_contention
High mutex wait times affecting throughput.
@ worker_starvation
Not enough workers for the workload.
@ none
No bottleneck detected.
@ queue_full
Queue is at capacity.
@ memory_pressure
Excessive memory allocations causing slowdown.
@ uneven_distribution
Work is not evenly distributed across workers (enabling work stealing may help).
auto bottleneck_type_to_string(bottleneck_type type) -> std::string
Converts bottleneck_type to human-readable string.
auto worker_state_to_string(worker_state state) -> std::string
Converts worker_state to human-readable string.
auto health_state_to_string(health_state state) -> std::string
Converts health_state to human-readable string.
@ info
Informational messages highlighting progress.
@ queue_depth
Queue depth threshold exceeded.
Analysis report of bottlenecks in the thread pool.
bottleneck_type type
Type of bottleneck detected.
double avg_wait_time_ms
Average wait time in milliseconds.
std::vector< std::string > recommendations
Actionable recommendations to resolve the bottleneck.
std::string description
Human-readable description of the bottleneck.
double utilization_variance
Variance in worker utilization.
bool has_bottleneck
Whether a bottleneck was detected.
std::uint64_t jobs_rejected
Jobs rejected due to queue full.
std::size_t estimated_backlog_time_ms
Estimated time to process the current backlog.
std::size_t queue_depth
Current queue depth.
double queue_saturation
Queue saturation level.
std::size_t idle_workers
Number of idle workers.
std::size_t total_workers
Total number of workers.
double worker_utilization
Average worker utilization.
Health status of a single component.
std::string name
Name of the component (e.g., "workers", "queue", "metrics").
health_state state
Current health state of this component.
std::string message
Human-readable message describing the current state.
std::map< std::string, std::string > details
Additional details about this component's health.
Configuration options for thread pool diagnostics.
health_thresholds health_thresholds_config
Configurable thresholds for health status determination.
std::size_t recent_jobs_capacity
Maximum number of recent jobs to track.
double wait_time_threshold_ms
Wait time threshold (ms) for slow consumer detection.
bool enable_tracing
Enable automatic event tracing.
std::size_t event_history_size
Maximum number of events to retain in history.
double utilization_high_threshold
Worker utilization threshold for bottleneck detection.
Comprehensive health status of the thread pool.
double success_rate
Job success rate (0.0 to 1.0).
std::size_t active_workers
Number of active workers.
std::vector< component_health > components
Health status of individual components.
std::size_t total_workers
Total number of workers.
std::uint64_t total_jobs_processed
Total number of jobs processed since startup.
std::size_t queue_capacity
Queue capacity (if bounded).
double uptime_seconds
Time since the thread pool was started (seconds).
auto calculate_overall_status() -> void
Calculates overall status from component states.
std::chrono::steady_clock::time_point check_time
Time when this health check was performed.
std::size_t queue_depth
Current queue depth.
double avg_latency_ms
Average job latency in milliseconds.
Event data for job execution tracing.
Information about a job in the thread pool.
Information about a worker thread in the pool.
std::size_t worker_id
Worker ID within the pool.
Runtime diagnostics, health monitoring, and execution tracing for thread pools.
Specialized worker thread that processes jobs from a job_queue.