// --- [Example 1] Basic typed job queue: one producer, one consumer ---------
// One producer enqueues 30 jobs, cycling priority by i % 3 (RealTime /
// Batch / Background); each job's callback bumps a per-priority atomic
// counter. One consumer drains the queue with an explicit priority
// preference until all 30 jobs have run.
// NOTE(review): this listing is an extract — the embedded original line
// numbers jump (46 -> 48, 57 -> 63, 68 -> 76, ...), so declarations, braces,
// the enqueue call and the thread joins are elided; the fragment is not
// compilable as-is.
37 write_information(
"[Example 1] Basic Typed Job Queue (Lock-free MPMC)");
// Queue under test, plus one completion counter per priority class.
39 typed_job_queue_t<job_types> queue;
40 std::atomic<int> high_jobs{0};
41 std::atomic<int> normal_jobs{0};
42 std::atomic<int> low_jobs{0};
// Producer thread: 30 jobs, priority chosen round-robin via i % 3.
45 std::thread producer([&queue, &high_jobs, &normal_jobs, &low_jobs]() {
46 for (
int i = 0; i < 30; ++i) {
48 std::atomic<int>* counter;
// i % 3 == 0 -> RealTime (the branch header and the matching
// `counter = &high_jobs;` assignment appear to be on elided lines).
51 type = job_types::RealTime;
53 }
else if (i % 3 == 1) {
54 type = job_types::Batch;
55 counter = &normal_jobs;
// i % 3 == 2 -> Background (the else header and its counter
// assignment are elided).
57 type = job_types::Background;
// Job callback: records its own completion, then logs its priority name.
63 [counter, i, type]() -> kcenon::common::VoidResult {
64 counter->fetch_add(1);
65 std::string type_str = type == job_types::RealTime ?
"RealTime" :
66 (type == job_types::Batch ?
"Batch" :
"Background");
67 write_information(
"{} priority job {} completed", type_str, i);
68 return kcenon::common::ok();
// Enqueue-failure log (the enqueue call and the `result` check that
// precede it are on elided lines).
76 "Failed to enqueue job {}: {}", i,
result.error().message);
// Pace the producer so the consumer can interleave with it.
79 std::this_thread::sleep_for(10ms);
81 write_information(
"Producer finished");
// Consumer thread: drains all 30 jobs, preferring RealTime, then Batch,
// then Background on every dequeue.
85 std::thread consumer([&queue]() {
86 int total_consumed = 0;
88 while (total_consumed < 30) {
90 auto job = queue.dequeue({job_types::RealTime, job_types::Batch, job_types::Background});
92 if (
job.has_value()) {
// Failure path for an executed job (the execute call producing
// `result` is on elided lines).
98 write_error(
"Job failed: {}",
result.error().message);
// Queue empty: back off briefly before polling again.
101 std::this_thread::sleep_for(5ms);
104 write_information(
"Consumer finished");
// Final per-priority tallies (the producer/consumer join() calls are
// presumably on the elided lines just above — TODO confirm).
111 "Jobs processed - RealTime: {}, Batch: {}, Background: {}",
112 high_jobs.load(), normal_jobs.load(), low_jobs.load());
// --- [Example 2] Full MPMC: 4 producers x 25 jobs, 3 consumers -------------
// Producers pick a random job type and random inter-job delay; each consumer
// uses a different dequeue preference order so all three priorities make
// progress. Totals and per-type counts are tracked with atomics.
// NOTE(review): extract — original line numbers skip (143 -> 146,
// 153 -> 160, 175 -> 181, ...); the type-selection code, several braces and
// the retry loop's surrounding control flow are elided.
118 write_information(
"\n[Example 2] MPMC Typed Queue Processing");
120 typed_job_queue_t<job_types> queue;
121 const int num_producers = 4;
122 const int num_consumers = 3;
123 const int jobs_per_producer = 25;
125 std::atomic<int> total_produced{0};
126 std::atomic<int> total_consumed{0};
// Per-type counters. All three keys are inserted up front so the map's
// structure is never modified once the threads start — producers then only
// touch the pre-existing atomic values.
// NOTE(review): std::map::operator[] is a non-const lookup; relying on it
// from multiple threads is only safe because the keys are pre-inserted
// here — worth confirming against the project's concurrency guidelines.
127 std::map<job_types, std::atomic<int>> type_counts;
128 type_counts[job_types::RealTime].store(0);
129 type_counts[job_types::Batch].store(0);
130 type_counts[job_types::Background].store(0);
132 std::vector<std::thread> producers;
133 std::vector<std::thread> consumers;
// Producers: each owns its own RNG (std::random_device-seeded mt19937 per
// thread, so no shared-generator races).
136 for (
int p = 0; p < num_producers; ++p) {
137 producers.emplace_back([&queue, &total_produced, &type_counts, p, jobs_per_producer]() {
138 std::random_device rd;
139 std::mt19937 gen(rd());
140 std::uniform_int_distribution<> type_dist(0, 2);
141 std::uniform_int_distribution<> delay_dist(1, 10);
143 for (
int i = 0; i < jobs_per_producer; ++i) {
// Job callback simulates type-dependent work: RealTime 10us,
// Batch 50us, Background 100us.
146 auto job = std::make_unique<callback_typed_job>(
147 [p, i, type]() -> kcenon::common::VoidResult {
149 std::this_thread::sleep_for(std::chrono::microseconds(
150 type == job_types::RealTime ? 10 :
151 (type == job_types::Batch ? 50 : 100)));
152 write_information(
"Producer {} job {} (type: {})", p, i,
static_cast<int>(type));
153 return kcenon::common::ok();
154... // (job construction tail elided in extract)
160 auto result = queue.enqueue(std::move(
job));
162 total_produced.fetch_add(1);
163 type_counts[type].fetch_add(1);
// Retry path: enqueue failed (queue presumably full — TODO confirm
// against typed_job_queue_t docs), so yield and rebuild the job —
// the previous unique_ptr was consumed by the failed enqueue.
166 std::this_thread::yield();
168 job = std::make_unique<callback_typed_job>(
169 [p, i, type]() -> kcenon::common::VoidResult {
171 std::this_thread::sleep_for(std::chrono::microseconds(
172 type == job_types::RealTime ? 10 :
173 (type == job_types::Batch ? 50 : 100)));
174 write_information(
"Producer {} job {} (type: {})", p, i,
static_cast<int>(type));
175 return kcenon::common::ok();
// Random 1-10 ms pause between jobs from the same producer.
181 std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
185 "Producer {} finished", p);
// Consumers: each gets a distinct preference order (selection branches on
// c, on elided lines) so no priority class is starved.
190 for (
int c = 0; c < num_consumers; ++c) {
191 consumers.emplace_back([&queue, &total_consumed, c, num_producers, jobs_per_producer]() {
192 const int total_jobs = num_producers * jobs_per_producer;
193 std::vector<job_types> preference;
197 preference = {job_types::RealTime, job_types::Batch, job_types::Background};
199 preference = {job_types::Batch, job_types::RealTime, job_types::Background};
201 preference = {job_types::Background, job_types::Batch, job_types::RealTime};
// Shared termination condition: stop once the combined consumers
// have processed every produced job.
204 while (total_consumed.load() < total_jobs) {
205 auto job = queue.dequeue(preference);
207 if (
job.has_value()) {
211 total_consumed.fetch_add(1);
// Failed-execution log (`result` comes from an elided execute call).
213 write_error(
"Consumer {} job failed: {}",
214 c,
result.error().message);
// Queue empty: short back-off before re-polling.
217 std::this_thread::sleep_for(1ms);
222 "Consumer {} finished", c);
227 for (
auto& t : producers) t.join();
228 for (
auto& t : consumers) t.join();
// Summary. Note the labels say High/Normal/Low while the keys are
// RealTime/Batch/Background — same ordering, older naming.
231 "Total jobs - Produced: {}, Consumed: {}",
232 total_produced.load(), total_consumed.load());
234 "By type - High: {}, Normal: {}, Low: {}",
235 type_counts[job_types::RealTime].load(),
236 type_counts[job_types::Batch].load(),
237 type_counts[job_types::Background].load());
// --- [Example 3] Throughput benchmark: 50 000 no-op jobs, 4 workers --------
// Enqueues 50k trivial jobs (callback just returns ok), then spins up 4
// worker threads to drain them, and reports wall-clock ops/sec.
// NOTE(review): extract — the `completed` counter's declaration, the job
// type passed to enqueue, and the workers' loop/exit condition are on
// elided lines.
243 write_information(
"\n[Example 3] Performance Comparison");
245 const int num_jobs = 50000;
246 const int num_workers = 4;
250 typed_job_queue_t<job_types> queue;
// Timer covers both the enqueue phase and the drain phase.
253 auto start = std::chrono::high_resolution_clock::now();
256 for (
int i = 0; i < num_jobs; ++i) {
258 auto job = std::make_unique<callback_typed_job>(
259 [&
completed]() -> kcenon::common::VoidResult {
261 return kcenon::common::ok();
266 auto enqueue_result = queue.enqueue(std::move(
job));
// Retry-on-failure: yield, rebuild the (consumed) unique_ptr, re-enqueue.
267 while (enqueue_result.is_err()) {
268 std::this_thread::yield();
270 job = std::make_unique<callback_typed_job>(
271 [&
completed]() -> kcenon::common::VoidResult {
273 return kcenon::common::ok();
277 enqueue_result = queue.enqueue(std::move(
job));
// Drain phase: each worker polls all three priorities, yielding when the
// queue is momentarily empty.
282 std::vector<std::thread> workers;
283 for (
int w = 0; w < num_workers; ++w) {
284 workers.emplace_back([&queue, &
completed, num_jobs]() {
286 auto job = queue.dequeue({job_types::RealTime, job_types::Batch, job_types::Background});
287 if (
job.has_value()) {
291 std::this_thread::yield();
297 for (
auto& t : workers) t.join();
299 auto duration = std::chrono::high_resolution_clock::now() - start;
300 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
// NOTE(review): if the whole run completes in under 1 ms, `ms` is 0 and
// the double division below prints "inf" — guard with max(ms, 1) or time
// in microseconds if that matters.
303 "Typed queue (lock-free): {} jobs in {} ms = {} ops/sec",
304 num_jobs, ms, num_jobs * 1000.0 / ms);
// --- [Example 4] Task scheduler with per-type latency statistics -----------
// Tasks record enqueue->execution latency (creation timestamp captured in
// the closure), simulate type-dependent work, and accumulate per-type
// created/completed/failed counts plus total latency in a TaskStats map.
// One dedicated RealTime worker and two general workers drain the queue
// for a fixed 5 s window.
// NOTE(review): extract — the TaskStats definition, producer loop header,
// worker-lambda headers and several braces are on elided lines.
311 write_information(
"\n[Example 4] Task Scheduling System");
313 typed_job_queue_t<job_types> task_queue;
// Aggregate counters (the sibling `created{0}`/`completed{0}` members
// appear to be on elided lines).
320 std::atomic<int>
failed{0};
321 std::atomic<int64_t> total_latency_us{0};
// Per-type stats bucket.
// NOTE(review): task callbacks below reach into stats[type] via
// std::map::operator[] from worker threads — that is only race-free if
// all three keys are inserted before the workers start; the insertion
// is not visible in this extract, TODO confirm.
324 std::map<job_types, TaskStats> stats;
328 std::random_device rd;
329 std::mt19937 gen(rd());
330 std::uniform_int_distribution<> type_dist(0, 2);
331 std::uniform_int_distribution<> delay_dist(10, 100);
// Timestamp taken at task creation so the callback can measure
// queueing latency when it finally runs.
335 auto creation_time = std::chrono::high_resolution_clock::now();
337 auto task = std::make_unique<callback_typed_job>(
338 [type, creation_time, &stats]() -> kcenon::common::VoidResult {
339 auto start_time = std::chrono::high_resolution_clock::now();
340 auto latency = std::chrono::duration_cast<std::chrono::microseconds>(
341 start_time - creation_time).count();
343 stats[type].total_latency_us.fetch_add(
latency);
// Simulated work: RealTime 50us, Batch 200us, Background 500us.
346 std::this_thread::sleep_for(std::chrono::microseconds(
347 type == job_types::RealTime ? 50 :
348 (type == job_types::Batch ? 200 : 500)));
350 stats[type].completed.fetch_add(1);
// NOTE(review): "Ξs" in the format string below looks like mojibake
// for "µs" (micro sign) — confirm the file's encoding and fix at
// the source; left untouched here since it is a runtime string.
352 write_information(
"Task completed - Type: {}, Latency: {} Ξs",
353 static_cast<int>(type),
latency);
354 return kcenon::common::ok();
// A task only counts as "created" if the enqueue succeeded; otherwise
// it is tallied as failed.
359 auto enqueue_result = task_queue.enqueue(std::move(task));
360 if (!enqueue_result.is_err()) {
361 stats[type].created.fetch_add(1);
363 stats[type].failed.fetch_add(1);
// Random 10-100 ms gap between task submissions.
366 std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
371 std::vector<std::thread> workers;
// Dedicated high-priority worker: dequeues RealTime tasks only.
376 auto task = task_queue.dequeue({job_types::RealTime});
377 if (task.has_value()) {
382 write_error(
"High priority task failed: {}",
386 std::this_thread::sleep_for(1ms);
// Two general workers: prefer Batch, then Background, then RealTime
// (so they pick up RealTime overflow the dedicated worker misses).
392 for (
int i = 0; i < 2; ++i) {
395 auto task = task_queue.dequeue({job_types::Batch, job_types::Background, job_types::RealTime});
396 if (task.has_value()) {
401 write_error(
"General worker {} task failed: {}",
402 i,
result.error().message);
405 std::this_thread::sleep_for(2ms);
// Let the system run for a fixed 5 s window, then stop (the stop flag /
// signal is on elided lines) and join all workers.
412 std::this_thread::sleep_for(5s);
416 for (
auto& t : workers) t.join();
419 write_information(
"Task Scheduling Statistics:");
420 for (
const auto& [type, stat] : stats) {
421 std::string type_name = type == job_types::RealTime ?
"RealTime" :
422 (type == job_types::Batch ?
"Batch" :
"Background");
// Average latency = total latency / completed, guarded against a zero
// completed count by the elided conditional (`completed ? ... : 0.0`).
426 static_cast<double>(stat.total_latency_us.load()) /
completed : 0.0;
// NOTE(review): "Ξs" mojibake again — likely "µs"; runtime string left
// byte-identical.
429 " {} - Created: {}, Completed: {}, Failed: {}, Avg Latency: {:.1f} Ξs",
430 type_name, stat.created.load(),
completed,
431 stat.failed.load(), avg_latency);
// --- [Example 5] Stress test: 16 threads, high contention -------------------
// Half the threads produce (10 000 jobs each), half consume. Every job's
// callback increments the shared total_ops atomic; the test reports total
// throughput at the end.
// NOTE(review): extract — the per-consumer `consumed` counter declaration,
// the job type used for enqueue, and several closing braces are on elided
// lines.
438 write_information(
"\n[Example 5] Stress Test - High Contention");
440 typed_job_queue_t<job_types> queue;
441 const int num_threads = 16;
442 const int ops_per_thread = 10000;
443 std::atomic<int> total_ops{0};
445 auto start = std::chrono::high_resolution_clock::now();
447 std::vector<std::thread> threads;
// First half of the threads produce, second half consume.
450 for (
int t = 0; t < num_threads; ++t) {
451 if (t < num_threads / 2) {
453 threads.emplace_back([&queue, &total_ops, t, ops_per_thread]() {
454 for (
int i = 0; i < ops_per_thread; ++i) {
// Minimal job: just bump the shared op counter.
456 auto job = std::make_unique<callback_typed_job>(
457 [&total_ops]() -> kcenon::common::VoidResult {
458 total_ops.fetch_add(1);
459 return kcenon::common::ok();
464 auto enqueue_result = queue.enqueue(std::move(
job));
// Retry-on-failure: yield, rebuild the consumed unique_ptr,
// and try again until the enqueue succeeds.
465 while (enqueue_result.is_err()) {
466 std::this_thread::yield();
468 job = std::make_unique<callback_typed_job>(
469 [&total_ops]() -> kcenon::common::VoidResult {
470 total_ops.fetch_add(1);
471 return kcenon::common::ok();
475 enqueue_result = queue.enqueue(std::move(
job));
// Consumer thread: drains until it has personally consumed one
// producer's worth of jobs (`consumed` declared on an elided line).
481 threads.emplace_back([&queue, ops_per_thread]() {
483 while (consumed < ops_per_thread) {
484 auto job = queue.dequeue({job_types::RealTime, job_types::Batch, job_types::Background});
485 if (
job.has_value()) {
// Empty queue: yield instead of sleeping — this test wants
// maximum contention pressure.
490 std::this_thread::yield();
497 for (
auto& t : threads) t.join();
499 auto duration = std::chrono::high_resolution_clock::now() - start;
500 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
// NOTE(review): same ms == 0 hazard as Example 3 — a sub-millisecond run
// would print "inf" ops/sec.
503 "Stress test completed: {} operations in {} ms = {} ops/sec",
504 total_ops.load(), ms, total_ops.load() * 1000.0 / ms);
std::atomic< bool > system_running