Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
policy_queue.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
14#include <type_traits>
15#include <memory>
16
24
25namespace kcenon::thread {
26
78template<
79 typename SyncPolicy,
80 typename BoundPolicy = policies::unbounded_policy,
81 typename OverflowPolicy = policies::overflow_reject_policy
82>
85public:
86 using sync_policy_type = SyncPolicy;
87 using bound_policy_type = BoundPolicy;
88 using overflow_policy_type = OverflowPolicy;
89
90 // ============================================
91 // Construction
92 // ============================================
93
101
106 explicit policy_queue(BoundPolicy bound_policy)
107 : sync_policy_()
109 , overflow_policy_() {}
110
118 BoundPolicy bound_policy,
119 OverflowPolicy overflow_policy)
123
127 ~policy_queue() = default;
128
129 // Non-copyable, non-movable
130 policy_queue(const policy_queue&) = delete;
134
135 // ============================================
136 // scheduler_interface implementation
137 // ============================================
138
144 auto schedule(std::unique_ptr<job>&& work) -> common::VoidResult override {
145 return enqueue(std::move(work));
146 }
147
152 auto get_next_job() -> common::Result<std::unique_ptr<job>> override {
153 return dequeue();
154 }
155
156 // ============================================
157 // Queue operations
158 // ============================================
159
169 [[nodiscard]] auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult {
170 if (!value) {
171 return common::error_info{-105, "cannot enqueue null job", "thread_system"};
172 }
173
174 // Check bound policy
175 if constexpr (BoundPolicy::is_bounded()) {
176 if (bound_policy_.is_full(sync_policy_.size())) {
177 return handle_overflow(std::move(value));
178 }
179 }
180
181 return sync_policy_.enqueue(std::move(value));
182 }
183
190 template<typename JobType, typename = std::enable_if_t<std::is_base_of_v<job, JobType>>>
191 [[nodiscard]] auto enqueue(std::unique_ptr<JobType>&& value) -> common::VoidResult {
192 return enqueue(std::unique_ptr<job>(std::move(value)));
193 }
194
199 [[nodiscard]] auto dequeue() -> common::Result<std::unique_ptr<job>> {
200 return sync_policy_.dequeue();
201 }
202
207 [[nodiscard]] auto try_dequeue() -> common::Result<std::unique_ptr<job>> {
208 return sync_policy_.try_dequeue();
209 }
210
215 [[nodiscard]] auto empty() const -> bool {
216 return sync_policy_.empty();
217 }
218
223 [[nodiscard]] auto size() const -> std::size_t {
224 return sync_policy_.size();
225 }
226
230 auto clear() -> void {
231 sync_policy_.clear();
232 }
233
237 auto stop() -> void {
238 sync_policy_.stop();
239 }
240
245 [[nodiscard]] auto is_stopped() const -> bool {
246 return sync_policy_.is_stopped();
247 }
248
249 // ============================================
250 // queue_capabilities_interface implementation
251 // ============================================
252
257 [[nodiscard]] auto get_capabilities() const -> queue_capabilities override {
258 return sync_policy_.get_capabilities();
259 }
260
261 // ============================================
262 // Policy access
263 // ============================================
264
269 [[nodiscard]] auto sync_policy() -> SyncPolicy& {
270 return sync_policy_;
271 }
272
277 [[nodiscard]] auto sync_policy() const -> const SyncPolicy& {
278 return sync_policy_;
279 }
280
285 [[nodiscard]] auto bound_policy() -> BoundPolicy& {
286 return bound_policy_;
287 }
288
293 [[nodiscard]] auto bound_policy() const -> const BoundPolicy& {
294 return bound_policy_;
295 }
296
301 [[nodiscard]] auto overflow_policy() -> OverflowPolicy& {
302 return overflow_policy_;
303 }
304
309 [[nodiscard]] auto overflow_policy() const -> const OverflowPolicy& {
310 return overflow_policy_;
311 }
312
313 // ============================================
314 // Bounded queue convenience methods
315 // ============================================
316
321 [[nodiscard]] auto is_bounded() const -> bool {
322 if constexpr (requires { bound_policy_.is_bounded(); }) {
323 return bound_policy_.is_bounded();
324 }
325 return BoundPolicy::is_bounded();
326 }
327
332 [[nodiscard]] auto is_full() const -> bool {
333 if constexpr (BoundPolicy::is_bounded()) {
334 return bound_policy_.is_full(sync_policy_.size());
335 }
336 return false;
337 }
338
343 [[nodiscard]] auto remaining_capacity() const -> std::size_t {
344 return bound_policy_.remaining_capacity(sync_policy_.size());
345 }
346
347private:
353 [[nodiscard]] auto handle_overflow(std::unique_ptr<job>&& value) -> common::VoidResult {
354 if constexpr (std::is_same_v<OverflowPolicy, policies::overflow_reject_policy>) {
355 return overflow_policy_.handle_overflow(std::move(value));
356 } else if constexpr (std::is_same_v<OverflowPolicy, policies::overflow_drop_newest_policy>) {
357 return overflow_policy_.handle_overflow(std::move(value));
358 } else if constexpr (std::is_same_v<OverflowPolicy, policies::overflow_drop_oldest_policy>) {
359 // Drop oldest by dequeuing and discarding
360 auto oldest = sync_policy_.try_dequeue();
361 (void)oldest; // Discard
362 return sync_policy_.enqueue(std::move(value));
363 } else {
364 // For blocking policies, just return the error
365 // Actual blocking must be implemented at a higher level
366 return common::error_info{-120, "queue is full", "thread_system"};
367 }
368 }
369
370 SyncPolicy sync_policy_;
371 BoundPolicy bound_policy_;
372 OverflowPolicy overflow_policy_;
373};
374
375// ============================================
376// Type aliases for common configurations
377// ============================================
378
386>;
387
398>;
399
403template<std::size_t MaxSize>
408>;
409
413template<std::size_t MaxSize>
418>;
419
423template<std::size_t MaxSize>
428>;
429
430} // namespace kcenon::thread
Bound policies for queue capacity: unbounded or fixed-size.
Policy that limits queue size to a maximum.
Lock-free synchronization policy using Michael-Scott algorithm.
Synchronization policy using mutex and condition variable.
Policy that blocks until space is available.
Policy that drops the oldest item when queue is full.
Policy that rejects new items when queue is full.
Policy that allows unlimited queue size.
Policy-based queue template.
auto size() const -> std::size_t
Get queue size.
auto overflow_policy() -> OverflowPolicy &
Get reference to overflow policy.
auto overflow_policy() const -> const OverflowPolicy &
Get const reference to overflow policy.
auto clear() -> void
Clear all jobs from queue.
auto remaining_capacity() const -> std::size_t
Get remaining capacity.
policy_queue & operator=(const policy_queue &)=delete
auto sync_policy() const -> const SyncPolicy &
Get const reference to sync policy.
auto get_capabilities() const -> queue_capabilities override
Get queue capabilities.
auto bound_policy() -> BoundPolicy &
Get reference to bound policy.
auto bound_policy() const -> const BoundPolicy &
Get const reference to bound policy.
auto try_dequeue() -> common::Result< std::unique_ptr< job > >
Try to dequeue a job (non-blocking)
auto dequeue() -> common::Result< std::unique_ptr< job > >
Dequeue a job (blocking if sync policy supports it)
auto is_full() const -> bool
Check if queue is at capacity.
auto schedule(std::unique_ptr< job > &&work) -> common::VoidResult override
Schedule a job (delegates to enqueue)
policy_queue(policy_queue &&)=delete
auto is_bounded() const -> bool
Check if queue is bounded.
policy_queue & operator=(policy_queue &&)=delete
policy_queue(SyncPolicy sync_policy, BoundPolicy bound_policy, OverflowPolicy overflow_policy)
Construct queue with all policies.
~policy_queue()=default
Destructor.
auto stop() -> void
Stop the queue.
auto get_next_job() -> common::Result< std::unique_ptr< job > > override
Get next job (delegates to dequeue)
OverflowPolicy overflow_policy_type
auto empty() const -> bool
Check if queue is empty.
auto enqueue(std::unique_ptr< JobType > &&value) -> common::VoidResult
Type-safe enqueue for job subclasses.
auto is_stopped() const -> bool
Check if queue is stopped.
auto enqueue(std::unique_ptr< job > &&value) -> common::VoidResult
Enqueue a job into the queue.
policy_queue(const policy_queue &)=delete
policy_queue()
Construct queue with default policies.
auto sync_policy() -> SyncPolicy &
Get reference to sync policy.
auto handle_overflow(std::unique_ptr< job > &&value) -> common::VoidResult
Handle overflow based on overflow policy.
policy_queue(BoundPolicy bound_policy)
Construct queue with bound policy.
Mixin interface for queue capability introspection.
Scheduler interface for queuing and retrieving jobs.
Error codes and utilities for the thread system.
Base job class for schedulable work units in the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Overflow handling policies: reject, drop-oldest, or block.
Mixin interface for queue capability introspection.
Abstract scheduler interface for queuing and retrieving jobs.
Runtime-queryable queue capabilities descriptor.
Synchronization policies: mutex-based or spinlock-based.