Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
hazard_pointer.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8#include <mutex>
9#include <stdexcept>
10
11namespace kcenon::thread {
12
13namespace detail {
14
15// Global registry singleton
20
21// Get or create thread-local hazard list
23 // Thread-local storage for this thread's hazard list
24 static thread_local thread_hazard_list* thread_list = nullptr;
25
26 if (thread_list == nullptr) {
27 // Try to reuse an inactive list first to prevent unbounded memory growth
28 thread_hazard_list* curr = head_.load(std::memory_order_acquire);
29 while (curr) {
30 bool expected = false;
31 // Try to claim an inactive list
32 // Use acq_rel: acquire to see prior hazard pointer clears,
33 // release to publish active=true to scanning threads
34 if (curr->active.compare_exchange_strong(expected, true, std::memory_order_acq_rel,
35 std::memory_order_relaxed)) {
36 thread_list = curr;
37 thread_count_.fetch_add(1, std::memory_order_relaxed);
38 break;
39 }
40 curr = curr->next;
41 }
42
43 // If no inactive list found, allocate a new one
44 if (thread_list == nullptr) {
45 thread_list = new thread_hazard_list();
46
47 // Add to global linked list
48 // Use acq_rel on CAS: acquire to read current list, release to
49 // publish the new node so that scanning threads see it
50 thread_hazard_list* old_head = head_.load(std::memory_order_acquire);
51 do {
52 thread_list->next = old_head;
53 } while (!head_.compare_exchange_weak(old_head, thread_list, std::memory_order_acq_rel,
54 std::memory_order_relaxed));
55
56 thread_count_.fetch_add(1, std::memory_order_relaxed);
57 }
58
59 // Register thread cleanup
60 static thread_local struct thread_cleanup {
62
63 ~thread_cleanup() {
64 if (list) {
65 hazard_pointer_registry::instance().mark_inactive();
66 }
67 }
68 } cleanup{thread_list};
69 }
70
71 return thread_list;
72}
73
74// Mark current thread's list as inactive
76 static thread_local thread_hazard_list* thread_list = get_thread_list();
77
78 if (thread_list) {
79 // Clear all hazard pointers
80 for (auto& h : thread_list->hazards) {
81 h.store(nullptr, std::memory_order_release);
82 }
83
84 // Mark as inactive (using release to ensure visibility)
85 thread_list->active.store(false, std::memory_order_release);
86 thread_count_.fetch_sub(1, std::memory_order_relaxed);
87 }
88}
89
90// Scan all hazard pointers and collect protected pointers
92 std::vector<void*> protected_ptrs;
93 protected_ptrs.reserve(256); // Pre-allocate reasonable size
94
95 // Marker value for owned but not protecting slots
96 const void* SLOT_OWNED_MARKER = reinterpret_cast<void*>(0x1);
97
98 // Periodically clean up inactive thread lists
99 // Use scan counter to avoid overhead on every scan
100 static thread_local size_t scan_counter = 0;
101 static constexpr size_t CLEANUP_INTERVAL = 100; // Clean every 100 scans
102 bool should_cleanup = (++scan_counter % CLEANUP_INTERVAL == 0);
103
104 // Traverse all thread lists
105 thread_hazard_list* curr = head_.load(std::memory_order_acquire);
106 size_t inactive_count = 0;
107
108 while (curr) {
109 // IMPORTANT: Scan ALL records regardless of active status.
110 // A thread may be in the process of setting a hazard pointer before
111 // setting active=true (race window in get_thread_list). If we skip
112 // inactive records, we might miss a valid hazard pointer and
113 // prematurely reclaim a protected node.
114 // This matches the approach used in safe_hazard_pointer.h.
115 for (auto& hazard : curr->hazards) {
116 void* ptr = hazard.load(std::memory_order_acquire);
117 // Only add if it's a real pointer (not nullptr or SLOT_OWNED_MARKER)
118 if (ptr != nullptr && ptr != SLOT_OWNED_MARKER) {
119 protected_ptrs.push_back(ptr);
120 }
121 }
122
123 if (!curr->active.load(std::memory_order_acquire)) {
124 ++inactive_count;
125 }
126
127 curr = curr->next;
128 }
129
130 // Sort for efficient binary search in scan_and_reclaim
131 // O(N log N) but enables O(log N) lookups later
132 std::sort(protected_ptrs.begin(), protected_ptrs.end());
133
134 // Remove duplicates to minimize search space
135 // Multiple threads may protect the same pointer
136 protected_ptrs.erase(std::unique(protected_ptrs.begin(), protected_ptrs.end()),
137 protected_ptrs.end());
138
139 return protected_ptrs;
140}
141
142// Get total number of active threads
144 return thread_count_.load(std::memory_order_relaxed);
145}
146
147// Global reclamation manager implementation
152
154 if (!head)
155 return;
156
157 // Find tail of the new list
158 retire_node* tail = head;
159 while (tail->next) {
160 tail = tail->next;
161 }
162
163 // Atomically prepend to the global list
164 // Use acq_rel: acquire to see the current list, release to publish
165 // the new nodes so that reclaim() sees them
166 retire_node* old_head = head_.load(std::memory_order_acquire);
167 do {
168 tail->next = old_head;
169 } while (!head_.compare_exchange_weak(old_head, head, std::memory_order_acq_rel,
170 std::memory_order_relaxed));
171
172 count_.fetch_add(count, std::memory_order_relaxed);
173}
174
175size_t global_reclamation_manager::reclaim(const std::vector<void*>& protected_ptrs) {
176 // Take the entire list to process
177 // Use acq_rel: acquire to see all node data, release to publish
178 // the nullptr so concurrent add_orphaned_nodes sees it
179 retire_node* curr = head_.exchange(nullptr, std::memory_order_acq_rel);
180 if (!curr)
181 return 0;
182
183 size_t reclaimed = 0;
184 retire_node* keep_head = nullptr;
185 retire_node* keep_tail = nullptr;
186 size_t keep_count = 0;
187
188 while (curr) {
189 retire_node* next = curr->next;
190 bool is_protected = false;
191
192 // Check if protected
193 // Binary search since protected_ptrs is sorted
194 if (std::binary_search(protected_ptrs.begin(), protected_ptrs.end(), curr->ptr)) {
195 is_protected = true;
196 }
197
198 if (!is_protected) {
199 // Safe to delete
200 curr->deleter(curr->ptr);
201 delete curr;
202 ++reclaimed;
203 } else {
204 // Keep node
205 if (!keep_head) {
206 keep_head = curr;
207 keep_tail = curr;
208 } else {
209 keep_tail->next = curr;
210 keep_tail = curr;
211 }
212 curr->next = nullptr;
213 ++keep_count;
214 }
215
216 curr = next;
217 }
218
219 // Subtract reclaimed count (not kept, since kept nodes will be re-added
220 // via add_orphaned_nodes which increments count_ itself)
221 // We need to subtract the total taken (reclaimed + keep_count) because
222 // add_orphaned_nodes will add keep_count back
223 count_.fetch_sub(reclaimed + keep_count, std::memory_order_relaxed);
224
225 // Add back kept nodes (this will add keep_count to count_)
226 if (keep_head) {
227 add_orphaned_nodes(keep_head, keep_count);
228 }
229
230 return reclaimed;
231}
232
234 return count_.load(std::memory_order_relaxed);
235}
236
237} // namespace detail
238
239// hazard_pointer implementation
240
241hazard_pointer::hazard_pointer() : slot_(nullptr), slot_index_(0) {
243
244 // Find an available slot (slot value is nullptr means available)
245 for (size_t i = 0; i < detail::thread_hazard_list::MAX_HAZARDS_PER_THREAD; ++i) {
246 void* expected = nullptr;
247 // Try to claim this slot with SLOT_OWNED_MARKER
248 // Use acq_rel: acquire to synchronize with prior release of the slot,
249 // release to publish ownership to scanning threads
250 if (thread_list->hazards[i].compare_exchange_strong(
251 expected, const_cast<void*>(SLOT_OWNED_MARKER), std::memory_order_acq_rel,
252 std::memory_order_relaxed)) {
253 slot_ = &thread_list->hazards[i];
254 slot_index_ = i;
255 // Slot is now owned with SLOT_OWNED_MARKER
256 return;
257 }
258 }
259
260 throw std::runtime_error("Hazard pointer slots exhausted");
261}
262
264 : slot_(other.slot_), slot_index_(other.slot_index_) {
265 other.slot_ = nullptr;
266 other.slot_index_ = 0;
267}
268
270 if (this != &other) {
271 reset();
272
273 slot_ = other.slot_;
274 slot_index_ = other.slot_index_;
275
276 other.slot_ = nullptr;
277 other.slot_index_ = 0;
278 }
279 return *this;
280}
281
283 if (slot_) {
284 // Release the slot back to the pool
285 slot_->store(nullptr, std::memory_order_release);
286 }
287}
288
289void hazard_pointer::reset() noexcept {
290 if (slot_) {
291 // Clear protection but keep slot owned
292 slot_->store(const_cast<void*>(SLOT_OWNED_MARKER), std::memory_order_release);
293 }
294}
295
296bool hazard_pointer::is_protected() const noexcept {
297 if (!slot_) {
298 return false;
299 }
300 void* ptr = slot_->load(std::memory_order_acquire);
301 return ptr != nullptr && ptr != SLOT_OWNED_MARKER;
302}
303
304void* hazard_pointer::get_protected() const noexcept {
305 if (!slot_) {
306 return nullptr;
307 }
308 void* ptr = slot_->load(std::memory_order_acquire);
309 // Return nullptr if slot is just owned but not protecting anything
310 return (ptr == SLOT_OWNED_MARKER) ? nullptr : ptr;
311}
312
313} // namespace kcenon::thread
Global manager for orphaned nodes from terminated threads.
void add_orphaned_nodes(retire_node *head, size_t count)
Add a list of retired nodes to the global orphanage.
static global_reclamation_manager & instance()
size_t reclaim(const std::vector< void * > &protected_ptrs)
Reclaim orphaned nodes that are no longer protected.
Global hazard pointer registry. Manages all thread-local hazard lists.
static hazard_pointer_registry & instance()
std::atomic< thread_hazard_list * > head_
std::vector< void * > scan_hazard_pointers()
Scan all hazard pointers and collect protected pointers.
thread_hazard_list * get_thread_list()
Get or create thread-local hazard list.
void mark_inactive()
Mark current thread's list as inactive.
size_t get_active_thread_count() const
Get total number of active threads.
Single hazard pointer that protects one object from reclamation Uses RAII pattern - automatically rel...
void reset() noexcept
Release protection.
~hazard_pointer()
Destructor - automatically releases protection.
static const void * SLOT_OWNED_MARKER
void * get_protected() const noexcept
Get the protected pointer (may be null)
bool is_protected() const noexcept
Check if currently protecting a pointer.
hazard_pointer()
Default constructor - acquires a hazard pointer slot.
hazard_pointer & operator=(hazard_pointer &&other) noexcept
Move assignment.
std::atomic< void * > * slot_
Hazard pointer implementation for lock-free memory reclamation.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Retire node for pending deletion.
std::function< void(void *)> deleter
Thread-local hazard pointer list. Each thread maintains a small array of hazard pointers.
static constexpr size_t MAX_HAZARDS_PER_THREAD
std::atomic< void * > hazards[MAX_HAZARDS_PER_THREAD]