Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
job_queue.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
16namespace kcenon::thread
17{
33 job_queue::job_queue(std::optional<std::size_t> max_size)
34 : notify_(true), stop_(false), mutex_(), condition_(), queue_(), max_size_(max_size) {}
35
45
56 auto job_queue::get_ptr(void) -> std::shared_ptr<job_queue> { return shared_from_this(); }
57
68 auto job_queue::is_stopped() const -> bool { return stop_.load(); }
69
81 auto job_queue::set_notify(bool notify) -> void { notify_.store(notify); }
82
105 auto job_queue::enqueue(std::unique_ptr<job>&& value) -> common::VoidResult
106 {
107 // Early validation: check if queue is still accepting jobs
108 if (stop_.load())
109 {
110 return common::error_info{static_cast<int>(error_code::queue_stopped), "Job queue is stopped", "thread_system"};
111 }
112
113 // Validate input: null jobs are not allowed
114 if (value == nullptr)
115 {
116 return common::error_info{static_cast<int>(error_code::invalid_argument), "cannot enqueue null job", "thread_system"};
117 }
118
119 // Critical section: modify queue with proper synchronization
120 std::scoped_lock<std::mutex> lock(mutex_);
121
122 // Check size limit if bounded
123 if (max_size_.has_value() && queue_.size() >= max_size_.value())
124 {
125 return common::error_info{static_cast<int>(error_code::queue_full), "Job queue is full", "thread_system"};
126 }
127
128 // Move job into queue (efficient transfer of ownership)
129 queue_.push_back(std::move(value));
130 atomic_size_.fetch_add(1, std::memory_order_relaxed);
131
132 // Conditionally notify waiting consumers
133 if (notify_)
134 {
135 condition_.notify_one(); // Wake exactly one waiting thread
136 }
137
138 return common::ok();
139 }
140
163 auto job_queue::enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs) -> common::VoidResult
164 {
165 // Early validation: check if queue is still accepting jobs
166 if (stop_.load())
167 {
168 return common::error_info{static_cast<int>(error_code::queue_stopped), "Job queue is stopped", "thread_system"};
169 }
170
171 // Validate batch: empty batches are not allowed
172 if (jobs.empty())
173 {
174 return common::error_info{static_cast<int>(error_code::invalid_argument), "cannot enqueue empty batch", "thread_system"};
175 }
176
177 // Pre-validate all jobs before modifying queue (ensures atomicity)
178 for (auto& job : jobs)
179 {
180 if (job == nullptr)
181 {
182 return common::error_info{static_cast<int>(error_code::invalid_argument), "cannot enqueue null job in batch", "thread_system"};
183 }
184 }
185
186 // Critical section: add entire batch with single lock acquisition
187 std::scoped_lock<std::mutex> lock(mutex_);
188
189 // Check size limit if bounded
190 if (max_size_.has_value())
191 {
192 std::size_t available_space = max_size_.value() - queue_.size();
193 if (jobs.size() > available_space)
194 {
195 return common::error_info{static_cast<int>(error_code::queue_full),
196 "Job queue cannot fit entire batch", "thread_system"};
197 }
198 }
199
200 // Move all jobs into queue efficiently
201 for (auto& job : jobs)
202 {
203 queue_.push_back(std::move(job));
204 }
205 atomic_size_.fetch_add(jobs.size(), std::memory_order_relaxed);
206
207 // Single notification for entire batch (performance optimization)
208 if (notify_)
209 {
210 condition_.notify_one(); // Wake one thread to process batch
211 }
212
213 return common::ok();
214 }
215
248 auto job_queue::dequeue() -> common::Result<std::unique_ptr<job>>
249 {
250 // Use unique_lock for condition variable operations
251 std::unique_lock<std::mutex> lock(mutex_);
252
253 // Block until job available OR queue stopped
254 condition_.wait(lock, [this]() { return !queue_.empty() || stop_.load(); });
255
256 // CRITICAL: Re-check queue state while still holding lock
257 // This prevents race with clear() that could empty the queue
258 // between wait() return and queue access (TOCTOU bug)
259 if (queue_.empty())
260 {
261 return common::error_info{static_cast<int>(error_code::queue_empty), "there are no jobs to dequeue", "thread_system"};
262 }
263
264 // Efficiently extract first job from queue
265 // At this point, we're guaranteed queue is not empty because:
266 // 1. We hold the mutex continuously from wait() return
267 // 2. We just verified !queue_.empty() above
268 auto value = std::move(queue_.front());
269 queue_.pop_front();
270 atomic_size_.fetch_sub(1, std::memory_order_relaxed);
271
272 return value; // Return moved job (caller takes ownership)
273 }
274
296 auto job_queue::try_dequeue() -> common::Result<std::unique_ptr<job>>
297 {
298 // Early validation: check if queue is stopped
299 if (stop_.load())
300 {
301 return common::error_info{static_cast<int>(error_code::queue_stopped), "Job queue is stopped", "thread_system"};
302 }
303
304 // Critical section: check and potentially extract job
305 std::scoped_lock<std::mutex> lock(mutex_);
306
307 // Non-blocking check: return error if empty
308 if (queue_.empty())
309 {
310 return common::error_info{static_cast<int>(error_code::queue_empty), "there are no jobs to dequeue", "thread_system"};
311 }
312
313 // Efficiently extract first job from queue
314 auto value = std::move(queue_.front());
315 queue_.pop_front();
316 atomic_size_.fetch_sub(1, std::memory_order_relaxed);
317
318 return value; // Return moved job (caller takes ownership)
319 }
320
347 auto job_queue::dequeue_batch(void) -> std::deque<std::unique_ptr<job>>
348 {
349 std::deque<std::unique_ptr<job>> all_items;
350 {
351 // Critical section: atomically transfer all queue contents
352 std::scoped_lock<std::mutex> lock(mutex_);
353
354 // Efficient O(1) transfer of all jobs
355 std::swap(queue_, all_items);
356 atomic_size_.store(0, std::memory_order_relaxed);
357
358 // Wake all waiting threads since queue is now empty
359 condition_.notify_all();
360 }
361
362 return all_items; // Return all extracted jobs
363 }
364
396 auto job_queue::dequeue_batch_limited(std::size_t max_count)
397 -> std::deque<std::unique_ptr<job>>
398 {
399 std::deque<std::unique_ptr<job>> batch_items;
400 {
401 // Critical section: atomically extract up to max_count jobs
402 std::scoped_lock<std::mutex> lock(mutex_);
403
404 // Extract jobs up to max_count or queue size, whichever is smaller
405 std::size_t count = std::min(max_count, queue_.size());
406 for (std::size_t i = 0; i < count; ++i)
407 {
408 batch_items.push_back(std::move(queue_.front()));
409 queue_.pop_front();
410 }
411 atomic_size_.fetch_sub(count, std::memory_order_relaxed);
412
413 // Only notify if queue is now empty (to wake waiting workers)
414 // If jobs remain, other workers can continue dequeuing without wakeup overhead
415 if (queue_.empty())
416 {
417 condition_.notify_all();
418 }
419 }
420
421 return batch_items; // Return extracted batch
422 }
423
440 auto job_queue::clear(void) -> void
441 {
442 // Critical section: atomically clear all jobs
443 std::scoped_lock<std::mutex> lock(mutex_);
444
445 // Destroy all jobs in queue
446 queue_.clear();
447 atomic_size_.store(0, std::memory_order_relaxed);
448
449 // Wake all waiting threads since queue is now empty
450 condition_.notify_all();
451 }
452
465 auto job_queue::empty(void) const -> bool
466 {
467 return atomic_size_.load(std::memory_order_relaxed) == 0;
468 }
469
489 auto job_queue::stop(void) -> void
490 {
491 // Critical section: set stop flag and notify waiters
492 std::scoped_lock<std::mutex> lock(mutex_);
493
494 // Prevent new jobs from being added
495 stop_.store(true);
496
497 // Wake all threads waiting in dequeue()
498 condition_.notify_all();
499 }
500
501
514 auto job_queue::size(void) const -> std::size_t
515 {
516 return atomic_size_.load(std::memory_order_relaxed);
517 }
518
541 {
542 std::scoped_lock<std::mutex> lock(mutex_);
543
544 std::size_t job_count = queue_.size();
545
546 // Estimate memory usage
547 // std::unique_ptr<job> typically 8 bytes (pointer size)
548 // std::deque node overhead varies by platform, typically 32-48 bytes
549 constexpr std::size_t ptr_size = sizeof(std::unique_ptr<job>);
550 constexpr std::size_t node_overhead = 40; // Conservative estimate
551
552 // Use traditional aggregate initialization for C++17 compatibility
553 memory_stats stats;
554 stats.queue_size_bytes = (ptr_size + node_overhead) * job_count;
555 stats.pending_job_count = job_count;
556 stats.node_overhead_bytes = node_overhead * job_count;
557 return stats;
558 }
559
570 auto job_queue::to_string(void) const -> std::string
571 {
572 return utility_module::formatter::format("contained {} jobs", size());
573 }
574
575 // ============================================
576 // Bounded queue functionality
577 // ============================================
578
584 auto job_queue::is_bounded() const -> bool
585 {
586 return max_size_.has_value();
587 }
588
594 auto job_queue::get_max_size() const -> std::optional<std::size_t>
595 {
596 return max_size_;
597 }
598
607 auto job_queue::set_max_size(std::optional<std::size_t> max_size) -> void
608 {
609 std::scoped_lock<std::mutex> lock(mutex_);
610 max_size_ = max_size;
611 }
612
618 auto job_queue::is_full() const -> bool
619 {
620 if (!max_size_.has_value())
621 {
622 return false; // Unbounded queue is never full
623 }
624 return atomic_size_.load(std::memory_order_relaxed) >= max_size_.value();
625 }
626
627 // ============================================
628 // Diagnostics support
629 // ============================================
630
648 auto job_queue::inspect_pending_jobs(std::size_t limit) const
649 -> std::vector<diagnostics::job_info>
650 {
651 std::scoped_lock<std::mutex> lock(mutex_);
652
653 std::vector<diagnostics::job_info> result;
654 auto count = (limit == 0) ? queue_.size() : std::min(limit, queue_.size());
655 result.reserve(count);
656
657 auto now = std::chrono::steady_clock::now();
658 std::size_t index = 0;
659
660 for (const auto& job_ptr : queue_)
661 {
662 if (limit > 0 && index >= limit)
663 {
664 break;
665 }
666
667 if (job_ptr == nullptr)
668 {
669 ++index;
670 continue;
671 }
672
674 info.job_id = job_ptr->get_job_id();
675 info.job_name = job_ptr->get_name();
677 info.enqueue_time = job_ptr->get_enqueue_time();
678 info.start_time = job_ptr->get_enqueue_time(); // Not started yet
679
680 // Calculate wait time (time since enqueue)
681 info.wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
682 now - job_ptr->get_enqueue_time()
683 );
684 info.execution_time = std::chrono::nanoseconds{0}; // Not started yet
685
686 result.push_back(std::move(info));
687 ++index;
688 }
689
690 return result;
691 }
692
693} // namespace kcenon::thread
auto empty(void) const -> bool
Checks if the queue is currently empty.
virtual auto dequeue_batch(void) -> std::deque< std::unique_ptr< job > >
Dequeues all remaining jobs from the queue without processing them.
auto is_full() const -> bool
Check if queue is at capacity.
virtual ~job_queue(void)
Virtual destructor. Cleans up resources used by the job_queue.
Definition job_queue.cpp:44
auto stop(void) -> void
Signals the queue to stop waiting for new jobs (e.g., during shutdown).
auto get_max_size() const -> std::optional< std::size_t >
Get the maximum queue size.
std::mutex mutex_
Mutex to protect access to the underlying queue_ container and related state.
Definition job_queue.h:371
auto set_notify(bool notify) -> void
Sets the 'notify' flag for this queue.
Definition job_queue.cpp:81
auto size(void) const -> std::size_t
Returns the current number of jobs in the queue.
auto get_memory_stats() const -> memory_stats
Get memory footprint statistics for debugging and monitoring.
virtual auto clear(void) -> void
Removes all jobs currently in the queue without processing them.
virtual auto try_dequeue(void) -> common::Result< std::unique_ptr< job > >
Attempts to dequeue a job from the queue without blocking.
virtual auto to_string(void) const -> std::string
Returns a string representation of this job_queue.
std::atomic< std::size_t > atomic_size_
Atomic size counter for lock-free read-only queries.
Definition job_queue.h:405
std::deque< std::unique_ptr< job > > queue_
The underlying container storing the jobs in FIFO order.
Definition job_queue.h:389
auto is_bounded() const -> bool
Check if queue has a size limit.
job_queue(std::optional< std::size_t > max_size=std::nullopt)
Constructs a new, empty job_queue.
Definition job_queue.cpp:33
auto get_ptr(void) -> std::shared_ptr< job_queue >
Obtains a std::shared_ptr that points to this queue instance.
Definition job_queue.cpp:56
virtual auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueues a new job into the queue.
auto set_max_size(std::optional< std::size_t > max_size) -> void
Set maximum queue size.
virtual auto inspect_pending_jobs(std::size_t limit=100) const -> std::vector< diagnostics::job_info >
Inspects pending jobs in the queue without removing them.
virtual auto dequeue_batch_limited(std::size_t max_count) -> std::deque< std::unique_ptr< job > >
Dequeues up to N jobs in a single operation (micro-batching).
std::atomic_bool stop_
Indicates whether the queue has been signaled to stop.
Definition job_queue.h:363
auto is_stopped() const -> bool
Checks if the queue is in a "stopped" state.
Definition job_queue.cpp:68
virtual auto dequeue(void) -> common::Result< std::unique_ptr< job > >
Dequeues a job from the queue in FIFO order (blocking operation).
std::optional< std::size_t > max_size_
Optional maximum queue size.
Definition job_queue.h:397
virtual auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the queue.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:129
@ pending
Job is waiting in the queue.
Thread-safe FIFO job queue with optional bounded size.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
@ info
Informational messages highlighting progress.
STL namespace.
Information about a job in the thread pool.
Definition job_info.h:90
std::uint64_t job_id
Unique identifier for this job.
Definition job_info.h:97
Get memory footprint statistics for the queue.
Definition job_queue.h:247