30namespace kcenon {
namespace monitoring {
48 "Capacity must be a power of 2").to_common_error());
53 "Invalid batch size").to_common_error());
79 return capacity > 0 ? (
static_cast<double>(current_size) / capacity) * 100.0 : 0.0;
88 return total > 0 ? (1.0 -
static_cast<double>(failed) / total) * 100.0 : 100.0;
97 return total > 0 ? (
static_cast<double>(overflows) / total) * 100.0 : 0.0;
113 return total > 0 ?
static_cast<double>(retries) / total : 0.0;
122 return total > 0 ? (1.0 -
static_cast<double>(failed) / total) * 100.0 : 100.0;
137#pragma warning(disable: 4324)
143 static_assert(std::is_move_constructible_v<T>,
"T must be move constructible");
144 static_assert(std::is_move_assignable_v<T>,
"T must be move assignable");
164 return ((write_idx + 1) &
get_mask()) == read_idx;
171 return write_idx == read_idx;
186 if (validation.is_err()) {
187 throw std::invalid_argument(
"Invalid ring buffer configuration: " +
188 validation.error().message);
203 common::VoidResult
write(T&& item) {
207 size_t current_write;
209 bool overflow_handled =
false;
210 size_t retry_count = 0;
211 constexpr size_t max_retries = 100;
214 current_write =
write_index_.load(std::memory_order_acquire);
215 size_t current_read =
read_index_.load(std::memory_order_acquire);
222 size_t expected_read = current_read;
223 size_t new_read = (current_read + 1) &
get_mask();
227 if (
read_index_.compare_exchange_strong(expected_read, new_read,
228 std::memory_order_acq_rel,
229 std::memory_order_acquire)) {
231 if (!overflow_handled) {
233 overflow_handled =
true;
243 size_t current_size =
size();
245 "Ring buffer is full (size: " +
246 std::to_string(current_size) +
253 new_write = (current_write + 1) &
get_mask();
256 if (++retry_count > max_retries) {
259 "Failed to write to ring buffer after " +
260 std::to_string(max_retries) +
" retries (high contention)").to_common_error());
264 }
while (!
write_index_.compare_exchange_weak(current_write, new_write,
265 std::memory_order_acq_rel,
266 std::memory_order_acquire));
269 buffer_[current_write] = std::move(item);
272 std::atomic_thread_fence(std::memory_order_release);
290 for (
auto& item : items) {
291 auto result =
write(std::move(item));
292 if (result.is_ok()) {
311 common::VoidResult
read(T& item) {
314 size_t current_read =
read_index_.load(std::memory_order_acquire);
315 size_t current_write =
write_index_.load(std::memory_order_acquire);
320 "Ring buffer is empty").to_common_error());
324 item = std::move(
buffer_[current_read]);
327 size_t new_read = (current_read + 1) &
get_mask();
328 read_index_.store(new_read, std::memory_order_release);
339 size_t read_batch(std::vector<T>& items,
size_t max_count = SIZE_MAX) {
340 if (max_count == 0) {
345 items.reserve(items.size() + batch_size);
347 size_t read_count = 0;
350 while (read_count < batch_size) {
351 auto result =
read(temp_item);
352 if (result.is_err()) {
356 items.emplace_back(std::move(temp_item));
368 common::VoidResult
peek(T& item)
const {
369 size_t current_read =
read_index_.load(std::memory_order_acquire);
370 size_t current_write =
write_index_.load(std::memory_order_acquire);
374 "Ring buffer is empty").to_common_error());
385 size_t write_idx =
write_index_.load(std::memory_order_acquire);
386 size_t read_idx =
read_index_.load(std::memory_order_acquire);
389 if (write_idx >= read_idx) {
390 return write_idx - read_idx;
408 size_t write_idx =
write_index_.load(std::memory_order_acquire);
409 size_t read_idx =
read_index_.load(std::memory_order_acquire);
482 return std::make_unique<ring_buffer<T>>(config);
490 return std::make_unique<ring_buffer<T>>(config);
Lock-free ring buffer with atomic operations.
size_t write_batch(std::vector< T > &&items)
Write multiple elements in batch.
std::unique_ptr< T[]> buffer_
size_t size() const noexcept
Get current number of elements in buffer.
common::VoidResult write(T &&item)
Write a single element to the buffer.
void clear() noexcept
Clear all elements in the buffer.
size_t capacity() const noexcept
Get buffer capacity.
ring_buffer(const ring_buffer &)=delete
bool is_overflow_rate_high() const noexcept
Check if buffer is experiencing high overflow.
void reset_stats() noexcept
Reset statistics.
const ring_buffer_stats & get_stats() const noexcept
Get buffer statistics.
size_t read_batch(std::vector< T > &items, size_t max_count=SIZE_MAX)
Read multiple elements in batch.
ring_buffer & operator=(const ring_buffer &)=delete
bool full() const noexcept
Check if buffer is full.
bool is_empty_unsafe(size_t write_idx, size_t read_idx) const noexcept
Check if buffer is empty for the given write and read index values.
size_t get_mask() const noexcept
Get the mask for efficient modulo operation.
common::VoidResult peek(T &item) const
Peek at the next item without removing it.
ring_buffer(const ring_buffer_config &config={})
Constructor with configuration.
double get_overflow_rate() const noexcept
Get overflow rate percentage.
ring_buffer & operator=(ring_buffer &&)=default
ring_buffer(ring_buffer &&)=default
bool is_full_unsafe(size_t write_idx, size_t read_idx) const noexcept
Check if buffer is full for the given write and read index values.
ring_buffer_config config_
std::atomic< size_t > write_index_
common::VoidResult read(T &item)
Read a single element from the buffer.
const ring_buffer_config & get_config() const noexcept
Get buffer configuration.
std::atomic< size_t > read_index_
bool empty() const noexcept
Check if buffer is empty.
Monitoring-system-specific error codes.
std::unique_ptr< ring_buffer< T > > make_ring_buffer(size_t capacity=8192)
Helper function to create a ring buffer with default configuration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Configuration for ring buffer behavior.
common::VoidResult validate() const
Validate ring buffer configuration.
std::chrono::milliseconds gc_interval
Statistics for ring buffer performance monitoring.
double get_write_success_rate() const
Get write success rate percentage.
double get_overflow_rate() const
Get overflow rate (overwrites per total writes).
std::atomic< size_t > failed_writes
double get_read_success_rate() const
Get read success rate percentage.
std::atomic< size_t > contention_retries
std::chrono::system_clock::time_point creation_time
std::atomic< size_t > total_writes
double get_utilization(size_t current_size, size_t capacity) const
Get current utilization percentage.
std::atomic< size_t > failed_reads
double get_avg_contention() const
Get average contention (retries per write).
std::atomic< size_t > total_reads
bool is_overflow_rate_high() const
Check if overflow rate is high (> 10%).
std::atomic< size_t > overwrites