#if defined(__x86_64__) || defined(_M_X64)
#include <immintrin.h>  // assumed: provides _mm_pause(); the guarded include itself is elided in this listing
#endif

// Adaptive backoff for contended CAS loops: spin briefly, then yield, then
// sleep with an exponentially growing, capped delay.
inline void backoff(int retry) {
    if (retry < 16) {  // assumed spin threshold; the original value is elided in this listing
#if defined(__x86_64__) || defined(_M_X64)
        _mm_pause();   // assumed CPU pause hint for the x86-64 branch
#else
        std::this_thread::yield();
#endif
    } else if (retry < 64) {
        std::this_thread::yield();
    } else {
        // Sleep 2^(retry - 64) microseconds, capped at 2^10 (~1 ms).
        std::this_thread::sleep_for(
            std::chrono::microseconds(1 << std::min(retry - 64, 10)));
    }
}
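// Worked example (not from the original source) of the sleep schedule above,
// once the retry count passes 64: the delay doubles on each retry and is capped
// by std::min(retry - 64, 10) at 2^10 microseconds.
//   retry = 64 -> 1 << 0  =    1 us
//   retry = 70 -> 1 << 6  =   64 us
//   retry = 74 -> 1 << 10 = 1024 us
//   retry = 90 -> 1 << 10 = 1024 us (still capped)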
    // lockfree_job_queue() constructor (body excerpt): head_ and tail_ both
    // start out pointing at the same dummy node, so neither is ever null.
    head_.store(dummy, std::memory_order_relaxed);
    tail_.store(dummy, std::memory_order_relaxed);
    // ~lockfree_job_queue() destructor (body excerpt): signal shutdown to any
    // concurrent producers and consumers ...
    shutdown_.store(true, std::memory_order_release);

    // ... then reclaim the remaining dummy node left at the head of the queue.
    node* dummy = head_.load(std::memory_order_acquire);
    // enqueue(std::unique_ptr<job>&& job) -> common::VoidResult  (body excerpt)
    // Acquire a node from the lock-free pool and move the job into it.
    node* new_node = pool_->acquire(std::move(job_ptr));

    constexpr int MAX_RETRIES = 1000;
    for (int retry = 0; retry < MAX_RETRIES; ++retry) {  // loop header not shown in this excerpt
        node* tail = tail_.load(std::memory_order_acquire);
        // ... (presumably hazard-pointer protection of tail; elided in this listing)
        if (tail != tail_.load(std::memory_order_acquire)) {
            // tail_ moved while we were protecting it; retry with a fresh snapshot.
            backoff(retry);
            continue;
        }

        node* next = tail->next.load(std::memory_order_acquire);
        if (tail == tail_.load(std::memory_order_acquire)) {
            if (next == nullptr) {
                // Standard Michael-Scott enqueue: link the new node after the
                // current tail, then try to swing tail_ forward.
                if (tail->next.compare_exchange_weak(
                        next, new_node,
                        std::memory_order_release,
                        std::memory_order_relaxed)) {
                    // Failure here is benign: another thread may already have
                    // advanced tail_ on our behalf.
                    tail_.compare_exchange_weak(
                        tail, new_node,
                        std::memory_order_release,
                        std::memory_order_relaxed);
                    approximate_size_.fetch_add(1, std::memory_order_relaxed);
                    return {};  // success; the exact VoidResult construction is elided in this listing
                }
            } else {
                // tail_ is lagging behind the real last node; help advance it.
                tail_.compare_exchange_weak(
                    tail, next,
                    std::memory_order_release,
                    std::memory_order_relaxed);
            }
        }
        backoff(retry);
    }

    // All retries exhausted: return the node to the pool and report the queue as busy.
    pool_->release(new_node);
    return common::error_info{
        static_cast<int>(error_code::queue_busy),
        "Queue is busy, retry later",
        "thread_system"};
    // dequeue() -> common::Result<std::unique_ptr<job>>  (body excerpt)
    constexpr int MAX_OUTER_RETRIES = 100;
    constexpr int MAX_INNER_RETRIES = 10;

    for (int outer_retry = 0; outer_retry < MAX_OUTER_RETRIES; ++outer_retry) {
        node* head = head_.load(std::memory_order_acquire);
        // ... (presumably hazard-pointer protection of head; elided in this listing)
        if (head != head_.load(std::memory_order_acquire)) {
            // head_ moved while we were protecting it; retry with a fresh snapshot.
            backoff(outer_retry);
            continue;
        }

        node* tail = tail_.load(std::memory_order_acquire);

        // Re-read head->next until the value is observed stable (or give up).
        node* next = nullptr;
        bool next_stable = false;
        for (int inner_retry = 0; inner_retry < MAX_INNER_RETRIES; ++inner_retry) {
            next = head->next.load(std::memory_order_acquire);
            if (next == nullptr) {
                // No successor: nothing to protect, so the read counts as stable.
                next_stable = true;
                break;
            }
            // ... (presumably hazard-pointer protection of next; elided in this listing)
            if (next == head->next.load(std::memory_order_acquire)) {
                next_stable = true;
                break;
            }
            backoff(inner_retry);
        }
        if (!next_stable) {
            backoff(outer_retry);
            continue;
        }

        if (head == head_.load(std::memory_order_acquire)) {
            // Standard Michael-Scott dequeue: distinguish the empty / lagging-tail
            // case from a normal removal (the head == tail test is elided in this listing).
            if (head == tail) {
                if (next == nullptr) {
                    // Queue is empty.
                    // ... (the "queue is empty" error return is elided in this listing)
                } else {
                    // tail_ is lagging behind the real last node; help advance it.
                    tail_.compare_exchange_weak(
                        tail, next,
                        std::memory_order_release,
                        std::memory_order_relaxed);
                }
            } else {
                if (next == nullptr) {
                    // Inconsistent snapshot; back off and retry.
                    backoff(outer_retry);
                    continue;
                }
                if (head_.compare_exchange_weak(
                        head, next,
                        std::memory_order_release,
                        std::memory_order_relaxed)) {
                    // Dequeue succeeded: the old dummy (head) is retired and
                    // `next` becomes the new dummy; take its payload.
                    std::unique_ptr<job> job_data = std::move(next->data);
                    // ... (retire_node(head) elided in this listing)
                    approximate_size_.fetch_sub(1, std::memory_order_relaxed);
                    return std::move(job_data);
                }
                backoff(outer_retry);
            }
        }
        backoff(outer_retry);
    }
    // ... (error return after exhausting MAX_OUTER_RETRIES elided in this listing)
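// Illustrative consumer-side sketch (not part of the original source): a worker
// loop that drains the queue and runs jobs, reusing the file's backoff() helper
// while idle. The has_value()/value() accessors on common::Result and the
// job::execute() entry point are hypothetical names; only the value-or-error
// nature of Result and the existence of the job type are documented here.
inline void worker_loop(lockfree_job_queue& queue, std::atomic<bool>& stop) {
    int idle = 0;
    while (!stop.load(std::memory_order_acquire)) {
        auto result = queue.dequeue();
        if (result.has_value()) {            // hypothetical accessor
            std::unique_ptr<job> work = std::move(result.value());
            work->execute();                 // hypothetical job entry point
            idle = 0;
        } else {
            backoff(idle++);                 // queue empty or heavily contended
        }
    }
}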
    // empty() const -> bool  (body excerpt)
    constexpr int MAX_RETRIES = 10;
    for (int retry = 0; retry < MAX_RETRIES; ++retry) {  // loop header not shown in this excerpt
        node* head = head_.load(std::memory_order_acquire);
        // ... (presumably hazard-pointer protection of head; elided in this listing)
        if (head != head_.load(std::memory_order_acquire)) {
            backoff(retry);
            continue;
        }
        // The queue is empty exactly when the dummy head has no successor.
        node* next = head->next.load(std::memory_order_acquire);
        return next == nullptr;
    }

    // Fallback after exhausting the retries: best-effort, unvalidated read.
    node* head = head_.load(std::memory_order_acquire);
    node* next = head->next.load(std::memory_order_acquire);
    return next == nullptr;
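// Note with a small sketch (not part of the original source): under concurrency
// empty() is only a snapshot, so "check empty(), then dequeue()" is racy. The
// safer pattern is to attempt dequeue() directly and treat its error result as
// "nothing to do". has_value()/value() are the same hypothetical accessors as in
// the worker_loop sketch above.
inline bool try_run_one(lockfree_job_queue& queue) {
    auto result = queue.dequeue();     // attempt directly instead of guarding with empty()
    if (!result.has_value()) {         // hypothetical accessor: empty or contended
        return false;
    }
    result.value()->execute();         // hypothetical job entry point
    return true;
}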
// retire_node(node* n): retire a node through the hazard-pointer domain; once it
// is safe to reclaim, the deleter returns the node to the pool instead of freeing it.
void retire_node(node* n) {
    // Copy the shared_ptr so the pool outlives any deferred reclamation callback.
    std::shared_ptr<node_pool> pool = pool_;
    safe_hazard_pointer_domain::instance().retire(
        n, [pool](void* ptr) {
            pool->release(static_cast<node*>(ptr));
        });
}
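// Illustrative sketch (not part of the original source) of the reader-side
// hazard-pointer pattern that the elided lines in enqueue()/dequeue() appear to
// follow: publish the pointer with protect(), then re-read the source atomic and
// retry until the two agree, so a node retired concurrently via retire_node()
// cannot be reclaimed while it is still in use. Only instance(), protect(), and
// retire() are documented here; the loop shape around them is an assumption.
inline node* acquire_protected(std::atomic<node*>& src) {
    for (;;) {
        node* p = src.load(std::memory_order_acquire);
        safe_hazard_pointer_domain::instance().protect(p);   // publish the hazard
        if (p == src.load(std::memory_order_acquire)) {
            return p;   // still current: safe to dereference until the hazard is cleared
        }
        // src changed after we protected p; protect the new value and try again.
    }
}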
Referenced declarations (brief descriptions from the generated documentation):

lockfree_job_queue — Lock-free MPMC job queue using Michael-Scott algorithm with hazard pointers.
  lockfree_job_queue()                                         Constructs an empty lock-free job queue.
  ~lockfree_job_queue()                                        Destructor.
  auto enqueue(std::unique_ptr<job>&& job) -> common::VoidResult
                                                               Enqueues a job into the queue (thread-safe).
  auto dequeue() -> common::Result<std::unique_ptr<job>>       Dequeues a job from the queue (thread-safe).
  auto empty() const -> bool                                   Checks if the queue is empty.
  auto size() const -> std::size_t                             Gets approximate queue size.
  void retire_node(node* n)                                    Retire a node through hazard pointers, recycling via pool on reclamation.
  std::atomic<node*> head_
  std::atomic<node*> tail_
  std::atomic<std::size_t> approximate_size_
  std::atomic<bool> shutdown_
  std::shared_ptr<node_pool> pool_

node — Internal queue node structure.
  std::unique_ptr<job> data
  std::atomic<node*> next

node_pool — Lock-free node freelist (Treiber stack) for node recycling.

safe_hazard_pointer_domain
  static safe_hazard_pointer_domain& instance()                Get singleton instance.
  void protect(void* p) noexcept                               Protect a pointer.
  void retire(void* p, retire_callback deleter)                Retire a pointer for later reclamation.

common::Result — A template class representing either a value or an error.

retry (enum value) — Retry failed job (with max retries).

Retire node for pending deletion. (declaration not shown in this excerpt)
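Taken together, the interface above supports a small single-threaded smoke test.
The sketch below is not from the original documentation: the noop_job subclass and
the has_value() accessor on common::Result are hypothetical stand-ins for whatever
the surrounding codebase defines.

#include <cassert>
#include <memory>

// Hypothetical job subclass; the real job interface is not shown in this excerpt,
// so its override list is omitted here.
struct noop_job : job {
    // ...
};

int main() {
    lockfree_job_queue queue;                    // the constructor installs the dummy node
    assert(queue.empty());

    (void)queue.enqueue(std::make_unique<noop_job>());
    // size() is approximate by design: approximate_size_ is updated with relaxed
    // fetch_add/fetch_sub, so it may briefly lag under concurrency, but it is
    // exact in this single-threaded use.
    assert(queue.size() == 1);

    auto result = queue.dequeue();
    assert(result.has_value());                  // hypothetical accessor on common::Result
    assert(queue.empty());
}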