Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
metrics_service.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <stdexcept>
8
10
12 : basic_metrics_(std::make_shared<ThreadPoolMetrics>()) {
13}
14
15void metrics_service::record_submission(std::size_t count) {
16 basic_metrics_->record_submission(count);
17
18 if (enhanced_enabled_.load(std::memory_order_relaxed) && enhanced_metrics_) {
19 for (std::size_t i = 0; i < count; ++i) {
20 enhanced_metrics_->record_submission();
21 }
22 }
23}
24
25void metrics_service::record_enqueue(std::size_t count) {
26 basic_metrics_->record_enqueue(count);
27}
28
30 std::chrono::nanoseconds latency,
31 std::size_t count) {
32 basic_metrics_->record_enqueue(count);
33
34 if (enhanced_enabled_.load(std::memory_order_relaxed) && enhanced_metrics_) {
35 // Distribute latency across batch items for more accurate per-job tracking
36 auto per_item_latency = latency / static_cast<long>(count);
37 for (std::size_t i = 0; i < count; ++i) {
38 enhanced_metrics_->record_enqueue(per_item_latency);
39 }
40 }
41}
42
43void metrics_service::record_execution(std::uint64_t duration_ns, bool success) {
44 basic_metrics_->record_execution(duration_ns, success);
45
46 if (enhanced_enabled_.load(std::memory_order_relaxed) && enhanced_metrics_) {
47 enhanced_metrics_->record_execution(
48 std::chrono::nanoseconds{duration_ns}, success);
49 }
50}
51
53 std::chrono::nanoseconds duration,
54 std::chrono::nanoseconds wait_time,
55 bool success) {
56 basic_metrics_->record_execution(
57 static_cast<std::uint64_t>(duration.count()), success);
58
59 if (enhanced_enabled_.load(std::memory_order_relaxed) && enhanced_metrics_) {
60 enhanced_metrics_->record_execution(duration, success);
61 enhanced_metrics_->record_wait_time(wait_time);
62 }
63}
64
65void metrics_service::record_idle_time(std::uint64_t duration_ns) {
66 basic_metrics_->record_idle_time(duration_ns);
67}
68
69void metrics_service::record_queue_depth(std::size_t depth) {
70 if (enhanced_enabled_.load(std::memory_order_relaxed) && enhanced_metrics_) {
71 enhanced_metrics_->record_queue_depth(depth);
72 }
73}
74
76 std::size_t worker_id,
77 bool busy,
78 std::uint64_t duration_ns) {
79 if (enhanced_enabled_.load(std::memory_order_relaxed) && enhanced_metrics_) {
80 enhanced_metrics_->record_worker_state(worker_id, busy, duration_ns);
81 }
82}
83
84void metrics_service::set_enhanced_metrics_enabled(bool enabled, std::size_t worker_count) {
85 if (enabled && !enhanced_metrics_) {
86 std::scoped_lock<std::mutex> lock(init_mutex_);
87 // Double-check after acquiring lock
88 if (!enhanced_metrics_) {
89 enhanced_metrics_ = std::make_shared<EnhancedThreadPoolMetrics>(worker_count);
90 enhanced_metrics_->set_active_workers(worker_count);
91 }
92 }
93 enhanced_enabled_.store(enabled, std::memory_order_release);
94}
95
97 return enhanced_enabled_.load(std::memory_order_acquire);
98}
99
101 if (enhanced_metrics_) {
102 enhanced_metrics_->update_worker_count(count);
103 }
104}
105
106void metrics_service::set_active_workers(std::size_t count) {
107 if (enhanced_metrics_) {
108 enhanced_metrics_->set_active_workers(count);
109 }
110}
111
113 return *basic_metrics_;
114}
115
117 if (!enhanced_metrics_) {
118 throw std::runtime_error(
119 "Enhanced metrics is not enabled. "
120 "Call set_enhanced_metrics_enabled(true) first.");
121 }
122 return *enhanced_metrics_;
123}
124
126 if (!enhanced_enabled_.load(std::memory_order_acquire) || !enhanced_metrics_) {
127 return EnhancedSnapshot{};
128 }
129 return enhanced_metrics_->snapshot();
130}
131
132std::shared_ptr<ThreadPoolMetrics> metrics_service::get_basic_metrics() const noexcept {
133 return basic_metrics_;
134}
135
137 basic_metrics_->reset();
138 if (enhanced_metrics_) {
139 enhanced_metrics_->reset();
140 }
141}
142
143} // namespace kcenon::thread::metrics
Enhanced thread pool metrics with histograms and percentiles.
Lightweight metrics container shared between thread_pool and workers.
void record_execution_with_wait_time(std::chrono::nanoseconds duration, std::chrono::nanoseconds wait_time, bool success)
Record task execution with wait time tracking.
void set_enhanced_metrics_enabled(bool enabled, std::size_t worker_count=0)
Enable or disable enhanced metrics collection.
std::shared_ptr< EnhancedThreadPoolMetrics > enhanced_metrics_
Enhanced metrics collector.
std::mutex init_mutex_
Mutex for thread-safe enhanced metrics initialization.
void reset()
Reset all metrics to their initial state.
void record_execution(std::uint64_t duration_ns, bool success)
Record task execution completion.
void record_enqueue(std::size_t count=1)
Record enqueue operation(s).
void record_enqueue_with_latency(std::chrono::nanoseconds latency, std::size_t count=1)
Record enqueue operation with latency measurement.
std::shared_ptr< ThreadPoolMetrics > basic_metrics_
Basic metrics collector.
void record_worker_state(std::size_t worker_id, bool busy, std::uint64_t duration_ns=0)
Update worker state for per-worker metrics.
const ThreadPoolMetrics & basic_metrics() const noexcept
Access basic metrics (read-only reference).
void update_worker_count(std::size_t count)
Update the worker count for enhanced metrics.
void record_submission(std::size_t count=1)
Record task submission(s).
bool is_enhanced_metrics_enabled() const
Check if enhanced metrics is enabled.
void record_queue_depth(std::size_t depth)
Record current queue depth.
void record_idle_time(std::uint64_t duration_ns)
Record idle time.
std::atomic< bool > enhanced_enabled_
Flag indicating if enhanced metrics collection is enabled.
std::shared_ptr< ThreadPoolMetrics > get_basic_metrics() const noexcept
Get the shared pointer to basic metrics.
void set_active_workers(std::size_t count)
Set the number of active workers.
EnhancedSnapshot enhanced_snapshot() const
Get enhanced metrics snapshot.
const EnhancedThreadPoolMetrics & enhanced_metrics() const
Access enhanced metrics (read-only reference).
Centralized metrics service for thread pool metrics management.
@ latency
Latency threshold exceeded.
STL namespace.
Enhanced snapshot with latency percentiles and throughput.