Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
job_queue.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
17#pragma once
18
19#include "job.h"
21#include "callback_job.h"
23#include "error_handling.h"
27
28#include <mutex>
29#include <deque>
30#include <tuple>
31#include <atomic>
32#include <optional>
33#include <string_view>
34#include <type_traits>
35#include <condition_variable>
36#include <memory>
37#include <functional>
38
// NOTE(review): this is a rendered listing — each line keeps the original
// file's line number as a prefix, and gaps in that numbering mark elided
// lines (mostly documentation comments).
39namespace kcenon::thread
40{
    // A thread-safe job queue for managing and dispatching work items.
    //
    // NOTE(review): the listing jumps from line 62 to 65, so the remainder of
    // the base-class list after the trailing comma is not shown. The
    // `override` specifiers on schedule(), get_next_job() and
    // get_capabilities() imply additional interface bases — confirm against
    // the real header.
62 class job_queue : public std::enable_shared_from_this<job_queue>,
65 {
66 public:
    // Constructs a new, empty job_queue.
    // max_size: optional maximum queue size; defaults to std::nullopt
    // (presumably meaning "unbounded" — confirm in job_queue.cpp).
76 explicit job_queue(std::optional<std::size_t> max_size = std::nullopt);
77
    // Virtual destructor. Cleans up resources used by the job_queue.
81 virtual ~job_queue(void);
82
    // Obtains a std::shared_ptr that points to this queue instance.
91 [[nodiscard]] auto get_ptr(void) -> std::shared_ptr<job_queue>;
92
    // Checks if the queue is in a "stopped" state.
101 [[nodiscard]] auto is_stopped() const -> bool;
102
    // Sets the 'notify' flag for this queue.
109 auto set_notify(bool notify) -> void;
110
    // scheduler_interface implementation: schedule a job.
    // Forwards the job to enqueue() and returns its result.
114 auto schedule(std::unique_ptr<job>&& value) -> common::VoidResult override { return enqueue(std::move(value)); }
115
    // scheduler_interface implementation: get next job.
    // Forwards to dequeue() and returns its result.
119 auto get_next_job() -> common::Result<std::unique_ptr<job>> override { return dequeue(); }
120
    // Enqueues a new job into the queue.
129 [[nodiscard]] virtual auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult;
130
    // Type-safe enqueue for job subclasses.
    // Enabled only when `job` is a base of JobType (std::is_base_of_v check);
    // converts the pointer to std::unique_ptr<job> and calls the overload above.
148 template<typename JobType, typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
149 [[nodiscard]] auto enqueue(std::unique_ptr<JobType>&& value) -> common::VoidResult
150 {
151 return enqueue(std::unique_ptr<job>(std::move(value)));
152 }
153
    // Enqueues a batch of jobs into the queue.
159 [[nodiscard]] virtual auto enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs) -> common::VoidResult;
160
    // Dequeues a job from the queue in FIFO order (blocking operation).
169 [[nodiscard]] virtual auto dequeue(void) -> common::Result<std::unique_ptr<job>>;
170
    // Attempts to dequeue a job from the queue without blocking.
179 [[nodiscard]] virtual auto try_dequeue(void) -> common::Result<std::unique_ptr<job>>;
180
    // Dequeues all remaining jobs from the queue without processing them.
189 [[nodiscard]] virtual auto dequeue_batch(void) -> std::deque<std::unique_ptr<job>>;
190
    // Dequeues up to max_count jobs in a single operation (micro-batching).
211 [[nodiscard]] virtual auto dequeue_batch_limited(std::size_t max_count)
212 -> std::deque<std::unique_ptr<job>>;
213
    // Removes all jobs currently in the queue without processing them.
221 virtual auto clear(void) -> void;
222
    // Checks if the queue is currently empty.
229 [[nodiscard]] auto empty(void) const -> bool;
230
    // Returns the current number of jobs in the queue.
237 [[nodiscard]] auto size(void) const -> std::size_t;
238
    // NOTE(review): the listing jumps from line 237 to 248, so the opening of
    // the struct closed below is elided. The fields appear to belong to
    // `memory_stats`, the return type of get_memory_stats() — confirm against
    // the real header.
248 std::size_t queue_size_bytes; // Estimated queue memory usage
249 std::size_t pending_job_count; // Number of jobs in queue
250 std::size_t node_overhead_bytes; // Deque node overhead estimate
251 };
252
    // Get memory footprint statistics for debugging and monitoring.
253 [[nodiscard]] auto get_memory_stats() const -> memory_stats;
254
    // Signals the queue to stop waiting for new jobs (e.g., during shutdown).
264 auto stop(void) -> void;
265
    // Returns a string representation of this job_queue.
273 [[nodiscard]] virtual auto to_string(void) const -> std::string;
274
    // Returns capabilities of this job_queue implementation.
    // Sets exact_size, atomic_empty_check, supports_batch,
    // supports_blocking_wait and supports_stop to true; lock_free and
    // wait_free to false.
287 [[nodiscard]] auto get_capabilities() const -> queue_capabilities override {
288 return queue_capabilities{
289 .exact_size = true,
290 .atomic_empty_check = true,
291 .lock_free = false,
292 .wait_free = false,
293 .supports_batch = true,
294 .supports_blocking_wait = true,
295 .supports_stop = true
296 };
297 }
298
299 // ============================================
300 // Bounded queue functionality (merged from bounded_job_queue)
301 // ============================================
302
    // Check if queue has a size limit.
307 [[nodiscard]] auto is_bounded() const -> bool;
308
    // Get the maximum queue size (an empty optional when none is set).
313 [[nodiscard]] auto get_max_size() const -> std::optional<std::size_t>;
314
    // Set maximum queue size.
319 auto set_max_size(std::optional<std::size_t> max_size) -> void;
320
    // Check if queue is at capacity.
325 [[nodiscard]] auto is_full() const -> bool;
326
327 // ============================================
328 // Diagnostics support
329 // ============================================
330
    // Inspects pending jobs in the queue without removing them.
    // limit defaults to 100 (presumably caps the number of entries
    // returned — confirm in the implementation).
346 [[nodiscard]] virtual auto inspect_pending_jobs(std::size_t limit = 100) const
347 -> std::vector<diagnostics::job_info>;
348
349 protected:
    // If true, threads waiting for new jobs are notified when a new job is
    // enqueued.
355 std::atomic_bool notify_;
356
    // Indicates whether the queue has been signaled to stop.
363 std::atomic_bool stop_;
364
    // Mutex to protect access to the underlying queue_ container and related
    // state. Declared mutable so const member functions can lock it.
371 mutable std::mutex mutex_;
372
    // Condition variable used to signal worker threads.
379 std::condition_variable condition_;
380
381 private:
    // The underlying container storing the jobs in FIFO order.
389 std::deque<std::unique_ptr<job>> queue_;
390
    // Optional maximum queue size.
397 std::optional<std::size_t> max_size_;
398
    // Atomic size counter for lock-free read-only queries; starts at 0.
405 std::atomic<std::size_t> atomic_size_{0};
406 };
407}
408
409// ----------------------------------------------------------------------------
410// Formatter specializations for job_queue
411// ----------------------------------------------------------------------------
412
414
415// Generate formatter specializations for kcenon::thread::job_queue
Specialized job class that encapsulates user-defined callbacks.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
auto schedule(std::unique_ptr< job > &&value) -> common::VoidResult override
scheduler_interface implementation: schedule a job
Definition job_queue.h:114
auto empty(void) const -> bool
Checks if the queue is currently empty.
virtual auto dequeue_batch(void) -> std::deque< std::unique_ptr< job > >
Dequeues all remaining jobs from the queue without processing them.
auto is_full() const -> bool
Check if queue is at capacity.
virtual ~job_queue(void)
Virtual destructor. Cleans up resources used by the job_queue.
Definition job_queue.cpp:44
auto stop(void) -> void
Signals the queue to stop waiting for new jobs (e.g., during shutdown).
std::atomic_bool notify_
If true, threads waiting for new jobs are notified when a new job is enqueued. If false,...
Definition job_queue.h:355
auto get_max_size() const -> std::optional< std::size_t >
Get the maximum queue size.
std::mutex mutex_
Mutex to protect access to the underlying queue_ container and related state.
Definition job_queue.h:371
auto set_notify(bool notify) -> void
Sets the 'notify' flag for this queue.
Definition job_queue.cpp:81
auto size(void) const -> std::size_t
Returns the current number of jobs in the queue.
auto get_memory_stats() const -> memory_stats
Get memory footprint statistics for debugging and monitoring.
virtual auto clear(void) -> void
Removes all jobs currently in the queue without processing them.
virtual auto try_dequeue(void) -> common::Result< std::unique_ptr< job > >
Attempts to dequeue a job from the queue without blocking.
virtual auto to_string(void) const -> std::string
Returns a string representation of this job_queue.
std::atomic< std::size_t > atomic_size_
Atomic size counter for lock-free read-only queries.
Definition job_queue.h:405
std::deque< std::unique_ptr< job > > queue_
The underlying container storing the jobs in FIFO order.
Definition job_queue.h:389
auto is_bounded() const -> bool
Check if queue has a size limit.
job_queue(std::optional< std::size_t > max_size=std::nullopt)
Constructs a new, empty job_queue.
Definition job_queue.cpp:33
auto get_ptr(void) -> std::shared_ptr< job_queue >
Obtains a std::shared_ptr that points to this queue instance.
Definition job_queue.cpp:56
std::condition_variable condition_
Condition variable used to signal worker threads.
Definition job_queue.h:379
virtual auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueues a new job into the queue.
auto set_max_size(std::optional< std::size_t > max_size) -> void
Set maximum queue size.
auto get_next_job() -> common::Result< std::unique_ptr< job > > override
scheduler_interface implementation: get next job
Definition job_queue.h:119
virtual auto inspect_pending_jobs(std::size_t limit=100) const -> std::vector< diagnostics::job_info >
Inspects pending jobs in the queue without removing them.
virtual auto dequeue_batch_limited(std::size_t max_count) -> std::deque< std::unique_ptr< job > >
Dequeues up to N jobs in a single operation (micro-batching).
std::atomic_bool stop_
Indicates whether the queue has been signaled to stop.
Definition job_queue.h:363
auto is_stopped() const -> bool
Checks if the queue is in a "stopped" state.
Definition job_queue.cpp:68
virtual auto dequeue(void) -> common::Result< std::unique_ptr< job > >
Dequeues a job from the queue in FIFO order (blocking operation).
auto get_capabilities() const -> queue_capabilities override
Returns capabilities of this job_queue implementation.
Definition job_queue.h:287
std::optional< std::size_t > max_size_
Optional maximum queue size.
Definition job_queue.h:397
auto enqueue(std::unique_ptr< JobType > &&value) -> common::VoidResult
Type-safe enqueue for job subclasses.
Definition job_queue.h:149
virtual auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the queue.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Mixin interface for queue capability introspection.
Scheduler interface for queuing and retrieving jobs.
Error codes and utilities for the thread system.
String encoding conversion, Base64 encoding/decoding utilities.
Generic formatter for enum types using user-provided converter functors.
Provides macros for generating std::formatter specializations.
Base job class for schedulable work units in the thread system.
Job information snapshot for diagnostics and monitoring.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Mixin interface for queue capability introspection.
Abstract scheduler interface for queuing and retrieving jobs.
Information about a job for starvation callback.
Memory footprint statistics for the queue.
Definition job_queue.h:247
Runtime-queryable queue capabilities descriptor.
#define DECLARE_FORMATTER(CLASS_NAME)
Generates std::formatter specializations for narrow and wide characters.