Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
safe_hazard_pointer.h
// BSD 3-Clause License
// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

/**
 * @file safe_hazard_pointer.h
 * @brief Hazard-pointer primitives for safe memory reclamation in lock-free code.
 */

#pragma once

#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <vector>

namespace kcenon::thread {

/**
 * @brief Thread-local hazard pointer record with explicit memory ordering.
 */
class safe_hazard_pointer_record {
public:
    static constexpr size_t MAX_HAZARD_POINTERS = 2;

    safe_hazard_pointer_record() {
        for (auto& hp : hazard_pointers_) {
            hp.store(nullptr, std::memory_order_relaxed);
        }
        next.store(nullptr, std::memory_order_relaxed);
        active.store(false, std::memory_order_relaxed);
    }

    /**
     * @brief Protect a pointer from reclamation.
     */
    void protect(void* p, size_t slot = 0) noexcept {
        assert(slot < MAX_HAZARD_POINTERS);
        hazard_pointers_[slot].store(p, std::memory_order_release);
    }

    /**
     * @brief Clear hazard pointer protection.
     */
    void clear(size_t slot = 0) noexcept {
        assert(slot < MAX_HAZARD_POINTERS);
        hazard_pointers_[slot].store(nullptr, std::memory_order_release);
    }

    /**
     * @brief Check if a pointer is protected by this record.
     */
    [[nodiscard]] bool contains(void* p) const noexcept {
        for (const auto& hp : hazard_pointers_) {
            if (hp.load(std::memory_order_acquire) == p) {
                return true;
            }
        }
        return false;
    }

    /**
     * @brief Get protected pointer at slot.
     */
    [[nodiscard]] void* get(size_t slot = 0) const noexcept {
        assert(slot < MAX_HAZARD_POINTERS);
        return hazard_pointers_[slot].load(std::memory_order_acquire);
    }

    // Linked list for global registry
    std::atomic<safe_hazard_pointer_record*> next{nullptr};
    std::atomic<bool> active{false};

private:
    std::array<std::atomic<void*>, MAX_HAZARD_POINTERS> hazard_pointers_;
};
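
// Illustrative sketch (not part of the original header): the canonical
// protect-then-revalidate loop a reader performs with a record. `shared_head`
// is a hypothetical std::atomic<node*> owned by some lock-free structure; the
// second load confirms the pointer was still reachable after protect() made
// it visible to collect().
//
//   node* candidate;
//   do {
//       candidate = shared_head.load(std::memory_order_acquire);
//       record->protect(candidate, 0);
//   } while (candidate != shared_head.load(std::memory_order_acquire));
//   // candidate is now safe to dereference until record->clear(0).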

/**
 * @brief Global Hazard Pointer Domain Manager.
 */
class safe_hazard_pointer_domain {
public:
    using retire_callback = std::function<void(void*)>;

    /**
     * @brief Get singleton instance.
     */
    static safe_hazard_pointer_domain& instance() {
        static safe_hazard_pointer_domain domain;
        return domain;
    }

    /**
     * @brief Acquire a hazard pointer record for current thread.
     */
    safe_hazard_pointer_record* acquire() {
        // 1. Try to reuse an inactive record first
        safe_hazard_pointer_record* p = head_.load(std::memory_order_acquire);
        while (p != nullptr) {
            bool expected = false;
            if (p->active.compare_exchange_strong(
                    expected, true,
                    std::memory_order_acq_rel,
                    std::memory_order_relaxed)) {
                // Clear hazard pointers immediately after acquiring to avoid
                // stale pointers from previous use affecting collect()
                p->clear(0);
                p->clear(1);
                active_count_.fetch_add(1, std::memory_order_relaxed);
                return p;
            }
            p = p->next.load(std::memory_order_acquire);
        }

        // 2. Create new record (constructor already clears hazard pointers)
        auto* new_record = new safe_hazard_pointer_record();
        new_record->active.store(true, std::memory_order_relaxed);

        // 3. Add to list (lock-free)
        safe_hazard_pointer_record* old_head = nullptr;
        do {
            old_head = head_.load(std::memory_order_relaxed);
            new_record->next.store(old_head, std::memory_order_relaxed);
        } while (!head_.compare_exchange_weak(
            old_head, new_record,
            std::memory_order_release,
            std::memory_order_relaxed));

        active_count_.fetch_add(1, std::memory_order_relaxed);
        return new_record;
    }

    /**
     * @brief Release a hazard pointer record.
     */
    void release(safe_hazard_pointer_record* record) noexcept {
        if (record == nullptr) {
            return;
        }

        record->clear(0);
        record->clear(1);
        record->active.store(false, std::memory_order_release);
        active_count_.fetch_sub(1, std::memory_order_relaxed);
    }

    /**
     * @brief Retire a pointer for later reclamation.
     */
    void retire(void* p, retire_callback deleter) {
        if (p == nullptr) {
            return;
        }

        bool should_collect = false;
        {
            std::lock_guard<std::mutex> lock(retire_mutex_);

            // Remove any existing entry with the same address to handle memory reuse.
            // This can happen when memory is freed and reallocated at the same address.
            // In that case, the old entry's deleter must NOT be called, since the memory
            // is now occupied by a new, valid object.
            auto it = std::remove_if(retired_list_.begin(), retired_list_.end(),
                [p](const auto& entry) { return entry.first == p; });
            if (it != retired_list_.end()) {
                size_t removed = std::distance(it, retired_list_.end());
                retired_list_.erase(it, retired_list_.end());
                retired_count_.fetch_sub(removed, std::memory_order_relaxed);
            }

            retired_list_.emplace_back(p, std::move(deleter));
            retired_count_.fetch_add(1, std::memory_order_relaxed);

            // Check the threshold inside the lock to avoid a race
            size_t threshold = get_adaptive_threshold();
            should_collect = (retired_count_.load(std::memory_order_relaxed) >= threshold);
        }

        // Trigger collection after releasing the lock to avoid deadlock
        if (should_collect) {
            collect();
        }
    }

    /**
     * @brief Collect reclaimable objects.
     */
    void collect() {
        std::lock_guard<std::mutex> lock(retire_mutex_);
        collect_internal();
    }

    /**
     * @brief Get current retired count.
     */
    [[nodiscard]] size_t retired_count() const noexcept {
        return retired_count_.load(std::memory_order_relaxed);
    }

    /**
     * @brief Get active thread count.
     */
    [[nodiscard]] size_t active_count() const noexcept {
        return active_count_.load(std::memory_order_relaxed);
    }

private:
    safe_hazard_pointer_domain() = default;

    ~safe_hazard_pointer_domain() {
        // Delete all records
        auto* p = head_.load(std::memory_order_relaxed);
        while (p != nullptr) {
            auto* next = p->next.load(std::memory_order_relaxed);
            delete p;
            p = next;
        }

        // Force-delete remaining retired objects
        for (auto& [ptr, deleter] : retired_list_) {
            if (deleter && ptr) {
                deleter(ptr);
            }
        }
    }

    // Prevent copying
    safe_hazard_pointer_domain(const safe_hazard_pointer_domain&) = delete;
    safe_hazard_pointer_domain& operator=(const safe_hazard_pointer_domain&) = delete;

    /**
     * @brief Collect reclaimable objects (internal, lock held).
     */
    void collect_internal() {
        if (retired_list_.empty()) {
            return;
        }

        // Gather all currently protected pointers.
        // IMPORTANT: Check ALL records, not just active ones, to avoid a race
        // where a record is being reused while we're scanning. The hazard
        // pointer value is set before active=true, so we must check the
        // pointer value itself, not the active flag.
        std::unordered_set<void*> hazards;
        hazards.reserve(active_count_.load(std::memory_order_relaxed) *
                        safe_hazard_pointer_record::MAX_HAZARD_POINTERS);

        safe_hazard_pointer_record* p = head_.load(std::memory_order_acquire);
        while (p != nullptr) {
            // Check all records regardless of active status to avoid the race
            for (size_t i = 0; i < safe_hazard_pointer_record::MAX_HAZARD_POINTERS; ++i) {
                void* hp = p->get(i);
                if (hp != nullptr) {
                    hazards.insert(hp);
                }
            }
            p = p->next.load(std::memory_order_acquire);
        }

        // Delete objects not in the hazard set
        size_t reclaimed = 0;
        auto it = retired_list_.begin();
        while (it != retired_list_.end()) {
            if (hazards.find(it->first) == hazards.end()) {
                // Safe to delete
                if (it->second && it->first) {
                    it->second(it->first);
                }
                it = retired_list_.erase(it);
                ++reclaimed;
            } else {
                ++it;
            }
        }

        retired_count_.fetch_sub(reclaimed, std::memory_order_relaxed);
    }

    /**
     * @brief Get adaptive threshold based on active thread count.
     */
    [[nodiscard]] size_t get_adaptive_threshold() const noexcept {
        static constexpr size_t BASE_THRESHOLD = 64;
        static constexpr size_t PER_THREAD_THRESHOLD = 16;
        static constexpr size_t MAX_THRESHOLD = 512;

        size_t active = active_count_.load(std::memory_order_relaxed);
        return std::min(MAX_THRESHOLD, BASE_THRESHOLD + active * PER_THREAD_THRESHOLD);
    }
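    // Worked example (illustrative): with 8 active records the threshold is
    // min(512, 64 + 8 * 16) = 192 retired objects; the 512 cap is reached
    // once 28 or more records are active (64 + 28 * 16 = 512).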

    std::atomic<safe_hazard_pointer_record*> head_{nullptr};
    std::atomic<size_t> active_count_{0};
    std::atomic<size_t> retired_count_{0};
    std::mutex retire_mutex_;
    std::vector<std::pair<void*, retire_callback>> retired_list_;
};
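
// Illustrative sketch (not part of the original header): retiring a node
// through the domain. `node` and `detached_node` are hypothetical; the
// deleter runs only once no record protects the pointer.
//
//   auto& domain = safe_hazard_pointer_domain::instance();
//   domain.retire(detached_node,
//                 [](void* q) { delete static_cast<node*>(q); });
//   // Reclamation happens automatically once retired_count() exceeds the
//   // adaptive threshold, or immediately via domain.collect().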

/**
 * @brief RAII guard that owns a hazard pointer record for its lifetime.
 */
class safe_hazard_guard {
public:
    /**
     * @brief Construct guard, optionally protecting a pointer.
     */
    explicit safe_hazard_guard(void* p = nullptr, size_t slot = 0)
        : record_(safe_hazard_pointer_domain::instance().acquire())
        , slot_(slot) {
        if (p != nullptr) {
            record_->protect(p, slot_);
        }
    }

    /**
     * @brief Destructor - releases the record.
     */
    ~safe_hazard_guard() {
        if (record_ != nullptr) {
            safe_hazard_pointer_domain::instance().release(record_);
        }
    }

    // Non-copyable
    safe_hazard_guard(const safe_hazard_guard&) = delete;
    safe_hazard_guard& operator=(const safe_hazard_guard&) = delete;

    // Movable
    safe_hazard_guard(safe_hazard_guard&& other) noexcept
        : record_(other.record_)
        , slot_(other.slot_) {
        other.record_ = nullptr;
    }

    safe_hazard_guard& operator=(safe_hazard_guard&& other) noexcept {
        if (this != &other) {
            if (record_ != nullptr) {
                safe_hazard_pointer_domain::instance().release(record_);
            }
            record_ = other.record_;
            slot_ = other.slot_;
            other.record_ = nullptr;
        }
        return *this;
    }

    /**
     * @brief Protect a pointer.
     */
    void protect(void* p) noexcept {
        if (record_ != nullptr) {
            record_->protect(p, slot_);
        }
    }

    /**
     * @brief Clear protection.
     */
    void clear() noexcept {
        if (record_ != nullptr) {
            record_->clear(slot_);
        }
    }

    /**
     * @brief Get protected pointer.
     */
    [[nodiscard]] void* get() const noexcept {
        return record_ != nullptr ? record_->get(slot_) : nullptr;
    }

    /**
     * @brief Check whether the guard holds a valid record.
     */
    [[nodiscard]] explicit operator bool() const noexcept {
        return record_ != nullptr;
    }

private:
    safe_hazard_pointer_record* record_;
    size_t slot_;
};
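
// Illustrative guard-lifetime sketch (not part of the original header);
// `ptr` and `use()` are hypothetical, and in real code the pointer must be
// re-validated after protect() as shown in the record-level sketch above.
//
//   {
//       safe_hazard_guard guard(ptr);  // acquires a record and protects ptr
//       use(ptr);                      // ptr cannot be reclaimed here
//   }                                  // guard destructor releases the record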

/**
 * @brief Retire a typed pointer with an automatically generated deleter.
 */
template<typename T>
void safe_retire_hazard(T* p) {
    if (p == nullptr) {
        return;
    }
    safe_hazard_pointer_domain::instance().retire(
        p,
        [](void* ptr) { delete static_cast<T*>(ptr); }
    );
}
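// Usage (illustrative): safe_retire_hazard(old_node) registers `old_node`
// (a hypothetical pointer) with the global domain; `delete` runs only after
// no hazard pointer protects it.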

/**
 * @brief Typed convenience wrapper around the global hazard pointer domain.
 */
template<typename T>
class typed_safe_hazard_domain {
public:
    static typed_safe_hazard_domain& instance() {
        static typed_safe_hazard_domain domain;
        return domain;
    }

    void retire(T* p) {
        safe_retire_hazard<T>(p);
    }

    void collect() {
        safe_hazard_pointer_domain::instance().collect();
    }

private:
    typed_safe_hazard_domain() = default;
};

} // namespace kcenon::thread
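
// End-to-end sketch (illustrative, not part of this file): a minimal Treiber
// stack pop using safe_hazard_guard for traversal and safe_retire_hazard for
// reclamation. `node` and `head` are hypothetical.
//
//   struct node { int value; node* next; };
//   std::atomic<node*> head{nullptr};
//
//   bool pop(int& out) {
//       kcenon::thread::safe_hazard_guard guard;
//       for (;;) {
//           node* top;
//           do {
//               top = head.load(std::memory_order_acquire);
//               if (top == nullptr) return false;
//               guard.protect(top);
//           } while (top != head.load(std::memory_order_acquire));
//           // top is protected, so reading top->next is safe.
//           if (head.compare_exchange_strong(top, top->next,
//                                            std::memory_order_acq_rel)) {
//               out = top->value;
//               kcenon::thread::safe_retire_hazard(top);  // deferred delete
//               return true;
//           }
//           // Lost the race to another popper; re-protect and retry.
//       }
//   }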