Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
enhanced_metrics.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7
9
11 : throughput_1s_(std::chrono::seconds{1}),
12 throughput_1m_(std::chrono::seconds{60}),
13 per_worker_metrics_(worker_count) {
14 for (std::size_t i = 0; i < worker_count; ++i) {
15 per_worker_metrics_[i].worker_id = i;
16 }
17}
18
20 tasks_submitted_.fetch_add(1, std::memory_order_relaxed);
21}
22
26
28 std::chrono::nanoseconds latency,
29 bool success) {
31
32 if (success) {
33 tasks_executed_.fetch_add(1, std::memory_order_relaxed);
34 } else {
35 tasks_failed_.fetch_add(1, std::memory_order_relaxed);
36 }
37
38 // Update throughput counters
41}
42
43void EnhancedThreadPoolMetrics::record_wait_time(std::chrono::nanoseconds wait) {
44 wait_time_.record(wait);
45}
46
48 current_queue_depth_.store(depth, std::memory_order_relaxed);
49
50 // Update peak
51 auto current_peak = peak_queue_depth_.load(std::memory_order_relaxed);
52 while (depth > current_peak) {
53 if (peak_queue_depth_.compare_exchange_weak(
54 current_peak, depth,
55 std::memory_order_relaxed,
56 std::memory_order_relaxed)) {
57 break;
58 }
59 }
60
61 // Update average calculation
62 queue_depth_sum_.fetch_add(depth, std::memory_order_relaxed);
63 queue_depth_samples_.fetch_add(1, std::memory_order_relaxed);
64}
65
67 std::size_t worker_id,
68 bool busy,
69 std::uint64_t duration_ns) {
70 // Update global totals
71 if (busy) {
72 total_busy_time_ns_.fetch_add(duration_ns, std::memory_order_relaxed);
73 } else {
74 total_idle_time_ns_.fetch_add(duration_ns, std::memory_order_relaxed);
75 }
76
77 // Update per-worker metrics
78 std::lock_guard<std::mutex> lock(workers_mutex_);
79 if (worker_id < per_worker_metrics_.size()) {
80 auto& worker = per_worker_metrics_[worker_id];
81 worker.is_busy = busy;
82 if (busy) {
83 worker.busy_time_ns += duration_ns;
84 } else {
85 worker.idle_time_ns += duration_ns;
86 }
87 if (!busy && duration_ns > 0) {
88 // Task completed
89 worker.tasks_executed++;
90 }
91 }
92}
93
95 active_workers_.store(count, std::memory_order_relaxed);
96}
97
100 snap.snapshot_time = std::chrono::steady_clock::now();
101
102 // Basic counters (from MetricsBase)
105 snap.tasks_failed = tasks_failed();
106
107 // Latency percentiles (convert ns to Ξs)
111
115
119
120 // Throughput
123
124 // Queue health
125 snap.current_queue_depth = current_queue_depth_.load(std::memory_order_relaxed);
126 snap.peak_queue_depth = peak_queue_depth_.load(std::memory_order_relaxed);
127
128 auto samples = queue_depth_samples_.load(std::memory_order_relaxed);
129 if (samples > 0) {
130 snap.avg_queue_depth =
131 static_cast<double>(queue_depth_sum_.load(std::memory_order_relaxed)) /
132 static_cast<double>(samples);
133 }
134
135 // Worker utilization (from MetricsBase)
138 snap.active_workers = active_workers_.load(std::memory_order_relaxed);
139
140 auto total_time = snap.total_busy_time_ns + snap.total_idle_time_ns;
141 if (total_time > 0) {
142 snap.worker_utilization =
143 static_cast<double>(snap.total_busy_time_ns) /
144 static_cast<double>(total_time);
145 }
146
147 // Per-worker utilization
148 {
149 std::lock_guard<std::mutex> lock(workers_mutex_);
150 snap.per_worker_utilization.reserve(per_worker_metrics_.size());
151 for (const auto& worker : per_worker_metrics_) {
152 auto worker_total = worker.busy_time_ns + worker.idle_time_ns;
153 if (worker_total > 0) {
154 snap.per_worker_utilization.push_back(
155 static_cast<double>(worker.busy_time_ns) /
156 static_cast<double>(worker_total));
157 } else {
158 snap.per_worker_utilization.push_back(0.0);
159 }
160 }
161 }
162
163 return snap;
164}
165
169
173
177
178std::vector<WorkerMetrics> EnhancedThreadPoolMetrics::worker_metrics() const {
179 std::lock_guard<std::mutex> lock(workers_mutex_);
180 return per_worker_metrics_;
181}
182
186
190
192 // Reset base class counters first
194
195 // Reset histograms
199
200 // Reset throughput counters
203
204 // Reset queue depth tracking
205 current_queue_depth_.store(0, std::memory_order_relaxed);
206 peak_queue_depth_.store(0, std::memory_order_relaxed);
207 queue_depth_sum_.store(0, std::memory_order_relaxed);
208 queue_depth_samples_.store(0, std::memory_order_relaxed);
209
210 // Reset per-worker metrics
211 {
212 std::lock_guard<std::mutex> lock(workers_mutex_);
213 for (auto& worker : per_worker_metrics_) {
214 worker.tasks_executed = 0;
215 worker.busy_time_ns = 0;
216 worker.idle_time_ns = 0;
217 worker.is_busy = false;
218 }
219 }
220}
221
223 std::lock_guard<std::mutex> lock(workers_mutex_);
224 if (count > per_worker_metrics_.size()) {
225 auto old_size = per_worker_metrics_.size();
226 per_worker_metrics_.resize(count);
227 for (std::size_t i = old_size; i < count; ++i) {
228 per_worker_metrics_[i].worker_id = i;
229 }
230 }
231}
232
234 // Delegate to JsonBackend for consistent output format
235 auto backend = BackendRegistry::instance().get("json");
236 if (backend) {
237 return backend->export_enhanced(snapshot());
238 }
239
240 // Fallback if registry is not initialized (shouldn't happen)
243}
244
246 const std::string& prefix) const {
247 // Create a PrometheusBackend with the specified prefix
248 PrometheusBackend backend;
249 backend.set_prefix(prefix);
250 return backend.export_enhanced(snapshot());
251}
252
253} // namespace kcenon::thread::metrics
static BackendRegistry & instance()
Get the singleton instance.
std::shared_ptr< MetricsBackend > get(const std::string &name) const
Get a backend by name.
const SlidingWindowCounter & throughput_1s() const
Get the 1-second throughput counter (read-only).
const LatencyHistogram & execution_latency() const
Get the execution latency histogram (read-only).
void set_active_workers(std::size_t count)
Set the number of active workers.
void record_execution(std::chrono::nanoseconds latency, bool success)
Record task execution completion.
EnhancedSnapshot snapshot() const
Get a comprehensive snapshot of all metrics.
const SlidingWindowCounter & throughput_1m() const
Get the 1-minute throughput counter (read-only).
std::string to_prometheus(const std::string &prefix="thread_pool") const
Export metrics in Prometheus/OpenMetrics format.
std::vector< WorkerMetrics > worker_metrics() const
Get per-worker metrics.
void record_worker_state(std::size_t worker_id, bool busy, std::uint64_t duration_ns=0)
Update worker state.
void record_wait_time(std::chrono::nanoseconds wait)
Record wait time (time spent in queue).
std::string to_json() const
Export metrics as JSON string.
void reset() override
Reset all metrics to initial state.
const LatencyHistogram & wait_time() const
Get the wait time histogram (read-only).
const LatencyHistogram & enqueue_latency() const
Get the enqueue latency histogram (read-only).
void record_enqueue(std::chrono::nanoseconds latency)
Record enqueue operation latency.
EnhancedThreadPoolMetrics(std::size_t worker_count=0)
Constructs enhanced metrics with the specified worker count.
void update_worker_count(std::size_t count)
Update worker count.
void record_queue_depth(std::size_t depth)
Record current queue depth.
std::string export_enhanced(const EnhancedSnapshot &snapshot) const override
Export enhanced metrics snapshot.
HDR-style histogram for latency distribution with logarithmic buckets.
void record(std::chrono::nanoseconds value)
Record a latency value.
double p50() const
Get the 50th percentile (median).
double p90() const
Get the 90th percentile.
double p99() const
Get the 99th percentile.
void reset()
Reset all buckets and counters to zero.
virtual void set_prefix(const std::string &prefix)
Set metric name prefix.
std::uint64_t total_busy_time_ns() const
Get the total busy time in nanoseconds.
std::atomic< std::uint64_t > tasks_failed_
Counter for failed tasks.
std::uint64_t tasks_executed() const
Get the total number of tasks successfully executed.
std::atomic< std::uint64_t > total_idle_time_ns_
Accumulated idle time in nanoseconds.
std::uint64_t tasks_submitted() const
Get the total number of tasks submitted.
std::uint64_t tasks_failed() const
Get the total number of failed tasks.
std::atomic< std::uint64_t > total_busy_time_ns_
Accumulated busy time in nanoseconds.
std::atomic< std::uint64_t > tasks_submitted_
Counter for submitted tasks.
std::uint64_t total_idle_time_ns() const
Get the total idle time in nanoseconds.
std::atomic< std::uint64_t > tasks_executed_
Counter for successfully executed tasks.
virtual void reset()
Reset all metrics to their initial state.
Prometheus/OpenMetrics format backend.
std::string export_enhanced(const EnhancedSnapshot &snapshot) const override
Export enhanced metrics snapshot.
Sliding window counter for throughput measurement.
void increment(std::size_t count=1)
Increment the counter.
double rate_per_second() const
Get the current rate per second.
Enhanced metrics snapshot with latency percentiles and throughput.
Abstract interface for metrics export backends.
@ latency
Latency threshold exceeded.
@ fallback
Execute fallback job if available.
STL namespace.
Enhanced snapshot with latency percentiles and throughput.
double throughput_1s
Tasks completed per second (1-second window).
std::size_t peak_queue_depth
Peak queue depth since last reset.
double worker_utilization
Overall worker utilization (0.0 - 1.0).
std::size_t active_workers
Number of active workers.
double enqueue_latency_p50_us
Median (P50) enqueue latency in microseconds.
std::uint64_t tasks_failed
Total tasks that failed during execution.
std::uint64_t total_busy_time_ns
Total busy time across all workers in nanoseconds.
std::chrono::steady_clock::time_point snapshot_time
Timestamp when this snapshot was taken.
double enqueue_latency_p90_us
90th percentile enqueue latency in microseconds.
double wait_time_p90_us
90th percentile wait time in microseconds.
double execution_latency_p99_us
99th percentile execution latency in microseconds.
std::uint64_t tasks_executed
Total tasks successfully executed.
std::uint64_t tasks_submitted
Total tasks submitted to the pool.
std::uint64_t total_idle_time_ns
Total idle time across all workers in nanoseconds.
double avg_queue_depth
Average queue depth over the sampling period.
double execution_latency_p50_us
Median execution latency in microseconds.
double wait_time_p50_us
Median wait time (queue time) in microseconds.
double throughput_1m
Tasks completed per second (1-minute window average).
double execution_latency_p90_us
90th percentile execution latency in microseconds.
std::size_t current_queue_depth
Current queue depth.
double wait_time_p99_us
99th percentile wait time in microseconds.
std::vector< double > per_worker_utilization
Per-worker utilization (0.0 - 1.0 each).
double enqueue_latency_p99_us
99th percentile enqueue latency in microseconds.