Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
ring_buffer.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
23#include <memory>
24#include <atomic>
25#include <vector>
26#include <chrono>
27#include <cstddef>
28#include <type_traits>
29
30namespace kcenon { namespace monitoring {
31
37 size_t capacity = 8192; // Default capacity (power of 2)
38 bool overwrite_old = true; // Overwrite oldest data when full
39 size_t batch_size = 64; // Batch size for bulk operations
40 std::chrono::milliseconds gc_interval{1000}; // Garbage collection interval
41
45 common::VoidResult validate() const {
46 if (capacity == 0 || (capacity & (capacity - 1)) != 0) {
47 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
48 "Capacity must be a power of 2").to_common_error());
49 }
50
51 if (batch_size == 0 || batch_size > capacity) {
52 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
53 "Invalid batch size").to_common_error());
54 }
55
56 return common::ok();
57 }
58};
59
65 std::atomic<size_t> total_writes{0};
66 std::atomic<size_t> total_reads{0};
67 std::atomic<size_t> overwrites{0};
68 std::atomic<size_t> failed_writes{0};
69 std::atomic<size_t> failed_reads{0};
70 std::atomic<size_t> contention_retries{0};
71 std::chrono::system_clock::time_point creation_time;
72
73 ring_buffer_stats() : creation_time(std::chrono::system_clock::now()) {}
74
78 double get_utilization(size_t current_size, size_t capacity) const {
79 return capacity > 0 ? (static_cast<double>(current_size) / capacity) * 100.0 : 0.0;
80 }
81
85 double get_write_success_rate() const {
86 auto total = total_writes.load();
87 auto failed = failed_writes.load();
88 return total > 0 ? (1.0 - static_cast<double>(failed) / total) * 100.0 : 100.0;
89 }
90
94 double get_overflow_rate() const {
95 auto total = total_writes.load();
96 auto overflows = overwrites.load();
97 return total > 0 ? (static_cast<double>(overflows) / total) * 100.0 : 0.0;
98 }
99
104 return get_overflow_rate() > 10.0;
105 }
106
110 double get_avg_contention() const {
111 auto total = total_writes.load();
112 auto retries = contention_retries.load();
113 return total > 0 ? static_cast<double>(retries) / total : 0.0;
114 }
115
119 double get_read_success_rate() const {
120 auto total = total_reads.load();
121 auto failed = failed_reads.load();
122 return total > 0 ? (1.0 - static_cast<double>(failed) / total) * 100.0 : 100.0;
123 }
124};
125
135#ifdef _MSC_VER
136#pragma warning(push)
137#pragma warning(disable: 4324) // structure was padded due to alignment specifier
138#endif
139
140template<typename T>
142private:
143 static_assert(std::is_move_constructible_v<T>, "T must be move constructible");
144 static_assert(std::is_move_assignable_v<T>, "T must be move assignable");
145
146 struct validated_tag {};
147
148 alignas(64) std::atomic<size_t> write_index_{0}; // Cache line aligned
149 alignas(64) std::atomic<size_t> read_index_{0}; // Cache line aligned
150
151 std::unique_ptr<T[]> buffer_;
154
155 // Private constructor for validated creation via create()
157 : buffer_(std::make_unique<T[]>(config.capacity))
158 , config_(config) {}
159
163 size_t get_mask() const noexcept {
164 return config_.capacity - 1;
165 }
166
170 bool is_full_unsafe(size_t write_idx, size_t read_idx) const noexcept {
171 return ((write_idx + 1) & get_mask()) == read_idx;
172 }
173
177 bool is_empty_unsafe(size_t write_idx, size_t read_idx) const noexcept {
178 return write_idx == read_idx;
179 }
180
181public:
187 static common::Result<std::unique_ptr<ring_buffer>> create(
188 const ring_buffer_config& config = {}) {
189 auto validation = config.validate();
190 if (validation.is_err()) {
191 return common::Result<std::unique_ptr<ring_buffer>>::err(
193 "Invalid ring buffer configuration: " +
194 validation.error().message)
195 .to_common_error());
196 }
197 return common::ok(std::unique_ptr<ring_buffer>(
198 new ring_buffer(config, validated_tag{})));
199 }
200
207 explicit ring_buffer(const ring_buffer_config& config = {})
208 : buffer_(std::make_unique<T[]>(config.capacity))
209 , config_(config) {
210
211 // Validate configuration
212 auto validation = config_.validate();
213 if (validation.is_err()) {
214 throw std::invalid_argument("Invalid ring buffer configuration: " +
215 validation.error().message);
216 }
217 }
218
219 // Non-copyable but moveable
220 ring_buffer(const ring_buffer&) = delete;
224
230 common::VoidResult write(T&& item) {
231 stats_.total_writes.fetch_add(1, std::memory_order_relaxed);
232
233 // Atomically claim a write slot using CAS loop to avoid ABA problem
234 size_t current_write;
235 size_t new_write;
236 bool overflow_handled = false;
237 size_t retry_count = 0;
238 constexpr size_t max_retries = 100;
239
240 do {
241 current_write = write_index_.load(std::memory_order_acquire);
242 size_t current_read = read_index_.load(std::memory_order_acquire);
243
244 // Check for buffer full condition
245 if (is_full_unsafe(current_write, current_read)) {
247 // Advance read index to overwrite oldest data
248 // Use strong CAS in a loop to ensure it succeeds
249 size_t expected_read = current_read;
250 size_t new_read = (current_read + 1) & get_mask();
251
252 // Try to advance read index with strong CAS
253 // If it fails, another thread already advanced it, which is fine
254 if (read_index_.compare_exchange_strong(expected_read, new_read,
255 std::memory_order_acq_rel,
256 std::memory_order_acquire)) {
257 // Successfully advanced read index
258 if (!overflow_handled) {
259 stats_.overwrites.fetch_add(1, std::memory_order_relaxed);
260 overflow_handled = true;
261 }
262 }
263 // Continue to claim write slot even if CAS failed
264 // (another thread may have already made space)
265 } else {
266 // Buffer is full and overwrite is not allowed
267 stats_.failed_writes.fetch_add(1, std::memory_order_relaxed);
268
269 // Provide more detailed error information
270 size_t current_size = size();
271 return common::VoidResult::err(error_info(monitoring_error_code::storage_full,
272 "Ring buffer is full (size: " +
273 std::to_string(current_size) +
274 "/" + std::to_string(config_.capacity) +
275 ", overwrites: " +
276 std::to_string(stats_.overwrites.load()) + ")").to_common_error());
277 }
278 }
279
280 new_write = (current_write + 1) & get_mask();
281
282 // Prevent infinite loop in case of extreme contention
283 if (++retry_count > max_retries) {
284 stats_.failed_writes.fetch_add(1, std::memory_order_relaxed);
285 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
286 "Failed to write to ring buffer after " +
287 std::to_string(max_retries) + " retries (high contention)").to_common_error());
288 }
289
290 // Atomically claim the write slot
291 } while (!write_index_.compare_exchange_weak(current_write, new_write,
292 std::memory_order_acq_rel,
293 std::memory_order_acquire));
294
295 // Write the item to the claimed slot
296 buffer_[current_write] = std::move(item);
297
298 // Memory fence ensures data write completes before index update is visible
299 std::atomic_thread_fence(std::memory_order_release);
300
301 return common::ok();
302 }
303
309 size_t write_batch(std::vector<T>&& items) {
310 if (items.empty()) {
311 return 0;
312 }
313
314 size_t written = 0;
315 size_t failed = 0;
316
317 for (auto& item : items) {
318 auto result = write(std::move(item));
319 if (result.is_ok()) {
320 ++written;
321 } else {
322 ++failed;
323 // Stop on first failure if not overwriting to prevent data loss
324 if (!config_.overwrite_old) {
325 break;
326 }
327 }
328 }
329
330 return written;
331 }
332
338 common::VoidResult read(T& item) {
339 stats_.total_reads.fetch_add(1, std::memory_order_relaxed);
340
341 size_t current_read = read_index_.load(std::memory_order_acquire);
342 size_t current_write = write_index_.load(std::memory_order_acquire);
343
344 if (is_empty_unsafe(current_write, current_read)) {
345 stats_.failed_reads.fetch_add(1, std::memory_order_relaxed);
346 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
347 "Ring buffer is empty").to_common_error());
348 }
349
350 // Read the item
351 item = std::move(buffer_[current_read]);
352
353 // Update read index
354 size_t new_read = (current_read + 1) & get_mask();
355 read_index_.store(new_read, std::memory_order_release);
356
357 return common::ok();
358 }
359
366 size_t read_batch(std::vector<T>& items, size_t max_count = SIZE_MAX) {
367 if (max_count == 0) {
368 return 0;
369 }
370
371 size_t batch_size = std::min(max_count, config_.batch_size);
372 items.reserve(items.size() + batch_size);
373
374 size_t read_count = 0;
375 T temp_item;
376
377 while (read_count < batch_size) {
378 auto result = read(temp_item);
379 if (result.is_err()) {
380 break; // No more items to read
381 }
382
383 items.emplace_back(std::move(temp_item));
384 ++read_count;
385 }
386
387 return read_count;
388 }
389
395 common::VoidResult peek(T& item) const {
396 size_t current_read = read_index_.load(std::memory_order_acquire);
397 size_t current_write = write_index_.load(std::memory_order_acquire);
398
399 if (is_empty_unsafe(current_write, current_read)) {
400 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
401 "Ring buffer is empty").to_common_error());
402 }
403
404 item = buffer_[current_read]; // Copy, don't move
405 return common::ok();
406 }
407
411 size_t size() const noexcept {
412 size_t write_idx = write_index_.load(std::memory_order_acquire);
413 size_t read_idx = read_index_.load(std::memory_order_acquire);
414
415 // Handle wraparound correctly
416 if (write_idx >= read_idx) {
417 return write_idx - read_idx;
418 } else {
419 // Wraparound case: write has wrapped around but read hasn't
420 return config_.capacity - read_idx + write_idx;
421 }
422 }
423
427 bool empty() const noexcept {
428 return size() == 0;
429 }
430
434 bool full() const noexcept {
435 size_t write_idx = write_index_.load(std::memory_order_acquire);
436 size_t read_idx = read_index_.load(std::memory_order_acquire);
437 return is_full_unsafe(write_idx, read_idx);
438 }
439
443 size_t capacity() const noexcept {
444 return config_.capacity;
445 }
446
450 void clear() noexcept {
451 write_index_.store(0, std::memory_order_release);
452 read_index_.store(0, std::memory_order_release);
453 }
454
458 const ring_buffer_config& get_config() const noexcept {
459 return config_;
460 }
461
465 const ring_buffer_stats& get_stats() const noexcept {
466 return stats_;
467 }
468
472 void reset_stats() noexcept {
473 stats_.total_writes.store(0);
474 stats_.total_reads.store(0);
475 stats_.overwrites.store(0);
476 stats_.failed_writes.store(0);
477 stats_.failed_reads.store(0);
478 stats_.contention_retries.store(0);
479 stats_.creation_time = std::chrono::system_clock::now();
480 }
481
486 bool is_overflow_rate_high() const noexcept {
488 }
489
493 double get_overflow_rate() const noexcept {
494 return stats_.get_overflow_rate();
495 }
496};
497
498#ifdef _MSC_VER
499#pragma warning(pop)
500#endif
501
505template<typename T>
506std::unique_ptr<ring_buffer<T>> make_ring_buffer(size_t capacity = 8192) {
507 ring_buffer_config config;
508 config.capacity = capacity;
509 return std::make_unique<ring_buffer<T>>(config);
510}
511
515template<typename T>
516std::unique_ptr<ring_buffer<T>> make_ring_buffer(const ring_buffer_config& config) {
517 return std::make_unique<ring_buffer<T>>(config);
518}
519
520} } // namespace kcenon::monitoring
Lock-free ring buffer with atomic operations.
size_t write_batch(std::vector< T > &&items)
Write multiple elements in batch.
std::unique_ptr< T[]> buffer_
size_t size() const noexcept
Get current number of elements in buffer.
common::VoidResult write(T &&item)
Write a single element to the buffer.
void clear() noexcept
Clear all elements in the buffer.
size_t capacity() const noexcept
Get buffer capacity.
ring_buffer(const ring_buffer &)=delete
bool is_overflow_rate_high() const noexcept
Check if buffer is experiencing high overflow.
void reset_stats() noexcept
Reset statistics.
const ring_buffer_stats & get_stats() const noexcept
Get buffer statistics.
size_t read_batch(std::vector< T > &items, size_t max_count=SIZE_MAX)
Read multiple elements in batch.
ring_buffer & operator=(const ring_buffer &)=delete
bool full() const noexcept
Check if buffer is full.
ring_buffer(const ring_buffer_config &config, validated_tag)
bool is_empty_unsafe(size_t write_idx, size_t read_idx) const noexcept
Check if buffer is empty.
size_t get_mask() const noexcept
Get the mask for efficient modulo operation.
common::VoidResult peek(T &item) const
Peek at the next item without removing it.
ring_buffer(const ring_buffer_config &config={})
Constructor with configuration.
double get_overflow_rate() const noexcept
Get overflow rate percentage.
ring_buffer & operator=(ring_buffer &&)=default
ring_buffer(ring_buffer &&)=default
bool is_full_unsafe(size_t write_idx, size_t read_idx) const noexcept
Check if buffer is full.
std::atomic< size_t > write_index_
common::VoidResult read(T &item)
Read a single element from the buffer.
const ring_buffer_config & get_config() const noexcept
Get buffer configuration.
std::atomic< size_t > read_index_
bool empty() const noexcept
Check if buffer is empty.
static common::Result< std::unique_ptr< ring_buffer > > create(const ring_buffer_config &config={})
Create a ring buffer with validated configuration.
Monitoring system specific error codes.
std::unique_ptr< ring_buffer< T > > make_ring_buffer(size_t capacity=8192)
Helper function to create a ring buffer with default configuration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Configuration for ring buffer behavior.
Definition ring_buffer.h:36
common::VoidResult validate() const
Validate ring buffer configuration.
Definition ring_buffer.h:45
std::chrono::milliseconds gc_interval
Definition ring_buffer.h:40
Statistics for ring buffer performance monitoring.
Definition ring_buffer.h:64
double get_write_success_rate() const
Get write success rate.
Definition ring_buffer.h:85
double get_overflow_rate() const
Get overflow rate (overwrites per total writes)
Definition ring_buffer.h:94
std::atomic< size_t > failed_writes
Definition ring_buffer.h:68
double get_read_success_rate() const
Get read success rate.
std::atomic< size_t > contention_retries
Definition ring_buffer.h:70
std::chrono::system_clock::time_point creation_time
Definition ring_buffer.h:71
std::atomic< size_t > total_writes
Definition ring_buffer.h:65
double get_utilization(size_t current_size, size_t capacity) const
Get current utilization percentage.
Definition ring_buffer.h:78
std::atomic< size_t > failed_reads
Definition ring_buffer.h:69
double get_avg_contention() const
Get average contention (retries per write)
std::atomic< size_t > total_reads
Definition ring_buffer.h:66
bool is_overflow_rate_high() const
Check if overflow rate is high (> 10%)