Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
adaptive_job_queue.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7namespace kcenon::thread {
8
9// ============================================
10// Constructor / Destructor
11// ============================================
12
14 : policy_(p)
15 , current_mode_(mode::mutex)
16 , mutex_queue_(std::make_shared<job_queue>())
17 , lockfree_queue_(std::make_unique<detail::lockfree_job_queue>())
18 , mode_start_time_(std::chrono::steady_clock::now()) {
19
20 // Set initial mode based on policy
21 switch (policy_) {
23 current_mode_.store(mode::mutex, std::memory_order_release);
24 break;
26 current_mode_.store(mode::lock_free, std::memory_order_release);
27 break;
29 case policy::manual:
30 // Default to mutex mode for balanced and manual
31 current_mode_.store(mode::mutex, std::memory_order_release);
32 break;
33 }
34}
35
37 // Stop and clear both queues
38 stop();
39
40 // Update final mode time
42}
43
44// ============================================
45// scheduler_interface implementation
46// ============================================
47
48auto adaptive_job_queue::schedule(std::unique_ptr<job>&& work) -> common::VoidResult {
49 return enqueue(std::move(work));
50}
51
52auto adaptive_job_queue::get_next_job() -> common::Result<std::unique_ptr<job>> {
53 return dequeue();
54}
55
56// ============================================
57// Standard queue operations
58// ============================================
59
60auto adaptive_job_queue::enqueue(std::unique_ptr<job>&& j) -> common::VoidResult {
61 if (stopped_.load(std::memory_order_acquire)) {
62 return common::error_info{static_cast<int>(error_code::queue_stopped), "Queue is stopped", "thread_system"};
63 }
64
65 if (!j) {
66 return common::error_info{static_cast<int>(error_code::invalid_argument), "Cannot enqueue null job", "thread_system"};
67 }
68
69 mode current = current_mode_.load(std::memory_order_acquire);
70 common::VoidResult result = (current == mode::mutex)
71 ? mutex_queue_->enqueue(std::move(j))
72 : lockfree_queue_->enqueue(std::move(j));
73
74 if (result.is_ok()) {
75 std::lock_guard<std::mutex> lock(stats_mutex_);
76 ++stats_.enqueue_count;
77 }
78
79 return result;
80}
81
82auto adaptive_job_queue::dequeue() -> common::Result<std::unique_ptr<job>> {
83 if (stopped_.load(std::memory_order_acquire)) {
84 return common::error_info{static_cast<int>(error_code::queue_stopped), "Queue is stopped", "thread_system"};
85 }
86
87 common::Result<std::unique_ptr<job>> result = common::error_info{static_cast<int>(error_code::queue_empty), "Queue is empty", "thread_system"};
88 mode current = current_mode_.load(std::memory_order_acquire);
89
90 // Try current mode first
91 if (current == mode::mutex) {
92 result = mutex_queue_->try_dequeue();
93 // If current mode is empty, also check other queue for race condition handling
94 if (result.is_err()) {
95 result = lockfree_queue_->dequeue();
96 }
97 } else {
98 result = lockfree_queue_->dequeue();
99 // If current mode is empty, also check other queue for race condition handling
100 if (result.is_err()) {
101 result = mutex_queue_->try_dequeue();
102 }
103 }
104
105 if (result.is_ok()) {
106 std::lock_guard<std::mutex> lock(stats_mutex_);
107 ++stats_.dequeue_count;
108 }
109
110 return result;
111}
112
113auto adaptive_job_queue::try_dequeue() -> common::Result<std::unique_ptr<job>> {
114 if (stopped_.load(std::memory_order_acquire)) {
115 return common::error_info{static_cast<int>(error_code::queue_stopped), "Queue is stopped", "thread_system"};
116 }
117
118 common::Result<std::unique_ptr<job>> result = common::error_info{static_cast<int>(error_code::queue_empty), "Queue is empty", "thread_system"};
119 mode current = current_mode_.load(std::memory_order_acquire);
120
121 // Try current mode first
122 if (current == mode::mutex) {
123 result = mutex_queue_->try_dequeue();
124 // If current mode is empty, also check other queue for race condition handling
125 if (result.is_err()) {
126 result = lockfree_queue_->try_dequeue();
127 }
128 } else {
129 result = lockfree_queue_->try_dequeue();
130 // If current mode is empty, also check other queue for race condition handling
131 if (result.is_err()) {
132 result = mutex_queue_->try_dequeue();
133 }
134 }
135
136 if (result.is_ok()) {
137 std::lock_guard<std::mutex> lock(stats_mutex_);
138 ++stats_.dequeue_count;
139 }
140
141 return result;
142}
143
144auto adaptive_job_queue::empty() const -> bool {
145 // Check both queues to handle race conditions during mode transitions
146 // During migration, items may temporarily exist in either queue
147 return mutex_queue_->empty() && lockfree_queue_->empty();
148}
149
150auto adaptive_job_queue::size() const -> std::size_t {
151 mode current = current_mode_.load(std::memory_order_acquire);
152
153 if (current == mode::mutex) {
154 return mutex_queue_->size();
155 } else {
156 return lockfree_queue_->size();
157 }
158}
159
161 std::lock_guard<std::mutex> lock(migration_mutex_);
162
163 mutex_queue_->clear();
164 // lockfree_queue doesn't have clear(), drain it
165 while (true) {
166 auto result = lockfree_queue_->dequeue();
167 if (result.is_err()) {
168 break;
169 }
170 // Jobs are destroyed when unique_ptr goes out of scope
171 }
172}
173
175 stopped_.store(true, std::memory_order_release);
176 mutex_queue_->stop();
177}
178
179auto adaptive_job_queue::is_stopped() const -> bool {
180 return stopped_.load(std::memory_order_acquire);
181}
182
183// ============================================
184// queue_capabilities_interface implementation
185// ============================================
186
188 mode current = current_mode_.load(std::memory_order_acquire);
189
190 if (current == mode::mutex) {
191 return queue_capabilities{
192 .exact_size = true,
193 .atomic_empty_check = true,
194 .lock_free = false,
195 .wait_free = false,
196 .supports_batch = true,
197 .supports_blocking_wait = true,
198 .supports_stop = true
199 };
200 } else {
201 return queue_capabilities{
202 .exact_size = false,
203 .atomic_empty_check = false,
204 .lock_free = true,
205 .wait_free = false,
206 .supports_batch = false,
207 .supports_blocking_wait = false,
208 .supports_stop = true
209 };
210 }
211}
212
213// ============================================
214// Adaptive-specific API
215// ============================================
216
218 return current_mode_.load(std::memory_order_acquire);
219}
220
222 return policy_;
223}
224
225auto adaptive_job_queue::switch_mode(mode m) -> common::VoidResult {
226 if (policy_ != policy::manual) {
227 return common::error_info{static_cast<int>(error_code::invalid_argument),
228 "Mode switching is only allowed with manual policy", "thread_system"};
229 }
230
231 if (current_mode_.load(std::memory_order_acquire) == m) {
232 return common::ok(); // Already in target mode
233 }
234
235 migrate_to_mode(m);
236 return common::ok();
237}
238
240 std::lock_guard<std::mutex> lock(stats_mutex_);
241
242 // Create a copy of stats with updated time
244
245 // Add current mode time
246 auto now = std::chrono::steady_clock::now();
247 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
248 now - mode_start_time_).count();
249
250 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
251 result.time_in_mutex_ms += duration;
252 } else {
253 result.time_in_lockfree_ms += duration;
254 }
255
256 return result;
257}
258
262
263// ============================================
264// accuracy_guard implementation
265// ============================================
266
268 : queue_(&queue)
269 , previous_mode_(queue.current_mode())
270 , active_(true) {
271
272 // Increment guard count
273 int prev_count = queue_->accuracy_guard_count_.fetch_add(1, std::memory_order_acq_rel);
274
275 // If this is the first guard and not already in mutex mode, switch
276 if (prev_count == 0 && previous_mode_ != mode::mutex) {
278 }
279}
280
282 if (!active_ || !queue_) {
283 return;
284 }
285
286 // Decrement guard count
287 int prev_count = queue_->accuracy_guard_count_.fetch_sub(1, std::memory_order_acq_rel);
288
289 // If this was the last guard and we should restore previous mode
290 if (prev_count == 1) {
291 // Restore based on policy
292 switch (queue_->policy_) {
294 // Stay in mutex mode
295 break;
297 queue_->migrate_to_mode(mode::lock_free);
298 break;
299 case policy::balanced:
300 // Let balanced policy decide
301 {
302 mode target = queue_->determine_mode_for_balanced();
303 if (target != mode::mutex) {
304 queue_->migrate_to_mode(target);
305 }
306 }
307 break;
308 case policy::manual:
309 // Restore previous mode
310 if (previous_mode_ != mode::mutex) {
311 queue_->migrate_to_mode(previous_mode_);
312 }
313 break;
314 }
315 }
316}
317
319 : queue_(other.queue_)
320 , previous_mode_(other.previous_mode_)
321 , active_(other.active_) {
322 other.active_ = false;
323 other.queue_ = nullptr;
324}
325
326// ============================================
327// Private methods
328// ============================================
329
331 std::lock_guard<std::mutex> lock(migration_mutex_);
332
333 mode current = current_mode_.load(std::memory_order_acquire);
334 if (current == target) {
335 return; // Already in target mode
336 }
337
338 // Update time tracking before mode change
340
341 // IMPORTANT: Update mode FIRST, then drain the old queue.
342 // This ensures new enqueues go to the target queue while we drain,
343 // preventing an infinite drain loop where the producer keeps adding
344 // to the source queue faster than we can drain it.
345 current_mode_.store(target, std::memory_order_release);
346
347 if (target == mode::mutex) {
348 // Migrate from lock-free to mutex
349 // Drain remaining jobs from lockfree queue and move to mutex queue
350 while (true) {
351 auto result = lockfree_queue_->dequeue();
352 if (result.is_err()) {
353 break;
354 }
355 (void)mutex_queue_->enqueue(std::move(result.value()));
356 }
357 } else {
358 // Migrate from mutex to lock-free
359 // Drain mutex queue and move jobs to lockfree queue
360 auto jobs = mutex_queue_->dequeue_batch();
361 for (auto& job : jobs) {
362 (void)lockfree_queue_->enqueue(std::move(job));
363 }
364 }
365
366 // Update statistics
367 {
368 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
370 }
371
372 // Reset mode start time
373 mode_start_time_ = std::chrono::steady_clock::now();
374}
375
377 std::lock_guard<std::mutex> lock(stats_mutex_);
378
379 auto now = std::chrono::steady_clock::now();
380 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
381 now - mode_start_time_).count();
382
383 if (current_mode_.load(std::memory_order_acquire) == mode::mutex) {
384 stats_.time_in_mutex_ms += duration;
385 } else {
386 stats_.time_in_lockfree_ms += duration;
387 }
388
389 mode_start_time_ = now;
390}
391
393 // For balanced policy, use heuristics based on recent activity
394 // Current simple heuristic: prefer lock-free for high throughput
395
396 std::lock_guard<std::mutex> lock(stats_mutex_);
397
398 // If we've processed a lot of jobs, prefer lock-free for performance
399 uint64_t total_ops = stats_.enqueue_count + stats_.dequeue_count;
400 if (total_ops > 10000) {
401 return mode::lock_free;
402 }
403
404 // For lower throughput, prefer mutex for accuracy
405 return mode::mutex;
406}
407
408} // namespace kcenon::thread
Adaptive queue that auto-switches between mutex and lock-free modes.
RAII guard that temporarily switches to accuracy mode.
~accuracy_guard()
Destructor - restores previous mode.
accuracy_guard(adaptive_job_queue &queue)
Construct guard and switch queue to mutex mode.
Adaptive queue that switches between mutex and lock-free modes.
auto stop() -> void
Signals the queue to stop.
auto require_accuracy() -> accuracy_guard
Request temporary accuracy mode.
auto current_policy() const -> policy
Get current policy.
auto empty() const -> bool
Checks if the queue is empty.
auto get_next_job() -> common::Result< std::unique_ptr< job > > override
Get next job (delegates to current queue)
auto switch_mode(mode m) -> common::VoidResult
Manually switch mode (only if policy is manual)
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeues a job from the current active queue.
std::shared_ptr< job_queue > mutex_queue_
auto enqueue(std::unique_ptr< job > &&j) -> common::VoidResult
Enqueues a job into the current active queue.
adaptive_job_queue(policy p=policy::balanced)
Create adaptive queue with specified policy.
std::chrono::steady_clock::time_point mode_start_time_
~adaptive_job_queue()
Destructor - cleans up both queue implementations.
auto schedule(std::unique_ptr< job > &&work) -> common::VoidResult override
Schedule a job (delegates to current queue)
std::unique_ptr< detail::lockfree_job_queue > lockfree_queue_
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Tries to dequeue a job without blocking.
auto clear() -> void
Clears all jobs from the queue.
auto get_stats() const -> stats
Get statistics about queue usage.
@ mutex
Using job_queue (accuracy mode)
@ lock_free
Using lockfree_job_queue (performance mode)
auto get_capabilities() const -> queue_capabilities override
Returns capabilities based on current mode.
auto size() const -> std::size_t
Returns the current number of jobs in the queue.
auto is_stopped() const -> bool
Checks if the queue is stopped.
auto current_mode() const -> mode
Get current operating mode.
@ performance_first
Always use lock-free mode.
@ balanced
Auto-switch based on usage.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
T & value() &
Gets the value.
bool is_ok() const noexcept
Checks if the result is successful.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
uint64_t time_in_lockfree_ms
Cumulative time in lock-free mode (ms)
uint64_t time_in_mutex_ms
Cumulative time in mutex mode (ms)
uint64_t mode_switches
Total number of mode switches.
uint64_t dequeue_count
Total dequeue operations.
uint64_t enqueue_count
Total enqueue operations.
Runtime-queryable queue capabilities descriptor.