Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::ring_buffer< T > Class Template Reference

Lock-free ring buffer with atomic operations. More...

#include <ring_buffer.h>

Inheritance diagram for kcenon::monitoring::ring_buffer< T >:
Inheritance graph
Collaboration diagram for kcenon::monitoring::ring_buffer< T >:
Collaboration graph

Classes

struct  validated_tag
 

Public Member Functions

 ring_buffer (const ring_buffer_config &config={})
 Constructor with configuration.
 
 ring_buffer (const ring_buffer &)=delete
 
ring_bufferoperator= (const ring_buffer &)=delete
 
 ring_buffer (ring_buffer &&)=default
 
ring_bufferoperator= (ring_buffer &&)=default
 
common::VoidResult write (T &&item)
 Write a single element to the buffer.
 
size_t write_batch (std::vector< T > &&items)
 Write multiple elements in batch.
 
common::VoidResult read (T &item)
 Read a single element from the buffer.
 
size_t read_batch (std::vector< T > &items, size_t max_count=SIZE_MAX)
 Read multiple elements in batch.
 
common::VoidResult peek (T &item) const
 Peek at the next item without removing it.
 
size_t size () const noexcept
 Get current number of elements in buffer.
 
bool empty () const noexcept
 Check if buffer is empty.
 
bool full () const noexcept
 Check if buffer is full.
 
size_t capacity () const noexcept
 Get buffer capacity.
 
void clear () noexcept
 Clear all elements in the buffer.
 
const ring_buffer_configget_config () const noexcept
 Get buffer configuration.
 
const ring_buffer_statsget_stats () const noexcept
 Get buffer statistics.
 
void reset_stats () noexcept
 Reset statistics.
 
bool is_overflow_rate_high () const noexcept
 Check if buffer is experiencing high overflow.
 
double get_overflow_rate () const noexcept
 Get overflow rate percentage.
 

Static Public Member Functions

static common::Result< std::unique_ptr< ring_buffer > > create (const ring_buffer_config &config={})
 Create a ring buffer with validated configuration.
 

Private Member Functions

 ring_buffer (const ring_buffer_config &config, validated_tag)
 
size_t get_mask () const noexcept
 Get the mask for efficient modulo operation.
 
bool is_full_unsafe (size_t write_idx, size_t read_idx) const noexcept
 Check if buffer is full.
 
bool is_empty_unsafe (size_t write_idx, size_t read_idx) const noexcept
 Check if buffer is empty.
 

Private Attributes

std::atomic< size_t > write_index_ {0}
 
std::atomic< size_t > read_index_ {0}
 
std::unique_ptr< T[]> buffer_
 
ring_buffer_config config_
 
ring_buffer_stats stats_
 

Detailed Description

template<typename T>
class kcenon::monitoring::ring_buffer< T >

Lock-free ring buffer with atomic operations.

Template Parameters
TThe type of elements to store

This implementation uses atomic operations for thread-safety and provides efficient circular buffer semantics with configurable overflow behavior.

Definition at line 141 of file ring_buffer.h.

Constructor & Destructor Documentation

◆ ring_buffer() [1/4]

template<typename T >
kcenon::monitoring::ring_buffer< T >::ring_buffer ( const ring_buffer_config & config,
validated_tag  )
inlineprivate

Definition at line 156 of file ring_buffer.h.

157 : buffer_(std::make_unique<T[]>(config.capacity))
158 , config_(config) {}
std::unique_ptr< T[]> buffer_

◆ ring_buffer() [2/4]

template<typename T >
kcenon::monitoring::ring_buffer< T >::ring_buffer ( const ring_buffer_config & config = {})
inlineexplicit

Constructor with configuration.

Parameters
configRing buffer configuration options
Exceptions
std::invalid_argumentif configuration validation fails
Deprecated
Use create() for Result-based error handling

Definition at line 207 of file ring_buffer.h.

207 {})
208 : buffer_(std::make_unique<T[]>(config.capacity))
209 , config_(config) {
210
211 // Validate configuration
212 auto validation = config_.validate();
213 if (validation.is_err()) {
214 throw std::invalid_argument("Invalid ring buffer configuration: " +
215 validation.error().message);
216 }
217 }
size_t capacity() const noexcept
Get buffer capacity.
common::VoidResult validate() const
Validate ring buffer configuration.
Definition ring_buffer.h:45

◆ ring_buffer() [3/4]

template<typename T >
kcenon::monitoring::ring_buffer< T >::ring_buffer ( const ring_buffer< T > & )
delete

◆ ring_buffer() [4/4]

template<typename T >
kcenon::monitoring::ring_buffer< T >::ring_buffer ( ring_buffer< T > && )
default

Member Function Documentation

◆ capacity()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::capacity ( ) const
inlinenoexcept

Get buffer capacity.

Definition at line 443 of file ring_buffer.h.

443 {
444 return config_.capacity;
445 }

References kcenon::monitoring::ring_buffer_config::capacity, and kcenon::monitoring::ring_buffer< T >::config_.

Referenced by TEST_F().

Here is the caller graph for this function:

◆ clear()

template<typename T >
void kcenon::monitoring::ring_buffer< T >::clear ( )
inlinenoexcept

Clear all elements in the buffer.

Definition at line 450 of file ring_buffer.h.

450 {
451 write_index_.store(0, std::memory_order_release);
452 read_index_.store(0, std::memory_order_release);
453 }
std::atomic< size_t > write_index_
std::atomic< size_t > read_index_

References kcenon::monitoring::ring_buffer< T >::read_index_, and kcenon::monitoring::ring_buffer< T >::write_index_.

◆ create()

template<typename T >
static common::Result< std::unique_ptr< ring_buffer > > kcenon::monitoring::ring_buffer< T >::create ( const ring_buffer_config & config = {})
inlinestatic

Create a ring buffer with validated configuration.

Parameters
configRing buffer configuration options
Returns
Result containing the ring buffer or error

Definition at line 187 of file ring_buffer.h.

188 {}) {
189 auto validation = config.validate();
190 if (validation.is_err()) {
191 return common::Result<std::unique_ptr<ring_buffer>>::err(
193 "Invalid ring buffer configuration: " +
194 validation.error().message)
195 .to_common_error());
196 }
197 return common::ok(std::unique_ptr<ring_buffer>(
198 new ring_buffer(config, validated_tag{})));
199 }
ring_buffer(const ring_buffer_config &config, validated_tag)

Referenced by TEST_F(), and TEST_F().

Here is the caller graph for this function:

◆ empty()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::empty ( ) const
inlinenoexcept

Check if buffer is empty.

Definition at line 427 of file ring_buffer.h.

427 {
428 return size() == 0;
429 }
size_t size() const noexcept
Get current number of elements in buffer.

References kcenon::monitoring::ring_buffer< T >::size().

Referenced by TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ full()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::full ( ) const
inlinenoexcept

Check if buffer is full.

Definition at line 434 of file ring_buffer.h.

434 {
435 size_t write_idx = write_index_.load(std::memory_order_acquire);
436 size_t read_idx = read_index_.load(std::memory_order_acquire);
437 return is_full_unsafe(write_idx, read_idx);
438 }
bool is_full_unsafe(size_t write_idx, size_t read_idx) const noexcept
Check if buffer is full.

References kcenon::monitoring::ring_buffer< T >::is_full_unsafe(), kcenon::monitoring::ring_buffer< T >::read_index_, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ get_config()

template<typename T >
const ring_buffer_config & kcenon::monitoring::ring_buffer< T >::get_config ( ) const
inlinenoexcept

Get buffer configuration.

Definition at line 458 of file ring_buffer.h.

458 {
459 return config_;
460 }

References kcenon::monitoring::ring_buffer< T >::config_.

◆ get_mask()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::get_mask ( ) const
inlineprivatenoexcept

Get the mask for efficient modulo operation.

Definition at line 163 of file ring_buffer.h.

163 {
164 return config_.capacity - 1;
165 }

References kcenon::monitoring::ring_buffer_config::capacity, and kcenon::monitoring::ring_buffer< T >::config_.

Referenced by kcenon::monitoring::ring_buffer< T >::is_full_unsafe(), kcenon::monitoring::ring_buffer< T >::read(), and kcenon::monitoring::ring_buffer< T >::write().

Here is the caller graph for this function:

◆ get_overflow_rate()

template<typename T >
double kcenon::monitoring::ring_buffer< T >::get_overflow_rate ( ) const
inlinenoexcept

Get overflow rate percentage.

Definition at line 493 of file ring_buffer.h.

493 {
494 return stats_.get_overflow_rate();
495 }
double get_overflow_rate() const
Get overflow rate (overwrites per total writes)
Definition ring_buffer.h:94

References kcenon::monitoring::ring_buffer_stats::get_overflow_rate(), and kcenon::monitoring::ring_buffer< T >::stats_.

Here is the call graph for this function:

◆ get_stats()

template<typename T >
const ring_buffer_stats & kcenon::monitoring::ring_buffer< T >::get_stats ( ) const
inlinenoexcept

Get buffer statistics.

Definition at line 465 of file ring_buffer.h.

465 {
466 return stats_;
467 }

References kcenon::monitoring::ring_buffer< T >::stats_.

◆ is_empty_unsafe()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::is_empty_unsafe ( size_t write_idx,
size_t read_idx ) const
inlineprivatenoexcept

Check if buffer is empty.

Definition at line 177 of file ring_buffer.h.

177 {
178 return write_idx == read_idx;
179 }

Referenced by kcenon::monitoring::ring_buffer< T >::peek(), and kcenon::monitoring::ring_buffer< T >::read().

Here is the caller graph for this function:

◆ is_full_unsafe()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::is_full_unsafe ( size_t write_idx,
size_t read_idx ) const
inlineprivatenoexcept

Check if buffer is full.

Definition at line 170 of file ring_buffer.h.

170 {
171 return ((write_idx + 1) & get_mask()) == read_idx;
172 }
size_t get_mask() const noexcept
Get the mask for efficient modulo operation.

References kcenon::monitoring::ring_buffer< T >::get_mask().

Referenced by kcenon::monitoring::ring_buffer< T >::full(), and kcenon::monitoring::ring_buffer< T >::write().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ is_overflow_rate_high()

template<typename T >
bool kcenon::monitoring::ring_buffer< T >::is_overflow_rate_high ( ) const
inlinenoexcept

Check if buffer is experiencing high overflow.

Returns
true if overflow rate exceeds threshold

Definition at line 486 of file ring_buffer.h.

486 {
488 }
bool is_overflow_rate_high() const
Check if overflow rate is high (> 10%)

References kcenon::monitoring::ring_buffer_stats::is_overflow_rate_high(), and kcenon::monitoring::ring_buffer< T >::stats_.

Here is the call graph for this function:

◆ operator=() [1/2]

template<typename T >
ring_buffer & kcenon::monitoring::ring_buffer< T >::operator= ( const ring_buffer< T > & )
delete

◆ operator=() [2/2]

template<typename T >
ring_buffer & kcenon::monitoring::ring_buffer< T >::operator= ( ring_buffer< T > && )
default

◆ peek()

template<typename T >
common::VoidResult kcenon::monitoring::ring_buffer< T >::peek ( T & item) const
inline

Peek at the next item without removing it.

Parameters
itemReference to store the peeked item
Returns
Result indicating success or failure

Definition at line 395 of file ring_buffer.h.

395 {
396 size_t current_read = read_index_.load(std::memory_order_acquire);
397 size_t current_write = write_index_.load(std::memory_order_acquire);
398
399 if (is_empty_unsafe(current_write, current_read)) {
400 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
401 "Ring buffer is empty").to_common_error());
402 }
403
404 item = buffer_[current_read]; // Copy, don't move
405 return common::ok();
406 }
bool is_empty_unsafe(size_t write_idx, size_t read_idx) const noexcept
Check if buffer is empty.

References kcenon::monitoring::ring_buffer< T >::buffer_, kcenon::monitoring::collection_failed, kcenon::monitoring::ring_buffer< T >::is_empty_unsafe(), kcenon::monitoring::ring_buffer< T >::read_index_, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ read()

template<typename T >
common::VoidResult kcenon::monitoring::ring_buffer< T >::read ( T & item)
inline

Read a single element from the buffer.

Parameters
itemReference to store the read item
Returns
Result indicating success or failure

Definition at line 338 of file ring_buffer.h.

338 {
339 stats_.total_reads.fetch_add(1, std::memory_order_relaxed);
340
341 size_t current_read = read_index_.load(std::memory_order_acquire);
342 size_t current_write = write_index_.load(std::memory_order_acquire);
343
344 if (is_empty_unsafe(current_write, current_read)) {
345 stats_.failed_reads.fetch_add(1, std::memory_order_relaxed);
346 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
347 "Ring buffer is empty").to_common_error());
348 }
349
350 // Read the item
351 item = std::move(buffer_[current_read]);
352
353 // Update read index
354 size_t new_read = (current_read + 1) & get_mask();
355 read_index_.store(new_read, std::memory_order_release);
356
357 return common::ok();
358 }
std::atomic< size_t > failed_reads
Definition ring_buffer.h:69
std::atomic< size_t > total_reads
Definition ring_buffer.h:66

References kcenon::monitoring::ring_buffer< T >::buffer_, kcenon::monitoring::collection_failed, kcenon::monitoring::ring_buffer_stats::failed_reads, kcenon::monitoring::ring_buffer< T >::get_mask(), kcenon::monitoring::ring_buffer< T >::is_empty_unsafe(), kcenon::monitoring::ring_buffer< T >::read_index_, kcenon::monitoring::ring_buffer< T >::stats_, kcenon::monitoring::ring_buffer_stats::total_reads, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by kcenon::monitoring::ring_buffer< T >::read_batch(), TEST_F(), TEST_F(), and TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ read_batch()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::read_batch ( std::vector< T > & items,
size_t max_count = SIZE_MAX )
inline

Read multiple elements in batch.

Parameters
itemsVector to store read items
max_countMaximum number of items to read
Returns
Number of items actually read

Definition at line 366 of file ring_buffer.h.

366 {
367 if (max_count == 0) {
368 return 0;
369 }
370
371 size_t batch_size = std::min(max_count, config_.batch_size);
372 items.reserve(items.size() + batch_size);
373
374 size_t read_count = 0;
375 T temp_item;
376
377 while (read_count < batch_size) {
378 auto result = read(temp_item);
379 if (result.is_err()) {
380 break; // No more items to read
381 }
382
383 items.emplace_back(std::move(temp_item));
384 ++read_count;
385 }
386
387 return read_count;
388 }
common::VoidResult read(T &item)
Read a single element from the buffer.

References kcenon::monitoring::ring_buffer_config::batch_size, kcenon::monitoring::ring_buffer< T >::config_, and kcenon::monitoring::ring_buffer< T >::read().

Referenced by TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ reset_stats()

◆ size()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::size ( ) const
inlinenoexcept

Get current number of elements in buffer.

Definition at line 411 of file ring_buffer.h.

411 {
412 size_t write_idx = write_index_.load(std::memory_order_acquire);
413 size_t read_idx = read_index_.load(std::memory_order_acquire);
414
415 // Handle wraparound correctly
416 if (write_idx >= read_idx) {
417 return write_idx - read_idx;
418 } else {
419 // Wraparound case: write has wrapped around but read hasn't
420 return config_.capacity - read_idx + write_idx;
421 }
422 }

References kcenon::monitoring::ring_buffer_config::capacity, kcenon::monitoring::ring_buffer< T >::config_, kcenon::monitoring::ring_buffer< T >::read_index_, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by kcenon::monitoring::ring_buffer< T >::empty(), TEST_F(), TEST_F(), and kcenon::monitoring::ring_buffer< T >::write().

Here is the caller graph for this function:

◆ write()

template<typename T >
common::VoidResult kcenon::monitoring::ring_buffer< T >::write ( T && item)
inline

Write a single element to the buffer.

Parameters
itemItem to write
Returns
Result indicating success or failure

Definition at line 230 of file ring_buffer.h.

230 {
231 stats_.total_writes.fetch_add(1, std::memory_order_relaxed);
232
233 // Atomically claim a write slot using CAS loop to avoid ABA problem
234 size_t current_write;
235 size_t new_write;
236 bool overflow_handled = false;
237 size_t retry_count = 0;
238 constexpr size_t max_retries = 100;
239
240 do {
241 current_write = write_index_.load(std::memory_order_acquire);
242 size_t current_read = read_index_.load(std::memory_order_acquire);
243
244 // Check for buffer full condition
245 if (is_full_unsafe(current_write, current_read)) {
247 // Advance read index to overwrite oldest data
248 // Use strong CAS in a loop to ensure it succeeds
249 size_t expected_read = current_read;
250 size_t new_read = (current_read + 1) & get_mask();
251
252 // Try to advance read index with strong CAS
253 // If it fails, another thread already advanced it, which is fine
254 if (read_index_.compare_exchange_strong(expected_read, new_read,
255 std::memory_order_acq_rel,
256 std::memory_order_acquire)) {
257 // Successfully advanced read index
258 if (!overflow_handled) {
259 stats_.overwrites.fetch_add(1, std::memory_order_relaxed);
260 overflow_handled = true;
261 }
262 }
263 // Continue to claim write slot even if CAS failed
264 // (another thread may have already made space)
265 } else {
266 // Buffer is full and overwrite is not allowed
267 stats_.failed_writes.fetch_add(1, std::memory_order_relaxed);
268
269 // Provide more detailed error information
270 size_t current_size = size();
271 return common::VoidResult::err(error_info(monitoring_error_code::storage_full,
272 "Ring buffer is full (size: " +
273 std::to_string(current_size) +
274 "/" + std::to_string(config_.capacity) +
275 ", overwrites: " +
276 std::to_string(stats_.overwrites.load()) + ")").to_common_error());
277 }
278 }
279
280 new_write = (current_write + 1) & get_mask();
281
282 // Prevent infinite loop in case of extreme contention
283 if (++retry_count > max_retries) {
284 stats_.failed_writes.fetch_add(1, std::memory_order_relaxed);
285 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
286 "Failed to write to ring buffer after " +
287 std::to_string(max_retries) + " retries (high contention)").to_common_error());
288 }
289
290 // Atomically claim the write slot
291 } while (!write_index_.compare_exchange_weak(current_write, new_write,
292 std::memory_order_acq_rel,
293 std::memory_order_acquire));
294
295 // Write the item to the claimed slot
296 buffer_[current_write] = std::move(item);
297
298 // Memory fence ensures data write completes before index update is visible
299 std::atomic_thread_fence(std::memory_order_release);
300
301 return common::ok();
302 }

References kcenon::monitoring::ring_buffer< T >::buffer_, kcenon::monitoring::ring_buffer_config::capacity, kcenon::monitoring::collection_failed, kcenon::monitoring::ring_buffer< T >::config_, kcenon::monitoring::ring_buffer_stats::failed_writes, kcenon::monitoring::ring_buffer< T >::get_mask(), kcenon::monitoring::ring_buffer< T >::is_full_unsafe(), kcenon::monitoring::ring_buffer_config::overwrite_old, kcenon::monitoring::ring_buffer_stats::overwrites, kcenon::monitoring::ring_buffer< T >::read_index_, kcenon::monitoring::ring_buffer< T >::size(), kcenon::monitoring::ring_buffer< T >::stats_, kcenon::monitoring::storage_full, kcenon::monitoring::error_info::to_common_error(), kcenon::monitoring::ring_buffer_stats::total_writes, and kcenon::monitoring::ring_buffer< T >::write_index_.

Referenced by TEST_F(), TEST_F(), TEST_F(), and kcenon::monitoring::ring_buffer< T >::write_batch().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ write_batch()

template<typename T >
size_t kcenon::monitoring::ring_buffer< T >::write_batch ( std::vector< T > && items)
inline

Write multiple elements in batch.

Parameters
itemsVector of items to write
Returns
Number of items successfully written

Definition at line 309 of file ring_buffer.h.

309 {
310 if (items.empty()) {
311 return 0;
312 }
313
314 size_t written = 0;
315 size_t failed = 0;
316
317 for (auto& item : items) {
318 auto result = write(std::move(item));
319 if (result.is_ok()) {
320 ++written;
321 } else {
322 ++failed;
323 // Stop on first failure if not overwriting to prevent data loss
324 if (!config_.overwrite_old) {
325 break;
326 }
327 }
328 }
329
330 return written;
331 }
common::VoidResult write(T &&item)
Write a single element to the buffer.

References kcenon::monitoring::ring_buffer< T >::config_, kcenon::monitoring::ring_buffer_config::overwrite_old, and kcenon::monitoring::ring_buffer< T >::write().

Referenced by TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ buffer_

template<typename T >
std::unique_ptr<T[]> kcenon::monitoring::ring_buffer< T >::buffer_
private

◆ config_

◆ read_index_

◆ stats_

◆ write_index_


The documentation for this class was generated from the following file: