Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::metrics::SlidingWindowCounter Class Reference

Sliding window counter for throughput measurement.

#include <sliding_window_counter.h>

Classes

struct  Bucket
 Time bucket structure.
 

Public Member Functions

 SlidingWindowCounter (std::chrono::seconds window_size, std::size_t buckets_per_second=DEFAULT_BUCKETS_PER_SECOND)
 Constructs a sliding window counter.
 
 SlidingWindowCounter (const SlidingWindowCounter &other)
 Copy constructor.
 
 SlidingWindowCounter (SlidingWindowCounter &&other) noexcept
 Move constructor.
 
SlidingWindowCounter & operator= (const SlidingWindowCounter &other)
 Copy assignment operator.
 
SlidingWindowCounter & operator= (SlidingWindowCounter &&other) noexcept
 Move assignment operator.
 
 ~SlidingWindowCounter ()=default
 Destructor.
 
void increment (std::size_t count=1)
 Increment the counter.
 
double rate_per_second () const
 Get the current rate per second.
 
std::uint64_t total_in_window () const
 Get the total count within the current window.
 
std::uint64_t all_time_total () const
 Get the all-time total count.
 
void reset ()
 Reset the counter.
 
std::chrono::seconds window_size () const
 Get the window size.
 
std::size_t bucket_count () const
 Get the number of buckets.
 

Static Public Attributes

static constexpr std::size_t DEFAULT_BUCKETS_PER_SECOND = 10
 Default number of buckets per second.
 

Private Member Functions

std::size_t current_bucket_index () const
 Get the current bucket index based on current time.
 
std::size_t bucket_index_for_time (std::uint64_t timestamp_ms) const
 Get the bucket index for a specific timestamp.
 
bool is_bucket_valid (std::uint64_t bucket_timestamp_ms, std::uint64_t current_ms) const
 Check if a bucket is within the current window.
 
void advance_bucket (std::size_t bucket_index, std::uint64_t current_ms)
 Advance bucket to current time period if needed.
 

Static Private Member Functions

static std::uint64_t current_time_ms ()
 Get current time in milliseconds since the steady_clock epoch.
 

Private Attributes

std::chrono::seconds window_size_
 The sliding window duration.
 
std::chrono::milliseconds bucket_duration_
 Duration of each bucket.
 
std::vector< Bucket > buckets_
 Circular buffer of time buckets.
 
std::atomic< std::uint64_t > all_time_total_ {0}
 All-time total count.
 

Detailed Description

Sliding window counter for throughput measurement.

This class provides a lock-free, thread-safe counter that tracks events within a sliding time window. It uses a circular buffer of time buckets to maintain accurate rate calculations.

Design Principles

  • Lock-free: All operations use atomic instructions
  • Low overhead: O(1) increment, O(bucket_count) for rate calculation
  • Configurable: Window size and bucket granularity are configurable
  • Accurate: Sub-second precision with configurable bucket size

Implementation Details

The window is divided into multiple buckets (default: 10 per second). Each bucket covers a fixed time interval. As time advances, old buckets are automatically invalidated and reused for new time periods.

Example for a 1-second window with 10 buckets:

Time:    [0ms - 100ms]  [100ms - 200ms]  ...  [900ms - 1000ms]
Buckets:   bucket[0]       bucket[1]     ...     bucket[9]

Thread Safety:
Thread-safe for concurrent increment and rate queries.
See also
EnhancedThreadPoolMetrics

Definition at line 51 of file sliding_window_counter.h.
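
The bucket scheme described above can be followed with a minimal, single-threaded sketch. This is not the library's implementation: MiniWindowCounter, its Slot type, and the explicit now_ms parameters are hypothetical names introduced here so the behaviour is deterministic, and the atomics are left out for brevity.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, single-threaded model of a bucketed sliding window.
// Time is passed in explicitly (now_ms) instead of being read from a clock.
class MiniWindowCounter {
public:
    MiniWindowCounter(std::uint64_t window_ms, std::uint64_t bucket_ms)
        : window_ms_(window_ms),
          bucket_ms_(bucket_ms),
          buckets_(static_cast<std::size_t>(window_ms / bucket_ms)) {}

    // Add `count` events at time `now_ms`.
    void increment(std::uint64_t now_ms, std::uint64_t count = 1) {
        const std::uint64_t slot_start = (now_ms / bucket_ms_) * bucket_ms_;
        Slot& slot = buckets_[(now_ms / bucket_ms_) % buckets_.size()];
        if (!slot.used || slot.start_ms != slot_start) {
            // The slot holds data from an earlier lap around the ring: reuse it.
            slot.used = true;
            slot.start_ms = slot_start;
            slot.count = 0;
        }
        slot.count += count;
    }

    // Sum the counts of all slots that started less than one window ago.
    std::uint64_t total_in_window(std::uint64_t now_ms) const {
        std::uint64_t total = 0;
        for (const Slot& slot : buckets_) {
            if (slot.used && now_ms >= slot.start_ms &&
                now_ms - slot.start_ms < window_ms_) {
                total += slot.count;
            }
        }
        return total;
    }

private:
    struct Slot {
        bool used = false;
        std::uint64_t start_ms = 0;
        std::uint64_t count = 0;
    };

    std::uint64_t window_ms_;
    std::uint64_t bucket_ms_;
    std::vector<Slot> buckets_;
};
```

With a 1000 ms window and 100 ms buckets, an event recorded at 50 ms lands in slot 0 and stops counting once the slot's start time is a full window old; an event at 1050 ms maps to slot 0 again and overwrites the stale count.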

Constructor & Destructor Documentation

◆ SlidingWindowCounter() [1/3]

kcenon::thread::metrics::SlidingWindowCounter::SlidingWindowCounter ( std::chrono::seconds window_size,
std::size_t buckets_per_second = DEFAULT_BUCKETS_PER_SECOND )
explicit

Constructs a sliding window counter.

Parameters
window_size         The duration of the sliding window.
buckets_per_second  Number of buckets per second (default: 10).

A higher buckets_per_second gives finer time resolution but uses more memory. For example, a 60-second window with 10 buckets per second allocates 600 buckets; at 16 bytes each, that is 9,600 bytes (about 9.4 KiB).
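
The sizing arithmetic can be checked at compile time. The sketch below mirrors the two expressions visible in the constructor listing (window_size.count() * buckets_per_second and 1000 / buckets_per_second); the helper names and the kAssumedBucketBytes constant are hypothetical, and the 16-byte figure is taken from the note above rather than measured.

```cpp
#include <cstddef>
#include <cstdint>

// Number of buckets allocated for a window:
// window length in seconds times buckets per second.
constexpr std::size_t bucket_total(std::size_t window_seconds,
                                   std::size_t buckets_per_second) {
    return window_seconds * buckets_per_second;
}

// Bucket duration in milliseconds. Integer division truncates,
// so 3 buckets per second yields 333 ms rather than 333.33 ms.
constexpr std::uint64_t bucket_duration_ms(std::size_t buckets_per_second) {
    return 1000 / buckets_per_second;
}

// Assumption: each bucket holds two 8-byte atomics (count and timestamp),
// which matches the 16-byte figure quoted in the documentation.
constexpr std::size_t kAssumedBucketBytes = 16;

static_assert(bucket_total(60, 10) == 600, "60 s at 10 buckets/s");
static_assert(bucket_total(60, 10) * kAssumedBucketBytes == 9600,
              "about 9.4 KiB");
static_assert(bucket_duration_ms(10) == 100, "100 ms buckets by default");
static_assert(bucket_duration_ms(3) == 333, "truncated by integer division");
```

Because the duration is computed by dividing by buckets_per_second, a value of 0 should be avoided, and values that do not divide 1000 evenly give buckets that are slightly shorter than the nominal fraction of a second.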

Definition at line 11 of file sliding_window_counter.cpp.

SlidingWindowCounter::SlidingWindowCounter(std::chrono::seconds window_size,
                                           std::size_t buckets_per_second)
    : window_size_(window_size),
      bucket_duration_(std::chrono::milliseconds{1000 / buckets_per_second}),
      buckets_(static_cast<std::size_t>(window_size.count()) * buckets_per_second) {
    // Initialize all buckets
    for (auto& bucket : buckets_) {
        bucket.count.store(0, std::memory_order_relaxed);
        bucket.timestamp_ms.store(0, std::memory_order_relaxed);
    }
}

References buckets_.

◆ SlidingWindowCounter() [2/3]

kcenon::thread::metrics::SlidingWindowCounter::SlidingWindowCounter ( const SlidingWindowCounter & other)

Copy constructor.

Parameters
other  The counter to copy from.

Definition at line 24 of file sliding_window_counter.cpp.

SlidingWindowCounter::SlidingWindowCounter(const SlidingWindowCounter& other)
    : window_size_(other.window_size_),
      bucket_duration_(other.bucket_duration_),
      buckets_(other.buckets_.size()) {
    for (std::size_t i = 0; i < buckets_.size(); ++i) {
        buckets_[i].count.store(
            other.buckets_[i].count.load(std::memory_order_relaxed),
            std::memory_order_relaxed);
        buckets_[i].timestamp_ms.store(
            other.buckets_[i].timestamp_ms.load(std::memory_order_relaxed),
            std::memory_order_relaxed);
    }
    all_time_total_.store(
        other.all_time_total_.load(std::memory_order_relaxed),
        std::memory_order_relaxed);
}

References all_time_total_, and buckets_.

◆ SlidingWindowCounter() [3/3]

kcenon::thread::metrics::SlidingWindowCounter::SlidingWindowCounter ( SlidingWindowCounter && other)
noexcept

Move constructor.

Parameters
other  The counter to move from.

Definition at line 41 of file sliding_window_counter.cpp.

SlidingWindowCounter::SlidingWindowCounter(SlidingWindowCounter&& other) noexcept
    : window_size_(other.window_size_),
      bucket_duration_(other.bucket_duration_),
      buckets_(std::move(other.buckets_)) {
    all_time_total_.store(
        other.all_time_total_.exchange(0, std::memory_order_relaxed),
        std::memory_order_relaxed);
}

◆ ~SlidingWindowCounter()

kcenon::thread::metrics::SlidingWindowCounter::~SlidingWindowCounter ( )
default

Destructor.

Member Function Documentation

◆ advance_bucket()

void kcenon::thread::metrics::SlidingWindowCounter::advance_bucket ( std::size_t bucket_index,
std::uint64_t current_ms )
private

Advance bucket to current time period if needed.

Parameters
bucket_index  The bucket to advance.
current_ms    The current timestamp.

Definition at line 174 of file sliding_window_counter.cpp.

void SlidingWindowCounter::advance_bucket(std::size_t bucket_index,
                                          std::uint64_t current_ms) {
    auto& bucket = buckets_[bucket_index];
    const auto bucket_duration_ms =
        static_cast<std::uint64_t>(bucket_duration_.count());

    // Calculate the expected timestamp for this bucket at current time
    const auto expected_timestamp =
        (current_ms / bucket_duration_ms) * bucket_duration_ms;

    auto old_timestamp = bucket.timestamp_ms.load(std::memory_order_relaxed);

    // Check if bucket needs to be advanced (stale bucket from previous window)
    if (old_timestamp < expected_timestamp) {
        // Try to update the timestamp atomically
        if (bucket.timestamp_ms.compare_exchange_strong(
                old_timestamp, expected_timestamp,
                std::memory_order_relaxed, std::memory_order_relaxed)) {
            // We successfully claimed this bucket for the new time period
            // Reset the count (old data is now invalid)
            bucket.count.store(0, std::memory_order_relaxed);
        }
        // If CAS failed, another thread already advanced it - that's fine
    }
}

References bucket_duration_, and buckets_.

Referenced by increment().
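
The claim-then-reset step in the listing above can be exercised in isolation. The following is a sketch under assumptions, not the library's code: DemoBucket and claim_for_period are hypothetical names, and the function returns a flag (which the original does not) so the outcome of the compare-exchange is observable.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for one bucket: an event count plus the start
// time of the period that the count belongs to.
struct DemoBucket {
    std::atomic<std::uint64_t> count{0};
    std::atomic<std::uint64_t> timestamp_ms{0};
};

// Re-label the bucket for the period containing now_ms if it is stale.
// Returns true when this call won the compare-exchange and reset the count.
bool claim_for_period(DemoBucket& bucket, std::uint64_t now_ms,
                      std::uint64_t bucket_duration_ms) {
    const std::uint64_t expected_start =
        (now_ms / bucket_duration_ms) * bucket_duration_ms;
    std::uint64_t seen = bucket.timestamp_ms.load(std::memory_order_relaxed);
    if (seen < expected_start &&
        bucket.timestamp_ms.compare_exchange_strong(
            seen, expected_start,
            std::memory_order_relaxed, std::memory_order_relaxed)) {
        bucket.count.store(0, std::memory_order_relaxed);
        return true;
    }
    return false;  // Already current, or another caller claimed it first.
}
```

In a pattern like this, a concurrent fetch_add that lands between the successful compare-exchange and the count reset is discarded, which is usually an acceptable loss for approximate throughput metrics.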

◆ all_time_total()

std::uint64_t kcenon::thread::metrics::SlidingWindowCounter::all_time_total ( ) const
nodiscard

Get the all-time total count.

Returns
The cumulative count since creation or last reset.

Definition at line 123 of file sliding_window_counter.cpp.

std::uint64_t SlidingWindowCounter::all_time_total() const {
    return all_time_total_.load(std::memory_order_relaxed);
}

References all_time_total_.

◆ bucket_count()

std::size_t kcenon::thread::metrics::SlidingWindowCounter::bucket_count ( ) const
nodiscard

Get the number of buckets.

Returns
The total number of time buckets.

Definition at line 139 of file sliding_window_counter.cpp.

std::size_t SlidingWindowCounter::bucket_count() const {
    return buckets_.size();
}

References buckets_.

◆ bucket_index_for_time()

std::size_t kcenon::thread::metrics::SlidingWindowCounter::bucket_index_for_time ( std::uint64_t timestamp_ms) const
nodiscard private

Get the bucket index for a specific timestamp.

Parameters
timestamp_ms  Timestamp in milliseconds since epoch.
Returns
The bucket index.

Definition at line 147 of file sliding_window_counter.cpp.

std::size_t SlidingWindowCounter::bucket_index_for_time(
    std::uint64_t timestamp_ms) const {
    const auto bucket_duration_ms =
        static_cast<std::uint64_t>(bucket_duration_.count());
    return static_cast<std::size_t>(
        (timestamp_ms / bucket_duration_ms) % buckets_.size());
}

References bucket_duration_, and buckets_.

Referenced by current_bucket_index(), and increment().
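
A few worked values make the wraparound concrete. The sketch below uses the same arithmetic as the listing above, with the member variables turned into parameters; the free function bucket_index is an illustrative name, not part of the class.

```cpp
#include <cstddef>
#include <cstdint>

// (timestamp / bucket duration) selects the absolute period number;
// taking it modulo the bucket count maps that period onto the ring.
constexpr std::size_t bucket_index(std::uint64_t timestamp_ms,
                                   std::uint64_t bucket_duration_ms,
                                   std::size_t bucket_count) {
    return static_cast<std::size_t>(
        (timestamp_ms / bucket_duration_ms) % bucket_count);
}

// 1-second window, 10 buckets of 100 ms each.
static_assert(bucket_index(0, 100, 10) == 0, "first period");
static_assert(bucket_index(150, 100, 10) == 1, "second period");
static_assert(bucket_index(999, 100, 10) == 9, "last period of the window");
static_assert(bucket_index(1000, 100, 10) == 0, "wraps after one window");
static_assert(bucket_index(1234, 100, 10) == 2, "period 12 maps to slot 2");
```

Two timestamps exactly one window apart map to the same slot, which is why each bucket also stores a timestamp: the index alone cannot tell a current count from one left over from an earlier lap.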

◆ current_bucket_index()

std::size_t kcenon::thread::metrics::SlidingWindowCounter::current_bucket_index ( ) const
nodiscard private

Get the current bucket index based on current time.

Returns
The index into the circular buffer.

Definition at line 143 of file sliding_window_counter.cpp.

std::size_t SlidingWindowCounter::current_bucket_index() const {
    return bucket_index_for_time(current_time_ms());
}

References bucket_index_for_time(), and current_time_ms().

◆ current_time_ms()

std::uint64_t kcenon::thread::metrics::SlidingWindowCounter::current_time_ms ( )
static nodiscard private

Get current time in milliseconds since the steady_clock epoch.

Returns
Current timestamp in milliseconds.

Definition at line 155 of file sliding_window_counter.cpp.

std::uint64_t SlidingWindowCounter::current_time_ms() {
    return static_cast<std::uint64_t>(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now().time_since_epoch())
            .count());
}

Referenced by current_bucket_index(), increment(), and total_in_window().
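
Since the timestamp comes from std::chrono::steady_clock, it is monotonic but not a wall-clock time. A minimal sketch of reading and comparing such timestamps follows; steady_now_ms and elapsed_ms are hypothetical helper names used only for illustration.

```cpp
#include <chrono>
#include <cstdint>

// Milliseconds since the steady_clock epoch. The epoch is unspecified
// (often system start), so the value is only meaningful for differences.
std::uint64_t steady_now_ms() {
    using namespace std::chrono;
    return static_cast<std::uint64_t>(
        duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
            .count());
}

// Elapsed milliseconds between two readings taken with steady_now_ms().
// The caller is expected to pass the earlier reading first.
std::uint64_t elapsed_ms(std::uint64_t earlier_ms, std::uint64_t later_ms) {
    return later_ms - earlier_ms;
}
```

A monotonic clock suits window bookkeeping because adjustments to the system time cannot make a later reading smaller than an earlier one.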

◆ increment()

void kcenon::thread::metrics::SlidingWindowCounter::increment ( std::size_t count = 1)

Increment the counter.

Parameters
count  The value to add (default: 1).

This operation is lock-free and thread-safe. Complexity: O(1).

Definition at line 89 of file sliding_window_counter.cpp.

void SlidingWindowCounter::increment(std::size_t count) {
    const auto current_ms = current_time_ms();
    const auto bucket_index = bucket_index_for_time(current_ms);

    // Advance bucket if it's stale
    advance_bucket(bucket_index, current_ms);

    // Increment the bucket and all-time total
    buckets_[bucket_index].count.fetch_add(
        static_cast<std::uint64_t>(count), std::memory_order_relaxed);
    all_time_total_.fetch_add(
        static_cast<std::uint64_t>(count), std::memory_order_relaxed);
}

References advance_bucket(), all_time_total_, bucket_index_for_time(), buckets_, and current_time_ms().

Referenced by kcenon::thread::metrics::EnhancedThreadPoolMetrics::record_execution().

◆ is_bucket_valid()

bool kcenon::thread::metrics::SlidingWindowCounter::is_bucket_valid ( std::uint64_t bucket_timestamp_ms,
std::uint64_t current_ms ) const
nodiscard private

Check if a bucket is within the current window.

Parameters
bucket_timestamp_ms  The bucket's timestamp.
current_ms           The current timestamp.
Returns
True if the bucket is valid (within window).

Definition at line 162 of file sliding_window_counter.cpp.

bool SlidingWindowCounter::is_bucket_valid(std::uint64_t bucket_timestamp_ms,
                                           std::uint64_t current_ms) const {
    if (bucket_timestamp_ms == 0) {
        return false;
    }

    const auto window_ms =
        static_cast<std::uint64_t>(window_size_.count()) * 1000;
    return (current_ms - bucket_timestamp_ms) < window_ms;
}

References window_size_.

Referenced by total_in_window().
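
The boundary cases of this check are easy to misread, so a few worked values may help. The sketch repeats the comparison from the listing above as a free function; bucket_in_window and its window_ms parameter are illustrative names, not part of the class.

```cpp
#include <cstdint>

constexpr bool bucket_in_window(std::uint64_t bucket_timestamp_ms,
                                std::uint64_t current_ms,
                                std::uint64_t window_ms) {
    if (bucket_timestamp_ms == 0) {
        return false;  // Zero marks a bucket that was never written.
    }
    // Unsigned subtraction: a timestamp newer than current_ms wraps to a
    // huge value and is therefore reported as outside the window.
    return (current_ms - bucket_timestamp_ms) < window_ms;
}

// 1000 ms window, evaluated at current_ms = 5000.
static_assert(!bucket_in_window(0, 5000, 1000), "unused bucket");
static_assert(bucket_in_window(4500, 5000, 1000), "500 ms old");
static_assert(!bucket_in_window(4000, 5000, 1000), "exactly one window old");
static_assert(!bucket_in_window(5100, 5000, 1000), "newer than current_ms");
```

The last case can arise when a writer advances a bucket after the reader has sampled the current time; with this comparison such a bucket is skipped for that query rather than counted.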

◆ operator=() [1/2]

SlidingWindowCounter & kcenon::thread::metrics::SlidingWindowCounter::operator= ( const SlidingWindowCounter & other)

Copy assignment operator.

Parameters
other  The counter to copy from.
Returns
Reference to this counter.

Definition at line 50 of file sliding_window_counter.cpp.

SlidingWindowCounter& SlidingWindowCounter::operator=(
    const SlidingWindowCounter& other) {
    if (this != &other) {
        window_size_ = other.window_size_;
        bucket_duration_ = other.bucket_duration_;

        // Resize if needed
        if (buckets_.size() != other.buckets_.size()) {
            buckets_.resize(other.buckets_.size());
        }

        for (std::size_t i = 0; i < buckets_.size(); ++i) {
            buckets_[i].count.store(
                other.buckets_[i].count.load(std::memory_order_relaxed),
                std::memory_order_relaxed);
            buckets_[i].timestamp_ms.store(
                other.buckets_[i].timestamp_ms.load(std::memory_order_relaxed),
                std::memory_order_relaxed);
        }
        all_time_total_.store(
            other.all_time_total_.load(std::memory_order_relaxed),
            std::memory_order_relaxed);
    }
    return *this;
}

References all_time_total_, bucket_duration_, buckets_, and window_size_.

◆ operator=() [2/2]

SlidingWindowCounter & kcenon::thread::metrics::SlidingWindowCounter::operator= ( SlidingWindowCounter && other)
noexcept

Move assignment operator.

Parameters
other  The counter to move from.
Returns
Reference to this counter.

Definition at line 76 of file sliding_window_counter.cpp.

SlidingWindowCounter& SlidingWindowCounter::operator=(
    SlidingWindowCounter&& other) noexcept {
    if (this != &other) {
        window_size_ = other.window_size_;
        bucket_duration_ = other.bucket_duration_;
        buckets_ = std::move(other.buckets_);
        all_time_total_.store(
            other.all_time_total_.exchange(0, std::memory_order_relaxed),
            std::memory_order_relaxed);
    }
    return *this;
}

◆ rate_per_second()

double kcenon::thread::metrics::SlidingWindowCounter::rate_per_second ( ) const
nodiscard

Get the current rate per second.

Returns
The average rate over the sliding window.

This calculates the total count in the window divided by window duration.

Definition at line 103 of file sliding_window_counter.cpp.

double SlidingWindowCounter::rate_per_second() const {
    const auto total = total_in_window();
    const auto window_seconds = static_cast<double>(window_size_.count());
    return static_cast<double>(total) / window_seconds;
}

References total_in_window(), and window_size_.

Referenced by kcenon::thread::metrics::EnhancedThreadPoolMetrics::snapshot().
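
The rate is a plain average over the full window length, which a short worked example makes explicit. The free function average_rate below is a hypothetical restatement of the division in the listing above, not an API of the class.

```cpp
#include <chrono>
#include <cstdint>

// Average events per second over the whole window:
// total count in the window divided by the window length in seconds.
constexpr double average_rate(std::uint64_t total_in_window,
                              std::chrono::seconds window) {
    return static_cast<double>(total_in_window) /
           static_cast<double>(window.count());
}
```

For instance, 300 events in a 60-second window give 5.0 events per second. Because the divisor is always the configured window, a counter that has been running for only 10 seconds of a 60-second window and has seen 300 events also reports 5.0 per second, not 30; the figure converges on the true rate once a full window has elapsed.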

◆ reset()

void kcenon::thread::metrics::SlidingWindowCounter::reset ( )

Reset the counter.

Clears all buckets and resets the all-time total.

Definition at line 127 of file sliding_window_counter.cpp.

void SlidingWindowCounter::reset() {
    for (auto& bucket : buckets_) {
        bucket.count.store(0, std::memory_order_relaxed);
        bucket.timestamp_ms.store(0, std::memory_order_relaxed);
    }
    all_time_total_.store(0, std::memory_order_relaxed);
}

References all_time_total_, and buckets_.

Referenced by kcenon::thread::metrics::EnhancedThreadPoolMetrics::reset().

◆ total_in_window()

std::uint64_t kcenon::thread::metrics::SlidingWindowCounter::total_in_window ( ) const
nodiscard

Get the total count within the current window.

Returns
The sum of all counts in valid buckets.

Definition at line 109 of file sliding_window_counter.cpp.

std::uint64_t SlidingWindowCounter::total_in_window() const {
    const auto current_ms = current_time_ms();
    std::uint64_t total = 0;

    for (const auto& bucket : buckets_) {
        const auto timestamp = bucket.timestamp_ms.load(std::memory_order_relaxed);
        if (is_bucket_valid(timestamp, current_ms)) {
            total += bucket.count.load(std::memory_order_relaxed);
        }
    }

    return total;
}

References buckets_, current_time_ms(), and is_bucket_valid().

Referenced by rate_per_second().

◆ window_size()

std::chrono::seconds kcenon::thread::metrics::SlidingWindowCounter::window_size ( ) const
nodiscard

Get the window size.

Returns
The configured sliding window duration.

Definition at line 135 of file sliding_window_counter.cpp.

std::chrono::seconds SlidingWindowCounter::window_size() const {
    return window_size_;
}

References window_size_.

Member Data Documentation

◆ all_time_total_

std::atomic<std::uint64_t> kcenon::thread::metrics::SlidingWindowCounter::all_time_total_ {0}
private

All-time total count.

Definition at line 209 of file sliding_window_counter.h.

std::atomic<std::uint64_t> all_time_total_{0};

Referenced by all_time_total(), increment(), operator=(), reset(), and SlidingWindowCounter().

◆ bucket_duration_

std::chrono::milliseconds kcenon::thread::metrics::SlidingWindowCounter::bucket_duration_
private

Duration of each bucket.

Definition at line 199 of file sliding_window_counter.h.

Referenced by advance_bucket(), bucket_index_for_time(), and operator=().

◆ buckets_

std::vector<Bucket> kcenon::thread::metrics::SlidingWindowCounter::buckets_
private

◆ DEFAULT_BUCKETS_PER_SECOND

std::size_t kcenon::thread::metrics::SlidingWindowCounter::DEFAULT_BUCKETS_PER_SECOND = 10
static constexpr

Default number of buckets per second.

Definition at line 56 of file sliding_window_counter.h.

◆ window_size_

std::chrono::seconds kcenon::thread::metrics::SlidingWindowCounter::window_size_
private

The sliding window duration.

Definition at line 194 of file sliding_window_counter.h.

Referenced by is_bucket_valid(), operator=(), rate_per_second(), and window_size().


The documentation for this class was generated from the following files: