Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
sliding_histogram.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8#include <cmath>
9#include <numeric>
10
12
14 : config_(std::move(cfg))
15 , bucket_duration_(std::chrono::duration_cast<std::chrono::milliseconds>(config_.window_duration)
16 / config_.bucket_count)
17{
18 // Ensure at least 1 bucket
19 if (config_.bucket_count == 0)
20 {
21 config_.bucket_count = 1;
22 bucket_duration_ = std::chrono::duration_cast<std::chrono::milliseconds>(config_.window_duration);
23 }
24}
25
27 : config_(std::move(other.config_))
28 , bucket_duration_(other.bucket_duration_)
29 , buckets_(std::move(other.buckets_))
30{
31}
32
34{
35 if (this != &other)
36 {
37 std::lock_guard<std::mutex> lock(mutex_);
38 config_ = std::move(other.config_);
39 bucket_duration_ = other.bucket_duration_;
40 buckets_ = std::move(other.buckets_);
41 }
42 return *this;
43}
44
45void sliding_histogram::record(double value)
46{
47 std::lock_guard<std::mutex> lock(mutex_);
49 auto& bucket = get_current_bucket();
50 bucket.hist.record(value);
51}
52
53auto sliding_histogram::count() const -> uint64_t
54{
55 std::lock_guard<std::mutex> lock(mutex_);
56 const_cast<sliding_histogram*>(this)->expire_old_buckets();
57
58 uint64_t total = 0;
59 for (const auto& bucket : buckets_)
60 {
61 total += bucket->hist.count();
62 }
63 return total;
64}
65
66auto sliding_histogram::sum() const -> double
67{
68 std::lock_guard<std::mutex> lock(mutex_);
69 const_cast<sliding_histogram*>(this)->expire_old_buckets();
70
71 double total = 0.0;
72 for (const auto& bucket : buckets_)
73 {
74 total += bucket->hist.sum();
75 }
76 return total;
77}
78
79auto sliding_histogram::mean() const -> double
80{
81 uint64_t c = count();
82 if (c == 0)
83 {
84 return 0.0;
85 }
86 return sum() / static_cast<double>(c);
87}
88
89auto sliding_histogram::percentile(double p) const -> double
90{
91 auto snap = aggregate();
92 if (snap.count == 0)
93 {
94 return 0.0;
95 }
96
97 // Clamp percentile
98 p = std::clamp(p, 0.0, 1.0);
99
100 // Use pre-calculated percentiles if available
101 auto it = snap.percentiles.find(p);
102 if (it != snap.percentiles.end())
103 {
104 return it->second;
105 }
106
107 // Calculate from buckets
108 double target = p * static_cast<double>(snap.count);
109
110 for (size_t i = 0; i < snap.buckets.size(); ++i)
111 {
112 if (static_cast<double>(snap.buckets[i].second) >= target)
113 {
114 double lower_bound = (i == 0) ? 0.0 : snap.buckets[i - 1].first;
115 double upper_bound = snap.buckets[i].first;
116
117 if (std::isinf(upper_bound))
118 {
119 return lower_bound;
120 }
121
122 uint64_t lower_count = (i == 0) ? 0 : snap.buckets[i - 1].second;
123 uint64_t upper_count = snap.buckets[i].second;
124
125 if (upper_count == lower_count)
126 {
127 return lower_bound;
128 }
129
130 double fraction = (target - static_cast<double>(lower_count))
131 / (static_cast<double>(upper_count) - static_cast<double>(lower_count));
132
133 return lower_bound + (fraction * (upper_bound - lower_bound));
134 }
135 }
136
137 return 0.0;
138}
139
140auto sliding_histogram::snapshot(const std::map<std::string, std::string>& labels) const
142{
143 auto snap = aggregate();
144 snap.labels = labels;
145 return snap;
146}
147
149{
150 std::lock_guard<std::mutex> lock(mutex_);
151 buckets_.clear();
152}
153
155{
156 auto now = std::chrono::steady_clock::now();
157 auto cutoff = now - config_.window_duration;
158
159 while (!buckets_.empty() && buckets_.front()->start_time < cutoff)
160 {
161 buckets_.pop_front();
162 }
163}
164
166{
167 auto now = std::chrono::steady_clock::now();
168
169 if (buckets_.empty())
170 {
171 buckets_.push_back(std::make_unique<time_bucket>(config_.hist_config));
172 return *buckets_.back();
173 }
174
175 auto& last = buckets_.back();
176 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - last->start_time);
177
178 if (elapsed >= bucket_duration_)
179 {
180 buckets_.push_back(std::make_unique<time_bucket>(config_.hist_config));
181 }
182
183 return *buckets_.back();
184}
185
187{
188 std::lock_guard<std::mutex> lock(mutex_);
189 const_cast<sliding_histogram*>(this)->expire_old_buckets();
190
191 histogram_snapshot result;
192 result.count = 0;
193 result.sum = 0.0;
194 result.min_value = std::numeric_limits<double>::infinity();
195 result.max_value = -std::numeric_limits<double>::infinity();
196
197 if (buckets_.empty())
198 {
199 return result;
200 }
201
202 // Aggregate statistics
203 std::vector<std::pair<double, uint64_t>> merged_buckets;
204 bool first_snapshot = true;
205
206 for (const auto& bucket : buckets_)
207 {
208 auto snap = bucket->hist.snapshot();
209 result.count += snap.count;
210 result.sum += snap.sum;
211
212 if (snap.min_value < result.min_value)
213 {
214 result.min_value = snap.min_value;
215 }
216 if (snap.max_value > result.max_value)
217 {
218 result.max_value = snap.max_value;
219 }
220
221 // Merge bucket counts
222 if (first_snapshot)
223 {
224 merged_buckets = snap.buckets;
225 first_snapshot = false;
226 }
227 else
228 {
229 // Need to convert from cumulative to individual counts, add, then back to cumulative
230 std::vector<uint64_t> individual_counts(snap.buckets.size(), 0);
231 for (size_t i = 0; i < snap.buckets.size(); ++i)
232 {
233 uint64_t prev = (i == 0) ? 0 : snap.buckets[i - 1].second;
234 individual_counts[i] = snap.buckets[i].second - prev;
235 }
236
237 std::vector<uint64_t> merged_individual(merged_buckets.size(), 0);
238 for (size_t i = 0; i < merged_buckets.size(); ++i)
239 {
240 uint64_t prev = (i == 0) ? 0 : merged_buckets[i - 1].second;
241 merged_individual[i] = merged_buckets[i].second - prev;
242 }
243
244 // Add individual counts
245 for (size_t i = 0; i < merged_buckets.size() && i < individual_counts.size(); ++i)
246 {
247 merged_individual[i] += individual_counts[i];
248 }
249
250 // Convert back to cumulative
251 uint64_t cumulative = 0;
252 for (size_t i = 0; i < merged_buckets.size(); ++i)
253 {
254 cumulative += merged_individual[i];
255 merged_buckets[i].second = cumulative;
256 }
257 }
258 }
259
260 result.buckets = std::move(merged_buckets);
261
262 // Calculate percentiles from merged buckets
263 if (result.count > 0 && !result.buckets.empty())
264 {
265 auto calc_percentile = [&](double p) -> double
266 {
267 double target = p * static_cast<double>(result.count);
268
269 for (size_t i = 0; i < result.buckets.size(); ++i)
270 {
271 if (static_cast<double>(result.buckets[i].second) >= target)
272 {
273 double lower_bound = (i == 0) ? 0.0 : result.buckets[i - 1].first;
274 double upper_bound = result.buckets[i].first;
275
276 if (std::isinf(upper_bound))
277 {
278 return lower_bound;
279 }
280
281 uint64_t lower_count = (i == 0) ? 0 : result.buckets[i - 1].second;
282 uint64_t upper_count = result.buckets[i].second;
283
284 if (upper_count == lower_count)
285 {
286 return lower_bound;
287 }
288
289 double fraction =
290 (target - static_cast<double>(lower_count))
291 / (static_cast<double>(upper_count) - static_cast<double>(lower_count));
292
293 return lower_bound + (fraction * (upper_bound - lower_bound));
294 }
295 }
296 return 0.0;
297 };
298
299 result.percentiles[0.5] = calc_percentile(0.5);
300 result.percentiles[0.9] = calc_percentile(0.9);
301 result.percentiles[0.95] = calc_percentile(0.95);
302 result.percentiles[0.99] = calc_percentile(0.99);
303 result.percentiles[0.999] = calc_percentile(0.999);
304 }
305
306 return result;
307}
308
309} // namespace kcenon::network::metrics
Time-windowed histogram for tracking recent latency distributions.
auto percentile(double p) const -> double
Calculate percentile value for current window.
auto mean() const -> double
Get mean of all observations in current window.
auto operator=(const sliding_histogram &) -> sliding_histogram &=delete
std::deque< std::unique_ptr< time_bucket > > buckets_
void expire_old_buckets()
Expire old buckets outside the window.
auto aggregate() const -> histogram_snapshot
Create aggregated histogram from all buckets.
auto sum() const -> double
Get sum of all observations in current window.
auto count() const -> uint64_t
Get total number of observations in current window.
void record(double value)
Record a value observation.
sliding_histogram(sliding_histogram_config cfg=sliding_histogram_config::default_config())
Construct sliding histogram with configuration.
auto get_current_bucket() -> time_bucket &
Get or create current time bucket.
auto snapshot(const std::map< std::string, std::string > &labels={}) const -> histogram_snapshot
Create snapshot aggregating all time buckets in current window.
Sliding window histogram for time-based latency tracking.
Immutable snapshot of histogram state for export.
Definition histogram.h:60
std::map< double, double > percentiles
Percentile -> value mapping.
Definition histogram.h:65
uint64_t count
Total number of observations.
Definition histogram.h:61
double sum
Sum of all observed values.
Definition histogram.h:62
double min_value
Minimum observed value.
Definition histogram.h:63
std::vector< std::pair< double, uint64_t > > buckets
Boundary -> cumulative count.
Definition histogram.h:66
double max_value
Maximum observed value.
Definition histogram.h:64
Configuration for sliding window histogram.
std::chrono::seconds window_duration
Total window duration.