Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
queue_capabilities_sample.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
24
25#include <iostream>
26#include <memory>
27#include <string>
28
29using namespace kcenon::thread;
30
37{
38 std::cout << "=== Example 1: Basic Capability Query ===" << std::endl;
39
40 auto mutex_queue = std::make_shared<job_queue>();
41 auto lockfree_queue = std::make_unique<detail::lockfree_job_queue>();
42
43 // Query capabilities using get_capabilities()
44 auto mutex_caps = mutex_queue->get_capabilities();
45 auto lockfree_caps = lockfree_queue->get_capabilities();
46
47 std::cout << "\njob_queue capabilities:" << std::endl;
48 std::cout << " exact_size: " << std::boolalpha << mutex_caps.exact_size << std::endl;
49 std::cout << " atomic_empty_check: " << mutex_caps.atomic_empty_check << std::endl;
50 std::cout << " lock_free: " << mutex_caps.lock_free << std::endl;
51 std::cout << " wait_free: " << mutex_caps.wait_free << std::endl;
52 std::cout << " supports_batch: " << mutex_caps.supports_batch << std::endl;
53 std::cout << " supports_blocking_wait: " << mutex_caps.supports_blocking_wait << std::endl;
54 std::cout << " supports_stop: " << mutex_caps.supports_stop << std::endl;
55
56 std::cout << "\ndetail::lockfree_job_queue capabilities:" << std::endl;
57 std::cout << " exact_size: " << lockfree_caps.exact_size << std::endl;
58 std::cout << " atomic_empty_check: " << lockfree_caps.atomic_empty_check << std::endl;
59 std::cout << " lock_free: " << lockfree_caps.lock_free << std::endl;
60 std::cout << " wait_free: " << lockfree_caps.wait_free << std::endl;
61 std::cout << " supports_batch: " << lockfree_caps.supports_batch << std::endl;
62 std::cout << " supports_blocking_wait: " << lockfree_caps.supports_blocking_wait << std::endl;
63 std::cout << " supports_stop: " << lockfree_caps.supports_stop << std::endl;
64
65 std::cout << std::endl;
66}
67
74{
75 std::cout << "=== Example 2: Convenience Methods ===" << std::endl;
76
77 auto queue = std::make_shared<job_queue>();
78
79 std::cout << "\nUsing convenience methods on job_queue:" << std::endl;
80
81 // Use convenience methods for cleaner code
82 if (queue->has_exact_size()) {
83 std::cout << " [OK] Queue size is exact: " << queue->size() << std::endl;
84 }
85
86 if (queue->has_atomic_empty()) {
87 std::cout << " [OK] Empty check is atomic: " << std::boolalpha << queue->empty() << std::endl;
88 }
89
90 if (!queue->is_lock_free()) {
91 std::cout << " [OK] Queue uses mutex (good for accuracy)" << std::endl;
92 }
93
94 if (!queue->is_wait_free()) {
95 std::cout << " [OK] Queue is not wait-free" << std::endl;
96 }
97
98 if (queue->supports_batch()) {
99 std::cout << " [OK] Batch operations supported" << std::endl;
100 }
101
102 if (queue->supports_blocking_wait()) {
103 std::cout << " [OK] Blocking wait supported" << std::endl;
104 }
105
106 if (queue->supports_stop()) {
107 std::cout << " [OK] Stop signaling supported" << std::endl;
108 }
109
110 std::cout << std::endl;
111}
112
119{
120 std::cout << "=== Example 3: Dynamic Capability Check ===" << std::endl;
121
122 std::cout << "\nChecking capabilities through scheduler_interface*:" << std::endl;
123
124 // Check if scheduler supports capabilities interface using dynamic_cast
125 if (auto* cap = dynamic_cast<queue_capabilities_interface*>(scheduler)) {
126 std::cout << " [OK] Scheduler supports capability introspection" << std::endl;
127
128 if (cap->has_exact_size()) {
129 std::cout << " -> Safe to use size() for decisions" << std::endl;
130 } else {
131 std::cout << " -> size() is approximate, use with caution" << std::endl;
132 }
133
134 if (cap->is_lock_free()) {
135 std::cout << " -> Lock-free implementation (high throughput)" << std::endl;
136 } else {
137 std::cout << " -> Mutex-based implementation (accurate metrics)" << std::endl;
138 }
139
140 if (cap->supports_blocking_wait()) {
141 std::cout << " -> Blocking dequeue available" << std::endl;
142 } else {
143 std::cout << " -> Use polling/spin-wait for dequeue" << std::endl;
144 }
145 } else {
146 std::cout << " [!] Scheduler does not support capability introspection" << std::endl;
147 }
148
149 std::cout << std::endl;
150}
151
158 std::unique_ptr<scheduler_interface> queue_;
160
161public:
162 explicit SmartJobProcessor(bool need_exact_metrics)
163 {
164 if (need_exact_metrics) {
165 auto jq = std::make_unique<job_queue>();
167 queue_ = std::move(jq);
168 } else {
169 auto lfq = std::make_unique<detail::lockfree_job_queue>();
171 queue_ = std::move(lfq);
172 }
173 }
174
175 void log_status() const
176 {
178 // Safe to use for monitoring/alerting
179 std::cout << " Exact queue size: " << get_size() << std::endl;
180 } else {
181 // Approximate, for logging only
182 std::cout << " Approximate queue size: ~" << get_size() << std::endl;
183 }
184 }
185
186 [[nodiscard]] bool has_exact_metrics() const { return exact_metrics_available_; }
187
188private:
189 [[nodiscard]] size_t get_size() const
190 {
191 if (auto* cap = dynamic_cast<queue_capabilities_interface*>(queue_.get())) {
192 // Use the polymorphic interface for demonstration
193 if (auto* jq = dynamic_cast<job_queue*>(queue_.get())) {
194 return jq->size();
195 }
196 if (auto* lfq = dynamic_cast<detail::lockfree_job_queue*>(queue_.get())) {
197 return lfq->size();
198 }
199 }
200 return 0;
201 }
202};
203
210{
211 std::cout << "=== Example 4: Capability-Driven Selection ===" << std::endl;
212
213 std::cout << "\nCreating processors with different requirements:" << std::endl;
214
215 // Monitoring processor needs exact metrics
216 SmartJobProcessor monitoring_processor(true);
217 std::cout << "\n[Monitoring Processor] (needs exact metrics)" << std::endl;
218 std::cout << " exact_metrics_available: " << std::boolalpha
219 << monitoring_processor.has_exact_metrics() << std::endl;
220 monitoring_processor.log_status();
221
222 // Logging processor can use approximate metrics
223 SmartJobProcessor logging_processor(false);
224 std::cout << "\n[Logging Processor] (approximate is fine)" << std::endl;
225 std::cout << " exact_metrics_available: "
226 << logging_processor.has_exact_metrics() << std::endl;
227 logging_processor.log_status();
228
229 std::cout << std::endl;
230}
231
238{
239 std::cout << "=== Example 5: Capability Comparison Table ===" << std::endl;
240
241 auto mutex_q = std::make_shared<job_queue>();
242 auto lockfree_q = std::make_unique<detail::lockfree_job_queue>();
243 auto adaptive_q = std::make_unique<adaptive_job_queue>();
244
245 auto print_caps = [](const char* name, const queue_capabilities& caps) {
246 std::cout << "\n" << name << ":" << std::endl;
247 std::cout << " exact_size = " << std::boolalpha << caps.exact_size << std::endl;
248 std::cout << " atomic_empty_check = " << caps.atomic_empty_check << std::endl;
249 std::cout << " lock_free = " << caps.lock_free << std::endl;
250 std::cout << " wait_free = " << caps.wait_free << std::endl;
251 std::cout << " supports_batch = " << caps.supports_batch << std::endl;
252 std::cout << " supports_blocking = " << caps.supports_blocking_wait << std::endl;
253 std::cout << " supports_stop = " << caps.supports_stop << std::endl;
254 };
255
256 print_caps("job_queue (mutex-based)", mutex_q->get_capabilities());
257 print_caps("detail::lockfree_job_queue", lockfree_q->get_capabilities());
258 print_caps("adaptive_job_queue (default mode)", adaptive_q->get_capabilities());
259
260 // Summary table
261 std::cout << "\n--- Summary Table ---" << std::endl;
262 std::cout << "Queue Type | exact | atomic | lock-free | batch | blocking" << std::endl;
263 std::cout << "----------------------|-------|--------|-----------|-------|----------" << std::endl;
264
265 auto caps = mutex_q->get_capabilities();
266 std::cout << "job_queue | "
267 << (caps.exact_size ? "Y" : "N") << " | "
268 << (caps.atomic_empty_check ? "Y" : "N") << " | "
269 << (caps.lock_free ? "Y" : "N") << " | "
270 << (caps.supports_batch ? "Y" : "N") << " | "
271 << (caps.supports_blocking_wait ? "Y" : "N") << std::endl;
272
273 caps = lockfree_q->get_capabilities();
274 std::cout << "detail::lockfree_job_queue | "
275 << (caps.exact_size ? "Y" : "N") << " | "
276 << (caps.atomic_empty_check ? "Y" : "N") << " | "
277 << (caps.lock_free ? "Y" : "N") << " | "
278 << (caps.supports_batch ? "Y" : "N") << " | "
279 << (caps.supports_blocking_wait ? "Y" : "N") << std::endl;
280
281 caps = adaptive_q->get_capabilities();
282 std::cout << "adaptive_job_queue | "
283 << (caps.exact_size ? "Y" : "N") << " | "
284 << (caps.atomic_empty_check ? "Y" : "N") << " | "
285 << (caps.lock_free ? "Y" : "N") << " | "
286 << (caps.supports_batch ? "Y" : "N") << " | "
287 << (caps.supports_blocking_wait ? "Y" : "N") << std::endl;
288
289 std::cout << std::endl;
290}
291
292int main()
293{
294 std::cout << "Queue Capabilities Sample" << std::endl;
295 std::cout << "=========================" << std::endl;
296 std::cout << std::endl;
297 std::cout << "This sample demonstrates queue_capabilities_interface usage" << std::endl;
298 std::cout << "for runtime capability introspection." << std::endl;
299 std::cout << std::endl;
300
301 try {
302 // Example 1: Basic capability query
304
305 // Example 2: Convenience methods
307
308 // Example 3: Dynamic capability check
309 auto queue = std::make_unique<job_queue>();
310 dynamic_capability_check(queue.get());
311
312 // Example 4: Capability-driven selection
314
315 // Example 5: Capability comparison
317
318 } catch (const std::exception& e) {
319 std::cerr << "Exception: " << e.what() << std::endl;
320 return 1;
321 }
322
323 std::cout << "All examples completed successfully!" << std::endl;
324
325 return 0;
326}
Adaptive queue that auto-switches between mutex and lock-free modes.
Specialized job class that encapsulates user-defined callbacks.
Smart job processor that adapts to queue capabilities.
SmartJobProcessor(bool need_exact_metrics)
std::unique_ptr< scheduler_interface > queue_
Lock-free Multi-Producer Multi-Consumer (MPMC) job queue (Internal implementation)
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Mixin interface for queue capability introspection.
Scheduler interface for queuing and retrieving jobs.
Thread-safe FIFO job queue with optional bounded size.
Lock-free MPMC job queue using Michael-Scott algorithm with hazard pointers.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Mixin interface for queue capability introspection.
void basic_capability_query()
Example 1: Basic capability query.
void capability_driven_selection()
Example 4: Capability-driven queue selection.
void capability_comparison()
Example 5: Capability comparison table.
void dynamic_capability_check(scheduler_interface *scheduler)
Example 3: Dynamic capability check (polymorphic)
void convenience_methods()
Example 2: Convenience methods.
Abstract scheduler interface for queuing and retrieving jobs.
Runtime-queryable queue capabilities descriptor.