// [Example 1] Benchmark: push num_jobs through num_producers producer threads
// and num_consumers consumer threads for each automatic queue policy, and
// report throughput per policy.
// NOTE(review): this extract is missing several original lines (the queue
// construction, the enqueue call that yields `r`, the switch(policy) header,
// `break;` statements, and closing braces) — comments describe visible code only.
44 std::cout <<
"[Example 1] Queue Policy Comparison" << std::endl;
46 const int num_jobs = 10000;
47 const int num_producers = 4;
48 const int num_consumers = 4;
// Iterate over the three automatic policies; `manual` appears only in the
// name-mapping cases further down.
51 for (
auto policy : {adaptive_job_queue::policy::accuracy_first,
52 adaptive_job_queue::policy::performance_first,
53 adaptive_job_queue::policy::balanced})
// Shared progress counters, updated from producer/consumer threads.
56 std::atomic<int> produced{0};
57 std::atomic<int> consumed{0};
59 auto start = std::chrono::high_resolution_clock::now();
61 std::vector<std::thread> producers;
62 std::vector<std::thread> consumers;
// Producers: each thread enqueues num_jobs / num_producers jobs.
// NOTE(review): if num_jobs were not divisible by num_producers, the
// remainder jobs would never be produced while consumers wait on num_jobs.
65 for (
int p = 0; p < num_producers; ++p) {
66 producers.emplace_back([&queue, &produced, p, num_jobs, num_producers]() {
67 for (
int i = 0; i < num_jobs / num_producers; ++i) {
68 auto job = std::make_unique<callback_job>(
69 [p, i]() -> kcenon::common::VoidResult {
70 return kcenon::common::ok();
// Retry loop: `r` is presumably the enqueue result (the enqueue call itself
// is elided from this extract). On success stop retrying; on failure yield
// and rebuild the job, since the previous unique_ptr was moved into the
// failed enqueue call.
75 if (!r.is_err())
break;
76 std::this_thread::yield();
78 job = std::make_unique<callback_job>(
79 [p, i]() -> kcenon::common::VoidResult {
return kcenon::common::ok(); });
81 produced.fetch_add(1);
// Consumers: drain until the total consumed count reaches num_jobs.
// NOTE(review): the load/fetch_add pair is not atomic as a unit, so the
// final count may overshoot slightly under contention — confirm acceptable.
87 for (
int c = 0; c < num_consumers; ++c) {
88 consumers.emplace_back([&queue, &consumed, num_jobs]() {
89 while (consumed.load() < num_jobs) {
95 consumed.fetch_add(1);
97 std::this_thread::yield();
104 for (
auto& t : producers) t.join();
105 for (
auto& t : consumers) t.join();
107 auto duration = std::chrono::high_resolution_clock::now() - start;
108 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
// Map the policy enum to a human-readable label (the switch header and
// per-case `break;` statements are elided in this extract).
110 std::string policy_name;
112 case adaptive_job_queue::policy::accuracy_first:
113 policy_name =
"Accuracy (Mutex)";
115 case adaptive_job_queue::policy::performance_first:
116 policy_name =
"Performance (Lock-free)";
118 case adaptive_job_queue::policy::balanced:
119 policy_name =
"Balanced (Adaptive)";
121 case adaptive_job_queue::policy::manual:
122 policy_name =
"Manual";
// Throughput in ops/sec; guard ms == 0 to avoid division by zero on very
// fast runs (millisecond resolution can round down to 0).
126 double ops_per_sec = (ms > 0) ? (num_jobs * 1000.0 / ms) : 0;
127 std::cout << policy_name <<
" policy: " << num_jobs <<
" jobs in "
128 << ms <<
" ms = " << std::fixed << std::setprecision(0)
129 << ops_per_sec <<
" ops/sec" << std::endl;
// [Example 2] Demonstrates the balanced (adaptive) policy under two load
// profiles: Phase 1 low contention (1 producer / 1 consumer), Phase 2 high
// contention (8 producers / 8 consumers).
// NOTE(review): extract is missing the queue construction, loop bodies,
// `running = false` shutdown, joins for phase 1, and the stream expression
// heads at original lines 177/228 — comments cover visible code only.
136 std::cout <<
"\n[Example 2] Balanced Policy Behavior" << std::endl;
141 std::cout <<
"Phase 1: Low contention (1P-1C)" << std::endl;
// Cooperative-shutdown flag and progress counter for phase 1.
143 std::atomic<bool>
running{
true};
144 std::atomic<int> jobs_processed{0};
// Single producer: enqueue a trivial ok() job, log on failure, pace at ~1ms.
146 std::thread producer([&queue, &
running]() {
148 auto job = std::make_unique<callback_job>(
149 []() -> kcenon::common::VoidResult {
return kcenon::common::ok(); });
150 auto enqueue_result = queue.
enqueue(std::move(
job));
151 if (enqueue_result.is_err()) {
152 std::cerr <<
"enqueue failed: " << enqueue_result.error().message << std::endl;
154 std::this_thread::sleep_for(1ms);
// Single consumer: count each processed job, pace at ~1ms (dequeue call
// elided in this extract).
158 std::thread consumer([&queue, &
running, &jobs_processed]() {
164 jobs_processed.fetch_add(1);
166 std::this_thread::sleep_for(1ms);
// Let phase 1 run for 2 seconds before reporting.
170 std::this_thread::sleep_for(2s);
// Tail of a status-report stream (its head, presumably the queue mode, is
// elided at original line 177).
177 <<
", Jobs processed: " << jobs_processed.load() << std::endl;
181 std::cout <<
"Phase 2: High contention (8P-8C)" << std::endl;
// Fresh flag/counter for phase 2 — shadows or replaces the phase-1 pair,
// so phase 1 appears to be scoped in an elided enclosing block.
183 std::atomic<bool>
running{
true};
184 std::atomic<int> jobs_processed{0};
185 std::vector<std::thread> threads;
// Eight producers with per-thread RNG: enqueue ok() jobs and, ~10% of the
// time, sleep 0-100us to vary contention.
188 for (
int i = 0; i < 8; ++i) {
189 threads.emplace_back([&queue, &
running]() {
190 std::random_device rd;
191 std::mt19937 gen(rd());
192 std::uniform_int_distribution<> dist(0, 100);
195 auto job = std::make_unique<callback_job>(
196 []() -> kcenon::common::VoidResult {
return kcenon::common::ok(); });
197 auto enqueue_result = queue.
enqueue(std::move(
job));
// Error branch body elided — presumably retry/backoff; confirm in full file.
198 if (enqueue_result.is_err()) {
201 if (dist(gen) < 10) {
202 std::this_thread::sleep_for(std::chrono::microseconds(dist(gen)));
// Eight consumers counting processed jobs (dequeue call elided).
209 for (
int i = 0; i < 8; ++i) {
210 threads.emplace_back([&queue, &
running, &jobs_processed]() {
216 jobs_processed.fetch_add(1);
// Run phase 2 for 2 seconds, then join all 16 threads and report.
222 std::this_thread::sleep_for(2s);
224 for (
auto& t : threads) t.join();
228 <<
", Jobs processed: " << jobs_processed.load() << std::endl;
// [Example 3] Builds 100 trivial jobs, enqueues them into a mutex-based
// queue, then drains and executes them, counting successes and failures.
// NOTE(review): extract is missing the queue construction, the enqueue call
// inside the range-for, the dequeue/execute that yields `work_result`, and
// the `fail_count` declaration — comments cover visible code only.
235 std::cout <<
"\n[Example 3] Different Queue Policies" << std::endl;
// Pre-build 100 jobs; each callback captures its index i (body at original
// line 246 is elided).
242 std::vector<std::unique_ptr<job>> jobs;
243 for (
int i = 0; i < 100; ++i) {
244 jobs.push_back(std::make_unique<callback_job>(
245 [i]() -> kcenon::common::VoidResult {
247 return kcenon::common::ok();
// Enqueue every pre-built job, counting successes (enqueue call elided).
252 int enqueue_count = 0;
253 for (
auto&
job : jobs) {
259 std::cout <<
"Enqueued " << enqueue_count <<
" jobs" << std::endl;
// Drain the mutex queue: execute each job (`work_result` comes from an
// elided dequeue/execute) and tally success vs failure.
266 int success_count = 0;
268 while (!mutex_queue.
empty()) {
272 if (work_result.is_ok()) {
276 std::cerr <<
"Job failed: " << work_result.error().message << std::endl;
280 std::cout <<
"Processed " << success_count <<
" jobs successfully, "
281 << fail_count <<
" failed" << std::endl;
// [Example 4] Runs 50000 enqueue/dequeue operations through one producer and
// one consumer while a third "monitor" thread samples progress every 500ms
// and prints the current queue mode and throughput.
// NOTE(review): extract is missing the queue construction, the consumer's
// dequeue call, the `current_mode` / `stats` acquisition, thread joins, and
// `running = false` — comments cover visible code only.
287 std::cout <<
"\n[Example 4] Performance Monitoring" << std::endl;
291 const int num_operations = 50000;
// Shutdown flag (captured by the monitor) and progress counters.
292 std::atomic<bool>
running{
true};
293 std::atomic<int> enqueued{0};
294 std::atomic<int> dequeued{0};
// Producer: enqueue num_operations trivial jobs, spin-retrying on failure.
297 std::thread producer([&queue, &enqueued, num_operations]() {
298 for (
int i = 0; i < num_operations; ++i) {
299 auto job = std::make_unique<callback_job>(
300 []() -> kcenon::common::VoidResult {
return kcenon::common::ok(); });
// Retry loop: the unique_ptr is moved into each failed enqueue attempt, so
// the job must be rebuilt before the next try.
302 while (queue.
enqueue(std::move(
job)).is_err()) {
303 std::this_thread::yield();
304 job = std::make_unique<callback_job>(
305 []() -> kcenon::common::VoidResult {
return kcenon::common::ok(); });
307 enqueued.fetch_add(1);
// Consumer: drain until all operations are counted (dequeue call elided).
312 std::thread consumer([&queue, &dequeued, num_operations]() {
313 while (dequeued.load() < num_operations) {
318 dequeued.fetch_add(1);
// Monitor: every 500ms compute elapsed time and dequeue rate, and print the
// queue's current mode (`current_mode` obtained in an elided line —
// presumably from the queue; confirm against the full file).
324 std::thread monitor([&queue, &
running, &enqueued, &dequeued, num_operations]() {
325 auto start = std::chrono::steady_clock::now();
327 while (dequeued.load() < num_operations) {
328 std::this_thread::sleep_for(500ms);
330 auto now = std::chrono::steady_clock::now();
331 auto elapsed = std::chrono::duration<double>(now - start).count();
// Guard elapsed == 0 to avoid division by zero on the first sample.
334 double rate = (elapsed > 0) ? (dequeued.load() / elapsed) : 0;
336 std::cout <<
"Status: " <<
mode_to_string(current_mode) <<
" mode, Enqueued: "
337 << enqueued.load() <<
", Dequeued: " << dequeued.load()
338 <<
", Rate: " << std::fixed << std::setprecision(0)
339 << rate <<
" ops/sec" << std::endl;
// Final summary; `stats` is fetched in an elided line (presumably from the
// queue's statistics accessor).
350 std::cout <<
"Completed " << num_operations <<
" operations" << std::endl;
351 std::cout <<
"Statistics: mode_switches=" << stats.mode_switches
352 <<
", enqueues=" << stats.enqueue_count
353 <<
", dequeues=" << stats.dequeue_count << std::endl;
// [Example 5] Web-server simulation: 5 client threads enqueue simulated
// HTTP requests (GET/POST/PUT/DELETE) and 3 worker threads dequeue and
// execute them for 5 seconds, then everything shuts down and stats print.
// NOTE(review): extract is missing the request_queue construction, the
// worker's execute call that yields `result`, the `stats` acquisition, and
// several closing braces — comments cover visible code only.
359 std::cout <<
"\n[Example 5] Web Server Simulation" << std::endl;
// Shared shutdown flag and tallies, written from client/worker threads.
362 std::atomic<bool> server_running{
true};
363 std::atomic<int> requests_handled{0};
364 std::atomic<int> requests_failed{0};
// NOTE(review): `DELETE` is a macro in <windows.h>; this enumerator would
// break a Windows build if that header is ever included — consider renaming.
367 enum class request_type { GET, POST, PUT, DELETE };
// Clients: each picks a random request type, enqueues a job whose runtime
// models the request cost, then idles 10-100ms before the next request.
370 std::vector<std::thread> clients;
371 for (
int client_id = 0; client_id < 5; ++client_id) {
372 clients.emplace_back([&request_queue, &server_running, &requests_failed, client_id]() {
373 std::random_device rd;
374 std::mt19937 gen(rd());
375 std::uniform_int_distribution<> type_dist(0, 3);
376 std::uniform_int_distribution<> delay_dist(10, 100);
378 while (server_running) {
379 auto type =
static_cast<request_type
>(type_dist(gen));
// Simulated handling cost: GETs 10us, all other verbs 50us.
381 auto request = std::make_unique<callback_job>(
382 [type]() -> kcenon::common::VoidResult {
384 std::this_thread::sleep_for(std::chrono::microseconds(
385 type == request_type::GET ? 10 : 50));
386 return kcenon::common::ok();
389 auto r = request_queue.
enqueue(std::move(request));
390 if (r.is_err()) requests_failed.fetch_add(1);
392 std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
// Workers: dequeue and execute requests (the execute that yields `result`
// is elided); back off 1ms when the queue yields nothing.
398 std::vector<std::thread> workers;
399 for (
int worker_id = 0; worker_id < 3; ++worker_id) {
400 workers.emplace_back([&request_queue, &server_running, &requests_handled, worker_id]() {
401 while (server_running) {
402 auto request = request_queue.
dequeue();
403 if (request.is_ok()) {
407 requests_handled.fetch_add(1);
409 std::cerr <<
"Worker " << worker_id <<
" request failed: "
410 <<
result.error().message << std::endl;
413 std::this_thread::sleep_for(1ms);
// Run the simulation for 5 seconds, then request cooperative shutdown and
// join every client and worker before reporting.
420 std::this_thread::sleep_for(5s);
421 server_running =
false;
424 for (
auto& t : clients) t.join();
425 for (
auto& t : workers) t.join();
427 std::cout <<
"Server simulation complete: " << requests_handled.load()
428 <<
" requests handled, " << requests_failed.load() <<
" failed" << std::endl;
// Final adaptive-queue stats; `stats` is fetched in an elided line.
432 std::cout <<
"Final stats: mode_switches=" << stats.mode_switches
433 <<
", time_in_mutex=" << stats.time_in_mutex_ms <<
"ms"
434 <<
", time_in_lockfree=" << stats.time_in_lockfree_ms <<
"ms" << std::endl;