Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
hazard_pointer.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13#pragma once
14
15#include <algorithm>
16#include <atomic>
17#include <cstddef>
18#include <functional>
19#include <memory>
20#include <vector>
21
22namespace kcenon::thread {
23
24// Forward declarations
25template <typename T>
26class hazard_pointer_domain;
27
28namespace detail {
29
33 static constexpr size_t MAX_HAZARDS_PER_THREAD = 8;
34
35 std::atomic<void*> hazards[MAX_HAZARDS_PER_THREAD];
36 thread_hazard_list* next; // Linked list of all thread lists
37 std::atomic<bool> active; // Is this thread still active?
38
39 thread_hazard_list() : next(nullptr), active(true) {
40 for (auto& h : hazards) {
41 h.store(nullptr, std::memory_order_relaxed);
42 }
43 }
44};
45
48 void* ptr;
49 std::function<void(void*)> deleter;
51
52 retire_node(void* p, std::function<void(void*)> d)
53 : ptr(p), deleter(std::move(d)), next(nullptr) {}
54};
55
59public:
61
64
66 void mark_inactive();
67
69 std::vector<void*> scan_hazard_pointers();
70
72 size_t get_active_thread_count() const;
73
74private:
76
77 std::atomic<thread_hazard_list*> head_{nullptr};
78 std::atomic<size_t> thread_count_{0};
79};
80
83public:
85
89 void add_orphaned_nodes(retire_node* head, size_t count);
90
94 size_t reclaim(const std::vector<void*>& protected_ptrs);
95
97 size_t get_orphaned_count() const;
98
99private:
101
102 std::atomic<retire_node*> head_{nullptr};
103 std::atomic<size_t> count_{0};
104};
105
106} // namespace detail
107
111public:
114
116 hazard_pointer(hazard_pointer&& other) noexcept;
117
119 hazard_pointer& operator=(hazard_pointer&& other) noexcept;
120
123
127
136 template <typename T>
137 void protect(T* ptr) noexcept {
138 if (slot_) {
139 slot_->store(static_cast<void*>(ptr), std::memory_order_seq_cst);
140 }
141 }
142
146 void reset() noexcept;
147
149 bool is_protected() const noexcept;
150
152 void* get_protected() const noexcept;
153
154private:
155 // Special marker value indicating slot is owned but not protecting anything
156 static inline const void* SLOT_OWNED_MARKER = reinterpret_cast<void*>(0x1);
157
158 std::atomic<void*>* slot_; // Pointer to thread-local hazard slot
159 size_t slot_index_; // Index in thread-local array
160};
161
164template <typename T>
166public:
169 static hazard_pointer_domain instance;
170 return instance;
171 }
172
176 return hazard_pointer();
177 }
178
182 void retire(T* ptr);
183
186 size_t reclaim();
187
195
196 stats get_stats() const;
197
200
201private:
203
204 // Thread-local retire list
206 detail::retire_node* head = nullptr;
207 size_t count = 0;
208
209 // Adaptive threshold: scales with active thread count
210 // Base threshold: 64 objects
211 // Scaling factor: 16 objects per active thread
212 // This prevents excessive memory usage under high thread churn
213 static constexpr size_t BASE_RECLAIM_THRESHOLD = 64;
214 static constexpr size_t RECLAIM_THRESHOLD_PER_THREAD = 16;
215
216 size_t get_adaptive_threshold() const {
217 auto& registry = detail::hazard_pointer_registry::instance();
218 size_t active_threads = registry.get_active_thread_count();
219 // Clamp between 64 and 512 to prevent extremes
220 return std::min(size_t(512),
221 BASE_RECLAIM_THRESHOLD + active_threads * RECLAIM_THRESHOLD_PER_THREAD);
222 }
223
224 void add(T* ptr);
225 size_t scan_and_reclaim(const std::vector<void*>& protected_ptrs);
226 void reclaim_all();
228 };
229
231 static thread_local thread_retire_list list;
232 return list;
233 }
234
235 // Statistics (atomic for thread-safety)
236 mutable std::atomic<size_t> objects_retired_{0};
237 mutable std::atomic<size_t> objects_reclaimed_{0};
238 mutable std::atomic<size_t> scan_count_{0};
239};
240
241// Template implementations
242
243template <typename T>
245 if (!ptr) {
246 return;
247 }
248
249 auto& retire_list = get_thread_retire_list();
250 retire_list.add(ptr);
251 objects_retired_.fetch_add(1, std::memory_order_relaxed);
252
253 // Check if we should run reclamation using adaptive threshold
254 // Threshold scales with active thread count to prevent excessive memory
255 if (retire_list.count >= retire_list.get_adaptive_threshold()) {
256 reclaim();
257 }
258}
259
260template <typename T>
262 scan_count_.fetch_add(1, std::memory_order_relaxed);
263
264 // Scan all hazard pointers to get protected set
266 auto protected_ptrs = registry.scan_hazard_pointers();
267
268 // Reclaim objects not in protected set
269 auto& retire_list = get_thread_retire_list();
270 size_t reclaimed = retire_list.scan_and_reclaim(protected_ptrs);
271
272 // Also try to reclaim from global orphanage
273 // We can do this every time or periodically. Doing it every time ensures faster cleanup.
274 reclaimed += detail::global_reclamation_manager::instance().reclaim(protected_ptrs);
275
276 objects_reclaimed_.fetch_add(reclaimed, std::memory_order_relaxed);
277 return reclaimed;
278}
279
280template <typename T>
283
284 return stats{.hazard_pointers_allocated = registry.get_active_thread_count() *
286 .objects_retired = objects_retired_.load(std::memory_order_relaxed),
287 .objects_reclaimed = objects_reclaimed_.load(std::memory_order_relaxed),
288 .scan_count = scan_count_.load(std::memory_order_relaxed)};
289}
290
291template <typename T>
293 // Ensure all objects are reclaimed before domain destruction
294 auto& retire_list = get_thread_retire_list();
295 retire_list.reclaim_all();
296}
297
298template <typename T>
300 auto* node = new detail::retire_node(static_cast<void*>(ptr),
301 [](void* p) { delete static_cast<T*>(p); });
302
303 node->next = head;
304 head = node;
305 ++count;
306}
307
308template <typename T>
310 const std::vector<void*>& protected_ptrs) {
311 size_t reclaimed = 0;
312 detail::retire_node** curr = &head;
313
314 while (*curr) {
315 // Check if this pointer is protected
316 bool is_protected = false;
317 // Binary search since protected_ptrs is sorted
318 if (std::binary_search(protected_ptrs.begin(), protected_ptrs.end(), (*curr)->ptr)) {
319 is_protected = true;
320 }
321
322 if (!is_protected) {
323 // Safe to reclaim
324 detail::retire_node* to_delete = *curr;
325 *curr = (*curr)->next;
326
327 to_delete->deleter(to_delete->ptr);
328 delete to_delete;
329
330 ++reclaimed;
331 --count;
332 } else {
333 // Keep in list
334 curr = &(*curr)->next;
335 }
336 }
337
338 return reclaimed;
339}
340
341template <typename T>
343 // 1. Try to reclaim what we can locally
345 auto protected_ptrs = registry.scan_hazard_pointers();
346 scan_and_reclaim(protected_ptrs);
347
348 // 2. Transfer ANY remaining nodes to GlobalReclamationManager
349 // These are nodes that are still protected by other threads.
350 // Instead of leaking them (or force deleting them which causes UAF),
351 // we give them to the global manager to clean up later.
352 if (head) {
354 head = nullptr;
355 count = 0;
356 }
357}
358
359template <typename T>
363
364} // namespace kcenon::thread
Global manager for orphaned nodes from terminated threads.
void add_orphaned_nodes(retire_node *head, size_t count)
Add a list of retired nodes to the global orphanage.
static global_reclamation_manager & instance()
size_t reclaim(const std::vector< void * > &protected_ptrs)
Reclaim orphaned nodes that are no longer protected.
Global hazard pointer registry. Manages all thread-local hazard lists.
static hazard_pointer_registry & instance()
std::atomic< thread_hazard_list * > head_
std::vector< void * > scan_hazard_pointers()
Scan all hazard pointers and collect protected pointers.
thread_hazard_list * get_thread_list()
Get or create thread-local hazard list.
void mark_inactive()
Mark current thread's list as inactive.
size_t get_active_thread_count() const
Get total number of active threads.
Domain managing hazard pointers and retirement for a specific type.
hazard_pointer acquire()
Acquire a hazard pointer for this domain.
static thread_retire_list & get_thread_retire_list()
size_t reclaim()
Force reclamation scan (optional, for testing)
void retire(T *ptr)
Retire an object for later reclamation.
~hazard_pointer_domain()
Destructor - ensures all retired objects are reclaimed.
static hazard_pointer_domain & global()
Get the global domain instance for type T.
Single hazard pointer that protects one object from reclamation. Uses RAII pattern - automatically releases protection on destruction.
void reset() noexcept
Release protection.
~hazard_pointer()
Destructor - automatically releases protection.
hazard_pointer & operator=(const hazard_pointer &)=delete
static const void * SLOT_OWNED_MARKER
void protect(T *ptr) noexcept
Protect a pointer from reclamation.
void * get_protected() const noexcept
Get the protected pointer (may be null)
bool is_protected() const noexcept
Check if currently protecting a pointer.
hazard_pointer(const hazard_pointer &)=delete
Non-copyable.
hazard_pointer()
Default constructor - acquires a hazard pointer slot.
hazard_pointer & operator=(hazard_pointer &&other) noexcept
Move assignment.
std::atomic< void * > * slot_
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Retire node for pending deletion.
retire_node(void *p, std::function< void(void *)> d)
std::function< void(void *)> deleter
Thread-local hazard pointer list. Each thread maintains a small array of hazard pointers.
static constexpr size_t MAX_HAZARDS_PER_THREAD
std::atomic< void * > hazards[MAX_HAZARDS_PER_THREAD]
size_t scan_and_reclaim(const std::vector< void * > &protected_ptrs)