38 , config_(
std::move(config))
40 , current_pressure_ratio_(0.0)
45 rate_limiter_ = std::make_unique<token_bucket>(
46 config_.rate_limit_tokens_per_second,
47 config_.rate_limit_burst_size);
82 return apply_backpressure(std::move(value));
111 for (
const auto& job_ptr : jobs)
113 if (job_ptr ==
nullptr)
116 "cannot enqueue null job in batch");
121 if (config_.enable_rate_limiting && rate_limiter_)
123 if (!rate_limiter_->try_acquire(jobs.size()))
125 stats_.rate_limit_waits.fetch_add(1, std::memory_order_relaxed);
128 auto start = std::chrono::steady_clock::now();
129 if (!rate_limiter_->try_acquire_for(jobs.size(), config_.block_timeout))
131 stats_.jobs_rejected.fetch_add(jobs.size(), std::memory_order_relaxed);
133 "rate limit timeout for batch");
135 auto elapsed = std::chrono::steady_clock::now() - start;
136 stats_.total_block_time_ns.fetch_add(
137 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
138 std::memory_order_relaxed);
143 auto max_sz = get_max_size();
144 if (!max_sz.has_value())
150 stats_.jobs_accepted.fetch_add(jobs.size(), std::memory_order_relaxed);
152 update_pressure_state();
156 std::size_t current_size = size();
157 std::size_t available_space = max_sz.value() > current_size
158 ? max_sz.value() - current_size
161 if (jobs.size() <= available_space)
167 stats_.jobs_accepted.fetch_add(jobs.size(), std::memory_order_relaxed);
169 update_pressure_state();
174 switch (config_.policy)
179 std::size_t to_drop = jobs.size() - available_space;
181 std::scoped_lock<std::mutex> lock(mutex_);
182 for (std::size_t i = 0; i < to_drop && !empty(); ++i)
184 auto batch = dequeue_batch_limited(1);
185 stats_.jobs_dropped.fetch_add(batch.size(), std::memory_order_relaxed);
191 stats_.jobs_accepted.fetch_add(jobs.size(), std::memory_order_relaxed);
193 update_pressure_state();
200 auto start = std::chrono::steady_clock::now();
201 std::unique_lock<std::mutex> lock(mutex_);
202 bool got_space = space_available_.wait_for(
204 config_.block_timeout,
205 [
this, needed = jobs.size(), max = max_sz.value()]() {
206 return size() + needed <= max || is_stopped();
209 if (!got_space || is_stopped())
211 stats_.jobs_rejected.fetch_add(jobs.size(), std::memory_order_relaxed);
212 auto elapsed = std::chrono::steady_clock::now() - start;
213 stats_.total_block_time_ns.fetch_add(
214 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
215 std::memory_order_relaxed);
217 "timeout waiting for space for batch");
224 stats_.jobs_accepted.fetch_add(jobs.size(), std::memory_order_relaxed);
226 update_pressure_state();
232 stats_.jobs_rejected.fetch_add(jobs.size(), std::memory_order_relaxed);
233 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
234 update_pressure_state();
236 "queue cannot fit entire batch");
253 -> common::VoidResult
256 if (!apply_rate_limiting())
258 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
260 "rate limit exceeded");
264 bool queue_is_full = is_full();
269 auto result = direct_enqueue(std::move(value));
272 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
273 update_pressure_state();
278 queue_is_full =
true;
282 switch (config_.policy)
285 return handle_block_policy(std::move(value));
288 return handle_drop_oldest_policy(std::move(value));
291 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
292 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
293 update_pressure_state();
295 "queue full: drop_newest policy rejected job");
298 return handle_callback_policy(std::move(value));
301 return handle_adaptive_policy(std::move(value));
304 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
306 "queue full: unknown policy");
317 if (!config_.enable_rate_limiting || !rate_limiter_)
323 if (rate_limiter_->try_acquire())
329 stats_.rate_limit_waits.fetch_add(1, std::memory_order_relaxed);
331 auto start = std::chrono::steady_clock::now();
332 bool acquired = rate_limiter_->try_acquire_for(1, config_.block_timeout);
333 auto elapsed = std::chrono::steady_clock::now() - start;
335 stats_.total_block_time_ns.fetch_add(
336 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
337 std::memory_order_relaxed);
347 auto max_sz = get_max_size();
348 if (!max_sz.has_value() || max_sz.value() == 0)
351 current_pressure_ratio_.store(0.0, std::memory_order_relaxed);
355 std::size_t current = size();
356 double ratio =
static_cast<double>(current) /
static_cast<double>(max_sz.value());
357 current_pressure_ratio_.store(ratio, std::memory_order_relaxed);
360 if (current >= max_sz.value())
364 else if (ratio >= config_.high_watermark)
368 else if (ratio >= config_.low_watermark)
378 new_level, std::memory_order_acq_rel);
381 if (config_.pressure_callback)
383 if (new_level != old_level ||
389 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
391 config_.pressure_callback(current, ratio);
396 if (new_level < old_level)
398 space_available_.notify_all();
406 -> common::VoidResult
408 auto start = std::chrono::steady_clock::now();
409 auto deadline = start + config_.block_timeout;
411 while (std::chrono::steady_clock::now() < deadline)
414 auto result = direct_enqueue(std::move(value));
417 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
418 update_pressure_state();
423 std::unique_lock<std::mutex> lock(mutex_);
424 auto remaining = deadline - std::chrono::steady_clock::now();
425 if (remaining <= std::chrono::milliseconds{0})
430 space_available_.wait_for(lock, remaining, [
this]() {
431 return !is_full() || is_stopped();
444 auto elapsed = std::chrono::steady_clock::now() - start;
445 stats_.total_block_time_ns.fetch_add(
446 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
447 std::memory_order_relaxed);
448 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
451 "timeout waiting for queue space");
458 -> common::VoidResult
461 auto dropped = dequeue_batch_limited(1);
462 stats_.jobs_dropped.fetch_add(dropped.size(), std::memory_order_relaxed);
465 auto result = direct_enqueue(std::move(value));
468 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
470 update_pressure_state();
478 -> common::VoidResult
480 if (!config_.decision_callback)
483 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
485 "queue full and no decision callback");
497 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
499 "callback rejected job");
502 return handle_drop_oldest_policy(std::move(value));
506 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
508 "callback requested delay, retry later");
511 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
513 "unknown callback decision");
526 -> common::VoidResult
528 double ratio = get_pressure_ratio();
531 if (ratio < config_.high_watermark)
533 auto result = direct_enqueue(std::move(value));
536 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
538 update_pressure_state();
547 double accept_prob = (1.0 - ratio) / (1.0 - config_.high_watermark);
551 static thread_local std::size_t counter = 0;
552 bool should_accept = (counter++ % 100) < (accept_prob * 100);
556 auto result = direct_enqueue(std::move(value));
559 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
561 update_pressure_state();
568 std::unique_lock<std::mutex> lock(mutex_);
569 auto brief_wait = std::chrono::milliseconds{10};
570 bool got_space = space_available_.wait_for(
573 [
this]() {
return !is_full() || is_stopped(); });
575 if (got_space && !is_stopped())
578 auto result = direct_enqueue(std::move(value));
581 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
583 update_pressure_state();
588 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
589 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
590 update_pressure_state();
592 "adaptive policy rejected job due to high pressure");
599 -> common::VoidResult
619 if (!max_sz.has_value() || max_sz.value() == 0)
623 return static_cast<double>(
size()) /
static_cast<double>(max_sz.value());
631 std::scoped_lock<std::mutex> lock(config_mutex_);
633 bool rate_limiting_changed =
634 config.enable_rate_limiting != config_.enable_rate_limiting ||
635 config.rate_limit_tokens_per_second != config_.rate_limit_tokens_per_second ||
636 config.rate_limit_burst_size != config_.rate_limit_burst_size;
638 config_ = std::move(config);
641 if (rate_limiting_changed)
643 if (config_.enable_rate_limiting)
645 rate_limiter_ = std::make_unique<token_bucket>(
646 config_.rate_limit_tokens_per_second,
647 config_.rate_limit_burst_size);
651 rate_limiter_.reset();
656 update_pressure_state();
688 return std::numeric_limits<std::size_t>::max();
715 "backpressure_job_queue[size={}, pressure={}, policy={}, ratio={:.1f}%]",
Job queue with backpressure mechanisms for overflow handling and rate limiting.
backpressure_stats stats_
Backpressure statistics.
auto get_backpressure_stats() const -> backpressure_stats_snapshot
Returns backpressure statistics snapshot.
backpressure_job_queue(std::size_t max_size, backpressure_config config=backpressure_config{})
Constructs a backpressure-aware job queue.
auto handle_drop_oldest_policy(std::unique_ptr< job > &&value) -> common::VoidResult
Handles drop_oldest policy.
auto set_backpressure_config(backpressure_config config) -> void
Sets the backpressure configuration.
auto to_string() const -> std::string override
Returns string representation including backpressure state.
auto get_backpressure_config() const -> const backpressure_config &
Returns the current backpressure configuration.
auto handle_adaptive_policy(std::unique_ptr< job > &&value) -> common::VoidResult
Handles adaptive policy.
std::unique_ptr< token_bucket > rate_limiter_
Token bucket rate limiter (nullptr if disabled).
auto is_rate_limited() const -> bool
Checks if rate limiting is causing delays.
auto get_available_tokens() const -> std::size_t
Returns available rate limit tokens.
auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult override
Enqueues a batch of jobs with backpressure handling.
auto apply_backpressure(std::unique_ptr< job > &&value) -> common::VoidResult
Applies backpressure logic for a single job.
backpressure_config config_
Backpressure configuration.
auto update_pressure_state() -> void
Updates pressure level and triggers callbacks if changed.
auto direct_enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Directly enqueues without backpressure (internal use).
~backpressure_job_queue() override
Virtual destructor.
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult override
Enqueues a job with backpressure handling.
auto apply_rate_limiting() -> bool
Applies rate limiting check.
auto handle_block_policy(std::unique_ptr< job > &&value) -> common::VoidResult
Handles blocking policy with timeout.
auto reset_stats() -> void
Resets backpressure statistics.
std::atomic< pressure_level > current_pressure_
Current pressure level (atomic for lock-free reads).
auto handle_callback_policy(std::unique_ptr< job > &&value) -> common::VoidResult
Handles callback policy.
auto get_pressure_ratio() const -> double
Returns the current pressure as a ratio.
auto get_pressure_level() const -> pressure_level
Returns the current pressure level.
A thread-safe job queue for managing and dispatching work items.
auto get_max_size() const -> std::optional< std::size_t >
Get the maximum queue size.
auto size(void) const -> std::size_t
Returns the current number of jobs in the queue.
virtual auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueues a new job into the queue.
virtual auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the queue.
A template class representing either a value or an error.
bool is_ok() const noexcept
Checks if the result is successful.
backpressure_decision
Decision returned by callback policy handler.
pressure_level
Current pressure level for graduated response.
@ drop_and_accept
Drop the oldest job, then accept the new one.
@ accept
Accept the job into the queue.
@ reject
Reject with an error (queue_full).
@ delay
Delay processing (attempt again later).
@ none
Below low_watermark, queue is healthy.
@ low
Between low and high watermark.
@ critical
At or above max_size, queue is full.
@ high
Above high_watermark, approaching capacity.
@ drop_newest
Reject the new job when full.
@ block
Block until space is available (with a timeout).
@ adaptive
Automatically adjust based on load conditions.
@ callback
Call user callback for custom decision.
@ drop_oldest
Drop the oldest job when full to make room.
Core threading foundation of the thread system library.
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
auto pressure_level_to_string(pressure_level level) -> std::string
Converts pressure_level to human-readable string.
auto backpressure_policy_to_string(backpressure_policy policy) -> std::string
Converts backpressure_policy to human-readable string.
Configuration for backpressure mechanisms.
backpressure_policy policy
The backpressure policy to use.
std::size_t rate_limit_burst_size
Maximum tokens that can accumulate (burst capacity).
bool enable_rate_limiting
Enable token bucket rate limiting.
Snapshot of backpressure statistics (copyable).
auto snapshot() const -> backpressure_stats_snapshot
Creates a copyable snapshot of current statistics.