Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
A thread-safe job queue for managing and dispatching work items.
#include <job_queue.h>


Classes | |
| struct | memory_stats |
| Get memory footprint statistics for the queue. | |
Public Member Functions | |
| job_queue (std::optional< std::size_t > max_size=std::nullopt) | |
Constructs a new, empty job_queue. | |
| virtual | ~job_queue (void) |
Virtual destructor. Cleans up resources used by the job_queue. | |
| auto | get_ptr (void) -> std::shared_ptr< job_queue > |
Obtains a std::shared_ptr that points to this queue instance. | |
| auto | is_stopped () const -> bool |
| Checks if the queue is in a "stopped" state. | |
| auto | set_notify (bool notify) -> void |
| Sets the 'notify' flag for this queue. | |
| auto | schedule (std::unique_ptr< job > &&value) -> common::VoidResult override |
| scheduler_interface implementation: schedule a job | |
| auto | get_next_job () -> common::Result< std::unique_ptr< job > > override |
| scheduler_interface implementation: get next job | |
| virtual auto | enqueue (std::unique_ptr< job > &&value) -> common::VoidResult |
| Enqueues a new job into the queue. | |
| template<typename JobType , typename = std::enable_if_t<std::is_base_of_v<job, JobType>>> | |
| auto | enqueue (std::unique_ptr< JobType > &&value) -> common::VoidResult |
| Type-safe enqueue for job subclasses. | |
| virtual auto | enqueue_batch (std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult |
| Enqueues a batch of jobs into the queue. | |
| virtual auto | dequeue (void) -> common::Result< std::unique_ptr< job > > |
| Dequeues a job from the queue in FIFO order (blocking operation). | |
| virtual auto | try_dequeue (void) -> common::Result< std::unique_ptr< job > > |
| Attempts to dequeue a job from the queue without blocking. | |
| virtual auto | dequeue_batch (void) -> std::deque< std::unique_ptr< job > > |
| Dequeues all remaining jobs from the queue without processing them. | |
| virtual auto | dequeue_batch_limited (std::size_t max_count) -> std::deque< std::unique_ptr< job > > |
| Dequeues up to N jobs in a single operation (micro-batching). | |
| virtual auto | clear (void) -> void |
| Removes all jobs currently in the queue without processing them. | |
| auto | empty (void) const -> bool |
| Checks if the queue is currently empty. | |
| auto | size (void) const -> std::size_t |
| Returns the current number of jobs in the queue. | |
| auto | get_memory_stats () const -> memory_stats |
| Get memory footprint statistics for debugging and monitoring. | |
| auto | stop (void) -> void |
| Signals the queue to stop waiting for new jobs (e.g., during shutdown). | |
| virtual auto | to_string (void) const -> std::string |
| Returns a string representation of this job_queue. | |
| auto | get_capabilities () const -> queue_capabilities override |
| Returns capabilities of this job_queue implementation. | |
| auto | is_bounded () const -> bool |
| Check if queue has a size limit. | |
| auto | get_max_size () const -> std::optional< std::size_t > |
| Get the maximum queue size. | |
| auto | set_max_size (std::optional< std::size_t > max_size) -> void |
| Set maximum queue size. | |
| auto | is_full () const -> bool |
| Check if queue is at capacity. | |
| virtual auto | inspect_pending_jobs (std::size_t limit=100) const -> std::vector< diagnostics::job_info > |
| Inspects pending jobs in the queue without removing them. | |
Public Member Functions inherited from kcenon::thread::scheduler_interface | |
| virtual | ~scheduler_interface ()=default |
Public Member Functions inherited from kcenon::thread::queue_capabilities_interface | |
| virtual | ~queue_capabilities_interface ()=default |
| auto | has_exact_size () const -> bool |
| Check if size() returns exact values. | |
| auto | has_atomic_empty () const -> bool |
| Check if empty() check is atomic. | |
| auto | is_lock_free () const -> bool |
| Check if this is a lock-free implementation. | |
| auto | is_wait_free () const -> bool |
| Check if this is a wait-free implementation. | |
| auto | supports_batch () const -> bool |
| Check if batch operations are supported. | |
| auto | supports_blocking_wait () const -> bool |
| Check if blocking wait is supported. | |
| auto | supports_stop () const -> bool |
| Check if stop signaling is supported. | |
Protected Attributes | |
| std::atomic_bool | notify_ |
If true, threads waiting for new jobs are notified when a new job is enqueued. If false, enqueuing does not automatically trigger a notification. | |
| std::atomic_bool | stop_ |
| Indicates whether the queue has been signaled to stop. | |
| std::mutex | mutex_ |
Mutex to protect access to the underlying queue_ container and related state. | |
| std::condition_variable | condition_ |
| Condition variable used to signal worker threads. | |
Private Attributes | |
| std::deque< std::unique_ptr< job > > | queue_ |
| The underlying container storing the jobs in FIFO order. | |
| std::optional< std::size_t > | max_size_ |
| Optional maximum queue size. | |
| std::atomic< std::size_t > | atomic_size_ {0} |
| Atomic size counter for lock-free read-only queries. | |
A thread-safe job queue for managing and dispatching work items.
The job_queue class provides a synchronized queue for storing and retrieving job objects (or derived classes). Multiple threads can safely enqueue and dequeue jobs, ensuring proper synchronization and preventing data races.
This class inherits from std::enable_shared_from_this, which allows it to create std::shared_ptr instances referring to itself via get_ptr(). This is often useful when passing a job_queue pointer to jobs themselves or other components that need a safe, shared reference to the queue.
Typical usage:
Create a job_queue instance, typically via std::make_shared.
Enqueue job objects (or derived types) using enqueue().
Call dequeue() to retrieve jobs and process them.
Call stop() and possibly clear() to shut down the queue gracefully when all jobs are done or when the system is stopping.
Definition at line 62 of file job_queue.h.
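The lifecycle described above can be sketched with standard-library types only. Everything here (`mini_job`, `mini_queue`, `run_lifecycle`) is a hypothetical stand-in rather than the library's implementation, and the sketch uses a non-blocking dequeue so it can run on one thread. Rejecting null jobs and post-stop submissions is an assumption.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the library's job type: just wraps a callable.
struct mini_job {
    std::function<void()> work;
};

// Hypothetical, simplified queue used only to illustrate the lifecycle.
class mini_queue {
public:
    bool enqueue(std::unique_ptr<mini_job> value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (stopped_ || !value) return false;  // assumed rejection rules
        jobs_.push_back(std::move(value));
        return true;
    }
    // Non-blocking variant so the sketch can run on a single thread.
    std::unique_ptr<mini_job> try_dequeue() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (jobs_.empty()) return nullptr;
        auto next = std::move(jobs_.front());
        jobs_.pop_front();
        return next;
    }
    void stop() {
        std::lock_guard<std::mutex> lock(mutex_);
        stopped_ = true;
    }
    void clear() {
        std::lock_guard<std::mutex> lock(mutex_);
        jobs_.clear();
    }

private:
    std::mutex mutex_;
    std::deque<std::unique_ptr<mini_job>> jobs_;
    bool stopped_ = false;
};

// Walks the four steps and returns how many jobs actually ran.
int run_lifecycle() {
    auto queue = std::make_shared<mini_queue>();   // 1. create
    int ran = 0;
    for (int i = 0; i < 3; ++i) {                  // 2. enqueue
        queue->enqueue(std::make_unique<mini_job>(mini_job{[&ran] { ++ran; }}));
    }
    while (auto next = queue->try_dequeue()) {     // 3. retrieve and process
        next->work();
    }
    queue->stop();                                 // 4. shut down
    queue->clear();
    assert(queue->try_dequeue() == nullptr);       // nothing is left after shutdown
    return ran;
}
```

In a real worker thread the retrieval step would normally use the blocking dequeue() and rely on stop() to end the loop.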
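| kcenon::thread::job_queue::job_queue | ( | std::optional< std::size_t > | max_size = std::nullopt | ) | |
explicit |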
Constructs a new, empty job_queue.
Constructs a new job_queue with optional size limit.
| max_size | Optional maximum queue size. If set, enqueue will fail when the queue reaches this size. Use std::nullopt for unlimited. |
Initializes internal synchronization primitives and sets all flags to their default states (e.g., notify_ = false, stop_ = false).
Implementation details:
The queue is immediately ready for use after construction.
Definition at line 33 of file job_queue.cpp.
| kcenon::thread::job_queue::~job_queue | ( | void | ) | |
virtual |
Virtual destructor. Cleans up resources used by the job_queue.
Destroys the job_queue.
Implementation details:
Definition at line 44 of file job_queue.cpp.
| auto kcenon::thread::job_queue::clear | ( | void | ) | -> void |
virtual |
Removes all jobs currently in the queue without processing them.
Removes all jobs from the queue without returning them.
This operation is thread-safe and can be used to discard pending jobs, typically during shutdown or error recovery. It does not affect the stop_ or notify_ flags.
Implementation details:
Use Cases:
Definition at line 440 of file job_queue.cpp.
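| auto kcenon::thread::job_queue::dequeue | ( | void | ) | -> common::Result< std::unique_ptr< job > > |
nodiscard virtual |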
Dequeues a job from the queue in FIFO order (blocking operation).
Removes and returns the first job from the queue (blocking operation).
If the queue is empty, the caller will block until a job becomes available or the queue is stopped. Use try_dequeue() for non-blocking operation.
Implementation details:
Blocking Behavior:
Stop Coordination:
Thread Safety:
Race Condition Fix:
Definition at line 248 of file job_queue.cpp.
References kcenon::thread::queue_empty.
Referenced by get_next_job().
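The blocking behavior and stop coordination described for dequeue() can be sketched with a mutex and a condition variable. `mini_queue` below is a hypothetical model that stores plain `int` jobs and returns `std::nullopt` where the library returns an error result. Handing out remaining jobs after stop() is an assumption of this sketch, not confirmed library behavior.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical, simplified queue of ints; not the library's job_queue.
class mini_queue {
public:
    void enqueue(int job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push_back(job);
        }
        condition_.notify_one();  // wake one blocked consumer
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        condition_.notify_all();  // wake every blocked consumer
    }

    // Blocks until a job is available or stop() has been called.
    std::optional<int> dequeue() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form re-checks after every wakeup, so spurious
        // wakeups and a stop() issued before the wait are both handled.
        condition_.wait(lock, [this] { return !jobs_.empty() || stopped_; });
        assert(!jobs_.empty() || stopped_);
        if (jobs_.empty()) return std::nullopt;  // stopped with nothing left
        int job = jobs_.front();
        jobs_.pop_front();
        return job;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<int> jobs_;
    bool stopped_ = false;
};
```

Setting the stop flag while holding the mutex is what prevents a consumer from missing the wakeup between checking the predicate and going to sleep.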

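| auto kcenon::thread::job_queue::dequeue_batch | ( | void | ) | -> std::deque< std::unique_ptr< job > > |
nodiscard virtual |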
Dequeues all remaining jobs from the queue without processing them.
Removes and returns ALL jobs from the queue (non-blocking operation).
Returns a std::deque of unique_ptr<job> containing all jobs that were in the queue at the time of the call.
Similar to clear(), but returns the dequeued jobs to the caller for potential inspection or manual processing.
Implementation details:
Performance Characteristics:
Use Cases:
Thread Safety:
Definition at line 347 of file job_queue.cpp.
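One common way to hand over every pending job without blocking is to swap the internal container with an empty one while holding the lock. The sketch below assumes that approach. The source does not state how the library implements dequeue_batch(), and `mini_queue` with its `int` payload is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical, simplified queue; the payload type is a placeholder for job.
class mini_queue {
public:
    void enqueue(std::unique_ptr<int> job) {
        std::lock_guard<std::mutex> lock(mutex_);
        jobs_.push_back(std::move(job));
    }

    // Returns everything that was queued at the time of the call.
    std::deque<std::unique_ptr<int>> dequeue_batch() {
        std::deque<std::unique_ptr<int>> drained;
        std::lock_guard<std::mutex> lock(mutex_);
        drained.swap(jobs_);    // constant-time exchange of the two deques
        assert(jobs_.empty());  // the queue now holds the empty deque
        return drained;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return jobs_.size();
    }

private:
    mutable std::mutex mutex_;
    std::deque<std::unique_ptr<int>> jobs_;
};
```

Because ownership moves to the caller, the jobs can be inspected or run after the lock has been released.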
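| auto kcenon::thread::job_queue::dequeue_batch_limited | ( | std::size_t | max_count | ) | -> std::deque< std::unique_ptr< job > > |
nodiscard virtual |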
Dequeues up to N jobs in a single operation (micro-batching).
Dequeues up to N jobs in a single lock acquisition (micro-batching).
| max_count | Maximum number of jobs to dequeue (typically 4-8) |
Returns a std::deque of unique_ptr<job> containing the dequeued jobs.
This method implements micro-batching to reduce mutex contention:
Recommended batch sizes:
Use Cases:
Implementation details:
Performance Characteristics:
Micro-Batching Benefits:
Thread Safety:
Definition at line 396 of file job_queue.cpp.
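Micro-batching as described here takes up to max_count jobs under a single lock acquisition instead of locking once per job. The following is a minimal sketch of that idea with a hypothetical `mini_queue` of `int` jobs. It is not the library's code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical, simplified queue of ints; not the library's job_queue.
class mini_queue {
public:
    void enqueue(int job) {
        std::lock_guard<std::mutex> lock(mutex_);
        jobs_.push_back(job);
    }

    // Removes up to max_count jobs from the front in FIFO order.
    std::deque<int> dequeue_batch_limited(std::size_t max_count) {
        std::deque<int> batch;
        std::lock_guard<std::mutex> lock(mutex_);  // one acquisition for the whole batch
        const std::size_t count = std::min(max_count, jobs_.size());
        for (std::size_t i = 0; i < count; ++i) {
            batch.push_back(jobs_.front());
            jobs_.pop_front();
        }
        assert(batch.size() <= max_count);
        return batch;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return jobs_.size();
    }

private:
    mutable std::mutex mutex_;
    std::deque<int> jobs_;
};
```

A worker that calls this with a small max_count pays for one lock per batch while still leaving work in the queue for other consumers.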
| auto kcenon::thread::job_queue::empty | ( | void | ) | const -> bool |
nodiscard |
Checks if the queue is currently empty.
Checks if the queue contains any jobs.
Returns true if the queue has no pending jobs, false otherwise.
Implementation details:
Definition at line 465 of file job_queue.cpp.
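| auto kcenon::thread::job_queue::enqueue | ( | std::unique_ptr< job > && | value | ) | -> common::VoidResult |
nodiscard virtual |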
Enqueues a new job into the queue.
Adds a single job to the back of the queue.
| value | A unique pointer to the job being added. |
This method is thread-safe. If notify_ is set to true, a waiting thread (if any) will be notified upon successful enqueue.
Implementation details:
Thread Safety:
Error Conditions:
| value | Unique pointer to job (moved into queue) |
Reimplemented in kcenon::thread::backpressure_job_queue.
Definition at line 105 of file job_queue.cpp.
References kcenon::thread::invalid_argument, kcenon::thread::queue_full, and kcenon::thread::queue_stopped.
Referenced by kcenon::thread::backpressure_job_queue::direct_enqueue(), enqueue(), kcenon::thread::backpressure_job_queue::handle_callback_policy(), and schedule().
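The error codes referenced above suggest three rejection paths: a null job, a stopped queue, and a full bounded queue. The sketch below models them with a hypothetical `enqueue_status` enum in place of common::VoidResult. The order of the checks is an assumption, as is the `int` payload.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical status codes mirroring the error names referenced above.
enum class enqueue_status { ok, invalid_argument, queue_stopped, queue_full };

// Hypothetical, simplified bounded queue; not the library's job_queue.
class mini_queue {
public:
    explicit mini_queue(std::optional<std::size_t> max_size = std::nullopt)
        : max_size_(max_size) {}

    enqueue_status enqueue(std::unique_ptr<int>&& value) {
        if (!value) return enqueue_status::invalid_argument;  // null job
        std::lock_guard<std::mutex> lock(mutex_);
        if (stopped_) return enqueue_status::queue_stopped;
        if (max_size_ && jobs_.size() >= *max_size_) return enqueue_status::queue_full;
        jobs_.push_back(std::move(value));
        assert(!max_size_ || jobs_.size() <= *max_size_);     // the bound still holds
        return enqueue_status::ok;
    }

    void stop() {
        std::lock_guard<std::mutex> lock(mutex_);
        stopped_ = true;
    }

private:
    std::mutex mutex_;
    std::deque<std::unique_ptr<int>> jobs_;
    std::optional<std::size_t> max_size_;
    bool stopped_ = false;
};
```

Since the real method is marked nodiscard, callers are expected to check the returned result rather than assume the job was accepted.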

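template<typename JobType , typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
| auto kcenon::thread::job_queue::enqueue | ( | std::unique_ptr< JobType > && | value | ) | -> common::VoidResult |
inline nodiscard |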
Type-safe enqueue for job subclasses.
This template method provides type-safe job submission, allowing callers to enqueue jobs without explicit casting. The JobType must be derived from the base job class.
| JobType | A type derived from job |
| value | A unique pointer to the job being added. |
Definition at line 149 of file job_queue.h.
References enqueue().
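The template overload can be illustrated with a small, self-contained model. The `enable_if_t`/`is_base_of_v` constraint matches the signature shown in the member list. The `job`, `add_job` and `mini_queue` types below are hypothetical, and `bool` stands in for common::VoidResult.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical base class and subclass, for illustration only.
struct job {
    virtual ~job() = default;
    virtual int run() = 0;
};

struct add_job : job {
    add_job(int lhs, int rhs) : lhs_(lhs), rhs_(rhs) {}
    int run() override { return lhs_ + rhs_; }
    int lhs_;
    int rhs_;
};

class mini_queue {
public:
    // Base overload: takes ownership of any job.
    bool enqueue(std::unique_ptr<job>&& value) {
        if (!value) return false;
        jobs_.push_back(std::move(value));
        return true;
    }

    // Type-safe overload: participates only when JobType derives from job,
    // converts to unique_ptr<job>, and forwards to the base overload.
    template <typename JobType,
              typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
    bool enqueue(std::unique_ptr<JobType>&& value) {
        return enqueue(std::unique_ptr<job>(std::move(value)));
    }

    // Runs and removes the oldest job; the queue must not be empty.
    int run_front() {
        assert(!jobs_.empty());
        auto next = std::move(jobs_.front());
        jobs_.pop_front();
        return next->run();
    }

private:
    std::deque<std::unique_ptr<job>> jobs_;
};
```

The forwarding call does not recurse: for a `std::unique_ptr<job>` argument both overloads match equally well, and overload resolution prefers the non-template function.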

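| auto kcenon::thread::job_queue::enqueue_batch | ( | std::vector< std::unique_ptr< job > > && | jobs | ) | -> common::VoidResult |
nodiscard virtual |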
Enqueues a batch of jobs into the queue.
Adds multiple jobs to the queue in a single operation.
| jobs | A vector of unique pointers to the jobs being added. |
Implementation details:
Performance Benefits:
Atomicity:
| jobs | Vector of unique pointers to jobs (moved into queue) |
Reimplemented in kcenon::thread::backpressure_job_queue.
Definition at line 163 of file job_queue.cpp.
References kcenon::thread::invalid_argument, kcenon::thread::queue_full, and kcenon::thread::queue_stopped.
Referenced by kcenon::thread::backpressure_job_queue::enqueue_batch().

| auto kcenon::thread::job_queue::get_capabilities | ( | ) | const -> queue_capabilities |
inline nodiscard override virtual |
Returns capabilities of this job_queue implementation.
job_queue provides:
Reimplemented from kcenon::thread::queue_capabilities_interface.
Definition at line 287 of file job_queue.h.
| auto kcenon::thread::job_queue::get_max_size | ( | ) | const -> std::optional< std::size_t > |
nodiscard |
Get the maximum queue size.
Gets the maximum queue size.
Definition at line 594 of file job_queue.cpp.
References max_size_.
Referenced by kcenon::thread::backpressure_job_queue::get_pressure_ratio().

| auto kcenon::thread::job_queue::get_memory_stats | ( | ) | const -> memory_stats |
nodiscard |
Get memory footprint statistics for debugging and monitoring.
Implementation details:
Memory Estimation:
Use Cases:
Definition at line 540 of file job_queue.cpp.
References mutex_, kcenon::thread::job_queue::memory_stats::node_overhead_bytes, kcenon::thread::job_queue::memory_stats::pending_job_count, queue_, and kcenon::thread::job_queue::memory_stats::queue_size_bytes.
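| auto kcenon::thread::job_queue::get_next_job | ( | ) | -> common::Result< std::unique_ptr< job > > |
inline override virtual |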
scheduler_interface implementation: get next job
Implements kcenon::thread::scheduler_interface.
Definition at line 119 of file job_queue.h.
References dequeue().

| auto kcenon::thread::job_queue::get_ptr | ( | void | ) | -> std::shared_ptr< job_queue > |
nodiscard |
Obtains a std::shared_ptr that points to this queue instance.
Returns a shared pointer to this job_queue instance.
Returns a shared pointer to the current job_queue object.
Because job_queue inherits from std::enable_shared_from_this, calling get_ptr() allows retrieving a shared_ptr<job_queue> from within member functions of job_queue.
Implementation details:
Definition at line 56 of file job_queue.cpp.
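The get_ptr() pattern relies on std::enable_shared_from_this. The sketch below shows the mechanism with a hypothetical `mini_queue` and a hypothetical helper `shared_owner_count`. Only the standard-library behavior is asserted, not the library's internals.

```cpp
#include <cassert>
#include <memory>

// Hypothetical class that mirrors only the get_ptr() mechanism.
class mini_queue : public std::enable_shared_from_this<mini_queue> {
public:
    // Valid only when *this is already owned by a std::shared_ptr;
    // otherwise shared_from_this() throws std::bad_weak_ptr (C++17 and later).
    std::shared_ptr<mini_queue> get_ptr() { return shared_from_this(); }
};

// Creates a queue, takes a second handle through get_ptr(), and reports
// how many shared_ptr instances own the same object.
long shared_owner_count() {
    auto queue = std::make_shared<mini_queue>();
    auto alias = queue->get_ptr();
    assert(alias.get() == queue.get());  // both handles point at one object
    return queue.use_count();
}
```

This is why the usage notes recommend creating the queue with std::make_shared: a stack-allocated instance has no controlling shared_ptr for get_ptr() to share.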
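| auto kcenon::thread::job_queue::inspect_pending_jobs | ( | std::size_t | limit = 100 | ) | const -> std::vector< diagnostics::job_info > |
nodiscard virtual |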
Inspects pending jobs in the queue without removing them.
| limit | Maximum number of jobs to inspect (0 = all). |
Thread Safety:
Use Cases:
Implementation details:
Performance Characteristics:
Definition at line 648 of file job_queue.cpp.
References kcenon::thread::info, kcenon::thread::diagnostics::job_info::job_id, and kcenon::thread::diagnostics::pending.
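The limit semantics (a default of 100, with 0 meaning all) can be sketched as a read-only snapshot taken under the lock. `job_info_sketch` is a hypothetical, reduced stand-in for diagnostics::job_info that keeps only a job_id field, and `mini_queue` stores bare ids instead of jobs.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical, reduced stand-in for diagnostics::job_info.
struct job_info_sketch {
    std::size_t job_id;
};

// Hypothetical, simplified queue that stores only job ids.
class mini_queue {
public:
    void enqueue(std::size_t job_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        jobs_.push_back(job_id);
    }

    // Copies information about the oldest pending jobs without removing them.
    std::vector<job_info_sketch> inspect_pending_jobs(std::size_t limit = 100) const {
        std::lock_guard<std::mutex> lock(mutex_);
        const std::size_t count =
            (limit == 0) ? jobs_.size() : std::min(limit, jobs_.size());
        std::vector<job_info_sketch> snapshot;
        snapshot.reserve(count);
        for (std::size_t i = 0; i < count; ++i) {
            snapshot.push_back(job_info_sketch{jobs_[i]});
        }
        assert(snapshot.size() == count);
        return snapshot;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return jobs_.size();
    }

private:
    mutable std::mutex mutex_;
    std::deque<std::size_t> jobs_;
};
```

The snapshot reflects the queue only at the moment of the call; jobs may be dequeued by other threads immediately afterwards.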
| auto kcenon::thread::job_queue::is_bounded | ( | ) | const -> bool |
nodiscard |
Check if queue has a size limit.
Checks if the queue has a size limit.
Definition at line 584 of file job_queue.cpp.
References max_size_.
| auto kcenon::thread::job_queue::is_full | ( | ) | const -> bool |
nodiscard |
Check if queue is at capacity.
Checks if the queue is at capacity.
Definition at line 618 of file job_queue.cpp.
References atomic_size_, and max_size_.
| auto kcenon::thread::job_queue::is_stopped | ( | ) | const -> bool |
nodiscard |
Checks if the queue is in a "stopped" state.
Checks if the queue has been stopped.
Returns true if the queue is stopped, false otherwise.
When stopped, worker threads are typically notified to cease waiting for new jobs. New jobs may still be enqueued, but it is up to the system design how they are handled in a stopped state.
Implementation details:
Definition at line 68 of file job_queue.cpp.
References stop_.
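| auto kcenon::thread::job_queue::schedule | ( | std::unique_ptr< job > && | value | ) | -> common::VoidResult |
inline override virtual |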
scheduler_interface implementation: schedule a job
Implements kcenon::thread::scheduler_interface.
Definition at line 114 of file job_queue.h.
References enqueue().

| auto kcenon::thread::job_queue::set_max_size | ( | std::optional< std::size_t > | max_size | ) | -> void |
Set maximum queue size.
Sets the maximum queue size.
| max_size | New maximum size (std::nullopt for unlimited) |
Note: This does not remove existing jobs if the queue is currently larger than the new max_size. It only affects future enqueue operations.
Definition at line 607 of file job_queue.cpp.
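The note about lowering the limit below the current size can be made concrete with a small model. The sketch below is single-threaded and omits locking to keep the focus on the bound. `mini_queue` is hypothetical and only mirrors the documented semantics of set_max_size(), is_bounded() and is_full().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>

// Hypothetical, single-threaded model of the size-limit behavior.
class mini_queue {
public:
    bool enqueue(int job) {
        if (is_full()) return false;  // only new submissions are affected
        jobs_.push_back(job);
        assert(!max_size_ || jobs_.size() <= *max_size_);
        return true;
    }

    // Changing the limit never removes jobs that are already queued.
    void set_max_size(std::optional<std::size_t> max_size) { max_size_ = max_size; }

    bool is_bounded() const { return max_size_.has_value(); }
    bool is_full() const { return max_size_ && jobs_.size() >= *max_size_; }
    std::size_t size() const { return jobs_.size(); }

private:
    std::deque<int> jobs_;
    std::optional<std::size_t> max_size_;  // std::nullopt means unlimited
};
```

After shrinking the limit, the queue reports itself as full until consumers drain it below the new bound or the limit is lifted again.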
| auto kcenon::thread::job_queue::set_notify | ( | bool | notify | ) | -> void |
Sets the 'notify' flag for this queue.
Controls whether enqueue operations notify waiting threads.
| notify | If true, signals that enqueue should notify waiting threads. If false, jobs can still be enqueued, but waiting threads won't be automatically notified. |
Implementation details:
| notify | true to enable notifications, false to disable |
Definition at line 81 of file job_queue.cpp.
| auto kcenon::thread::job_queue::size | ( | void | ) | const -> std::size_t |
nodiscard |
Returns the current number of jobs in the queue.
Implementation details:
Definition at line 514 of file job_queue.cpp.
Referenced by kcenon::thread::backpressure_job_queue::get_pressure_ratio(), and kcenon::thread::backpressure_job_queue::to_string().

| auto kcenon::thread::job_queue::stop | ( | void | ) | -> void |
Signals the queue to stop waiting for new jobs (e.g., during shutdown).
Signals the queue to stop accepting jobs and wake waiting threads.
Sets the stop_ flag to true and notifies any threads that might be blocked in dequeue(). This allows worker threads to exit gracefully rather than remain blocked indefinitely.
This method provides a consistent API with thread_pool and thread_base classes.
Implementation details:
Shutdown Sequence:
Thread Safety:
Definition at line 489 of file job_queue.cpp.
| auto kcenon::thread::job_queue::to_string | ( | void | ) | const -> std::string |
nodiscard virtual |
Returns a string representation of this job_queue.
Provides a string representation of the queue's current state.
Primarily used for logging and debugging. Derived classes may override this to include additional diagnostic information.
Implementation details:
Reimplemented in kcenon::thread::backpressure_job_queue.
Definition at line 570 of file job_queue.cpp.
References utility_module::formatter::format().

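| auto kcenon::thread::job_queue::try_dequeue | ( | void | ) | -> common::Result< std::unique_ptr< job > > |
nodiscard virtual |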
Attempts to dequeue a job from the queue without blocking.
Attempts to dequeue a job without blocking (non-blocking operation).
If the queue is empty, this method returns immediately with an error instead of blocking. This is useful for polling-based consumers.
Implementation details:
Performance Benefits:
Use Cases:
Definition at line 296 of file job_queue.cpp.
References kcenon::thread::queue_empty, and kcenon::thread::queue_stopped.
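A polling consumer needs to tell "nothing to do yet" apart from "shutting down". The sketch below models that with a hypothetical `dequeue_error` enum and `std::variant` in place of common::Result. Reporting queue_stopped only when the queue is both stopped and empty is an assumption of this sketch.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <variant>

// Hypothetical error codes named after the errors referenced above.
enum class dequeue_error { queue_empty, queue_stopped };

// Hypothetical, simplified queue of ints; not the library's job_queue.
class mini_queue {
public:
    void enqueue(int job) {
        std::lock_guard<std::mutex> lock(mutex_);
        jobs_.push_back(job);
    }

    void stop() {
        std::lock_guard<std::mutex> lock(mutex_);
        stopped_ = true;
    }

    // Never waits: returns a job or the reason none was available.
    std::variant<int, dequeue_error> try_dequeue() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (jobs_.empty()) {
            return stopped_ ? dequeue_error::queue_stopped
                            : dequeue_error::queue_empty;
        }
        assert(!jobs_.empty());
        int job = jobs_.front();
        jobs_.pop_front();
        return job;
    }

private:
    std::mutex mutex_;
    std::deque<int> jobs_;
    bool stopped_ = false;
};
```

A polling loop can retry or back off on queue_empty and exit on queue_stopped.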
| std::atomic< std::size_t > kcenon::thread::job_queue::atomic_size_ {0} |
private |
| std::condition_variable kcenon::thread::job_queue::condition_ |
protected |
Condition variable used to signal worker threads.
Used in combination with mutex_ to block or notify threads waiting for new jobs.
Definition at line 379 of file job_queue.h.
| std::optional< std::size_t > kcenon::thread::job_queue::max_size_ |
private |
Optional maximum queue size.
If set, enqueue operations will fail when the queue reaches this size. std::nullopt means unlimited size (default behavior).
Definition at line 397 of file job_queue.h.
Referenced by get_max_size(), is_bounded(), and is_full().
| std::mutex kcenon::thread::job_queue::mutex_ |
mutable protected |
Mutex to protect access to the underlying queue_ container and related state.
Any operation that modifies or reads the queue should lock this mutex to ensure thread safety.
Definition at line 371 of file job_queue.h.
Referenced by get_memory_stats().
| std::atomic_bool kcenon::thread::job_queue::notify_ |
protected |
If true, threads waiting for new jobs are notified when a new job is enqueued. If false, enqueuing does not automatically trigger a notification.
Definition at line 355 of file job_queue.h.
| std::deque< std::unique_ptr< job > > kcenon::thread::job_queue::queue_ |
private |
The underlying container storing the jobs in FIFO order.
This container is protected by mutex_ and should only be modified while holding the lock. The size of this container should be the only source of truth for the queue size to prevent inconsistencies.
Definition at line 389 of file job_queue.h.
Referenced by get_memory_stats().
| std::atomic_bool kcenon::thread::job_queue::stop_ |
protected |
Indicates whether the queue has been signaled to stop.
Setting stop_ to true typically causes waiting threads to unblock and exit their waiting loop.
Definition at line 363 of file job_queue.h.
Referenced by is_stopped().