Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
lockfree_queue.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
11#pragma once
12
13#include <atomic>
14#include <cstddef>
15#include <cstdint>
16#include <memory>
17#include <utility>
18#include <vector>
19
21
22// Disable MSVC warning C4324: structure was padded due to alignment specifier
23// This is intentional for cache line optimization in lock-free data structures
24#ifdef _MSC_VER
25#pragma warning(push)
26#pragma warning(disable: 4324)
27#endif
28
29namespace kcenon::monitoring {
30
    // Initial number of slots allocated for the ring buffer.
    size_t initial_capacity = 1024;
    // Maximum capacity (0 = unlimited), per this file's documentation index.
    size_t max_capacity = 65536;
    // Allow overwriting the oldest elements when the queue is full.
    // NOTE(review): not referenced by the visible push/pop paths — confirm it
    // is honored elsewhere in the real header.
    bool allow_overwrite = false;

    // Validate the configuration; returns false for unusable settings.
    bool validate() const {
        if (initial_capacity == 0) {
            return false;
        }
        // NOTE(review): the condition guarding the branch below was elided by
        // the documentation extractor (original line 47); presumably it rejects
        // a nonzero max_capacity smaller than initial_capacity — confirm
        // against the real source file.
            return false;
        }
        return true;
    }
};
53
    // Counters for queue operations. The increments happen inside push_impl()
    // and pop() (those lines were elided from this documentation view).
    std::atomic<size_t> push_attempts{0};   // total push attempts observed
    std::atomic<size_t> push_successes{0};  // pushes that stored a value
    std::atomic<size_t> push_failures{0};   // pushes that did not store a value
    std::atomic<size_t> pop_attempts{0};    // total pop attempts observed
    std::atomic<size_t> pop_successes{0};   // pops that returned a value
    std::atomic<size_t> pop_failures{0};    // pops that returned no value
73
78 double get_push_success_rate() const {
79 auto attempts = push_attempts.load();
80 if (attempts == 0) {
81 return 100.0;
82 }
83 return (static_cast<double>(push_successes.load()) / static_cast<double>(attempts)) * 100.0;
84 }
85
90 double get_pop_success_rate() const {
91 auto attempts = pop_attempts.load();
92 if (attempts == 0) {
93 return 100.0;
94 }
95 return (static_cast<double>(pop_successes.load()) / static_cast<double>(attempts)) * 100.0;
96 }
97
101 void reset() {
102 push_attempts.store(0);
103 push_successes.store(0);
104 push_failures.store(0);
105 pop_attempts.store(0);
106 pop_successes.store(0);
107 pop_failures.store(0);
108 }
109};
110
119template<typename T>
121public:
126
131 explicit lockfree_queue(const lockfree_queue_config& config)
132 : config_(config)
133 , capacity_(config.initial_capacity)
134 , buffer_(config.initial_capacity)
135 , head_(0)
136 , tail_(0)
137 , size_(0) {
138 // Initialize each slot's sequence to its index
139 for (size_t i = 0; i < capacity_; ++i) {
140 buffer_[i].sequence.store(i, std::memory_order_relaxed);
141 }
142 }
143
144 // Disable copy
147
    // Enable move
    // NOTE(review): the move-constructor signature line was elided by the doc
    // extractor (original line 149; declared noexcept per this file's index —
    // confirm). Moving is NOT safe while `other` is being pushed/popped
    // concurrently: head/tail/size are loaded one at a time, not as a single
    // consistent snapshot.
        : config_(std::move(other.config_))
        , capacity_(other.capacity_)
        , buffer_(std::move(other.buffer_))
        , head_(other.head_.load())
        , tail_(other.tail_.load())
        , size_(other.size_.load())
        , stats_(std::move(other.stats_)) {}
157
        // NOTE(review): the move-assignment signature line (original line 158;
        // `lockfree_queue& operator=(lockfree_queue&&) noexcept` per this
        // file's index) was elided by the doc extractor.
        // Not safe to call while either queue is in concurrent use: the
        // atomics are transferred individually, not as one atomic snapshot.
        if (this != &other) {  // self-move guard
            config_ = std::move(other.config_);
            capacity_ = other.capacity_;
            buffer_ = std::move(other.buffer_);
            head_.store(other.head_.load());
            tail_.store(other.tail_.load());
            size_.store(other.size_.load());
            stats_ = std::move(other.stats_);
        }
        return *this;
    }
170
    // Push an element into the queue (copying overload).
    // @param value element to copy into a free slot
    // @return ok(true) when stored; ok(false) when the queue is full
    //         (see push_impl's full-queue branch).
    common::Result<bool> push(const T& value) {
        return push_impl(value);
    }

    // Push an element into the queue (moving overload).
    // @param value element to move into a free slot
    // @return ok(true) when stored; ok(false) when the queue is full.
    common::Result<bool> push(T&& value) {
        return push_impl(std::move(value));
    }
188
    // Pop the oldest element (Vyukov-style bounded MPMC dequeue).
    // @return ok(value) on success; a resource_unavailable error when the
    //         queue is empty.
    // NOTE(review): statistics-update lines were elided by the doc extractor
    // (original lines 194, 211, 216 — presumably the pop_attempts /
    // pop_successes / pop_failures increments); they are not shown below.
    common::Result<T> pop() {

        size_t current_head = head_.load(std::memory_order_relaxed);

        while (true) {
            size_t index = current_head % capacity_;
            auto& slot = buffer_[index];
            // Acquire pairs with the producer's release store in push_impl:
            // a slot holding data for ticket current_head has
            // sequence == current_head + 1.
            size_t seq = slot.sequence.load(std::memory_order_acquire);
            intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(current_head + 1);

            if (diff == 0) {
                // Slot is ready to be read
                if (head_.compare_exchange_weak(current_head, current_head + 1,
                                                std::memory_order_relaxed)) {
                    T value = std::move(slot.data);
                    // Release the slot for the producer that will wrap around
                    // to it one full lap later.
                    slot.sequence.store(current_head + capacity_, std::memory_order_release);
                    size_.fetch_sub(1, std::memory_order_relaxed);
                    return common::ok(std::move(value));
                }
                // CAS failure: compare_exchange_weak refreshed current_head
                // with the observed value, so the loop retries directly.
            } else if (diff < 0) {
                // Queue is empty
                return common::Result<T>::err(error_info(monitoring_error_code::resource_unavailable, "Queue is empty").to_common_error());
            } else {
                // Another thread is modifying, retry
                current_head = head_.load(std::memory_order_relaxed);
            }
        }
    }
224
229 bool empty() const {
230 return size_.load(std::memory_order_relaxed) == 0;
231 }
232
237 size_t size() const {
238 return size_.load(std::memory_order_relaxed);
239 }
240
245 size_t capacity() const {
246 return capacity_;
247 }
248
    // NOTE(review): the signatures of get_statistics() (returns
    // const lockfree_queue_statistics&, per this file's index) and
    // reset_statistics() were elided by the doc extractor; the two bodies
    // below belong to those accessors respectively.
        return stats_;
    }

        stats_.reset();
    }
263
264private:
    // One ring-buffer cell, aligned to a 64-byte boundary so adjacent slots
    // land on separate cache lines (the padding this causes is why warning
    // C4324 is disabled at the top of the file).
    struct alignas(64) slot {
        // Vyukov sequence number: equals the slot's ticket when free for a
        // producer, ticket + 1 when holding data for a consumer (see
        // push_impl/pop for the exact protocol).
        std::atomic<size_t> sequence;
        // NOTE(review): the payload member (`T data;`, original line 267) was
        // elided by the doc extractor but is referenced by push_impl and pop.
    };
269
    // Shared implementation behind both push overloads (Vyukov-style bounded
    // MPMC enqueue).
    // @param value element, perfectly forwarded into the claimed slot
    // @return ok(true) when stored; ok(false) when the queue is full.
    // NOTE(review): statistics-update lines were elided by the doc extractor
    // (original lines 272, 289, 294 — presumably the push_attempts /
    // push_successes / push_failures increments); they are not shown below.
    template<typename U>
    common::Result<bool> push_impl(U&& value) {

        size_t current_tail = tail_.load(std::memory_order_relaxed);

        while (true) {
            size_t index = current_tail % capacity_;
            auto& slot = buffer_[index];
            // Acquire pairs with the release store in pop(); a slot whose
            // sequence equals current_tail is free for this ticket.
            size_t seq = slot.sequence.load(std::memory_order_acquire);
            intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(current_tail);

            if (diff == 0) {
                // Slot is available for writing
                if (tail_.compare_exchange_weak(current_tail, current_tail + 1,
                                                std::memory_order_relaxed)) {
                    slot.data = std::forward<U>(value);
                    // Publish the payload: consumers wait for seq == ticket + 1.
                    slot.sequence.store(current_tail + 1, std::memory_order_release);
                    size_.fetch_add(1, std::memory_order_relaxed);
                    return common::ok(true);
                }
                // CAS failure: compare_exchange_weak refreshed current_tail
                // with the observed value, so the loop retries directly.
            } else if (diff < 0) {
                // Queue is full
                // NOTE(review): config_.allow_overwrite is not consulted here —
                // a full queue always reports false. Confirm the overwrite
                // feature is implemented elsewhere or still pending.
                return common::ok(false);
            } else {
                // Another thread is modifying, retry
                current_tail = tail_.load(std::memory_order_relaxed);
            }
        }
    }
302
304 size_t capacity_;
305 std::vector<slot> buffer_;
306 alignas(64) std::atomic<size_t> head_;
307 alignas(64) std::atomic<size_t> tail_;
308 alignas(64) std::atomic<size_t> size_;
310};
311
317template<typename T>
318std::unique_ptr<lockfree_queue<T>> make_lockfree_queue() {
319 return std::make_unique<lockfree_queue<T>>();
320}
321
328template<typename T>
329std::unique_ptr<lockfree_queue<T>> make_lockfree_queue(const lockfree_queue_config& config) {
330 return std::make_unique<lockfree_queue<T>>(config);
331}
332
337inline std::vector<lockfree_queue_config> create_default_queue_configs() {
338 return {
339 // Small queue for low-throughput scenarios
340 {.initial_capacity = 64, .max_capacity = 256, .allow_overwrite = false},
341 // Medium queue for general use
342 {.initial_capacity = 1024, .max_capacity = 4096, .allow_overwrite = false},
343 // Large queue for high-throughput scenarios
344 {.initial_capacity = 4096, .max_capacity = 65536, .allow_overwrite = false},
345 // Overwrite queue for streaming data
346 {.initial_capacity = 1024, .max_capacity = 1024, .allow_overwrite = true}
347 };
348}
349
350} // namespace kcenon::monitoring
351
352#ifdef _MSC_VER
353#pragma warning(pop)
354#endif
Thread-safe lock-free MPMC (Multiple Producer Multiple Consumer) queue.
const lockfree_queue_statistics & get_statistics() const
Get queue statistics.
size_t size() const
Get current queue size.
common::Result< bool > push(const T &value)
Push an element to the queue.
lockfree_queue(lockfree_queue &&other) noexcept
bool empty() const
Check if the queue is empty.
common::Result< T > pop()
Pop an element from the queue.
common::Result< bool > push_impl(U &&value)
lockfree_queue & operator=(lockfree_queue &&other) noexcept
lockfree_queue(const lockfree_queue &)=delete
size_t capacity() const
Get queue capacity.
lockfree_queue & operator=(const lockfree_queue &)=delete
lockfree_queue()
Default constructor with default configuration.
lockfree_queue_statistics stats_
void reset_statistics()
Reset statistics.
common::Result< bool > push(T &&value)
Push an element to the queue (move version)
lockfree_queue(const lockfree_queue_config &config)
Construct with configuration.
std::vector< lockfree_queue_config > create_default_queue_configs()
Create default queue configurations for different use cases.
std::unique_ptr< lockfree_queue< T > > make_lockfree_queue()
Create a lock-free queue with default configuration.
Result pattern type definitions for monitoring system.
Extended error information with context.
Configuration for lock-free queue.
size_t initial_capacity
Initial capacity of the queue.
bool allow_overwrite
Allow overwriting oldest elements when full.
size_t max_capacity
Maximum capacity (0 = unlimited)
bool validate() const
Validate configuration.
Statistics for lock-free queue operations.
lockfree_queue_statistics(const lockfree_queue_statistics &other)
double get_pop_success_rate() const
Get pop success rate.
double get_push_success_rate() const
Get push success rate.