Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
kcenon::monitoring::ring_buffer< T > Class Template Reference

Lock-free ring buffer with atomic operations.

#include <ring_buffer.h>


Public Member Functions

 ring_buffer (const ring_buffer_config &config={})
 Constructor with configuration.
 
 ring_buffer (const ring_buffer &)=delete
 
ring_buffer & operator= (const ring_buffer &)=delete
 
 ring_buffer (ring_buffer &&)=default
 
ring_buffer & operator= (ring_buffer &&)=default
 
common::VoidResult write (T &&item)
 Write a single element to the buffer.
 
size_t write_batch (std::vector< T > &&items)
 Write multiple elements in batch.
 
common::VoidResult read (T &item)
 Read a single element from the buffer.
 
size_t read_batch (std::vector< T > &items, size_t max_count=SIZE_MAX)
 Read multiple elements in batch.
 
common::VoidResult peek (T &item) const
 Peek at the next item without removing it.
 
size_t size () const noexcept
 Get current number of elements in buffer.
 
bool empty () const noexcept
 Check if buffer is empty.
 
bool full () const noexcept
 Check if buffer is full.
 
size_t capacity () const noexcept
 Get buffer capacity.
 
void clear () noexcept
 Clear all elements in the buffer.
 
const ring_buffer_config & get_config () const noexcept
 Get buffer configuration.
 
const ring_buffer_stats & get_stats () const noexcept
 Get buffer statistics.
 
void reset_stats () noexcept
 Reset statistics.
 
bool is_overflow_rate_high () const noexcept
 Check if buffer is experiencing high overflow.
 
double get_overflow_rate () const noexcept
 Get overflow rate percentage.
 

Private Member Functions

size_t get_mask () const noexcept
 Get the mask for efficient modulo operation.
 
bool is_full_unsafe (size_t write_idx, size_t read_idx) const noexcept
 Check if buffer is full.
 
bool is_empty_unsafe (size_t write_idx, size_t read_idx) const noexcept
 Check if buffer is empty.
 

Private Attributes

std::atomic< size_t > write_index_ {0}
 
std::atomic< size_t > read_index_ {0}
 
std::unique_ptr< T[]> buffer_
 
ring_buffer_config config_
 
ring_buffer_stats stats_
 

Detailed Description

template<typename T>
class kcenon::monitoring::ring_buffer< T >

Lock-free ring buffer with atomic operations.

Template Parameters
T The type of elements to store

This implementation uses atomic operations for thread-safety and provides efficient circular buffer semantics with configurable overflow behavior.

Definition at line 141 of file ring_buffer.h.
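
As a rough illustration of the circular semantics and the two overflow policies described above, the following single-threaded sketch models only the index arithmetic. The names mini_ring, push, and pop are hypothetical and are not part of ring_buffer.h; the sketch deliberately omits the atomics, statistics, and result types of the real class, and it assumes a power-of-two capacity.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical single-threaded model; not the library's class.
template <typename T>
class mini_ring {
public:
    // Assumes capacity is a power of two so that (capacity - 1) works as a mask.
    mini_ring(std::size_t capacity, bool overwrite_old)
        : slots_(capacity), mask_(capacity - 1), overwrite_old_(overwrite_old) {}

    // Returns false when the buffer is full and overwriting is disabled.
    bool push(T value) {
        if (((write_ + 1) & mask_) == read_) {   // full: one slot stays unused
            if (!overwrite_old_) {
                return false;
            }
            read_ = (read_ + 1) & mask_;         // drop the oldest element
        }
        slots_[write_] = std::move(value);
        write_ = (write_ + 1) & mask_;
        return true;
    }

    // Returns false when the buffer is empty.
    bool pop(T& out) {
        if (read_ == write_) {
            return false;
        }
        out = std::move(slots_[read_]);
        read_ = (read_ + 1) & mask_;
        return true;
    }

private:
    std::vector<T> slots_;
    std::size_t mask_;
    bool overwrite_old_;
    std::size_t read_ = 0;
    std::size_t write_ = 0;
};
```

With a capacity of 4 and overwriting enabled, pushing four values discards the first one, so the next pop yields the second value. With overwriting disabled, the fourth push is rejected instead.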

Constructor & Destructor Documentation

◆ ring_buffer() [1/3]

template<typename T >
kcenon::monitoring::ring_buffer< T >::ring_buffer ( const ring_buffer_config & config = {})
inline explicit

Constructor with configuration.

Parameters
config Ring buffer configuration options
Exceptions
std::invalid_argument if configuration validation fails

Definition at line 180 of file ring_buffer.h.

180 explicit ring_buffer(const ring_buffer_config& config = {})
181 : buffer_(std::make_unique<T[]>(config.capacity))
182 , config_(config) {
183
184 // Validate configuration
185 auto validation = config_.validate();
186 if (validation.is_err()) {
187 throw std::invalid_argument("Invalid ring buffer configuration: " +
188 validation.error().message);
189 }
190 }
common::VoidResult validate() const
Validate ring buffer configuration.
Definition ring_buffer.h:45
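
In the listing above, the storage is allocated in the member initializer list before validate() runs. The sketch below shows the same validate-then-throw pattern with the check performed before allocation. It is only an illustration under stated assumptions: sketch_config, sketch_buffer, and the power-of-two rule are hypothetical stand-ins, since the body of ring_buffer_config::validate() is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for ring_buffer_config.
struct sketch_config {
    std::size_t capacity = 8;

    // Returns an empty string when valid, otherwise a description of the problem.
    // The power-of-two requirement is an assumption made for this sketch.
    std::string validate() const {
        if (capacity < 2 || (capacity & (capacity - 1)) != 0) {
            return "capacity must be a power of two and at least 2";
        }
        return std::string();
    }
};

// Hypothetical stand-in for ring_buffer<int>.
class sketch_buffer {
public:
    explicit sketch_buffer(const sketch_config& config = {}) : config_(config) {
        const std::string problem = config_.validate();
        if (!problem.empty()) {
            throw std::invalid_argument("Invalid ring buffer configuration: " + problem);
        }
        // Allocate only after the configuration is known to be valid.
        storage_ = std::make_unique<int[]>(config_.capacity);
    }

    std::size_t capacity() const noexcept { return config_.capacity; }

private:
    sketch_config config_;
    std::unique_ptr<int[]> storage_;
};
```

Callers that build a configuration from user input would wrap construction in a try block and catch std::invalid_argument.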

◆ ring_buffer() [2/3]

template<typename T >
kcenon::monitoring::ring_buffer< T >::ring_buffer ( const ring_buffer< T > & )
delete

◆ ring_buffer() [3/3]

template<typename T >
kcenon::monitoring::ring_buffer< T >::ring_buffer ( ring_buffer< T > && )
default

Member Function Documentation

◆ capacity()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::capacity ( ) const
inline noexcept

Get buffer capacity.

Definition at line 416 of file ring_buffer.h.

416 {
417 return config_.capacity;
418 }

References kcenon::monitoring::ring_buffer_config::capacity, and kcenon::monitoring::ring_buffer< T >::config_.

Referenced by TEST_F().


◆ clear()

template<typename T >
void kcenon::monitoring::ring_buffer< T >::clear ( )
inline noexcept

Clear all elements in the buffer.

Definition at line 423 of file ring_buffer.h.

423 {
424 write_index_.store(0, std::memory_order_release);
425 read_index_.store(0, std::memory_order_release);
426 }

References kcenon::monitoring::ring_buffer< T >::read_index_, and kcenon::monitoring::ring_buffer< T >::write_index_.

◆ empty()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::empty ( ) const
inline noexcept

Check if buffer is empty.

Definition at line 400 of file ring_buffer.h.

400 {
401 return size() == 0;
402 }

References kcenon::monitoring::ring_buffer< T >::size().

Referenced by TEST_F().


◆ full()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::full ( ) const
inline noexcept

Check if buffer is full.

Definition at line 407 of file ring_buffer.h.

407 {
408 size_t write_idx = write_index_.load(std::memory_order_acquire);
409 size_t read_idx = read_index_.load(std::memory_order_acquire);
410 return is_full_unsafe(write_idx, read_idx);
411 }

References kcenon::monitoring::ring_buffer< T >::is_full_unsafe(), kcenon::monitoring::ring_buffer< T >::read_index_, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by TEST_F().


◆ get_config()

template<typename T >
const ring_buffer_config & kcenon::monitoring::ring_buffer< T >::get_config ( ) const
inline noexcept

Get buffer configuration.

Definition at line 431 of file ring_buffer.h.

431 {
432 return config_;
433 }

References kcenon::monitoring::ring_buffer< T >::config_.

◆ get_mask()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::get_mask ( ) const
inline private noexcept

Get the mask for efficient modulo operation.

Definition at line 156 of file ring_buffer.h.

156 {
157 return config_.capacity - 1;
158 }

References kcenon::monitoring::ring_buffer_config::capacity, and kcenon::monitoring::ring_buffer< T >::config_.

Referenced by kcenon::monitoring::ring_buffer< T >::is_full_unsafe(), kcenon::monitoring::ring_buffer< T >::read(), and kcenon::monitoring::ring_buffer< T >::write().
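
The mask returned by get_mask() only behaves like a modulo when the capacity is a power of two. The short sketch below shows that relationship; is_power_of_two and wrap_index are hypothetical helper names, and whether validate() enforces a power-of-two capacity is an assumption that this page does not confirm.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: true when n has exactly one bit set.
constexpr bool is_power_of_two(std::size_t n) noexcept {
    return n != 0 && (n & (n - 1)) == 0;
}

// Hypothetical helper: wraps an index into [0, capacity) with a bit mask.
// Equivalent to index % capacity only when capacity is a power of two.
constexpr std::size_t wrap_index(std::size_t index, std::size_t capacity) noexcept {
    return index & (capacity - 1);
}
```

For a capacity of 8 the mask is 7, so index 9 wraps to 1. For a capacity such as 6 the mask would be 5 (binary 101), and masking would silently skip some slots, which is why the power-of-two check matters.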


◆ get_overflow_rate()

template<typename T >
double kcenon::monitoring::ring_buffer< T >::get_overflow_rate ( ) const
inline noexcept

Get overflow rate percentage (overwrites per total writes; delegates to ring_buffer_stats::get_overflow_rate(), defined at ring_buffer.h:94).

Definition at line 466 of file ring_buffer.h.

466 {
467 return stats_.get_overflow_rate();
468 }

References kcenon::monitoring::ring_buffer_stats::get_overflow_rate(), and kcenon::monitoring::ring_buffer< T >::stats_.


◆ get_stats()

template<typename T >
const ring_buffer_stats & kcenon::monitoring::ring_buffer< T >::get_stats ( ) const
inline noexcept

Get buffer statistics.

Definition at line 438 of file ring_buffer.h.

438 {
439 return stats_;
440 }

References kcenon::monitoring::ring_buffer< T >::stats_.

◆ is_empty_unsafe()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::is_empty_unsafe ( size_t write_idx, size_t read_idx ) const
inline private noexcept

Check if buffer is empty.

Definition at line 170 of file ring_buffer.h.

170 {
171 return write_idx == read_idx;
172 }

Referenced by kcenon::monitoring::ring_buffer< T >::peek(), and kcenon::monitoring::ring_buffer< T >::read().


◆ is_full_unsafe()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::is_full_unsafe ( size_t write_idx, size_t read_idx ) const
inline private noexcept

Check if buffer is full.

Definition at line 163 of file ring_buffer.h.

163 {
164 return ((write_idx + 1) & get_mask()) == read_idx;
165 }

References kcenon::monitoring::ring_buffer< T >::get_mask().

Referenced by kcenon::monitoring::ring_buffer< T >::full(), and kcenon::monitoring::ring_buffer< T >::write().
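
Because the full check compares the slot after the write index with the read index, one slot is always left unused, and a buffer of capacity N holds at most N - 1 elements. The sketch below counts this directly. The free functions is_full and usable_slots are hypothetical names for this example, and a power-of-two capacity is assumed.

```cpp
#include <cassert>
#include <cstddef>

// Same predicate as the listing, with the mask passed in explicitly.
constexpr bool is_full(std::size_t write_idx, std::size_t read_idx,
                       std::size_t mask) noexcept {
    return ((write_idx + 1) & mask) == read_idx;
}

// Counts how many writes succeed on an empty buffer before is_full() fires.
inline std::size_t usable_slots(std::size_t capacity) noexcept {
    const std::size_t mask = capacity - 1;   // assumes a power-of-two capacity
    std::size_t write_idx = 0;
    const std::size_t read_idx = 0;
    std::size_t stored = 0;
    while (!is_full(write_idx, read_idx, mask)) {
        write_idx = (write_idx + 1) & mask;
        ++stored;
    }
    return stored;
}
```

Reserving one slot is what lets write_idx == read_idx mean "empty" without a separate element counter.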


◆ is_overflow_rate_high()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::is_overflow_rate_high ( ) const
inline noexcept

Check if buffer is experiencing high overflow.

Returns
true if the overflow rate exceeds the threshold (> 10%, per ring_buffer_stats::is_overflow_rate_high())

Definition at line 459 of file ring_buffer.h.

459 {
460 return stats_.is_overflow_rate_high();
461 }

References kcenon::monitoring::ring_buffer_stats::is_overflow_rate_high(), and kcenon::monitoring::ring_buffer< T >::stats_.
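
The statistics class is not shown on this page, so the following is only a sketch of how an "overwrites per total writes" rate and a 10% threshold could be computed. The function names, the percent scaling, and the zero-writes guard are all assumptions rather than the library's actual implementation.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: overwrites as a percentage of total writes.
// Returning 0.0 when nothing has been written is an assumption.
inline double overflow_rate_percent(std::size_t overwrites,
                                    std::size_t total_writes) noexcept {
    if (total_writes == 0) {
        return 0.0;
    }
    return 100.0 * static_cast<double>(overwrites) /
           static_cast<double>(total_writes);
}

// Hypothetical helper: mirrors the documented "> 10%" threshold.
inline bool overflow_rate_high(std::size_t overwrites,
                               std::size_t total_writes) noexcept {
    return overflow_rate_percent(overwrites, total_writes) > 10.0;
}
```

Under these assumptions, 5 overwrites in 50 writes sits exactly at the threshold and is not reported as high, while 6 in 50 is.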


◆ operator=() [1/2]

template<typename T >
ring_buffer & kcenon::monitoring::ring_buffer< T >::operator= ( const ring_buffer< T > & )
delete

◆ operator=() [2/2]

template<typename T >
ring_buffer & kcenon::monitoring::ring_buffer< T >::operator= ( ring_buffer< T > && )
default

◆ peek()

template<typename T >
common::VoidResult kcenon::monitoring::ring_buffer< T >::peek ( T & item) const
inline

Peek at the next item without removing it.

Parameters
item Reference to store the peeked item
Returns
Result indicating success or failure

Definition at line 368 of file ring_buffer.h.

368 {
369 size_t current_read = read_index_.load(std::memory_order_acquire);
370 size_t current_write = write_index_.load(std::memory_order_acquire);
371
372 if (is_empty_unsafe(current_write, current_read)) {
373 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
374 "Ring buffer is empty").to_common_error());
375 }
376
377 item = buffer_[current_read]; // Copy, don't move
378 return common::ok();
379 }

References kcenon::monitoring::ring_buffer< T >::buffer_, kcenon::monitoring::collection_failed, kcenon::monitoring::ring_buffer< T >::is_empty_unsafe(), kcenon::monitoring::ring_buffer< T >::read_index_, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by TEST_F().


◆ read()

template<typename T >
common::VoidResult kcenon::monitoring::ring_buffer< T >::read ( T & item)
inline

Read a single element from the buffer.

Parameters
item Reference to store the read item
Returns
Result indicating success or failure

Definition at line 311 of file ring_buffer.h.

311 {
312 stats_.total_reads.fetch_add(1, std::memory_order_relaxed);
313
314 size_t current_read = read_index_.load(std::memory_order_acquire);
315 size_t current_write = write_index_.load(std::memory_order_acquire);
316
317 if (is_empty_unsafe(current_write, current_read)) {
318 stats_.failed_reads.fetch_add(1, std::memory_order_relaxed);
319 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
320 "Ring buffer is empty").to_common_error());
321 }
322
323 // Read the item
324 item = std::move(buffer_[current_read]);
325
326 // Update read index
327 size_t new_read = (current_read + 1) & get_mask();
328 read_index_.store(new_read, std::memory_order_release);
329
330 return common::ok();
331 }
std::atomic< size_t > failed_reads
Definition ring_buffer.h:69
std::atomic< size_t > total_reads
Definition ring_buffer.h:66

References kcenon::monitoring::ring_buffer< T >::buffer_, kcenon::monitoring::collection_failed, kcenon::monitoring::ring_buffer_stats::failed_reads, kcenon::monitoring::ring_buffer< T >::get_mask(), kcenon::monitoring::ring_buffer< T >::is_empty_unsafe(), kcenon::monitoring::ring_buffer< T >::read_index_, kcenon::monitoring::ring_buffer< T >::stats_, kcenon::monitoring::ring_buffer_stats::total_reads, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by kcenon::monitoring::ring_buffer< T >::read_batch(), TEST_F(), TEST_F(), and TEST_F().


◆ read_batch()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::read_batch ( std::vector< T > & items, size_t max_count = SIZE_MAX )
inline

Read multiple elements in batch.

Parameters
items Vector to store read items (read items are appended)
max_count Maximum number of items to read; each call is additionally capped at the configured batch_size
Returns
Number of items actually read

Definition at line 339 of file ring_buffer.h.

339 {
340 if (max_count == 0) {
341 return 0;
342 }
343
344 size_t batch_size = std::min(max_count, config_.batch_size);
345 items.reserve(items.size() + batch_size);
346
347 size_t read_count = 0;
348 T temp_item;
349
350 while (read_count < batch_size) {
351 auto result = read(temp_item);
352 if (result.is_err()) {
353 break; // No more items to read
354 }
355
356 items.emplace_back(std::move(temp_item));
357 ++read_count;
358 }
359
360 return read_count;
361 }

References kcenon::monitoring::ring_buffer_config::batch_size, kcenon::monitoring::ring_buffer< T >::config_, and kcenon::monitoring::ring_buffer< T >::read().

Referenced by TEST_F().
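
The listing caps each call at min(max_count, config_.batch_size), so a single call with the default max_count does not necessarily drain the buffer. The sketch below reproduces that capping logic against a std::deque used as a stand-in source. The name drain_batch and the deque are assumptions made for this example; the real method reads through read().

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical helper: moves up to min(max_count, configured_batch_size)
// elements from the front of source to the back of out.
template <typename T>
std::size_t drain_batch(std::deque<T>& source, std::vector<T>& out,
                        std::size_t max_count,
                        std::size_t configured_batch_size) {
    if (max_count == 0) {
        return 0;
    }
    const std::size_t limit = std::min(max_count, configured_batch_size);
    out.reserve(out.size() + limit);

    std::size_t moved = 0;
    while (moved < limit && !source.empty()) {
        out.emplace_back(std::move(source.front()));
        source.pop_front();
        ++moved;
    }
    return moved;
}
```

A caller that wants everything currently buffered would call the batch read in a loop until it returns 0.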


◆ reset_stats()

◆ size()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::size ( ) const
inline noexcept

Get current number of elements in buffer.

Definition at line 384 of file ring_buffer.h.

384 {
385 size_t write_idx = write_index_.load(std::memory_order_acquire);
386 size_t read_idx = read_index_.load(std::memory_order_acquire);
387
388 // Handle wraparound correctly
389 if (write_idx >= read_idx) {
390 return write_idx - read_idx;
391 } else {
392 // Wraparound case: write has wrapped around but read hasn't
393 return config_.capacity - read_idx + write_idx;
394 }
395 }

References kcenon::monitoring::ring_buffer_config::capacity, kcenon::monitoring::ring_buffer< T >::config_, kcenon::monitoring::ring_buffer< T >::read_index_, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by kcenon::monitoring::ring_buffer< T >::empty(), TEST_F(), TEST_F(), and kcenon::monitoring::ring_buffer< T >::write().
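
The two branches of size() can be checked with a small worked example. The free function element_count below is a hypothetical name that mirrors the arithmetic in the listing, with the indices and capacity passed in explicitly.

```cpp
#include <cassert>
#include <cstddef>

// Number of stored elements for wrapped indices in [0, capacity).
constexpr std::size_t element_count(std::size_t write_idx, std::size_t read_idx,
                                    std::size_t capacity) noexcept {
    return write_idx >= read_idx
               ? write_idx - read_idx               // no wraparound
               : capacity - read_idx + write_idx;   // write index has wrapped
}
```

With a capacity of 8, indices (write 5, read 2) and (write 1, read 6) both describe three stored elements; the second case is the wraparound branch.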


◆ write()

template<typename T >
common::VoidResult kcenon::monitoring::ring_buffer< T >::write ( T && item)
inline

Write a single element to the buffer.

Parameters
item Item to write
Returns
Result indicating success or failure

Definition at line 203 of file ring_buffer.h.

203 {
204 stats_.total_writes.fetch_add(1, std::memory_order_relaxed);
205
206 // Atomically claim a write slot using CAS loop to avoid ABA problem
207 size_t current_write;
208 size_t new_write;
209 bool overflow_handled = false;
210 size_t retry_count = 0;
211 constexpr size_t max_retries = 100;
212
213 do {
214 current_write = write_index_.load(std::memory_order_acquire);
215 size_t current_read = read_index_.load(std::memory_order_acquire);
216
217 // Check for buffer full condition
218 if (is_full_unsafe(current_write, current_read)) {
219 if (config_.overwrite_old) {
220 // Advance read index to overwrite oldest data
221 // Use strong CAS in a loop to ensure it succeeds
222 size_t expected_read = current_read;
223 size_t new_read = (current_read + 1) & get_mask();
224
225 // Try to advance read index with strong CAS
226 // If it fails, another thread already advanced it, which is fine
227 if (read_index_.compare_exchange_strong(expected_read, new_read,
228 std::memory_order_acq_rel,
229 std::memory_order_acquire)) {
230 // Successfully advanced read index
231 if (!overflow_handled) {
232 stats_.overwrites.fetch_add(1, std::memory_order_relaxed);
233 overflow_handled = true;
234 }
235 }
236 // Continue to claim write slot even if CAS failed
237 // (another thread may have already made space)
238 } else {
239 // Buffer is full and overwrite is not allowed
240 stats_.failed_writes.fetch_add(1, std::memory_order_relaxed);
241
242 // Provide more detailed error information
243 size_t current_size = size();
244 return common::VoidResult::err(error_info(monitoring_error_code::storage_full,
245 "Ring buffer is full (size: " +
246 std::to_string(current_size) +
247 "/" + std::to_string(config_.capacity) +
248 ", overwrites: " +
249 std::to_string(stats_.overwrites.load()) + ")").to_common_error());
250 }
251 }
252
253 new_write = (current_write + 1) & get_mask();
254
255 // Prevent infinite loop in case of extreme contention
256 if (++retry_count > max_retries) {
257 stats_.failed_writes.fetch_add(1, std::memory_order_relaxed);
258 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
259 "Failed to write to ring buffer after " +
260 std::to_string(max_retries) + " retries (high contention)").to_common_error());
261 }
262
263 // Atomically claim the write slot
264 } while (!write_index_.compare_exchange_weak(current_write, new_write,
265 std::memory_order_acq_rel,
266 std::memory_order_acquire));
267
268 // Write the item to the claimed slot
269 buffer_[current_write] = std::move(item);
270
271 // Memory fence ensures data write completes before index update is visible
272 std::atomic_thread_fence(std::memory_order_release);
273
274 return common::ok();
275 }

References kcenon::monitoring::ring_buffer< T >::buffer_, kcenon::monitoring::ring_buffer_config::capacity, kcenon::monitoring::collection_failed, kcenon::monitoring::ring_buffer< T >::config_, kcenon::monitoring::ring_buffer_stats::failed_writes, kcenon::monitoring::ring_buffer< T >::get_mask(), kcenon::monitoring::ring_buffer< T >::is_full_unsafe(), kcenon::monitoring::ring_buffer_config::overwrite_old, kcenon::monitoring::ring_buffer_stats::overwrites, kcenon::monitoring::ring_buffer< T >::read_index_, kcenon::monitoring::ring_buffer< T >::size(), kcenon::monitoring::ring_buffer< T >::stats_, kcenon::monitoring::storage_full, kcenon::monitoring::error_info::to_common_error(), kcenon::monitoring::ring_buffer_stats::total_writes, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by TEST_F(), TEST_F(), TEST_F(), and kcenon::monitoring::ring_buffer< T >::write_batch().
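
The core of write() is a bounded compare-and-swap loop that reserves a slot index. The sketch below isolates that step. The name claim_slot and its signature are hypothetical, and the sketch leaves out the full-buffer handling, statistics, and error reporting of the real method.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical helper: tries to reserve the slot at the current write index.
// On success stores the reserved index in slot and returns true.
inline bool claim_slot(std::atomic<std::size_t>& write_index, std::size_t mask,
                       std::size_t& slot, std::size_t max_retries = 100) {
    std::size_t current = write_index.load(std::memory_order_acquire);
    for (std::size_t attempt = 0; attempt <= max_retries; ++attempt) {
        const std::size_t next = (current + 1) & mask;
        // On failure the CAS refreshes current with the value it observed,
        // so the next iteration recomputes next from up-to-date state.
        if (write_index.compare_exchange_weak(current, next,
                                              std::memory_order_acq_rel,
                                              std::memory_order_acquire)) {
            slot = current;
            return true;
        }
    }
    return false;  // gave up after a bounded number of attempts
}
```

Bounding the retries trades a guaranteed write for a guaranteed return, which matches the listing's choice to report high contention as an error. Note that in the listing the write index is advanced before the element is stored in the slot; whether that ordering is safe depends on how readers are synchronized, which this sketch does not address.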


◆ write_batch()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::write_batch ( std::vector< T > && items)
inline

Write multiple elements in batch.

Parameters
items Vector of items to write
Returns
Number of items successfully written

Definition at line 282 of file ring_buffer.h.

282 {
283 if (items.empty()) {
284 return 0;
285 }
286
287 size_t written = 0;
288 size_t failed = 0;
289
290 for (auto& item : items) {
291 auto result = write(std::move(item));
292 if (result.is_ok()) {
293 ++written;
294 } else {
295 ++failed;
296 // Stop on first failure if not overwriting to prevent data loss
297 if (!config_.overwrite_old) {
298 break;
299 }
300 }
301 }
302
303 return written;
304 }

References kcenon::monitoring::ring_buffer< T >::config_, kcenon::monitoring::ring_buffer_config::overwrite_old, and kcenon::monitoring::ring_buffer< T >::write().

Referenced by TEST_F().


Member Data Documentation

◆ buffer_

template<typename T >
std::unique_ptr<T[]> kcenon::monitoring::ring_buffer< T >::buffer_
private

◆ config_

◆ read_index_

◆ stats_

◆ write_index_


The documentation for this class was generated from the following file: