Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
pool_metrics.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
7#include <atomic>
8#include <chrono>
9#include <string>
10#include <map>
11#include <mutex>
12
13namespace database::monitoring {
14
26 // Connection statistics
27 std::atomic<uint64_t> total_acquisitions{0};
28 std::atomic<uint64_t> successful_acquisitions{0};
29 std::atomic<uint64_t> failed_acquisitions{0};
30 std::atomic<uint64_t> timeouts{0};
31
32 // Timing statistics (microseconds)
33 std::atomic<uint64_t> total_wait_time_us{0};
34 std::atomic<uint64_t> min_wait_time_us{UINT64_MAX};
35 std::atomic<uint64_t> max_wait_time_us{0};
36
37 // Current state
38 std::atomic<uint64_t> current_active{0};
39 std::atomic<uint64_t> current_queued{0};
40 std::atomic<uint64_t> peak_active{0};
41 std::atomic<uint64_t> peak_queued{0};
42
43 // Health check statistics
44 std::atomic<uint64_t> health_checks_performed{0};
45 std::atomic<uint64_t> unhealthy_connections_removed{0};
46
52 void record_acquisition(uint64_t wait_time_us, bool success) {
53 total_acquisitions.fetch_add(1, std::memory_order_relaxed);
54
55 if (success) {
56 successful_acquisitions.fetch_add(1, std::memory_order_relaxed);
57
58 // Update timing statistics
59 total_wait_time_us.fetch_add(wait_time_us, std::memory_order_relaxed);
60
61 // Update min/max (requires atomic compare-exchange)
62 uint64_t current_min = min_wait_time_us.load(std::memory_order_relaxed);
63 while (wait_time_us < current_min &&
64 !min_wait_time_us.compare_exchange_weak(current_min, wait_time_us,
65 std::memory_order_relaxed)) {}
66
67 uint64_t current_max = max_wait_time_us.load(std::memory_order_relaxed);
68 while (wait_time_us > current_max &&
69 !max_wait_time_us.compare_exchange_weak(current_max, wait_time_us,
70 std::memory_order_relaxed)) {}
71 } else {
72 failed_acquisitions.fetch_add(1, std::memory_order_relaxed);
73 }
74 }
75
80 timeouts.fetch_add(1, std::memory_order_relaxed);
81 }
82
87 void update_active(int delta) {
88 uint64_t new_active = current_active.fetch_add(delta, std::memory_order_relaxed) + delta;
89
90 // Update peak if necessary
91 uint64_t current_peak = peak_active.load(std::memory_order_relaxed);
92 while (new_active > current_peak &&
93 !peak_active.compare_exchange_weak(current_peak, new_active,
94 std::memory_order_relaxed)) {}
95 }
96
101 void update_queued(int delta) {
102 uint64_t new_queued = current_queued.fetch_add(delta, std::memory_order_relaxed) + delta;
103
104 // Update peak if necessary
105 uint64_t current_peak = peak_queued.load(std::memory_order_relaxed);
106 while (new_queued > current_peak &&
107 !peak_queued.compare_exchange_weak(current_peak, new_queued,
108 std::memory_order_relaxed)) {}
109 }
110
115 void record_health_check(uint64_t removed_connections = 0) {
116 health_checks_performed.fetch_add(1, std::memory_order_relaxed);
117 if (removed_connections > 0) {
118 unhealthy_connections_removed.fetch_add(removed_connections, std::memory_order_relaxed);
119 }
120 }
121
126 double average_wait_time_us() const {
127 uint64_t total = total_acquisitions.load(std::memory_order_relaxed);
128 if (total == 0) return 0.0;
129
130 uint64_t total_wait = total_wait_time_us.load(std::memory_order_relaxed);
131 return static_cast<double>(total_wait) / static_cast<double>(total);
132 }
133
138 double success_rate() const {
139 uint64_t total = total_acquisitions.load(std::memory_order_relaxed);
140 if (total == 0) return 100.0;
141
142 uint64_t successful = successful_acquisitions.load(std::memory_order_relaxed);
143 return (static_cast<double>(successful) / static_cast<double>(total)) * 100.0;
144 }
145
149 void reset() {
150 total_acquisitions.store(0, std::memory_order_relaxed);
151 successful_acquisitions.store(0, std::memory_order_relaxed);
152 failed_acquisitions.store(0, std::memory_order_relaxed);
153 timeouts.store(0, std::memory_order_relaxed);
154
155 total_wait_time_us.store(0, std::memory_order_relaxed);
156 min_wait_time_us.store(UINT64_MAX, std::memory_order_relaxed);
157 max_wait_time_us.store(0, std::memory_order_relaxed);
158
159 // Don't reset current_active and current_queued (they reflect current state)
160 peak_active.store(current_active.load(std::memory_order_relaxed), std::memory_order_relaxed);
161 peak_queued.store(current_queued.load(std::memory_order_relaxed), std::memory_order_relaxed);
162
163 health_checks_performed.store(0, std::memory_order_relaxed);
164 unhealthy_connections_removed.store(0, std::memory_order_relaxed);
165 }
166};
167
168#ifdef USE_THREAD_SYSTEM
176template<typename PriorityType>
177struct priority_metrics : public pool_metrics {
178 using priority_type = PriorityType;
179
180 // Per-priority statistics
181 std::map<PriorityType, std::atomic<uint64_t>> acquisitions_by_priority;
182 std::map<PriorityType, std::atomic<uint64_t>> wait_time_by_priority;
183
184 mutable std::mutex map_mutex; // Protects map modifications
185
192 void record_acquisition_with_priority(PriorityType priority, uint64_t wait_time_us, bool success) {
193 // Record base metrics
194 record_acquisition(wait_time_us, success);
195
196 if (success) {
197 // Update priority-specific metrics
198 std::lock_guard<std::mutex> lock(map_mutex);
199
200 // Initialize if first time seeing this priority
201 if (acquisitions_by_priority.find(priority) == acquisitions_by_priority.end()) {
202 acquisitions_by_priority[priority].store(0, std::memory_order_relaxed);
203 wait_time_by_priority[priority].store(0, std::memory_order_relaxed);
204 }
205
206 acquisitions_by_priority[priority].fetch_add(1, std::memory_order_relaxed);
207 wait_time_by_priority[priority].fetch_add(wait_time_us, std::memory_order_relaxed);
208 }
209 }
210
216 double average_wait_time_for_priority(PriorityType priority) const {
217 std::lock_guard<std::mutex> lock(map_mutex);
218
219 auto acq_it = acquisitions_by_priority.find(priority);
220 auto wait_it = wait_time_by_priority.find(priority);
221
222 if (acq_it == acquisitions_by_priority.end() || wait_it == wait_time_by_priority.end()) {
223 return 0.0;
224 }
225
226 uint64_t count = acq_it->second.load(std::memory_order_relaxed);
227 if (count == 0) return 0.0;
228
229 uint64_t total_wait = wait_it->second.load(std::memory_order_relaxed);
230 return static_cast<double>(total_wait) / static_cast<double>(count);
231 }
232
236 void reset_all() {
237 pool_metrics::reset();
238
239 std::lock_guard<std::mutex> lock(map_mutex);
240 for (auto& [priority, counter] : acquisitions_by_priority) {
241 counter.store(0, std::memory_order_relaxed);
242 }
243 for (auto& [priority, wait_time] : wait_time_by_priority) {
244 wait_time.store(0, std::memory_order_relaxed);
245 }
246 }
247};
248#endif // USE_THREAD_SYSTEM
249
250} // namespace database::monitoring
Performance metrics for connection pools.
std::atomic< uint64_t > min_wait_time_us
void reset()
Reset all metrics.
std::atomic< uint64_t > current_queued
void update_active(int delta)
Update current active connection count.
std::atomic< uint64_t > peak_queued
std::atomic< uint64_t > total_wait_time_us
std::atomic< uint64_t > timeouts
std::atomic< uint64_t > current_active
std::atomic< uint64_t > max_wait_time_us
double average_wait_time_us() const
Calculate average wait time.
void update_queued(int delta)
Update current queued request count.
std::atomic< uint64_t > successful_acquisitions
std::atomic< uint64_t > total_acquisitions
void record_health_check(uint64_t removed_connections=0)
Record a health check operation.
std::atomic< uint64_t > peak_active
void record_timeout()
Record a timeout event.
std::atomic< uint64_t > failed_acquisitions
void record_acquisition(uint64_t wait_time_us, bool success)
Record a connection acquisition.
std::atomic< uint64_t > health_checks_performed
std::atomic< uint64_t > unhealthy_connections_removed
double success_rate() const
Calculate success rate.