Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::lockfree_queue< T > Class Template Reference

Thread-safe lock-free MPMC (Multiple Producer Multiple Consumer) queue. More...

#include <lockfree_queue.h>

Collaboration diagram for kcenon::monitoring::lockfree_queue< T >:
Collaboration graph

Classes

struct  slot
 

Public Member Functions

 lockfree_queue ()
 Default constructor with default configuration.
 
 lockfree_queue (const lockfree_queue_config &config)
 Construct with configuration.
 
 lockfree_queue (const lockfree_queue &)=delete
 
lockfree_queueoperator= (const lockfree_queue &)=delete
 
 lockfree_queue (lockfree_queue &&other) noexcept
 
lockfree_queueoperator= (lockfree_queue &&other) noexcept
 
common::Result< bool > push (const T &value)
 Push an element to the queue.
 
common::Result< bool > push (T &&value)
 Push an element to the queue (move version)
 
common::Result< T > pop ()
 Pop an element from the queue.
 
bool empty () const
 Check if the queue is empty.
 
size_t size () const
 Get current queue size.
 
size_t capacity () const
 Get queue capacity.
 
const lockfree_queue_statisticsget_statistics () const
 Get queue statistics.
 
void reset_statistics ()
 Reset statistics.
 

Private Member Functions

template<typename U >
common::Result< bool > push_impl (U &&value)
 

Private Attributes

lockfree_queue_config config_
 
size_t capacity_
 
std::vector< slotbuffer_
 
std::atomic< size_t > head_
 
std::atomic< size_t > tail_
 
std::atomic< size_t > size_
 
lockfree_queue_statistics stats_
 

Detailed Description

template<typename T>
class kcenon::monitoring::lockfree_queue< T >

Thread-safe lock-free MPMC (Multiple Producer Multiple Consumer) queue.

This implementation uses a bounded ring buffer with atomic operations for thread-safe access without locks.

Template Parameters
TThe type of elements stored in the queue

Definition at line 120 of file lockfree_queue.h.

Constructor & Destructor Documentation

◆ lockfree_queue() [1/4]

template<typename T >
kcenon::monitoring::lockfree_queue< T >::lockfree_queue ( )
inline

Default constructor with default configuration.

Definition at line 125 of file lockfree_queue.h.

125: lockfree_queue(lockfree_queue_config{}) {}
lockfree_queue()
Default constructor with default configuration.

◆ lockfree_queue() [2/4]

template<typename T >
kcenon::monitoring::lockfree_queue< T >::lockfree_queue ( const lockfree_queue_config & config)
inlineexplicit

Construct with configuration.

Parameters
configQueue configuration

Definition at line 131 of file lockfree_queue.h.

132 : config_(config)
133 , capacity_(config.initial_capacity)
134 , buffer_(config.initial_capacity)
135 , head_(0)
136 , tail_(0)
137 , size_(0) {
138 // Initialize each slot's sequence to its index
139 for (size_t i = 0; i < capacity_; ++i) {
140 buffer_[i].sequence.store(i, std::memory_order_relaxed);
141 }
142 }

References kcenon::monitoring::lockfree_queue< T >::buffer_, and kcenon::monitoring::lockfree_queue< T >::capacity_.

◆ lockfree_queue() [3/4]

template<typename T >
kcenon::monitoring::lockfree_queue< T >::lockfree_queue ( const lockfree_queue< T > & )
delete

◆ lockfree_queue() [4/4]

template<typename T >
kcenon::monitoring::lockfree_queue< T >::lockfree_queue ( lockfree_queue< T > && other)
inlinenoexcept

Definition at line 149 of file lockfree_queue.h.

150 : config_(std::move(other.config_))
151 , capacity_(other.capacity_)
152 , buffer_(std::move(other.buffer_))
153 , head_(other.head_.load())
154 , tail_(other.tail_.load())
155 , size_(other.size_.load())
156 , stats_(std::move(other.stats_)) {}
lockfree_queue_statistics stats_

Member Function Documentation

◆ capacity()

template<typename T >
size_t kcenon::monitoring::lockfree_queue< T >::capacity ( ) const
inline

Get queue capacity.

Returns
Current capacity of the queue

Definition at line 245 of file lockfree_queue.h.

245 {
246 return capacity_;
247 }

References kcenon::monitoring::lockfree_queue< T >::capacity_.

◆ empty()

template<typename T >
bool kcenon::monitoring::lockfree_queue< T >::empty ( ) const
inline

Check if the queue is empty.

Returns
true if empty

Definition at line 229 of file lockfree_queue.h.

229 {
230 return size_.load(std::memory_order_relaxed) == 0;
231 }

References kcenon::monitoring::lockfree_queue< T >::size_.

Referenced by TEST_F(), TEST_F(), TEST_F(), and TEST_F().

Here is the caller graph for this function:

◆ get_statistics()

template<typename T >
const lockfree_queue_statistics & kcenon::monitoring::lockfree_queue< T >::get_statistics ( ) const
inline

Get queue statistics.

Returns
Reference to statistics

Definition at line 253 of file lockfree_queue.h.

253 {
254 return stats_;
255 }

References kcenon::monitoring::lockfree_queue< T >::stats_.

Referenced by TEST_F(), TEST_F(), and TEST_F().

Here is the caller graph for this function:

◆ operator=() [1/2]

template<typename T >
lockfree_queue & kcenon::monitoring::lockfree_queue< T >::operator= ( const lockfree_queue< T > & )
delete

◆ operator=() [2/2]

template<typename T >
lockfree_queue & kcenon::monitoring::lockfree_queue< T >::operator= ( lockfree_queue< T > && other)
inlinenoexcept

◆ pop()

template<typename T >
common::Result< T > kcenon::monitoring::lockfree_queue< T >::pop ( )
inline

Pop an element from the queue.

Returns
common::Result<T> containing the value on success, error if queue is empty

Definition at line 193 of file lockfree_queue.h.

193 {
195
196 size_t current_head = head_.load(std::memory_order_relaxed);
197
198 while (true) {
199 size_t index = current_head % capacity_;
200 auto& slot = buffer_[index];
201 size_t seq = slot.sequence.load(std::memory_order_acquire);
202 intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(current_head + 1);
203
204 if (diff == 0) {
205 // Slot is ready to be read
206 if (head_.compare_exchange_weak(current_head, current_head + 1,
207 std::memory_order_relaxed)) {
208 T value = std::move(slot.data);
209 slot.sequence.store(current_head + capacity_, std::memory_order_release);
210 size_.fetch_sub(1, std::memory_order_relaxed);
212 return common::ok(std::move(value));
213 }
214 } else if (diff < 0) {
215 // Queue is empty
217 return common::Result<T>::err(error_info(monitoring_error_code::resource_unavailable, "Queue is empty").to_common_error());
218 } else {
219 // Another thread is modifying, retry
220 current_head = head_.load(std::memory_order_relaxed);
221 }
222 }
223 }

References kcenon::monitoring::lockfree_queue< T >::buffer_, kcenon::monitoring::lockfree_queue< T >::capacity_, kcenon::monitoring::lockfree_queue< T >::slot::data, kcenon::monitoring::lockfree_queue< T >::head_, kcenon::monitoring::lockfree_queue_statistics::pop_attempts, kcenon::monitoring::lockfree_queue_statistics::pop_failures, kcenon::monitoring::lockfree_queue_statistics::pop_successes, kcenon::monitoring::resource_unavailable, kcenon::monitoring::lockfree_queue< T >::slot::sequence, kcenon::monitoring::lockfree_queue< T >::size_, and kcenon::monitoring::lockfree_queue< T >::stats_.

Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().

Here is the caller graph for this function:

◆ push() [1/2]

template<typename T >
common::Result< bool > kcenon::monitoring::lockfree_queue< T >::push ( const T & value)
inline

Push an element to the queue.

Parameters
valueThe value to push
Returns
common::Result<bool> containing true on success, false if queue is full

Definition at line 176 of file lockfree_queue.h.

176 {
177 return push_impl(value);
178 }
common::Result< bool > push_impl(U &&value)

References kcenon::monitoring::lockfree_queue< T >::push_impl().

Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ push() [2/2]

template<typename T >
common::Result< bool > kcenon::monitoring::lockfree_queue< T >::push ( T && value)
inline

Push an element to the queue (move version)

Parameters
valueThe value to push
Returns
common::Result<bool> containing true on success, false if queue is full

Definition at line 185 of file lockfree_queue.h.

185 {
186 return push_impl(std::move(value));
187 }

References kcenon::monitoring::lockfree_queue< T >::push_impl().

Here is the call graph for this function:

◆ push_impl()

template<typename T >
template<typename U >
common::Result< bool > kcenon::monitoring::lockfree_queue< T >::push_impl ( U && value)
inlineprivate

Definition at line 271 of file lockfree_queue.h.

271 {
273
274 size_t current_tail = tail_.load(std::memory_order_relaxed);
275
276 while (true) {
277 size_t index = current_tail % capacity_;
278 auto& slot = buffer_[index];
279 size_t seq = slot.sequence.load(std::memory_order_acquire);
280 intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(current_tail);
281
282 if (diff == 0) {
283 // Slot is available for writing
284 if (tail_.compare_exchange_weak(current_tail, current_tail + 1,
285 std::memory_order_relaxed)) {
286 slot.data = std::forward<U>(value);
287 slot.sequence.store(current_tail + 1, std::memory_order_release);
288 size_.fetch_add(1, std::memory_order_relaxed);
290 return common::ok(true);
291 }
292 } else if (diff < 0) {
293 // Queue is full
295 return common::ok(false);
296 } else {
297 // Another thread is modifying, retry
298 current_tail = tail_.load(std::memory_order_relaxed);
299 }
300 }
301 }

References kcenon::monitoring::lockfree_queue< T >::buffer_, kcenon::monitoring::lockfree_queue< T >::capacity_, kcenon::monitoring::lockfree_queue< T >::slot::data, kcenon::monitoring::lockfree_queue_statistics::push_attempts, kcenon::monitoring::lockfree_queue_statistics::push_failures, kcenon::monitoring::lockfree_queue_statistics::push_successes, kcenon::monitoring::lockfree_queue< T >::slot::sequence, kcenon::monitoring::lockfree_queue< T >::size_, kcenon::monitoring::lockfree_queue< T >::stats_, and kcenon::monitoring::lockfree_queue< T >::tail_.

Referenced by kcenon::monitoring::lockfree_queue< T >::push(), and kcenon::monitoring::lockfree_queue< T >::push().

Here is the caller graph for this function:

◆ reset_statistics()

template<typename T >
void kcenon::monitoring::lockfree_queue< T >::reset_statistics ( )
inline

Reset statistics.

Definition at line 260 of file lockfree_queue.h.

260 {
261 stats_.reset();
262 }

References kcenon::monitoring::lockfree_queue_statistics::reset(), and kcenon::monitoring::lockfree_queue< T >::stats_.

Referenced by TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ size()

template<typename T >
size_t kcenon::monitoring::lockfree_queue< T >::size ( ) const
inline

Get current queue size.

Returns
Number of elements in the queue

Definition at line 237 of file lockfree_queue.h.

237 {
238 return size_.load(std::memory_order_relaxed);
239 }

References kcenon::monitoring::lockfree_queue< T >::size_.

Referenced by TEST_F(), TEST_F(), and TEST_F().

Here is the caller graph for this function:

Member Data Documentation

◆ buffer_

◆ capacity_

◆ config_

template<typename T >
lockfree_queue_config kcenon::monitoring::lockfree_queue< T >::config_
private

◆ head_

template<typename T >
std::atomic<size_t> kcenon::monitoring::lockfree_queue< T >::head_
private

◆ size_

◆ stats_

◆ tail_

template<typename T >
std::atomic<size_t> kcenon::monitoring::lockfree_queue< T >::tail_
private

The documentation for this class was generated from the following file: