Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
atomic_wait.h
// BSD 3-Clause License
// Copyright (c) 2024, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

#pragma once

#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <type_traits>

#if defined(_M_X64) || defined(_M_IX86)
#include <intrin.h>  // _mm_pause for the MSVC x86 spin-wait hint
#endif

#ifdef USE_STD_CONCEPTS
#include <concepts>
#endif

#ifdef HAS_STD_ATOMIC_WAIT
// C++20: std::atomic already has wait/notify
// No additional implementation needed
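//
// Illustrative sketch of the standard C++20 API used in that case (the names
// below are examples, not part of this header):
//
//   std::atomic<int> flag{0};
//   // waiter:   flag.wait(0);       // blocks while flag == 0
//   // notifier: flag.store(1);
//   //           flag.notify_one();  // wakes one waiter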
#else

namespace kcenon::thread {

/// Helper class to add wait/notify functionality to std::atomic.
template<typename T>
class atomic_wait_helper {
    static_assert(std::is_trivially_copyable<T>::value,
                  "T must be trivially copyable for atomic operations");

public:
    /// Blocks until the atomic value differs from the expected value.
    void wait(std::atomic<T>& atomic, T old) noexcept {
        // Phase 1: Short spin-wait
        // Rationale: many waits end quickly, and spinning avoids syscall overhead
        constexpr int SPIN_COUNT = 40;
        for (int i = 0; i < SPIN_COUNT; ++i) {
            if (atomic.load(std::memory_order_acquire) != old) {
                return;
            }
            // Hint to CPU: this is a spin-wait loop
            #if defined(__x86_64__) || defined(__i386__)
            __builtin_ia32_pause();                      // GCC/Clang x86
            #elif defined(_M_X64) || defined(_M_IX86)
            _mm_pause();                                 // MSVC x86 intrinsic
            #elif defined(__aarch64__)
            __asm__ __volatile__("yield" ::: "memory");  // GCC/Clang AArch64
            #else
            std::this_thread::yield();                   // Portable fallback (incl. MSVC ARM64)
            #endif
        }

        // Phase 2: Exponential backoff with brief sleeps
        // Rationale: longer waits benefit from yielding the CPU
        constexpr int BACKOFF_COUNT = 5;
        auto backoff_time = std::chrono::microseconds(1);

        for (int i = 0; i < BACKOFF_COUNT; ++i) {
            if (atomic.load(std::memory_order_acquire) != old) {
                return;
            }
            std::this_thread::sleep_for(backoff_time);
            backoff_time *= 2; // Exponential backoff
        }

        // Phase 3: Blocking wait
        // Rationale: value hasn't changed after spin+backoff, so block efficiently
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] {
            return atomic.load(std::memory_order_acquire) != old;
        });
    }

    /// Unblocks one thread waiting on this atomic.
    void notify_one() noexcept {
        // Locking the mutex before notifying ensures a waiter cannot miss the
        // notification between its predicate check and blocking on the
        // condition variable.
        std::lock_guard<std::mutex> lock(mutex_);
        cv_.notify_one();
    }

    /// Unblocks all threads waiting on this atomic.
    void notify_all() noexcept {
        std::lock_guard<std::mutex> lock(mutex_);
        cv_.notify_all();
    }

private:
    mutable std::mutex mutex_;
    mutable std::condition_variable cv_;
};

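// Illustrative protocol for the helper (a sketch; 'state' and 'helper' are
// example names): the notifier stores the new value before notifying, so a
// concurrent wait() either observes the change in its predicate check or is
// woken by the condition variable.
//
//   std::atomic<int> state{0};
//   atomic_wait_helper<int> helper;
//   // waiter:   helper.wait(state, 0);  // returns once state != 0
//   // notifier: state.store(1, std::memory_order_release);
//   //           helper.notify_one();
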
/// RAII wrapper to add wait/notify to atomic variables.
template<typename T>
class atomic_with_wait {
public:
    atomic_with_wait() noexcept : value_(T{}) {}
    explicit atomic_with_wait(T initial) noexcept : value_(initial) {}

    // Disable copy/move (consistent with std::atomic)
    atomic_with_wait(const atomic_with_wait&) = delete;
    atomic_with_wait& operator=(const atomic_with_wait&) = delete;

    // Basic atomic operations
    T load(std::memory_order order = std::memory_order_seq_cst) const noexcept {
        return value_.load(order);
    }

    void store(T desired, std::memory_order order = std::memory_order_seq_cst) noexcept {
        value_.store(desired, order);
    }

    T exchange(T desired, std::memory_order order = std::memory_order_seq_cst) noexcept {
        return value_.exchange(desired, order);
    }

    bool compare_exchange_weak(T& expected, T desired,
                               std::memory_order order = std::memory_order_seq_cst) noexcept {
        return value_.compare_exchange_weak(expected, desired, order);
    }

    bool compare_exchange_strong(T& expected, T desired,
                                 std::memory_order order = std::memory_order_seq_cst) noexcept {
        return value_.compare_exchange_strong(expected, desired, order);
    }

    // Wait/notify operations
    void wait(T old, std::memory_order order = std::memory_order_seq_cst) noexcept {
        // Note: the fallback helper polls with acquire loads; 'order' is
        // accepted for signature parity with C++20 std::atomic::wait.
        waiter_.wait(value_, old);
    }

    void notify_one() noexcept {
        waiter_.notify_one();
    }

    void notify_all() noexcept {
        waiter_.notify_all();
    }

    // Conversion operator
    operator T() const noexcept {
        return load();
    }

    // Assignment operator
    T operator=(T desired) noexcept {
        store(desired);
        return desired;
    }

    // Atomic arithmetic operations (for integral types)
#ifdef USE_STD_CONCEPTS
    T fetch_add(T arg, std::memory_order order = std::memory_order_seq_cst) noexcept
        requires std::integral<T>
    {
        return value_.fetch_add(arg, order);
    }

    T fetch_sub(T arg, std::memory_order order = std::memory_order_seq_cst) noexcept
        requires std::integral<T>
    {
        return value_.fetch_sub(arg, order);
    }

    T operator++() noexcept
        requires std::integral<T>
    {
        return fetch_add(1) + 1;
    }

    T operator++(int) noexcept
        requires std::integral<T>
    {
        return fetch_add(1);
    }

    T operator--() noexcept
        requires std::integral<T>
    {
        return fetch_sub(1) - 1;
    }

    T operator--(int) noexcept
        requires std::integral<T>
    {
        return fetch_sub(1);
    }
#else
    // C++17 fallback using SFINAE
    template<typename U = T>
    typename std::enable_if<std::is_integral<U>::value, U>::type
    fetch_add(T arg, std::memory_order order = std::memory_order_seq_cst) noexcept {
        return value_.fetch_add(arg, order);
    }

    template<typename U = T>
    typename std::enable_if<std::is_integral<U>::value, U>::type
    fetch_sub(T arg, std::memory_order order = std::memory_order_seq_cst) noexcept {
        return value_.fetch_sub(arg, order);
    }

    template<typename U = T>
    typename std::enable_if<std::is_integral<U>::value, U>::type
    operator++() noexcept {
        return fetch_add(1) + 1;
    }

    template<typename U = T>
    typename std::enable_if<std::is_integral<U>::value, U>::type
    operator++(int) noexcept {
        return fetch_add(1);
    }

    template<typename U = T>
    typename std::enable_if<std::is_integral<U>::value, U>::type
    operator--() noexcept {
        return fetch_sub(1) - 1;
    }

    template<typename U = T>
    typename std::enable_if<std::is_integral<U>::value, U>::type
    operator--(int) noexcept {
        return fetch_sub(1);
    }
#endif
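
    // Note (illustrative): with either the concepts constraint or the SFINAE
    // fallback, the arithmetic members exist only for integral T:
    //
    //   atomic_with_wait<int>    counter;  // ++counter compiles, atomic increment
    //   atomic_with_wait<double> level;    // ++level would fail to compile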

private:
    std::atomic<T> value_;
    atomic_wait_helper<T> waiter_;
};

} // namespace kcenon::thread

#endif // HAS_STD_ATOMIC_WAIT
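
A minimal end-to-end usage sketch (not part of the header; assumes HAS_STD_ATOMIC_WAIT is undefined so the fallback above is compiled, and that the header is on the include path):

#include <thread>
#include "atomic_wait.h"  // include path assumed

int main() {
    kcenon::thread::atomic_with_wait<int> ready{0};

    std::thread worker([&] {
        ready.wait(0);   // blocks until ready != 0
        // ... perform the gated work ...
    });

    ready.store(1);      // publish the new value first
    ready.notify_one();  // then wake the waiter
    worker.join();
}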