Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
work_stealing_deque.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
14#include <algorithm>
15#include <atomic>
16#include <cstdint>
17#include <memory>
18#include <optional>
19#include <vector>
20
22
/**
 * @brief Dynamic circular array for the work-stealing deque.
 *
 * A fixed, power-of-two-sized ring of atomic slots. Logical deque indices
 * grow monotonically and are wrapped onto the ring with a bit mask. All
 * slot accesses use relaxed ordering: the owning deque's fences and CAS
 * operations provide the required inter-thread ordering.
 */
template<typename T>
class circular_array {
public:
    /**
     * @brief Constructs a circular array with capacity 2^log_size.
     * @param log_size Log2 of the desired capacity.
     *
     * Every slot is initialized to T{} so that a racy read of a
     * never-written slot still yields a well-defined value.
     */
    explicit circular_array(std::size_t log_size)
        : log_size_(log_size)
        , size_(std::size_t{1} << log_size)
        , mask_(size_ - 1)
        , buffer_(std::make_unique<std::atomic<T>[]>(size_)) {
        for (std::size_t i = 0; i < size_; ++i) {
            buffer_[i].store(T{}, std::memory_order_relaxed);
        }
    }

    /// @brief Destructor — the unique_ptr releases the slot storage.
    ~circular_array() = default;

    // Non-copyable: the buffer has exactly one owner.
    circular_array(const circular_array&) = delete;
    circular_array& operator=(const circular_array&) = delete;

    /// @brief Get the capacity of the array.
    [[nodiscard]] std::size_t size() const noexcept {
        return size_;
    }

    /**
     * @brief Get element at @p index (wrapped by capacity) with relaxed ordering.
     * @param index Logical deque index; only the low log_size_ bits select the slot.
     */
    [[nodiscard]] T get(std::int64_t index) const noexcept {
        return buffer_[static_cast<std::size_t>(index) & mask_].load(std::memory_order_relaxed);
    }

    /**
     * @brief Store @p value at @p index (wrapped by capacity) with relaxed ordering.
     */
    void put(std::int64_t index, T value) noexcept {
        buffer_[static_cast<std::size_t>(index) & mask_].store(value, std::memory_order_relaxed);
    }

    /**
     * @brief Create a new array with double the capacity, copying the
     *        live range [top, bottom).
     * @param bottom One past the youngest live logical index.
     * @param top    The oldest live logical index.
     * @return Heap-allocated replacement array; ownership passes to the caller.
     */
    [[nodiscard]] circular_array* grow(std::int64_t bottom, std::int64_t top) const {
        auto* new_array = new circular_array(log_size_ + 1);
        for (std::int64_t i = top; i < bottom; ++i) {
            new_array->put(i, get(i));
        }
        return new_array;
    }

private:
    std::size_t log_size_;  ///< Log2 of the capacity.
    std::size_t size_;      ///< Capacity (always a power of two).
    std::size_t mask_;      ///< size_ - 1, wraps logical indices onto the ring.
    std::unique_ptr<std::atomic<T>[]> buffer_;  ///< Slot storage (RAII-owned).
};
103
144template<typename T>
146public:
150 static constexpr std::size_t LOG_INITIAL_SIZE = 5;
151
156 explicit work_stealing_deque(std::size_t log_initial_size = LOG_INITIAL_SIZE)
157 : top_(0)
158 , bottom_(0)
159 , array_(new circular_array<T>(log_initial_size)) {
160 }
161
166 delete array_.load(std::memory_order_relaxed);
167 }
168
169 // Non-copyable and non-movable
174
184 void push(T item) {
185 std::int64_t b = bottom_.load(std::memory_order_relaxed);
186 std::int64_t t = top_.load(std::memory_order_acquire);
187 circular_array<T>* a = array_.load(std::memory_order_relaxed);
188
189 // Check if array needs to grow
190 if (b - t > static_cast<std::int64_t>(a->size()) - 1) {
191 // Grow the array
192 circular_array<T>* new_array = a->grow(b, t);
193 // Store old array for cleanup (in a real implementation,
194 // you would use hazard pointers or epoch-based reclamation)
195 old_arrays_.push_back(a);
196 array_.store(new_array, std::memory_order_release);
197 a = new_array;
198 }
199
200 a->put(b, item);
201 std::atomic_thread_fence(std::memory_order_release);
202 bottom_.store(b + 1, std::memory_order_relaxed);
203 }
204
214 [[nodiscard]] std::optional<T> pop() {
215 std::int64_t b = bottom_.load(std::memory_order_relaxed) - 1;
216 circular_array<T>* a = array_.load(std::memory_order_relaxed);
217 bottom_.store(b, std::memory_order_relaxed);
218 std::atomic_thread_fence(std::memory_order_seq_cst);
219 std::int64_t t = top_.load(std::memory_order_relaxed);
220
221 if (t <= b) {
222 // Non-empty queue
223 T item = a->get(b);
224 if (t == b) {
225 // Last element - compete with thieves
226 if (!top_.compare_exchange_strong(
227 t, t + 1,
228 std::memory_order_seq_cst,
229 std::memory_order_relaxed)) {
230 // Lost race with a thief
231 bottom_.store(b + 1, std::memory_order_relaxed);
232 return std::nullopt;
233 }
234 bottom_.store(b + 1, std::memory_order_relaxed);
235 }
236 return item;
237 } else {
238 // Empty queue
239 bottom_.store(b + 1, std::memory_order_relaxed);
240 return std::nullopt;
241 }
242 }
243
253 [[nodiscard]] std::optional<T> steal() {
254 std::int64_t t = top_.load(std::memory_order_acquire);
255 std::atomic_thread_fence(std::memory_order_seq_cst);
256 std::int64_t b = bottom_.load(std::memory_order_acquire);
257
258 if (t < b) {
259 // Non-empty queue
260 circular_array<T>* a = array_.load(std::memory_order_consume);
261 T item = a->get(t);
262
263 if (!top_.compare_exchange_strong(
264 t, t + 1,
265 std::memory_order_seq_cst,
266 std::memory_order_relaxed)) {
267 // Lost race with another thief or owner
268 return std::nullopt;
269 }
270 return item;
271 }
272 return std::nullopt;
273 }
274
299 [[nodiscard]] std::vector<T> steal_batch(std::size_t max_count) {
300 if (max_count == 0) {
301 return {};
302 }
303
304 std::int64_t t = top_.load(std::memory_order_acquire);
305 std::atomic_thread_fence(std::memory_order_seq_cst);
306 std::int64_t b = bottom_.load(std::memory_order_acquire);
307
308 if (t >= b) {
309 // Empty queue
310 return {};
311 }
312
313 // Calculate how many we can actually steal
314 std::int64_t available = b - t;
315 std::size_t to_steal = std::min(
316 max_count,
317 static_cast<std::size_t>(available)
318 );
319
320 // Try to atomically claim the range [t, t + to_steal)
321 std::int64_t new_top = t + static_cast<std::int64_t>(to_steal);
322
323 if (!top_.compare_exchange_strong(
324 t, new_top,
325 std::memory_order_seq_cst,
326 std::memory_order_relaxed)) {
327 // Lost race with another thief or owner
328 // Return empty and let caller retry if needed
329 return {};
330 }
331
332 // Successfully claimed the range - now read the elements
333 // The CAS already ensured we have exclusive access to [t, new_top)
334 circular_array<T>* a = array_.load(std::memory_order_consume);
335 std::vector<T> result;
336 result.reserve(to_steal);
337
338 for (std::int64_t i = t; i < new_top; ++i) {
339 result.push_back(a->get(i));
340 }
341
342 return result;
343 }
344
352 [[nodiscard]] bool empty() const noexcept {
353 std::int64_t b = bottom_.load(std::memory_order_relaxed);
354 std::int64_t t = top_.load(std::memory_order_relaxed);
355 return b <= t;
356 }
357
364 [[nodiscard]] std::size_t size() const noexcept {
365 std::int64_t b = bottom_.load(std::memory_order_relaxed);
366 std::int64_t t = top_.load(std::memory_order_relaxed);
367 std::int64_t diff = b - t;
368 return diff > 0 ? static_cast<std::size_t>(diff) : 0;
369 }
370
375 [[nodiscard]] std::size_t capacity() const noexcept {
376 return array_.load(std::memory_order_relaxed)->size();
377 }
378
386 for (auto* old_array : old_arrays_) {
387 delete old_array;
388 }
389 old_arrays_.clear();
390 }
391
392private:
393 // Cache line padding to prevent false sharing
394 static constexpr std::size_t CACHE_LINE_SIZE = 64;
395
396 alignas(CACHE_LINE_SIZE) std::atomic<std::int64_t> top_;
397 alignas(CACHE_LINE_SIZE) std::atomic<std::int64_t> bottom_;
399
400 // Storage for old arrays (simple approach - could use hazard pointers)
401 std::vector<circular_array<T>*> old_arrays_;
402};
403
404} // namespace kcenon::thread::lockfree
Dynamic circular array for work-stealing deque.
circular_array * grow(std::int64_t bottom, std::int64_t top) const
Create a new array with double the capacity, copying elements.
void put(std::int64_t index, T value) noexcept
Store element at index with relaxed memory ordering.
circular_array(std::size_t log_size)
Constructs a circular array with given capacity.
T get(std::int64_t index) const noexcept
Get element at index with relaxed memory ordering.
std::size_t size() const noexcept
Get the capacity of the array.
circular_array(const circular_array &)=delete
circular_array & operator=(const circular_array &)=delete
Lock-free work-stealing deque based on the Chase-Lev algorithm.
work_stealing_deque(const work_stealing_deque &)=delete
std::size_t capacity() const noexcept
Get the capacity of the current array.
std::size_t size() const noexcept
Get approximate size of the deque.
~work_stealing_deque()
Destructor - cleans up the circular array.
work_stealing_deque & operator=(work_stealing_deque &&)=delete
static constexpr std::size_t LOG_INITIAL_SIZE
Default initial log capacity (2^LOG_INITIAL_SIZE = 32 elements)
bool empty() const noexcept
Check if the deque appears empty.
work_stealing_deque(std::size_t log_initial_size=LOG_INITIAL_SIZE)
Constructs an empty work-stealing deque.
std::vector< T > steal_batch(std::size_t max_count)
Steal multiple elements from the top of the deque (thief threads)
work_stealing_deque(work_stealing_deque &&)=delete
std::atomic< circular_array< T > * > array_
std::optional< T > steal()
Steal an element from the top of the deque (thief threads)
std::vector< circular_array< T > * > old_arrays_
void cleanup_old_arrays()
Clear all old arrays (for memory cleanup)
work_stealing_deque & operator=(const work_stealing_deque &)=delete
void push(T item)
Push an element onto the bottom of the deque (owner only)
std::optional< T > pop()
Pop an element from the bottom of the deque (owner only)
A template class representing either a value or an error.
STL namespace.