Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
hazard_pointer_sample.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
17#include "thread_base/lockfree/memory/hazard_pointer.h"
18#include "logger/core/logger.h"
19#include <thread>
20#include <vector>
21#include <random>
22#include <chrono>
23#include <atomic>
24
25using namespace kcenon::thread;
26
27struct TestNode {
28 std::atomic<int> data{0};
29 std::atomic<TestNode*> next{nullptr};
30
31 explicit TestNode(int value) : data(value) {}
32};
33
35private:
36 std::atomic<TestNode*> head_{nullptr};
37 hazard_pointer_manager& hp_manager_;
38
39public:
40 explicit LockFreeStack(hazard_pointer_manager& hp_mgr) : hp_manager_(hp_mgr) {}
41
43 while (auto* node = head_.load()) {
44 head_.store(node->next.load());
45 delete node;
46 }
47 }
48
49 void push(int value) {
50 auto* new_node = new TestNode(value);
51 auto* old_head = head_.load();
52
53 do {
54 new_node->next.store(old_head);
55 } while (!head_.compare_exchange_weak(old_head, new_node));
56 }
57
58 bool pop(int& result) {
59 auto hp = hp_manager_.acquire();
60
61 while (true) {
62 auto* head = hp.protect(head_);
63 if (!head) {
64 return false; // Stack is empty
65 }
66
67 auto* next = head->next.load();
68
69 // Try to update head
70 if (head_.compare_exchange_weak(head, next)) {
71 result = head->data.load();
72 hp_manager_.retire(head);
73 return true;
74 }
75 }
76 }
77};
78
80 log_module::write_information("\n=== Basic Hazard Pointer Usage Demo ===");
81
82 hazard_pointer_manager hp_manager(4, 2); // 4 threads, 2 pointers per thread
83
84 // Show initial statistics
85 auto stats = hp_manager.get_statistics();
86 log_module::write_information("Initial statistics:");
87 log_module::write_information(" Active hazard pointers: {}", stats.active_hazard_pointers);
88 log_module::write_information(" Retired list size: {}", stats.retired_list_size);
89 log_module::write_information(" Total retired: {}", stats.total_retired);
90 log_module::write_information(" Total reclaimed: {}", stats.total_reclaimed);
91
92 // Create a simple atomic pointer
93 std::atomic<TestNode*> test_ptr{new TestNode(42)};
94
95 {
96 // Acquire hazard pointer and protect the object
97 auto hp = hp_manager.acquire();
98 auto* protected_ptr = hp.protect(test_ptr);
99
100 log_module::write_information("Protected pointer value: {}", protected_ptr->data.load());
101
102 // The hazard pointer automatically clears when going out of scope
103 }
104
105 // Retire the object
106 auto* node_to_retire = test_ptr.exchange(nullptr);
107 hp_manager.retire(node_to_retire);
108
109 // Force reclamation
110 hp_manager.scan_and_reclaim();
111
112 // Show final statistics
113 stats = hp_manager.get_statistics();
114 log_module::write_information("Final statistics:");
115 log_module::write_information(" Active hazard pointers: {}", stats.active_hazard_pointers);
116 log_module::write_information(" Retired list size: {}", stats.retired_list_size);
117 log_module::write_information(" Total retired: {}", stats.total_retired);
118 log_module::write_information(" Total reclaimed: {}", stats.total_reclaimed);
119}
120
122 log_module::write_information("\n=== Concurrent Access Demo ===");
123
124 constexpr int NUM_THREADS = 4;
125 constexpr int OPERATIONS_PER_THREAD = 1000;
126
127 hazard_pointer_manager hp_manager(NUM_THREADS, 2);
128 LockFreeStack stack(hp_manager);
129
130 // Fill the stack initially
131 for (int i = 0; i < 100; ++i) {
132 stack.push(i);
133 }
134
135 std::atomic<int> push_count{0};
136 std::atomic<int> pop_count{0};
137 std::atomic<int> failed_pops{0};
138
139 auto start_time = std::chrono::high_resolution_clock::now();
140
141 std::vector<std::thread> threads;
142 threads.reserve(NUM_THREADS);
143
144 // Create worker threads
145 for (int thread_id = 0; thread_id < NUM_THREADS; ++thread_id) {
146 threads.emplace_back([&, thread_id]() {
147 std::random_device rd;
148 std::mt19937 gen(rd());
149 std::uniform_int_distribution<> dis(0, 1);
150
151 for (int op = 0; op < OPERATIONS_PER_THREAD; ++op) {
152 if (dis(gen) == 0) {
153 // Push operation
154 int value = thread_id * OPERATIONS_PER_THREAD + op;
155 stack.push(value);
156 push_count.fetch_add(1);
157 } else {
158 // Pop operation
159 int result;
160 if (stack.pop(result)) {
161 pop_count.fetch_add(1);
162 } else {
163 failed_pops.fetch_add(1);
164 }
165 }
166 }
167 });
168 }
169
170 // Wait for all threads to complete
171 for (auto& thread : threads) {
172 thread.join();
173 }
174
175 auto end_time = std::chrono::high_resolution_clock::now();
176 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
177
178 log_module::write_information("Concurrent operations completed in {} ms", duration.count());
179 log_module::write_information("Push operations: {}", push_count.load());
180 log_module::write_information("Successful pop operations: {}", pop_count.load());
181 log_module::write_information("Failed pop operations: {}", failed_pops.load());
182
183 // Final statistics
184 auto stats = hp_manager.get_statistics();
185 log_module::write_information("Final hazard pointer statistics:");
186 log_module::write_information(" Active hazard pointers: {}", stats.active_hazard_pointers);
187 log_module::write_information(" Retired list size: {}", stats.retired_list_size);
188 log_module::write_information(" Total retired: {}", stats.total_retired);
189 log_module::write_information(" Total reclaimed: {}", stats.total_reclaimed);
190}
191
193 log_module::write_information("\n=== Memory Safety Demo ===");
194
195 hazard_pointer_manager hp_manager(2, 1);
196 std::atomic<TestNode*> shared_ptr{new TestNode(123)};
197
198 std::atomic<bool> reader_done{false};
199 std::atomic<bool> writer_done{false};
200
201 // Reader thread - tries to access the shared object
202 std::thread reader([&]() {
203 auto hp = hp_manager.acquire();
204
205 for (int i = 0; i < 100; ++i) {
206 auto* protected_ptr = hp.protect(shared_ptr);
207 if (protected_ptr) {
208 // Safe to access the object
209 volatile int value = protected_ptr->data.load();
210 (void)value; // Prevent optimization
211 std::this_thread::sleep_for(std::chrono::microseconds(10));
212 }
213 hp.clear();
214 std::this_thread::sleep_for(std::chrono::microseconds(10));
215 }
216 reader_done = true;
217 });
218
219 // Writer thread - tries to replace and retire the shared object
220 std::thread writer([&]() {
221 std::this_thread::sleep_for(std::chrono::milliseconds(10)); // Let reader start
222
223 for (int i = 0; i < 10; ++i) {
224 auto* new_node = new TestNode(456 + i);
225 auto* old_node = shared_ptr.exchange(new_node);
226
227 if (old_node) {
228 hp_manager.retire(old_node);
229 }
230
231 std::this_thread::sleep_for(std::chrono::milliseconds(5));
232 }
233 writer_done = true;
234 });
235
236 reader.join();
237 writer.join();
238
239 // Clean up remaining object
240 auto* final_node = shared_ptr.exchange(nullptr);
241 if (final_node) {
242 hp_manager.retire(final_node);
243 }
244
245 hp_manager.scan_and_reclaim();
246
247 auto stats = hp_manager.get_statistics();
248 log_module::write_information("Memory safety test completed safely!");
249 log_module::write_information("Final statistics:");
250 log_module::write_information(" Total retired: {}", stats.total_retired);
251 log_module::write_information(" Total reclaimed: {}", stats.total_reclaimed);
252}
253
254int main() {
255 // Initialize logger
256 log_module::start();
257 log_module::console_target(log_module::log_types::Information);
258
259 log_module::write_information("Hazard Pointer Manager Sample");
260 log_module::write_information("=============================");
261
262 try {
266
267 log_module::write_information("\n=== All demos completed successfully! ===");
268
269 } catch (const std::exception& e) {
270 log_module::write_error("Error: {}", e.what());
271 log_module::stop();
272 return 1;
273 }
274
275 // Cleanup logger
276 log_module::stop();
277 return 0;
278}
LockFreeStack(hazard_pointer_manager &hp_mgr)
hazard_pointer_manager & hp_manager_
std::atomic< TestNode * > head_
bool pop(int &result)
A template class representing either a value or an error.
void demonstrate_concurrent_access()
void demonstrate_basic_usage()
void demonstrate_memory_safety()
Core threading foundation of the thread system library.
Definition thread_impl.h:17
std::atomic< int > data
std::atomic< TestNode * > next