Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
policy_queue_adapter.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
17
18namespace kcenon::thread {
19
49template<
50 typename SyncPolicy,
51 typename BoundPolicy = policies::unbounded_policy,
52 typename OverflowPolicy = policies::overflow_reject_policy
53>
55public:
57
62 : queue_(std::make_unique<queue_type>()) {}
63
68 explicit policy_queue_adapter(BoundPolicy bound_policy)
69 : queue_(std::make_unique<queue_type>(std::move(bound_policy))) {}
70
77 policy_queue_adapter(SyncPolicy sync_policy,
78 BoundPolicy bound_policy,
79 OverflowPolicy overflow_policy)
80 : queue_(std::make_unique<queue_type>(
81 std::move(sync_policy),
82 std::move(bound_policy),
83 std::move(overflow_policy))) {}
84
89 explicit policy_queue_adapter(std::unique_ptr<queue_type> queue)
90 : queue_(std::move(queue)) {}
91
92 ~policy_queue_adapter() override = default;
93
94 // Non-copyable
97
98 // Movable
101
102 [[nodiscard]] auto enqueue(std::unique_ptr<job>&& j) -> common::VoidResult override {
103 return queue_->enqueue(std::move(j));
104 }
105
106 [[nodiscard]] auto enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs) -> common::VoidResult override {
107 // policy_queue doesn't have enqueue_batch, so we implement it as a loop
108 for (auto& j : jobs) {
109 auto result = queue_->enqueue(std::move(j));
110 if (result.is_err()) {
111 return result;
112 }
113 }
114 return common::ok();
115 }
116
117 [[nodiscard]] auto dequeue() -> common::Result<std::unique_ptr<job>> override {
118 return queue_->dequeue();
119 }
120
121 [[nodiscard]] auto try_dequeue() -> common::Result<std::unique_ptr<job>> override {
122 return queue_->try_dequeue();
123 }
124
125 [[nodiscard]] auto empty() const -> bool override {
126 return queue_->empty();
127 }
128
129 [[nodiscard]] auto size() const -> std::size_t override {
130 return queue_->size();
131 }
132
133 auto clear() -> void override {
134 queue_->clear();
135 }
136
137 auto stop() -> void override {
138 queue_->stop();
139 }
140
141 [[nodiscard]] auto is_stopped() const -> bool override {
142 return queue_->is_stopped();
143 }
144
145 [[nodiscard]] auto get_capabilities() const -> queue_capabilities override {
146 return queue_->get_capabilities();
147 }
148
149 [[nodiscard]] auto to_string() const -> std::string override {
151 "policy_queue[size={}, stopped={}]",
152 queue_->size(),
153 queue_->is_stopped());
154 }
155
156 [[nodiscard]] auto get_job_queue() const -> std::shared_ptr<job_queue> override {
157 // policy_queue is not a job_queue, return nullptr
158 return nullptr;
159 }
160
161 [[nodiscard]] auto get_scheduler() -> scheduler_interface& override {
162 return *queue_;
163 }
164
165 [[nodiscard]] auto get_scheduler() const -> const scheduler_interface& override {
166 return *queue_;
167 }
168
173 [[nodiscard]] auto get_policy_queue() -> queue_type& {
174 return *queue_;
175 }
176
181 [[nodiscard]] auto get_policy_queue() const -> const queue_type& {
182 return *queue_;
183 }
184
185private:
186 std::unique_ptr<queue_type> queue_;
187};
188
189// ============================================
190// Type aliases for common configurations
191// ============================================
192
200>;
201
209>;
210
211// ============================================
212// Factory functions
213// ============================================
214
224template<typename PolicyQueueType>
225[[nodiscard]] auto make_policy_queue_adapter()
226 -> std::unique_ptr<pool_queue_adapter_interface>
227{
228 using adapter_type = policy_queue_adapter<
229 typename PolicyQueueType::sync_policy_type,
230 typename PolicyQueueType::bound_policy_type,
231 typename PolicyQueueType::overflow_policy_type
232 >;
233 return std::make_unique<adapter_type>();
234}
235
242template<typename PolicyQueueType>
243[[nodiscard]] auto make_policy_queue_adapter(
244 typename PolicyQueueType::bound_policy_type bound_policy)
245 -> std::unique_ptr<pool_queue_adapter_interface>
246{
247 using adapter_type = policy_queue_adapter<
248 typename PolicyQueueType::sync_policy_type,
249 typename PolicyQueueType::bound_policy_type,
250 typename PolicyQueueType::overflow_policy_type
251 >;
252 return std::make_unique<adapter_type>(std::move(bound_policy));
253}
254
259[[nodiscard]] inline auto make_standard_queue_adapter()
260 -> std::unique_ptr<pool_queue_adapter_interface>
261{
262 return std::make_unique<standard_queue_adapter>();
263}
264
269[[nodiscard]] inline auto make_lockfree_queue_adapter()
270 -> std::unique_ptr<pool_queue_adapter_interface>
271{
272 return std::make_unique<lockfree_queue_adapter>();
273}
274
275} // namespace kcenon::thread
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Lock-free synchronization policy using Michael-Scott algorithm.
Synchronization policy using mutex and condition variable.
Policy that rejects new items when queue is full.
Policy that allows unlimited queue size.
Adapter for policy_queue to pool_queue_adapter_interface.
auto enqueue(std::unique_ptr< job > &&j) -> common::VoidResult override
Enqueue a job.
policy_queue_adapter(BoundPolicy bound_policy)
Construct adapter with bound policy.
auto get_policy_queue() const -> const queue_type &
Get direct access to the underlying policy_queue (const)
auto get_scheduler() const -> const scheduler_interface &override
Get the underlying scheduler interface (const)
auto get_policy_queue() -> queue_type &
Get direct access to the underlying policy_queue.
auto dequeue() -> common::Result< std::unique_ptr< job > > override
Dequeue a job (blocking)
policy_queue_adapter(std::unique_ptr< queue_type > queue)
Construct adapter with existing policy_queue.
auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult override
Enqueue a batch of jobs.
~policy_queue_adapter() override=default
auto get_job_queue() const -> std::shared_ptr< job_queue > override
Get the underlying job_queue if this adapter wraps one.
policy_queue_adapter(SyncPolicy sync_policy, BoundPolicy bound_policy, OverflowPolicy overflow_policy)
Construct adapter with all policies.
policy_queue_adapter & operator=(const policy_queue_adapter &)=delete
auto clear() -> void override
Clear all jobs from queue.
std::unique_ptr< queue_type > queue_
auto empty() const -> bool override
Check if queue is empty.
auto size() const -> std::size_t override
Get queue size.
auto to_string() const -> std::string override
Get string representation.
auto stop() -> void override
Stop the queue.
policy_queue_adapter()
Construct adapter with new policy_queue using default policies.
auto is_stopped() const -> bool override
Check if queue is stopped.
policy_queue_adapter(policy_queue_adapter &&)=default
policy_queue_adapter & operator=(policy_queue_adapter &&)=default
auto get_capabilities() const -> queue_capabilities override
Get queue capabilities.
auto try_dequeue() -> common::Result< std::unique_ptr< job > > override
Try to dequeue a job (non-blocking)
auto get_scheduler() -> scheduler_interface &override
Get the underlying scheduler interface.
policy_queue_adapter(const policy_queue_adapter &)=delete
Policy-based queue template.
Abstract interface for queue adapters used by thread_pool.
A template class representing either a value or an error.
Scheduler interface for queuing and retrieving jobs.
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:129
Generic formatter for enum types using user-provided converter functors.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
auto make_lockfree_queue_adapter() -> std::unique_ptr< pool_queue_adapter_interface >
Create a lock-free queue adapter.
auto make_standard_queue_adapter() -> std::unique_ptr< pool_queue_adapter_interface >
Create a standard_queue adapter.
auto make_policy_queue_adapter() -> std::unique_ptr< pool_queue_adapter_interface >
Create a pool_queue_adapter from a policy_queue type.
STL namespace.
Policy-based job queue template with customizable sync, bound, and overflow.
Abstract interface for queue adapters used by thread_pool.
Runtime-queryable queue capabilities descriptor.