Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
typed_job_queue_sample.cpp File Reference

Lock-free typed job queue with MPMC patterns and performance benchmarks. More...

#include "typed_thread_pool/scheduling/typed_job_queue.h"
#include "typed_thread_pool/jobs/typed_job.h"
#include "typed_thread_pool/jobs/callback_typed_job.h"
#include "logger/core/logger.h"
#include <thread>
#include <vector>
#include <atomic>
#include <chrono>
#include <random>
#include <map>
Include dependency graph for typed_job_queue_sample.cpp:

Go to the source code of this file.

Functions

void basic_typed_queue_example ()
 
void mpmc_typed_queue_example ()
 
void performance_comparison_example ()
 
void task_scheduling_example ()
 
void stress_test_example ()
 
int main ()
 

Detailed Description

Lock-free typed job queue with MPMC patterns and performance benchmarks.

Definition in file typed_job_queue_sample.cpp.

Function Documentation

◆ basic_typed_queue_example()

void basic_typed_queue_example ( )
Examples
typed_job_queue_sample.cpp.

Definition at line 35 of file typed_job_queue_sample.cpp.

36{
37 write_information("[Example 1] Basic Typed Job Queue (Lock-free MPMC)");
38
39 typed_job_queue_t<job_types> queue;
40 std::atomic<int> high_jobs{0};
41 std::atomic<int> normal_jobs{0};
42 std::atomic<int> low_jobs{0};
43
44 // Producer thread - creates jobs of different types
45 std::thread producer([&queue, &high_jobs, &normal_jobs, &low_jobs]() {
46 for (int i = 0; i < 30; ++i) {
47 job_types type;
48 std::atomic<int>* counter;
49
50 if (i % 3 == 0) {
51 type = job_types::RealTime;
52 counter = &high_jobs;
53 } else if (i % 3 == 1) {
54 type = job_types::Batch;
55 counter = &normal_jobs;
56 } else {
57 type = job_types::Background;
58 counter = &low_jobs;
59 }
60
61 // Create callback_typed_job directly with lambda and type
62 auto typed_job_ptr = std::make_unique<callback_typed_job>(
63 [counter, i, type]() -> kcenon::common::VoidResult {
64 counter->fetch_add(1);
65 std::string type_str = type == job_types::RealTime ? "RealTime" :
66 (type == job_types::Batch ? "Batch" : "Background");
67 write_information("{} priority job {} completed", type_str, i);
68 return kcenon::common::ok(); // Success
69 },
70 type
71 );
72
73 auto result = queue.enqueue(std::move(typed_job_ptr));
74 if (result.is_err()) {
75 write_error(
76 "Failed to enqueue job {}: {}", i, result.error().message);
77 }
78
79 std::this_thread::sleep_for(10ms);
80 }
81 write_information("Producer finished");
82 });
83
84 // Consumer thread - processes jobs respecting type order
85 std::thread consumer([&queue]() {
86 int total_consumed = 0;
87
88 while (total_consumed < 30) {
89 // Try to get high priority first
90 auto job = queue.dequeue({job_types::RealTime, job_types::Batch, job_types::Background});
91
92 if (job.has_value()) {
93 auto result = job.value()->do_work();
94 if (!result.is_err()) {
95 // Job executed successfully
96 total_consumed++;
97 } else {
98 write_error("Job failed: {}", result.error().message);
99 }
100 } else {
101 std::this_thread::sleep_for(5ms);
102 }
103 }
104 write_information("Consumer finished");
105 });
106
107 producer.join();
108 consumer.join();
109
110 write_information(
111 "Jobs processed - RealTime: {}, Batch: {}, Background: {}",
112 high_jobs.load(), normal_jobs.load(), low_jobs.load());
113}
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
virtual auto do_work(void) -> common::VoidResult
The core task execution method to be overridden by derived classes.
Definition job.cpp:135
A template class representing either a value or an error.
job_types
Defines different types of jobs for a typed thread pool.
Definition job_types.h:33
std::shared_ptr< typed_job_interface< job_type > > typed_job_ptr
Shared pointer type for typed job interfaces.

References kcenon::thread::job::do_work().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ main()

int main ( )

Definition at line 507 of file typed_job_queue_sample.cpp.

508{
509 log_module::start();
510 log_module::console_target(log_types::Debug);
511
512 write_information(
513 "Typed Job Queue Sample (Lock-free MPMC)\n"
514 "=======================================");
515
516 try {
518 // mpmc_typed_queue_example();
519 // performance_comparison_example();
520 // task_scheduling_example();
521 // stress_test_example();
522 } catch (const std::exception& e) {
523 write_error(
524 "Exception: {}", e.what());
525 }
526
527 write_information("\nAll examples completed!");
528
529 log_module::stop();
530 return 0;
531}
void basic_typed_queue_example()

References basic_typed_queue_example().

Here is the call graph for this function:

◆ mpmc_typed_queue_example()

void mpmc_typed_queue_example ( )
Examples
typed_job_queue_sample.cpp.

Definition at line 116 of file typed_job_queue_sample.cpp.

117{
118 write_information("\n[Example 2] MPMC Typed Queue Processing");
119
120 typed_job_queue_t<job_types> queue;
121 const int num_producers = 4;
122 const int num_consumers = 3;
123 const int jobs_per_producer = 25;
124
125 std::atomic<int> total_produced{0};
126 std::atomic<int> total_consumed{0};
127 std::map<job_types, std::atomic<int>> type_counts;
128 type_counts[job_types::RealTime].store(0);
129 type_counts[job_types::Batch].store(0);
130 type_counts[job_types::Background].store(0);
131
132 std::vector<std::thread> producers;
133 std::vector<std::thread> consumers;
134
135 // Start multiple producers
136 for (int p = 0; p < num_producers; ++p) {
137 producers.emplace_back([&queue, &total_produced, &type_counts, p, jobs_per_producer]() {
138 std::random_device rd;
139 std::mt19937 gen(rd());
140 std::uniform_int_distribution<> type_dist(0, 2);
141 std::uniform_int_distribution<> delay_dist(1, 10);
142
143 for (int i = 0; i < jobs_per_producer; ++i) {
144 job_types type = static_cast<job_types>(type_dist(gen));
145
146 auto job = std::make_unique<callback_typed_job>(
147 [p, i, type]() -> kcenon::common::VoidResult {
148 // Simulate work based on priority
149 std::this_thread::sleep_for(std::chrono::microseconds(
150 type == job_types::RealTime ? 10 :
151 (type == job_types::Batch ? 50 : 100)));
152 write_information("Producer {} job {} (type: {})", p, i, static_cast<int>(type));
153 return kcenon::common::ok();
154 },
155 type
156 );
157
158 // Retry on failure with lock-free queue
159 while (true) {
160 auto result = queue.enqueue(std::move(job));
161 if (!result.is_err()) {
162 total_produced.fetch_add(1);
163 type_counts[type].fetch_add(1);
164 break;
165 }
166 std::this_thread::yield();
167 // Re-create job since it was moved
168 job = std::make_unique<callback_typed_job>(
169 [p, i, type]() -> kcenon::common::VoidResult {
170 // Simulate work based on priority
171 std::this_thread::sleep_for(std::chrono::microseconds(
172 type == job_types::RealTime ? 10 :
173 (type == job_types::Batch ? 50 : 100)));
174 write_information("Producer {} job {} (type: {})", p, i, static_cast<int>(type));
175 return kcenon::common::ok();
176 },
177 type
178 );
179 }
180
181 std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
182 }
183
184 write_information(
185 "Producer {} finished", p);
186 });
187 }
188
189 // Start multiple consumers with different type preferences
190 for (int c = 0; c < num_consumers; ++c) {
191 consumers.emplace_back([&queue, &total_consumed, c, num_producers, jobs_per_producer]() {
192 const int total_jobs = num_producers * jobs_per_producer;
193 std::vector<job_types> preference;
194
195 // Different consumers have different type preferences
196 if (c == 0) {
197 preference = {job_types::RealTime, job_types::Batch, job_types::Background};
198 } else if (c == 1) {
199 preference = {job_types::Batch, job_types::RealTime, job_types::Background};
200 } else {
201 preference = {job_types::Background, job_types::Batch, job_types::RealTime};
202 }
203
204 while (total_consumed.load() < total_jobs) {
205 auto job = queue.dequeue(preference);
206
207 if (job.has_value()) {
208 auto result = job.value()->do_work();
209 if (result.is_ok()) {
210 // Job executed successfully
211 total_consumed.fetch_add(1);
212 } else {
213 write_error("Consumer {} job failed: {}",
214 c, result.error().message);
215 }
216 } else {
217 std::this_thread::sleep_for(1ms);
218 }
219 }
220
221 write_information(
222 "Consumer {} finished", c);
223 });
224 }
225
226 // Wait for all threads
227 for (auto& t : producers) t.join();
228 for (auto& t : consumers) t.join();
229
230 write_information(
231 "Total jobs - Produced: {}, Consumed: {}",
232 total_produced.load(), total_consumed.load());
233 write_information(
234 "By type - High: {}, Normal: {}, Low: {}",
235 type_counts[job_types::RealTime].load(),
236 type_counts[job_types::Batch].load(),
237 type_counts[job_types::Background].load());
238}
bool is_ok() const noexcept
Checks if the result is successful.

References kcenon::thread::job::do_work(), and kcenon::thread::result< T >::is_ok().

Here is the call graph for this function:

◆ performance_comparison_example()

void performance_comparison_example ( )
Examples
typed_job_queue_sample.cpp.

Definition at line 241 of file typed_job_queue_sample.cpp.

242{
243 write_information("\n[Example 3] Performance Comparison");
244
245 const int num_jobs = 50000;
246 const int num_workers = 4;
247
248 // Test typed queue with lock-free MPMC
249 {
250 typed_job_queue_t<job_types> queue;
251 std::atomic<int> completed{0};
252
253 auto start = std::chrono::high_resolution_clock::now();
254
255 // Enqueue all jobs
256 for (int i = 0; i < num_jobs; ++i) {
257 job_types type = static_cast<job_types>(i % 3);
258 auto job = std::make_unique<callback_typed_job>(
259 [&completed]() -> kcenon::common::VoidResult {
260 completed.fetch_add(1);
261 return kcenon::common::ok();
262 },
263 type
264 );
265
266 auto enqueue_result = queue.enqueue(std::move(job));
267 while (enqueue_result.is_err()) {
268 std::this_thread::yield();
269 // Re-create job since it was moved
270 job = std::make_unique<callback_typed_job>(
271 [&completed]() -> kcenon::common::VoidResult {
272 completed.fetch_add(1);
273 return kcenon::common::ok();
274 },
275 type
276 );
277 enqueue_result = queue.enqueue(std::move(job));
278 }
279 }
280
281 // Process with multiple workers
282 std::vector<std::thread> workers;
283 for (int w = 0; w < num_workers; ++w) {
284 workers.emplace_back([&queue, &completed, num_jobs]() {
285 while (completed.load() < num_jobs) {
286 auto job = queue.dequeue({job_types::RealTime, job_types::Batch, job_types::Background});
287 if (job.has_value()) {
288 auto work_result = job.value()->do_work();
289 (void)work_result; // Ignore result for sample
290 } else {
291 std::this_thread::yield();
292 }
293 }
294 });
295 }
296
297 for (auto& t : workers) t.join();
298
299 auto duration = std::chrono::high_resolution_clock::now() - start;
300 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
301
302 write_information(
303 "Typed queue (lock-free): {} jobs in {} ms = {} ops/sec",
304 num_jobs, ms, num_jobs * 1000.0 / ms);
305 }
306}
@ completed
Successfully completed.

References kcenon::thread::completed, and kcenon::thread::job::do_work().

Here is the call graph for this function:

◆ stress_test_example()

void stress_test_example ( )
Examples
typed_job_queue_sample.cpp.

Definition at line 436 of file typed_job_queue_sample.cpp.

437{
438 write_information("\n[Example 5] Stress Test - High Contention");
439
440 typed_job_queue_t<job_types> queue;
441 const int num_threads = 16;
442 const int ops_per_thread = 10000;
443 std::atomic<int> total_ops{0};
444
445 auto start = std::chrono::high_resolution_clock::now();
446
447 std::vector<std::thread> threads;
448
449 // Half producers, half consumers
450 for (int t = 0; t < num_threads; ++t) {
451 if (t < num_threads / 2) {
452 // Producer
453 threads.emplace_back([&queue, &total_ops, t, ops_per_thread]() {
454 for (int i = 0; i < ops_per_thread; ++i) {
455 job_types type = static_cast<job_types>((t + i) % 3);
456 auto job = std::make_unique<callback_typed_job>(
457 [&total_ops]() -> kcenon::common::VoidResult {
458 total_ops.fetch_add(1);
459 return kcenon::common::ok();
460 },
461 type
462 );
463
464 auto enqueue_result = queue.enqueue(std::move(job));
465 while (enqueue_result.is_err()) {
466 std::this_thread::yield();
467 // Re-create job since it was moved
468 job = std::make_unique<callback_typed_job>(
469 [&total_ops]() -> kcenon::common::VoidResult {
470 total_ops.fetch_add(1);
471 return kcenon::common::ok();
472 },
473 type
474 );
475 enqueue_result = queue.enqueue(std::move(job));
476 }
477 }
478 });
479 } else {
480 // Consumer
481 threads.emplace_back([&queue, ops_per_thread]() {
482 int consumed = 0;
483 while (consumed < ops_per_thread) {
484 auto job = queue.dequeue({job_types::RealTime, job_types::Batch, job_types::Background});
485 if (job.has_value()) {
486 auto work_result = job.value()->do_work();
487 (void)work_result; // Ignore result for sample
488 consumed++;
489 } else {
490 std::this_thread::yield();
491 }
492 }
493 });
494 }
495 }
496
497 for (auto& t : threads) t.join();
498
499 auto duration = std::chrono::high_resolution_clock::now() - start;
500 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
501
502 write_information(
503 "Stress test completed: {} operations in {} ms = {} ops/sec",
504 total_ops.load(), ms, total_ops.load() * 1000.0 / ms);
505}

References kcenon::thread::job::do_work().

Here is the call graph for this function:

◆ task_scheduling_example()

void task_scheduling_example ( )
Examples
typed_job_queue_sample.cpp.

Definition at line 309 of file typed_job_queue_sample.cpp.

310{
311 write_information("\n[Example 4] Task Scheduling System");
312
313 typed_job_queue_t<job_types> task_queue;
314 std::atomic<bool> system_running{true};
315
316 // Task statistics
317 struct TaskStats {
318 std::atomic<int> created{0};
319 std::atomic<int> completed{0};
320 std::atomic<int> failed{0};
321 std::atomic<int64_t> total_latency_us{0};
322 };
323
324 std::map<job_types, TaskStats> stats;
325
326 // Task generator thread
327 std::thread generator([&task_queue, &system_running, &stats]() {
328 std::random_device rd;
329 std::mt19937 gen(rd());
330 std::uniform_int_distribution<> type_dist(0, 2);
331 std::uniform_int_distribution<> delay_dist(10, 100);
332
333 while (system_running) {
334 job_types type = static_cast<job_types>(type_dist(gen));
335 auto creation_time = std::chrono::high_resolution_clock::now();
336
337 auto task = std::make_unique<callback_typed_job>(
338 [type, creation_time, &stats]() -> kcenon::common::VoidResult {
339 auto start_time = std::chrono::high_resolution_clock::now();
340 auto latency = std::chrono::duration_cast<std::chrono::microseconds>(
341 start_time - creation_time).count();
342
343 stats[type].total_latency_us.fetch_add(latency);
344
345 // Simulate task execution
346 std::this_thread::sleep_for(std::chrono::microseconds(
347 type == job_types::RealTime ? 50 :
348 (type == job_types::Batch ? 200 : 500)));
349
350 stats[type].completed.fetch_add(1);
351
352 write_information("Task completed - Type: {}, Latency: {} μs",
353 static_cast<int>(type), latency);
354 return kcenon::common::ok(); // Success
355 },
356 type
357 );
358
359 auto enqueue_result = task_queue.enqueue(std::move(task));
360 if (!enqueue_result.is_err()) {
361 stats[type].created.fetch_add(1);
362 } else {
363 stats[type].failed.fetch_add(1);
364 }
365
366 std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
367 }
368 });
369
370 // Worker threads with type specialization
371 std::vector<std::thread> workers;
372
373 // High priority specialist
374 workers.emplace_back([&task_queue, &system_running]() {
375 while (system_running) {
376 auto task = task_queue.dequeue({job_types::RealTime});
377 if (task.has_value()) {
378 auto result = task.value()->do_work();
379 if (result.is_ok()) {
380 // Task executed successfully
381 } else {
382 write_error("High priority task failed: {}",
383 result.error().message);
384 }
385 } else {
386 std::this_thread::sleep_for(1ms);
387 }
388 }
389 });
390
391 // General workers
392 for (int i = 0; i < 2; ++i) {
393 workers.emplace_back([&task_queue, &system_running, i]() {
394 while (system_running) {
395 auto task = task_queue.dequeue({job_types::Batch, job_types::Background, job_types::RealTime});
396 if (task.has_value()) {
397 auto result = task.value()->do_work();
398 if (result.is_ok()) {
399 // Task executed successfully
400 } else {
401 write_error("General worker {} task failed: {}",
402 i, result.error().message);
403 }
404 } else {
405 std::this_thread::sleep_for(2ms);
406 }
407 }
408 });
409 }
410
411 // Run for 5 seconds
412 std::this_thread::sleep_for(5s);
413 system_running = false;
414
415 generator.join();
416 for (auto& t : workers) t.join();
417
418 // Print statistics
419 write_information("Task Scheduling Statistics:");
420 for (const auto& [type, stat] : stats) {
421 std::string type_name = type == job_types::RealTime ? "RealTime" :
422 (type == job_types::Batch ? "Batch" : "Background");
423
424 int completed = stat.completed.load();
425 double avg_latency = completed > 0 ?
426 static_cast<double>(stat.total_latency_us.load()) / completed : 0.0;
427
428 write_information(
429 " {} - Created: {}, Completed: {}, Failed: {}, Avg Latency: {:.1f} μs",
430 type_name, stat.created.load(), completed,
431 stat.failed.load(), avg_latency);
432 }
433}
T & value() &
Gets the value.
std::atomic< bool > system_running
Definition main.cpp:35
@ created
Worker created but not started.
@ failed
Execution failed.
@ latency
Latency threshold exceeded.

References kcenon::thread::completed, kcenon::thread::created, kcenon::thread::failed, kcenon::thread::result< T >::is_ok(), kcenon::thread::latency, system_running, and kcenon::thread::result< T >::value().

Here is the call graph for this function: