Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
kcenon::thread::token_bucket Class Reference

Lock-free token bucket rate limiter for controlling throughput. More...

#include <token_bucket.h>

Collaboration diagram for kcenon::thread::token_bucket:
Collaboration graph

Public Member Functions

 token_bucket (std::size_t tokens_per_second, std::size_t burst_size)
 Constructs a token bucket with the specified rate and burst size.
 
 ~token_bucket ()=default
 Default destructor.
 
 token_bucket (const token_bucket &)=delete
 
token_bucketoperator= (const token_bucket &)=delete
 
 token_bucket (token_bucket &&)=delete
 
token_bucketoperator= (token_bucket &&)=delete
 
auto try_acquire (std::size_t tokens=1) -> bool
 Attempts to acquire tokens without waiting.
 
auto try_acquire_for (std::size_t tokens, std::chrono::milliseconds timeout) -> bool
 Attempts to acquire tokens with a timeout.
 
auto available_tokens () const -> std::size_t
 Returns the current number of available tokens.
 
auto time_until_available (std::size_t tokens) const -> std::chrono::nanoseconds
 Calculates time until the specified tokens become available.
 
auto set_rate (std::size_t tokens_per_second) -> void
 Updates the token refill rate.
 
auto set_burst_size (std::size_t burst_size) -> void
 Updates the maximum bucket capacity.
 
auto get_rate () const -> std::size_t
 Returns the current refill rate.
 
auto get_burst_size () const -> std::size_t
 Returns the maximum bucket capacity.
 
auto reset () -> void
 Resets the bucket to full capacity.
 

Private Member Functions

auto refill () -> void
 Refills tokens based on elapsed time since last refill.
 

Private Attributes

std::atomic< std::int64_t > tokens_
 Current token count (scaled by 1000 for sub-token precision).
 
std::atomic< std::int64_t > max_tokens_
 Maximum tokens (burst size) scaled by precision factor.
 
std::atomic< double > refill_rate_
 Token refill rate in nano-tokens per nanosecond.
 
std::atomic< std::chrono::steady_clock::time_point::rep > last_refill_
 Timestamp of last token refill.
 

Static Private Attributes

static constexpr std::int64_t PRECISION_FACTOR = 1000
 Precision factor for fixed-point token calculations.
 

Detailed Description

Lock-free token bucket rate limiter for controlling throughput.

The token bucket algorithm is a metering mechanism that controls the rate at which operations can proceed. Tokens are added to a bucket at a fixed rate, and operations consume tokens. If no tokens are available, the operation either waits or is rejected.

Design Principles

  • Lock-free: Uses atomic operations for thread-safe token management
  • Continuous Refill: Tokens are calculated on-demand, not via timer
  • Burst Support: Allows bursts up to bucket capacity
  • Configurable: Rate and burst size can be adjusted at runtime

Algorithm

tokens = min(max_tokens, tokens + elapsed_time * refill_rate)
if (tokens >= requested) {
tokens -= requested
return success
}
return failure

Thread Safety

All methods are thread-safe and lock-free. Multiple threads can concurrently acquire tokens without blocking each other.

Usage Example

// Create bucket: 1000 tokens/sec, burst of 100
token_bucket bucket(1000, 100);
// Try to acquire token (non-blocking)
if (bucket.try_acquire()) {
process_request();
}
// Wait up to 100ms for token
if (bucket.try_acquire_for(1, std::chrono::milliseconds{100})) {
process_request();
}
Lock-free token bucket rate limiter for controlling throughput.
See also
backpressure_config For integration with job queues

Definition at line 68 of file token_bucket.h.

Constructor & Destructor Documentation

◆ token_bucket() [1/3]

kcenon::thread::token_bucket::token_bucket ( std::size_t tokens_per_second,
std::size_t burst_size )

Constructs a token bucket with the specified rate and burst size.

Constructs a token bucket with the specified rate and capacity.

Parameters
tokens_per_secondNumber of tokens added per second.
burst_sizeMaximum tokens that can accumulate (bucket capacity).

The bucket starts full (burst_size tokens available).

Implementation details:

  • Initializes tokens to maximum capacity (full bucket)
  • Calculates refill rate as tokens per nanosecond (for high precision)
  • Records initial timestamp for refill calculations
  • Uses fixed-point arithmetic (PRECISION_FACTOR = 1000) for sub-token precision
Parameters
tokens_per_secondNumber of tokens to add per second
burst_sizeMaximum tokens that can accumulate

Definition at line 33 of file token_bucket.cpp.

34 : tokens_(static_cast<std::int64_t>(burst_size) * PRECISION_FACTOR)
35 , max_tokens_(static_cast<std::int64_t>(burst_size) * PRECISION_FACTOR)
36 , refill_rate_(static_cast<double>(tokens_per_second) * PRECISION_FACTOR / 1e9)
37 , last_refill_(std::chrono::steady_clock::now().time_since_epoch().count())
38 {
39 }
std::atomic< std::int64_t > max_tokens_
Maximum tokens (burst size) scaled by precision factor.
std::atomic< std::chrono::steady_clock::time_point::rep > last_refill_
Timestamp of last token refill.
std::atomic< double > refill_rate_
Token refill rate in nano-tokens per nanosecond.
std::atomic< std::int64_t > tokens_
Current token count (scaled by 1000 for sub-token precision).
static constexpr std::int64_t PRECISION_FACTOR
Precision factor for fixed-point token calculations.

◆ ~token_bucket()

kcenon::thread::token_bucket::~token_bucket ( )
default

Default destructor.

◆ token_bucket() [2/3]

kcenon::thread::token_bucket::token_bucket ( const token_bucket & )
delete

◆ token_bucket() [3/3]

kcenon::thread::token_bucket::token_bucket ( token_bucket && )
delete

Member Function Documentation

◆ available_tokens()

auto kcenon::thread::token_bucket::available_tokens ( ) const -> std::size_t
nodiscard

Returns the current number of available tokens.

Returns current available tokens.

Returns
Current token count (may be fractional, returned as integer).

This is a snapshot that may become stale immediately in a multi-threaded environment.

Thread Safety: Lock-free read.

Implementation details:

  • First refills to get accurate count
  • Converts from fixed-point to integer
  • Returns 0 if tokens are negative (shouldn't happen)
Returns
Available token count

Definition at line 201 of file token_bucket.cpp.

202 {
203 // Need non-const refill, but we're returning a snapshot anyway
204 const_cast<token_bucket*>(this)->refill();
205
206 std::int64_t current = tokens_.load(std::memory_order_acquire);
207 if (current <= 0)
208 {
209 return 0;
210 }
211 return static_cast<std::size_t>(current / PRECISION_FACTOR);
212 }
token_bucket(std::size_t tokens_per_second, std::size_t burst_size)
Constructs a token bucket with the specified rate and burst size.
auto refill() -> void
Refills tokens based on elapsed time since last refill.

References PRECISION_FACTOR, refill(), and tokens_.

Here is the call graph for this function:

◆ get_burst_size()

auto kcenon::thread::token_bucket::get_burst_size ( ) const -> std::size_t
nodiscard

Returns the maximum bucket capacity.

Returns
Maximum tokens (burst size).
Burst size (maximum tokens)

Definition at line 316 of file token_bucket.cpp.

317 {
318 std::int64_t max = max_tokens_.load(std::memory_order_acquire);
319 return static_cast<std::size_t>(max / PRECISION_FACTOR);
320 }

References max_tokens_, and PRECISION_FACTOR.

◆ get_rate()

auto kcenon::thread::token_bucket::get_rate ( ) const -> std::size_t
nodiscard

Returns the current refill rate.

Returns
Tokens per second.
Tokens per second

Definition at line 305 of file token_bucket.cpp.

306 {
307 double rate = refill_rate_.load(std::memory_order_acquire);
308 // Convert back: rate * 1e9 / PRECISION_FACTOR
309 return static_cast<std::size_t>(rate * 1e9 / PRECISION_FACTOR);
310 }

References PRECISION_FACTOR, and refill_rate_.

◆ operator=() [1/2]

token_bucket & kcenon::thread::token_bucket::operator= ( const token_bucket & )
delete

◆ operator=() [2/2]

token_bucket & kcenon::thread::token_bucket::operator= ( token_bucket && )
delete

◆ refill()

auto kcenon::thread::token_bucket::refill ( ) -> void
private

Refills tokens based on elapsed time since last refill.

Uses CAS loop to atomically update both tokens and timestamp. Called internally before each token acquisition attempt.

Implementation details:

  • Uses CAS loop to atomically update tokens and timestamp
  • Calculates tokens to add based on elapsed nanoseconds
  • Caps tokens at max_tokens_ to prevent overflow
  • Lock-free: multiple threads can refill concurrently

Algorithm:

  1. Read current timestamp
  2. Calculate elapsed time since last_refill_
  3. Calculate new tokens = current + (elapsed * rate)
  4. Cap at max_tokens_
  5. CAS update (retry if another thread modified)

Definition at line 57 of file token_bucket.cpp.

58 {
59 auto now = std::chrono::steady_clock::now().time_since_epoch().count();
60 auto last = last_refill_.load(std::memory_order_acquire);
61
62 // Calculate elapsed nanoseconds
63 auto elapsed_ns = now - last;
64 if (elapsed_ns <= 0)
65 {
66 return; // No time passed, nothing to refill
67 }
68
69 // Try to update last_refill_ atomically
70 if (!last_refill_.compare_exchange_weak(
71 last, now,
72 std::memory_order_acq_rel,
73 std::memory_order_relaxed))
74 {
75 // Another thread updated, our calculation is stale
76 return;
77 }
78
79 // Calculate tokens to add
80 double rate = refill_rate_.load(std::memory_order_relaxed);
81 auto new_tokens = static_cast<std::int64_t>(elapsed_ns * rate);
82
83 if (new_tokens <= 0)
84 {
85 return;
86 }
87
88 // Add tokens (capped at max)
89 std::int64_t max = max_tokens_.load(std::memory_order_relaxed);
90 std::int64_t current = tokens_.load(std::memory_order_relaxed);
91 std::int64_t updated = std::min(current + new_tokens, max);
92
93 // Relaxed CAS is fine here since we're just accumulating
94 tokens_.compare_exchange_weak(
95 current, updated,
96 std::memory_order_relaxed,
97 std::memory_order_relaxed);
98 }

Referenced by available_tokens().

Here is the caller graph for this function:

◆ reset()

auto kcenon::thread::token_bucket::reset ( ) -> void

Resets the bucket to full capacity.

Restores tokens to burst_size and resets the last refill time.

Thread Safety: Lock-free write.

Implementation details:

  • Sets tokens to max_tokens_
  • Updates last_refill_ to current time
  • Useful for manual intervention or testing

Definition at line 330 of file token_bucket.cpp.

331 {
332 std::int64_t max = max_tokens_.load(std::memory_order_acquire);
333 tokens_.store(max, std::memory_order_release);
334 last_refill_.store(
335 std::chrono::steady_clock::now().time_since_epoch().count(),
336 std::memory_order_release);
337 }

◆ set_burst_size()

auto kcenon::thread::token_bucket::set_burst_size ( std::size_t burst_size) -> void

Updates the maximum bucket capacity.

Parameters
burst_sizeNew maximum tokens.

If current tokens exceed new capacity, excess tokens are discarded.

Thread Safety: Lock-free write.

Implementation details:

  • Updates max_tokens_ atomically
  • If current tokens exceed new max, caps them
Parameters
burst_sizeNew maximum capacity

Definition at line 282 of file token_bucket.cpp.

283 {
284 std::int64_t new_max = static_cast<std::int64_t>(burst_size) * PRECISION_FACTOR;
285 max_tokens_.store(new_max, std::memory_order_release);
286
287 // Cap current tokens if they exceed new max
288 std::int64_t current = tokens_.load(std::memory_order_acquire);
289 while (current > new_max)
290 {
291 if (tokens_.compare_exchange_weak(
292 current, new_max,
293 std::memory_order_acq_rel,
294 std::memory_order_acquire))
295 {
296 break;
297 }
298 }
299 }

◆ set_rate()

auto kcenon::thread::token_bucket::set_rate ( std::size_t tokens_per_second) -> void

Updates the token refill rate.

Parameters
tokens_per_secondNew tokens per second rate.

Takes effect immediately. Does not affect currently accumulated tokens.

Thread Safety: Lock-free write.

Implementation details:

  • First refills with old rate to preserve accumulated tokens
  • Then updates rate for future refills
  • Converts tokens/second to tokens/nanosecond
Parameters
tokens_per_secondNew refill rate

Definition at line 263 of file token_bucket.cpp.

264 {
265 // Refill with current rate before changing
266 refill();
267
268 // Update rate (tokens per nanosecond, scaled by precision)
269 double new_rate = static_cast<double>(tokens_per_second) * PRECISION_FACTOR / 1e9;
270 refill_rate_.store(new_rate, std::memory_order_release);
271 }

◆ time_until_available()

auto kcenon::thread::token_bucket::time_until_available ( std::size_t tokens) const -> std::chrono::nanoseconds
nodiscard

Calculates time until the specified tokens become available.

Calculates time until tokens become available.

Parameters
tokensNumber of tokens needed.
Returns
Duration until tokens will be available (0 if already available).

Useful for implementing waiting strategies or displaying estimated wait times to users.

Thread Safety: Lock-free calculation.

Implementation details:

  • If sufficient tokens exist, returns 0
  • Otherwise, calculates time based on deficit and refill rate
  • Returns duration in nanoseconds for maximum precision
Parameters
tokensNumber of tokens needed
Returns
Duration until tokens available (0 if already available)

Definition at line 225 of file token_bucket.cpp.

227 {
228 const_cast<token_bucket*>(this)->refill();
229
230 std::int64_t needed = static_cast<std::int64_t>(tokens) * PRECISION_FACTOR;
231 std::int64_t current = tokens_.load(std::memory_order_acquire);
232
233 if (current >= needed)
234 {
235 return std::chrono::nanoseconds{0};
236 }
237
238 // Calculate deficit
239 std::int64_t deficit = needed - current;
240
241 // Time = deficit / rate (rate is in tokens per nanosecond)
242 double rate = refill_rate_.load(std::memory_order_relaxed);
243 if (rate <= 0)
244 {
245 // Infinite wait if rate is zero
246 return std::chrono::nanoseconds::max();
247 }
248
249 auto wait_ns = static_cast<std::int64_t>(deficit / rate);
250 return std::chrono::nanoseconds{wait_ns};
251 }

◆ try_acquire()

auto kcenon::thread::token_bucket::try_acquire ( std::size_t tokens = 1) -> bool
nodiscard

Attempts to acquire tokens without waiting.

Parameters
tokensNumber of tokens to acquire (default: 1).
Returns
true if tokens were acquired, false if insufficient tokens.

This method is non-blocking and returns immediately. If the bucket doesn't have enough tokens, the operation fails without waiting.

Thread Safety: Lock-free, safe for concurrent calls.

Implementation details:

  • First refills bucket based on elapsed time
  • Then attempts atomic decrement if sufficient tokens
  • Uses CAS loop to handle concurrent acquisitions
  • Returns immediately if insufficient tokens

Thread Safety:

  • Lock-free via CAS loop
  • Multiple threads can acquire concurrently
  • Fair in the sense that all competing threads have equal chance
Parameters
tokensNumber of tokens to acquire
Returns
true if acquired, false if insufficient

Definition at line 117 of file token_bucket.cpp.

118 {
119 // First, refill based on elapsed time
120 refill();
121
122 // Scale requested tokens by precision factor
123 std::int64_t needed = static_cast<std::int64_t>(tokens) * PRECISION_FACTOR;
124
125 // CAS loop to atomically decrement tokens
126 std::int64_t current = tokens_.load(std::memory_order_acquire);
127 while (current >= needed)
128 {
129 if (tokens_.compare_exchange_weak(
130 current, current - needed,
131 std::memory_order_acq_rel,
132 std::memory_order_acquire))
133 {
134 return true; // Successfully acquired
135 }
136 // CAS failed, current has been updated, retry
137 }
138
139 return false; // Insufficient tokens
140 }

◆ try_acquire_for()

auto kcenon::thread::token_bucket::try_acquire_for ( std::size_t tokens,
std::chrono::milliseconds timeout ) -> bool
nodiscard

Attempts to acquire tokens with a timeout.

Attempts to acquire tokens with timeout and backoff.

Parameters
tokensNumber of tokens to acquire.
timeoutMaximum time to wait for tokens.
Returns
true if tokens were acquired within timeout, false otherwise.

This method will spin-wait (with backoff) until either:

  • Enough tokens become available (returns true)
  • The timeout expires (returns false)

Implementation uses exponential backoff to reduce CPU usage while waiting for token refill.

Thread Safety: Lock-free with cooperative spin-waiting.

Implementation details:

  • Uses exponential backoff to reduce CPU usage while waiting
  • Backoff starts at 1μs, doubles each iteration, caps at 1ms
  • Checks deadline after each failed attempt
  • Returns as soon as tokens are acquired

Backoff Strategy:

  • Initial: 1μs sleep
  • Max: 1ms sleep
  • Multiplier: 2x per iteration
  • Purpose: Balance responsiveness vs CPU usage
Parameters
tokensNumber of tokens to acquire
timeoutMaximum time to wait
Returns
true if acquired within timeout, false otherwise

Definition at line 161 of file token_bucket.cpp.

164 {
165 auto deadline = std::chrono::steady_clock::now() + timeout;
166
167 // Start with small backoff, increase exponentially
168 auto backoff = std::chrono::microseconds{1};
169 constexpr auto max_backoff = std::chrono::milliseconds{1};
170
171 while (std::chrono::steady_clock::now() < deadline)
172 {
173 if (try_acquire(tokens))
174 {
175 return true;
176 }
177
178 // Sleep with exponential backoff
179 std::this_thread::sleep_for(backoff);
180
181 // Double backoff, cap at max
182 backoff = std::min(
183 backoff * 2,
184 std::chrono::duration_cast<std::chrono::microseconds>(max_backoff));
185 }
186
187 // Final attempt after loop
188 return try_acquire(tokens);
189 }
auto try_acquire(std::size_t tokens=1) -> bool
Attempts to acquire tokens without waiting.

Member Data Documentation

◆ last_refill_

std::atomic<std::chrono::steady_clock::time_point::rep> kcenon::thread::token_bucket::last_refill_
private

Timestamp of last token refill.

Definition at line 219 of file token_bucket.h.

◆ max_tokens_

std::atomic<std::int64_t> kcenon::thread::token_bucket::max_tokens_
private

Maximum tokens (burst size) scaled by precision factor.

Definition at line 207 of file token_bucket.h.

Referenced by get_burst_size().

◆ PRECISION_FACTOR

std::int64_t kcenon::thread::token_bucket::PRECISION_FACTOR = 1000
staticconstexprprivate

Precision factor for fixed-point token calculations.

Using 1000 allows for milli-token precision without floating point.

Definition at line 226 of file token_bucket.h.

Referenced by available_tokens(), get_burst_size(), and get_rate().

◆ refill_rate_

std::atomic<double> kcenon::thread::token_bucket::refill_rate_
private

Token refill rate in nano-tokens per nanosecond.

Calculated as: (tokens_per_second * PRECISION_FACTOR) / 1e9

Definition at line 214 of file token_bucket.h.

Referenced by get_rate().

◆ tokens_

std::atomic<std::int64_t> kcenon::thread::token_bucket::tokens_
private

Current token count (scaled by 1000 for sub-token precision).

We use fixed-point arithmetic to avoid floating-point atomics. Actual tokens = tokens_ / PRECISION_FACTOR

Definition at line 202 of file token_bucket.h.

Referenced by available_tokens().


The documentation for this class was generated from the following files: