Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
backpressure_job_queue.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7
8#include <algorithm>
9#include <chrono>
10
20namespace kcenon::thread
21{
// Constructor: initializes the bounded base queue, stores the configuration,
// and starts at pressure_level::none with a 0.0 pressure ratio.
// NOTE(review): this is a doxygen listing; source lines 34, 36 and 43 (the
// constructor signature opener, the `config` parameter line, and the `if`
// guard around the rate-limiter setup) were dropped by extraction.
35 std::size_t max_size,
37 : job_queue(max_size)
38 , config_(std::move(config))
39 , current_pressure_(pressure_level::none)
40 , current_pressure_ratio_(0.0)
41 {
42 // Initialize rate limiter if enabled
// (missing line 43 presumably reads `if (config_.enable_rate_limiting)` —
// consistent with the same guard used in set_backpressure_config; confirm.)
44 {
45 rate_limiter_ = std::make_unique<token_bucket>(
46 config_.rate_limit_tokens_per_second,
47 config_.rate_limit_burst_size);
48 }
49 }
50
55
// Enqueues a single job with backpressure handling: rejects on a stopped
// queue or a null job, otherwise delegates to apply_backpressure().
68 auto backpressure_job_queue::enqueue(std::unique_ptr<job>&& value) -> common::VoidResult
69 {
70 // Early validation
71 if (is_stopped())
72 {
// NOTE(review): source line 73 (the error return for the stopped-queue case)
// was dropped by the doxygen extraction.
74 }
75
76 if (value == nullptr)
77 {
78 return make_error_result(error_code::invalid_argument, "cannot enqueue null job");
79 }
80
81 // Apply backpressure logic
82 return apply_backpressure(std::move(value));
83 }
84
84
// Enqueues a batch of jobs with backpressure handling: validates the batch,
// applies batch-sized rate limiting, then enqueues whole-batch if it fits,
// otherwise resolves by policy (drop-oldest / block; everything else rejects).
// NOTE(review): several source lines (102, 115, 132, 176, 197, 216, 235 —
// error-return openers and the `case backpressure_policy::…:` labels) were
// dropped by the doxygen extraction.
97 auto backpressure_job_queue::enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs)
98 -> common::VoidResult
99 {
100 if (is_stopped())
101 {
103 }
104
105 if (jobs.empty())
106 {
107 return make_error_result(error_code::invalid_argument, "cannot enqueue empty batch");
108 }
109
110 // Validate all jobs
111 for (const auto& job_ptr : jobs)
112 {
113 if (job_ptr == nullptr)
114 {
116 "cannot enqueue null job in batch");
117 }
118 }
119
120 // Apply rate limiting for batch
121 if (config_.enable_rate_limiting && rate_limiter_)
122 {
123 if (!rate_limiter_->try_acquire(jobs.size()))
124 {
125 stats_.rate_limit_waits.fetch_add(1, std::memory_order_relaxed);
126
127 // Try waiting for tokens
128 auto start = std::chrono::steady_clock::now();
129 if (!rate_limiter_->try_acquire_for(jobs.size(), config_.block_timeout))
130 {
131 stats_.jobs_rejected.fetch_add(jobs.size(), std::memory_order_relaxed);
133 "rate limit timeout for batch");
134 }
135 auto elapsed = std::chrono::steady_clock::now() - start;
136 stats_.total_block_time_ns.fetch_add(
137 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
138 std::memory_order_relaxed);
139 }
140 }
141
142 // Check capacity
143 auto max_sz = get_max_size();
144 if (!max_sz.has_value())
145 {
146 // Unbounded queue, just enqueue
147 auto result = job_queue::enqueue_batch(std::move(jobs));
148 if (result.is_ok())
149 {
// BUG(review): `jobs` was moved on line 147; reading jobs.size() here uses a
// moved-from vector (typically empty), so jobs_accepted is under-counted.
// Capture the batch size into a local BEFORE the move. Same defect recurs
// at lines 167, 191 and 224 below.
150 stats_.jobs_accepted.fetch_add(jobs.size(), std::memory_order_relaxed);
151 }
152 update_pressure_state();
153 return result;
154 }
155
156 std::size_t current_size = size();
157 std::size_t available_space = max_sz.value() > current_size
158 ? max_sz.value() - current_size
159 : 0;
160
161 if (jobs.size() <= available_space)
162 {
163 // Batch fits
164 auto result = job_queue::enqueue_batch(std::move(jobs));
165 if (result.is_ok())
166 {
// BUG(review): moved-from `jobs` — see note at line 150.
167 stats_.jobs_accepted.fetch_add(jobs.size(), std::memory_order_relaxed);
168 }
169 update_pressure_state();
170 return result;
171 }
172
173 // Batch doesn't fit - handle based on policy
174 switch (config_.policy)
175 {
// (missing line 176: the drop_oldest case label.)
177 {
178 // Drop enough jobs to make room
179 std::size_t to_drop = jobs.size() - available_space;
180 {
// NOTE(review): the single-job drop_oldest path documents that
// dequeue_batch_limited "handles its own locking"; taking mutex_ here before
// calling it may self-deadlock if it locks the same mutex — verify.
181 std::scoped_lock<std::mutex> lock(mutex_);
182 for (std::size_t i = 0; i < to_drop && !empty(); ++i)
183 {
184 auto batch = dequeue_batch_limited(1);
185 stats_.jobs_dropped.fetch_add(batch.size(), std::memory_order_relaxed);
186 }
187 }
188 auto result = job_queue::enqueue_batch(std::move(jobs));
189 if (result.is_ok())
190 {
// BUG(review): moved-from `jobs` — see note at line 150.
191 stats_.jobs_accepted.fetch_add(jobs.size(), std::memory_order_relaxed);
192 }
193 update_pressure_state();
194 return result;
195 }
196
// (missing line 197: the block case label.)
198 {
199 // Wait for space (simplified: just check once with timeout)
200 auto start = std::chrono::steady_clock::now();
201 std::unique_lock<std::mutex> lock(mutex_);
202 bool got_space = space_available_.wait_for(
203 lock,
204 config_.block_timeout,
205 [this, needed = jobs.size(), max = max_sz.value()]() {
206 return size() + needed <= max || is_stopped();
207 });
208
209 if (!got_space || is_stopped())
210 {
211 stats_.jobs_rejected.fetch_add(jobs.size(), std::memory_order_relaxed);
212 auto elapsed = std::chrono::steady_clock::now() - start;
213 stats_.total_block_time_ns.fetch_add(
214 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
215 std::memory_order_relaxed);
217 "timeout waiting for space for batch");
218 }
219
// NOTE(review): space can be consumed by another producer between this
// unlock and the enqueue below (check-then-act race) — verify acceptable.
220 lock.unlock();
221 auto result = job_queue::enqueue_batch(std::move(jobs));
222 if (result.is_ok())
223 {
// BUG(review): moved-from `jobs` — see note at line 150.
224 stats_.jobs_accepted.fetch_add(jobs.size(), std::memory_order_relaxed);
225 }
226 update_pressure_state();
227 return result;
228 }
229
230 default:
231 // drop_newest, callback, adaptive - reject batch
232 stats_.jobs_rejected.fetch_add(jobs.size(), std::memory_order_relaxed);
233 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
234 update_pressure_state();
236 "queue cannot fit entire batch");
237 }
238 }
239
239
// Core single-job backpressure pipeline: rate-limit first, try a direct
// enqueue while the queue has room, and only when full dispatch to the
// configured policy handler.
// NOTE(review): source lines 259, 284, 287, 290, 294, 297, 300 and 305 (the
// error-return openers and the `case backpressure_policy::…:` labels of this
// switch) were dropped by the doxygen extraction.
252 auto backpressure_job_queue::apply_backpressure(std::unique_ptr<job>&& value)
253 -> common::VoidResult
254 {
255 // Apply rate limiting
256 if (!apply_rate_limiting())
257 {
258 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed)
260 "rate limit exceeded");
261 }
262
263 // Check if queue is full before deciding on policy
264 bool queue_is_full = is_full();
265
266 // If not full, try direct enqueue
267 if (!queue_is_full)
268 {
269 auto result = direct_enqueue(std::move(value));
270 if (result.is_ok())
271 {
272 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
273 update_pressure_state();
274 return result;
275 }
276 // If direct_enqueue failed but queue wasn't reported as full,
277 // it might be a race condition - treat as full
// NOTE(review): this fall-through assumes `value` is still valid after a
// failed direct_enqueue (i.e. job_queue::enqueue does not consume on
// failure) — confirm against the base-class contract.
278 queue_is_full = true;
279 }
280
281 // Queue is full, apply policy
282 switch (config_.policy)
283 {
285 return handle_block_policy(std::move(value));
286
288 return handle_drop_oldest_policy(std::move(value));
289
291 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
292 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
293 update_pressure_state();
295 "queue full: drop_newest policy rejected job");
296
298 return handle_callback_policy(std::move(value));
299
301 return handle_adaptive_policy(std::move(value));
302
303 default:
304 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
306 "queue full: unknown policy");
307 }
308 }
309
309
// apply_rate_limiting(): returns true when the job may proceed — either
// rate limiting is disabled, a token is immediately available, or a token
// was acquired within config_.block_timeout. Records wait counts and total
// blocked time in stats_.
// NOTE(review): source line 315 (the function signature,
// `auto backpressure_job_queue::apply_rate_limiting() -> bool` per the
// member docs) was dropped by the doxygen extraction.
316 {
317 if (!config_.enable_rate_limiting || !rate_limiter_)
318 {
319 return true; // No rate limiting
320 }
321
322 // Try immediate acquire
323 if (rate_limiter_->try_acquire())
324 {
325 return true;
326 }
327
328 // Rate limited - record and try waiting
329 stats_.rate_limit_waits.fetch_add(1, std::memory_order_relaxed);
330
331 auto start = std::chrono::steady_clock::now();
332 bool acquired = rate_limiter_->try_acquire_for(1, config_.block_timeout);
333 auto elapsed = std::chrono::steady_clock::now() - start;
334
335 stats_.total_block_time_ns.fetch_add(
336 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
337 std::memory_order_relaxed);
338
339 return acquired;
340 }
341
341
// update_pressure_state(): recomputes the fill ratio, maps it onto the
// graduated pressure_level (none/low/high/critical via the configured
// watermarks), fires the pressure callback on level changes or while
// high/critical, and wakes blocked producers when pressure drops.
// NOTE(review): source line 345 (the function signature) was dropped by the
// doxygen extraction.
346 {
347 auto max_sz = get_max_size();
348 if (!max_sz.has_value() || max_sz.value() == 0)
349 {
350 current_pressure_.store(pressure_level::none, std::memory_order_relaxed);
351 current_pressure_ratio_.store(0.0, std::memory_order_relaxed);
352 return;
353 }
354
355 std::size_t current = size();
356 double ratio = static_cast<double>(current) / static_cast<double>(max_sz.value());
357 current_pressure_ratio_.store(ratio, std::memory_order_relaxed);
358
359 pressure_level new_level;
360 if (current >= max_sz.value())
361 {
362 new_level = pressure_level::critical;
363 }
364 else if (ratio >= config_.high_watermark)
365 {
366 new_level = pressure_level::high;
367 }
368 else if (ratio >= config_.low_watermark)
369 {
370 new_level = pressure_level::low;
371 }
372 else
373 {
374 new_level = pressure_level::none;
375 }
376
377 pressure_level old_level = current_pressure_.exchange(
378 new_level, std::memory_order_acq_rel);
379
380 // Trigger callback on level change or when entering high/critical
// NOTE(review): while the level stays high/critical this fires the callback
// on EVERY update, not only on transitions — confirm that is intended.
381 if (config_.pressure_callback)
382 {
383 if (new_level != old_level ||
384 new_level == pressure_level::high ||
385 new_level == pressure_level::critical)
386 {
387 if (new_level >= pressure_level::high)
388 {
389 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
390 }
391 config_.pressure_callback(current, ratio);
392 }
393 }
394
395 // Notify waiting threads if space became available
396 if (new_level < old_level)
397 {
398 space_available_.notify_all();
399 }
400 }
401
401
// Blocking policy: try to enqueue, and if the queue is full wait (up to
// config_.block_timeout) for space. As the in-code comment admits, the job
// is moved into the first enqueue attempt, so the retry loop cannot actually
// retry and effectively runs at most once before rejecting.
// NOTE(review): source line 450 (the error-return opener for the timeout
// result) was dropped by the doxygen extraction.
405 auto backpressure_job_queue::handle_block_policy(std::unique_ptr<job>&& value)
406 -> common::VoidResult
407 {
408 auto start = std::chrono::steady_clock::now();
409 auto deadline = start + config_.block_timeout;
410
411 while (std::chrono::steady_clock::now() < deadline)
412 {
413 // Try to enqueue
414 auto result = direct_enqueue(std::move(value));
415 if (result.is_ok())
416 {
417 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
418 update_pressure_state();
419 return result;
420 }
421
422 // Wait for space
423 std::unique_lock<std::mutex> lock(mutex_);
424 auto remaining = deadline - std::chrono::steady_clock::now();
425 if (remaining <= std::chrono::milliseconds{0})
426 {
427 break;
428 }
429
430 space_available_.wait_for(lock, remaining, [this]() {
431 return !is_full() || is_stopped();
432 });
433
434 if (is_stopped())
435 {
436 break;
437 }
438
439 // Job was moved, need to return error
440 // This is a design issue - we can't retry after move
441 break;
442 }
443
444 auto elapsed = std::chrono::steady_clock::now() - start;
445 stats_.total_block_time_ns.fetch_add(
446 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
447 std::memory_order_relaxed);
448 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
449
451 "timeout waiting for queue space");
452 }
453
453
457 auto backpressure_job_queue::handle_drop_oldest_policy(std::unique_ptr<job>&& value)
458 -> common::VoidResult
459 {
460 // Drop oldest job - dequeue_batch_limited handles its own locking
461 auto dropped = dequeue_batch_limited(1);
462 stats_.jobs_dropped.fetch_add(dropped.size(), std::memory_order_relaxed);
463
464 // Now enqueue new job
465 auto result = direct_enqueue(std::move(value));
466 if (result.is_ok())
467 {
468 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
469 }
470 update_pressure_state();
471 return result;
472 }
473
// Callback policy: delegates the full-queue decision to the user-supplied
// decision_callback. accept force-enqueues (may exceed max_size),
// drop_and_accept reuses the drop-oldest handler, reject/delay/unknown
// all reject with distinct messages.
// NOTE(review): source lines 484, 492, 496, 498, 501, 504, 507 and 512 (the
// error-return openers and the `case backpressure_decision::…:` labels of
// this switch) were dropped by the doxygen extraction.
477 auto backpressure_job_queue::handle_callback_policy(std::unique_ptr<job>&& value)
478 -> common::VoidResult
479 {
480 if (!config_.decision_callback)
481 {
482 // No callback, fall back to reject
483 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
485 "queue full and no decision callback");
486 }
487
488 backpressure_decision decision = config_.decision_callback(value);
489
490 switch (decision)
491 {
493 // Force accept (may exceed max_size temporarily)
// NOTE(review): this accept path does not bump stats_.jobs_accepted or call
// update_pressure_state(), unlike every other accept path — verify intended.
494 return job_queue::enqueue(std::move(value));
495
497 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
499 "callback rejected job");
500
502 return handle_drop_oldest_policy(std::move(value));
503
505 // For now, treat as reject with retry hint
506 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
508 "callback requested delay, retry later");
509
510 default:
511 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
513 "unknown callback decision");
514 }
515 }
516
516
// Adaptive policy: below high_watermark accept outright; between high
// watermark and full, accept probabilistically (linear fall-off implemented
// with a deterministic per-thread counter, not a RNG); otherwise wait
// briefly (10ms) for space and finally reject.
// NOTE(review): source line 591 (the error-return opener for the final
// rejection) was dropped by the doxygen extraction.
525 auto backpressure_job_queue::handle_adaptive_policy(std::unique_ptr<job>&& value)
526 -> common::VoidResult
527 {
528 double ratio = get_pressure_ratio();
529
530 // Below high watermark: accept
531 if (ratio < config_.high_watermark)
532 {
533 auto result = direct_enqueue(std::move(value));
534 if (result.is_ok())
535 {
536 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
537 }
538 update_pressure_state();
539 return result;
540 }
541
542 // Above high watermark but not full: probabilistic acceptance
543 // Acceptance probability decreases as we approach capacity
544 if (ratio < 1.0)
545 {
546 // Simple linear decrease from 1.0 at high_watermark to 0 at 1.0
547 double accept_prob = (1.0 - ratio) / (1.0 - config_.high_watermark);
548
549 // Use simple threshold (not truly random, but deterministic)
550 // This avoids random number generation overhead
551 static thread_local std::size_t counter = 0;
552 bool should_accept = (counter++ % 100) < (accept_prob * 100);
553
554 if (should_accept)
555 {
556 auto result = direct_enqueue(std::move(value));
557 if (result.is_ok())
558 {
559 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
560 }
561 update_pressure_state();
562 return result;
563 }
564 }
565
566 // At or above capacity: brief wait then reject
567 {
568 std::unique_lock<std::mutex> lock(mutex_);
569 auto brief_wait = std::chrono::milliseconds{10};
570 bool got_space = space_available_.wait_for(
571 lock,
572 brief_wait,
573 [this]() { return !is_full() || is_stopped(); });
574
575 if (got_space && !is_stopped())
576 {
// NOTE(review): space may be taken by another producer between this unlock
// and the enqueue below (check-then-act race); direct_enqueue's own failure
// result covers it, but the job is then rejected without the stats below.
577 lock.unlock();
578 auto result = direct_enqueue(std::move(value));
579 if (result.is_ok())
580 {
581 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
582 }
583 update_pressure_state();
584 return result;
585 }
586 }
587
588 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
589 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
590 update_pressure_state();
592 "adaptive policy rejected job due to high pressure");
593 }
594
594
598 auto backpressure_job_queue::direct_enqueue(std::unique_ptr<job>&& value)
599 -> common::VoidResult
600 {
601 return job_queue::enqueue(std::move(value));
602 }
603
// get_pressure_level(): lock-free read of the cached pressure level.
// NOTE(review): source line 607 (the signature,
// `auto backpressure_job_queue::get_pressure_level() const -> pressure_level`
// per the member docs) was dropped by the doxygen extraction.
608 {
609 return current_pressure_.load(std::memory_order_acquire);
610 }
611
// get_pressure_ratio(): recomputes size()/max_size on demand rather than
// returning the cached current_pressure_ratio_; 0.0 for unbounded queues.
// NOTE(review): source line 615 (the signature) was dropped by extraction.
616 {
617 // Recalculate for accuracy
618 auto max_sz = get_max_size();
619 if (!max_sz.has_value() || max_sz.value() == 0)
620 {
621 return 0.0;
622 }
623 return static_cast<double>(size()) / static_cast<double>(max_sz.value());
624 }
625
625
630 {
631 std::scoped_lock<std::mutex> lock(config_mutex_);
632
633 bool rate_limiting_changed =
634 config.enable_rate_limiting != config_.enable_rate_limiting ||
635 config.rate_limit_tokens_per_second != config_.rate_limit_tokens_per_second ||
636 config.rate_limit_burst_size != config_.rate_limit_burst_size;
637
638 config_ = std::move(config);
639
640 // Update rate limiter if needed
641 if (rate_limiting_changed)
642 {
643 if (config_.enable_rate_limiting)
644 {
645 rate_limiter_ = std::make_unique<token_bucket>(
646 config_.rate_limit_tokens_per_second,
647 config_.rate_limit_burst_size);
648 }
649 else
650 {
651 rate_limiter_.reset();
652 }
653 }
654
655 // Update pressure state with new config
656 update_pressure_state();
657 }
658
666
// is_rate_limited(): heuristic — reports true when the token bucket is
// running low (fewer tokens than ~10% of burst size, per the comment),
// false when rate limiting is inactive.
// NOTE(review): source lines 670, 672 and 678 (the signature, the guard
// condition — presumably checking rate limiting is disabled or
// rate_limiter_ is null — and the right-hand side of the comparison) were
// dropped by the doxygen extraction.
671 {
673 {
674 return false;
675 }
676 // Rate limited if tokens are low (less than burst size * 10%)
677 return rate_limiter_->available_tokens() <
679 }
680
// get_available_tokens(): remaining token-bucket tokens, or SIZE_MAX when
// rate limiting is inactive (i.e. effectively unlimited).
// NOTE(review): source lines 684 and 686 (the signature and the guard
// condition) were dropped by the doxygen extraction.
685 {
687 {
688 return std::numeric_limits<std::size_t>::max();
689 }
690 return rate_limiter_->available_tokens();
691 }
692
692
700
// reset_stats(): zeroes all backpressure counters via backpressure_stats.
// NOTE(review): source line 704 (the signature) was dropped by the doxygen
// extraction.
705 {
706 stats_.reset();
707 }
708
// to_string(): human-readable summary of queue size, pressure level, policy
// and fill ratio.
// NOTE(review): source lines 712, 714 and 717-718 (the signature, the
// formatter call opener — per the member docs a `format(...)` helper — and
// the pressure-level / policy arguments, presumably via
// pressure_level_to_string / backpressure_policy_to_string) were dropped by
// the doxygen extraction.
713 {
715 "backpressure_job_queue[size={}, pressure={}, policy={}, ratio={:.1f}%]",
716 size(),
719 get_pressure_ratio() * 100.0);
720 }
721
721
722} // namespace kcenon::thread
Job queue with backpressure mechanisms for overflow and rate limiting.
backpressure_stats stats_
Backpressure statistics.
auto get_backpressure_stats() const -> backpressure_stats_snapshot
Returns backpressure statistics snapshot.
backpressure_job_queue(std::size_t max_size, backpressure_config config=backpressure_config{})
Constructs a backpressure-aware job queue.
auto handle_drop_oldest_policy(std::unique_ptr< job > &&value) -> common::VoidResult
Handles drop_oldest policy.
auto set_backpressure_config(backpressure_config config) -> void
Sets the backpressure configuration.
auto to_string() const -> std::string override
Returns string representation including backpressure state.
auto get_backpressure_config() const -> const backpressure_config &
Returns the current backpressure configuration.
auto handle_adaptive_policy(std::unique_ptr< job > &&value) -> common::VoidResult
Handles adaptive policy.
std::unique_ptr< token_bucket > rate_limiter_
Token bucket rate limiter (nullptr if disabled).
auto is_rate_limited() const -> bool
Checks if rate limiting is causing delays.
auto get_available_tokens() const -> std::size_t
Returns available rate limit tokens.
auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult override
Enqueues a batch of jobs with backpressure handling.
auto apply_backpressure(std::unique_ptr< job > &&value) -> common::VoidResult
Applies backpressure logic for a single job.
backpressure_config config_
Backpressure configuration.
auto update_pressure_state() -> void
Updates pressure level and triggers callbacks if changed.
auto direct_enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Directly enqueues without backpressure (internal use).
~backpressure_job_queue() override
Virtual destructor.
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult override
Enqueues a job with backpressure handling.
auto apply_rate_limiting() -> bool
Applies rate limiting check.
auto handle_block_policy(std::unique_ptr< job > &&value) -> common::VoidResult
Handles blocking policy with timeout.
auto reset_stats() -> void
Resets backpressure statistics.
std::atomic< pressure_level > current_pressure_
Current pressure level (atomic for lock-free reads).
auto handle_callback_policy(std::unique_ptr< job > &&value) -> common::VoidResult
Handles callback policy.
auto get_pressure_ratio() const -> double
Returns the current pressure as a ratio.
auto get_pressure_level() const -> pressure_level
Returns the current pressure level.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
auto get_max_size() const -> std::optional< std::size_t >
Get the maximum queue size.
auto size(void) const -> std::size_t
Returns the current number of jobs in the queue.
virtual auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueues a new job into the queue.
virtual auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the queue.
A template class representing either a value or an error.
bool is_ok() const noexcept
Checks if the result is successful.
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:129
backpressure_decision
Decision returned by callback policy handler.
pressure_level
Current pressure level for graduated response.
@ drop_and_accept
Drop the oldest job, then accept new one.
@ accept
Accept the job into the queue.
@ reject
Reject with error (queue_full)
@ delay
Delay processing (attempt later)
@ none
Below low_watermark, queue is healthy.
@ low
Between low and high watermark.
@ critical
At or above max_size, queue is full.
@ high
Above high_watermark, approaching capacity.
@ drop_newest
Reject the new job when full.
@ block
Block until space is available (with timeout)
@ adaptive
Automatically adjust based on load conditions.
@ callback
Call user callback for custom decision.
@ drop_oldest
Drop the oldest job when full to make room.
Generic formatter for enum types using user-provided converter functors.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
auto pressure_level_to_string(pressure_level level) -> std::string
Converts pressure_level to human-readable string.
auto backpressure_policy_to_string(backpressure_policy policy) -> std::string
Converts backpressure_policy to human-readable string.
STL namespace.
Configuration for backpressure mechanisms.
backpressure_policy policy
The backpressure policy to use.
std::size_t rate_limit_burst_size
Maximum tokens that can accumulate (burst capacity).
bool enable_rate_limiting
Enable token bucket rate limiting.
Snapshot of backpressure statistics (copyable).
auto snapshot() const -> backpressure_stats_snapshot
Creates a copyable snapshot of current statistics.