Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::numa_work_stealer Class Reference

NUMA-aware work stealer with enhanced victim selection policies. More...

#include <numa_work_stealer.h>


Public Types

using deque_accessor_fn = std::function<lockfree::work_stealing_deque<job*>*(std::size_t)>
 Function type for accessing worker's local deque.
 
using cpu_accessor_fn = std::function<int(std::size_t)>
 Function type for getting worker's CPU affinity.
 

Public Member Functions

 numa_work_stealer (std::size_t worker_count, deque_accessor_fn deque_accessor, cpu_accessor_fn cpu_accessor, enhanced_work_stealing_config config={})
 Construct a NUMA-aware work stealer.
 
 ~numa_work_stealer ()=default
 Destructor.
 
 numa_work_stealer (const numa_work_stealer &)=delete
 
numa_work_stealer & operator= (const numa_work_stealer &)=delete
 
 numa_work_stealer (numa_work_stealer &&)=delete
 
numa_work_stealer & operator= (numa_work_stealer &&)=delete
 
auto steal_for (std::size_t worker_id) -> job *
 Attempt to steal work for a worker.
 
auto steal_batch_for (std::size_t worker_id, std::size_t max_count) -> std::vector< job * >
 Attempt to steal multiple jobs for a worker.
 
auto get_stats () const -> const work_stealing_stats &
 Get the current statistics.
 
auto get_stats_snapshot () const -> work_stealing_stats_snapshot
 Get a snapshot of current statistics.
 
void reset_stats ()
 Reset all statistics to zero.
 
auto get_topology () const -> const numa_topology &
 Get the NUMA topology information.
 
auto get_config () const -> const enhanced_work_stealing_config &
 Get the current configuration.
 
void set_config (const enhanced_work_stealing_config &config)
 Update the configuration.
 

Private Member Functions

auto select_victims (std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
 Select victim workers based on the configured policy.
 
auto select_victims_random (std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
 Select victims using random policy.
 
auto select_victims_round_robin (std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
 Select victims using round-robin policy.
 
auto select_victims_adaptive (std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
 Select victims using adaptive (queue-size based) policy.
 
auto select_victims_numa_aware (std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
 Select victims using NUMA-aware policy.
 
auto select_victims_locality_aware (std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
 Select victims using locality-aware policy.
 
auto select_victims_hierarchical (std::size_t requester_id, std::size_t count) -> std::vector< std::size_t >
 Select victims using hierarchical policy.
 
auto calculate_batch_size (std::size_t victim_queue_size) const -> std::size_t
 Calculate batch size based on configuration and victim queue depth.
 
auto get_worker_cpu (std::size_t worker_id) const -> int
 Get the CPU ID for a worker.
 
auto workers_on_same_node (std::size_t worker_a, std::size_t worker_b) const -> bool
 Check if two workers are on the same NUMA node.
 
void record_steal (std::size_t thief_id, std::size_t victim_id)
 Record a successful steal for affinity tracking.
 

Private Attributes

std::size_t worker_count_
 
deque_accessor_fn deque_accessor_
 
cpu_accessor_fn cpu_accessor_
 
enhanced_work_stealing_config config_
 
numa_topology topology_
 
work_stealing_stats stats_
 
std::unique_ptr< work_affinity_tracker > affinity_tracker_
 
std::unique_ptr< backoff_calculator > backoff_calculator_
 
std::mt19937_64 rng_
 
std::atomic< std::size_t > round_robin_index_ {0}
 

Detailed Description

NUMA-aware work stealer with enhanced victim selection policies.

This class implements advanced work-stealing strategies with NUMA awareness, locality tracking, batch stealing, and comprehensive statistics collection. It coordinates stealing across multiple workers using configurable policies.

Design Goals

  • Minimize cross-NUMA node memory access
  • Maximize cache locality through affinity tracking
  • Reduce contention through intelligent victim selection
  • Provide detailed statistics for performance analysis

Thread Safety

All public methods are thread-safe and can be called concurrently from multiple worker threads. Statistics updates use atomic operations.

Memory Model

  • Victim selection: sequential consistency for correctness
  • Statistics: relaxed ordering for performance
  • Topology access: read-only after construction

Usage Example

// Create workers
std::vector<std::unique_ptr<thread_worker>> workers;
// ... initialize workers ...

// Configure NUMA-aware stealing
auto config = enhanced_work_stealing_config::numa_optimized();

// Create accessor functions
auto get_worker_deque = [&](std::size_t id) {
    return workers[id]->get_local_deque();
};
auto get_worker_cpu = [&](std::size_t id) {
    return workers[id]->get_policy().preferred_cpu;
};

// Create work stealer
numa_work_stealer stealer(workers.size(), get_worker_deque, get_worker_cpu, config);

// Steal work for worker 0
if (auto* stolen_job = stealer.steal_for(0)) {
    // Process stolen job
}

// Batch steal
auto batch = stealer.steal_batch_for(0, 4);
for (auto* j : batch) {
    // Process jobs
}

// Get statistics
auto stats = stealer.get_stats();
std::cout << "Success rate: " << stats.steal_success_rate() * 100 << "%\n";

Definition at line 98 of file numa_work_stealer.h.

Member Typedef Documentation

◆ cpu_accessor_fn

using kcenon::thread::numa_work_stealer::cpu_accessor_fn = std::function<int(std::size_t)>

Function type for getting worker's CPU affinity.

Parameters
worker_id  The worker ID
Returns
Preferred CPU for the worker, or -1 if no preference

Definition at line 113 of file numa_work_stealer.h.

◆ deque_accessor_fn

using kcenon::thread::numa_work_stealer::deque_accessor_fn = std::function<lockfree::work_stealing_deque<job*>*(std::size_t)>

Function type for accessing worker's local deque.

Parameters
worker_id  The worker ID
Returns
Pointer to the worker's local deque, or nullptr if not available

Definition at line 106 of file numa_work_stealer.h.

Constructor & Destructor Documentation

◆ numa_work_stealer() [1/3]

kcenon::thread::numa_work_stealer::numa_work_stealer ( std::size_t worker_count,
deque_accessor_fn deque_accessor,
cpu_accessor_fn cpu_accessor,
enhanced_work_stealing_config config = {} )

Construct a NUMA-aware work stealer.

Parameters
worker_count  Number of workers in the pool
deque_accessor  Function to access worker deques
cpu_accessor  Function to get worker CPU affinity
config  Configuration for work stealing
Note
The accessor functions must remain valid for the lifetime of this object.

Definition at line 16 of file numa_work_stealer.cpp.

    : worker_count_(worker_count)
    , deque_accessor_(std::move(deque_accessor))
    , cpu_accessor_(std::move(cpu_accessor))
    , config_(config)
    , topology_(numa_topology::detect())
    , rng_(std::random_device{}())
{
    if (config_.track_locality)
    {
        affinity_tracker_ =
            std::make_unique<work_affinity_tracker>(worker_count_, config_.locality_history_size);
    }

    steal_backoff_config backoff_config;
    backoff_config.strategy = config_.backoff_strategy;
    backoff_config.initial_backoff = config_.initial_backoff;
    backoff_config.max_backoff = config_.max_backoff;
    backoff_config.multiplier = config_.backoff_multiplier;
    backoff_calculator_ = std::make_unique<backoff_calculator>(backoff_config);
}

References backoff_calculator_, kcenon::thread::enhanced_work_stealing_config::backoff_multiplier, kcenon::thread::enhanced_work_stealing_config::backoff_strategy, config_, kcenon::thread::enhanced_work_stealing_config::initial_backoff, kcenon::thread::enhanced_work_stealing_config::max_backoff, kcenon::thread::steal_backoff_config::strategy, kcenon::thread::enhanced_work_stealing_config::track_locality, and worker_count_.

◆ ~numa_work_stealer()

kcenon::thread::numa_work_stealer::~numa_work_stealer ( )
default

Destructor.

◆ numa_work_stealer() [2/3]

kcenon::thread::numa_work_stealer::numa_work_stealer ( const numa_work_stealer & )
delete

◆ numa_work_stealer() [3/3]

kcenon::thread::numa_work_stealer::numa_work_stealer ( numa_work_stealer && )
delete

Member Function Documentation

◆ calculate_batch_size()

auto kcenon::thread::numa_work_stealer::calculate_batch_size ( std::size_t victim_queue_size) const -> std::size_t
nodiscard, private

Calculate batch size based on configuration and victim queue depth.

Definition at line 570 of file numa_work_stealer.cpp.

{
    if (!config_.adaptive_batch_size)
    {
        return config_.max_steal_batch;
    }

    // Adaptive batch size: steal at most half of victim's queue
    auto half_queue = victim_queue_size / 2;
    if (half_queue < config_.min_steal_batch)
    {
        return config_.min_steal_batch;
    }

    return std::min(half_queue, config_.max_steal_batch);
}

◆ get_config()

auto kcenon::thread::numa_work_stealer::get_config ( ) const -> const enhanced_work_stealing_config&
nodiscard

Get the current configuration.

Returns
Reference to the work-stealing configuration

Definition at line 253 of file numa_work_stealer.cpp.

{
    return config_;
}

References config_.

◆ get_stats()

auto kcenon::thread::numa_work_stealer::get_stats ( ) const -> const work_stealing_stats&
nodiscard

Get the current statistics.

Returns
Reference to the work-stealing statistics

Definition at line 233 of file numa_work_stealer.cpp.

{
    return stats_;
}

References stats_.

◆ get_stats_snapshot()

auto kcenon::thread::numa_work_stealer::get_stats_snapshot ( ) const -> work_stealing_stats_snapshot
nodiscard

Get a snapshot of current statistics.

Returns
Non-atomic copy of statistics for safe reading

Definition at line 238 of file numa_work_stealer.cpp.

{
    return stats_.snapshot();
}

References kcenon::thread::work_stealing_stats::snapshot(), and stats_.


◆ get_topology()

auto kcenon::thread::numa_work_stealer::get_topology ( ) const -> const numa_topology&
nodiscard

Get the NUMA topology information.

Returns
Reference to the detected NUMA topology

Definition at line 248 of file numa_work_stealer.cpp.

{
    return topology_;
}

References topology_.

◆ get_worker_cpu()

auto kcenon::thread::numa_work_stealer::get_worker_cpu ( std::size_t worker_id) const -> int
nodiscard, private

Get the CPU ID for a worker.

Definition at line 587 of file numa_work_stealer.cpp.

{
    if (!cpu_accessor_)
    {
        return -1;
    }
    return cpu_accessor_(worker_id);
}

◆ operator=() [1/2]

numa_work_stealer & kcenon::thread::numa_work_stealer::operator= ( const numa_work_stealer & )
delete

◆ operator=() [2/2]

numa_work_stealer & kcenon::thread::numa_work_stealer::operator= ( numa_work_stealer && )
delete

◆ record_steal()

void kcenon::thread::numa_work_stealer::record_steal ( std::size_t thief_id,
std::size_t victim_id )
private

Record a successful steal for affinity tracking.

Definition at line 610 of file numa_work_stealer.cpp.

{
    if (config_.track_locality && affinity_tracker_)
    {
        affinity_tracker_->record_cooperation(thief_id, victim_id);
    }
}

References affinity_tracker_, config_, and kcenon::thread::enhanced_work_stealing_config::track_locality.

◆ reset_stats()

void kcenon::thread::numa_work_stealer::reset_stats ( )

Reset all statistics to zero.

Definition at line 243 of file numa_work_stealer.cpp.

{
    stats_.reset();
}

References kcenon::thread::work_stealing_stats::reset(), and stats_.


◆ select_victims()

auto kcenon::thread::numa_work_stealer::select_victims ( std::size_t requester_id,
std::size_t count ) -> std::vector<std::size_t>
nodiscard, private

Select victim workers based on the configured policy.

Parameters
requester_id  Worker requesting victims
count  Maximum number of victims to select
Returns
Vector of worker IDs to attempt stealing from

Definition at line 282 of file numa_work_stealer.cpp.

{
    switch (config_.policy)
    {
    case enhanced_steal_policy::random:
        return select_victims_random(requester_id, count);

    case enhanced_steal_policy::round_robin:
        return select_victims_round_robin(requester_id, count);

    case enhanced_steal_policy::adaptive:
        return select_victims_adaptive(requester_id, count);

    case enhanced_steal_policy::numa_aware:
        return select_victims_numa_aware(requester_id, count);

    case enhanced_steal_policy::locality_aware:
        return select_victims_locality_aware(requester_id, count);

    case enhanced_steal_policy::hierarchical:
        return select_victims_hierarchical(requester_id, count);

    default:
        return select_victims_random(requester_id, count);
    }
}
Available policies (enhanced_steal_policy, default: adaptive):

  • random: random victim selection (baseline, good distribution)
  • round_robin: sequential victim selection (deterministic, fair)
  • adaptive: select based on queue sizes (best for uneven loads)
  • numa_aware: prefer workers on the same NUMA node (reduces cross-node traffic)
  • locality_aware: prefer workers with recent cooperation history (cache affinity)
  • hierarchical: NUMA node first, then random within node (large NUMA systems)

References kcenon::thread::adaptive, kcenon::thread::hierarchical, kcenon::thread::locality_aware, kcenon::thread::numa_aware, kcenon::thread::random, and kcenon::thread::round_robin.

◆ select_victims_adaptive()

auto kcenon::thread::numa_work_stealer::select_victims_adaptive ( std::size_t requester_id,
std::size_t count ) -> std::vector<std::size_t>
nodiscard, private

Select victims using adaptive (queue-size based) policy.

Definition at line 356 of file numa_work_stealer.cpp.

{
    // Score workers by queue size (larger queues are better targets)
    std::vector<std::pair<std::size_t, std::size_t>> scored_workers;
    scored_workers.reserve(worker_count_ - 1);

    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }

        auto* deque = deque_accessor_(i);
        auto queue_size = deque ? deque->size() : 0;
        scored_workers.emplace_back(i, queue_size);
    }

    // Sort by queue size (descending)
    std::sort(scored_workers.begin(),
              scored_workers.end(),
              [](const auto& a, const auto& b) { return a.second > b.second; });

    // Take top 'count' workers with non-empty queues
    std::vector<std::size_t> victims;
    victims.reserve(count);

    for (const auto& [worker_id, queue_size] : scored_workers)
    {
        if (victims.size() >= count)
        {
            break;
        }
        if (queue_size > 0)
        {
            victims.push_back(worker_id);
        }
    }

    // If not enough with non-empty queues, add some randomly
    if (victims.size() < count)
    {
        for (const auto& [worker_id, queue_size] : scored_workers)
        {
            if (victims.size() >= count)
            {
                break;
            }
            if (std::find(victims.begin(), victims.end(), worker_id) == victims.end())
            {
                victims.push_back(worker_id);
            }
        }
    }

    return victims;
}

◆ select_victims_hierarchical()

auto kcenon::thread::numa_work_stealer::select_victims_hierarchical ( std::size_t requester_id,
std::size_t count ) -> std::vector<std::size_t>
nodiscard, private

Select victims using hierarchical policy.

Definition at line 505 of file numa_work_stealer.cpp.

{
    if (!topology_.is_numa_available())
    {
        return select_victims_adaptive(requester_id, count);
    }

    int requester_cpu = get_worker_cpu(requester_id);
    int requester_node = topology_.get_node_for_cpu(requester_cpu);

    // First: workers on same NUMA node
    std::vector<std::size_t> same_node_victims;
    // Second: workers on other NUMA nodes
    std::vector<std::size_t> other_node_victims;

    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }

        int victim_cpu = get_worker_cpu(i);
        int victim_node = topology_.get_node_for_cpu(victim_cpu);

        if (victim_node == requester_node)
        {
            same_node_victims.push_back(i);
        }
        else
        {
            other_node_victims.push_back(i);
        }
    }

    // Shuffle within each group
    std::shuffle(same_node_victims.begin(), same_node_victims.end(), rng_);
    std::shuffle(other_node_victims.begin(), other_node_victims.end(), rng_);

    // Combine: same node first, then other nodes
    std::vector<std::size_t> victims;
    victims.reserve(count);

    for (auto victim_id : same_node_victims)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(victim_id);
    }

    for (auto victim_id : other_node_victims)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(victim_id);
    }

    return victims;
}

◆ select_victims_locality_aware()

auto kcenon::thread::numa_work_stealer::select_victims_locality_aware ( std::size_t requester_id,
std::size_t count ) -> std::vector<std::size_t>
nodiscard, private

Select victims using locality-aware policy.

Definition at line 474 of file numa_work_stealer.cpp.

{
    if (!config_.track_locality || !affinity_tracker_)
    {
        return select_victims_adaptive(requester_id, count);
    }

    // Get preferred victims from affinity tracker
    auto preferred = affinity_tracker_->get_preferred_victims(requester_id, count);

    // If not enough preferred victims, fill with adaptive selection
    if (preferred.size() < count)
    {
        auto additional = select_victims_adaptive(requester_id, count - preferred.size());
        for (auto victim_id : additional)
        {
            if (std::find(preferred.begin(), preferred.end(), victim_id) == preferred.end())
            {
                preferred.push_back(victim_id);
            }
            if (preferred.size() >= count)
            {
                break;
            }
        }
    }

    return preferred;
}

◆ select_victims_numa_aware()

auto kcenon::thread::numa_work_stealer::select_victims_numa_aware ( std::size_t requester_id,
std::size_t count ) -> std::vector<std::size_t>
nodiscard, private

Select victims using NUMA-aware policy.

Definition at line 415 of file numa_work_stealer.cpp.

{
    if (!config_.numa_aware || !topology_.is_numa_available())
    {
        return select_victims_adaptive(requester_id, count);
    }

    // Get requester's NUMA node
    int requester_cpu = get_worker_cpu(requester_id);
    int requester_node = topology_.get_node_for_cpu(requester_cpu);

    // Score workers: same node workers get higher priority
    std::vector<std::pair<std::size_t, double>> scored_workers;
    scored_workers.reserve(worker_count_ - 1);

    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }

        int victim_cpu = get_worker_cpu(i);
        int victim_node = topology_.get_node_for_cpu(victim_cpu);

        auto* deque = deque_accessor_(i);
        auto queue_size = deque ? static_cast<double>(deque->size()) : 0.0;

        // Apply NUMA penalty for cross-node workers
        double score = queue_size;
        if (requester_node != victim_node && requester_node >= 0 && victim_node >= 0)
        {
            score /= config_.numa_penalty_factor;
        }

        scored_workers.emplace_back(i, score);
    }

    // Sort by score (descending)
    std::sort(scored_workers.begin(),
              scored_workers.end(),
              [](const auto& a, const auto& b) { return a.second > b.second; });

    std::vector<std::size_t> victims;
    victims.reserve(count);

    for (const auto& [worker_id, score] : scored_workers)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(worker_id);
    }

    return victims;
}

◆ select_victims_random()

auto kcenon::thread::numa_work_stealer::select_victims_random ( std::size_t requester_id,
std::size_t count ) -> std::vector<std::size_t>
nodiscard, private

Select victims using random policy.

Definition at line 310 of file numa_work_stealer.cpp.

{
    std::vector<std::size_t> victims;
    victims.reserve(count);

    std::vector<std::size_t> candidates;
    candidates.reserve(worker_count_ - 1);

    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i != requester_id)
        {
            candidates.push_back(i);
        }
    }

    // Shuffle and take first 'count' elements
    std::shuffle(candidates.begin(), candidates.end(), rng_);

    auto num_victims = std::min(count, candidates.size());
    victims.insert(victims.end(), candidates.begin(), candidates.begin() + num_victims);

    return victims;
}

◆ select_victims_round_robin()

auto kcenon::thread::numa_work_stealer::select_victims_round_robin ( std::size_t requester_id,
std::size_t count ) -> std::vector<std::size_t>
nodiscard, private

Select victims using round-robin policy.

Definition at line 336 of file numa_work_stealer.cpp.

{
    std::vector<std::size_t> victims;
    victims.reserve(count);

    auto start_index = round_robin_index_.fetch_add(1, std::memory_order_relaxed) % worker_count_;

    for (std::size_t i = 0; i < worker_count_ && victims.size() < count; ++i)
    {
        auto victim_id = (start_index + i) % worker_count_;
        if (victim_id != requester_id)
        {
            victims.push_back(victim_id);
        }
    }

    return victims;
}

◆ set_config()

void kcenon::thread::numa_work_stealer::set_config ( const enhanced_work_stealing_config & config)

Update the configuration.

Parameters
config  New configuration to use
Note
Changes take effect immediately. Be cautious when changing configuration while workers are actively stealing.

Definition at line 258 of file numa_work_stealer.cpp.

{
    config_ = config;

    // Update backoff calculator
    steal_backoff_config backoff_config;
    backoff_config.strategy = config_.backoff_strategy;
    backoff_config.initial_backoff = config_.initial_backoff;
    backoff_config.max_backoff = config_.max_backoff;
    backoff_config.multiplier = config_.backoff_multiplier;
    backoff_calculator_->set_config(backoff_config);

    // Update affinity tracker if needed
    if (config_.track_locality && !affinity_tracker_)
    {
        affinity_tracker_ =
            std::make_unique<work_affinity_tracker>(worker_count_, config_.locality_history_size);
    }
    else if (!config_.track_locality)
    {
        affinity_tracker_.reset();
    }
}

References affinity_tracker_, backoff_calculator_, kcenon::thread::enhanced_work_stealing_config::backoff_multiplier, kcenon::thread::enhanced_work_stealing_config::backoff_strategy, config_, kcenon::thread::enhanced_work_stealing_config::initial_backoff, kcenon::thread::steal_backoff_config::initial_backoff, kcenon::thread::enhanced_work_stealing_config::locality_history_size, kcenon::thread::enhanced_work_stealing_config::max_backoff, kcenon::thread::steal_backoff_config::max_backoff, kcenon::thread::steal_backoff_config::multiplier, kcenon::thread::steal_backoff_config::strategy, kcenon::thread::enhanced_work_stealing_config::track_locality, and worker_count_.

◆ steal_batch_for()

auto kcenon::thread::numa_work_stealer::steal_batch_for ( std::size_t worker_id,
std::size_t max_count ) -> std::vector<job*>
nodiscard

Attempt to steal multiple jobs for a worker.

Parameters
worker_id  The worker requesting work
max_count  Maximum number of jobs to steal
Returns
Vector of stolen job pointers (may be empty or smaller than max_count)

Batch stealing is more efficient when multiple jobs need to be transferred. The actual batch size is determined by configuration and victim queue depth.

Thread Safety:

  • Safe to call concurrently from multiple workers
  • Statistics are updated atomically

Definition at line 129 of file numa_work_stealer.cpp.

{
    if (!config_.enabled || worker_count_ <= 1 || max_count == 0)
    {
        return {};
    }

    auto start_time = std::chrono::steady_clock::now();
    std::vector<job*> stolen_jobs;

    auto victims = select_victims(worker_id, config_.max_steal_attempts);

    std::size_t attempt = 0;
    for (auto victim_id : victims)
    {
        if (config_.collect_statistics)
        {
            stats_.steal_attempts.fetch_add(1, std::memory_order_relaxed);
        }

        auto* victim_deque = deque_accessor_(victim_id);
        if (victim_deque == nullptr)
        {
            continue;
        }

        auto queue_size = victim_deque->size();
        if (queue_size == 0)
        {
            if (config_.collect_statistics)
            {
                stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
            }
            continue;
        }

        auto batch_size = calculate_batch_size(queue_size);
        batch_size = std::min(batch_size, max_count);

        auto batch = victim_deque->steal_batch(batch_size);
        if (!batch.empty())
        {
            stolen_jobs = std::move(batch);

            if (config_.collect_statistics)
            {
                stats_.successful_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.jobs_stolen.fetch_add(stolen_jobs.size(), std::memory_order_relaxed);
                stats_.batch_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.total_batch_size.fetch_add(stolen_jobs.size(), std::memory_order_relaxed);

                if (workers_on_same_node(worker_id, victim_id))
                {
                    stats_.same_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
                else
                {
                    stats_.cross_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
            }

            record_steal(worker_id, victim_id);
            break;
        }
        else
        {
            if (config_.collect_statistics)
            {
                stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
            }

            // Apply backoff after failed steal
            if (++attempt < victims.size())
            {
                auto backoff_start = std::chrono::steady_clock::now();
                auto delay = backoff_calculator_->calculate(attempt);
                std::this_thread::sleep_for(delay);

                if (config_.collect_statistics)
                {
                    auto backoff_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                          std::chrono::steady_clock::now() - backoff_start)
                                          .count();
                    stats_.total_backoff_time_ns.fetch_add(static_cast<std::uint64_t>(backoff_ns),
                                                           std::memory_order_relaxed);
                }
            }
        }
    }

    if (config_.collect_statistics)
    {
        auto elapsed_ns =
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                std::chrono::steady_clock::now() - start_time)
                .count();
        stats_.total_steal_time_ns.fetch_add(static_cast<std::uint64_t>(elapsed_ns),
                                             std::memory_order_relaxed);
    }

    return stolen_jobs;
}


◆ steal_for()

auto kcenon::thread::numa_work_stealer::steal_for ( std::size_t worker_id) -> job*
nodiscard

Attempt to steal work for a worker.

Parameters
worker_id  The worker requesting work
Returns
Stolen job pointer, or nullptr if no work available

This method selects victims based on the configured policy and attempts to steal a single job. NUMA awareness and affinity are considered when selecting victims.

Thread Safety:

  • Safe to call concurrently from multiple workers
  • Statistics are updated atomically

Definition at line 41 of file numa_work_stealer.cpp.

{
    if (!config_.enabled || worker_count_ <= 1)
    {
        return nullptr;
    }

    auto start_time = std::chrono::steady_clock::now();
    job* stolen_job = nullptr;

    auto victims = select_victims(worker_id, config_.max_steal_attempts);

    std::size_t attempt = 0;
    for (auto victim_id : victims)
    {
        if (config_.collect_statistics)
        {
            stats_.steal_attempts.fetch_add(1, std::memory_order_relaxed);
        }

        auto* victim_deque = deque_accessor_(victim_id);
        if (victim_deque == nullptr)
        {
            continue;
        }

        auto result = victim_deque->steal();
        if (result.has_value())
        {
            stolen_job = result.value();

            if (config_.collect_statistics)
            {
                stats_.successful_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.jobs_stolen.fetch_add(1, std::memory_order_relaxed);

                if (workers_on_same_node(worker_id, victim_id))
                {
                    stats_.same_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
                else
                {
                    stats_.cross_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
            }

            record_steal(worker_id, victim_id);
            break;
        }
        else
        {
            if (config_.collect_statistics)
            {
                stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
            }

            // Apply backoff after failed steal
            if (++attempt < victims.size())
            {
                auto backoff_start = std::chrono::steady_clock::now();
                auto delay = backoff_calculator_->calculate(attempt);
                std::this_thread::sleep_for(delay);

                if (config_.collect_statistics)
                {
                    auto backoff_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                          std::chrono::steady_clock::now() - backoff_start)
                                          .count();
                    stats_.total_backoff_time_ns.fetch_add(static_cast<std::uint64_t>(backoff_ns),
                                                           std::memory_order_relaxed);
                }
            }
        }
    }

    if (config_.collect_statistics)
    {
        auto elapsed_ns =
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                std::chrono::steady_clock::now() - start_time)
                .count();
        stats_.total_steal_time_ns.fetch_add(static_cast<std::uint64_t>(elapsed_ns),
                                             std::memory_order_relaxed);
    }

    return stolen_job;
}

References kcenon::thread::result< T >::has_value(), and kcenon::thread::result< T >::value().


◆ workers_on_same_node()

auto kcenon::thread::numa_work_stealer::workers_on_same_node ( std::size_t worker_a,
std::size_t worker_b ) const -> bool
nodiscard, private

Check if two workers are on the same NUMA node.

Definition at line 596 of file numa_work_stealer.cpp.

{
    if (!topology_.is_numa_available())
    {
        return true; // On single-node systems, all workers are on the same node
    }

    int cpu_a = get_worker_cpu(worker_a);
    int cpu_b = get_worker_cpu(worker_b);

    return topology_.is_same_node(cpu_a, cpu_b);
}

Member Data Documentation

◆ affinity_tracker_

std::unique_ptr<work_affinity_tracker> kcenon::thread::numa_work_stealer::affinity_tracker_
private

Definition at line 282 of file numa_work_stealer.h.

Referenced by record_steal(), and set_config().

◆ backoff_calculator_

std::unique_ptr<backoff_calculator> kcenon::thread::numa_work_stealer::backoff_calculator_
private

Definition at line 283 of file numa_work_stealer.h.

Referenced by numa_work_stealer(), and set_config().

◆ config_

enhanced_work_stealing_config kcenon::thread::numa_work_stealer::config_
private

Definition at line 279 of file numa_work_stealer.h.

Referenced by get_config(), numa_work_stealer(), record_steal(), and set_config().

◆ cpu_accessor_

cpu_accessor_fn kcenon::thread::numa_work_stealer::cpu_accessor_
private

Definition at line 278 of file numa_work_stealer.h.

◆ deque_accessor_

deque_accessor_fn kcenon::thread::numa_work_stealer::deque_accessor_
private

Definition at line 277 of file numa_work_stealer.h.

◆ rng_

std::mt19937_64 kcenon::thread::numa_work_stealer::rng_
mutable, private

Definition at line 286 of file numa_work_stealer.h.

◆ round_robin_index_

std::atomic<std::size_t> kcenon::thread::numa_work_stealer::round_robin_index_ {0}
mutable, private

Definition at line 289 of file numa_work_stealer.h.


◆ stats_

work_stealing_stats kcenon::thread::numa_work_stealer::stats_
private

Definition at line 281 of file numa_work_stealer.h.

Referenced by get_stats(), get_stats_snapshot(), and reset_stats().

◆ topology_

numa_topology kcenon::thread::numa_work_stealer::topology_
private

Definition at line 280 of file numa_work_stealer.h.

Referenced by get_topology().

◆ worker_count_

std::size_t kcenon::thread::numa_work_stealer::worker_count_
private

Definition at line 276 of file numa_work_stealer.h.

Referenced by numa_work_stealer(), and set_config().


The documentation for this class was generated from the following files:

  • numa_work_stealer.h
  • numa_work_stealer.cpp