Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
sliding_window_counter.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8
10
12 std::chrono::seconds window_size,
13 std::size_t buckets_per_second)
14 : window_size_(window_size),
15 bucket_duration_(std::chrono::milliseconds{1000 / buckets_per_second}),
16 buckets_(static_cast<std::size_t>(window_size.count()) * buckets_per_second) {
17 // Initialize all buckets
18 for (auto& bucket : buckets_) {
19 bucket.count.store(0, std::memory_order_relaxed);
20 bucket.timestamp_ms.store(0, std::memory_order_relaxed);
21 }
22}
23
25 : window_size_(other.window_size_),
26 bucket_duration_(other.bucket_duration_),
27 buckets_(other.buckets_.size()) {
28 for (std::size_t i = 0; i < buckets_.size(); ++i) {
29 buckets_[i].count.store(
30 other.buckets_[i].count.load(std::memory_order_relaxed),
31 std::memory_order_relaxed);
32 buckets_[i].timestamp_ms.store(
33 other.buckets_[i].timestamp_ms.load(std::memory_order_relaxed),
34 std::memory_order_relaxed);
35 }
36 all_time_total_.store(
37 other.all_time_total_.load(std::memory_order_relaxed),
38 std::memory_order_relaxed);
39}
40
42 : window_size_(other.window_size_),
43 bucket_duration_(other.bucket_duration_),
44 buckets_(std::move(other.buckets_)) {
45 all_time_total_.store(
46 other.all_time_total_.exchange(0, std::memory_order_relaxed),
47 std::memory_order_relaxed);
48}
49
51 const SlidingWindowCounter& other) {
52 if (this != &other) {
55
56 // Resize if needed
57 if (buckets_.size() != other.buckets_.size()) {
58 buckets_.resize(other.buckets_.size());
59 }
60
61 for (std::size_t i = 0; i < buckets_.size(); ++i) {
62 buckets_[i].count.store(
63 other.buckets_[i].count.load(std::memory_order_relaxed),
64 std::memory_order_relaxed);
65 buckets_[i].timestamp_ms.store(
66 other.buckets_[i].timestamp_ms.load(std::memory_order_relaxed),
67 std::memory_order_relaxed);
68 }
69 all_time_total_.store(
70 other.all_time_total_.load(std::memory_order_relaxed),
71 std::memory_order_relaxed);
72 }
73 return *this;
74}
75
77 SlidingWindowCounter&& other) noexcept {
78 if (this != &other) {
79 window_size_ = other.window_size_;
80 bucket_duration_ = other.bucket_duration_;
81 buckets_ = std::move(other.buckets_);
82 all_time_total_.store(
83 other.all_time_total_.exchange(0, std::memory_order_relaxed),
84 std::memory_order_relaxed);
85 }
86 return *this;
87}
88
89void SlidingWindowCounter::increment(std::size_t count) {
90 const auto current_ms = current_time_ms();
91 const auto bucket_index = bucket_index_for_time(current_ms);
92
93 // Advance bucket if it's stale
94 advance_bucket(bucket_index, current_ms);
95
96 // Increment the bucket and all-time total
97 buckets_[bucket_index].count.fetch_add(
98 static_cast<std::uint64_t>(count), std::memory_order_relaxed);
99 all_time_total_.fetch_add(
100 static_cast<std::uint64_t>(count), std::memory_order_relaxed);
101}
102
104 const auto total = total_in_window();
105 const auto window_seconds = static_cast<double>(window_size_.count());
106 return static_cast<double>(total) / window_seconds;
107}
108
110 const auto current_ms = current_time_ms();
111 std::uint64_t total = 0;
112
113 for (const auto& bucket : buckets_) {
114 const auto timestamp = bucket.timestamp_ms.load(std::memory_order_relaxed);
115 if (is_bucket_valid(timestamp, current_ms)) {
116 total += bucket.count.load(std::memory_order_relaxed);
117 }
118 }
119
120 return total;
121}
122
124 return all_time_total_.load(std::memory_order_relaxed);
125}
126
128 for (auto& bucket : buckets_) {
129 bucket.count.store(0, std::memory_order_relaxed);
130 bucket.timestamp_ms.store(0, std::memory_order_relaxed);
131 }
132 all_time_total_.store(0, std::memory_order_relaxed);
133}
134
135std::chrono::seconds SlidingWindowCounter::window_size() const {
136 return window_size_;
137}
138
140 return buckets_.size();
141}
142
146
148 std::uint64_t timestamp_ms) const {
149 const auto bucket_duration_ms =
150 static_cast<std::uint64_t>(bucket_duration_.count());
151 return static_cast<std::size_t>(
152 (timestamp_ms / bucket_duration_ms) % buckets_.size());
153}
154
156 return static_cast<std::uint64_t>(
157 std::chrono::duration_cast<std::chrono::milliseconds>(
158 std::chrono::steady_clock::now().time_since_epoch())
159 .count());
160}
161
163 std::uint64_t bucket_timestamp_ms,
164 std::uint64_t current_ms) const {
165 if (bucket_timestamp_ms == 0) {
166 return false;
167 }
168
169 const auto window_ms =
170 static_cast<std::uint64_t>(window_size_.count()) * 1000;
171 return (current_ms - bucket_timestamp_ms) < window_ms;
172}
173
175 std::size_t bucket_index,
176 std::uint64_t current_ms) {
177 auto& bucket = buckets_[bucket_index];
178 const auto bucket_duration_ms =
179 static_cast<std::uint64_t>(bucket_duration_.count());
180
181 // Calculate the expected timestamp for this bucket at current time
182 const auto expected_timestamp =
183 (current_ms / bucket_duration_ms) * bucket_duration_ms;
184
185 auto old_timestamp = bucket.timestamp_ms.load(std::memory_order_relaxed);
186
187 // Check if bucket needs to be advanced (stale bucket from previous window)
188 if (old_timestamp < expected_timestamp) {
189 // Try to update the timestamp atomically
190 if (bucket.timestamp_ms.compare_exchange_strong(
191 old_timestamp, expected_timestamp,
192 std::memory_order_relaxed, std::memory_order_relaxed)) {
193 // We successfully claimed this bucket for the new time period
194 // Reset the count (old data is now invalid)
195 bucket.count.store(0, std::memory_order_relaxed);
196 }
197 // If CAS failed, another thread already advanced it - that's fine
198 }
199}
200
201} // namespace kcenon::thread::metrics
Sliding window counter for throughput measurement.
SlidingWindowCounter & operator=(const SlidingWindowCounter &other)
Copy assignment operator.
bool is_bucket_valid(std::uint64_t bucket_timestamp_ms, std::uint64_t current_ms) const
Check if a bucket is within the current window.
std::size_t bucket_count() const
Get the number of buckets.
std::chrono::seconds window_size_
The sliding window duration.
std::size_t current_bucket_index() const
Get the current bucket index based on current time.
std::chrono::seconds window_size() const
Get the window size.
void increment(std::size_t count=1)
Increment the counter.
SlidingWindowCounter(std::chrono::seconds window_size, std::size_t buckets_per_second=DEFAULT_BUCKETS_PER_SECOND)
Constructs a sliding window counter.
std::uint64_t all_time_total() const
Get the all-time total count.
std::uint64_t total_in_window() const
Get the total count within the current window.
std::vector< Bucket > buckets_
Circular buffer of time buckets.
std::size_t bucket_index_for_time(std::uint64_t timestamp_ms) const
Get the bucket index for a specific timestamp.
double rate_per_second() const
Get the current rate per second.
std::atomic< std::uint64_t > all_time_total_
All-time total count.
std::chrono::milliseconds bucket_duration_
Duration of each bucket.
void advance_bucket(std::size_t bucket_index, std::uint64_t current_ms)
Advance bucket to current time period if needed.
static std::uint64_t current_time_ms()
Get current time in milliseconds since epoch.
STL namespace.
Sliding window counter for throughput measurement.