Demonstrates basic hazard pointer acquisition and retirement, a concurrent lock-free stack using hazard pointers, and a memory safety test with concurrent reader/writer threads.
#include "thread_base/lockfree/memory/hazard_pointer.h"
#include "logger/core/logger.h"
#include <thread>
#include <vector>
#include <random>
#include <chrono>
#include <atomic>
std::atomic<int>
data{0};
std::atomic<TestNode*>
next{
nullptr};
};
private:
std::atomic<TestNode*>
head_{
nullptr};
public:
while (
auto* node =
head_.load()) {
head_.store(node->next.load());
delete node;
}
}
auto* old_head =
head_.load();
do {
new_node->next.store(old_head);
}
while (!
head_.compare_exchange_weak(old_head, new_node));
}
while (true) {
auto* head = hp.protect(
head_);
if (!head) {
return false;
}
auto* next = head->next.load();
if (
head_.compare_exchange_weak(head, next)) {
return true;
}
}
}
};
log_module::write_information("\n=== Basic Hazard Pointer Usage Demo ===");
hazard_pointer_manager hp_manager(4, 2);
auto stats = hp_manager.get_statistics();
log_module::write_information("Initial statistics:");
log_module::write_information(" Active hazard pointers: {}", stats.active_hazard_pointers);
log_module::write_information(" Retired list size: {}", stats.retired_list_size);
log_module::write_information(" Total retired: {}", stats.total_retired);
log_module::write_information(" Total reclaimed: {}", stats.total_reclaimed);
std::atomic<TestNode*> test_ptr{
new TestNode(42)};
{
auto hp = hp_manager.acquire();
auto* protected_ptr = hp.protect(test_ptr);
log_module::write_information("Protected pointer value: {}", protected_ptr->data.load());
}
auto* node_to_retire = test_ptr.exchange(nullptr);
hp_manager.retire(node_to_retire);
hp_manager.scan_and_reclaim();
stats = hp_manager.get_statistics();
log_module::write_information("Final statistics:");
log_module::write_information(" Active hazard pointers: {}", stats.active_hazard_pointers);
log_module::write_information(" Retired list size: {}", stats.retired_list_size);
log_module::write_information(" Total retired: {}", stats.total_retired);
log_module::write_information(" Total reclaimed: {}", stats.total_reclaimed);
}
log_module::write_information("\n=== Concurrent Access Demo ===");
constexpr int NUM_THREADS = 4;
constexpr int OPERATIONS_PER_THREAD = 1000;
hazard_pointer_manager hp_manager(NUM_THREADS, 2);
for (int i = 0; i < 100; ++i) {
stack.push(i);
}
std::atomic<int> push_count{0};
std::atomic<int> pop_count{0};
std::atomic<int> failed_pops{0};
auto start_time = std::chrono::high_resolution_clock::now();
std::vector<std::thread> threads;
threads.reserve(NUM_THREADS);
for (int thread_id = 0; thread_id < NUM_THREADS; ++thread_id) {
threads.emplace_back([&, thread_id]() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 1);
for (int op = 0; op < OPERATIONS_PER_THREAD; ++op) {
if (dis(gen) == 0) {
int value = thread_id * OPERATIONS_PER_THREAD + op;
stack.push(value);
push_count.fetch_add(1);
} else {
pop_count.fetch_add(1);
} else {
failed_pops.fetch_add(1);
}
}
}
});
}
for (auto& thread : threads) {
thread.join();
}
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
log_module::write_information("Concurrent operations completed in {} ms", duration.count());
log_module::write_information("Push operations: {}", push_count.load());
log_module::write_information("Successful pop operations: {}", pop_count.load());
log_module::write_information("Failed pop operations: {}", failed_pops.load());
auto stats = hp_manager.get_statistics();
log_module::write_information("Final hazard pointer statistics:");
log_module::write_information(" Active hazard pointers: {}", stats.active_hazard_pointers);
log_module::write_information(" Retired list size: {}", stats.retired_list_size);
log_module::write_information(" Total retired: {}", stats.total_retired);
log_module::write_information(" Total reclaimed: {}", stats.total_reclaimed);
}
log_module::write_information("\n=== Memory Safety Demo ===");
hazard_pointer_manager hp_manager(2, 1);
std::atomic<TestNode*> shared_ptr{
new TestNode(123)};
std::atomic<bool> reader_done{false};
std::atomic<bool> writer_done{false};
std::thread reader([&]() {
auto hp = hp_manager.acquire();
for (int i = 0; i < 100; ++i) {
auto* protected_ptr = hp.protect(shared_ptr);
if (protected_ptr) {
volatile int value = protected_ptr->data.load();
(void)value;
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
hp.clear();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
reader_done = true;
});
std::thread writer([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
for (int i = 0; i < 10; ++i) {
auto* old_node = shared_ptr.exchange(new_node);
if (old_node) {
hp_manager.retire(old_node);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
writer_done = true;
});
reader.join();
writer.join();
auto* final_node = shared_ptr.exchange(nullptr);
if (final_node) {
hp_manager.retire(final_node);
}
hp_manager.scan_and_reclaim();
auto stats = hp_manager.get_statistics();
log_module::write_information("Memory safety test completed safely!");
log_module::write_information("Final statistics:");
log_module::write_information(" Total retired: {}", stats.total_retired);
log_module::write_information(" Total reclaimed: {}", stats.total_reclaimed);
}
log_module::start();
log_module::console_target(log_module::log_types::Information);
log_module::write_information("Hazard Pointer Manager Sample");
log_module::write_information("=============================");
try {
log_module::write_information("\n=== All demos completed successfully! ===");
} catch (const std::exception& e) {
log_module::write_error("Error: {}", e.what());
log_module::stop();
return 1;
}
log_module::stop();
return 0;
}
LockFreeStack(hazard_pointer_manager &hp_mgr)
hazard_pointer_manager & hp_manager_
std::atomic< TestNode * > head_
A template class representing either a value or an error.
void demonstrate_concurrent_access()
void demonstrate_basic_usage()
void demonstrate_memory_safety()
Core threading foundation of the thread system library.
std::atomic< TestNode * > next