Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::work_affinity_tracker Class Reference

Tracks cooperation patterns between workers for locality-aware stealing.

#include <work_affinity_tracker.h>


Public Member Functions

 work_affinity_tracker (std::size_t worker_count, std::size_t history_size=16)
 Construct a work affinity tracker.
 
 work_affinity_tracker ()=default
 Default constructor - creates an empty tracker.
 
 work_affinity_tracker (work_affinity_tracker &&other) noexcept
 Move constructor.
 
auto operator= (work_affinity_tracker &&other) noexcept -> work_affinity_tracker &
 Move assignment operator.
 
 work_affinity_tracker (const work_affinity_tracker &)=delete
 
auto operator= (const work_affinity_tracker &) -> work_affinity_tracker &=delete
 
 ~work_affinity_tracker ()=default
 Destructor.
 
void record_cooperation (std::size_t thief_id, std::size_t victim_id)
 Record a cooperation event between two workers.
 
auto get_affinity (std::size_t worker_a, std::size_t worker_b) const -> double
 Get the affinity score between two workers.
 
auto get_preferred_victims (std::size_t worker_id, std::size_t max_count) const -> std::vector< std::size_t >
 Get preferred victims for a worker, sorted by affinity.
 
void reset ()
 Reset all affinity data.
 
auto worker_count () const -> std::size_t
 Get the number of workers being tracked.
 
auto history_size () const -> std::size_t
 Get the configured history size.
 
auto total_cooperations () const -> std::uint64_t
 Get total cooperation events recorded.
 

Static Public Attributes

static constexpr std::size_t MAX_TRACKED_WORKERS = 32
 Maximum number of workers tracked for affinity.
 

Private Member Functions

auto get_matrix_index (std::size_t worker_a, std::size_t worker_b) const -> std::size_t
 Get the matrix index for a worker pair.
 

Static Private Member Functions

static auto normalize_pair (std::size_t a, std::size_t b) -> std::pair< std::size_t, std::size_t >
 Order a pair of worker IDs so that the smaller ID comes first.
 

Private Attributes

std::size_t worker_count_ {0}
 
std::size_t tracked_count_ {0}
 
std::size_t history_size_ {16}
 
std::size_t matrix_size_ {0}
 
std::unique_ptr< std::atomic< std::uint64_t >[]> cooperation_matrix_
 
std::atomic< std::uint64_t > total_cooperations_ {0}
 

Detailed Description

Tracks cooperation patterns between workers for locality-aware stealing.

This class maintains a cooperation matrix that records successful work-stealing interactions between worker threads. Workers that frequently exchange work develop higher affinity scores, making them preferred victims for future steals.

Design Rationale

When workers frequently cooperate (steal from each other successfully), they likely share related work that benefits from cache locality. By preferring to steal from high-affinity workers, we improve cache utilization and reduce memory access latency.

Thread Safety

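record_cooperation(), reset(), and the const query methods may be called concurrently: the cooperation matrix and the running total are atomic counters that are updated and read without locks. Move construction and move assignment write non-atomic members (worker_count_, tracked_count_, history_size_, matrix_size_) and must not overlap with any other call on either object.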

Memory Model

All atomic loads, stores, and increments in the implementation listed on this page use std::memory_order_relaxed. Each counter is updated atomically, but reading it establishes no ordering with respect to other memory operations.

Usage Example

// Create tracker for 8 workers with history size of 16
work_affinity_tracker tracker(8, 16);
// Record successful steal: worker 2 stole from worker 5
tracker.record_cooperation(2, 5);
// Get preferred victims for worker 2
auto victims = tracker.get_preferred_victims(2, 3);
// Returns up to 3 workers sorted by affinity with worker 2
// Check specific affinity
double affinity = tracker.get_affinity(2, 5);
// Higher values indicate stronger cooperation history

Decay Mechanism

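The tracker is described as weighting new interactions more heavily than older ones, so that affinity scores reflect recent behavior rather than historical patterns that may no longer be relevant. The implementation listed on this page, however, only increments a per-pair counter in record_cooperation() and divides it by history_size in get_affinity(); no aging of old events appears in it, so reset() is the only listed way to discard stale history.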
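
As an illustration of what an aging step could look like, here is a minimal sketch that halves a set of atomic counters in place. It is not taken from the library: the function name `halve_counters` and the idea of running it periodically are assumptions made for this example only.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical aging pass (not part of work_affinity_tracker): halves each
// counter so that events recorded before the pass weigh half as much as
// events recorded after it.
void halve_counters(std::atomic<std::uint64_t>* cells, std::size_t count) {
    assert(cells != nullptr || count == 0);
    for (std::size_t i = 0; i < count; ++i) {
        std::uint64_t seen = cells[i].load(std::memory_order_relaxed);
        // Retry until the halved value is installed; a failed exchange reloads
        // `seen`, so increments that race with the pass are not lost.
        while (!cells[i].compare_exchange_weak(seen, seen / 2,
                                               std::memory_order_relaxed)) {
        }
    }
}
```

Calling such a pass every fixed number of steals would give older events exponentially less weight, at the cost of one extra sweep over the matrix.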

Definition at line 69 of file work_affinity_tracker.h.

Constructor & Destructor Documentation

◆ work_affinity_tracker() [1/4]

kcenon::thread::work_affinity_tracker::work_affinity_tracker ( std::size_t worker_count,
std::size_t history_size = 16 )
explicit

Construct a work affinity tracker.

Parameters
worker_count: Number of workers to track
history_size: Size of history to consider for affinity calculations
Note
If worker_count exceeds MAX_TRACKED_WORKERS, only the first MAX_TRACKED_WORKERS workers are tracked in the cooperation matrix.
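Memory usage is O(min(worker_count, MAX_TRACKED_WORKERS)^2): one 64-bit counter per unordered pair of tracked workers. history_size does not change the allocation; it only scales the score returned by get_affinity().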
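
To make the memory note concrete, the sketch below computes how many counters the matrix needs for a given worker count, assuming one 64-bit counter per unordered pair and the documented cap of 32. The helper names (`tracked_workers`, `matrix_entries`, `matrix_bytes`) are hypothetical and exist only for this example.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Mirrors the documented value of MAX_TRACKED_WORKERS.
constexpr std::size_t kMaxTrackedWorkers = 32;

// Hypothetical helper: number of workers that receive affinity data.
constexpr std::size_t tracked_workers(std::size_t worker_count) {
    return std::min(worker_count, kMaxTrackedWorkers);
}

// Hypothetical helper: number of unordered pairs, n*(n-1)/2.
constexpr std::size_t matrix_entries(std::size_t worker_count) {
    const std::size_t n = tracked_workers(worker_count);
    return n > 1 ? n * (n - 1) / 2 : 0;
}

// Counter payload in bytes, assuming 8 bytes per counter.
constexpr std::size_t matrix_bytes(std::size_t worker_count) {
    return matrix_entries(worker_count) * sizeof(std::uint64_t);
}

static_assert(matrix_entries(8) == 28);
static_assert(matrix_entries(32) == 496);
static_assert(matrix_entries(1000) == 496);  // the cap bounds the allocation
static_assert(matrix_bytes(32) == 3968);
```

Under these assumptions the matrix never exceeds 496 counters, a little under 4 KiB, regardless of pool size.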

Definition at line 12 of file work_affinity_tracker.cpp.

work_affinity_tracker::work_affinity_tracker(std::size_t worker_count, std::size_t history_size)
    // ...
    , matrix_size_(0)
    // ...
{
    if (tracked_count_ > 1)
    {
        // Size of upper triangular matrix without diagonal
        // For n tracked workers: n*(n-1)/2 pairs
        // Capped at MAX_TRACKED_WORKERS to prevent O(n^2) growth
        matrix_size_ = (tracked_count_ * (tracked_count_ - 1)) / 2;
        cooperation_matrix_ =
            std::make_unique<std::atomic<std::uint64_t>[]>(matrix_size_);

        // Initialize all counters to zero
        for (std::size_t i = 0; i < matrix_size_; ++i)
        {
            cooperation_matrix_[i].store(0, std::memory_order_relaxed);
        }
    }
}

References cooperation_matrix_, matrix_size_, and tracked_count_.

◆ work_affinity_tracker() [2/4]

kcenon::thread::work_affinity_tracker::work_affinity_tracker ( )
default

Default constructor - creates an empty tracker.

◆ work_affinity_tracker() [3/4]

kcenon::thread::work_affinity_tracker::work_affinity_tracker ( work_affinity_tracker && other)
noexcept

Move constructor.

Definition at line 37 of file work_affinity_tracker.cpp.

work_affinity_tracker::work_affinity_tracker(work_affinity_tracker&& other) noexcept
    : worker_count_(other.worker_count_)
    , tracked_count_(other.tracked_count_)
    , history_size_(other.history_size_)
    , matrix_size_(other.matrix_size_)
    , cooperation_matrix_(std::move(other.cooperation_matrix_))
    , total_cooperations_(other.total_cooperations_.load(std::memory_order_relaxed))
{
    other.worker_count_ = 0;
    other.tracked_count_ = 0;
    other.history_size_ = 0;
    other.matrix_size_ = 0;
    other.total_cooperations_.store(0, std::memory_order_relaxed);
}

◆ work_affinity_tracker() [4/4]

kcenon::thread::work_affinity_tracker::work_affinity_tracker ( const work_affinity_tracker & )
delete

◆ ~work_affinity_tracker()

kcenon::thread::work_affinity_tracker::~work_affinity_tracker ( )
default

Destructor.

Member Function Documentation

◆ get_affinity()

auto kcenon::thread::work_affinity_tracker::get_affinity ( std::size_t worker_a,
std::size_t worker_b ) const -> double
nodiscard

Get the affinity score between two workers.

Parameters
worker_a: First worker ID
worker_b: Second worker ID
Returns
Affinity score (0.0 = no cooperation, higher = more cooperation)

The affinity is symmetric: get_affinity(a, b) == get_affinity(b, a)

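The returned score is the pair's cooperation count divided by history_size, which makes scores comparable across trackers configured with different history sizes. The value is not clamped, so it exceeds 1.0 once a pair has cooperated more than history_size times. Untracked IDs, identical IDs, and an empty tracker all yield 0.0.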
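
The normalization amounts to a single division. The following sketch restates it as a free function; the name `affinity_score` and the guard against a zero history size are assumptions added for this example and are not shown in the library listing.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical restatement of the score: cooperation count over history size.
// The history_size == 0 guard is an addition for this sketch only.
constexpr double affinity_score(std::uint64_t cooperation_count,
                                std::size_t history_size) {
    if (cooperation_count == 0 || history_size == 0) {
        return 0.0;
    }
    return static_cast<double>(cooperation_count) /
           static_cast<double>(history_size);
}

static_assert(affinity_score(8, 16) == 0.5);
static_assert(affinity_score(48, 16) == 3.0);  // scores are not capped at 1.0
```

Because the count only grows between resets, callers that want a bounded score would need to clamp it themselves.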

Definition at line 92 of file work_affinity_tracker.cpp.

auto work_affinity_tracker::get_affinity(std::size_t worker_a, std::size_t worker_b) const -> double
{
    if (worker_a >= tracked_count_ || worker_b >= tracked_count_ ||
        worker_a == worker_b || !cooperation_matrix_)
    {
        return 0.0;
    }

    auto idx = get_matrix_index(worker_a, worker_b);
    if (idx >= matrix_size_)
    {
        return 0.0;
    }

    auto cooperation_count = cooperation_matrix_[idx].load(std::memory_order_relaxed);
    if (cooperation_count == 0)
    {
        return 0.0;
    }

    // Normalize by history size to make scores comparable
    return static_cast<double>(cooperation_count) / static_cast<double>(history_size_);
}

◆ get_matrix_index()

auto kcenon::thread::work_affinity_tracker::get_matrix_index ( std::size_t worker_a,
std::size_t worker_b ) const -> std::size_t
nodiscard, private

Get the matrix index for a worker pair.

Note
Uses upper triangular matrix indexing (worker_a < worker_b)

Definition at line 184 of file work_affinity_tracker.cpp.

auto work_affinity_tracker::get_matrix_index(std::size_t worker_a, std::size_t worker_b) const -> std::size_t
{
    auto [i, j] = normalize_pair(worker_a, worker_b);

    // Upper triangular matrix index formula:
    // For pair (i, j) where i < j:
    // index = i * tracked_count - i*(i+1)/2 + j - i - 1
    return (i * tracked_count_) - ((i * (i + 1)) / 2) + j - i - 1;
}

Referenced by record_cooperation().
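
The index formula in get_matrix_index() can be checked in isolation. The sketch below restates it as a free function and verifies that every unordered pair of n workers maps to a distinct slot in [0, n*(n-1)/2). The names `pair_index` and `covers_every_slot_once` are hypothetical and not part of the library.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Same formula as get_matrix_index(), with the tracked worker count passed
// explicitly as n. Requires a != b and both IDs below n.
std::size_t pair_index(std::size_t a, std::size_t b, std::size_t n) {
    assert(a != b && a < n && b < n);
    const std::size_t i = std::min(a, b);
    const std::size_t j = std::max(a, b);
    return i * n - i * (i + 1) / 2 + j - i - 1;
}

// Returns true if all pairs (i, j) with i < j < n land on distinct slots
// inside [0, n*(n-1)/2).
bool covers_every_slot_once(std::size_t n) {
    const std::size_t slots = n > 1 ? n * (n - 1) / 2 : 0;
    std::vector<bool> seen(slots, false);
    for (std::size_t i = 0; i < n; ++i) {
        for (std::size_t j = i + 1; j < n; ++j) {
            const std::size_t idx = pair_index(i, j, n);
            if (idx >= slots || seen[idx]) {
                return false;
            }
            seen[idx] = true;
        }
    }
    return true;
}
```

For example, with n = 4 the pair (0, 1) maps to slot 0 and the pair (2, 3) maps to slot 5, the last of the six slots.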


◆ get_preferred_victims()

auto kcenon::thread::work_affinity_tracker::get_preferred_victims ( std::size_t worker_id,
std::size_t max_count ) const -> std::vector<std::size_t>
nodiscard

Get preferred victims for a worker, sorted by affinity.

Parameters
worker_id: The worker seeking victims
max_count: Maximum number of victims to return
Returns
Vector of worker IDs sorted by descending affinity

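Returns workers with the highest affinity scores, excluding the requesting worker itself. Workers with zero affinity may be included if there aren't enough high-affinity workers. Only tracked workers are candidates; an untracked worker_id or a max_count of 0 yields an empty vector. Workers with equal scores appear in unspecified relative order, because the implementation sorts with std::sort.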

Definition at line 117 of file work_affinity_tracker.cpp.

auto work_affinity_tracker::get_preferred_victims(std::size_t worker_id, std::size_t max_count) const
    -> std::vector<std::size_t>
{
    if (worker_id >= tracked_count_ || max_count == 0)
    {
        return {};
    }

    // Build list of (affinity, worker_id) pairs for tracked workers only
    std::vector<std::pair<double, std::size_t>> affinities;
    affinities.reserve(tracked_count_ - 1);

    for (std::size_t i = 0; i < tracked_count_; ++i)
    {
        if (i != worker_id)
        {
            double affinity = get_affinity(worker_id, i);
            affinities.emplace_back(affinity, i);
        }
    }

    // Sort by descending affinity
    std::sort(affinities.begin(), affinities.end(),
              [](const auto& lhs, const auto& rhs) {
                  return lhs.first > rhs.first;
              });

    // Extract worker IDs up to max_count
    std::vector<std::size_t> result;
    result.reserve(std::min(max_count, affinities.size()));

    for (std::size_t i = 0; i < std::min(max_count, affinities.size()); ++i)
    {
        result.push_back(affinities[i].second);
    }

    return result;
}
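
If a reproducible order among equal scores matters (for instance in tests), one option is to break ties on worker ID. The sketch below is a standalone variant that takes the scores as a plain vector; the function name `top_victims` and the tie-breaking rule are assumptions for this example and do not describe the library's behavior.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// scores[i] is the affinity between `self` and worker i; scores[self] is ignored.
std::vector<std::size_t> top_victims(const std::vector<double>& scores,
                                     std::size_t self, std::size_t max_count) {
    std::vector<std::pair<double, std::size_t>> ranked;
    for (std::size_t i = 0; i < scores.size(); ++i) {
        if (i != self) {
            ranked.emplace_back(scores[i], i);
        }
    }
    // Descending score; equal scores fall back to ascending worker ID.
    std::sort(ranked.begin(), ranked.end(),
              [](const auto& lhs, const auto& rhs) {
                  if (lhs.first != rhs.first) {
                      return lhs.first > rhs.first;
                  }
                  return lhs.second < rhs.second;
              });
    if (ranked.size() > max_count) {
        ranked.resize(max_count);
    }
    std::vector<std::size_t> ids;
    ids.reserve(ranked.size());
    for (const auto& entry : ranked) {
        ids.push_back(entry.second);
    }
    assert(ids.size() <= max_count);
    return ids;
}
```

With scores {0.5, 0.0, 2.0, 0.5} and self = 0, asking for three victims yields workers 2, 3, and 1 in that order.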

◆ history_size()

auto kcenon::thread::work_affinity_tracker::history_size ( ) const -> std::size_t
nodiscard

Get the configured history size.

Returns
History size

Definition at line 174 of file work_affinity_tracker.cpp.

auto work_affinity_tracker::history_size() const -> std::size_t
{
    return history_size_;
}

References history_size_.

◆ normalize_pair()

auto kcenon::thread::work_affinity_tracker::normalize_pair ( std::size_t a,
std::size_t b ) -> std::pair<std::size_t, std::size_t>
static, private

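Order a pair of worker IDs so that the smaller ID comes first. Equal IDs are returned unchanged; callers reject that case before indexing.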

Definition at line 196 of file work_affinity_tracker.cpp.

auto work_affinity_tracker::normalize_pair(std::size_t a, std::size_t b)
    -> std::pair<std::size_t, std::size_t>
{
    if (a < b)
    {
        return {a, b};
    }
    return {b, a};
}

◆ operator=() [1/2]

auto kcenon::thread::work_affinity_tracker::operator= ( const work_affinity_tracker & ) -> work_affinity_tracker &=delete
delete

◆ operator=() [2/2]

auto kcenon::thread::work_affinity_tracker::operator= ( work_affinity_tracker && other) -> work_affinity_tracker&
noexcept

Move assignment operator.

Definition at line 52 of file work_affinity_tracker.cpp.

auto work_affinity_tracker::operator=(work_affinity_tracker&& other) noexcept -> work_affinity_tracker&
{
    if (this != &other)
    {
        worker_count_ = other.worker_count_;
        tracked_count_ = other.tracked_count_;
        history_size_ = other.history_size_;
        matrix_size_ = other.matrix_size_;
        cooperation_matrix_ = std::move(other.cooperation_matrix_);
        total_cooperations_.store(
            other.total_cooperations_.load(std::memory_order_relaxed),
            std::memory_order_relaxed);

        other.worker_count_ = 0;
        other.tracked_count_ = 0;
        other.history_size_ = 0;
        other.matrix_size_ = 0;
        other.total_cooperations_.store(0, std::memory_order_relaxed);
    }
    return *this;
}

◆ record_cooperation()

void kcenon::thread::work_affinity_tracker::record_cooperation ( std::size_t thief_id,
std::size_t victim_id )

Record a cooperation event between two workers.

Parameters
thief_id: Worker that stole work (the one receiving work)
victim_id: Worker that provided work (the one being stolen from)

This method is typically called after a successful steal operation. It updates the cooperation matrix to reflect the interaction.

Note
Thread-safe, can be called concurrently from multiple workers
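
The concurrency guarantee rests on relaxed atomic increments. The following sketch reproduces that pattern with a hypothetical stand-in type, `counter_table`, and a helper, `hammer`, that increments one slot from several threads. Neither name exists in the library, and the sketch assumes a toolchain where std::thread is available.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for the tracker's storage: one atomic counter per
// slot plus a running total, all updated with relaxed ordering.
struct counter_table {
    explicit counter_table(std::size_t slots)
        : size(slots),
          cells(std::make_unique<std::atomic<std::uint64_t>[]>(slots)) {
        for (std::size_t i = 0; i < size; ++i) {
            cells[i].store(0, std::memory_order_relaxed);
        }
    }

    void record(std::size_t slot) {
        if (slot >= size) {
            return;  // out-of-range slots are ignored, as in the listing
        }
        cells[slot].fetch_add(1, std::memory_order_relaxed);
        total.fetch_add(1, std::memory_order_relaxed);
    }

    std::size_t size;
    std::unique_ptr<std::atomic<std::uint64_t>[]> cells;
    std::atomic<std::uint64_t> total{0};
};

// Increments slot 1 from several threads and returns its final value.
std::uint64_t hammer(std::size_t thread_count, std::uint64_t events_per_thread) {
    counter_table table(4);
    std::vector<std::thread> threads;
    threads.reserve(thread_count);
    for (std::size_t t = 0; t < thread_count; ++t) {
        threads.emplace_back([&table, events_per_thread] {
            for (std::uint64_t e = 0; e < events_per_thread; ++e) {
                table.record(1);
            }
        });
    }
    for (auto& th : threads) {
        th.join();  // joining makes every increment visible to this thread
    }
    assert(table.total.load(std::memory_order_relaxed) ==
           thread_count * events_per_thread);
    return table.cells[1].load(std::memory_order_relaxed);
}
```

No increment is lost, but the per-pair counter and the total are updated by two separate operations, so a concurrent reader may briefly see one ahead of the other.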

Definition at line 75 of file work_affinity_tracker.cpp.

void work_affinity_tracker::record_cooperation(std::size_t thief_id, std::size_t victim_id)
{
    if (thief_id >= tracked_count_ || victim_id >= tracked_count_ ||
        thief_id == victim_id || !cooperation_matrix_)
    {
        return;
    }

    auto idx = get_matrix_index(thief_id, victim_id);
    if (idx < matrix_size_)
    {
        cooperation_matrix_[idx].fetch_add(1, std::memory_order_relaxed);
        total_cooperations_.fetch_add(1, std::memory_order_relaxed);
    }
}

References cooperation_matrix_, get_matrix_index(), matrix_size_, total_cooperations_, and tracked_count_.


◆ reset()

void kcenon::thread::work_affinity_tracker::reset ( )

Reset all affinity data.

Clears all cooperation history, resetting all affinities to zero. Useful when worker roles change significantly.

Definition at line 157 of file work_affinity_tracker.cpp.

void work_affinity_tracker::reset()
{
    if (cooperation_matrix_)
    {
        for (std::size_t i = 0; i < matrix_size_; ++i)
        {
            cooperation_matrix_[i].store(0, std::memory_order_relaxed);
        }
    }
    total_cooperations_.store(0, std::memory_order_relaxed);
}

References cooperation_matrix_, matrix_size_, and total_cooperations_.

◆ total_cooperations()

auto kcenon::thread::work_affinity_tracker::total_cooperations ( ) const -> std::uint64_t
nodiscard

Get total cooperation events recorded.

Returns
Total cooperation count

Definition at line 179 of file work_affinity_tracker.cpp.

auto work_affinity_tracker::total_cooperations() const -> std::uint64_t
{
    return total_cooperations_.load(std::memory_order_relaxed);
}

References total_cooperations_.

◆ worker_count()

auto kcenon::thread::work_affinity_tracker::worker_count ( ) const -> std::size_t
nodiscard

Get the number of workers being tracked.

Returns
Worker count

Definition at line 169 of file work_affinity_tracker.cpp.

auto work_affinity_tracker::worker_count() const -> std::size_t
{
    return worker_count_;
}

References worker_count_.

Member Data Documentation

◆ cooperation_matrix_

std::unique_ptr<std::atomic<std::uint64_t>[]> kcenon::thread::work_affinity_tracker::cooperation_matrix_
private

Definition at line 205 of file work_affinity_tracker.h.

Referenced by record_cooperation(), reset(), and work_affinity_tracker().

◆ history_size_

std::size_t kcenon::thread::work_affinity_tracker::history_size_ {16}
private

Definition at line 200 of file work_affinity_tracker.h.


Referenced by history_size().

◆ matrix_size_

std::size_t kcenon::thread::work_affinity_tracker::matrix_size_ {0}
private

Definition at line 201 of file work_affinity_tracker.h.


Referenced by record_cooperation(), reset(), and work_affinity_tracker().

◆ MAX_TRACKED_WORKERS

std::size_t kcenon::thread::work_affinity_tracker::MAX_TRACKED_WORKERS = 32
static, constexpr

Maximum number of workers tracked for affinity.

When the pool has more workers than this limit, only the first MAX_TRACKED_WORKERS are tracked in the cooperation matrix to prevent O(n^2) memory growth. Workers beyond this cap can still participate in work stealing but won't have affinity data.
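
Since workers beyond the cap have no affinity data, a scheduler needs some fallback order for them. The sketch below shows one possible policy: try the preferred victims first, then the remaining workers in round-robin order starting after the thief. The function `steal_order` and the policy itself are assumptions for illustration; the page does not state how the pool handles this case.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical policy: preferred victims first (for example the result of
// get_preferred_victims()), then every other worker in round-robin order.
std::vector<std::size_t> steal_order(const std::vector<std::size_t>& preferred,
                                     std::size_t self,
                                     std::size_t worker_count) {
    std::vector<bool> used(worker_count, false);
    std::vector<std::size_t> order;
    order.reserve(worker_count);
    if (self < worker_count) {
        used[self] = true;  // a worker never steals from itself
    }
    for (std::size_t id : preferred) {
        if (id < worker_count && !used[id]) {
            used[id] = true;
            order.push_back(id);
        }
    }
    // Fallback: remaining workers, starting just after the thief.
    for (std::size_t step = 1; step < worker_count; ++step) {
        const std::size_t id = (self + step) % worker_count;
        if (!used[id]) {
            used[id] = true;
            order.push_back(id);
        }
    }
    assert(order.size() <= worker_count);
    return order;
}
```

A thief with an ID at or above the cap receives an empty preferred list, so under this policy it simply visits all other workers in round-robin order.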

Definition at line 80 of file work_affinity_tracker.h.

◆ total_cooperations_

std::atomic<std::uint64_t> kcenon::thread::work_affinity_tracker::total_cooperations_ {0}
private

Definition at line 208 of file work_affinity_tracker.h.


Referenced by record_cooperation(), reset(), and total_cooperations().

◆ tracked_count_

std::size_t kcenon::thread::work_affinity_tracker::tracked_count_ {0}
private

Definition at line 199 of file work_affinity_tracker.h.


Referenced by record_cooperation(), and work_affinity_tracker().

◆ worker_count_

std::size_t kcenon::thread::work_affinity_tracker::worker_count_ {0}
private

Definition at line 198 of file work_affinity_tracker.h.


Referenced by worker_count().


The documentation for this class was generated from the following files: