Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::backpressure_job_queue Class Reference

A job queue with comprehensive backpressure mechanisms. More...

#include <backpressure_job_queue.h>


Public Member Functions

 backpressure_job_queue (std::size_t max_size, backpressure_config config=backpressure_config{})
 Constructs a backpressure-aware job queue.
 
 ~backpressure_job_queue () override
 Virtual destructor.
 
 backpressure_job_queue (const backpressure_job_queue &)=delete
 
backpressure_job_queue & operator= (const backpressure_job_queue &)=delete
 
 backpressure_job_queue (backpressure_job_queue &&)=delete
 
backpressure_job_queue & operator= (backpressure_job_queue &&)=delete
 
auto enqueue (std::unique_ptr< job > &&value) -> common::VoidResult override
 Enqueues a job with backpressure handling.
 
auto enqueue_batch (std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult override
 Enqueues a batch of jobs with backpressure handling.
 
auto get_pressure_level () const -> pressure_level
 Returns the current pressure level.
 
auto get_pressure_ratio () const -> double
 Returns the current pressure as a ratio.
 
auto set_backpressure_config (backpressure_config config) -> void
 Sets the backpressure configuration.
 
auto get_backpressure_config () const -> const backpressure_config &
 Returns the current backpressure configuration.
 
auto is_rate_limited () const -> bool
 Checks if rate limiting is causing delays.
 
auto get_available_tokens () const -> std::size_t
 Returns available rate limit tokens.
 
auto get_backpressure_stats () const -> backpressure_stats_snapshot
 Returns backpressure statistics snapshot.
 
auto reset_stats () -> void
 Resets backpressure statistics.
 
auto to_string () const -> std::string override
 Returns string representation including backpressure state.
 
- Public Member Functions inherited from kcenon::thread::job_queue
 job_queue (std::optional< std::size_t > max_size=std::nullopt)
 Constructs a new, empty job_queue.
 
virtual ~job_queue (void)
 Virtual destructor. Cleans up resources used by the job_queue.
 
auto get_ptr (void) -> std::shared_ptr< job_queue >
 Obtains a std::shared_ptr that points to this queue instance.
 
auto is_stopped () const -> bool
 Checks if the queue is in a "stopped" state.
 
auto set_notify (bool notify) -> void
 Sets the 'notify' flag for this queue.
 
auto schedule (std::unique_ptr< job > &&value) -> common::VoidResult override
 scheduler_interface implementation: schedule a job
 
auto get_next_job () -> common::Result< std::unique_ptr< job > > override
 scheduler_interface implementation: get next job
 
template<typename JobType , typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
auto enqueue (std::unique_ptr< JobType > &&value) -> common::VoidResult
 Type-safe enqueue for job subclasses.
 
virtual auto dequeue (void) -> common::Result< std::unique_ptr< job > >
 Dequeues a job from the queue in FIFO order (blocking operation).
 
virtual auto try_dequeue (void) -> common::Result< std::unique_ptr< job > >
 Attempts to dequeue a job from the queue without blocking.
 
virtual auto dequeue_batch (void) -> std::deque< std::unique_ptr< job > >
 Dequeues all remaining jobs from the queue without processing them.
 
virtual auto dequeue_batch_limited (std::size_t max_count) -> std::deque< std::unique_ptr< job > >
 Dequeues up to N jobs in a single operation (micro-batching).
 
virtual auto clear (void) -> void
 Removes all jobs currently in the queue without processing them.
 
auto empty (void) const -> bool
 Checks if the queue is currently empty.
 
auto size (void) const -> std::size_t
 Returns the current number of jobs in the queue.
 
auto get_memory_stats () const -> memory_stats
 Get memory footprint statistics for debugging and monitoring.
 
auto stop (void) -> void
 Signals the queue to stop waiting for new jobs (e.g., during shutdown).
 
auto get_capabilities () const -> queue_capabilities override
 Returns capabilities of this job_queue implementation.
 
auto is_bounded () const -> bool
 Check if queue has a size limit.
 
auto get_max_size () const -> std::optional< std::size_t >
 Get the maximum queue size.
 
auto set_max_size (std::optional< std::size_t > max_size) -> void
 Set maximum queue size.
 
auto is_full () const -> bool
 Check if queue is at capacity.
 
virtual auto inspect_pending_jobs (std::size_t limit=100) const -> std::vector< diagnostics::job_info >
 Inspects pending jobs in the queue without removing them.
 
- Public Member Functions inherited from kcenon::thread::scheduler_interface
virtual ~scheduler_interface ()=default
 
- Public Member Functions inherited from kcenon::thread::queue_capabilities_interface
virtual ~queue_capabilities_interface ()=default
 
auto has_exact_size () const -> bool
 Check if size() returns exact values.
 
auto has_atomic_empty () const -> bool
 Check if empty() check is atomic.
 
auto is_lock_free () const -> bool
 Check if this is a lock-free implementation.
 
auto is_wait_free () const -> bool
 Check if this is a wait-free implementation.
 
auto supports_batch () const -> bool
 Check if batch operations are supported.
 
auto supports_blocking_wait () const -> bool
 Check if blocking wait is supported.
 
auto supports_stop () const -> bool
 Check if stop signaling is supported.
 

Private Member Functions

auto apply_backpressure (std::unique_ptr< job > &&value) -> common::VoidResult
 Applies backpressure logic for a single job.
 
auto apply_rate_limiting () -> bool
 Applies rate limiting check.
 
auto update_pressure_state () -> void
 Updates pressure level and triggers callbacks if changed.
 
auto handle_block_policy (std::unique_ptr< job > &&value) -> common::VoidResult
 Handles blocking policy with timeout.
 
auto handle_drop_oldest_policy (std::unique_ptr< job > &&value) -> common::VoidResult
 Handles drop_oldest policy.
 
auto handle_callback_policy (std::unique_ptr< job > &&value) -> common::VoidResult
 Handles callback policy.
 
auto handle_adaptive_policy (std::unique_ptr< job > &&value) -> common::VoidResult
 Handles adaptive policy.
 
auto direct_enqueue (std::unique_ptr< job > &&value) -> common::VoidResult
 Directly enqueues without backpressure (internal use).
 

Private Attributes

backpressure_config config_
 Backpressure configuration.
 
std::mutex config_mutex_
 Mutex for configuration access.
 
std::unique_ptr< token_bucket > rate_limiter_
 Token bucket rate limiter (nullptr if disabled).
 
std::atomic< pressure_level > current_pressure_ {pressure_level::none}
 Current pressure level (atomic for lock-free reads).
 
std::atomic< double > current_pressure_ratio_ {0.0}
 Current pressure ratio (atomic for lock-free reads).
 
backpressure_stats stats_
 Backpressure statistics.
 
std::condition_variable space_available_
 Condition variable for blocking policy wait.
 

Additional Inherited Members

- Protected Attributes inherited from kcenon::thread::job_queue
std::atomic_bool notify_
 If true, threads waiting for new jobs are notified when a new job is enqueued. If false, enqueuing does not automatically trigger a notification.
 
std::atomic_bool stop_
 Indicates whether the queue has been signaled to stop.
 
std::mutex mutex_
 Mutex to protect access to the underlying queue_ container and related state.
 
std::condition_variable condition_
 Condition variable used to signal worker threads.
 

Detailed Description

A job queue with comprehensive backpressure mechanisms.

Extends job_queue with backpressure handling to prevent resource exhaustion and provide graceful degradation under high load.

Features

  • Multiple Policies: Block, drop_oldest, drop_newest, callback, adaptive
  • Watermark-Based Pressure: Graduated response based on queue depth
  • Rate Limiting: Token bucket algorithm for sustained throughput control
  • Adaptive Control: Auto-adjusts based on latency targets
  • Statistics: Comprehensive metrics for monitoring

Pressure Response

Queue Depth vs Response:

  0%          50%          80%          100%
  |------------|------------|------------|
       OK        Warning        High      Critical
               (callback)      (slow)     (reject)

In the warning band the user callback may be invoked for a custom decision; in the high band enqueues slow down; at critical pressure jobs are rejected with a queue_full error.

Thread Safety

All methods are thread-safe. The queue inherits mutex-based synchronization from job_queue.

Usage Example

// Create backpressure queue with 1000 max size
backpressure_config config;
config.high_watermark = 0.8;
config.enable_rate_limiting = true;
auto queue = std::make_shared<backpressure_job_queue>(1000, config);

// Enqueue with backpressure handling
auto result = queue->enqueue(std::make_unique<my_job>());
if (result.is_err()) {
    // Handle backpressure (rejected, timeout, etc.)
}

// Check pressure level
if (queue->get_pressure_level() == pressure_level::high) {
    // Consider reducing load
}
See also
job_queue Base queue class
backpressure_config Configuration options
token_bucket Rate limiting implementation

Definition at line 87 of file backpressure_job_queue.h.

Constructor & Destructor Documentation

◆ backpressure_job_queue() [1/3]

kcenon::thread::backpressure_job_queue::backpressure_job_queue ( std::size_t max_size,
backpressure_config config = backpressure_config{} )
explicit

Constructs a backpressure-aware job queue.

Parameters
max_size  Maximum queue capacity.
config    Backpressure configuration.

The queue is immediately ready for use after construction. If rate limiting is enabled, the token bucket is initialized with the specified parameters.

Implementation details:

  • Calls base job_queue constructor with max_size
  • Stores configuration
  • Creates token bucket if rate limiting is enabled
  • Initializes pressure state to none

Definition at line 34 of file backpressure_job_queue.cpp.

    : job_queue(max_size)
    , config_(std::move(config))
{
    // Initialize rate limiter if enabled
    if (config_.enable_rate_limiting)
    {
        rate_limiter_ = std::make_unique<token_bucket>(
            config_.rate_limit_tokens_per_second,
            config_.rate_limit_burst_size);
    }
}

References config_, and kcenon::thread::backpressure_config::enable_rate_limiting.

◆ ~backpressure_job_queue()

kcenon::thread::backpressure_job_queue::~backpressure_job_queue ( )
override default

Virtual destructor.


◆ backpressure_job_queue() [2/3]

kcenon::thread::backpressure_job_queue::backpressure_job_queue ( const backpressure_job_queue & )
delete

◆ backpressure_job_queue() [3/3]

kcenon::thread::backpressure_job_queue::backpressure_job_queue ( backpressure_job_queue && )
delete

Member Function Documentation

◆ apply_backpressure()

auto kcenon::thread::backpressure_job_queue::apply_backpressure ( std::unique_ptr< job > && value) -> common::VoidResult
nodiscard private

Applies backpressure logic for a single job.

Core backpressure implementation for single job.

Parameters
value  The job to potentially enqueue.
Returns
VoidResult indicating success or error.

This is the core backpressure implementation that handles all policy logic, rate limiting, and statistics tracking.

Implementation details:

  • Applies rate limiting first
  • Checks capacity before attempting enqueue
  • Routes to policy handler if queue is full
  • Updates statistics

Definition at line 252 of file backpressure_job_queue.cpp.

{
    // Apply rate limiting
    if (!apply_rate_limiting())
    {
        stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
        return make_error_result(error_code::operation_timeout,
                                 "rate limit exceeded");
    }

    // Check if queue is full before deciding on policy
    bool queue_is_full = is_full();

    // If not full, try direct enqueue
    if (!queue_is_full)
    {
        auto result = direct_enqueue(std::move(value));
        if (result.is_ok())
        {
            stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
            update_pressure_state();
            return result;
        }
        // If direct_enqueue failed but queue wasn't reported as full,
        // it might be a race condition - treat as full
        queue_is_full = true;
    }

    // Queue is full, apply policy
    switch (config_.policy)
    {
    case backpressure_policy::block:
        return handle_block_policy(std::move(value));

    case backpressure_policy::drop_oldest:
        return handle_drop_oldest_policy(std::move(value));

    case backpressure_policy::drop_newest:
        stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
        stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
        return make_error_result(error_code::queue_full,
                                 "queue full: drop_newest policy rejected job");

    case backpressure_policy::callback:
        return handle_callback_policy(std::move(value));

    case backpressure_policy::adaptive:
        return handle_adaptive_policy(std::move(value));

    default:
        stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
        return make_error_result(error_code::queue_full,
                                 "queue full: unknown policy");
    }
}

References kcenon::thread::adaptive, kcenon::thread::block, kcenon::thread::callback, kcenon::thread::drop_newest, kcenon::thread::drop_oldest, kcenon::thread::result< T >::is_ok(), kcenon::thread::make_error_result(), kcenon::thread::operation_timeout, and kcenon::thread::queue_full.


◆ apply_rate_limiting()

auto kcenon::thread::backpressure_job_queue::apply_rate_limiting ( ) -> bool
nodiscard private

Applies rate limiting using the token bucket.

Returns
true if the rate limit allows the operation to proceed, false if rate limited.

Definition at line 315 of file backpressure_job_queue.cpp.

{
    if (!rate_limiter_)
    {
        return true; // No rate limiting
    }

    // Try immediate acquire
    if (rate_limiter_->try_acquire())
    {
        return true;
    }

    // Rate limited - record and try waiting
    stats_.rate_limit_waits.fetch_add(1, std::memory_order_relaxed);

    auto start = std::chrono::steady_clock::now();
    bool acquired = rate_limiter_->try_acquire_for(1, config_.block_timeout);
    auto elapsed = std::chrono::steady_clock::now() - start;

    stats_.total_block_time_ns.fetch_add(
        std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
        std::memory_order_relaxed);

    return acquired;
}
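For intuition, the token-bucket scheme this method relies on can be sketched in a self-contained form. This is illustrative only: the library's token_bucket is time-driven and thread-safe, whereas simple_token_bucket below uses an explicit advance() clock and is an assumption made for demonstration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Minimal token-bucket sketch: tokens refill at a fixed rate up to a burst
// capacity; an acquire succeeds only if enough tokens are available.
class simple_token_bucket {
public:
    simple_token_bucket(double tokens_per_second, double burst_size)
        : rate_(tokens_per_second), capacity_(burst_size), tokens_(burst_size) {}

    // Advance the bucket's clock by `seconds`, refilling tokens up to capacity.
    void advance(double seconds) {
        tokens_ = std::min(capacity_, tokens_ + rate_ * seconds);
    }

    // Non-blocking acquire of `n` tokens.
    bool try_acquire(std::size_t n = 1) {
        if (tokens_ < static_cast<double>(n)) return false;
        tokens_ -= static_cast<double>(n);
        return true;
    }

    double available_tokens() const { return tokens_; }

private:
    double rate_;      // tokens added per second
    double capacity_;  // maximum tokens (burst)
    double tokens_;    // current balance
};
```

With rate 100 tokens/s and burst 10, a burst of 10 jobs drains the bucket; after 50 ms of refill, 5 more can be admitted, which is the sustained-throughput behavior described under Features.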

◆ direct_enqueue()

auto kcenon::thread::backpressure_job_queue::direct_enqueue ( std::unique_ptr< job > && value) -> common::VoidResult
nodiscard private

Directly enqueues to the base class without backpressure (internal use).

Parameters
value  The job to enqueue.
Returns
VoidResult from the base class enqueue.

Definition at line 598 of file backpressure_job_queue.cpp.

{
    return job_queue::enqueue(std::move(value));
}

References kcenon::thread::job_queue::enqueue().


◆ enqueue()

auto kcenon::thread::backpressure_job_queue::enqueue ( std::unique_ptr< job > && value) -> common::VoidResult
nodiscard override virtual

Enqueues a job with backpressure handling.

Parameters
value  The job to enqueue.
Returns
VoidResult indicating success or error.

Depending on the configured policy:

  • block: Waits up to block_timeout for space
  • drop_oldest: Removes oldest job if full
  • drop_newest: Rejects new job if full
  • callback: Calls decision_callback for custom handling
  • adaptive: Adjusts behavior based on current conditions

Rate limiting (if enabled) is applied before policy-based handling.

Error Codes:

  • queue_full: Queue at capacity and job rejected
  • queue_stopped: Queue has been stopped
  • operation_timeout: Block timeout expired
  • invalid_argument: Null job provided

Implementation details:

  • Early validation (stopped, null check)
  • Apply rate limiting if enabled
  • Route to policy-specific handler based on config
  • Update pressure state and statistics

Reimplemented from kcenon::thread::job_queue.

Definition at line 68 of file backpressure_job_queue.cpp.

{
    // Early validation
    if (is_stopped())
    {
        return make_error_result(error_code::queue_stopped);
    }

    if (value == nullptr)
    {
        return make_error_result(error_code::invalid_argument, "cannot enqueue null job");
    }

    // Apply backpressure logic
    return apply_backpressure(std::move(value));
}

References kcenon::thread::invalid_argument, kcenon::thread::make_error_result(), and kcenon::thread::queue_stopped.


◆ enqueue_batch()

auto kcenon::thread::backpressure_job_queue::enqueue_batch ( std::vector< std::unique_ptr< job > > && jobs) -> common::VoidResult
nodiscard override virtual

Enqueues a batch of jobs with backpressure handling.

Parameters
jobs  Vector of jobs to enqueue.
Returns
VoidResult indicating success or error.

Batch enqueue respects backpressure settings. If the batch would exceed capacity, behavior depends on policy:

  • block/drop_newest: Entire batch rejected
  • drop_oldest: Oldest jobs dropped to make room
  • callback: Called once for the batch decision
  • adaptive: May partially accept (future enhancement)

Implementation details:

  • Validates batch (stopped, empty, null jobs)
  • Checks if batch fits within capacity constraints
  • For drop_oldest: drops enough jobs to fit batch
  • For other policies: rejects entire batch if won't fit
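The capacity arithmetic behind the drop_oldest path in the bullets above can be isolated as two small pure functions. This is a sketch of the math only; the helper names available_space and jobs_to_drop are illustrative, not part of the API.

```cpp
#include <cassert>
#include <cstddef>

// Free slots remaining in a bounded queue.
std::size_t available_space(std::size_t max_size, std::size_t current_size) {
    return max_size > current_size ? max_size - current_size : 0;
}

// How many old jobs drop_oldest must evict so an incoming batch fits.
std::size_t jobs_to_drop(std::size_t batch_size, std::size_t space) {
    return batch_size > space ? batch_size - space : 0;
}
```

For a 1000-capacity queue holding 990 jobs, a 25-job batch has 10 free slots, so drop_oldest evicts 15 jobs before enqueuing the batch.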

Reimplemented from kcenon::thread::job_queue.

Definition at line 97 of file backpressure_job_queue.cpp.

{
    if (is_stopped())
    {
        return make_error_result(error_code::queue_stopped);
    }

    if (jobs.empty())
    {
        return make_error_result(error_code::invalid_argument, "cannot enqueue empty batch");
    }

    // Validate all jobs
    for (const auto& job_ptr : jobs)
    {
        if (job_ptr == nullptr)
        {
            return make_error_result(error_code::invalid_argument,
                                     "cannot enqueue null job in batch");
        }
    }

    // Capture the batch size before any std::move(jobs) below;
    // a moved-from vector must not be queried for its size
    const std::size_t batch_size = jobs.size();

    // Apply rate limiting for batch
    if (rate_limiter_)
    {
        if (!rate_limiter_->try_acquire(batch_size))
        {
            stats_.rate_limit_waits.fetch_add(1, std::memory_order_relaxed);

            // Try waiting for tokens
            auto start = std::chrono::steady_clock::now();
            if (!rate_limiter_->try_acquire_for(batch_size, config_.block_timeout))
            {
                stats_.jobs_rejected.fetch_add(batch_size, std::memory_order_relaxed);
                return make_error_result(error_code::operation_timeout,
                                         "rate limit timeout for batch");
            }
            auto elapsed = std::chrono::steady_clock::now() - start;
            stats_.total_block_time_ns.fetch_add(
                std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
                std::memory_order_relaxed);
        }
    }

    // Check capacity
    auto max_sz = get_max_size();
    if (!max_sz.has_value())
    {
        // Unbounded queue, just enqueue
        auto result = job_queue::enqueue_batch(std::move(jobs));
        if (result.is_ok())
        {
            stats_.jobs_accepted.fetch_add(batch_size, std::memory_order_relaxed);
        }
        update_pressure_state();
        return result;
    }

    std::size_t current_size = size();
    std::size_t available_space = max_sz.value() > current_size
                                      ? max_sz.value() - current_size
                                      : 0;

    if (batch_size <= available_space)
    {
        // Batch fits
        auto result = job_queue::enqueue_batch(std::move(jobs));
        if (result.is_ok())
        {
            stats_.jobs_accepted.fetch_add(batch_size, std::memory_order_relaxed);
        }
        update_pressure_state();
        return result;
    }

    // Batch doesn't fit - handle based on policy
    switch (config_.policy)
    {
    case backpressure_policy::drop_oldest:
    {
        // Drop enough jobs to make room
        std::size_t to_drop = batch_size - available_space;
        {
            std::scoped_lock<std::mutex> lock(mutex_);
            for (std::size_t i = 0; i < to_drop && !empty(); ++i)
            {
                auto batch = dequeue_batch_limited(1);
                stats_.jobs_dropped.fetch_add(batch.size(), std::memory_order_relaxed);
            }
        }
        auto result = job_queue::enqueue_batch(std::move(jobs));
        if (result.is_ok())
        {
            stats_.jobs_accepted.fetch_add(batch_size, std::memory_order_relaxed);
        }
        update_pressure_state();
        return result;
    }

    case backpressure_policy::block:
    {
        // Wait for space (simplified: just check once with timeout)
        auto start = std::chrono::steady_clock::now();
        std::unique_lock<std::mutex> lock(mutex_);
        bool got_space = space_available_.wait_for(
            lock,
            config_.block_timeout,
            [this, needed = batch_size, max = max_sz.value()]() {
                return size() + needed <= max || is_stopped();
            });

        if (!got_space || is_stopped())
        {
            stats_.jobs_rejected.fetch_add(batch_size, std::memory_order_relaxed);
            auto elapsed = std::chrono::steady_clock::now() - start;
            stats_.total_block_time_ns.fetch_add(
                std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
                std::memory_order_relaxed);
            return make_error_result(error_code::operation_timeout,
                                     "timeout waiting for space for batch");
        }

        lock.unlock();
        auto result = job_queue::enqueue_batch(std::move(jobs));
        if (result.is_ok())
        {
            stats_.jobs_accepted.fetch_add(batch_size, std::memory_order_relaxed);
        }
        update_pressure_state();
        return result;
    }

    default:
        // drop_newest, callback, adaptive - reject batch
        stats_.jobs_rejected.fetch_add(batch_size, std::memory_order_relaxed);
        stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
        return make_error_result(error_code::queue_full,
                                 "queue cannot fit entire batch");
    }
}

References kcenon::thread::block, kcenon::thread::drop_oldest, kcenon::thread::job_queue::enqueue_batch(), kcenon::thread::invalid_argument, kcenon::thread::result< T >::is_ok(), kcenon::thread::make_error_result(), kcenon::thread::operation_timeout, kcenon::thread::queue_full, and kcenon::thread::queue_stopped.


◆ get_available_tokens()

auto kcenon::thread::backpressure_job_queue::get_available_tokens ( ) const -> std::size_t
nodiscard

Returns available rate limit tokens.

Returns
Available tokens, or max if rate limiting disabled.

Definition at line 684 of file backpressure_job_queue.cpp.

{
    if (!config_.enable_rate_limiting || !rate_limiter_)
    {
        return std::numeric_limits<std::size_t>::max();
    }
    return rate_limiter_->available_tokens();
}

References config_, kcenon::thread::backpressure_config::enable_rate_limiting, and rate_limiter_.

◆ get_backpressure_config()

auto kcenon::thread::backpressure_job_queue::get_backpressure_config ( ) const -> const backpressure_config&
nodiscard

Returns the current backpressure configuration.

Returns
Reference to the current configuration.

Definition at line 662 of file backpressure_job_queue.cpp.

{
    return config_;
}

References config_.

◆ get_backpressure_stats()

auto kcenon::thread::backpressure_job_queue::get_backpressure_stats ( ) const -> backpressure_stats_snapshot
nodiscard

Returns backpressure statistics snapshot.

Returns
Snapshot of the current statistics. For ongoing monitoring, call this periodically.
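One common monitoring pattern is to diff two successive snapshots to derive rates over an interval. The sketch below uses a stand-in struct containing only the jobs_accepted/jobs_rejected counters documented for backpressure_stats; the real backpressure_stats_snapshot has more fields, and rejection_rate is an illustrative helper, not part of the API.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for the accepted/rejected counters of backpressure_stats_snapshot.
struct snapshot {
    std::uint64_t jobs_accepted;
    std::uint64_t jobs_rejected;
};

// Fraction of enqueue attempts rejected between two successive snapshots.
double rejection_rate(const snapshot& before, const snapshot& after) {
    std::uint64_t acc = after.jobs_accepted - before.jobs_accepted;
    std::uint64_t rej = after.jobs_rejected - before.jobs_rejected;
    std::uint64_t total = acc + rej;
    return total == 0 ? 0.0 : static_cast<double>(rej) / static_cast<double>(total);
}
```

A monitoring loop would call get_backpressure_stats() on a timer, feed consecutive snapshots to a helper like this, and alert when the rate crosses a threshold.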

Definition at line 696 of file backpressure_job_queue.cpp.

697 {
698 return stats_.snapshot();
699 }
auto snapshot() const -> backpressure_stats_snapshot
Creates a copyable snapshot of current statistics.

References kcenon::thread::backpressure_stats::snapshot(), and stats_.


◆ get_pressure_level()

auto kcenon::thread::backpressure_job_queue::get_pressure_level ( ) const -> pressure_level
nodiscard

Returns the current pressure level.

Returns
Current pressure_level enum value.

Calculated based on current queue depth relative to watermarks:

  • none: depth < low_watermark * max_size
  • low: depth < high_watermark * max_size
  • high: depth < max_size
  • critical: depth >= max_size
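The watermark thresholds above can be sketched as a small standalone function. This is an illustrative sketch only, not the library's implementation; the enum values and comparison rules follow the bullets above, and level_for is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>

enum class pressure_level { none, low, high, critical };

// Map queue depth to a pressure level using the watermark rules above.
// low_watermark/high_watermark are fractions of max_size (e.g. 0.5, 0.8).
pressure_level level_for(std::size_t depth, std::size_t max_size,
                         double low_watermark, double high_watermark) {
    if (depth >= max_size)
        return pressure_level::critical;
    if (depth >= static_cast<std::size_t>(high_watermark * max_size))
        return pressure_level::high;
    if (depth >= static_cast<std::size_t>(low_watermark * max_size))
        return pressure_level::low;
    return pressure_level::none;
}
```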

Definition at line 607 of file backpressure_job_queue.cpp.

{
    return current_pressure_.load(std::memory_order_acquire);
}

References current_pressure_.

Referenced by to_string().


◆ get_pressure_ratio()

auto kcenon::thread::backpressure_job_queue::get_pressure_ratio ( ) const -> double
nodiscard

Returns the current pressure as a ratio.

Returns
Ratio of current depth to max_size (0.0 to 1.0+).

Can exceed 1.0 if queue somehow exceeds max_size (shouldn't happen with proper backpressure, but included for robustness).

Definition at line 615 of file backpressure_job_queue.cpp.

{
    // Recalculate for accuracy
    auto max_sz = get_max_size();
    if (!max_sz.has_value() || max_sz.value() == 0)
    {
        return 0.0;
    }
    return static_cast<double>(size()) / static_cast<double>(max_sz.value());
}

References kcenon::thread::job_queue::get_max_size(), and kcenon::thread::job_queue::size().

Referenced by to_string().


◆ handle_adaptive_policy()

auto kcenon::thread::backpressure_job_queue::handle_adaptive_policy ( std::unique_ptr< job > && value) -> common::VoidResult
nodiscard private

Handles adaptive policy.

Parameters
value  The job to potentially enqueue.
Returns
VoidResult based on adaptive decision.

Implementation details:

  • Below high_watermark: accept normally
  • Above high_watermark: probabilistic acceptance based on pressure
  • At capacity: block briefly then reject
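The probabilistic-acceptance rule in the second bullet can be isolated as a pure function: probability 1.0 below the high watermark, falling linearly to 0.0 at full capacity. A sketch of the formula only; accept_probability is an illustrative name, not part of the class.

```cpp
#include <cassert>
#include <cmath>

// Linear acceptance probability for the adaptive policy: 1.0 at or below
// high_watermark, decreasing to 0.0 as the pressure ratio reaches 1.0.
double accept_probability(double ratio, double high_watermark) {
    if (ratio < high_watermark) return 1.0;  // below watermark: always accept
    if (ratio >= 1.0) return 0.0;            // at/over capacity: never accept
    return (1.0 - ratio) / (1.0 - high_watermark);
}
```

With high_watermark = 0.8, a queue at 90% depth sits halfway between the watermark and capacity, so roughly half of new jobs are admitted.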

Definition at line 525 of file backpressure_job_queue.cpp.

{
    double ratio = get_pressure_ratio();

    // Below high watermark: accept
    if (ratio < config_.high_watermark)
    {
        auto result = direct_enqueue(std::move(value));
        if (result.is_ok())
        {
            stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
        }
        update_pressure_state();
        return result;
    }

    // Above high watermark but not full: probabilistic acceptance
    // Acceptance probability decreases as we approach capacity
    if (ratio < 1.0)
    {
        // Simple linear decrease from 1.0 at high_watermark to 0 at 1.0
        double accept_prob = (1.0 - ratio) / (1.0 - config_.high_watermark);

        // Use simple threshold (not truly random, but deterministic)
        // This avoids random number generation overhead
        static thread_local std::size_t counter = 0;
        bool should_accept = (counter++ % 100) < (accept_prob * 100);

        if (should_accept)
        {
            auto result = direct_enqueue(std::move(value));
            if (result.is_ok())
            {
                stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
            }
            update_pressure_state();
            return result;
        }
    }

    // At or above capacity: brief wait then reject
    {
        std::unique_lock<std::mutex> lock(mutex_);
        auto brief_wait = std::chrono::milliseconds{10};
        bool got_space = space_available_.wait_for(
            lock,
            brief_wait,
            [this]() { return !is_full() || is_stopped(); });

        if (got_space && !is_stopped())
        {
            lock.unlock();
            auto result = direct_enqueue(std::move(value));
            if (result.is_ok())
            {
                stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
            }
            update_pressure_state();
            return result;
        }
    }

    stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
    stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
    return make_error_result(error_code::queue_full,
                             "adaptive policy rejected job due to high pressure");
}

References kcenon::thread::result< T >::is_ok(), kcenon::thread::make_error_result(), and kcenon::thread::queue_full.


◆ handle_block_policy()

auto kcenon::thread::backpressure_job_queue::handle_block_policy ( std::unique_ptr< job > && value) -> common::VoidResult
nodiscard private

Handles blocking policy with timeout.

Parameters
value  The job to enqueue.
Returns
VoidResult indicating success or timeout.
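The deadline bookkeeping this policy uses (wait only for the time remaining before block_timeout expires) can be sketched as a pure function. Illustrative only; remaining_wait is a hypothetical helper, not part of the API.

```cpp
#include <cassert>
#include <chrono>

using steady = std::chrono::steady_clock;

// Time left before the blocking deadline (start + block_timeout), clamped to
// zero once the deadline has passed. A condition-variable wait_for would be
// given exactly this remaining duration on each retry.
std::chrono::milliseconds remaining_wait(steady::time_point start,
                                         steady::time_point now,
                                         std::chrono::milliseconds block_timeout) {
    auto deadline = start + block_timeout;
    if (now >= deadline) return std::chrono::milliseconds{0};
    return std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now);
}
```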

Definition at line 405 of file backpressure_job_queue.cpp.

407 {
408 auto start = std::chrono::steady_clock::now();
409 auto deadline = start + config_.block_timeout;
410
411 while (std::chrono::steady_clock::now() < deadline)
412 {
413 // Try to enqueue
414 auto result = direct_enqueue(std::move(value));
415 if (result.is_ok())
416 {
417 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
419 return result;
420 }
421
422 // Wait for space
423 std::unique_lock<std::mutex> lock(mutex_);
424 auto remaining = deadline - std::chrono::steady_clock::now();
425 if (remaining <= std::chrono::milliseconds{0})
426 {
427 break;
428 }
429
430 space_available_.wait_for(lock, remaining, [this]() {
431 return !is_full() || is_stopped();
432 });
433
434 if (is_stopped())
435 {
436 break;
437 }
438
439 // Job was moved, need to return error
440 // This is a design issue - we can't retry after move
441 break;
442 }
443
444 auto elapsed = std::chrono::steady_clock::now() - start;
445 stats_.total_block_time_ns.fetch_add(
446 std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count(),
447 std::memory_order_relaxed);
448 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
449
450 return make_error_result(operation_timeout,
451 "timeout waiting for queue space");
452 }

References kcenon::thread::result< T >::is_ok(), kcenon::thread::make_error_result(), and kcenon::thread::operation_timeout.
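The listing above recomputes the remaining time each iteration and notes that it cannot retry once the `unique_ptr` has been moved. A standalone sketch of the same deadline loop is below; it uses a plain `int` payload (which, unlike a moved-from `unique_ptr`, can be retried) and `wait_until`, which is equivalent to recomputing the remaining interval for `wait_for`. All names are illustrative:

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Bounded queue with a blocking push that gives up at a deadline.
class blocking_queue {
public:
    explicit blocking_queue(std::size_t max) : max_(max) {}

    bool push_with_timeout(int v, std::chrono::milliseconds timeout) {
        auto deadline = std::chrono::steady_clock::now() + timeout;
        std::unique_lock<std::mutex> lock(mutex_);
        while (items_.size() >= max_) {
            // wait_until returns the predicate's value: false means the
            // deadline passed while the queue was still full.
            if (!space_available_.wait_until(
                    lock, deadline, [this] { return items_.size() < max_; })) {
                return false;  // timed out
            }
        }
        items_.push_back(v);
        return true;
    }

    bool pop(int& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        space_available_.notify_one();
        return true;
    }

private:
    std::size_t max_;
    std::deque<int> items_;
    std::mutex mutex_;
    std::condition_variable space_available_;
};
```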

◆ handle_callback_policy()

auto kcenon::thread::backpressure_job_queue::handle_callback_policy ( std::unique_ptr< job > && value) -> common::VoidResult
nodiscard private

Handles callback policy.

Parameters
value: The job to potentially enqueue.
Returns
VoidResult based on callback decision.

Definition at line 477 of file backpressure_job_queue.cpp.

477 auto backpressure_job_queue::handle_callback_policy(
478 std::unique_ptr<job>&& value) -> common::VoidResult
479 {
480 if (!config_.decision_callback)
481 {
482 // No callback, fall back to reject
483 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
484 return make_error_result(queue_full,
485 "queue full and no decision callback");
486 }
487
488 auto decision = config_.decision_callback(value);
489
490 switch (decision)
491 {
492 case backpressure_decision::accept:
493 // Force accept (may exceed max_size temporarily)
494 return job_queue::enqueue(std::move(value));
495
496 case backpressure_decision::reject:
497 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
498 return make_error_result(queue_full,
499 "callback rejected job");
500
501 case backpressure_decision::drop_and_accept:
502 return handle_drop_oldest_policy(std::move(value));
503
504 case backpressure_decision::delay:
505 // For now, treat as reject with retry hint
506 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
507 return make_error_result(queue_busy,
508 "callback requested delay, retry later");
509
510 default:
511 stats_.jobs_rejected.fetch_add(1, std::memory_order_relaxed);
512 return make_error_result(queue_full,
513 "unknown callback decision");
514 }
515 }

References kcenon::thread::accept, kcenon::thread::delay, kcenon::thread::drop_and_accept, kcenon::thread::job_queue::enqueue(), kcenon::thread::make_error_result(), kcenon::thread::queue_busy, kcenon::thread::queue_full, and kcenon::thread::reject.
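The dispatch above can be restated compactly. In this sketch the enum and the outcome strings are stand-ins for the library's `backpressure_decision` and `VoidResult`; a missing callback falls back to rejection, mirroring the listing:

```cpp
#include <functional>
#include <string>

// Illustrative decision enum (stand-in for backpressure_decision).
enum class decision { accept, reject, drop_and_accept, delay };

// Maps a user-supplied decision callback to an outcome description.
std::string dispatch(const std::function<decision()>& callback) {
    if (!callback) {
        return "rejected: no decision callback";  // fall back to reject
    }
    switch (callback()) {
        case decision::accept:          return "accepted (may exceed max_size)";
        case decision::drop_and_accept: return "dropped oldest, accepted new";
        case decision::delay:           return "rejected: retry later";
        case decision::reject:
        default:                        return "rejected by callback";
    }
}
```

The `delay` branch currently behaves as a reject with a retry hint, which matches the "For now" comment in the implementation.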

◆ handle_drop_oldest_policy()

auto kcenon::thread::backpressure_job_queue::handle_drop_oldest_policy ( std::unique_ptr< job > && value) -> common::VoidResult
nodiscard private

Handles drop_oldest policy.

Parameters
value: The job to enqueue after dropping oldest.
Returns
VoidResult (always succeeds unless stopped).

Definition at line 457 of file backpressure_job_queue.cpp.

457 auto backpressure_job_queue::handle_drop_oldest_policy(
458 std::unique_ptr<job>&& value) -> common::VoidResult
459 {
460 // Drop oldest job - dequeue_batch_limited handles its own locking
461 auto dropped = dequeue_batch_limited(1);
462 stats_.jobs_dropped.fetch_add(dropped.size(), std::memory_order_relaxed);
463
464 // Now enqueue new job
465 auto result = direct_enqueue(std::move(value));
466 if (result.is_ok())
467 {
468 stats_.jobs_accepted.fetch_add(1, std::memory_order_relaxed);
469 }
471 return result;
472 }

References kcenon::thread::result< T >::is_ok().
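The policy reduces to "evict the front, then append". A minimal sketch with counters mirroring `jobs_dropped` and `jobs_accepted` (the struct and field names are illustrative, not the library's):

```cpp
#include <cstddef>
#include <deque>

// Drop-oldest queue: a full queue evicts its oldest element so the
// newest item is always accepted.
struct drop_oldest_queue {
    std::size_t max_size;
    std::deque<int> items;
    std::size_t dropped = 0;
    std::size_t accepted = 0;

    void push(int v) {
        if (!items.empty() && items.size() >= max_size) {
            items.pop_front();  // evict the oldest entry
            ++dropped;
        }
        items.push_back(v);
        ++accepted;
    }
};
```

This is why the documented return is "always succeeds unless stopped": space is created on demand rather than waited for.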

◆ is_rate_limited()

auto kcenon::thread::backpressure_job_queue::is_rate_limited ( ) const -> bool
nodiscard

Checks if rate limiting is causing delays.

Returns
true if rate limiter is constraining throughput.

Returns false if rate limiting is disabled.

Definition at line 670 of file backpressure_job_queue.cpp.

670 auto backpressure_job_queue::is_rate_limited() const -> bool
671 {
672 if (!config_.enable_rate_limiting || !rate_limiter_)
673 {
674 return false;
675 }
676 // Rate limited if tokens are low (less than burst size * 10%)
677 return rate_limiter_->available_tokens() <
678 config_.rate_limit_burst_size / 10;
679 }

References config_, kcenon::thread::backpressure_config::enable_rate_limiting, kcenon::thread::backpressure_config::rate_limit_burst_size, and rate_limiter_.
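To make the 10% heuristic concrete, here is a hand-rolled token bucket sketch. The library's real `token_bucket` is defined elsewhere; the constructor parameters and the manual `refill(elapsed_seconds)` step here are assumptions for illustration:

```cpp
#include <algorithm>
#include <cstddef>

// Token bucket: capacity = burst size, refilled at a fixed rate.
// is_rate_limited() reports true when fewer than 10% of the burst-size
// tokens remain, matching the heuristic in the listing above.
class token_bucket {
public:
    token_bucket(double tokens_per_second, std::size_t burst_size)
        : rate_(tokens_per_second), burst_(burst_size),
          tokens_(static_cast<double>(burst_size)) {}

    // Refill proportionally to elapsed time, capped at the burst size.
    void refill(double elapsed_seconds) {
        tokens_ = std::min(static_cast<double>(burst_),
                           tokens_ + rate_ * elapsed_seconds);
    }

    bool try_consume() {
        if (tokens_ < 1.0) return false;
        tokens_ -= 1.0;
        return true;
    }

    std::size_t available_tokens() const {
        return static_cast<std::size_t>(tokens_);
    }

    bool is_rate_limited() const {
        return available_tokens() < burst_ / 10;
    }

private:
    double rate_;
    std::size_t burst_;
    double tokens_;
};
```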

◆ operator=() [1/2]

backpressure_job_queue & kcenon::thread::backpressure_job_queue::operator= ( backpressure_job_queue && )
delete

◆ operator=() [2/2]

backpressure_job_queue & kcenon::thread::backpressure_job_queue::operator= ( const backpressure_job_queue & )
delete

◆ reset_stats()

auto kcenon::thread::backpressure_job_queue::reset_stats ( ) -> void

Resets backpressure statistics.

Definition at line 704 of file backpressure_job_queue.cpp.

704 auto backpressure_job_queue::reset_stats() -> void
705 {
706 stats_.reset();
707 }

◆ set_backpressure_config()

auto kcenon::thread::backpressure_job_queue::set_backpressure_config ( backpressure_config config) -> void

Sets the backpressure configuration.

Parameters
config: New configuration to apply.

Updates take effect immediately. If rate limiting is being enabled or its parameters change, the token bucket is recreated or updated.

Definition at line 629 of file backpressure_job_queue.cpp.

629 auto backpressure_job_queue::set_backpressure_config(backpressure_config config) -> void
630 {
631 std::scoped_lock<std::mutex> lock(config_mutex_);
632
633 bool rate_limiting_changed =
634 config.enable_rate_limiting != config_.enable_rate_limiting ||
635 config.rate_limit_tokens_per_second != config_.rate_limit_tokens_per_second ||
636 config.rate_limit_burst_size != config_.rate_limit_burst_size;
637
638 config_ = std::move(config);
639
640 // Update rate limiter if needed
641 if (rate_limiting_changed)
642 {
643 if (config_.enable_rate_limiting)
644 {
645 rate_limiter_ = std::make_unique<token_bucket>(
646 config_.rate_limit_tokens_per_second,
647 config_.rate_limit_burst_size);
648 }
649 else
650 {
651 rate_limiter_.reset();
652 }
653 }
654
655 // Update pressure state with new config
656 update_pressure_state();
657 }

◆ to_string()

auto kcenon::thread::backpressure_job_queue::to_string ( void ) const -> std::string
nodiscard override virtual

Returns string representation including backpressure state.

Returns
Formatted string with queue and backpressure information.

Reimplemented from kcenon::thread::job_queue.

Definition at line 712 of file backpressure_job_queue.cpp.

712 auto backpressure_job_queue::to_string() const -> std::string
713 {
714 return formatter::format(
715 "backpressure_job_queue[size={}, pressure={}, policy={}, ratio={:.1f}%]",
716 size(),
717 pressure_level_to_string(get_pressure_level()),
718 backpressure_policy_to_string(config_.policy),
719 get_pressure_ratio() * 100.0);
720 }

References kcenon::thread::backpressure_policy_to_string(), config_, utility_module::formatter::format(), get_pressure_level(), get_pressure_ratio(), kcenon::thread::backpressure_config::policy, kcenon::thread::pressure_level_to_string(), and kcenon::thread::job_queue::size().

◆ update_pressure_state()

auto kcenon::thread::backpressure_job_queue::update_pressure_state ( ) -> void
private

Updates pressure level and triggers callbacks if changed.

Definition at line 345 of file backpressure_job_queue.cpp.

345 auto backpressure_job_queue::update_pressure_state() -> void
346 {
347 auto max_sz = get_max_size();
348 if (!max_sz.has_value() || max_sz.value() == 0)
349 {
350 current_pressure_.store(pressure_level::none, std::memory_order_relaxed);
351 current_pressure_ratio_.store(0.0, std::memory_order_relaxed);
352 return;
353 }
354
355 std::size_t current = size();
356 double ratio = static_cast<double>(current) / static_cast<double>(max_sz.value());
357 current_pressure_ratio_.store(ratio, std::memory_order_relaxed);
358
359 pressure_level new_level;
360 if (current >= max_sz.value())
361 {
362 new_level = pressure_level::critical;
363 }
364 else if (ratio >= config_.high_watermark)
365 {
366 new_level = pressure_level::high;
367 }
368 else if (ratio >= config_.low_watermark)
369 {
370 new_level = pressure_level::low;
371 }
372 else
373 {
374 new_level = pressure_level::none;
375 }
376
377 pressure_level old_level = current_pressure_.exchange(
378 new_level, std::memory_order_acq_rel);
379
380 // Trigger callback on level change or when entering high/critical
381 if (config_.pressure_callback)
382 {
383 if (new_level != old_level ||
384 new_level == pressure_level::high ||
385 new_level == pressure_level::critical)
386 {
387 if (new_level >= pressure_level::high)
388 {
389 stats_.pressure_events.fetch_add(1, std::memory_order_relaxed);
390 }
391 config_.pressure_callback(current, ratio);
392 }
393 }
394
395 // Notify waiting threads if space became available
396 if (new_level < old_level)
397 {
398 space_available_.notify_all();
399 }
400 }

References kcenon::thread::critical, kcenon::thread::high, kcenon::thread::low, and kcenon::thread::none.
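The watermark classification in the listing is a pure function of queue depth, capacity, and the two configured thresholds, so it can be restated standalone. The function name here is illustrative; the level boundaries follow the implementation above (critical at or above `max_size`, high at or above `high_watermark`, low at or above `low_watermark`, otherwise none, with an unbounded queue always reporting none):

```cpp
#include <cstddef>

// Graduated pressure levels (mirrors the documented pressure_level enum).
enum class pressure_level { none, low, high, critical };

// Map queue depth to a pressure level. Watermarks are ratios of max_size,
// e.g. low = 0.5, high = 0.8.
pressure_level classify(std::size_t current, std::size_t max_size,
                        double low_watermark, double high_watermark) {
    if (max_size == 0) return pressure_level::none;  // unbounded: no pressure
    if (current >= max_size) return pressure_level::critical;
    double ratio = static_cast<double>(current) / static_cast<double>(max_size);
    if (ratio >= high_watermark) return pressure_level::high;
    if (ratio >= low_watermark) return pressure_level::low;
    return pressure_level::none;
}
```

Because the thresholds are inclusive, a queue sitting exactly at a watermark already reports the higher level, which is what lets the pressure callback fire as soon as the high watermark is reached.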

Member Data Documentation

◆ config_

backpressure_config kcenon::thread::backpressure_job_queue::config_
private

Backpressure configuration.

Definition at line 302 of file backpressure_job_queue.h.

Referenced by backpressure_job_queue(), get_available_tokens(), get_backpressure_config(), is_rate_limited(), and to_string().

◆ config_mutex_

std::mutex kcenon::thread::backpressure_job_queue::config_mutex_
mutable private

Mutex for configuration access.

Definition at line 307 of file backpressure_job_queue.h.

◆ current_pressure_

std::atomic<pressure_level> kcenon::thread::backpressure_job_queue::current_pressure_ {pressure_level::none}
private

Current pressure level (atomic for lock-free reads).

Definition at line 317 of file backpressure_job_queue.h.

Referenced by get_pressure_level().

◆ current_pressure_ratio_

std::atomic<double> kcenon::thread::backpressure_job_queue::current_pressure_ratio_ {0.0}
private

Current pressure ratio (atomic for lock-free reads).

Definition at line 322 of file backpressure_job_queue.h.

◆ rate_limiter_

std::unique_ptr<token_bucket> kcenon::thread::backpressure_job_queue::rate_limiter_
private

Token bucket rate limiter (nullptr if disabled).

Definition at line 312 of file backpressure_job_queue.h.

Referenced by get_available_tokens(), and is_rate_limited().

◆ space_available_

std::condition_variable kcenon::thread::backpressure_job_queue::space_available_
private

Condition variable for blocking policy wait.

Definition at line 332 of file backpressure_job_queue.h.

◆ stats_

backpressure_stats kcenon::thread::backpressure_job_queue::stats_
private

Backpressure statistics.

Definition at line 327 of file backpressure_job_queue.h.

Referenced by get_backpressure_stats().


The documentation for this class was generated from the following files: