Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
work_affinity_tracker.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8
9namespace kcenon::thread
10{
11
13 std::size_t history_size)
14 : worker_count_(worker_count)
15 , tracked_count_(std::min(worker_count, MAX_TRACKED_WORKERS))
16 , history_size_(history_size)
17 , matrix_size_(0)
18 , total_cooperations_(0)
19{
20 if (tracked_count_ > 1)
21 {
22 // Size of upper triangular matrix without diagonal
23 // For n tracked workers: n*(n-1)/2 pairs
24 // Capped at MAX_TRACKED_WORKERS to prevent O(n^2) growth
27 std::make_unique<std::atomic<std::uint64_t>[]>(matrix_size_);
28
29 // Initialize all counters to zero
30 for (std::size_t i = 0; i < matrix_size_; ++i)
31 {
32 cooperation_matrix_[i].store(0, std::memory_order_relaxed);
33 }
34 }
35}
36
38 : worker_count_(other.worker_count_)
39 , tracked_count_(other.tracked_count_)
40 , history_size_(other.history_size_)
41 , matrix_size_(other.matrix_size_)
42 , cooperation_matrix_(std::move(other.cooperation_matrix_))
43 , total_cooperations_(other.total_cooperations_.load(std::memory_order_relaxed))
44{
45 other.worker_count_ = 0;
46 other.tracked_count_ = 0;
47 other.history_size_ = 0;
48 other.matrix_size_ = 0;
49 other.total_cooperations_.store(0, std::memory_order_relaxed);
50}
51
54{
55 if (this != &other)
56 {
57 worker_count_ = other.worker_count_;
58 tracked_count_ = other.tracked_count_;
59 history_size_ = other.history_size_;
60 matrix_size_ = other.matrix_size_;
61 cooperation_matrix_ = std::move(other.cooperation_matrix_);
62 total_cooperations_.store(
63 other.total_cooperations_.load(std::memory_order_relaxed),
64 std::memory_order_relaxed);
65
66 other.worker_count_ = 0;
67 other.tracked_count_ = 0;
68 other.history_size_ = 0;
69 other.matrix_size_ = 0;
70 other.total_cooperations_.store(0, std::memory_order_relaxed);
71 }
72 return *this;
73}
74
76 std::size_t victim_id)
77{
78 if (thief_id >= tracked_count_ || victim_id >= tracked_count_ ||
79 thief_id == victim_id || !cooperation_matrix_)
80 {
81 return;
82 }
83
84 auto idx = get_matrix_index(thief_id, victim_id);
85 if (idx < matrix_size_)
86 {
87 cooperation_matrix_[idx].fetch_add(1, std::memory_order_relaxed);
88 total_cooperations_.fetch_add(1, std::memory_order_relaxed);
89 }
90}
91
92auto work_affinity_tracker::get_affinity(std::size_t worker_a,
93 std::size_t worker_b) const -> double
94{
95 if (worker_a >= tracked_count_ || worker_b >= tracked_count_ ||
96 worker_a == worker_b || !cooperation_matrix_)
97 {
98 return 0.0;
99 }
100
101 auto idx = get_matrix_index(worker_a, worker_b);
102 if (idx >= matrix_size_)
103 {
104 return 0.0;
105 }
106
107 auto cooperation_count = cooperation_matrix_[idx].load(std::memory_order_relaxed);
108 if (cooperation_count == 0)
109 {
110 return 0.0;
111 }
112
113 // Normalize by history size to make scores comparable
114 return static_cast<double>(cooperation_count) / static_cast<double>(history_size_);
115}
116
118 std::size_t max_count) const
119 -> std::vector<std::size_t>
120{
121 if (worker_id >= tracked_count_ || max_count == 0)
122 {
123 return {};
124 }
125
126 // Build list of (affinity, worker_id) pairs for tracked workers only
127 std::vector<std::pair<double, std::size_t>> affinities;
128 affinities.reserve(tracked_count_ - 1);
129
130 for (std::size_t i = 0; i < tracked_count_; ++i)
131 {
132 if (i != worker_id)
133 {
134 double affinity = get_affinity(worker_id, i);
135 affinities.emplace_back(affinity, i);
136 }
137 }
138
139 // Sort by descending affinity
140 std::sort(affinities.begin(), affinities.end(),
141 [](const auto& lhs, const auto& rhs) {
142 return lhs.first > rhs.first;
143 });
144
145 // Extract worker IDs up to max_count
146 std::vector<std::size_t> result;
147 result.reserve(std::min(max_count, affinities.size()));
148
149 for (std::size_t i = 0; i < std::min(max_count, affinities.size()); ++i)
150 {
151 result.push_back(affinities[i].second);
152 }
153
154 return result;
155}
156
158{
160 {
161 for (std::size_t i = 0; i < matrix_size_; ++i)
162 {
163 cooperation_matrix_[i].store(0, std::memory_order_relaxed);
164 }
165 }
166 total_cooperations_.store(0, std::memory_order_relaxed);
167}
168
170{
171 return worker_count_;
172}
173
175{
176 return history_size_;
177}
178
180{
181 return total_cooperations_.load(std::memory_order_relaxed);
182}
183
185 std::size_t worker_b) const
186 -> std::size_t
187{
188 auto [i, j] = normalize_pair(worker_a, worker_b);
189
190 // Upper triangular matrix index formula:
191 // For pair (i, j) where i < j:
192 // index = i * tracked_count - i*(i+1)/2 + j - i - 1
193 return (i * tracked_count_) - ((i * (i + 1)) / 2) + j - i - 1;
194}
195
196auto work_affinity_tracker::normalize_pair(std::size_t a, std::size_t b)
197 -> std::pair<std::size_t, std::size_t>
198{
199 if (a < b)
200 {
201 return {a, b};
202 }
203 return {b, a};
204}
205
206} // namespace kcenon::thread
A template class representing either a value or an error.
Tracks cooperation patterns between workers for locality-aware stealing.
auto get_affinity(std::size_t worker_a, std::size_t worker_b) const -> double
Get the affinity score between two workers.
work_affinity_tracker()=default
Default constructor - creates an empty tracker.
auto worker_count() const -> std::size_t
Get the number of workers being tracked.
auto history_size() const -> std::size_t
Get the configured history size.
void record_cooperation(std::size_t thief_id, std::size_t victim_id)
Record a cooperation event between two workers.
std::atomic< std::uint64_t > total_cooperations_
std::unique_ptr< std::atomic< std::uint64_t >[]> cooperation_matrix_
auto get_matrix_index(std::size_t worker_a, std::size_t worker_b) const -> std::size_t
Get the matrix index for a worker pair.
static auto normalize_pair(std::size_t a, std::size_t b) -> std::pair< std::size_t, std::size_t >
Normalize worker IDs to ensure a < b.
auto operator=(work_affinity_tracker &&other) noexcept -> work_affinity_tracker &
Move assignment operator.
auto total_cooperations() const -> std::uint64_t
Get total cooperation events recorded.
auto get_preferred_victims(std::size_t worker_id, std::size_t max_count) const -> std::vector< std::size_t >
Get preferred victims for a worker, sorted by affinity.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Tracks cooperation patterns between workers for locality-aware stealing.