Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
token_bucket.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8#include <thread>
9
19namespace kcenon::thread
20{
33 token_bucket::token_bucket(std::size_t tokens_per_second, std::size_t burst_size)
34 : tokens_(static_cast<std::int64_t>(burst_size) * PRECISION_FACTOR)
35 , max_tokens_(static_cast<std::int64_t>(burst_size) * PRECISION_FACTOR)
36 , refill_rate_(static_cast<double>(tokens_per_second) * PRECISION_FACTOR / 1e9)
37 , last_refill_(std::chrono::steady_clock::now().time_since_epoch().count())
38 {
39 }
40
57 auto token_bucket::refill() -> void
58 {
59 auto now = std::chrono::steady_clock::now().time_since_epoch().count();
60 auto last = last_refill_.load(std::memory_order_acquire);
61
62 // Calculate elapsed nanoseconds
63 auto elapsed_ns = now - last;
64 if (elapsed_ns <= 0)
65 {
66 return; // No time passed, nothing to refill
67 }
68
69 // Try to update last_refill_ atomically
70 if (!last_refill_.compare_exchange_weak(
71 last, now,
72 std::memory_order_acq_rel,
73 std::memory_order_relaxed))
74 {
75 // Another thread updated, our calculation is stale
76 return;
77 }
78
79 // Calculate tokens to add
80 double rate = refill_rate_.load(std::memory_order_relaxed);
81 auto new_tokens = static_cast<std::int64_t>(elapsed_ns * rate);
82
83 if (new_tokens <= 0)
84 {
85 return;
86 }
87
88 // Add tokens (capped at max)
89 std::int64_t max = max_tokens_.load(std::memory_order_relaxed);
90 std::int64_t current = tokens_.load(std::memory_order_relaxed);
91 std::int64_t updated = std::min(current + new_tokens, max);
92
93 // Relaxed CAS is fine here since we're just accumulating
94 tokens_.compare_exchange_weak(
95 current, updated,
96 std::memory_order_relaxed,
97 std::memory_order_relaxed);
98 }
99
117 auto token_bucket::try_acquire(std::size_t tokens) -> bool
118 {
119 // First, refill based on elapsed time
120 refill();
121
122 // Scale requested tokens by precision factor
123 std::int64_t needed = static_cast<std::int64_t>(tokens) * PRECISION_FACTOR;
124
125 // CAS loop to atomically decrement tokens
126 std::int64_t current = tokens_.load(std::memory_order_acquire);
127 while (current >= needed)
128 {
129 if (tokens_.compare_exchange_weak(
130 current, current - needed,
131 std::memory_order_acq_rel,
132 std::memory_order_acquire))
133 {
134 return true; // Successfully acquired
135 }
136 // CAS failed, current has been updated, retry
137 }
138
139 return false; // Insufficient tokens
140 }
141
162 std::size_t tokens,
163 std::chrono::milliseconds timeout) -> bool
164 {
165 auto deadline = std::chrono::steady_clock::now() + timeout;
166
167 // Start with small backoff, increase exponentially
168 auto backoff = std::chrono::microseconds{1};
169 constexpr auto max_backoff = std::chrono::milliseconds{1};
170
171 while (std::chrono::steady_clock::now() < deadline)
172 {
173 if (try_acquire(tokens))
174 {
175 return true;
176 }
177
178 // Sleep with exponential backoff
179 std::this_thread::sleep_for(backoff);
180
181 // Double backoff, cap at max
182 backoff = std::min(
183 backoff * 2,
184 std::chrono::duration_cast<std::chrono::microseconds>(max_backoff));
185 }
186
187 // Final attempt after loop
188 return try_acquire(tokens);
189 }
190
201 auto token_bucket::available_tokens() const -> std::size_t
202 {
203 // Need non-const refill, but we're returning a snapshot anyway
204 const_cast<token_bucket*>(this)->refill();
205
206 std::int64_t current = tokens_.load(std::memory_order_acquire);
207 if (current <= 0)
208 {
209 return 0;
210 }
211 return static_cast<std::size_t>(current / PRECISION_FACTOR);
212 }
213
225 auto token_bucket::time_until_available(std::size_t tokens) const
226 -> std::chrono::nanoseconds
227 {
228 const_cast<token_bucket*>(this)->refill();
229
230 std::int64_t needed = static_cast<std::int64_t>(tokens) * PRECISION_FACTOR;
231 std::int64_t current = tokens_.load(std::memory_order_acquire);
232
233 if (current >= needed)
234 {
235 return std::chrono::nanoseconds{0};
236 }
237
238 // Calculate deficit
239 std::int64_t deficit = needed - current;
240
241 // Time = deficit / rate (rate is in tokens per nanosecond)
242 double rate = refill_rate_.load(std::memory_order_relaxed);
243 if (rate <= 0)
244 {
245 // Infinite wait if rate is zero
246 return std::chrono::nanoseconds::max();
247 }
248
249 auto wait_ns = static_cast<std::int64_t>(deficit / rate);
250 return std::chrono::nanoseconds{wait_ns};
251 }
252
263 auto token_bucket::set_rate(std::size_t tokens_per_second) -> void
264 {
265 // Refill with current rate before changing
266 refill();
267
268 // Update rate (tokens per nanosecond, scaled by precision)
269 double new_rate = static_cast<double>(tokens_per_second) * PRECISION_FACTOR / 1e9;
270 refill_rate_.store(new_rate, std::memory_order_release);
271 }
272
282 auto token_bucket::set_burst_size(std::size_t burst_size) -> void
283 {
284 std::int64_t new_max = static_cast<std::int64_t>(burst_size) * PRECISION_FACTOR;
285 max_tokens_.store(new_max, std::memory_order_release);
286
287 // Cap current tokens if they exceed new max
288 std::int64_t current = tokens_.load(std::memory_order_acquire);
289 while (current > new_max)
290 {
291 if (tokens_.compare_exchange_weak(
292 current, new_max,
293 std::memory_order_acq_rel,
294 std::memory_order_acquire))
295 {
296 break;
297 }
298 }
299 }
300
305 auto token_bucket::get_rate() const -> std::size_t
306 {
307 double rate = refill_rate_.load(std::memory_order_acquire);
308 // Convert back: rate * 1e9 / PRECISION_FACTOR
309 return static_cast<std::size_t>(rate * 1e9 / PRECISION_FACTOR);
310 }
311
316 auto token_bucket::get_burst_size() const -> std::size_t
317 {
318 std::int64_t max = max_tokens_.load(std::memory_order_acquire);
319 return static_cast<std::size_t>(max / PRECISION_FACTOR);
320 }
321
330 auto token_bucket::reset() -> void
331 {
332 std::int64_t max = max_tokens_.load(std::memory_order_acquire);
333 tokens_.store(max, std::memory_order_release);
334 last_refill_.store(
335 std::chrono::steady_clock::now().time_since_epoch().count(),
336 std::memory_order_release);
337 }
338
339} // namespace kcenon::thread
Lock-free token bucket rate limiter for controlling throughput.
auto get_rate() const -> std::size_t
Returns the current refill rate.
std::atomic< std::int64_t > max_tokens_
Maximum tokens (burst size) scaled by precision factor.
auto try_acquire_for(std::size_t tokens, std::chrono::milliseconds timeout) -> bool
Attempts to acquire tokens with a timeout.
token_bucket(std::size_t tokens_per_second, std::size_t burst_size)
Constructs a token bucket with the specified rate and burst size.
auto time_until_available(std::size_t tokens) const -> std::chrono::nanoseconds
Calculates time until the specified tokens become available.
auto get_burst_size() const -> std::size_t
Returns the maximum bucket capacity.
std::atomic< double > refill_rate_
Token refill rate in precision-scaled tokens per nanosecond (tokens_per_second * PRECISION_FACTOR / 1e9).
std::atomic< std::int64_t > tokens_
Current token count (scaled by 1000 for sub-token precision).
auto available_tokens() const -> std::size_t
Returns the current number of available tokens.
auto set_burst_size(std::size_t burst_size) -> void
Updates the maximum bucket capacity.
auto reset() -> void
Resets the bucket to full capacity.
auto refill() -> void
Refills tokens based on elapsed time since last refill.
auto try_acquire(std::size_t tokens=1) -> bool
Attempts to acquire tokens without waiting.
static constexpr std::int64_t PRECISION_FACTOR
Precision factor for fixed-point token calculations.
auto set_rate(std::size_t tokens_per_second) -> void
Updates the token refill rate.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Lock-free token bucket rate limiter for controlling throughput.