26#pragma warning(disable: 4324)
83 return (
static_cast<double>(
push_successes.load()) /
static_cast<double>(attempts)) * 100.0;
95 return (
static_cast<double>(
pop_successes.load()) /
static_cast<double>(attempts)) * 100.0;
134 ,
buffer_(config.initial_capacity)
140 buffer_[i].sequence.store(i, std::memory_order_relaxed);
159 if (
this != &
other) {
176 common::Result<bool>
push(
const T& value) {
185 common::Result<bool>
push(T&& value) {
196 size_t current_head =
head_.load(std::memory_order_relaxed);
201 size_t seq =
slot.
sequence.load(std::memory_order_acquire);
202 intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(current_head + 1);
206 if (
head_.compare_exchange_weak(current_head, current_head + 1,
207 std::memory_order_relaxed)) {
210 size_.fetch_sub(1, std::memory_order_relaxed);
212 return common::ok(std::move(value));
214 }
else if (diff < 0) {
220 current_head =
head_.load(std::memory_order_relaxed);
230 return size_.load(std::memory_order_relaxed) == 0;
238 return size_.load(std::memory_order_relaxed);
274 size_t current_tail =
tail_.load(std::memory_order_relaxed);
279 size_t seq =
slot.
sequence.load(std::memory_order_acquire);
280 intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(current_tail);
284 if (
tail_.compare_exchange_weak(current_tail, current_tail + 1,
285 std::memory_order_relaxed)) {
287 slot.
sequence.store(current_tail + 1, std::memory_order_release);
288 size_.fetch_add(1, std::memory_order_relaxed);
290 return common::ok(
true);
292 }
else if (diff < 0) {
295 return common::ok(
false);
298 current_tail =
tail_.load(std::memory_order_relaxed);
306 alignas(64) std::atomic<size_t>
head_;
307 alignas(64) std::atomic<size_t>
tail_;
308 alignas(64) std::atomic<size_t>
size_;
319 return std::make_unique<lockfree_queue<T>>();
330 return std::make_unique<lockfree_queue<T>>(config);
340 {.initial_capacity = 64, .max_capacity = 256, .allow_overwrite =
false},
342 {.initial_capacity = 1024, .max_capacity = 4096, .allow_overwrite =
false},
344 {.initial_capacity = 4096, .max_capacity = 65536, .allow_overwrite =
false},
346 {.initial_capacity = 1024, .max_capacity = 1024, .allow_overwrite =
true}
Thread-safe lock-free MPMC (Multiple Producer Multiple Consumer) queue.
std::atomic< size_t > size_
const lockfree_queue_statistics & get_statistics() const
Get queue statistics.
std::atomic< size_t > tail_
size_t size() const
Get current queue size.
common::Result< bool > push(const T &value)
Push an element to the queue.
lockfree_queue(lockfree_queue &&other) noexcept
bool empty() const
Check if the queue is empty.
common::Result< T > pop()
Pop an element from the queue.
common::Result< bool > push_impl(U &&value)
lockfree_queue & operator=(lockfree_queue &&other) noexcept
lockfree_queue(const lockfree_queue &)=delete
std::vector< slot > buffer_
size_t capacity() const
Get queue capacity.
lockfree_queue_config config_
lockfree_queue & operator=(const lockfree_queue &)=delete
lockfree_queue()
Default constructor with default configuration.
lockfree_queue_statistics stats_
void reset_statistics()
Reset statistics.
common::Result< bool > push(T &&value)
Push an element to the queue (move version)
lockfree_queue(const lockfree_queue_config &config)
Construct with configuration.
std::atomic< size_t > head_
std::vector< lockfree_queue_config > create_default_queue_configs()
Create default queue configurations for different use cases.
std::unique_ptr< lockfree_queue< T > > make_lockfree_queue()
Create a lock-free queue with default configuration.
Result pattern type definitions for monitoring system.
Extended error information with context.
std::atomic< size_t > sequence
Configuration for lock-free queue.
size_t initial_capacity
Initial capacity of the queue.
bool allow_overwrite
Allow overwriting oldest elements when full.
size_t max_capacity
Maximum capacity (0 = unlimited)
bool validate() const
Validate configuration.
Statistics for lock-free queue operations.
std::atomic< size_t > pop_successes
std::atomic< size_t > pop_failures
lockfree_queue_statistics()=default
std::atomic< size_t > push_failures
void reset()
Reset all statistics.
std::atomic< size_t > pop_attempts
lockfree_queue_statistics(const lockfree_queue_statistics &other)
std::atomic< size_t > push_attempts
double get_pop_success_rate() const
Get pop success rate.
std::atomic< size_t > push_successes
double get_push_success_rate() const
Get push success rate.