34 : notify_(true), stop_(false), mutex_(), condition_(), queue_(), max_size_(max_size) {}
114 if (value ==
nullptr)
120 std::scoped_lock<std::mutex> lock(mutex_);
123 if (max_size_.has_value() && queue_.size() >= max_size_.value())
125 return common::error_info{
static_cast<int>(
error_code::queue_full),
"Job queue is full",
"thread_system"};
129 queue_.push_back(std::move(value));
130 atomic_size_.fetch_add(1, std::memory_order_relaxed);
135 condition_.notify_one();
178 for (
auto&
job : jobs)
187 std::scoped_lock<std::mutex> lock(mutex_);
190 if (max_size_.has_value())
192 std::size_t available_space = max_size_.value() - queue_.size();
193 if (jobs.size() > available_space)
196 "Job queue cannot fit entire batch",
"thread_system"};
201 for (
auto&
job : jobs)
203 queue_.push_back(std::move(
job));
205 atomic_size_.fetch_add(jobs.size(), std::memory_order_relaxed);
210 condition_.notify_one();
251 std::unique_lock<std::mutex> lock(mutex_);
254 condition_.wait(lock, [
this]() {
return !queue_.empty() || stop_.load(); });
261 return common::error_info{
static_cast<int>(
error_code::queue_empty),
"there are no jobs to dequeue",
"thread_system"};
268 auto value = std::move(queue_.front());
270 atomic_size_.fetch_sub(1, std::memory_order_relaxed);
305 std::scoped_lock<std::mutex> lock(mutex_);
310 return common::error_info{
static_cast<int>(
error_code::queue_empty),
"there are no jobs to dequeue",
"thread_system"};
314 auto value = std::move(queue_.front());
316 atomic_size_.fetch_sub(1, std::memory_order_relaxed);
349 std::deque<std::unique_ptr<job>> all_items;
352 std::scoped_lock<std::mutex> lock(mutex_);
355 std::swap(queue_, all_items);
356 atomic_size_.store(0, std::memory_order_relaxed);
359 condition_.notify_all();
397 -> std::deque<std::unique_ptr<job>>
399 std::deque<std::unique_ptr<job>> batch_items;
402 std::scoped_lock<std::mutex> lock(mutex_);
405 std::size_t count = std::min(max_count, queue_.size());
406 for (std::size_t i = 0; i < count; ++i)
408 batch_items.push_back(std::move(queue_.front()));
411 atomic_size_.fetch_sub(count, std::memory_order_relaxed);
417 condition_.notify_all();
443 std::scoped_lock<std::mutex> lock(mutex_);
447 atomic_size_.store(0, std::memory_order_relaxed);
450 condition_.notify_all();
467 return atomic_size_.load(std::memory_order_relaxed) == 0;
492 std::scoped_lock<std::mutex> lock(mutex_);
498 condition_.notify_all();
516 return atomic_size_.load(std::memory_order_relaxed);
542 std::scoped_lock<std::mutex> lock(
mutex_);
544 std::size_t job_count =
queue_.size();
549 constexpr std::size_t ptr_size =
sizeof(std::unique_ptr<job>);
550 constexpr std::size_t node_overhead = 40;
609 std::scoped_lock<std::mutex> lock(mutex_);
610 max_size_ = max_size;
649 -> std::vector<diagnostics::job_info>
651 std::scoped_lock<std::mutex> lock(mutex_);
653 std::vector<diagnostics::job_info>
result;
654 auto count = (limit == 0) ? queue_.size() : std::min(limit, queue_.size());
657 auto now = std::chrono::steady_clock::now();
658 std::size_t index = 0;
660 for (
const auto& job_ptr : queue_)
662 if (limit > 0 && index >= limit)
667 if (job_ptr ==
nullptr)
675 info.job_name = job_ptr->get_name();
677 info.enqueue_time = job_ptr->get_enqueue_time();
678 info.start_time = job_ptr->get_enqueue_time();
681 info.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
682 now - job_ptr->get_enqueue_time()
684 info.execution_time = std::chrono::nanoseconds{0};
auto empty(void) const -> bool
Checks if the queue is currently empty.
virtual auto dequeue_batch(void) -> std::deque< std::unique_ptr< job > >
Dequeues all remaining jobs from the queue without processing them.
auto is_full() const -> bool
Check if queue is at capacity.
virtual ~job_queue(void)
Virtual destructor. Cleans up resources used by the job_queue.
auto stop(void) -> void
Signals the queue to stop waiting for new jobs (e.g., during shutdown).
auto get_max_size() const -> std::optional< std::size_t >
Get the maximum queue size.
std::mutex mutex_
Mutex to protect access to the underlying queue_ container and related state.
auto set_notify(bool notify) -> void
Sets the 'notify' flag for this queue.
auto size(void) const -> std::size_t
Returns the current number of jobs in the queue.
auto get_memory_stats() const -> memory_stats
Get memory footprint statistics for debugging and monitoring.
virtual auto clear(void) -> void
Removes all jobs currently in the queue without processing them.
virtual auto try_dequeue(void) -> common::Result< std::unique_ptr< job > >
Attempts to dequeue a job from the queue without blocking.
virtual auto to_string(void) const -> std::string
Returns a string representation of this job_queue.
std::atomic< std::size_t > atomic_size_
Atomic size counter for lock-free read-only queries.
std::deque< std::unique_ptr< job > > queue_
The underlying container storing the jobs in FIFO order.
auto is_bounded() const -> bool
Check if queue has a size limit.
job_queue(std::optional< std::size_t > max_size=std::nullopt)
Constructs a new, empty job_queue.
auto get_ptr(void) -> std::shared_ptr< job_queue >
Obtains a std::shared_ptr that points to this queue instance.
virtual auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueues a new job into the queue.
auto set_max_size(std::optional< std::size_t > max_size) -> void
Set maximum queue size.
virtual auto inspect_pending_jobs(std::size_t limit=100) const -> std::vector< diagnostics::job_info >
Inspects pending jobs in the queue without removing them.
virtual auto dequeue_batch_limited(std::size_t max_count) -> std::deque< std::unique_ptr< job > >
Dequeues up to N jobs in a single operation (micro-batching).
std::atomic_bool stop_
Indicates whether the queue has been signaled to stop.
auto is_stopped() const -> bool
Checks if the queue is in a "stopped" state.
virtual auto dequeue(void) -> common::Result< std::unique_ptr< job > >
Dequeues a job from the queue in FIFO order (blocking operation).
std::optional< std::size_t > max_size_
Optional maximum queue size.
virtual auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the queue.
Represents a unit of work (task) to be executed, typically by a job queue.
A template class representing either a value or an error.
pending: Job is waiting in the queue.
Thread-safe FIFO job queue with optional bounded size.
Core threading foundation of the thread system library.
info: Informational messages highlighting progress.
Information about a job in the thread pool.
std::uint64_t job_id
Unique identifier for this job.
Get memory footprint statistics for the queue.
std::size_t queue_size_bytes
std::size_t node_overhead_bytes
std::size_t pending_job_count