// --- Example 1: basic single-producer / single-consumer (SPSC) use of
// --- lockfree_job_queue. NOTE(review): this extract is missing interior
// --- lines (the embedded original line numbers jump), so several closing
// --- braces, conditionals, and declarations are not visible here.
34 write_information(
"[Example 1] Basic SPSC Pattern");
// Shared queue plus an atomic counter the jobs update. The increment site
// for `counter` is not visible in this extract — presumably inside the
// consumer's success branch; verify against the full file.
36 lockfree_job_queue queue;
37 std::atomic<int> counter{0};
// Producer thread: enqueues 10 callback_jobs, pacing with a 10ms sleep
// between enqueues.
40 std::thread producer([&queue, &counter]() {
41 for (
int i = 0; i < 10; ++i) {
// Each job returns a VoidResult; ok() signals success to whoever runs it.
42 auto job = std::make_unique<callback_job>([&counter, i]() -> kcenon::common::VoidResult {
44 write_information(
"Processed job {}", i);
45 return kcenon::common::ok();
// Ownership of the job transfers to the queue on enqueue.
48 auto result = queue.enqueue(std::move(
job));
// Enqueue-failure logging; the surrounding if-statement is outside this
// extract.
51 "Failed to enqueue job {}: {}", i,
result.error().message);
54 std::this_thread::sleep_for(10ms);
56 write_information(
"Producer finished");
// Consumer thread: loops until 10 jobs are consumed. The declaration of
// `consumed` and the site that produces `work_result` (presumably the
// outcome of executing the dequeued job) are not visible here — confirm
// against the full file.
60 std::thread consumer([&queue, &counter]() {
62 while (consumed < 10) {
63 auto result = queue.dequeue();
67 if (work_result.is_ok()) {
// Failed jobs are logged but do not stop the consumer loop.
71 write_error(
"Job failed: {}", work_result.error().message);
// Back off briefly when there is nothing to do.
74 std::this_thread::sleep_for(5ms);
77 write_information(
"Consumer finished");
// Final tally after both threads complete (the join calls are outside
// this extract).
84 "Total jobs processed: {}", counter.load());
// --- Example 2: multi-producer / multi-consumer (MPMC) use of
// --- lockfree_job_queue: 3 producers x 20 jobs each, drained by 2
// --- consumers. NOTE(review): interior lines are missing from this
// --- extract (embedded line numbers jump), including several brace
// --- closures and the site that produces `work_result`.
90 write_information(
"\n[Example 2] MPMC Pattern");
92 lockfree_job_queue queue;
// Atomic tallies shared across all producer/consumer threads.
93 std::atomic<int> produced{0};
94 std::atomic<int> consumed{0};
95 const int num_producers = 3;
96 const int num_consumers = 2;
97 const int jobs_per_producer = 20;
99 std::vector<std::thread> producers;
100 std::vector<std::thread> consumers;
// Spawn producers; each captures its index `p` by value so the lambda
// keeps a stable id after the loop variable changes.
103 for (
int p = 0; p < num_producers; ++p) {
104 producers.emplace_back([&queue, &produced, p, jobs_per_producer]() {
// Per-thread RNG: random_device seeds a mt19937; delays of 1..10 ms
// stagger the producers to exercise contention.
105 std::random_device rd;
106 std::mt19937 gen(rd());
107 std::uniform_int_distribution<> delay_dist(1, 10);
109 for (
int i = 0; i < jobs_per_producer; ++i) {
110 auto job = std::make_unique<callback_job>(
111 [p, i]() -> kcenon::common::VoidResult {
112 write_information(
"Job from producer {} #{}", p, i);
113 return kcenon::common::ok();
// Ownership moves into the queue; on success the produced tally grows.
118 auto result = queue.enqueue(std::move(
job));
120 produced.fetch_add(1);
// Presumably the retry path when enqueue fails (surrounding branch not
// visible) — yield before trying again.
123 std::this_thread::yield();
// Random pacing between enqueues.
126 std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
130 "Producer {} finished", p);
// Spawn consumers; each drains until the global consumed count reaches
// the expected total (3 * 20 = 60).
135 for (
int c = 0; c < num_consumers; ++c) {
136 consumers.emplace_back([&queue, &consumed, c, num_producers, jobs_per_producer]() {
137 const int total_jobs = num_producers * jobs_per_producer;
139 while (consumed.load() < total_jobs) {
140 auto result = queue.dequeue();
// `work_result` is presumably the outcome of executing the dequeued job;
// its production site is outside this extract — verify in the full file.
144 if (work_result.is_ok()) {
146 consumed.fetch_add(1);
148 write_error(
"Consumer {} job failed: {}", c, work_result.error().message);
// Brief back-off when the queue is empty.
151 std::this_thread::sleep_for(1ms);
156 "Consumer {} finished", c);
// Join every worker before reporting totals.
161 for (
auto& t : producers) t.join();
162 for (
auto& t : consumers) t.join();
165 "Total produced: {}, consumed: {}",
166 produced.load(), consumed.load());
// --- Example 3: batch enqueue/dequeue API of lockfree_job_queue.
// --- NOTE(review): interior lines are missing from this extract
// --- (embedded line numbers jump), including the call that executes
// --- each dequeued job and produces `result` in the final loop.
172 write_information(
"\n[Example 3] Batch Operations");
174 lockfree_job_queue queue;
175 std::atomic<int> processed{0};
// Build 50 jobs up front; each increments the shared tally when run.
178 std::vector<std::unique_ptr<job>> batch;
179 for (
int i = 0; i < 50; ++i) {
180 batch.push_back(std::make_unique<callback_job>(
181 [&processed, i]() -> kcenon::common::VoidResult {
182 processed.fetch_add(1);
183 write_information(
"Batch job {}", i);
184 return kcenon::common::ok();
189 "Enqueueing {} jobs in batch", batch.size());
// The whole vector is moved into the queue in one call; `batch` is left
// in a moved-from state afterwards.
191 auto enqueue_result = queue.enqueue_batch(std::move(batch));
192 if (enqueue_result.is_err()) {
194 "Batch enqueue failed: {}", enqueue_result.error().message);
// Pull everything back out in one call; returns a container of jobs.
199 auto dequeued = queue.dequeue_batch();
201 "Dequeued {} jobs in batch", dequeued.size());
// Execute each dequeued job. The execute call that yields `result` is
// outside this extract — verify against the full file.
204 for (
auto&
job : dequeued) {
209 write_error(
"Batch job failed: {}",
result.error().message);
214 "Total processed: {}", processed.load());
// --- Example 4: single-threaded throughput measurement for enqueue and
// --- dequeue, plus queue statistics. NOTE(review): interior lines are
// --- missing from this extract (embedded line numbers jump), including
// --- the retry-loop braces around the enqueue path.
220 write_information(
"\n[Example 4] Performance Measurement");
222 lockfree_job_queue queue;
223 const int num_operations = 100000;
// Time 100k enqueues of a trivial no-op job.
226 auto start = std::chrono::high_resolution_clock::now();
228 for (
int i = 0; i < num_operations; ++i) {
229 auto job = std::make_unique<callback_job>([]() -> kcenon::common::VoidResult {
230 return kcenon::common::ok();
// Attempt the enqueue; on success break out of what is presumably an
// inner retry loop (its opening is not visible in this extract).
234 auto r = queue.enqueue(std::move(
job));
235 if (r.is_ok())
break;
// Enqueue failed: yield, then rebuild the job (it was consumed by the
// failed std::move above) and retry.
236 std::this_thread::yield();
237 job = std::make_unique<callback_job>([]() -> kcenon::common::VoidResult {
return kcenon::common::ok(); });
241 auto enqueue_time = std::chrono::high_resolution_clock::now() - start;
// Time draining the same 100k jobs.
244 start = std::chrono::high_resolution_clock::now();
246 for (
int i = 0; i < num_operations; ++i) {
248 auto result = queue.dequeue();
// Presumably the empty-queue retry path (surrounding branch not visible).
252 std::this_thread::yield();
256 auto dequeue_time = std::chrono::high_resolution_clock::now() - start;
259 auto stats = queue.get_statistics();
// NOTE(review): if the elapsed time rounds down to 0 ms, the ops/sec
// expressions below divide by zero — consider duration<double> or a
// guard in the full file.
262 "Enqueue performance: {} ops in {} ms = {} ops/sec",
264 std::chrono::duration_cast<std::chrono::milliseconds>(enqueue_time).count(),
265 num_operations * 1000.0 / std::chrono::duration_cast<std::chrono::milliseconds>(enqueue_time).count());
268 "Dequeue performance: {} ops in {} ms = {} ops/sec",
270 std::chrono::duration_cast<std::chrono::milliseconds>(dequeue_time).count(),
271 num_operations * 1000.0 / std::chrono::duration_cast<std::chrono::milliseconds>(dequeue_time).count());
// Queue-reported latency statistics in nanoseconds.
274 "Queue statistics:\n"
278 " Average enqueue latency: {} ns\n"
279 " Average dequeue latency: {} ns",
283 stats.get_average_enqueue_latency_ns(),
284 stats.get_average_dequeue_latency_ns());