Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
queue_factory_sample.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
19
20#include <iostream>
21#include <string>
22#include <thread>
23#include <atomic>
24#include <vector>
25#include <chrono>
26
27using namespace kcenon::thread;
28
35{
36 std::cout << "=== Example 1: Simple Factory Usage ===" << std::endl;
37
38 // Create standard queue (job_queue) - exact size, batch operations, blocking dequeue
40 std::cout << "Standard queue (job_queue):" << std::endl;
41 std::cout << " - has_exact_size: " << std::boolalpha << standard->has_exact_size() << std::endl;
42 std::cout << " - is_lock_free: " << standard->is_lock_free() << std::endl;
43
44 // Create adaptive queue with performance_first policy - maximum throughput
45 // Note: create_lockfree_queue() is deprecated, use create_adaptive_queue() instead
46 auto performance = queue_factory::create_adaptive_queue(adaptive_job_queue::policy::performance_first);
47 std::cout << "Performance-first queue (adaptive_job_queue with lock-free mode):" << std::endl;
48 std::cout << " - has_exact_size: " << performance->has_exact_size() << std::endl;
49 std::cout << " - is_lock_free: " << performance->is_lock_free() << std::endl;
50
51 // Create adaptive queue - auto-optimizing based on workload
53 std::cout << "Adaptive queue (adaptive_job_queue):" << std::endl;
54 std::cout << " - has_exact_size: " << adaptive->has_exact_size() << std::endl;
55 std::cout << " - is_lock_free: " << adaptive->is_lock_free() << std::endl;
56 std::cout << " - auto-switching enabled for balanced performance" << std::endl;
57
58 std::cout << std::endl;
59}
60
68{
69 std::cout << "=== Example 2: Requirements-Based Selection ===" << std::endl;
70
71 // Scenario: Monitoring system needs exact counts
72 std::cout << "Monitoring queue (need_exact_size=true):" << std::endl;
73 queue_factory::requirements monitoring_reqs;
74 monitoring_reqs.need_exact_size = true;
75 auto monitoring_queue = queue_factory::create_for_requirements(monitoring_reqs);
76 std::cout << " - Returns job_queue via scheduler_interface" << std::endl;
77 std::cout << " - Provides exact size() and empty() operations" << std::endl;
78
79 // Scenario: High-performance logging prefers lock-free
80 std::cout << "Logging queue (prefer_lock_free=true):" << std::endl;
81 queue_factory::requirements logging_reqs;
82 logging_reqs.prefer_lock_free = true;
83 auto logging_queue = queue_factory::create_for_requirements(logging_reqs);
84 std::cout << " - Returns adaptive_job_queue via scheduler_interface" << std::endl;
85 std::cout << " - Maximum throughput for high-volume logging" << std::endl;
86
87 // Scenario: Batch processing needs batch operations
88 std::cout << "Batch queue (need_batch_operations=true):" << std::endl;
90 batch_reqs.need_batch_operations = true;
91 auto batch_queue = queue_factory::create_for_requirements(batch_reqs);
92 std::cout << " - Returns job_queue for batch operation support" << std::endl;
93
94 // Scenario: No specific requirements - gets adaptive queue
95 std::cout << "Default queue (no specific requirements):" << std::endl;
96 queue_factory::requirements default_reqs;
97 auto default_queue = queue_factory::create_for_requirements(default_reqs);
98 std::cout << " - Returns adaptive_job_queue for flexibility" << std::endl;
99
100 // Demonstrate basic scheduler_interface usage
101 std::cout << "\nUsing scheduler_interface:" << std::endl;
102 auto job = std::make_unique<callback_job>(
103 []() -> kcenon::common::VoidResult {
104 std::cout << " - Job executed!" << std::endl;
105 return kcenon::common::ok();
106 });
107 auto schedule_result = monitoring_queue->schedule(std::move(job));
108 if (schedule_result.is_ok()) {
109 auto next_job = monitoring_queue->get_next_job();
110 if (next_job.is_ok()) {
111 auto work_result = next_job.value()->do_work();
112 (void)work_result;
113 }
114 }
115
116 std::cout << std::endl;
117}
118
125{
126 std::cout << "=== Example 3: Optimal Queue Selection ===" << std::endl;
127
128 auto optimal = queue_factory::create_optimal();
129
130 std::cout << "Optimal queue selected for this system:" << std::endl;
131 std::cout << " Selection criteria:" << std::endl;
132 std::cout << " - Hardware concurrency: " << std::thread::hardware_concurrency() << " cores" << std::endl;
133#if defined(__aarch64__) || defined(_M_ARM64)
134 std::cout << " - Architecture: ARM (weak memory model)" << std::endl;
135 std::cout << " - Selection: job_queue (safety priority)" << std::endl;
136#else
137 std::cout << " - Architecture: x86 (strong memory model)" << std::endl;
138 if (std::thread::hardware_concurrency() <= 2) {
139 std::cout << " - Selection: job_queue (mutex efficient for low core count)" << std::endl;
140 } else {
141 std::cout << " - Selection: adaptive_job_queue (best of both worlds)" << std::endl;
142 }
143#endif
144
145 // Demonstrate usage through scheduler_interface
146 std::cout << "\nUsing optimal queue:" << std::endl;
147 std::atomic<int> job_count{0};
148 const int num_jobs = 5;
149
150 for (int i = 0; i < num_jobs; ++i) {
151 auto job = std::make_unique<callback_job>(
152 [&job_count]() -> kcenon::common::VoidResult {
153 job_count.fetch_add(1);
154 return kcenon::common::ok();
155 });
156 optimal->schedule(std::move(job));
157 }
158
159 // Process all jobs
160 for (int i = 0; i < num_jobs; ++i) {
161 auto result = optimal->get_next_job();
162 if (result.is_ok()) {
163 auto work_result = result.value()->do_work();
164 (void)work_result;
165 }
166 }
167 std::cout << " Processed " << job_count.load() << " jobs" << std::endl;
168
169 std::cout << std::endl;
170}
171
178{
179 std::cout << "=== Example 4: Compile-Time Selection ===" << std::endl;
180
181 // Type aliases for common use cases
182 std::cout << "Pre-defined type aliases:" << std::endl;
183 std::cout << " - accurate_queue_t = job_queue (exact size/empty)" << std::endl;
184 std::cout << " - fast_queue_t = adaptive_job_queue (maximum throughput)" << std::endl;
185 std::cout << " - balanced_queue_t = adaptive_job_queue (auto-tuning)" << std::endl;
186
187 // Demonstrate usage
188 accurate_queue_t accurate;
189 fast_queue_t fast;
190 balanced_queue_t balanced;
191
192 std::cout << "\nInstantiated queues:" << std::endl;
193 std::cout << " - accurate_queue_t has_exact_size: " << std::boolalpha << accurate.has_exact_size() << std::endl;
194 std::cout << " - fast_queue_t is_lock_free: " << fast.is_lock_free() << std::endl;
195 std::cout << " - balanced_queue_t (adaptive mode)" << std::endl;
196
197 // Show template-based selection
198 std::cout << "\nTemplate-based selection (queue_t<NeedExactSize, PreferLockFree>):" << std::endl;
199 std::cout << " - queue_t<true, false> -> job_queue" << std::endl;
200 std::cout << " - queue_t<false, true> -> adaptive_job_queue (performance mode)" << std::endl;
201 std::cout << " - queue_t<false, false> -> adaptive_job_queue (balanced mode)" << std::endl;
202 std::cout << " - queue_t<true, true> -> compile error (mutually exclusive)" << std::endl;
203
204 std::cout << std::endl;
205}
206
213{
214 std::cout << "=== Example 5: Practical Use Cases ===" << std::endl;
215
216 // Financial system: needs exact counts for audit
217 std::cout << "\n[Financial System - Audit Queue]" << std::endl;
218 std::cout << " Requirements: exact_size + batch_operations" << std::endl;
219 std::cout << " Selected: job_queue (mutex-based for accuracy)" << std::endl;
220 auto financial_queue = queue_factory::create_standard_queue();
221
222 // High-frequency trading: needs maximum speed
223 std::cout << "\n[High-Frequency Trading - Order Queue]" << std::endl;
224 std::cout << " Requirements: prefer_lock_free" << std::endl;
225 std::cout << " Selected: adaptive_job_queue with performance_first policy" << std::endl;
226 auto hft_queue = queue_factory::create_adaptive_queue(adaptive_job_queue::policy::performance_first);
227
228 // Web server: balanced workload
229 std::cout << "\n[Web Server - Request Queue]" << std::endl;
230 std::cout << " Requirements: variable load, auto-tuning" << std::endl;
231 std::cout << " Selected: adaptive_job_queue with balanced policy" << std::endl;
232 auto web_queue = queue_factory::create_adaptive_queue(adaptive_job_queue::policy::balanced);
233
234 // Demonstrate actual usage with financial queue (has exact size)
235 std::cout << "\n[Demo: Processing jobs through financial queue]" << std::endl;
236 std::atomic<int> processed{0};
237
238 // Enqueue some jobs
239 for (int i = 0; i < 5; ++i) {
240 auto job = std::make_unique<callback_job>(
241 [i, &processed]() -> kcenon::common::VoidResult {
242 processed.fetch_add(1);
243 return kcenon::common::ok();
244 });
245 auto result = financial_queue->enqueue(std::move(job));
246 if (!result.is_err()) {
247 std::cout << " Enqueued job " << i << ", queue size: " << financial_queue->size() << std::endl;
248 }
249 }
250
251 // Process jobs
252 while (!financial_queue->empty()) {
253 auto result = financial_queue->dequeue();
254 if (result.is_ok()) {
255 auto work_result = result.value()->do_work();
256 (void)work_result;
257 }
258 }
259 std::cout << " Processed " << processed.load() << " jobs" << std::endl;
260
261 // Demonstrate HFT queue (lock-free)
262 std::cout << "\n[Demo: High-frequency trading simulation]" << std::endl;
263 std::atomic<int> orders_processed{0};
264 const int order_count = 1000;
265
266 auto start = std::chrono::high_resolution_clock::now();
267
268 // Enqueue orders
269 for (int i = 0; i < order_count; ++i) {
270 auto job = std::make_unique<callback_job>(
271 [&orders_processed]() -> kcenon::common::VoidResult {
272 orders_processed.fetch_add(1);
273 return kcenon::common::ok();
274 });
275 auto enqueue_result = hft_queue->enqueue(std::move(job));
276 (void)enqueue_result;
277 }
278
279 // Process orders
280 while (true) {
281 auto result = hft_queue->dequeue();
282 if (result.is_err()) break;
283 auto work_result = result.value()->do_work();
284 (void)work_result;
285 }
286
287 auto duration = std::chrono::high_resolution_clock::now() - start;
288 auto us = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
289 std::cout << " Processed " << orders_processed.load() << " orders in " << us << " us" << std::endl;
290 if (us > 0) {
291 std::cout << " Throughput: " << (order_count * 1000000.0 / us) << " ops/sec" << std::endl;
292 }
293
294 std::cout << std::endl;
295}
296
297int main()
298{
299 std::cout << "Queue Factory Sample" << std::endl;
300 std::cout << "====================" << std::endl;
301 std::cout << std::endl;
302
303 try {
309 } catch (const std::exception& e) {
310 std::cerr << "Exception: " << e.what() << std::endl;
311 return 1;
312 }
313
314 std::cout << "All examples completed successfully!" << std::endl;
315
316 return 0;
317}
Specialized job class that encapsulates user-defined callbacks.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
static auto create_optimal() -> std::unique_ptr< scheduler_interface >
Create optimal queue for current environment.
static auto create_standard_queue() -> std::shared_ptr< job_queue >
Create standard job_queue.
static auto create_adaptive_queue(adaptive_job_queue::policy policy=adaptive_job_queue::policy::balanced) -> std::unique_ptr< adaptive_job_queue >
Create adaptive queue (RECOMMENDED for most use cases)
static auto create_for_requirements(const requirements &reqs) -> std::unique_ptr< scheduler_interface >
Create queue based on requirements.
A template class representing either a value or an error.
T & value() &
Gets the value.
bool is_ok() const noexcept
Checks if the result is successful.
@ adaptive
Automatically adjust based on load conditions.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
queue_t< false, false > balanced_queue_t
Queue type for balanced performance (adaptive_job_queue)
queue_t< false, true > fast_queue_t
Queue type for maximum throughput (adaptive_job_queue with performance_first policy)
queue_t< true, false > accurate_queue_t
Queue type for accurate size/empty operations (job_queue)
Factory for creating queue instances based on configuration.
void optimal_selection()
Example 3: Optimal queue selection.
void requirements_based_selection()
Example 2: Requirements-based selection.
void simple_factory_usage()
Example 1: Simple factory usage.
void practical_use_cases()
Example 5: Practical use cases.
void compile_time_selection()
Example 4: Compile-time selection.
bool prefer_lock_free
Prefer lock-free if possible.
bool need_batch_operations
Require batch enqueue/dequeue.