Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::lockfree::work_stealing_deque< T > Class Template Reference

Lock-free work-stealing deque based on the Chase-Lev algorithm.

#include <work_stealing_deque.h>


Public Member Functions

 work_stealing_deque (std::size_t log_initial_size=LOG_INITIAL_SIZE)
 Constructs an empty work-stealing deque.
 
 ~work_stealing_deque ()
 Destructor - cleans up the circular array.
 
 work_stealing_deque (const work_stealing_deque &)=delete
 
work_stealing_deque & operator= (const work_stealing_deque &)=delete
 
 work_stealing_deque (work_stealing_deque &&)=delete
 
work_stealing_deque & operator= (work_stealing_deque &&)=delete
 
void push (T item)
 Push an element onto the bottom of the deque (owner only)
 
std::optional< T > pop ()
 Pop an element from the bottom of the deque (owner only)
 
std::optional< T > steal ()
 Steal an element from the top of the deque (thief threads)
 
std::vector< T > steal_batch (std::size_t max_count)
 Steal multiple elements from the top of the deque (thief threads)
 
bool empty () const noexcept
 Check if the deque appears empty.
 
std::size_t size () const noexcept
 Get approximate size of the deque.
 
std::size_t capacity () const noexcept
 Get the capacity of the current array.
 
void cleanup_old_arrays ()
 Clear all old arrays (for memory cleanup)
 

Static Public Attributes

static constexpr std::size_t LOG_INITIAL_SIZE = 5
 Default initial log capacity (2^LOG_INITIAL_SIZE = 32 elements)
 

Private Attributes

std::atomic< std::int64_t > top_
 
std::atomic< std::int64_t > bottom_
 
std::atomic< circular_array< T > * > array_
 
std::vector< circular_array< T > * > old_arrays_
 

Static Private Attributes

static constexpr std::size_t CACHE_LINE_SIZE = 64
 

Detailed Description

template<typename T>
class kcenon::thread::lockfree::work_stealing_deque< T >

Lock-free work-stealing deque based on the Chase-Lev algorithm.

This class implements a work-stealing deque (double-ended queue) using the Chase-Lev algorithm. It provides efficient local operations for the owner thread (push/pop) and concurrent stealing for other threads.

Key Features:

  • Owner-side push/pop: LIFO order for cache locality
  • Thief-side steal: FIFO order for fairness
  • Lock-free operations with proper memory ordering
  • Dynamic resizing when full

Algorithm Reference: "Dynamic Circular Work-Stealing Deque" (Chase & Lev, 2005)

Memory Layout:

    Owner Thread:                Thief Threads:
      push/pop                       steal
    ┌──────────────────────────────────────┐
    │  bottom                        top   │
    │    ↓                            ↑    │
    │  [T4][T3][T2][T1][--][--]            │
    │  LIFO                         FIFO   │
    │  (locality)              (fairness)  │
    └──────────────────────────────────────┘
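The ordering shown in the layout can be illustrated with a small single-threaded model. This is only a sketch: `ordering_model` is a hypothetical stand-in built on `std::deque`, not the lock-free class documented here, and it models nothing except which end each operation touches.

```cpp
#include <deque>
#include <optional>

// Hypothetical single-threaded stand-in; NOT the lock-free implementation.
// It only shows which end of the deque each operation works on.
class ordering_model {
public:
    // Owner: add a new element at the bottom.
    void push(int item) { items_.push_back(item); }

    // Owner: take the newest element (LIFO).
    std::optional<int> pop() {
        if (items_.empty()) return std::nullopt;
        int item = items_.back();
        items_.pop_back();
        return item;
    }

    // Thief: take the oldest element (FIFO).
    std::optional<int> steal() {
        if (items_.empty()) return std::nullopt;
        int item = items_.front();
        items_.pop_front();
        return item;
    }

private:
    std::deque<int> items_;
};
```

After pushing 1, 2, 3 in that order, `pop()` yields 3 (the most recently pushed, likely still warm in the owner's cache) while `steal()` yields 1 (the element that has waited longest).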

Thread Safety:

  • push(): Single-threaded (owner only)
  • pop(): Single-threaded (owner only)
  • steal(): Multi-threaded (any thief)
  • empty()/size(): Any thread (approximate)
Template Parameters
T  Element type (must be a pointer or trivially copyable)
Note
To store jobs, instantiate the deque with T = job*.
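To make the owner/thief threading contract concrete, here is a minimal sketch of the same idea. Everything in it is an assumption made for illustration: `tiny_deque` is a hypothetical fixed-capacity deque of `int` that uses sequentially consistent atomics throughout and never resizes, and `run_demo` is a hypothetical driver. It is not the library's implementation and makes no attempt at the tuned memory orderings shown in the listings on this page.

```cpp
#include <array>
#include <atomic>
#include <cstdint>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical fixed-capacity sketch; all atomics use the default seq_cst order.
class tiny_deque {
public:
    bool push(int v) {  // owner only
        std::int64_t b = bottom_.load();
        std::int64_t t = top_.load();
        if (b - t >= kCapacity) return false;  // full: this sketch never grows
        slots_[b & kMask].store(v);
        bottom_.store(b + 1);                  // publish the new element
        return true;
    }

    std::optional<int> pop() {  // owner only
        std::int64_t b = bottom_.load() - 1;
        bottom_.store(b);                      // reserve the bottom slot
        std::int64_t t = top_.load();
        if (t > b) {                           // deque was empty
            bottom_.store(b + 1);
            return std::nullopt;
        }
        int v = slots_[b & kMask].load();
        if (t == b) {                          // last element: race with thieves
            bool won = top_.compare_exchange_strong(t, t + 1);
            bottom_.store(b + 1);
            if (!won) return std::nullopt;
        }
        return v;
    }

    std::optional<int> steal() {  // any thief thread
        std::int64_t t = top_.load();
        std::int64_t b = bottom_.load();
        if (t >= b) return std::nullopt;       // appears empty
        int v = slots_[t & kMask].load();
        if (!top_.compare_exchange_strong(t, t + 1)) return std::nullopt;
        return v;
    }

private:
    static constexpr std::int64_t kCapacity = 1024;
    static constexpr std::int64_t kMask = kCapacity - 1;
    std::atomic<std::int64_t> top_{0};
    std::atomic<std::int64_t> bottom_{0};
    std::array<std::atomic<int>, kCapacity> slots_{};
};

// One owner pushes 1..n and pops; `thieves` threads steal concurrently.
// Returns the sum of every value taken by any thread.
long long run_demo(int n, int thieves) {
    tiny_deque d;
    std::atomic<bool> done{false};
    std::atomic<long long> total{0};
    std::vector<std::thread> pool;
    for (int i = 0; i < thieves; ++i) {
        pool.emplace_back([&] {
            while (!done.load()) {
                if (auto v = d.steal()) total += *v;
            }
        });
    }
    for (int i = 1; i <= n; ++i) {
        while (!d.push(i)) {                   // full: drain one item ourselves
            if (auto v = d.pop()) total += *v;
        }
    }
    while (auto v = d.pop()) total += *v;      // nullopt here means the deque is empty
    done.store(true);
    for (auto& th : pool) th.join();
    return total.load();
}
```

If each element is taken exactly once, `run_demo(n, k)` returns n(n+1)/2 regardless of how the work was split between the owner and the thieves. Only the calling thread of `run_demo` ever calls `push()` or `pop()`, which is the single-owner rule listed under Thread Safety.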

Definition at line 36 of file numa_work_stealer.h.

Constructor & Destructor Documentation

◆ work_stealing_deque() [1/3]

template<typename T >
kcenon::thread::lockfree::work_stealing_deque< T >::work_stealing_deque ( std::size_t log_initial_size = LOG_INITIAL_SIZE)
inline explicit

Constructs an empty work-stealing deque.

Parameters
log_initial_size  Initial capacity as log base 2 (default: 5, i.e., 32 elements)
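Because the constructor takes the capacity as a base-2 logarithm, it can help to convert in both directions. The helpers below are a sketch; `capacity_from_log` and `log_for` are hypothetical names and are not part of this class.

```cpp
#include <cstddef>

// Hypothetical helper: number of slots for a given log2 capacity.
constexpr std::size_t capacity_from_log(std::size_t log_size) {
    return std::size_t{1} << log_size;
}

// Hypothetical helper: smallest log2 capacity that holds `expected` elements.
constexpr std::size_t log_for(std::size_t expected) {
    std::size_t log_size = 0;
    while (capacity_from_log(log_size) < expected) {
        ++log_size;
    }
    return log_size;
}
```

For example, a deque expected to hold about 100 elements without resizing would be constructed with `log_for(100)`, which is 7 (128 slots).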

Definition at line 156 of file work_stealing_deque.h.

    explicit work_stealing_deque(std::size_t log_initial_size = LOG_INITIAL_SIZE)
        : top_(0)
        , bottom_(0)
        , array_(new circular_array<T>(log_initial_size)) {
    }

◆ ~work_stealing_deque()

template<typename T >
kcenon::thread::lockfree::work_stealing_deque< T >::~work_stealing_deque ( )
inline

Destructor - cleans up the circular array.

Definition at line 165 of file work_stealing_deque.h.

    ~work_stealing_deque() {
        delete array_.load(std::memory_order_relaxed);
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::array_.

◆ work_stealing_deque() [2/3]

template<typename T >
kcenon::thread::lockfree::work_stealing_deque< T >::work_stealing_deque ( const work_stealing_deque< T > & )
delete

◆ work_stealing_deque() [3/3]

template<typename T >
kcenon::thread::lockfree::work_stealing_deque< T >::work_stealing_deque ( work_stealing_deque< T > && )
delete

Member Function Documentation

◆ capacity()

template<typename T >
std::size_t kcenon::thread::lockfree::work_stealing_deque< T >::capacity ( ) const
inline nodiscard noexcept

Get the capacity of the current array.

Returns
Current capacity

Definition at line 375 of file work_stealing_deque.h.

    [[nodiscard]] std::size_t capacity() const noexcept {
        return array_.load(std::memory_order_relaxed)->size();
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::array_.

◆ cleanup_old_arrays()

template<typename T >
void kcenon::thread::lockfree::work_stealing_deque< T >::cleanup_old_arrays ( )
inline

Clear all old arrays (for memory cleanup)

Note
Should only be called when no operations are in progress. Typically called during shutdown or periodic cleanup.

Definition at line 385 of file work_stealing_deque.h.

    void cleanup_old_arrays() {
        for (auto* old_array : old_arrays_) {
            delete old_array;
        }
        old_arrays_.clear();
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::old_arrays_.

◆ empty()

template<typename T >
bool kcenon::thread::lockfree::work_stealing_deque< T >::empty ( ) const
inline nodiscard noexcept

Check if the deque appears empty.

Returns
true if the deque appears empty, false otherwise
Note
This is a snapshot view; the deque may change immediately after. Use for hints only, not for synchronization.
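A typical use of `empty()` as a hint is to skip victims that look empty while still treating the result of `steal()` as the only authoritative answer. The helper below is a sketch: `steal_from_any` and `stub_deque` are hypothetical names, and the template only assumes a type with `empty()` and `steal()` members shaped like the ones on this page.

```cpp
#include <optional>
#include <vector>

// Works with any type exposing empty() and steal() like the class documented here.
template <typename Deque>
auto steal_from_any(std::vector<Deque*>& victims)
    -> decltype(victims.front()->steal()) {
    for (Deque* victim : victims) {
        if (victim->empty()) continue;      // hint only: skip likely-empty victims
        if (auto item = victim->steal()) {  // the steal itself is authoritative
            return item;
        }
    }
    return std::nullopt;
}

// Hypothetical single-threaded stub, used only to exercise the helper.
struct stub_deque {
    std::vector<int> items;

    bool empty() const noexcept { return items.empty(); }

    std::optional<int> steal() {
        if (items.empty()) return std::nullopt;
        int v = items.front();
        items.erase(items.begin());
        return v;
    }
};
```

The helper never assumes that a non-empty hint guarantees a successful steal; if `steal()` returns `std::nullopt`, it simply moves on to the next victim.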

Definition at line 352 of file work_stealing_deque.h.

    [[nodiscard]] bool empty() const noexcept {
        std::int64_t b = bottom_.load(std::memory_order_relaxed);
        std::int64_t t = top_.load(std::memory_order_relaxed);
        return b <= t;
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::bottom_, and kcenon::thread::lockfree::work_stealing_deque< T >::top_.

◆ operator=() [1/2]

template<typename T >
work_stealing_deque & kcenon::thread::lockfree::work_stealing_deque< T >::operator= ( const work_stealing_deque< T > & )
delete

◆ operator=() [2/2]

template<typename T >
work_stealing_deque & kcenon::thread::lockfree::work_stealing_deque< T >::operator= ( work_stealing_deque< T > && )
delete

◆ pop()

template<typename T >
std::optional< T > kcenon::thread::lockfree::work_stealing_deque< T >::pop ( )
inline nodiscard

Pop an element from the bottom of the deque (owner only)

Returns
The popped element, or std::nullopt if empty

Time Complexity: O(1)

Note
This method should only be called by the owner thread. Uses LIFO order for better cache locality.

Definition at line 214 of file work_stealing_deque.h.

    [[nodiscard]] std::optional<T> pop() {
        std::int64_t b = bottom_.load(std::memory_order_relaxed) - 1;
        circular_array<T>* a = array_.load(std::memory_order_relaxed);
        bottom_.store(b, std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_seq_cst);
        std::int64_t t = top_.load(std::memory_order_relaxed);

        if (t <= b) {
            // Non-empty queue
            T item = a->get(b);
            if (t == b) {
                // Last element - compete with thieves
                if (!top_.compare_exchange_strong(
                        t, t + 1,
                        std::memory_order_seq_cst,
                        std::memory_order_relaxed)) {
                    // Lost race with a thief
                    bottom_.store(b + 1, std::memory_order_relaxed);
                    return std::nullopt;
                }
                bottom_.store(b + 1, std::memory_order_relaxed);
            }
            return item;
        } else {
            // Empty queue
            bottom_.store(b + 1, std::memory_order_relaxed);
            return std::nullopt;
        }
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::array_, kcenon::thread::lockfree::work_stealing_deque< T >::bottom_, kcenon::thread::lockfree::circular_array< T >::get(), and kcenon::thread::lockfree::work_stealing_deque< T >::top_.


◆ push()

template<typename T >
void kcenon::thread::lockfree::work_stealing_deque< T >::push ( T item)
inline

Push an element onto the bottom of the deque (owner only)

Parameters
item  Element to push

Time Complexity: O(1) amortized (O(n) when resizing)

Note
This method should only be called by the owner thread. Calling from multiple threads results in undefined behavior.
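The amortized cost comes from occasionally copying the live range into a larger buffer. The internals of `circular_array<T>` are not shown on this page, so the following is only a sketch under assumptions: `ring` is a hypothetical stand-in that stores `int`, wraps logical indices with a power-of-two mask, and doubles its capacity in `grow()`. `is_full` mirrors the resize test that `push()` performs.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for circular_array<T>; the real class may differ.
class ring {
public:
    explicit ring(std::size_t log_size)
        : log_size_(log_size), slots_(std::size_t{1} << log_size) {}

    std::size_t size() const { return slots_.size(); }

    // Logical indices grow without bound; the mask wraps them into the buffer.
    int get(std::int64_t i) const { return slots_[index(i)]; }
    void put(std::int64_t i, int v) { slots_[index(i)] = v; }

    // Copy the live range [top, bottom) into a buffer twice as large.
    ring grow(std::int64_t bottom, std::int64_t top) const {
        ring bigger(log_size_ + 1);
        for (std::int64_t i = top; i < bottom; ++i) {
            bigger.put(i, get(i));
        }
        return bigger;
    }

private:
    std::size_t index(std::int64_t i) const {
        return static_cast<std::size_t>(i) & (slots_.size() - 1);
    }

    std::size_t log_size_;
    std::vector<int> slots_;
};

// Mirrors the resize test in push(): true when [top, bottom) fills the buffer.
inline bool is_full(std::int64_t bottom, std::int64_t top, std::size_t capacity) {
    return bottom - top > static_cast<std::int64_t>(capacity) - 1;
}
```

Because elements keep their logical indices across `grow()`, the values of `top_` and `bottom_` stay valid after a resize; only the mask used to locate a slot changes.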

Definition at line 184 of file work_stealing_deque.h.

    void push(T item) {
        std::int64_t b = bottom_.load(std::memory_order_relaxed);
        std::int64_t t = top_.load(std::memory_order_acquire);
        circular_array<T>* a = array_.load(std::memory_order_relaxed);

        // Check if array needs to grow
        if (b - t > static_cast<std::int64_t>(a->size()) - 1) {
            // Grow the array
            circular_array<T>* new_array = a->grow(b, t);
            // Store old array for cleanup (in a real implementation,
            // you would use hazard pointers or epoch-based reclamation)
            old_arrays_.push_back(a);
            array_.store(new_array, std::memory_order_release);
            a = new_array;
        }

        a->put(b, item);
        std::atomic_thread_fence(std::memory_order_release);
        bottom_.store(b + 1, std::memory_order_relaxed);
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::array_, kcenon::thread::lockfree::work_stealing_deque< T >::bottom_, kcenon::thread::lockfree::circular_array< T >::grow(), kcenon::thread::lockfree::work_stealing_deque< T >::old_arrays_, kcenon::thread::lockfree::circular_array< T >::put(), kcenon::thread::lockfree::circular_array< T >::size(), and kcenon::thread::lockfree::work_stealing_deque< T >::top_.


◆ size()

template<typename T >
std::size_t kcenon::thread::lockfree::work_stealing_deque< T >::size ( ) const
inline nodiscard noexcept

Get approximate size of the deque.

Returns
Approximate number of elements
Note
This is a best-effort estimate due to concurrent modifications.
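One reason the result is clamped rather than computed as a plain difference: `pop()` decrements `bottom_` before it checks `top_`, so a concurrent reader can briefly observe `bottom_` below `top_` on an empty deque. A sketch of the clamping arithmetic as a free function (`approx_size` is a hypothetical name):

```cpp
#include <cstddef>
#include <cstdint>

// Clamp a possibly negative (bottom - top) difference to zero.
inline std::size_t approx_size(std::int64_t bottom, std::int64_t top) {
    std::int64_t diff = bottom - top;
    return diff > 0 ? static_cast<std::size_t>(diff) : 0;
}
```

Without the clamp, converting a difference of -1 to `std::size_t` would report an enormous size instead of zero.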

Definition at line 364 of file work_stealing_deque.h.

    [[nodiscard]] std::size_t size() const noexcept {
        std::int64_t b = bottom_.load(std::memory_order_relaxed);
        std::int64_t t = top_.load(std::memory_order_relaxed);
        std::int64_t diff = b - t;
        return diff > 0 ? static_cast<std::size_t>(diff) : 0;
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::bottom_, and kcenon::thread::lockfree::work_stealing_deque< T >::top_.

◆ steal()

template<typename T >
std::optional< T > kcenon::thread::lockfree::work_stealing_deque< T >::steal ( )
inline nodiscard

Steal an element from the top of the deque (thief threads)

Returns
The stolen element, or std::nullopt if the deque is empty or the steal lost a race with another thread

Time Complexity: O(1)

Note
This method can be called concurrently by multiple thief threads. Uses FIFO order for fairness.
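Since `std::nullopt` can mean either "empty" or "lost a race", callers that want to distinguish the two usually retry a bounded number of times. The helper below is a sketch of one such policy: `steal_with_retry` and `flaky_deque` are hypothetical names, and the template only assumes `empty()` and `steal()` members shaped like the ones on this page.

```cpp
#include <optional>

// Retry a steal up to max_attempts times, giving up early once the victim looks empty.
template <typename Deque>
auto steal_with_retry(Deque& victim, int max_attempts)
    -> decltype(victim.steal()) {
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        if (auto item = victim.steal()) return item;
        if (victim.empty()) break;  // nothing left worth retrying for
    }
    return std::nullopt;
}

// Hypothetical stub: fails the first `failures` steals to imitate lost CAS races.
struct flaky_deque {
    int failures;
    int value;
    bool taken = false;

    bool empty() const noexcept { return taken; }

    std::optional<int> steal() {
        if (taken) return std::nullopt;
        if (failures > 0) {
            --failures;
            return std::nullopt;
        }
        taken = true;
        return value;
    }
};
```

Bounding the attempts keeps a thief from spinning on one heavily contended victim; after giving up it can move on to another deque.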

Definition at line 253 of file work_stealing_deque.h.

    [[nodiscard]] std::optional<T> steal() {
        std::int64_t t = top_.load(std::memory_order_acquire);
        std::atomic_thread_fence(std::memory_order_seq_cst);
        std::int64_t b = bottom_.load(std::memory_order_acquire);

        if (t < b) {
            // Non-empty queue
            circular_array<T>* a = array_.load(std::memory_order_consume);
            T item = a->get(t);

            if (!top_.compare_exchange_strong(
                    t, t + 1,
                    std::memory_order_seq_cst,
                    std::memory_order_relaxed)) {
                // Lost race with another thief or owner
                return std::nullopt;
            }
            return item;
        }
        return std::nullopt;
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::array_, kcenon::thread::lockfree::work_stealing_deque< T >::bottom_, kcenon::thread::lockfree::circular_array< T >::get(), and kcenon::thread::lockfree::work_stealing_deque< T >::top_.


◆ steal_batch()

template<typename T >
std::vector< T > kcenon::thread::lockfree::work_stealing_deque< T >::steal_batch ( std::size_t max_count)
inline nodiscard

Steal multiple elements from the top of the deque (thief threads)

Parameters
max_count  Maximum number of elements to steal
Returns
Vector of stolen elements (may be empty or smaller than max_count)

Time Complexity: O(max_count)

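This method attempts to claim up to max_count elements with a single compare-and-swap on top that advances it past the range [t, t + to_steal). If the CAS fails, the method returns an empty vector instead of retrying; the caller may retry if needed.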

Key behaviors:

  • Returns empty vector if deque is empty or contention prevents stealing
  • May return fewer elements than requested if deque has fewer elements
  • All returned elements are guaranteed to have been successfully claimed
  • Uses FIFO order (oldest elements first)

Memory Ordering:

  • Uses seq_cst for the CAS on top, with a seq_cst fence between the loads of top and bottom
  • Uses acquire loads for top and bottom, and a consume load for the array pointer
Note
This method can be called concurrently by multiple thief threads. Batch stealing is more efficient than repeated single steals when multiple items need to be transferred.
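The size of the claimed range follows directly from the indices the thief observes. The sketch below restates that arithmetic as a free function and adds one possible caller-side policy; `batch_size` and `half_of` are hypothetical names, and stealing about half of the victim's work is an assumption about how a scheduler might choose `max_count`, not something this class prescribes.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Number of elements a batch steal would try to claim, given the observed indices.
inline std::size_t batch_size(std::int64_t top, std::int64_t bottom,
                              std::size_t max_count) {
    if (max_count == 0 || top >= bottom) return 0;  // nothing requested or empty
    return std::min(max_count, static_cast<std::size_t>(bottom - top));
}

// Hypothetical policy: request about half of what the victim appears to hold.
inline std::size_t half_of(std::size_t observed_size) {
    return (observed_size + 1) / 2;
}
```

A thief could pass `half_of(victim.size())` as `max_count`, which leaves the remainder with the owner.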

Definition at line 299 of file work_stealing_deque.h.

    [[nodiscard]] std::vector<T> steal_batch(std::size_t max_count) {
        if (max_count == 0) {
            return {};
        }

        std::int64_t t = top_.load(std::memory_order_acquire);
        std::atomic_thread_fence(std::memory_order_seq_cst);
        std::int64_t b = bottom_.load(std::memory_order_acquire);

        if (t >= b) {
            // Empty queue
            return {};
        }

        // Calculate how many we can actually steal
        std::int64_t available = b - t;
        std::size_t to_steal = std::min(
            max_count,
            static_cast<std::size_t>(available)
        );

        // Try to atomically claim the range [t, t + to_steal)
        std::int64_t new_top = t + static_cast<std::int64_t>(to_steal);

        if (!top_.compare_exchange_strong(
                t, new_top,
                std::memory_order_seq_cst,
                std::memory_order_relaxed)) {
            // Lost race with another thief or owner
            // Return empty and let caller retry if needed
            return {};
        }

        // Successfully claimed the range - now read the elements
        // The CAS already ensured we have exclusive access to [t, new_top)
        circular_array<T>* a = array_.load(std::memory_order_consume);
        std::vector<T> result;
        result.reserve(to_steal);

        for (std::int64_t i = t; i < new_top; ++i) {
            result.push_back(a->get(i));
        }

        return result;
    }

References kcenon::thread::lockfree::work_stealing_deque< T >::array_, kcenon::thread::lockfree::work_stealing_deque< T >::bottom_, kcenon::thread::lockfree::circular_array< T >::get(), and kcenon::thread::lockfree::work_stealing_deque< T >::top_.


Member Data Documentation

◆ array_

◆ bottom_

◆ CACHE_LINE_SIZE

template<typename T >
std::size_t kcenon::thread::lockfree::work_stealing_deque< T >::CACHE_LINE_SIZE = 64
static constexpr private

Definition at line 394 of file work_stealing_deque.h.

◆ LOG_INITIAL_SIZE

template<typename T >
std::size_t kcenon::thread::lockfree::work_stealing_deque< T >::LOG_INITIAL_SIZE = 5
static constexpr

Default initial log capacity (2^LOG_INITIAL_SIZE = 32 elements)

Definition at line 150 of file work_stealing_deque.h.

◆ old_arrays_

◆ top_


The documentation for this class was generated from the following files: