Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
numa_work_stealer.cpp
Go to the documentation of this file.
// BSD 3-Clause License
// Copyright (c) 2024, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

#include "numa_work_stealer.h" // own header (full project include path not shown in this listing)

#include <algorithm>
#include <chrono>
#include <thread>

namespace kcenon::thread
{

numa_work_stealer::numa_work_stealer(std::size_t worker_count,
                                     deque_accessor_fn deque_accessor,
                                     cpu_accessor_fn cpu_accessor,
                                     enhanced_work_stealing_config config)
    : worker_count_(worker_count)
    , deque_accessor_(std::move(deque_accessor))
    , cpu_accessor_(std::move(cpu_accessor))
    , config_(config)
    , topology_(numa_topology::detect())
    , rng_(std::random_device{}())
{
    if (config_.track_locality)
    {
        affinity_tracker_ =
            std::make_unique<work_affinity_tracker>(worker_count_, config_.locality_history_size);
    }

    steal_backoff_config backoff_config;
    backoff_config.strategy = config_.backoff_strategy;
    backoff_config.initial_backoff = config_.initial_backoff;
    backoff_config.max_backoff = config_.max_backoff;
    backoff_config.multiplier = config_.backoff_multiplier;
    backoff_calculator_ = std::make_unique<backoff_calculator>(backoff_config);
}
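
// Note on the backoff settings above: with the defaults documented for this
// configuration (initial_backoff = 50 us, backoff_multiplier = 2.0,
// max_backoff = 1000 us) and the exponential strategy, the delay between
// consecutive failed steal attempts roughly doubles from 50 us up to the
// 1000 us cap.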

auto numa_work_stealer::steal_for(std::size_t worker_id) -> job*
{
    if (!config_.enabled || worker_count_ <= 1)
    {
        return nullptr;
    }

    auto start_time = std::chrono::steady_clock::now();
    job* stolen_job = nullptr;

    auto victims = select_victims(worker_id, config_.max_steal_attempts);

    std::size_t attempt = 0;
    for (auto victim_id : victims)
    {
        if (config_.collect_statistics)
        {
            stats_.steal_attempts.fetch_add(1, std::memory_order_relaxed);
        }

        auto* victim_deque = deque_accessor_(victim_id);
        if (victim_deque == nullptr)
        {
            continue;
        }

        auto result = victim_deque->steal();
        if (result.has_value())
        {
            stolen_job = result.value();

            if (config_.collect_statistics)
            {
                stats_.successful_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.jobs_stolen.fetch_add(1, std::memory_order_relaxed);

                if (workers_on_same_node(worker_id, victim_id))
                {
                    stats_.same_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
                else
                {
                    stats_.cross_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
            }

            record_steal(worker_id, victim_id);
            break;
        }
        else
        {
            if (config_.collect_statistics)
            {
                stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
            }

            // Apply backoff after failed steal
            if (++attempt < victims.size())
            {
                auto backoff_start = std::chrono::steady_clock::now();
                auto delay = backoff_calculator_->calculate(attempt);
                std::this_thread::sleep_for(delay);

                if (config_.collect_statistics)
                {
                    auto backoff_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                          std::chrono::steady_clock::now() - backoff_start)
                                          .count();
                    stats_.total_backoff_time_ns.fetch_add(static_cast<std::uint64_t>(backoff_ns),
                                                           std::memory_order_relaxed);
                }
            }
        }
    }

    if (config_.collect_statistics)
    {
        auto elapsed_ns =
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                std::chrono::steady_clock::now() - start_time)
                .count();
        stats_.total_steal_time_ns.fetch_add(static_cast<std::uint64_t>(elapsed_ns),
                                             std::memory_order_relaxed);
    }

    return stolen_job;
}

auto numa_work_stealer::steal_batch_for(std::size_t worker_id, std::size_t max_count)
    -> std::vector<job*>
{
    if (!config_.enabled || worker_count_ <= 1 || max_count == 0)
    {
        return {};
    }

    auto start_time = std::chrono::steady_clock::now();
    std::vector<job*> stolen_jobs;

    auto victims = select_victims(worker_id, config_.max_steal_attempts);

    std::size_t attempt = 0;
    for (auto victim_id : victims)
    {
        if (config_.collect_statistics)
        {
            stats_.steal_attempts.fetch_add(1, std::memory_order_relaxed);
        }

        auto* victim_deque = deque_accessor_(victim_id);
        if (victim_deque == nullptr)
        {
            continue;
        }

        auto queue_size = victim_deque->size();
        if (queue_size == 0)
        {
            if (config_.collect_statistics)
            {
                stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
            }
            continue;
        }

        auto batch_size = calculate_batch_size(queue_size);
        batch_size = std::min(batch_size, max_count);

        auto batch = victim_deque->steal_batch(batch_size);
        if (!batch.empty())
        {
            stolen_jobs = std::move(batch);

            if (config_.collect_statistics)
            {
                stats_.successful_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.jobs_stolen.fetch_add(stolen_jobs.size(), std::memory_order_relaxed);
                stats_.batch_steals.fetch_add(1, std::memory_order_relaxed);
                stats_.total_batch_size.fetch_add(stolen_jobs.size(), std::memory_order_relaxed);

                if (workers_on_same_node(worker_id, victim_id))
                {
                    stats_.same_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
                else
                {
                    stats_.cross_node_steals.fetch_add(1, std::memory_order_relaxed);
                }
            }

            record_steal(worker_id, victim_id);
            break;
        }
        else
        {
            if (config_.collect_statistics)
            {
                stats_.failed_steals.fetch_add(1, std::memory_order_relaxed);
            }

            // Apply backoff after failed steal
            if (++attempt < victims.size())
            {
                auto backoff_start = std::chrono::steady_clock::now();
                auto delay = backoff_calculator_->calculate(attempt);
                std::this_thread::sleep_for(delay);

                if (config_.collect_statistics)
                {
                    auto backoff_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                          std::chrono::steady_clock::now() - backoff_start)
                                          .count();
                    stats_.total_backoff_time_ns.fetch_add(static_cast<std::uint64_t>(backoff_ns),
                                                           std::memory_order_relaxed);
                }
            }
        }
    }

    if (config_.collect_statistics)
    {
        auto elapsed_ns =
            std::chrono::duration_cast<std::chrono::nanoseconds>(
                std::chrono::steady_clock::now() - start_time)
                .count();
        stats_.total_steal_time_ns.fetch_add(static_cast<std::uint64_t>(elapsed_ns),
                                             std::memory_order_relaxed);
    }

    return stolen_jobs;
}

auto numa_work_stealer::get_stats() const -> const work_stealing_stats&
{
    return stats_;
}

auto numa_work_stealer::get_stats_snapshot() const -> work_stealing_stats_snapshot
{
    return stats_.snapshot();
}

void numa_work_stealer::reset_stats()
{
    stats_.reset();
}

auto numa_work_stealer::get_topology() const -> const numa_topology&
{
    return topology_;
}

auto numa_work_stealer::get_config() const -> const enhanced_work_stealing_config&
{
    return config_;
}

void numa_work_stealer::set_config(const enhanced_work_stealing_config& config)
{
    config_ = config;

    // Update backoff calculator
    steal_backoff_config backoff_config;
    backoff_config.strategy = config_.backoff_strategy;
    backoff_config.initial_backoff = config_.initial_backoff;
    backoff_config.max_backoff = config_.max_backoff;
    backoff_config.multiplier = config_.backoff_multiplier;
    backoff_calculator_->set_config(backoff_config);

    // Update affinity tracker if needed
    if (config_.track_locality && !affinity_tracker_)
    {
        affinity_tracker_ =
            std::make_unique<work_affinity_tracker>(worker_count_, config_.locality_history_size);
    }
    else if (!config_.track_locality)
    {
        affinity_tracker_.reset();
    }
}

auto numa_work_stealer::select_victims(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    // Dispatch on the configured victim-selection policy. The enum qualifier below
    // is inferred from the documented policy values; the exact type name is not
    // visible in this listing.
    switch (config_.policy)
    {
    case victim_selection_policy::random:
        return select_victims_random(requester_id, count);

    case victim_selection_policy::round_robin:
        return select_victims_round_robin(requester_id, count);

    case victim_selection_policy::adaptive:
        return select_victims_adaptive(requester_id, count);

    case victim_selection_policy::numa_aware:
        return select_victims_numa_aware(requester_id, count);

    case victim_selection_policy::locality_aware:
        return select_victims_locality_aware(requester_id, count);

    case victim_selection_policy::hierarchical:
        return select_victims_hierarchical(requester_id, count);

    default:
        return select_victims_random(requester_id, count);
    }
}

auto numa_work_stealer::select_victims_random(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    std::vector<std::size_t> victims;
    victims.reserve(count);

    std::vector<std::size_t> candidates;
    candidates.reserve(worker_count_ - 1);

    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i != requester_id)
        {
            candidates.push_back(i);
        }
    }

    // Shuffle and take first 'count' elements
    std::shuffle(candidates.begin(), candidates.end(), rng_);

    auto num_victims = std::min(count, candidates.size());
    victims.insert(victims.end(), candidates.begin(), candidates.begin() + num_victims);

    return victims;
}

auto numa_work_stealer::select_victims_round_robin(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    std::vector<std::size_t> victims;
    victims.reserve(count);

    auto start_index = round_robin_index_.fetch_add(1, std::memory_order_relaxed) % worker_count_;

    for (std::size_t i = 0; i < worker_count_ && victims.size() < count; ++i)
    {
        auto victim_id = (start_index + i) % worker_count_;
        if (victim_id != requester_id)
        {
            victims.push_back(victim_id);
        }
    }

    return victims;
}

auto numa_work_stealer::select_victims_adaptive(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    // Score workers by queue size (larger queues are better targets)
    std::vector<std::pair<std::size_t, std::size_t>> scored_workers;
    scored_workers.reserve(worker_count_ - 1);

    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }

        auto* deque = deque_accessor_(i);
        auto queue_size = deque ? deque->size() : 0;
        scored_workers.emplace_back(i, queue_size);
    }

    // Sort by queue size (descending)
    std::sort(scored_workers.begin(),
              scored_workers.end(),
              [](const auto& a, const auto& b) { return a.second > b.second; });

    // Take top 'count' workers with non-empty queues
    std::vector<std::size_t> victims;
    victims.reserve(count);

    for (const auto& [worker_id, queue_size] : scored_workers)
    {
        if (victims.size() >= count)
        {
            break;
        }
        if (queue_size > 0)
        {
            victims.push_back(worker_id);
        }
    }

    // If there are not enough workers with non-empty queues, fill the remaining
    // slots from the sorted list (empty queues included)
    if (victims.size() < count)
    {
        for (const auto& [worker_id, queue_size] : scored_workers)
        {
            if (victims.size() >= count)
            {
                break;
            }
            if (std::find(victims.begin(), victims.end(), worker_id) == victims.end())
            {
                victims.push_back(worker_id);
            }
        }
    }

    return victims;
}

auto numa_work_stealer::select_victims_numa_aware(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    if (!config_.numa_aware || !topology_.is_numa_available())
    {
        return select_victims_adaptive(requester_id, count);
    }

    // Get requester's NUMA node
    int requester_cpu = get_worker_cpu(requester_id);
    int requester_node = topology_.get_node_for_cpu(requester_cpu);

    // Score workers: same-node workers get higher priority
    std::vector<std::pair<std::size_t, double>> scored_workers;
    scored_workers.reserve(worker_count_ - 1);

    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }

        int victim_cpu = get_worker_cpu(i);
        int victim_node = topology_.get_node_for_cpu(victim_cpu);

        auto* deque = deque_accessor_(i);
        auto queue_size = deque ? static_cast<double>(deque->size()) : 0.0;

        // Apply NUMA penalty for cross-node workers
        double score = queue_size;
        if (requester_node != victim_node && requester_node >= 0 && victim_node >= 0)
        {
            score /= config_.numa_penalty_factor;
        }

        scored_workers.emplace_back(i, score);
    }

    // Sort by score (descending)
    std::sort(scored_workers.begin(),
              scored_workers.end(),
              [](const auto& a, const auto& b) { return a.second > b.second; });

    std::vector<std::size_t> victims;
    victims.reserve(count);

    for (const auto& [worker_id, score] : scored_workers)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(worker_id);
    }

    return victims;
}
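
// Illustrative example of the scoring above: with a numa_penalty_factor of 2.0
// (a hypothetical value), a cross-node victim holding 8 jobs scores 4.0 and is
// therefore ranked below a same-node victim holding 5 jobs (score 5.0).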

auto numa_work_stealer::select_victims_locality_aware(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    if (!config_.track_locality || !affinity_tracker_)
    {
        return select_victims_adaptive(requester_id, count);
    }

    // Get preferred victims from affinity tracker
    auto preferred = affinity_tracker_->get_preferred_victims(requester_id, count);

    // If not enough preferred victims, fill with adaptive selection
    if (preferred.size() < count)
    {
        auto additional = select_victims_adaptive(requester_id, count - preferred.size());
        for (auto victim_id : additional)
        {
            if (std::find(preferred.begin(), preferred.end(), victim_id) == preferred.end())
            {
                preferred.push_back(victim_id);
            }
            if (preferred.size() >= count)
            {
                break;
            }
        }
    }

    return preferred;
}

auto numa_work_stealer::select_victims_hierarchical(std::size_t requester_id, std::size_t count)
    -> std::vector<std::size_t>
{
    if (!config_.numa_aware || !topology_.is_numa_available())
    {
        return select_victims_adaptive(requester_id, count);
    }

    int requester_cpu = get_worker_cpu(requester_id);
    int requester_node = topology_.get_node_for_cpu(requester_cpu);

    // First: workers on same NUMA node
    std::vector<std::size_t> same_node_victims;
    // Second: workers on other NUMA nodes
    std::vector<std::size_t> other_node_victims;

    for (std::size_t i = 0; i < worker_count_; ++i)
    {
        if (i == requester_id)
        {
            continue;
        }

        int victim_cpu = get_worker_cpu(i);
        int victim_node = topology_.get_node_for_cpu(victim_cpu);

        if (victim_node == requester_node)
        {
            same_node_victims.push_back(i);
        }
        else
        {
            other_node_victims.push_back(i);
        }
    }

    // Shuffle within each group
    std::shuffle(same_node_victims.begin(), same_node_victims.end(), rng_);
    std::shuffle(other_node_victims.begin(), other_node_victims.end(), rng_);

    // Combine: same node first, then other nodes
    std::vector<std::size_t> victims;
    victims.reserve(count);

    for (auto victim_id : same_node_victims)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(victim_id);
    }

    for (auto victim_id : other_node_victims)
    {
        if (victims.size() >= count)
        {
            break;
        }
        victims.push_back(victim_id);
    }

    return victims;
}

auto numa_work_stealer::calculate_batch_size(std::size_t victim_queue_size) const -> std::size_t
{
    if (!config_.adaptive_batch_size)
    {
        return config_.max_steal_batch;
    }

    // Adaptive batch size: steal at most half of victim's queue
    auto half_queue = victim_queue_size / 2;
    if (half_queue < config_.min_steal_batch)
    {
        return config_.min_steal_batch;
    }

    return std::min(half_queue, config_.max_steal_batch);
}
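
// Illustrative example, assuming min_steal_batch = 2 and max_steal_batch = 8
// (hypothetical values): a victim queue of 10 jobs yields a batch of 5 (half the
// queue), a queue of 3 yields 2 (clamped to the minimum), and a queue of 40
// yields 8 (clamped to the maximum).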

auto numa_work_stealer::get_worker_cpu(std::size_t worker_id) const -> int
{
    if (!cpu_accessor_)
    {
        return -1;
    }
    return cpu_accessor_(worker_id);
}

auto numa_work_stealer::workers_on_same_node(std::size_t worker_a, std::size_t worker_b) const
    -> bool
{
    if (!topology_.is_numa_available())
    {
        return true; // On single-node systems, all workers are on the same node
    }

    int cpu_a = get_worker_cpu(worker_a);
    int cpu_b = get_worker_cpu(worker_b);

    return topology_.is_same_node(cpu_a, cpu_b);
}

void numa_work_stealer::record_steal(std::size_t thief_id, std::size_t victim_id)
{
    if (affinity_tracker_)
    {
        affinity_tracker_->record_cooperation(thief_id, victim_id);
    }
}

} // namespace kcenon::thread
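
For orientation, here is a minimal sketch of how a thread pool might wire this stealer into its worker loop. The pool object, its worker_count()/worker_deque()/worker_cpu() helpers, and the execute() call are hypothetical stand-ins for whatever the surrounding pool provides; only numa_work_stealer, its accessor function types, and steal_for()/steal_batch_for() come from the listing above.

using kcenon::thread::numa_work_stealer;
using kcenon::thread::job;

// Hook the stealer up to the pool's per-worker deques and CPU pins
// (both accessor callbacks are invoked with a worker index).
numa_work_stealer stealer(
    pool.worker_count(),                                         // hypothetical pool API
    [&pool](std::size_t id) { return pool.worker_deque(id); },   // deque_accessor_fn
    [&pool](std::size_t id) { return pool.worker_cpu(id); });    // cpu_accessor_fn

// Inside a worker thread that has drained its own deque:
if (job* stolen = stealer.steal_for(worker_id))
{
    execute(stolen);  // hypothetical job-execution hook
}
else
{
    // Or amortize the stealing overhead by taking a small batch at once.
    for (job* j : stealer.steal_batch_for(worker_id, 4))
    {
        execute(j);
    }
}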

Cross-reference briefs for the symbols referenced in this file:

kcenon::thread - Core threading foundation of the thread system library. (Definition: thread_impl.h:17)
std - STL namespace.
job - Represents a unit of work (task) to be executed, typically by a job queue. (Definition: job.h:136)
numa_topology - NUMA (Non-Uniform Memory Access) topology information.
lockfree::work_stealing_deque - Dynamic circular array work-stealing deque for lock-free task distribution.

numa_work_stealer - NUMA-aware work stealer with enhanced victim selection policies.
  numa_work_stealer(std::size_t worker_count, deque_accessor_fn deque_accessor, cpu_accessor_fn cpu_accessor, enhanced_work_stealing_config config = {}) - Construct a NUMA-aware work stealer.
  deque_accessor_fn = std::function<lockfree::work_stealing_deque<job*>*(std::size_t)> - Function type for accessing a worker's local deque.
  cpu_accessor_fn = std::function<int(std::size_t)> - Function type for getting a worker's CPU affinity.
  auto steal_for(std::size_t worker_id) -> job* - Attempt to steal work for a worker.
  auto steal_batch_for(std::size_t worker_id, std::size_t max_count) -> std::vector<job*> - Attempt to steal multiple jobs for a worker.
  auto select_victims(std::size_t requester_id, std::size_t count) -> std::vector<std::size_t> - Select victim workers based on the configured policy.
  auto select_victims_random(std::size_t requester_id, std::size_t count) -> std::vector<std::size_t> - Select victims using the random policy.
  auto select_victims_round_robin(std::size_t requester_id, std::size_t count) -> std::vector<std::size_t> - Select victims using the round-robin policy.
  auto select_victims_adaptive(std::size_t requester_id, std::size_t count) -> std::vector<std::size_t> - Select victims using the adaptive (queue-size based) policy.
  auto select_victims_numa_aware(std::size_t requester_id, std::size_t count) -> std::vector<std::size_t> - Select victims using the NUMA-aware policy.
  auto select_victims_locality_aware(std::size_t requester_id, std::size_t count) -> std::vector<std::size_t> - Select victims using the locality-aware policy.
  auto select_victims_hierarchical(std::size_t requester_id, std::size_t count) -> std::vector<std::size_t> - Select victims using the hierarchical policy.
  auto calculate_batch_size(std::size_t victim_queue_size) const -> std::size_t - Calculate batch size based on configuration and victim queue depth.
  auto get_worker_cpu(std::size_t worker_id) const -> int - Get the CPU ID for a worker.
  auto workers_on_same_node(std::size_t worker_a, std::size_t worker_b) const -> bool - Check if two workers are on the same NUMA node.
  void record_steal(std::size_t thief_id, std::size_t victim_id) - Record a successful steal for affinity tracking.
  auto get_stats() const -> const work_stealing_stats& - Get the current statistics.
  auto get_stats_snapshot() const -> work_stealing_stats_snapshot - Get a snapshot of current statistics.
  void reset_stats() - Reset all statistics to zero.
  auto get_topology() const -> const numa_topology& - Get the NUMA topology information.
  auto get_config() const -> const enhanced_work_stealing_config& - Get the current configuration.
  void set_config(const enhanced_work_stealing_config& config) - Update the configuration.
  Data members: enhanced_work_stealing_config config_, std::unique_ptr<work_affinity_tracker> affinity_tracker_, std::unique_ptr<backoff_calculator> backoff_calculator_.

enhanced_work_stealing_config - Configuration for enhanced work-stealing with NUMA awareness.
  steal_backoff_strategy backoff_strategy - Backoff strategy between failed steal attempts (default: exponential).
  std::chrono::microseconds initial_backoff - Initial backoff delay (default: 50 microseconds).
  std::chrono::microseconds max_backoff - Maximum backoff delay cap (default: 1000 microseconds).
  double backoff_multiplier - Backoff multiplier for exponential strategy (default: 2.0).
  bool track_locality - Enable work affinity tracking between workers (default: disabled).
  std::size_t locality_history_size - Size of cooperation history for locality tracking (default: 16).

Victim selection policy values:
  random - Random victim selection (baseline, good distribution).
  round_robin - Sequential victim selection (deterministic, fair).
  adaptive - Select based on queue sizes (best for uneven loads).
  numa_aware - Prefer workers on the same NUMA node (reduces cross-node traffic).
  locality_aware - Prefer workers with recent cooperation history (cache affinity).
  hierarchical - NUMA node first, then random within node (large NUMA systems).

steal_backoff_config - Configuration for backoff behavior.
  double multiplier - Multiplier for exponential backoff.

work_stealing_stats - Statistics for work-stealing operations.
  auto snapshot() const -> work_stealing_stats_snapshot - Create a snapshot of current statistics.
  void reset() - Reset all statistics to zero.
work_stealing_stats_snapshot - Non-atomic snapshot of work-stealing statistics.

Result type returned by the deque's steal(): a template class representing either a value or an error.
  bool has_value() const noexcept - Checks if the result contains a value.
  T& value() & - Gets the value.

delay (enumerator) - Delay processing (attempt later).
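
The configuration documented above can also be adjusted after construction through set_config(), which rebuilds the backoff calculator and creates or drops the affinity tracker as needed. The fragment below is a sketch using only field names shown on this page; the particular values are illustrative choices, not library defaults.

auto cfg = stealer.get_config();                       // copy the current configuration
cfg.track_locality = true;                             // enable work-affinity tracking
cfg.locality_history_size = 32;                        // cooperation history per worker
cfg.initial_backoff = std::chrono::microseconds(50);   // first delay after a failed steal
cfg.max_backoff = std::chrono::microseconds(500);      // cap on backoff growth
cfg.backoff_multiplier = 2.0;                          // exponential growth factor
stealer.set_config(cfg);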