Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
ring_buffer.h
Go to the documentation of this file.
1#pragma once
2
3// BSD 3-Clause License
4// Copyright (c) 2025, 🍀☀🌕🌥 🌊
5// See the LICENSE file in the project root for full license information.
6
7
#include <algorithm>     // std::min (read_batch)
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>       // SIZE_MAX (read_batch default argument)
#include <memory>
#include <stdexcept>     // std::invalid_argument (ring_buffer constructor)
#include <string>        // std::to_string (error message formatting)
#include <type_traits>
#include <vector>
29
30namespace kcenon { namespace monitoring {
31
37 size_t capacity = 8192; // Default capacity (power of 2)
38 bool overwrite_old = true; // Overwrite oldest data when full
39 size_t batch_size = 64; // Batch size for bulk operations
40 std::chrono::milliseconds gc_interval{1000}; // Garbage collection interval
41
45 common::VoidResult validate() const {
46 if (capacity == 0 || (capacity & (capacity - 1)) != 0) {
47 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
48 "Capacity must be a power of 2").to_common_error());
49 }
50
51 if (batch_size == 0 || batch_size > capacity) {
52 return common::VoidResult::err(error_info(monitoring_error_code::invalid_configuration,
53 "Invalid batch size").to_common_error());
54 }
55
56 return common::ok();
57 }
58};
59
65 std::atomic<size_t> total_writes{0};
66 std::atomic<size_t> total_reads{0};
67 std::atomic<size_t> overwrites{0};
68 std::atomic<size_t> failed_writes{0};
69 std::atomic<size_t> failed_reads{0};
70 std::atomic<size_t> contention_retries{0};
71 std::chrono::system_clock::time_point creation_time;
72
73 ring_buffer_stats() : creation_time(std::chrono::system_clock::now()) {}
74
78 double get_utilization(size_t current_size, size_t capacity) const {
79 return capacity > 0 ? (static_cast<double>(current_size) / capacity) * 100.0 : 0.0;
80 }
81
85 double get_write_success_rate() const {
86 auto total = total_writes.load();
87 auto failed = failed_writes.load();
88 return total > 0 ? (1.0 - static_cast<double>(failed) / total) * 100.0 : 100.0;
89 }
90
94 double get_overflow_rate() const {
95 auto total = total_writes.load();
96 auto overflows = overwrites.load();
97 return total > 0 ? (static_cast<double>(overflows) / total) * 100.0 : 0.0;
98 }
99
104 return get_overflow_rate() > 10.0;
105 }
106
110 double get_avg_contention() const {
111 auto total = total_writes.load();
112 auto retries = contention_retries.load();
113 return total > 0 ? static_cast<double>(retries) / total : 0.0;
114 }
115
119 double get_read_success_rate() const {
120 auto total = total_reads.load();
121 auto failed = failed_reads.load();
122 return total > 0 ? (1.0 - static_cast<double>(failed) / total) * 100.0 : 100.0;
123 }
124};
125
135#ifdef _MSC_VER
136#pragma warning(push)
137#pragma warning(disable: 4324) // structure was padded due to alignment specifier
138#endif
139
140template<typename T>
142private:
143 static_assert(std::is_move_constructible_v<T>, "T must be move constructible");
144 static_assert(std::is_move_assignable_v<T>, "T must be move assignable");
145
146 alignas(64) std::atomic<size_t> write_index_{0}; // Cache line aligned
147 alignas(64) std::atomic<size_t> read_index_{0}; // Cache line aligned
148
149 std::unique_ptr<T[]> buffer_;
152
    // Mask for wrapping indices without modulo. Valid only because capacity
    // is enforced to be a power of 2 by ring_buffer_config::validate(), so
    // (i & mask) == i % capacity.
    size_t get_mask() const noexcept {
        return config_.capacity - 1;
    }
159
163 bool is_full_unsafe(size_t write_idx, size_t read_idx) const noexcept {
164 return ((write_idx + 1) & get_mask()) == read_idx;
165 }
166
170 bool is_empty_unsafe(size_t write_idx, size_t read_idx) const noexcept {
171 return write_idx == read_idx;
172 }
173
174public:
180 explicit ring_buffer(const ring_buffer_config& config = {})
181 : buffer_(std::make_unique<T[]>(config.capacity))
182 , config_(config) {
183
184 // Validate configuration
185 auto validation = config_.validate();
186 if (validation.is_err()) {
187 throw std::invalid_argument("Invalid ring buffer configuration: " +
188 validation.error().message);
189 }
190 }
191
192 // Non-copyable but moveable
193 ring_buffer(const ring_buffer&) = delete;
197
203 common::VoidResult write(T&& item) {
204 stats_.total_writes.fetch_add(1, std::memory_order_relaxed);
205
206 // Atomically claim a write slot using CAS loop to avoid ABA problem
207 size_t current_write;
208 size_t new_write;
209 bool overflow_handled = false;
210 size_t retry_count = 0;
211 constexpr size_t max_retries = 100;
212
213 do {
214 current_write = write_index_.load(std::memory_order_acquire);
215 size_t current_read = read_index_.load(std::memory_order_acquire);
216
217 // Check for buffer full condition
218 if (is_full_unsafe(current_write, current_read)) {
220 // Advance read index to overwrite oldest data
221 // Use strong CAS in a loop to ensure it succeeds
222 size_t expected_read = current_read;
223 size_t new_read = (current_read + 1) & get_mask();
224
225 // Try to advance read index with strong CAS
226 // If it fails, another thread already advanced it, which is fine
227 if (read_index_.compare_exchange_strong(expected_read, new_read,
228 std::memory_order_acq_rel,
229 std::memory_order_acquire)) {
230 // Successfully advanced read index
231 if (!overflow_handled) {
232 stats_.overwrites.fetch_add(1, std::memory_order_relaxed);
233 overflow_handled = true;
234 }
235 }
236 // Continue to claim write slot even if CAS failed
237 // (another thread may have already made space)
238 } else {
239 // Buffer is full and overwrite is not allowed
240 stats_.failed_writes.fetch_add(1, std::memory_order_relaxed);
241
242 // Provide more detailed error information
243 size_t current_size = size();
244 return common::VoidResult::err(error_info(monitoring_error_code::storage_full,
245 "Ring buffer is full (size: " +
246 std::to_string(current_size) +
247 "/" + std::to_string(config_.capacity) +
248 ", overwrites: " +
249 std::to_string(stats_.overwrites.load()) + ")").to_common_error());
250 }
251 }
252
253 new_write = (current_write + 1) & get_mask();
254
255 // Prevent infinite loop in case of extreme contention
256 if (++retry_count > max_retries) {
257 stats_.failed_writes.fetch_add(1, std::memory_order_relaxed);
258 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
259 "Failed to write to ring buffer after " +
260 std::to_string(max_retries) + " retries (high contention)").to_common_error());
261 }
262
263 // Atomically claim the write slot
264 } while (!write_index_.compare_exchange_weak(current_write, new_write,
265 std::memory_order_acq_rel,
266 std::memory_order_acquire));
267
268 // Write the item to the claimed slot
269 buffer_[current_write] = std::move(item);
270
271 // Memory fence ensures data write completes before index update is visible
272 std::atomic_thread_fence(std::memory_order_release);
273
274 return common::ok();
275 }
276
282 size_t write_batch(std::vector<T>&& items) {
283 if (items.empty()) {
284 return 0;
285 }
286
287 size_t written = 0;
288 size_t failed = 0;
289
290 for (auto& item : items) {
291 auto result = write(std::move(item));
292 if (result.is_ok()) {
293 ++written;
294 } else {
295 ++failed;
296 // Stop on first failure if not overwriting to prevent data loss
297 if (!config_.overwrite_old) {
298 break;
299 }
300 }
301 }
302
303 return written;
304 }
305
    /**
     * @brief Read a single element from the buffer.
     * @param item Output parameter; the oldest element is moved into it on
     *        success.
     * @return ok() on success, collection_failed error when the buffer is
     *         empty.
     *
     * NOTE(review): the read index is advanced with a plain store, not a CAS,
     * so two concurrent readers could consume the same slot — this looks like
     * it assumes a single consumer; confirm against callers.
     */
    common::VoidResult read(T& item) {
        stats_.total_reads.fetch_add(1, std::memory_order_relaxed);

        size_t current_read = read_index_.load(std::memory_order_acquire);
        size_t current_write = write_index_.load(std::memory_order_acquire);

        if (is_empty_unsafe(current_write, current_read)) {
            stats_.failed_reads.fetch_add(1, std::memory_order_relaxed);
            return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
                                           "Ring buffer is empty").to_common_error());
        }

        // Move the element out of the slot; the slot's moved-from residue is
        // left in place until a writer overwrites it.
        item = std::move(buffer_[current_read]);

        // Publish the advanced read index.
        size_t new_read = (current_read + 1) & get_mask();
        read_index_.store(new_read, std::memory_order_release);

        return common::ok();
    }
332
339 size_t read_batch(std::vector<T>& items, size_t max_count = SIZE_MAX) {
340 if (max_count == 0) {
341 return 0;
342 }
343
344 size_t batch_size = std::min(max_count, config_.batch_size);
345 items.reserve(items.size() + batch_size);
346
347 size_t read_count = 0;
348 T temp_item;
349
350 while (read_count < batch_size) {
351 auto result = read(temp_item);
352 if (result.is_err()) {
353 break; // No more items to read
354 }
355
356 items.emplace_back(std::move(temp_item));
357 ++read_count;
358 }
359
360 return read_count;
361 }
362
368 common::VoidResult peek(T& item) const {
369 size_t current_read = read_index_.load(std::memory_order_acquire);
370 size_t current_write = write_index_.load(std::memory_order_acquire);
371
372 if (is_empty_unsafe(current_write, current_read)) {
373 return common::VoidResult::err(error_info(monitoring_error_code::collection_failed,
374 "Ring buffer is empty").to_common_error());
375 }
376
377 item = buffer_[current_read]; // Copy, don't move
378 return common::ok();
379 }
380
384 size_t size() const noexcept {
385 size_t write_idx = write_index_.load(std::memory_order_acquire);
386 size_t read_idx = read_index_.load(std::memory_order_acquire);
387
388 // Handle wraparound correctly
389 if (write_idx >= read_idx) {
390 return write_idx - read_idx;
391 } else {
392 // Wraparound case: write has wrapped around but read hasn't
393 return config_.capacity - read_idx + write_idx;
394 }
395 }
396
400 bool empty() const noexcept {
401 return size() == 0;
402 }
403
407 bool full() const noexcept {
408 size_t write_idx = write_index_.load(std::memory_order_acquire);
409 size_t read_idx = read_index_.load(std::memory_order_acquire);
410 return is_full_unsafe(write_idx, read_idx);
411 }
412
    // Get buffer capacity (always a power of 2). Note that is_full_unsafe
    // keeps one slot unusable, so at most capacity - 1 elements are stored.
    size_t capacity() const noexcept {
        return config_.capacity;
    }
419
    /**
     * @brief Clear all elements in the buffer.
     *
     * Resets both indices to zero; stored elements are not destroyed until
     * overwritten. NOTE(review): the two indices are reset independently, so
     * this is not safe against concurrent readers/writers — confirm callers
     * quiesce the buffer first.
     */
    void clear() noexcept {
        write_index_.store(0, std::memory_order_release);
        read_index_.store(0, std::memory_order_release);
    }
427
    // Get the configuration this buffer was constructed with (read-only).
    const ring_buffer_config& get_config() const noexcept {
        return config_;
    }
434
    // Get the live statistics object (read-only reference; counters keep
    // updating while the caller holds it).
    const ring_buffer_stats& get_stats() const noexcept {
        return stats_;
    }
441
    /**
     * @brief Reset statistics.
     *
     * Zeroes every counter and restamps creation_time.
     * NOTE(review): creation_time is not atomic; resetting while other
     * threads read the stats would be a data race — confirm this is only
     * called while the buffer is quiescent.
     */
    void reset_stats() noexcept {
        stats_.total_writes.store(0);
        stats_.total_reads.store(0);
        stats_.overwrites.store(0);
        stats_.failed_writes.store(0);
        stats_.failed_reads.store(0);
        stats_.contention_retries.store(0);
        stats_.creation_time = std::chrono::system_clock::now();
    }
454
459 bool is_overflow_rate_high() const noexcept {
461 }
462
    // Get overflow rate percentage (delegates to ring_buffer_stats).
    double get_overflow_rate() const noexcept {
        return stats_.get_overflow_rate();
    }
469};
470
471#ifdef _MSC_VER
472#pragma warning(pop)
473#endif
474
478template<typename T>
479std::unique_ptr<ring_buffer<T>> make_ring_buffer(size_t capacity = 8192) {
480 ring_buffer_config config;
481 config.capacity = capacity;
482 return std::make_unique<ring_buffer<T>>(config);
483}
484
/**
 * @brief Helper function to create a ring buffer from an explicit configuration.
 * @tparam T Element type stored in the buffer.
 * @param config Full buffer configuration (validated by the constructor).
 * @return Owning pointer to the newly created ring buffer.
 */
template<typename T>
std::unique_ptr<ring_buffer<T>> make_ring_buffer(const ring_buffer_config& config) {
    return std::make_unique<ring_buffer<T>>(config);
}
492
493} } // namespace kcenon::monitoring
Lock-free ring buffer with atomic operations.
size_t write_batch(std::vector< T > &&items)
Write multiple elements in batch.
std::unique_ptr< T[]> buffer_
size_t size() const noexcept
Get current number of elements in buffer.
common::VoidResult write(T &&item)
Write a single element to the buffer.
void clear() noexcept
Clear all elements in the buffer.
size_t capacity() const noexcept
Get buffer capacity.
ring_buffer(const ring_buffer &)=delete
bool is_overflow_rate_high() const noexcept
Check if buffer is experiencing high overflow.
void reset_stats() noexcept
Reset statistics.
const ring_buffer_stats & get_stats() const noexcept
Get buffer statistics.
size_t read_batch(std::vector< T > &items, size_t max_count=SIZE_MAX)
Read multiple elements in batch.
ring_buffer & operator=(const ring_buffer &)=delete
bool full() const noexcept
Check if buffer is full.
bool is_empty_unsafe(size_t write_idx, size_t read_idx) const noexcept
Check if buffer is empty.
size_t get_mask() const noexcept
Get the mask for efficient modulo operation.
common::VoidResult peek(T &item) const
Peek at the next item without removing it.
ring_buffer(const ring_buffer_config &config={})
Constructor with configuration.
double get_overflow_rate() const noexcept
Get overflow rate percentage.
ring_buffer & operator=(ring_buffer &&)=default
ring_buffer(ring_buffer &&)=default
bool is_full_unsafe(size_t write_idx, size_t read_idx) const noexcept
Check if buffer is full.
std::atomic< size_t > write_index_
common::VoidResult read(T &item)
Read a single element from the buffer.
const ring_buffer_config & get_config() const noexcept
Get buffer configuration.
std::atomic< size_t > read_index_
bool empty() const noexcept
Check if buffer is empty.
Monitoring system specific error codes.
std::unique_ptr< ring_buffer< T > > make_ring_buffer(size_t capacity=8192)
Helper function to create a ring buffer with default configuration.
Result pattern type definitions for monitoring system.
Extended error information with context.
common::error_info to_common_error() const
Convert to common_system error_info.
Configuration for ring buffer behavior.
Definition ring_buffer.h:36
common::VoidResult validate() const
Validate ring buffer configuration.
Definition ring_buffer.h:45
std::chrono::milliseconds gc_interval
Definition ring_buffer.h:40
Statistics for ring buffer performance monitoring.
Definition ring_buffer.h:64
double get_write_success_rate() const
Get write success rate.
Definition ring_buffer.h:85
double get_overflow_rate() const
Get overflow rate (overwrites per total writes)
Definition ring_buffer.h:94
std::atomic< size_t > failed_writes
Definition ring_buffer.h:68
double get_read_success_rate() const
Get read success rate.
std::atomic< size_t > contention_retries
Definition ring_buffer.h:70
std::chrono::system_clock::time_point creation_time
Definition ring_buffer.h:71
std::atomic< size_t > total_writes
Definition ring_buffer.h:65
double get_utilization(size_t current_size, size_t capacity) const
Get current utilization percentage.
Definition ring_buffer.h:78
std::atomic< size_t > failed_reads
Definition ring_buffer.h:69
double get_avg_contention() const
Get average contention (retries per write)
std::atomic< size_t > total_reads
Definition ring_buffer.h:66
bool is_overflow_rate_high() const
Check if overflow rate is high (> 10%)