Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
adaptive_queue_sample.cpp File Reference

Adaptive job queue sample comparing policies and demonstrating mode switching. More...

#include <kcenon/thread/queue/adaptive_job_queue.h>
#include <kcenon/thread/core/callback_job.h>
#include <thread>
#include <vector>
#include <atomic>
#include <chrono>
#include <random>
#include <iostream>
#include <iomanip>
Include dependency graph for adaptive_queue_sample.cpp:

Go to the source code of this file.

Functions

std::string mode_to_string (adaptive_job_queue::mode m)
 
void policy_comparison_example ()
 
void adaptive_behavior_example ()
 
void different_policies_example ()
 
void performance_monitoring_example ()
 
void web_server_simulation ()
 
int main ()
 

Detailed Description

Adaptive job queue sample comparing policies and demonstrating mode switching.

Definition in file adaptive_queue_sample.cpp.

Function Documentation

◆ adaptive_behavior_example()

void adaptive_behavior_example ( )
Examples
adaptive_queue_sample.cpp.

Definition at line 134 of file adaptive_queue_sample.cpp.

135{
136 std::cout << "\n[Example 2] Balanced Policy Behavior" << std::endl;
137
138 adaptive_job_queue queue(adaptive_job_queue::policy::balanced);
139
140 // Low contention phase (1 producer, 1 consumer)
141 std::cout << "Phase 1: Low contention (1P-1C)" << std::endl;
142 {
143 std::atomic<bool> running{true};
144 std::atomic<int> jobs_processed{0};
145
146 std::thread producer([&queue, &running]() {
147 while (running) {
148 auto job = std::make_unique<callback_job>(
149 []() -> kcenon::common::VoidResult { return kcenon::common::ok(); });
150 auto enqueue_result = queue.enqueue(std::move(job));
151 if (enqueue_result.is_err()) {
152 std::cerr << "enqueue failed: " << enqueue_result.error().message << std::endl;
153 }
154 std::this_thread::sleep_for(1ms);
155 }
156 });
157
158 std::thread consumer([&queue, &running, &jobs_processed]() {
159 while (running) {
160 auto result = queue.dequeue();
161 if (result.is_ok()) {
162 auto work_result = result.value()->do_work();
163 (void)work_result; // Ignore result for sample
164 jobs_processed.fetch_add(1);
165 }
166 std::this_thread::sleep_for(1ms);
167 }
168 });
169
170 std::this_thread::sleep_for(2s);
171 running = false;
172 producer.join();
173 consumer.join();
174
175 auto current_mode = queue.current_mode();
176 std::cout << " Current mode: " << mode_to_string(current_mode)
177 << ", Jobs processed: " << jobs_processed.load() << std::endl;
178 }
179
180 // High contention phase (8 producers, 8 consumers)
181 std::cout << "Phase 2: High contention (8P-8C)" << std::endl;
182 {
183 std::atomic<bool> running{true};
184 std::atomic<int> jobs_processed{0};
185 std::vector<std::thread> threads;
186
187 // Start producers
188 for (int i = 0; i < 8; ++i) {
189 threads.emplace_back([&queue, &running]() {
190 std::random_device rd;
191 std::mt19937 gen(rd());
192 std::uniform_int_distribution<> dist(0, 100);
193
194 while (running) {
195 auto job = std::make_unique<callback_job>(
196 []() -> kcenon::common::VoidResult { return kcenon::common::ok(); });
197 auto enqueue_result = queue.enqueue(std::move(job));
198 if (enqueue_result.is_err()) {
199 // Best-effort: ignore for demo
200 }
201 if (dist(gen) < 10) { // 10% chance of sleep
202 std::this_thread::sleep_for(std::chrono::microseconds(dist(gen)));
203 }
204 }
205 });
206 }
207
208 // Start consumers
209 for (int i = 0; i < 8; ++i) {
210 threads.emplace_back([&queue, &running, &jobs_processed]() {
211 while (running) {
212 auto result = queue.dequeue();
213 if (result.is_ok()) {
214 auto work_result = result.value()->do_work();
215 (void)work_result; // Ignore result for sample
216 jobs_processed.fetch_add(1);
217 }
218 }
219 });
220 }
221
222 std::this_thread::sleep_for(2s);
223 running = false;
224 for (auto& t : threads) t.join();
225
226 auto current_mode = queue.current_mode();
227 std::cout << " Current mode: " << mode_to_string(current_mode)
228 << ", Jobs processed: " << jobs_processed.load() << std::endl;
229 }
230}
std::string mode_to_string(adaptive_job_queue::mode m)
Adaptive queue that switches between mutex and lock-free modes.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
T & value() &
Gets the value.
bool is_ok() const noexcept
Checks if the result is successful.
@ running
Currently executing.

References kcenon::thread::adaptive_job_queue::current_mode(), kcenon::thread::adaptive_job_queue::dequeue(), kcenon::thread::adaptive_job_queue::enqueue(), kcenon::thread::result< T >::is_ok(), mode_to_string(), kcenon::thread::running, and kcenon::thread::result< T >::value().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ different_policies_example()

void different_policies_example ( )
Examples
adaptive_queue_sample.cpp.

Definition at line 233 of file adaptive_queue_sample.cpp.

234{
235 std::cout << "\n[Example 3] Different Queue Policies" << std::endl;
236
237 // Create queue with accuracy-first policy (mutex mode)
238 adaptive_job_queue mutex_queue(adaptive_job_queue::policy::accuracy_first);
239 std::cout << "Accuracy-first queue mode: " << mode_to_string(mutex_queue.current_mode()) << std::endl;
240
241 // Perform some operations
242 std::vector<std::unique_ptr<job>> jobs;
243 for (int i = 0; i < 100; ++i) {
244 jobs.push_back(std::make_unique<callback_job>(
245 [i]() -> kcenon::common::VoidResult {
246 // Job executed silently for batch demo
247 return kcenon::common::ok(); // Success
248 }));
249 }
250
251 // Enqueue jobs one by one
252 int enqueue_count = 0;
253 for (auto& job : jobs) {
254 auto result = mutex_queue.enqueue(std::move(job));
255 if (result.is_ok()) {
256 enqueue_count++;
257 }
258 }
259 std::cout << "Enqueued " << enqueue_count << " jobs" << std::endl;
260
261 // Create queue with performance-first policy (lock-free mode)
262 adaptive_job_queue lockfree_queue(adaptive_job_queue::policy::performance_first);
263 std::cout << "Performance-first queue mode: " << mode_to_string(lockfree_queue.current_mode()) << std::endl;
264
265 // Dequeue and process jobs from mutex queue
266 int success_count = 0;
267 int fail_count = 0;
268 while (!mutex_queue.empty()) {
269 auto result = mutex_queue.dequeue();
270 if (result.is_ok()) {
271 auto work_result = result.value()->do_work();
272 if (work_result.is_ok()) {
273 success_count++;
274 } else {
275 fail_count++;
276 std::cerr << "Job failed: " << work_result.error().message << std::endl;
277 }
278 }
279 }
280 std::cout << "Processed " << success_count << " jobs successfully, "
281 << fail_count << " failed" << std::endl;
282}

References kcenon::thread::adaptive_job_queue::current_mode(), kcenon::thread::adaptive_job_queue::dequeue(), kcenon::thread::adaptive_job_queue::empty(), kcenon::thread::adaptive_job_queue::enqueue(), kcenon::thread::result< T >::is_ok(), mode_to_string(), and kcenon::thread::result< T >::value().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ main()

int main ( )
Examples
adaptive_queue_sample.cpp, composition_example.cpp, config_example.cpp, crash_protection/main.cpp, hazard_pointer_sample.cpp, integration_example.cpp, job_cancellation_example.cpp, logger_sample.cpp, minimal_thread_pool.cpp, mpmc_queue_sample.cpp, multi_process_monitoring_integration.cpp, node_pool_sample.cpp, queue_capabilities_sample.cpp, queue_factory_sample.cpp, service_registry_sample/main.cpp, thread_pool_sample.cpp, typed_job_queue_sample.cpp, typed_thread_pool_sample.cpp, and typed_thread_pool_sample_2.cpp.

Definition at line 437 of file adaptive_queue_sample.cpp.

// main(): entry point for the adaptive queue sample. Prints a banner, runs
// the example functions inside a try/catch, and returns 0 on success or 1 if
// any example throws.
438{
439 std::cout << "Adaptive Job Queue Sample" << std::endl;
440 std::cout << "=========================" << std::endl;
441
442 try {
// NOTE(review): source lines 443-447 are collapsed in this generated
// listing. Per the "References" list for main() in this document, those
// lines invoke policy_comparison_example(), adaptive_behavior_example(),
// different_policies_example(), performance_monitoring_example(), and
// web_server_simulation() -- confirm the exact call order against the
// original adaptive_queue_sample.cpp.
448 } catch (const std::exception& e) {
// Any example failure is reported and mapped to a non-zero exit code.
449 std::cerr << "Exception: " << e.what() << std::endl;
450 return 1;
451 }
452
453 std::cout << "\nAll examples completed!" << std::endl;
454
455 return 0;
456}
void web_server_simulation()
void performance_monitoring_example()
void different_policies_example()
void adaptive_behavior_example()
void policy_comparison_example()

References adaptive_behavior_example(), different_policies_example(), performance_monitoring_example(), policy_comparison_example(), and web_server_simulation().

Here is the call graph for this function:

◆ mode_to_string()

std::string mode_to_string ( adaptive_job_queue::mode m)
Examples
adaptive_queue_sample.cpp.

Definition at line 33 of file adaptive_queue_sample.cpp.

33 {
34 switch (m) {
35 case adaptive_job_queue::mode::mutex: return "mutex";
36 case adaptive_job_queue::mode::lock_free: return "lock_free";
37 }
38 return "unknown";
39}

Referenced by adaptive_behavior_example(), different_policies_example(), and performance_monitoring_example().

Here is the caller graph for this function:

◆ performance_monitoring_example()

void performance_monitoring_example ( )
Examples
adaptive_queue_sample.cpp.

Definition at line 285 of file adaptive_queue_sample.cpp.

286{
287 std::cout << "\n[Example 4] Performance Monitoring" << std::endl;
288
289 adaptive_job_queue queue(adaptive_job_queue::policy::balanced);
290
291 const int num_operations = 50000;
292 std::atomic<bool> running{true};
293 std::atomic<int> enqueued{0};
294 std::atomic<int> dequeued{0};
295
296 // Producer thread
297 std::thread producer([&queue, &enqueued, num_operations]() {
298 for (int i = 0; i < num_operations; ++i) {
299 auto job = std::make_unique<callback_job>(
300 []() -> kcenon::common::VoidResult { return kcenon::common::ok(); });
301
302 while (queue.enqueue(std::move(job)).is_err()) {
303 std::this_thread::yield();
304 job = std::make_unique<callback_job>(
305 []() -> kcenon::common::VoidResult { return kcenon::common::ok(); });
306 }
307 enqueued.fetch_add(1);
308 }
309 });
310
311 // Consumer thread
312 std::thread consumer([&queue, &dequeued, num_operations]() {
313 while (dequeued.load() < num_operations) {
314 auto result = queue.dequeue();
315 if (result.is_ok()) {
316 auto work_result = result.value()->do_work();
317 (void)work_result; // Ignore result for sample
318 dequeued.fetch_add(1);
319 }
320 }
321 });
322
323 // Monitor thread
324 std::thread monitor([&queue, &running, &enqueued, &dequeued, num_operations]() {
325 auto start = std::chrono::steady_clock::now();
326
327 while (dequeued.load() < num_operations) {
328 std::this_thread::sleep_for(500ms);
329
330 auto now = std::chrono::steady_clock::now();
331 auto elapsed = std::chrono::duration<double>(now - start).count();
332
333 auto current_mode = queue.current_mode();
334 double rate = (elapsed > 0) ? (dequeued.load() / elapsed) : 0;
335
336 std::cout << "Status: " << mode_to_string(current_mode) << " mode, Enqueued: "
337 << enqueued.load() << ", Dequeued: " << dequeued.load()
338 << ", Rate: " << std::fixed << std::setprecision(0)
339 << rate << " ops/sec" << std::endl;
340 }
341 });
342
343 producer.join();
344 consumer.join();
345 running = false;
346 monitor.join();
347
348 // Print statistics
349 auto stats = queue.get_stats();
350 std::cout << "Completed " << num_operations << " operations" << std::endl;
351 std::cout << "Statistics: mode_switches=" << stats.mode_switches
352 << ", enqueues=" << stats.enqueue_count
353 << ", dequeues=" << stats.dequeue_count << std::endl;
354}
@ dequeued
Job was taken from queue by a worker.
@ enqueued
Job was added to the queue.

References kcenon::thread::adaptive_job_queue::current_mode(), kcenon::thread::adaptive_job_queue::dequeue(), kcenon::thread::adaptive_job_queue::enqueue(), kcenon::thread::adaptive_job_queue::get_stats(), kcenon::thread::result< T >::is_ok(), mode_to_string(), kcenon::thread::running, and kcenon::thread::result< T >::value().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ policy_comparison_example()

void policy_comparison_example ( )
Examples
adaptive_queue_sample.cpp.

Definition at line 42 of file adaptive_queue_sample.cpp.

43{
44 std::cout << "[Example 1] Queue Policy Comparison" << std::endl;
45
46 const int num_jobs = 10000;
47 const int num_producers = 4;
48 const int num_consumers = 4;
49
50 // Test each policy
51 for (auto policy : {adaptive_job_queue::policy::accuracy_first,
52 adaptive_job_queue::policy::performance_first,
53 adaptive_job_queue::policy::balanced})
54 {
55 adaptive_job_queue queue(policy);
56 std::atomic<int> produced{0};
57 std::atomic<int> consumed{0};
58
59 auto start = std::chrono::high_resolution_clock::now();
60
61 std::vector<std::thread> producers;
62 std::vector<std::thread> consumers;
63
64 // Start producers
65 for (int p = 0; p < num_producers; ++p) {
66 producers.emplace_back([&queue, &produced, p, num_jobs, num_producers]() {
67 for (int i = 0; i < num_jobs / num_producers; ++i) {
68 auto job = std::make_unique<callback_job>(
69 [p, i]() -> kcenon::common::VoidResult {
70 return kcenon::common::ok();
71 });
72
73 while (true) {
74 auto r = queue.enqueue(std::move(job));
75 if (!r.is_err()) break;
76 std::this_thread::yield();
77 // Recreate moved job for retry
78 job = std::make_unique<callback_job>(
79 [p, i]() -> kcenon::common::VoidResult { return kcenon::common::ok(); });
80 }
81 produced.fetch_add(1);
82 }
83 });
84 }
85
86 // Start consumers
87 for (int c = 0; c < num_consumers; ++c) {
88 consumers.emplace_back([&queue, &consumed, num_jobs]() {
89 while (consumed.load() < num_jobs) {
90 auto result = queue.dequeue();
91 if (result.is_ok()) {
92 auto& job = result.value();
93 auto work_result = job->do_work();
94 (void)work_result; // Ignore result for sample
95 consumed.fetch_add(1);
96 } else {
97 std::this_thread::yield();
98 }
99 }
100 });
101 }
102
103 // Wait for completion
104 for (auto& t : producers) t.join();
105 for (auto& t : consumers) t.join();
106
107 auto duration = std::chrono::high_resolution_clock::now() - start;
108 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
109
110 std::string policy_name;
111 switch (policy) {
112 case adaptive_job_queue::policy::accuracy_first:
113 policy_name = "Accuracy (Mutex)";
114 break;
115 case adaptive_job_queue::policy::performance_first:
116 policy_name = "Performance (Lock-free)";
117 break;
118 case adaptive_job_queue::policy::balanced:
119 policy_name = "Balanced (Adaptive)";
120 break;
121 case adaptive_job_queue::policy::manual:
122 policy_name = "Manual";
123 break;
124 }
125
126 double ops_per_sec = (ms > 0) ? (num_jobs * 1000.0 / ms) : 0;
127 std::cout << policy_name << " policy: " << num_jobs << " jobs in "
128 << ms << " ms = " << std::fixed << std::setprecision(0)
129 << ops_per_sec << " ops/sec" << std::endl;
130 }
131}
virtual auto do_work(void) -> common::VoidResult
The core task execution method to be overridden by derived classes.
Definition job.cpp:135

References kcenon::thread::adaptive_job_queue::dequeue(), kcenon::thread::job::do_work(), kcenon::thread::adaptive_job_queue::enqueue(), kcenon::thread::result< T >::is_ok(), and kcenon::thread::result< T >::value().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ web_server_simulation()

void web_server_simulation ( )
Examples
adaptive_queue_sample.cpp.

Definition at line 357 of file adaptive_queue_sample.cpp.

358{
359 std::cout << "\n[Example 5] Web Server Simulation" << std::endl;
360
361 adaptive_job_queue request_queue(adaptive_job_queue::policy::balanced);
362 std::atomic<bool> server_running{true};
363 std::atomic<int> requests_handled{0};
364 std::atomic<int> requests_failed{0};
365
366 // Request types
367 enum class request_type { GET, POST, PUT, DELETE };
368
369 // Simulate incoming requests
370 std::vector<std::thread> clients;
371 for (int client_id = 0; client_id < 5; ++client_id) {
372 clients.emplace_back([&request_queue, &server_running, &requests_failed, client_id]() {
373 std::random_device rd;
374 std::mt19937 gen(rd());
375 std::uniform_int_distribution<> type_dist(0, 3);
376 std::uniform_int_distribution<> delay_dist(10, 100);
377
378 while (server_running) {
379 auto type = static_cast<request_type>(type_dist(gen));
380
381 auto request = std::make_unique<callback_job>(
382 [type]() -> kcenon::common::VoidResult {
383 // Simulate request processing
384 std::this_thread::sleep_for(std::chrono::microseconds(
385 type == request_type::GET ? 10 : 50));
386 return kcenon::common::ok(); // Success
387 });
388
389 auto r = request_queue.enqueue(std::move(request));
390 if (r.is_err()) requests_failed.fetch_add(1);
391
392 std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
393 }
394 });
395 }
396
397 // Worker threads (server handlers)
398 std::vector<std::thread> workers;
399 for (int worker_id = 0; worker_id < 3; ++worker_id) {
400 workers.emplace_back([&request_queue, &server_running, &requests_handled, worker_id]() {
401 while (server_running) {
402 auto request = request_queue.dequeue();
403 if (request.is_ok()) {
404 auto result = request.value()->do_work();
405 if (result.is_ok()) {
406 // Request processed successfully
407 requests_handled.fetch_add(1);
408 } else {
409 std::cerr << "Worker " << worker_id << " request failed: "
410 << result.error().message << std::endl;
411 }
412 } else {
413 std::this_thread::sleep_for(1ms);
414 }
415 }
416 });
417 }
418
419 // Run simulation for 5 seconds
420 std::this_thread::sleep_for(5s);
421 server_running = false;
422
423 // Cleanup
424 for (auto& t : clients) t.join();
425 for (auto& t : workers) t.join();
426
427 std::cout << "Server simulation complete: " << requests_handled.load()
428 << " requests handled, " << requests_failed.load() << " failed" << std::endl;
429
430 // Print final statistics
431 auto stats = request_queue.get_stats();
432 std::cout << "Final stats: mode_switches=" << stats.mode_switches
433 << ", time_in_mutex=" << stats.time_in_mutex_ms << "ms"
434 << ", time_in_lockfree=" << stats.time_in_lockfree_ms << "ms" << std::endl;
435}

References kcenon::thread::adaptive_job_queue::dequeue(), kcenon::thread::adaptive_job_queue::enqueue(), kcenon::thread::adaptive_job_queue::get_stats(), kcenon::thread::result< T >::is_ok(), and kcenon::thread::result< T >::value().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function: