Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::job_queue Class Reference

A thread-safe job queue for managing and dispatching work items.

#include <job_queue.h>


Classes

struct  memory_stats
 Memory footprint statistics for the queue.
 

Public Member Functions

 job_queue (std::optional< std::size_t > max_size=std::nullopt)
 Constructs a new, empty job_queue.
 
virtual ~job_queue (void)
 Virtual destructor. Cleans up resources used by the job_queue.
 
auto get_ptr (void) -> std::shared_ptr< job_queue >
 Obtains a std::shared_ptr that points to this queue instance.
 
auto is_stopped () const -> bool
 Checks if the queue is in a "stopped" state.
 
auto set_notify (bool notify) -> void
 Sets the 'notify' flag for this queue.
 
auto schedule (std::unique_ptr< job > &&value) -> common::VoidResult override
 scheduler_interface implementation: schedule a job
 
auto get_next_job () -> common::Result< std::unique_ptr< job > > override
 scheduler_interface implementation: get next job
 
virtual auto enqueue (std::unique_ptr< job > &&value) -> common::VoidResult
 Enqueues a new job into the queue.
 
template<typename JobType , typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
auto enqueue (std::unique_ptr< JobType > &&value) -> common::VoidResult
 Type-safe enqueue for job subclasses.
 
virtual auto enqueue_batch (std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
 Enqueues a batch of jobs into the queue.
 
virtual auto dequeue (void) -> common::Result< std::unique_ptr< job > >
 Dequeues a job from the queue in FIFO order (blocking operation).
 
virtual auto try_dequeue (void) -> common::Result< std::unique_ptr< job > >
 Attempts to dequeue a job from the queue without blocking.
 
virtual auto dequeue_batch (void) -> std::deque< std::unique_ptr< job > >
 Dequeues all remaining jobs from the queue without processing them.
 
virtual auto dequeue_batch_limited (std::size_t max_count) -> std::deque< std::unique_ptr< job > >
 Dequeues up to N jobs in a single operation (micro-batching).
 
virtual auto clear (void) -> void
 Removes all jobs currently in the queue without processing them.
 
auto empty (void) const -> bool
 Checks if the queue is currently empty.
 
auto size (void) const -> std::size_t
 Returns the current number of jobs in the queue.
 
auto get_memory_stats () const -> memory_stats
 Get memory footprint statistics for debugging and monitoring.
 
auto stop (void) -> void
 Signals the queue to stop waiting for new jobs (e.g., during shutdown).
 
virtual auto to_string (void) const -> std::string
 Returns a string representation of this job_queue.
 
auto get_capabilities () const -> queue_capabilities override
 Returns capabilities of this job_queue implementation.
 
auto is_bounded () const -> bool
 Check if queue has a size limit.
 
auto get_max_size () const -> std::optional< std::size_t >
 Get the maximum queue size.
 
auto set_max_size (std::optional< std::size_t > max_size) -> void
 Set maximum queue size.
 
auto is_full () const -> bool
 Check if queue is at capacity.
 
virtual auto inspect_pending_jobs (std::size_t limit=100) const -> std::vector< diagnostics::job_info >
 Inspects pending jobs in the queue without removing them.
 
Public Member Functions inherited from kcenon::thread::scheduler_interface
virtual ~scheduler_interface ()=default
 
Public Member Functions inherited from kcenon::thread::queue_capabilities_interface
virtual ~queue_capabilities_interface ()=default
 
auto has_exact_size () const -> bool
 Check if size() returns exact values.
 
auto has_atomic_empty () const -> bool
 Check if empty() check is atomic.
 
auto is_lock_free () const -> bool
 Check if this is a lock-free implementation.
 
auto is_wait_free () const -> bool
 Check if this is a wait-free implementation.
 
auto supports_batch () const -> bool
 Check if batch operations are supported.
 
auto supports_blocking_wait () const -> bool
 Check if blocking wait is supported.
 
auto supports_stop () const -> bool
 Check if stop signaling is supported.
 

Protected Attributes

std::atomic_bool notify_
 If true, threads waiting for new jobs are notified when a new job is enqueued. If false, enqueuing does not automatically trigger a notification.
 
std::atomic_bool stop_
 Indicates whether the queue has been signaled to stop.
 
std::mutex mutex_
 Mutex to protect access to the underlying queue_ container and related state.
 
std::condition_variable condition_
 Condition variable used to signal worker threads.
 

Private Attributes

std::deque< std::unique_ptr< job > > queue_
 The underlying container storing the jobs in FIFO order.
 
std::optional< std::size_t > max_size_
 Optional maximum queue size.
 
std::atomic< std::size_t > atomic_size_ {0}
 Atomic size counter for lock-free read-only queries.
 

Detailed Description

A thread-safe job queue for managing and dispatching work items.

The job_queue class provides a synchronized queue for storing and retrieving job objects (or derived classes). Multiple threads can safely enqueue and dequeue jobs, ensuring proper synchronization and preventing data races.

This class inherits from std::enable_shared_from_this, which allows it to create std::shared_ptr instances referring to itself via get_ptr(). This is often useful when passing a job_queue pointer to jobs themselves or other components that need a safe, shared reference to the queue.

Typical Usage

  1. Create a job_queue instance, typically via std::make_shared.
  2. Enqueue job objects (or derived types) using enqueue().
  3. One or more worker threads repeatedly call dequeue() to retrieve jobs and process them.
  4. Call stop() and possibly clear() to shut down the queue gracefully when all jobs are done or when the system is stopping.
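
The four steps above can be sketched without the library itself. The block below is a minimal stand-in, not the real job_queue: mini_queue and run_typical_usage are hypothetical names, jobs are plain std::function objects, and errors are reduced to bool and std::optional instead of common::Result. It only illustrates the create / enqueue / dequeue-in-a-worker / stop sequence.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical stand-in for job_queue (assumption: simplified API).
class mini_queue {
public:
    // Returns false once the queue has been stopped.
    bool enqueue(std::function<void()> job) {
        std::scoped_lock lock(mutex_);
        if (stop_) return false;
        queue_.push_back(std::move(job));
        condition_.notify_one();
        return true;
    }

    // Blocks until a job is available or stop() was called.
    // An empty optional means "stopped and nothing left to run".
    std::optional<std::function<void()>> dequeue() {
        std::unique_lock lock(mutex_);
        condition_.wait(lock, [this] { return !queue_.empty() || stop_; });
        if (queue_.empty()) return std::nullopt;
        auto job = std::move(queue_.front());
        queue_.pop_front();
        return job;
    }

    void stop() {
        {
            std::scoped_lock lock(mutex_);
            stop_ = true;
        }
        condition_.notify_all();  // release every blocked worker
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<std::function<void()>> queue_;
    bool stop_ = false;
};

// Steps 1-4: create, enqueue, process on a worker thread, stop.
int run_typical_usage() {
    auto queue = std::make_shared<mini_queue>();
    int processed = 0;  // written only by the worker, read after join()

    std::thread worker([&] {
        while (auto job = queue->dequeue()) {
            (*job)();
        }
    });

    for (int i = 0; i < 5; ++i) {
        queue->enqueue([&processed] { ++processed; });
    }

    queue->stop();  // in this sketch, pending jobs are still drained
    worker.join();
    return processed;
}
```

In this stand-in, stop() does not discard pending work: the worker keeps receiving jobs until the queue is empty and only then sees the empty result. Calling a clear()-style operation before or after stop() would be the way to drop pending jobs instead.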
Examples
queue_capabilities_sample.cpp.

Definition at line 62 of file job_queue.h.

Constructor & Destructor Documentation

◆ job_queue()

kcenon::thread::job_queue::job_queue ( std::optional< std::size_t > max_size = std::nullopt)
explicit

Constructs a new, empty job_queue.

Constructs a new job_queue with optional size limit.

Parameters
max_size  Optional maximum queue size. If set, enqueue will fail when the queue reaches this size. Use std::nullopt for unlimited.

Initializes internal synchronization primitives and sets all flags to their default states (notify_ = true, stop_ = false).

Implementation details:

  • notify_ starts as true (enables condition variable notifications)
  • stop_ starts as false (queue is active)
  • mutex_ and condition_ are default constructed
  • queue_ is an empty deque ready for job storage
  • max_size_ controls the maximum queue size (std::nullopt = unlimited)

The queue is immediately ready for use after construction.

Definition at line 33 of file job_queue.cpp.

    : notify_(true), stop_(false), mutex_(), condition_(), queue_(), max_size_(max_size) {}

◆ ~job_queue()

kcenon::thread::job_queue::~job_queue ( void )
virtual

Virtual destructor. Cleans up resources used by the job_queue.

Destroys the job_queue.

Implementation details:

  • Any remaining jobs in the queue are automatically destroyed
  • No explicit cleanup needed due to RAII design
  • The destructor body is empty and does not call stop(); threads blocked in dequeue() must be released by calling stop() before the queue is destroyed

Definition at line 44 of file job_queue.cpp.

{}

Member Function Documentation

◆ clear()

auto kcenon::thread::job_queue::clear ( void ) -> void
virtual

Removes all jobs currently in the queue without processing them.

Removes all jobs from the queue without returning them.

This operation is thread-safe and can be used to discard pending jobs, typically during shutdown or error recovery. It does not affect the stop_ or notify_ flags.

Implementation details:

  • Discards all jobs immediately (destructors called automatically)
  • Uses deque::clear() for efficient bulk removal
  • Notifies all waiting threads since queue is now empty
  • More efficient than dequeue_batch() when jobs don't need to be processed

Use Cases:

  • Emergency shutdown (discard pending work)
  • Queue reset/reinitialization
  • Error recovery scenarios
Note
Jobs are destroyed immediately, so any cleanup logic in job destructors will run

Definition at line 440 of file job_queue.cpp.

{
    // Critical section: atomically clear all jobs
    std::scoped_lock<std::mutex> lock(mutex_);

    // Destroy all jobs in queue
    queue_.clear();
    atomic_size_.store(0, std::memory_order_relaxed);

    // Wake all waiting threads since queue is now empty
    condition_.notify_all();
}

◆ dequeue()

auto kcenon::thread::job_queue::dequeue ( void ) -> common::Result<std::unique_ptr<job>>
nodiscard, virtual

Dequeues a job from the queue in FIFO order (blocking operation).

Removes and returns the first job from the queue (blocking operation).

Returns
A common::Result<std::unique_ptr<job>> containing either a valid job or an error object.

If the queue is empty, the caller will block until a job becomes available or the queue is stopped. Use try_dequeue() for non-blocking operation.

Implementation details:

  • Uses unique_lock (required by condition_variable)
  • Blocks until a job is available OR queue is stopped
  • Returns error if queue is empty after waking up (indicates stop condition)
  • Efficiently moves job out of queue (zero-copy transfer)
  • Automatically unlocks mutex when function exits

Blocking Behavior:

  • Waits indefinitely until job available or stop requested
  • Uses condition variable for efficient waiting (no busy polling)
  • Multiple threads can wait concurrently (enqueue() wakes one waiter per job via notify_one; which waiter is woken is unspecified)

Stop Coordination:

  • Wakes up when stop_.load() becomes true
  • Returns error instead of blocking indefinitely during shutdown

Thread Safety:

  • Safe for concurrent access with other dequeue/enqueue operations
  • Uses proper synchronization primitives
  • Handles race with clear() by re-checking queue state

Race Condition Fix:

  • Guards against accessing queue_.front() on an empty queue
  • wait() returns with the mutex held, so the re-check and the access happen atomically with respect to clear(); the queue can still be empty at this point when the wait ended because stop_ was set
  • Solution: Re-check queue_.empty() while still holding the lock
Returns
Unique pointer to job on success, error if queue empty/stopped

Definition at line 248 of file job_queue.cpp.

{
    // Use unique_lock for condition variable operations
    std::unique_lock<std::mutex> lock(mutex_);

    // Block until job available OR queue stopped
    condition_.wait(lock, [this]() { return !queue_.empty() || stop_.load(); });

    // CRITICAL: Re-check queue state while still holding lock
    // This prevents race with clear() that could empty the queue
    // between wait() return and queue access (TOCTOU bug)
    if (queue_.empty())
    {
        return common::error_info{static_cast<int>(error_code::queue_empty), "there are no jobs to dequeue", "thread_system"};
    }

    // Efficiently extract first job from queue
    // At this point, we're guaranteed queue is not empty because:
    // 1. We hold the mutex continuously from wait() return
    // 2. We just verified !queue_.empty() above
    auto value = std::move(queue_.front());
    queue_.pop_front();
    atomic_size_.fetch_sub(1, std::memory_order_relaxed);

    return value; // Return moved job (caller takes ownership)
}

References kcenon::thread::queue_empty.

Referenced by get_next_job().


◆ dequeue_batch()

auto kcenon::thread::job_queue::dequeue_batch ( void ) -> std::deque<std::unique_ptr<job>>
nodiscard, virtual

Dequeues all remaining jobs from the queue without processing them.

Removes and returns ALL jobs from the queue (non-blocking operation).

Returns
A std::deque of unique_ptr<job> containing all jobs that were in the queue at the time of the call.

Similar to clear(), but returns the dequeued jobs to the caller for potential inspection or manual processing.

Implementation details:

  • Non-blocking: returns immediately regardless of queue state
  • Uses efficient swap operation to transfer entire queue contents
  • Notifies all waiting threads since queue is now empty
  • Returns empty deque if queue was already empty
  • Leaves queue in empty state after operation

Performance Characteristics:

  • O(1) complexity due to swap operation (very efficient)
  • No copying of job objects (moves ownership)
  • Single lock acquisition for entire batch

Use Cases:

  • Shutdown scenarios (drain all pending work)
  • Batch processing of accumulated jobs
  • Queue migration between workers

Thread Safety:

  • Uses scoped_lock for automatic cleanup
  • notify_all() ensures no threads remain blocked on empty queue
Returns
Deque containing all jobs that were in the queue

Definition at line 347 of file job_queue.cpp.

{
    std::deque<std::unique_ptr<job>> all_items;
    {
        // Critical section: atomically transfer all queue contents
        std::scoped_lock<std::mutex> lock(mutex_);

        // Efficient O(1) transfer of all jobs
        std::swap(queue_, all_items);
        atomic_size_.store(0, std::memory_order_relaxed);

        // Wake all waiting threads since queue is now empty
        condition_.notify_all();
    }

    return all_items; // Return all extracted jobs
}
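
The swap-based drain can be tried in isolation. The following is a small sketch under stated assumptions: drainable_queue, drain_result, and drain_demo are hypothetical names, the elements are std::unique_ptr<int> rather than jobs, and the size counter and notification from the listing are left out. It shows that swapping with an empty local deque moves every element out in one step and leaves the source empty.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in: only the container and its mutex.
struct drainable_queue {
    std::mutex mutex;
    std::deque<std::unique_ptr<int>> items;

    // Transfers the whole container to the caller under one lock.
    std::deque<std::unique_ptr<int>> drain() {
        std::deque<std::unique_ptr<int>> all_items;
        {
            std::scoped_lock lock(mutex);
            std::swap(items, all_items);  // exchanges contents, no per-element work
        }
        return all_items;
    }
};

struct drain_result {
    std::size_t drained;    // elements handed to the caller
    std::size_t remaining;  // elements left in the queue
    bool same_object;       // first element is the very same heap object
};

drain_result drain_demo() {
    drainable_queue queue;
    for (int i = 0; i < 3; ++i) {
        queue.items.push_back(std::make_unique<int>(i));
    }
    const int* first_before = queue.items.front().get();

    auto drained = queue.drain();
    return {drained.size(), queue.items.size(),
            drained.front().get() == first_before};
}
```

Because ownership moves with the container, the caller decides whether the drained jobs are executed, inspected, or simply destroyed when the returned deque goes out of scope.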

◆ dequeue_batch_limited()

auto kcenon::thread::job_queue::dequeue_batch_limited ( std::size_t max_count) -> std::deque<std::unique_ptr<job>>
nodiscard, virtual

Dequeues up to N jobs in a single operation (micro-batching).

Dequeues up to N jobs in a single lock acquisition (micro-batching).

Parameters
max_count  Maximum number of jobs to dequeue (typically 4-8)
Returns
A std::deque of unique_ptr<job> containing dequeued jobs

This method implements micro-batching to reduce mutex contention:

  • Acquires lock once for multiple jobs (vs N separate dequeues)
  • Reduces overhead under high contention scenarios
  • Provides better cache locality for job processing
  • Returns fewer jobs if queue has less than max_count

Recommended batch sizes:

  • Light load: 4 jobs (balance latency and throughput)
  • Heavy contention: 8 jobs (amortize lock overhead)

Use Cases:

  • High-throughput scenarios with many producers/consumers
  • Reducing lock contention in multi-threaded workloads
  • Improving cache efficiency for batch processing

Implementation details:

  • Acquires lock once and extracts up to max_count jobs
  • Reduces lock contention compared to N individual dequeue() calls
  • Returns immediately with available jobs (non-blocking)
  • Returns empty deque if queue is empty
  • Notifies waiting threads only when the batch empties the queue; while jobs remain, no notification is sent

Performance Characteristics:

  • O(N) where N = min(max_count, queue.size())
  • Amortizes lock overhead across multiple jobs
  • Improves cache locality for batch processing
  • Reduces context switching under high contention

Micro-Batching Benefits:

  • With 8-job batches and 100 producers:
    • Reduces lock acquisitions by up to 8x (when every batch is full)
    • Improves throughput by 15-40% under contention
  • Minimizes cache line bouncing between threads
  • Better CPU pipeline utilization

Thread Safety:

  • Uses scoped_lock for exception safety
  • Safe to call concurrently with enqueue/dequeue operations
  • Skips the notification while jobs remain, avoiding unnecessary wake-ups
Parameters
max_count  Maximum number of jobs to extract (typically 4-8)
Returns
Deque containing up to max_count jobs

Definition at line 396 of file job_queue.cpp.

{
    std::deque<std::unique_ptr<job>> batch_items;
    {
        // Critical section: atomically extract up to max_count jobs
        std::scoped_lock<std::mutex> lock(mutex_);

        // Extract jobs up to max_count or queue size, whichever is smaller
        std::size_t count = std::min(max_count, queue_.size());
        for (std::size_t i = 0; i < count; ++i)
        {
            batch_items.push_back(std::move(queue_.front()));
            queue_.pop_front();
        }
        atomic_size_.fetch_sub(count, std::memory_order_relaxed);

        // Only notify if queue is now empty (to wake waiting workers)
        // If jobs remain, other workers can continue dequeuing without wakeup overhead
        if (queue_.empty())
        {
            condition_.notify_all();
        }
    }

    return batch_items; // Return extracted batch
}
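
To make the micro-batching effect concrete, here is a hedged sketch of a consumer loop that drains a queue in bounded batches. int_queue, take_batch, and drain_in_batches are hypothetical names introduced for illustration, and the elements are ints rather than jobs. With 10 queued items and a batch size of 4, the loop sees batches of 4, 4, and 2, so three locked extractions replace ten individual dequeues.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical stand-in: a mutex-protected deque of ints instead of jobs.
struct int_queue {
    std::mutex mutex;
    std::deque<int> items;

    // Extracts up to max_count items under a single lock acquisition.
    std::deque<int> take_batch(std::size_t max_count) {
        std::deque<int> batch;
        std::scoped_lock lock(mutex);
        const std::size_t count = std::min(max_count, items.size());
        for (std::size_t i = 0; i < count; ++i) {
            batch.push_back(items.front());
            items.pop_front();
        }
        return batch;
    }
};

// Drains `total` items in batches and records the size of each batch.
std::vector<std::size_t> drain_in_batches(std::size_t total,
                                          std::size_t max_count) {
    int_queue queue;
    for (std::size_t i = 0; i < total; ++i) {
        queue.items.push_back(static_cast<int>(i));
    }

    std::vector<std::size_t> sizes;
    for (;;) {
        auto batch = queue.take_batch(max_count);
        if (batch.empty()) break;  // non-blocking: empty means nothing left
        sizes.push_back(batch.size());
    }
    return sizes;
}
```

A real worker would process each batch between calls; the batch size then trades latency for the first job against the number of lock acquisitions.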

◆ empty()

auto kcenon::thread::job_queue::empty ( void ) const -> bool
nodiscard

Checks if the queue is currently empty.

Checks if the queue contains any jobs.

Returns
true if the queue has no pending jobs, false otherwise.
Note
This method is thread-safe.

Implementation details:

  • Thread-safe read of the atomic size counter (no mutex is acquired)
  • Snapshot in time (may change immediately after call)
  • Useful for non-blocking queue state checks
Note
Result may be stale by the time caller uses it in multi-threaded environment
Returns
true if queue has no jobs, false if jobs are present

Definition at line 465 of file job_queue.cpp.

{
    return atomic_size_.load(std::memory_order_relaxed) == 0;
}

◆ enqueue() [1/2]

auto kcenon::thread::job_queue::enqueue ( std::unique_ptr< job > && value) -> common::VoidResult
nodiscard, virtual

Enqueues a new job into the queue.

Adds a single job to the back of the queue.

Parameters
value  A unique pointer to the job being added.
Returns
A common::VoidResult indicating success or an error message.

This method is thread-safe. If notify_ is set to true, a waiting thread (if any) will be notified upon successful enqueue.

Implementation details:

  • First checks if queue is stopped to fail fast (before taking the lock)
  • Validates that the job pointer is not null
  • Uses scoped_lock for automatic mutex management
  • Rejects the job if the queue is bounded and already holds max_size jobs
  • Moves the job into the queue (zero-copy transfer)
  • Optionally notifies one waiting thread if notifications enabled

Thread Safety:

  • Uses mutex to protect queue modification
  • Safe to call concurrently from multiple threads
  • notify_one() wakes one waiting thread per enqueued job; which waiter is woken is unspecified

Error Conditions:

  • Returns error if queue is stopped (queue_stopped)
  • Returns error if job is null (invalid_argument)
  • Returns error if the queue is bounded and full (queue_full)
Parameters
value  Unique pointer to job (moved into queue)
Returns
Empty result on success, error on failure

Reimplemented in kcenon::thread::backpressure_job_queue.

Definition at line 105 of file job_queue.cpp.

{
    // Early validation: check if queue is still accepting jobs
    if (stop_.load())
    {
        return common::error_info{static_cast<int>(error_code::queue_stopped), "Job queue is stopped", "thread_system"};
    }

    // Validate input: null jobs are not allowed
    if (value == nullptr)
    {
        return common::error_info{static_cast<int>(error_code::invalid_argument), "cannot enqueue null job", "thread_system"};
    }

    // Critical section: modify queue with proper synchronization
    std::scoped_lock<std::mutex> lock(mutex_);

    // Check size limit if bounded
    if (max_size_.has_value() && queue_.size() >= max_size_.value())
    {
        return common::error_info{static_cast<int>(error_code::queue_full), "Job queue is full", "thread_system"};
    }

    // Move job into queue (efficient transfer of ownership)
    queue_.push_back(std::move(value));
    atomic_size_.fetch_add(1, std::memory_order_relaxed);

    // Conditionally notify waiting consumers
    if (notify_)
    {
        condition_.notify_one(); // Wake exactly one waiting thread
    }

    return common::ok();
}

References kcenon::thread::invalid_argument, kcenon::thread::queue_full, and kcenon::thread::queue_stopped.

Referenced by kcenon::thread::backpressure_job_queue::direct_enqueue(), enqueue(), kcenon::thread::backpressure_job_queue::handle_callback_policy(), and schedule().


◆ enqueue() [2/2]

template<typename JobType , typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
auto kcenon::thread::job_queue::enqueue ( std::unique_ptr< JobType > && value) -> common::VoidResult
inline, nodiscard

Type-safe enqueue for job subclasses.

This template method provides type-safe job submission, allowing callers to enqueue jobs without explicit casting. The JobType must be derived from the base job class.

Template Parameters
JobType  A type derived from job
Parameters
value  A unique pointer to the job being added.
Returns
A common::VoidResult indicating success or an error message.

    // Instead of typed_job_queue, use job_queue with template enqueue:
    auto queue = std::make_shared<job_queue>();
    queue->enqueue<my_custom_job>(std::make_unique<my_custom_job>());

Definition at line 149 of file job_queue.h.

{
    return enqueue(std::unique_ptr<job>(std::move(value)));
}

References enqueue().
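
The forwarding pattern used by this overload can be reproduced in a self-contained sketch. Everything in the sketch namespace below (job, answer_job, queue, pop) is a hypothetical stand-in, and bool replaces common::VoidResult. It shows why the template does not recurse into itself: once the argument has been converted to std::unique_ptr<job>, overload resolution prefers the non-template overload.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <type_traits>
#include <utility>

namespace sketch {

struct job {
    virtual ~job() = default;
    virtual int run() = 0;
};

struct answer_job final : job {
    int run() override { return 42; }
};

class queue {
public:
    // Base overload: takes ownership of any job.
    bool enqueue(std::unique_ptr<job>&& value) {
        if (value == nullptr) return false;
        items_.push_back(std::move(value));
        return true;
    }

    // Typed overload: accepts unique_ptr<Derived> and forwards to the
    // base overload after an explicit upcast.
    template <typename JobType,
              typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
    bool enqueue(std::unique_ptr<JobType>&& value) {
        return enqueue(std::unique_ptr<job>(std::move(value)));
    }

    // Caller must ensure the queue is not empty (sketch only).
    std::unique_ptr<job> pop() {
        auto value = std::move(items_.front());
        items_.pop_front();
        return value;
    }

private:
    std::deque<std::unique_ptr<job>> items_;
};

}  // namespace sketch

int typed_enqueue_demo() {
    sketch::queue queue;
    if (!queue.enqueue(std::make_unique<sketch::answer_job>())) {
        return -1;
    }
    return queue.pop()->run();
}
```

Since std::is_base_of_v<job, job> is also true, the template is viable for std::unique_ptr<job> as well; the call still resolves to the non-template overload because a non-template function wins when both are equally good matches.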


◆ enqueue_batch()

auto kcenon::thread::job_queue::enqueue_batch ( std::vector< std::unique_ptr< job > > && jobs) -> common::VoidResult
nodiscard, virtual

Enqueues a batch of jobs into the queue.

Adds multiple jobs to the queue in a single operation.

Parameters
jobs  A vector of unique pointers to the jobs being added.
Returns
A common::VoidResult indicating success or an error message.

Implementation details:

  • Validates queue state and input before any modifications
  • Pre-validates all jobs to ensure atomicity (all or nothing)
  • Uses single lock acquisition for entire batch (better performance)
  • Moves all jobs efficiently without copying
  • Only notifies once after all jobs are added (avoids notification spam)

Performance Benefits:

  • Single mutex acquisition vs multiple for individual enqueues
  • Single notification vs notification per job
  • Reduced contention for high-throughput scenarios

Atomicity:

  • Either all jobs are added or none are
  • No partial batches: an empty batch, a null job, or a batch larger than the remaining capacity is rejected as a whole
Parameters
jobs  Vector of unique pointers to jobs (moved into queue)
Returns
Empty result on success, error on failure

Reimplemented in kcenon::thread::backpressure_job_queue.

Definition at line 163 of file job_queue.cpp.

{
    // Early validation: check if queue is still accepting jobs
    if (stop_.load())
    {
        return common::error_info{static_cast<int>(error_code::queue_stopped), "Job queue is stopped", "thread_system"};
    }

    // Validate batch: empty batches are not allowed
    if (jobs.empty())
    {
        return common::error_info{static_cast<int>(error_code::invalid_argument), "cannot enqueue empty batch", "thread_system"};
    }

    // Pre-validate all jobs before modifying queue (ensures atomicity)
    for (auto& job : jobs)
    {
        if (job == nullptr)
        {
            return common::error_info{static_cast<int>(error_code::invalid_argument), "cannot enqueue null job in batch", "thread_system"};
        }
    }

    // Critical section: add entire batch with single lock acquisition
    std::scoped_lock<std::mutex> lock(mutex_);

    // Check size limit if bounded
    if (max_size_.has_value())
    {
        std::size_t available_space = max_size_.value() - queue_.size();
        if (jobs.size() > available_space)
        {
            return common::error_info{static_cast<int>(error_code::queue_full),
                                      "Job queue cannot fit entire batch", "thread_system"};
        }
    }

    // Move all jobs into queue efficiently
    for (auto& job : jobs)
    {
        queue_.push_back(std::move(job));
    }
    atomic_size_.fetch_add(jobs.size(), std::memory_order_relaxed);

    // Single notification for entire batch (performance optimization)
    if (notify_)
    {
        condition_.notify_one(); // Wake one thread to process batch
    }

    return common::ok();
}

References kcenon::thread::invalid_argument, kcenon::thread::queue_full, and kcenon::thread::queue_stopped.
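
The all-or-nothing rule is easiest to see when validation is separated from insertion. The sketch below is an illustration under stated assumptions, not the library code: batch_status, try_enqueue_batch, and make_batch are hypothetical names, elements are std::unique_ptr<int>, and locking and notification are omitted to keep the focus on validation. One deliberate difference from the listing: the remaining capacity is clamped at zero before comparing, a defensive choice made for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

enum class batch_status { ok, empty_batch, null_job, queue_full };

// Validates the whole batch first; the queue is modified only if every
// check passes, so a rejected batch leaves both containers untouched.
batch_status try_enqueue_batch(std::deque<std::unique_ptr<int>>& queue,
                               std::optional<std::size_t> max_size,
                               std::vector<std::unique_ptr<int>>&& jobs) {
    if (jobs.empty()) return batch_status::empty_batch;

    for (const auto& job : jobs) {
        if (job == nullptr) return batch_status::null_job;
    }

    if (max_size.has_value()) {
        const std::size_t used = queue.size();
        // Clamp at zero so an over-full queue never wraps around.
        const std::size_t available = *max_size > used ? *max_size - used : 0;
        if (jobs.size() > available) return batch_status::queue_full;
    }

    for (auto& job : jobs) {
        queue.push_back(std::move(job));
    }
    return batch_status::ok;
}

// Helper for experiments: builds a batch of n non-null elements.
std::vector<std::unique_ptr<int>> make_batch(std::size_t n) {
    std::vector<std::unique_ptr<int>> batch;
    for (std::size_t i = 0; i < n; ++i) {
        batch.push_back(std::make_unique<int>(static_cast<int>(i)));
    }
    return batch;
}
```

Because the batch is taken by rvalue reference and nothing is moved out before validation succeeds, a caller in this sketch still owns every element of a rejected batch and can retry later or split it.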

Referenced by kcenon::thread::backpressure_job_queue::enqueue_batch().


◆ get_capabilities()

auto kcenon::thread::job_queue::get_capabilities ( ) const -> queue_capabilities
inline, nodiscard, override, virtual

Returns capabilities of this job_queue implementation.

Returns
Queue capabilities struct describing this implementation.

job_queue provides:

  • Exact size: size() returns precise count (mutex-protected)
  • Atomic empty check: empty() is consistent (reads the atomic size counter)
  • Not lock-free: Uses mutex for synchronization
  • Batch operations: Supports enqueue_batch/dequeue_batch
  • Blocking wait: Supports blocking dequeue()
  • Stop support: Supports stop() for graceful shutdown

Reimplemented from kcenon::thread::queue_capabilities_interface.

Definition at line 287 of file job_queue.h.

{
    return queue_capabilities{
        .exact_size = true,
        .atomic_empty_check = true,
        .lock_free = false,
        .wait_free = false,
        .supports_batch = true,
        .supports_blocking_wait = true,
        .supports_stop = true
    };
}

◆ get_max_size()

auto kcenon::thread::job_queue::get_max_size ( ) const -> std::optional<std::size_t>
nodiscard

Get the maximum queue size.

Gets the maximum queue size.

Returns
The maximum size, or std::nullopt if unlimited

Definition at line 594 of file job_queue.cpp.

{
    return max_size_;
}

References max_size_.

Referenced by kcenon::thread::backpressure_job_queue::get_pressure_ratio().


◆ get_memory_stats()

auto kcenon::thread::job_queue::get_memory_stats ( ) const -> memory_stats
nodiscard

Get memory footprint statistics for debugging and monitoring.

Implementation details:

  • Estimates the memory used by the queue's own storage (pointers plus container overhead)
  • Includes deque node overhead (platform-dependent)
  • Does not measure the job objects themselves; only the std::unique_ptr<job> slots are counted
  • Thread-safe operation using mutex

Memory Estimation:

  • Job pointer: sizeof(std::unique_ptr<job>) per job
  • Deque node: 40 bytes of overhead per job (a fixed estimate; actual std::deque overhead varies by platform)
  • Total = (pointer_size + node_overhead) * job_count

Use Cases:

  • Detecting memory leaks (growing queue size)
  • Monitoring queue memory pressure
  • Setting memory-based thresholds for backpressure
Returns
Structure containing memory usage estimates

Definition at line 540 of file job_queue.cpp.

{
    std::scoped_lock<std::mutex> lock(mutex_);

    std::size_t job_count = queue_.size();

    // Estimate memory usage
    // std::unique_ptr<job> typically 8 bytes (pointer size)
    // std::deque node overhead varies by platform, typically 32-48 bytes
    constexpr std::size_t ptr_size = sizeof(std::unique_ptr<job>);
    constexpr std::size_t node_overhead = 40; // Conservative estimate

    // Use traditional aggregate initialization for C++17 compatibility
    memory_stats stats;
    stats.queue_size_bytes = (ptr_size + node_overhead) * job_count;
    stats.pending_job_count = job_count;
    stats.node_overhead_bytes = node_overhead * job_count;
    return stats;
}

References mutex_, kcenon::thread::job_queue::memory_stats::node_overhead_bytes, kcenon::thread::job_queue::memory_stats::pending_job_count, queue_, and kcenon::thread::job_queue::memory_stats::queue_size_bytes.
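
As a worked instance of the estimate, the formula can be evaluated on its own. The sketch below assumes nothing about the library beyond the formula and the 40-byte constant shown in the listing; job_stub, memory_estimate, and estimate_memory are hypothetical names. On a platform where std::unique_ptr occupies 8 bytes, 1000 pending jobs give (8 + 40) * 1000 = 48000 bytes, of which 40000 bytes are node overhead.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

struct job_stub {};  // hypothetical placeholder for the library's job type

// Mirrors the three fields filled in by the listing.
struct memory_estimate {
    std::size_t queue_size_bytes;
    std::size_t pending_job_count;
    std::size_t node_overhead_bytes;
};

constexpr std::size_t ptr_size = sizeof(std::unique_ptr<job_stub>);
constexpr std::size_t node_overhead = 40;  // same fixed estimate as the listing

// Total = (pointer_size + node_overhead) * job_count
constexpr memory_estimate estimate_memory(std::size_t job_count) {
    return {(ptr_size + node_overhead) * job_count,
            job_count,
            node_overhead * job_count};
}
```

The result scales linearly with the job count and says nothing about the size of the job objects, so it is better suited to spotting an ever-growing queue than to measuring total heap usage.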

◆ get_next_job()

auto kcenon::thread::job_queue::get_next_job ( ) -> common::Result<std::unique_ptr<job>>
inline, override, virtual

scheduler_interface implementation: get next job

Implements kcenon::thread::scheduler_interface.

Definition at line 119 of file job_queue.h.

{ return dequeue(); }

References dequeue().


◆ get_ptr()

auto kcenon::thread::job_queue::get_ptr ( void ) -> std::shared_ptr<job_queue>
nodiscard

Obtains a std::shared_ptr that points to this queue instance.

Returns a shared pointer to this job_queue instance.

Returns
A shared pointer to the current job_queue object.

Because job_queue inherits from std::enable_shared_from_this, calling get_ptr() allows retrieving a shared_ptr<job_queue> from within member functions of job_queue.

Implementation details:

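  • Uses std::enable_shared_from_this to safely create shared_ptr
  • Requires that the queue is already owned by a std::shared_ptr (for example, created via std::make_shared); otherwise shared_from_this() throws std::bad_weak_ptr
  • Lets jobs and worker threads share ownership, so the queue stays alive while any of them holds the pointer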
Returns
Shared pointer to this job_queue

Definition at line 56 of file job_queue.cpp.

{ return shared_from_this(); }
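
The ownership requirement behind get_ptr() comes from std::enable_shared_from_this itself and can be demonstrated with a stand-in type. shared_queue and the two helper functions below are hypothetical names used only for this sketch; the behaviour shown is that of the standard library since C++17.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in with the same get_ptr() shape as job_queue.
struct shared_queue : std::enable_shared_from_this<shared_queue> {
    std::shared_ptr<shared_queue> get_ptr() { return shared_from_this(); }
};

// When the object is owned by a shared_ptr, get_ptr() returns a second
// owner that shares the same control block.
long use_count_after_get_ptr() {
    auto queue = std::make_shared<shared_queue>();
    auto alias = queue->get_ptr();
    return queue.use_count();  // counts both queue and alias
}

// Without a shared_ptr owner there is no control block to share, and
// shared_from_this() throws std::bad_weak_ptr (C++17 and later).
bool get_ptr_throws_without_owner() {
    shared_queue on_stack;
    try {
        (void)on_stack.get_ptr();
    } catch (const std::bad_weak_ptr&) {
        return true;
    }
    return false;
}
```

This is why the typical usage creates the queue through std::make_shared: a queue constructed on the stack or held by std::unique_ptr cannot hand out shared references to itself.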

◆ inspect_pending_jobs()

auto kcenon::thread::job_queue::inspect_pending_jobs ( std::size_t limit = 100) const -> std::vector<diagnostics::job_info>
nodiscard, virtual

Inspects pending jobs in the queue without removing them.

Parameters
limit  Maximum number of jobs to inspect (0 = all).
Returns
Vector of job_info for pending jobs.

Thread Safety:

  • Acquires mutex for safe inspection
  • Returns snapshot of current queue state
  • Holds the mutex for the duration of the inspection, so concurrent enqueue and dequeue calls wait until it finishes

Use Cases:

  • Diagnostics and monitoring
  • Queue depth analysis
  • Job wait time tracking

Implementation details:

  • Thread-safe operation using mutex
  • Creates job_info snapshots for each pending job
  • Does not modify the queue (read-only inspection)
  • Returns up to 'limit' jobs, or all if limit is 0

Performance Characteristics:

  • O(min(limit, queue_size)) complexity
  • Single lock acquisition for entire inspection
  • Minimal overhead on normal queue operations
Parameters
limit  Maximum number of jobs to inspect (0 = all)
Returns
Vector of job_info for pending jobs

Definition at line 648 of file job_queue.cpp.

{
    std::scoped_lock<std::mutex> lock(mutex_);

    std::vector<diagnostics::job_info> result;
    auto count = (limit == 0) ? queue_.size() : std::min(limit, queue_.size());
    result.reserve(count);

    auto now = std::chrono::steady_clock::now();
    std::size_t index = 0;

    for (const auto& job_ptr : queue_)
    {
        if (limit > 0 && index >= limit)
        {
            break;
        }

        if (job_ptr == nullptr)
        {
            ++index;
            continue;
        }

        diagnostics::job_info info;
        info.job_id = job_ptr->get_job_id();
        info.job_name = job_ptr->get_name();
        // (source line 676 is not shown in this listing)
        info.enqueue_time = job_ptr->get_enqueue_time();
        info.start_time = job_ptr->get_enqueue_time(); // Not started yet

        // Calculate wait time (time since enqueue)
        info.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
            now - job_ptr->get_enqueue_time()
        );
        info.execution_time = std::chrono::nanoseconds{0}; // Not started yet

        result.push_back(std::move(info));
        ++index;
    }

    return result;
}

References kcenon::thread::info, kcenon::thread::diagnostics::job_info::job_id, and kcenon::thread::diagnostics::pending.

◆ is_bounded()

auto kcenon::thread::job_queue::is_bounded ( ) const -> bool
nodiscard

Checks if the queue has a size limit.

Returns
true if max_size is set, false if unlimited

Definition at line 584 of file job_queue.cpp.

{
    return max_size_.has_value();
}

References max_size_.

◆ is_full()

auto kcenon::thread::job_queue::is_full ( ) const -> bool
nodiscard

Checks if the queue is at capacity.

Returns
true if the queue is bounded and has reached max_size, false otherwise

Definition at line 618 of file job_queue.cpp.

{
    if (!max_size_.has_value())
    {
        return false; // Unbounded queue is never full
    }
    return atomic_size_.load(std::memory_order_relaxed) >= max_size_.value();
}

References atomic_size_, and max_size_.

◆ is_stopped()

auto kcenon::thread::job_queue::is_stopped ( ) const -> bool
nodiscard

Checks if the queue has been stopped.

When stopped, worker threads are typically notified to cease waiting for new jobs. New jobs may still be enqueued, but it is up to the system design how they are handled in a stopped state.

Implementation details:

  • Uses atomic load for thread-safe access
  • When stopped, dequeue operations will fail instead of blocking
  • Used to coordinate shutdown between producer and consumer threads
Returns
true if the queue is stopped, false if it is active

Definition at line 68 of file job_queue.cpp.

{ return stop_.load(); }

References stop_.

◆ schedule()

auto kcenon::thread::job_queue::schedule ( std::unique_ptr< job > && value) -> common::VoidResult
inline override virtual

scheduler_interface implementation: schedule a job

Implements kcenon::thread::scheduler_interface.

Definition at line 114 of file job_queue.h.

{ return enqueue(std::move(value)); }

References enqueue().

◆ set_max_size()

auto kcenon::thread::job_queue::set_max_size ( std::optional< std::size_t > max_size) -> void

Sets the maximum queue size.

Note: This does not remove existing jobs if the queue is currently larger than the new max_size. It only affects future enqueue operations.

Parameters
max_size  New maximum size (std::nullopt for unlimited)

Definition at line 607 of file job_queue.cpp.

{
    std::scoped_lock<std::mutex> lock(mutex_);
    max_size_ = max_size;
}
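
The note above (lowering the limit keeps already queued jobs and only affects later enqueues) can be illustrated with a small sketch. `bounded_queue`, `try_push`, and `shrink_example` are hypothetical names, `int` stands in for `std::unique_ptr<job>`, and the real enqueue reports failure through `common::VoidResult` rather than a `bool`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical bounded queue; not the library's job_queue.
class bounded_queue {
public:
    // Rejects the item (returns false) when the queue is bounded and full.
    bool try_push(int value) {
        std::scoped_lock lock(mutex_);
        if (max_size_ && items_.size() >= *max_size_) {
            return false;
        }
        items_.push_back(value);
        return true;
    }

    // Changes the limit only; items already queued are left in place.
    void set_max_size(std::optional<std::size_t> max_size) {
        std::scoped_lock lock(mutex_);
        max_size_ = max_size;
    }

    std::size_t size() const {
        std::scoped_lock lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;
    std::deque<int> items_;
    std::optional<std::size_t> max_size_;  // std::nullopt means unlimited
};

// Usage: shrinking the limit below the current size drops nothing.
inline void shrink_example() {
    bounded_queue q;
    for (int i = 0; i < 5; ++i) {
        q.try_push(i);
    }
    q.set_max_size(3);
    assert(q.size() == 5);     // existing items survive
    assert(!q.try_push(99));   // new items are rejected until the queue drains
}
```

A consequence of this design is that the queue can stay over its limit for a while after the limit is lowered. Callers that need a hard cap would have to drain the excess themselves.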

◆ set_notify()

auto kcenon::thread::job_queue::set_notify ( bool notify) -> void

Sets the 'notify' flag, which controls whether enqueue operations notify waiting threads.

Implementation details:

  • Uses atomic store for thread-safe modification
  • When false, jobs can still be enqueued, but enqueue won't wake up threads waiting in dequeue()
  • Useful for batch operations to avoid excessive notifications
  • Default is true (notifications enabled)
Parameters
notify  true to enable notifications, false to disable

Definition at line 81 of file job_queue.cpp.

{ notify_.store(notify); }
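
The batch use case mentioned above can be sketched as follows. This is an assumption-laden illustration rather than the library's behavior: `gated_notify_queue`, `wake_all()`, `wakeups()`, and `enqueue_quietly()` are hypothetical, and the wake-up counter exists only to make the effect observable.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical queue whose push() only signals waiters when the flag is set.
class gated_notify_queue {
public:
    void set_notify(bool notify) { notify_.store(notify); }

    void push(int value) {
        {
            std::scoped_lock lock(mutex_);
            items_.push_back(value);
        }
        if (notify_.load()) {
            condition_.notify_one();
            wakeups_.fetch_add(1);
        }
    }

    // Hypothetical helper: signal every waiter once, e.g. after a quiet batch.
    void wake_all() {
        condition_.notify_all();
        wakeups_.fetch_add(1);
    }

    std::size_t wakeups() const { return wakeups_.load(); }

    std::size_t size() const {
        std::scoped_lock lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<int> items_;
    std::atomic<bool> notify_{true};
    std::atomic<std::size_t> wakeups_{0};
};

// Pushes `count` consecutive values with notifications off, then signals once.
inline void enqueue_quietly(gated_notify_queue& q, int first, int count) {
    assert(count >= 0);
    q.set_notify(false);
    for (int i = 0; i < count; ++i) {
        q.push(first + i);
    }
    q.set_notify(true);
    q.wake_all();
}
```

The flag is shared by all producers, so switching it off also silences pushes made concurrently by other threads. In this sketch the final wake-up covers them as well.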

◆ size()

auto kcenon::thread::job_queue::size ( void ) const -> std::size_t
nodiscard

Returns the current number of jobs in the queue.

Implementation details:

  • Thread-safe read via a relaxed atomic load of atomic_size_; no mutex is acquired
  • Snapshot in time (may change immediately after call)
  • O(1) operation
Note
This method is thread-safe, but the result may be stale by the time the caller uses it in a multi-threaded environment.
Returns
The number of jobs currently pending in the queue

Definition at line 514 of file job_queue.cpp.

{
    return atomic_size_.load(std::memory_order_relaxed);
}

Referenced by kcenon::thread::backpressure_job_queue::get_pressure_ratio(), and kcenon::thread::backpressure_job_queue::to_string().

◆ stop()

auto kcenon::thread::job_queue::stop ( void ) -> void

Signals the queue to stop accepting jobs and wakes any waiting threads (e.g., during shutdown).

Sets the stop_ flag to true and notifies any threads that might be blocked in dequeue(). This allows worker threads to exit gracefully rather than remain blocked indefinitely.

This method provides a consistent API with thread_pool and thread_base classes.

Implementation details:

  • Sets atomic stop flag to prevent new enqueue operations
  • Wakes all threads waiting in dequeue() operations
  • Those threads will then return with error instead of blocking
  • Used for coordinated shutdown of producer-consumer systems

Shutdown Sequence:

  1. Set stop flag (prevents new jobs)
  2. Notify all waiting consumers
  3. Waiting dequeue() calls return with error
  4. Threads can then complete their shutdown process

Thread Safety:

  • Safe to call from any thread
  • Idempotent operation (safe to call multiple times)

Definition at line 489 of file job_queue.cpp.

{
    // Critical section: set stop flag and notify waiters
    std::scoped_lock<std::mutex> lock(mutex_);

    // Prevent new jobs from being added
    stop_.store(true);

    // Wake all threads waiting in dequeue()
    condition_.notify_all();
}
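
The shutdown sequence above can be sketched with a self-contained stand-in. `stoppable_queue`, `pop()`, and `run_consumer_until_stopped()` are hypothetical names, and `std::optional<int>` replaces the library's `common::Result`. The sketch lets a stopped queue hand out its remaining items before reporting "stopped"; that is a choice made here for a deterministic example, not a statement about how dequeue() treats leftover jobs.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical blocking queue with a stop flag.
class stoppable_queue {
public:
    void push(int value) {
        {
            std::scoped_lock lock(mutex_);
            items_.push_back(value);
        }
        condition_.notify_one();
    }

    // Blocks until an item is available or stop() has been called.
    // Returns std::nullopt once the queue is stopped and empty.
    std::optional<int> pop() {
        std::unique_lock lock(mutex_);
        condition_.wait(lock, [this] { return stop_.load() || !items_.empty(); });
        if (items_.empty()) {
            return std::nullopt;
        }
        int value = items_.front();
        items_.pop_front();
        return value;
    }

    // Sets the flag under the mutex so a waiter cannot miss the wake-up,
    // then wakes every blocked consumer. Safe to call more than once.
    void stop() {
        std::scoped_lock lock(mutex_);
        stop_.store(true);
        condition_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<int> items_;
    std::atomic<bool> stop_{false};
};

// Runs one consumer until the queue stops; returns how many items it handled.
inline int run_consumer_until_stopped(stoppable_queue& q, int items) {
    assert(items >= 0);
    std::atomic<int> processed{0};
    std::thread consumer([&] {
        while (q.pop().has_value()) {
            processed.fetch_add(1);
        }
    });
    for (int i = 0; i < items; ++i) {
        q.push(i);
    }
    q.stop();         // steps 1 and 2: set the flag, notify waiters
    consumer.join();  // steps 3 and 4: the consumer leaves its loop and exits
    return processed.load();
}
```

Taking the mutex in stop() matters: if the flag were set without it, a consumer could check the predicate, see "not stopped", and then block just after the notification was sent.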

◆ to_string()

auto kcenon::thread::job_queue::to_string ( void ) const -> std::string
nodiscard virtual

Returns a string representation of the queue's current state.

Primarily used for logging and debugging. Derived classes may override this to include additional diagnostic information.

Implementation details:

  • Uses size() method to get current job count
  • Formats output using the formatter utility
Returns
A std::string showing the current job count, formatted as "contained {} jobs"

Reimplemented in kcenon::thread::backpressure_job_queue.

Definition at line 570 of file job_queue.cpp.

{
    return utility_module::formatter::format("contained {} jobs", size());
}

References utility_module::formatter::format().

◆ try_dequeue()

auto kcenon::thread::job_queue::try_dequeue ( void ) -> common::Result<std::unique_ptr<job>>
nodiscardvirtual

Attempts to dequeue a job from the queue without blocking.

If the queue is stopped or empty, this method returns immediately with an error instead of blocking. This is useful for polling-based consumers.

Implementation details:

  • Never waits on the condition variable; returns as soon as the stop flag and queue state have been checked
  • Returns a queue_stopped error if the queue has been stopped, even if jobs remain
  • Returns a queue_empty error if the queue is empty (vs blocking in dequeue())
  • Uses scoped_lock for quick queue access and release

Performance Benefits:

  • No condition variable overhead
  • No thread blocking/unblocking costs
  • Suitable for high-frequency polling scenarios

Use Cases:

  • Non-blocking consumer threads
  • Building block for timeout-based dequeue loops
  • Testing scenarios where blocking is undesirable
Returns
A common::Result<std::unique_ptr<job>> holding the job on success, or an error if the queue is empty or stopped

Definition at line 296 of file job_queue.cpp.

{
    // Early validation: check if queue is stopped
    if (stop_.load())
    {
        return common::error_info{static_cast<int>(error_code::queue_stopped), "Job queue is stopped", "thread_system"};
    }

    // Critical section: check and potentially extract job
    std::scoped_lock<std::mutex> lock(mutex_);

    // Non-blocking check: return error if empty
    if (queue_.empty())
    {
        return common::error_info{static_cast<int>(error_code::queue_empty), "there are no jobs to dequeue", "thread_system"};
    }

    // Efficiently extract first job from queue
    auto value = std::move(queue_.front());
    queue_.pop_front();
    atomic_size_.fetch_sub(1, std::memory_order_relaxed);

    return value; // Return moved job (caller takes ownership)
}

References kcenon::thread::queue_empty, and kcenon::thread::queue_stopped.
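
A polling consumer built on this pattern might look like the sketch below. It mirrors the order of checks in the listing (stop flag first, then emptiness under the lock), but `pollable_queue`, `poll_status`, `poll_result`, and `drain_available()` are hypothetical stand-ins; the real method returns a `common::Result<std::unique_ptr<job>>`.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <mutex>
#include <optional>
#include <vector>

// Hypothetical outcome type replacing common::Result and error_code.
enum class poll_status { ok, empty, stopped };

struct poll_result {
    poll_status status;
    std::optional<int> value;  // set only when status == poll_status::ok
};

class pollable_queue {
public:
    void push(int value) {
        std::scoped_lock lock(mutex_);
        items_.push_back(value);
    }

    void stop() { stop_.store(true); }

    // Never waits for new items: reports stopped or empty right away.
    poll_result try_pop() {
        if (stop_.load()) {
            return {poll_status::stopped, std::nullopt};
        }
        std::scoped_lock lock(mutex_);
        if (items_.empty()) {
            return {poll_status::empty, std::nullopt};
        }
        int value = items_.front();
        items_.pop_front();
        return {poll_status::ok, value};
    }

private:
    std::mutex mutex_;
    std::deque<int> items_;
    std::atomic<bool> stop_{false};
};

// Collects everything currently available and returns at the first
// empty or stopped result instead of blocking.
inline std::vector<int> drain_available(pollable_queue& q) {
    std::vector<int> out;
    for (;;) {
        poll_result r = q.try_pop();
        if (r.status != poll_status::ok) {
            break;
        }
        assert(r.value.has_value());
        out.push_back(*r.value);
    }
    return out;
}
```

A caller that polls in a loop would typically sleep or yield between empty results, and treat a stopped result as the signal to exit.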

Member Data Documentation

◆ atomic_size_

std::atomic<std::size_t> kcenon::thread::job_queue::atomic_size_ {0}
private

Atomic size counter for lock-free read-only queries.

Maintained in sync with queue_ by enqueue/dequeue operations. Allows empty(), size(), and is_full() to operate without mutex.

Definition at line 405 of file job_queue.h.

{0};

Referenced by is_full().

◆ condition_

std::condition_variable kcenon::thread::job_queue::condition_
protected

Condition variable used to signal worker threads.

Used in combination with mutex_ to block or notify threads waiting for new jobs.

Definition at line 379 of file job_queue.h.

◆ max_size_

std::optional<std::size_t> kcenon::thread::job_queue::max_size_
private

Optional maximum queue size.

If set, enqueue operations will fail when the queue reaches this size. std::nullopt means unlimited size (default behavior).

Definition at line 397 of file job_queue.h.

Referenced by get_max_size(), is_bounded(), and is_full().

◆ mutex_

std::mutex kcenon::thread::job_queue::mutex_
mutable protected

Mutex to protect access to the underlying queue_ container and related state.

Any operation that modifies or reads the queue should lock this mutex to ensure thread safety.

Definition at line 371 of file job_queue.h.

Referenced by get_memory_stats().

◆ notify_

std::atomic_bool kcenon::thread::job_queue::notify_
protected

If true, threads waiting for new jobs are notified when a new job is enqueued. If false, enqueuing does not automatically trigger a notification.

Definition at line 355 of file job_queue.h.

◆ queue_

std::deque<std::unique_ptr<job> > kcenon::thread::job_queue::queue_
private

The underlying container storing the jobs in FIFO order.

Note
This container is guarded by mutex_ and should only be modified while holding the lock. The size of this container should be the only source of truth for the queue size to prevent inconsistencies.

Definition at line 389 of file job_queue.h.

Referenced by get_memory_stats().

◆ stop_

std::atomic_bool kcenon::thread::job_queue::stop_
protected

Indicates whether the queue has been signaled to stop.

Setting stop_ to true typically causes waiting threads to unblock and exit their waiting loop.

Definition at line 363 of file job_queue.h.

Referenced by is_stopped().


The documentation for this class was generated from the following files: