numa_work_stealer::numa_work_stealer(std::size_t worker_count,
                                     deque_accessor_fn deque_accessor,
                                     cpu_accessor_fn cpu_accessor,
                                     enhanced_work_stealing_config config)
    : worker_count_(worker_count)
    , deque_accessor_(std::move(deque_accessor))
    , cpu_accessor_(std::move(cpu_accessor))
    , config_(config)
    , rng_(std::random_device{}())
{
    // Cooperation-history tracker used by the locality-aware victim selection policy.
    affinity_tracker_ =
        std::make_unique<work_affinity_tracker>(worker_count_, config_.locality_history_size);
}
auto numa_work_stealer::steal_for(std::size_t worker_id) -> job*
{
    if (!config_.enabled || worker_count_ <= 1)
    {
        return nullptr;
    }

    auto start_time = std::chrono::steady_clock::now();
    job* stolen_job = nullptr;

    auto victims = select_victims(worker_id, config_.max_steal_attempts);

    std::size_t attempt = 0;
    for (auto victim_id : victims)
    {
        if (config_.collect_statistics)
        {
            stats_.steal_attempts.fetch_add(1, std::memory_order_relaxed);
        }

        auto* victim_deque = deque_accessor_(victim_id);
        if (victim_deque == nullptr)
        {
            continue;
        }

        auto result = victim_deque->steal();
        if (result.has_value())
        {
            stolen_job = result.value();

            if (config_.collect_statistics)
            {
                stats_.successful_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.jobs_stolen.fetch_add(1, std::memory_order_relaxed);

                if (workers_on_same_node(worker_id, victim_id))
                {
                    stats_.same_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
                else
                {
                    stats_.cross_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
            }

            record_steal(worker_id, victim_id);
            break;
        }

        if (config_.collect_statistics)
        {
            stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
        }

        // Back off before trying the next victim to avoid hammering contended deques.
        if (++attempt < victims.size())
        {
            auto backoff_start = std::chrono::steady_clock::now();
            auto delay = backoff_calculator_->calculate(attempt);
            std::this_thread::sleep_for(delay);

            if (config_.collect_statistics)
            {
                auto backoff_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                      std::chrono::steady_clock::now() - backoff_start).count();
                stats_.total_backoff_time_ns.fetch_add(static_cast<std::uint64_t>(backoff_ns),
                                                       std::memory_order_relaxed);
            }
        }
    }

    if (config_.collect_statistics)
    {
        auto elapsed_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                              std::chrono::steady_clock::now() - start_time).count();
        stats_.total_steal_time_ns.fetch_add(static_cast<std::uint64_t>(elapsed_ns),
                                             std::memory_order_relaxed);
    }

    return stolen_job;
}
auto numa_work_stealer::steal_batch_for(std::size_t worker_id, std::size_t max_count)
    -> std::vector<job*>
{
    if (!config_.enabled || worker_count_ <= 1 || max_count == 0)
    {
        return {};
    }

    auto start_time = std::chrono::steady_clock::now();
    std::vector<job*> stolen_jobs;
    auto victims = select_victims(worker_id, config_.max_steal_attempts);

    std::size_t attempt = 0;
    for (auto victim_id : victims)
    {
        if (config_.collect_statistics)
        {
            stats_.steal_attempts.fetch_add(1, std::memory_order_relaxed);
        }

        auto* victim_deque = deque_accessor_(victim_id);
        if (victim_deque == nullptr)
        {
            continue;
        }

        auto queue_size = victim_deque->size();
        if (queue_size == 0)
        {
            if (config_.collect_statistics)
            {
                stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
            }
            continue;
        }

        // Size the batch from the victim's queue depth, capped by the caller's limit.
        auto batch_size = calculate_batch_size(queue_size);
        batch_size = std::min(batch_size, max_count);

        auto batch = victim_deque->steal_batch(batch_size);
        if (!batch.empty())
        {
            stolen_jobs = std::move(batch);

            if (config_.collect_statistics)
            {
                stats_.successful_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.jobs_stolen.fetch_add(stolen_jobs.size(), std::memory_order_relaxed);
                stats_.batch_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.total_batch_size.fetch_add(stolen_jobs.size(), std::memory_order_relaxed);

                if (workers_on_same_node(worker_id, victim_id))
                {
                    stats_.same_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
                else
                {
                    stats_.cross_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
            }

            record_steal(worker_id, victim_id);
            break;
        }

        if (config_.collect_statistics)
        {
            stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
        }

        if (++attempt < victims.size())
        {
            auto backoff_start = std::chrono::steady_clock::now();
            auto delay = backoff_calculator_->calculate(attempt);
            std::this_thread::sleep_for(delay);

            if (config_.collect_statistics)
            {
                auto backoff_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                      std::chrono::steady_clock::now() - backoff_start).count();
                stats_.total_backoff_time_ns.fetch_add(static_cast<std::uint64_t>(backoff_ns),
                                                       std::memory_order_relaxed);
            }
        }
    }

    if (config_.collect_statistics)
    {
        auto elapsed_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                              std::chrono::steady_clock::now() - start_time).count();
        stats_.total_steal_time_ns.fetch_add(static_cast<std::uint64_t>(elapsed_ns),
                                             std::memory_order_relaxed);
    }

    return stolen_jobs;
}
auto numa_work_stealer::select_victims(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    // Dispatch on the configured victim-selection policy.
    switch (config_.policy)
    {
        return select_victims_random(requester_id, count);         // random
        return select_victims_round_robin(requester_id, count);    // round_robin
        return select_victims_adaptive(requester_id, count);       // adaptive
        return select_victims_numa_aware(requester_id, count);     // numa_aware
        return select_victims_locality_aware(requester_id, count); // locality_aware
        return select_victims_hierarchical(requester_id, count);   // hierarchical
        return select_victims_random(requester_id, count);         // default: fall back to random
    }
}
auto numa_work_stealer::select_victims_random(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    std::vector<std::size_t> victims;
    victims.reserve(count);

    // Every worker except the requester is a candidate.
    std::vector<std::size_t> candidates;
    candidates.reserve(worker_count_ - 1);
    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i != requester_id)
        {
            candidates.push_back(i);
        }
    }

    std::shuffle(candidates.begin(), candidates.end(), rng_);

    auto num_victims = std::min(count, candidates.size());
    victims.insert(victims.end(), candidates.begin(), candidates.begin() + num_victims);
    return victims;
}
auto numa_work_stealer::select_victims_round_robin(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    std::vector<std::size_t> victims;
    victims.reserve(count);

    // A shared cursor rotates the starting victim so selection stays fair over time.
    auto start_index = round_robin_index_.fetch_add(1, std::memory_order_relaxed) % worker_count_;
    for (std::size_t i = 0; i < worker_count_ && victims.size() < count; ++i)
    {
        auto victim_id = (start_index + i) % worker_count_;
        if (victim_id != requester_id)
        {
            victims.push_back(victim_id);
        }
    }
    return victims;
}
auto numa_work_stealer::select_victims_adaptive(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    // Score every other worker by its current queue depth.
    std::vector<std::pair<std::size_t, std::size_t>> scored_workers;
    scored_workers.reserve(worker_count_ - 1);
    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }
        auto* deque = deque_accessor_(i);
        auto queue_size = deque ? deque->size() : 0;
        scored_workers.emplace_back(i, queue_size);
    }

    // Busiest victims first.
    std::sort(scored_workers.begin(),
              scored_workers.end(),
              [](const auto& a, const auto& b) { return a.second > b.second; });

    std::vector<std::size_t> victims;
    victims.reserve(count);
    for (const auto& [worker_id, queue_size] : scored_workers)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(worker_id);
    }

    // Pad with any remaining workers if not enough victims were selected.
    if (victims.size() < count)
    {
        for (const auto& [worker_id, queue_size] : scored_workers)
        {
            if (victims.size() >= count)
            {
                break;
            }
            if (std::find(victims.begin(), victims.end(), worker_id) == victims.end())
            {
                victims.push_back(worker_id);
            }
        }
    }
    return victims;
}
auto numa_work_stealer::select_victims_numa_aware(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    if (!config_.numa_aware || !topology_.is_numa_available())
    {
        return select_victims_adaptive(requester_id, count);
    }

    int requester_cpu = get_worker_cpu(requester_id);
    int requester_node = topology_.get_node_for_cpu(requester_cpu);

    // Score is the queue depth, discounted when the victim sits on a different NUMA node.
    std::vector<std::pair<std::size_t, double>> scored_workers;
    scored_workers.reserve(worker_count_ - 1);
    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }

        int victim_cpu = get_worker_cpu(i);
        int victim_node = topology_.get_node_for_cpu(victim_cpu);

        auto* deque = deque_accessor_(i);
        auto queue_size = deque ? static_cast<double>(deque->size()) : 0.0;

        double score = queue_size;
        if (requester_node != victim_node && requester_node >= 0 && victim_node >= 0)
        {
            score /= config_.numa_penalty_factor;
        }
        scored_workers.emplace_back(i, score);
    }

    std::sort(scored_workers.begin(),
              scored_workers.end(),
              [](const auto& a, const auto& b) { return a.second > b.second; });

    std::vector<std::size_t> victims;
    victims.reserve(count);
    for (const auto& [worker_id, score] : scored_workers)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(worker_id);
    }
    return victims;
}
auto numa_work_stealer::select_victims_locality_aware(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    if (!config_.track_locality || !affinity_tracker_)
    {
        return select_victims_adaptive(requester_id, count);
    }

    // Start from recent cooperation partners, then top up adaptively without duplicates.
    auto preferred = affinity_tracker_->get_preferred_victims(requester_id, count);
    if (preferred.size() < count)
    {
        auto additional = select_victims_adaptive(requester_id, count - preferred.size());
        for (auto victim_id : additional)
        {
            if (std::find(preferred.begin(), preferred.end(), victim_id) == preferred.end())
            {
                preferred.push_back(victim_id);
                if (preferred.size() >= count)
                {
                    break;
                }
            }
        }
    }
    return preferred;
}
auto numa_work_stealer::select_victims_hierarchical(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    if (!config_.numa_aware || !topology_.is_numa_available())
    {
        return select_victims_adaptive(requester_id, count);
    }

    int requester_cpu = get_worker_cpu(requester_id);
    int requester_node = topology_.get_node_for_cpu(requester_cpu);

    // Partition candidates into same-node and other-node tiers.
    std::vector<std::size_t> same_node_victims;
    std::vector<std::size_t> other_node_victims;
    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }
        int victim_cpu = get_worker_cpu(i);
        int victim_node = topology_.get_node_for_cpu(victim_cpu);
        if (victim_node == requester_node)
        {
            same_node_victims.push_back(i);
        }
        else
        {
            other_node_victims.push_back(i);
        }
    }

    // Randomize within each tier, then exhaust same-node victims before going cross-node.
    std::shuffle(same_node_victims.begin(), same_node_victims.end(), rng_);
    std::shuffle(other_node_victims.begin(), other_node_victims.end(), rng_);

    std::vector<std::size_t> victims;
    victims.reserve(count);
    for (auto victim_id : same_node_victims)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(victim_id);
    }
    for (auto victim_id : other_node_victims)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(victim_id);
    }
    return victims;
}
auto numa_work_stealer::calculate_batch_size(std::size_t victim_queue_size) const -> std::size_t
{
    if (!config_.adaptive_batch_size)
    {
        return config_.max_steal_batch;
    }
    // Steal about half of the victim's queue, clamped to the configured range.
    auto half_queue = victim_queue_size / 2;
    if (half_queue < config_.min_steal_batch)
    {
        return config_.min_steal_batch;
    }
    return std::min(half_queue, config_.max_steal_batch);
}

auto numa_work_stealer::get_worker_cpu(std::size_t worker_id) const -> int
{
    return cpu_accessor_(worker_id);
}

auto numa_work_stealer::workers_on_same_node(std::size_t worker_a, std::size_t worker_b) const -> bool
{
    if (!topology_.is_numa_available())
    {
        return true; // single-node system: treat every worker as local
    }
    int cpu_a = get_worker_cpu(worker_a);
    int cpu_b = get_worker_cpu(worker_b);
    return topology_.is_same_node(cpu_a, cpu_b);
}
Represents a unit of work (task) to be executed, typically by a job queue.
NUMA (Non-Uniform Memory Access) topology information.
auto select_victims_adaptive(std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
Select victims using adaptive (queue-size based) policy.
auto select_victims_round_robin(std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
Select victims using round-robin policy.
auto get_config() const -> const enhanced_work_stealing_config &
Get the current configuration.
auto select_victims_locality_aware(std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
Select victims using locality-aware policy.
std::size_t worker_count_
auto get_stats() const -> const work_stealing_stats &
Get the current statistics.
auto get_stats_snapshot() const -> work_stealing_stats_snapshot
Get a snapshot of current statistics.
void set_config(const enhanced_work_stealing_config &config)
Update the configuration.
void record_steal(std::size_t thief_id, std::size_t victim_id)
Record a successful steal for affinity tracking.
numa_work_stealer(std::size_t worker_count, deque_accessor_fn deque_accessor, cpu_accessor_fn cpu_accessor, enhanced_work_stealing_config config={})
Construct a NUMA-aware work stealer.
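As a usage sketch (worker_deques and worker_cpus are illustrative pool members, not part of the library; the lambdas match the deque_accessor_fn and cpu_accessor_fn types listed here), a thread pool could wire the stealer to its per-worker deques and pinned CPUs like this:

    numa_work_stealer stealer(
        worker_count,
        [&](std::size_t id) { return worker_deques[id].get(); },  // deque_accessor_fn: per-worker deque
        [&](std::size_t id) { return worker_cpus[id]; },          // cpu_accessor_fn: pinned CPU per worker
        enhanced_work_stealing_config{});                         // defaults from the header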
auto workers_on_same_node(std::size_t worker_a, std::size_t worker_b) const -> bool
Check if two workers are on the same NUMA node.
work_stealing_stats stats_
auto get_worker_cpu(std::size_t worker_id) const -> int
Get the CPU ID for a worker.
auto get_topology() const -> const numa_topology &
Get the NUMA topology information.
std::function< int(std::size_t)> cpu_accessor_fn
Function type for getting worker's CPU affinity.
auto select_victims_random(std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
Select victims using random policy.
enhanced_work_stealing_config config_
auto select_victims_hierarchical(std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
Select victims using hierarchical policy.
auto select_victims_numa_aware(std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
Select victims using NUMA-aware policy.
std::function< lockfree::work_stealing_deque< job * > *(std::size_t)> deque_accessor_fn
Function type for accessing worker's local deque.
auto steal_for(std::size_t worker_id) -> job *
Attempt to steal work for a worker.
auto steal_batch_for(std::size_t worker_id, std::size_t max_count) -> std::vector< job * >
Attempt to steal multiple jobs for a worker.
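A minimal sketch of the intended call pattern (running, try_pop_local(), and run() are hypothetical helpers standing in for the pool's own loop; stealer is the numa_work_stealer from the constructor sketch above): the worker drains its own deque first and only steals when it comes up empty.

    while (running)
    {
        job* next = try_pop_local(worker_id);     // owner side of this worker's deque (hypothetical helper)
        if (next == nullptr)
        {
            next = stealer.steal_for(worker_id);  // fall back to stealing from a victim
        }
        if (next != nullptr)
        {
            run(next);                            // hypothetical execution helper
        }
    }

For bulk refills, steal_batch_for(worker_id, max_count) returns up to max_count jobs taken from a single victim instead of one job at a time.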
std::unique_ptr< work_affinity_tracker > affinity_tracker_
void reset_stats()
Reset all statistics to zero.
auto calculate_batch_size(std::size_t victim_queue_size) const -> std::size_t
Calculate batch size based on configuration and victim queue depth.
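For example, with adaptive_batch_size enabled and illustrative limits of min_steal_batch = 2 and max_steal_batch = 8 (not the library defaults), a victim queue of 30 jobs gives half_queue = 15, clamped down to 8; a queue of 10 gives 5, which passes through unchanged; and a queue of 3 gives 1, which is raised to the minimum of 2. With adaptive sizing disabled, the function always returns max_steal_batch.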
auto select_victims(std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
Select victim workers based on the configured policy.
std::unique_ptr< backoff_calculator > backoff_calculator_
A template class representing either a value or an error.
T & value() &
Gets the value.
bool has_value() const noexcept
Checks if the result contains a value.
Core threading foundation of the thread system library.
hierarchical
NUMA node first, then random within node (large NUMA systems)
round_robin
Sequential victim selection (deterministic, fair)
locality_aware
Prefer workers with recent cooperation history (cache affinity)
random
Random victim selection (baseline, good distribution)
adaptive
Select based on queue sizes (best for uneven loads)
numa_aware
Prefer workers on the same NUMA node (reduces cross-node traffic)
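As a rule of thumb drawn from these descriptions: a single-socket pool with bursty, uneven queues is usually best served by adaptive; a multi-socket machine with workers pinned per node benefits from numa_aware or hierarchical; and locality_aware helps when the same producer/consumer pairs repeatedly exchange work and cache affinity matters.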
NUMA-aware work stealer with enhanced victim selection policies.
Configuration for enhanced work-stealing with NUMA awareness.
std::chrono::microseconds max_backoff
Maximum backoff delay cap (default: 1000 microseconds)
bool track_locality
Enable work affinity tracking between workers (default: disabled)
steal_backoff_strategy backoff_strategy
Backoff strategy between failed steal attempts (default: exponential)
std::chrono::microseconds initial_backoff
Initial backoff delay (default: 50 microseconds)
double backoff_multiplier
Backoff multiplier for exponential strategy (default: 2.0)
std::size_t locality_history_size
Size of cooperation history for locality tracking (default: 16)
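A configuration sketch using only fields referenced in this file (the values are illustrative; any field not set keeps the header's default):

    enhanced_work_stealing_config cfg;
    cfg.numa_aware = true;                               // apply the cross-node penalty when scoring victims
    cfg.track_locality = true;                           // feed the work_affinity_tracker
    cfg.locality_history_size = 32;                      // remember the last 32 cooperations per worker
    cfg.max_steal_attempts = 4;                          // upper bound on victims tried per steal call
    cfg.adaptive_batch_size = true;                      // size batch steals from the victim's queue depth
    cfg.initial_backoff = std::chrono::microseconds{50};
    cfg.backoff_multiplier = 2.0;
    cfg.max_backoff = std::chrono::microseconds{1000};
    cfg.collect_statistics = true;                       // enable the work_stealing_stats counters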
Configuration for backoff behavior.
std::chrono::microseconds max_backoff
steal_backoff_strategy strategy
double multiplier
Multiplier for exponential backoff.
std::chrono::microseconds initial_backoff
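As a worked example, assuming the exponential strategy scales initial_backoff by multiplier on each successive failed attempt and clamps the result at max_backoff, the documented defaults (50 microseconds initial, 2.0 multiplier, 1000 microsecond cap) yield a delay sequence of roughly 50, 100, 200, 400, 800, 1000, 1000, ... microseconds; the cap keeps a long unlucky streak from stalling the worker for milliseconds at a time.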
Non-atomic snapshot of work-stealing statistics.
Statistics for work-stealing operations.
auto snapshot() const -> work_stealing_stats_snapshot
Create a snapshot of current statistics.
void reset()
Reset all statistics to zero.
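A monitoring sketch, assuming the snapshot exposes plain integer counters under the same names as the atomic statistics above (stealer is the instance from the earlier sketches):

    auto s = stealer.get_stats_snapshot();               // non-atomic copy, safe to read field by field
    double success_rate = s.steal_attempts > 0
        ? static_cast<double>(s.successful_steals) / static_cast<double>(s.steal_attempts)
        : 0.0;
    double cross_node_share = s.successful_steals > 0
        ? static_cast<double>(s.cross_node_steals) / static_cast<double>(s.successful_steals)
        : 0.0;

A high cross-node share on a NUMA machine is a hint to move the policy toward numa_aware or hierarchical, or to raise numa_penalty_factor.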
Dynamic circular array work-stealing deque for lock-free task distribution.