Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
mpmc_queue_sample.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
17#include "thread_base/lockfree/queues/lockfree_job_queue.h"
18#include "thread_base/jobs/callback_job.h"
19#include "logger/core/logger.h"
20
21#include <thread>
22#include <vector>
23#include <atomic>
24#include <chrono>
25#include <random>
26
27using namespace kcenon::thread;
28using namespace log_module;
29using namespace std::chrono_literals;
30
31// Example 1: Basic single producer, single consumer
33{
34 write_information("[Example 1] Basic SPSC Pattern");
35
36 lockfree_job_queue queue;
37 std::atomic<int> counter{0};
38
39 // Producer thread
40 std::thread producer([&queue, &counter]() {
41 for (int i = 0; i < 10; ++i) {
42 auto job = std::make_unique<callback_job>([&counter, i]() -> kcenon::common::VoidResult {
43 counter.fetch_add(1);
44 write_information("Processed job {}", i);
45 return kcenon::common::ok(); // Success
46 });
47
48 auto result = queue.enqueue(std::move(job));
49 if (result.is_err()) {
50 write_error(
51 "Failed to enqueue job {}: {}", i, result.error().message);
52 }
53
54 std::this_thread::sleep_for(10ms);
55 }
56 write_information("Producer finished");
57 });
58
59 // Consumer thread
60 std::thread consumer([&queue, &counter]() {
61 int consumed = 0;
62 while (consumed < 10) {
63 auto result = queue.dequeue();
64 if (result.has_value()) {
65 auto& job = result.value();
66 auto work_result = job->do_work();
67 if (work_result.is_ok()) {
68 // Job executed successfully
69 consumed++;
70 } else {
71 write_error("Job failed: {}", work_result.error().message);
72 }
73 } else {
74 std::this_thread::sleep_for(5ms);
75 }
76 }
77 write_information("Consumer finished");
78 });
79
80 producer.join();
81 consumer.join();
82
83 write_information(
84 "Total jobs processed: {}", counter.load());
85}
86
87// Example 2: Multiple producers, multiple consumers
89{
90 write_information("\n[Example 2] MPMC Pattern");
91
92 lockfree_job_queue queue;
93 std::atomic<int> produced{0};
94 std::atomic<int> consumed{0};
95 const int num_producers = 3;
96 const int num_consumers = 2;
97 const int jobs_per_producer = 20;
98
99 std::vector<std::thread> producers;
100 std::vector<std::thread> consumers;
101
102 // Start producers
103 for (int p = 0; p < num_producers; ++p) {
104 producers.emplace_back([&queue, &produced, p, jobs_per_producer]() {
105 std::random_device rd;
106 std::mt19937 gen(rd());
107 std::uniform_int_distribution<> delay_dist(1, 10);
108
109 for (int i = 0; i < jobs_per_producer; ++i) {
110 auto job = std::make_unique<callback_job>(
111 [p, i]() -> kcenon::common::VoidResult {
112 write_information("Job from producer {} #{}", p, i);
113 return kcenon::common::ok();
114 });
115
116 // Retry on failure (high contention scenario)
117 while (true) {
118 auto result = queue.enqueue(std::move(job));
119 if (!result.is_err()) {
120 produced.fetch_add(1);
121 break;
122 }
123 std::this_thread::yield();
124 }
125
126 std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
127 }
128
129 write_information(
130 "Producer {} finished", p);
131 });
132 }
133
134 // Start consumers
135 for (int c = 0; c < num_consumers; ++c) {
136 consumers.emplace_back([&queue, &consumed, c, num_producers, jobs_per_producer]() {
137 const int total_jobs = num_producers * jobs_per_producer;
138
139 while (consumed.load() < total_jobs) {
140 auto result = queue.dequeue();
141 if (result.has_value()) {
142 auto& job = result.value();
143 auto work_result = job->do_work();
144 if (work_result.is_ok()) {
145 // Job executed successfully
146 consumed.fetch_add(1);
147 } else {
148 write_error("Consumer {} job failed: {}", c, work_result.error().message);
149 }
150 } else {
151 std::this_thread::sleep_for(1ms);
152 }
153 }
154
155 write_information(
156 "Consumer {} finished", c);
157 });
158 }
159
160 // Wait for all threads
161 for (auto& t : producers) t.join();
162 for (auto& t : consumers) t.join();
163
164 write_information(
165 "Total produced: {}, consumed: {}",
166 produced.load(), consumed.load());
167}
168
169// Example 3: Batch operations
171{
172 write_information("\n[Example 3] Batch Operations");
173
174 lockfree_job_queue queue;
175 std::atomic<int> processed{0};
176
177 // Batch enqueue
178 std::vector<std::unique_ptr<job>> batch;
179 for (int i = 0; i < 50; ++i) {
180 batch.push_back(std::make_unique<callback_job>(
181 [&processed, i]() -> kcenon::common::VoidResult {
182 processed.fetch_add(1);
183 write_information("Batch job {}", i);
184 return kcenon::common::ok();
185 }));
186 }
187
188 write_information(
189 "Enqueueing {} jobs in batch", batch.size());
190
191 auto enqueue_result = queue.enqueue_batch(std::move(batch));
192 if (enqueue_result.is_err()) {
193 write_error(
194 "Batch enqueue failed: {}", enqueue_result.error().message);
195 return;
196 }
197
198 // Batch dequeue
199 auto dequeued = queue.dequeue_batch();
200 write_information(
201 "Dequeued {} jobs in batch", dequeued.size());
202
203 // Process all dequeued jobs
204 for (auto& job : dequeued) {
205 auto result = job->do_work();
206 if (result.is_ok()) {
207 // Job executed successfully
208 } else {
209 write_error("Batch job failed: {}", result.error().message);
210 }
211 }
212
213 write_information(
214 "Total processed: {}", processed.load());
215}
216
217// Example 4: Performance measurement
219{
220 write_information("\n[Example 4] Performance Measurement");
221
222 lockfree_job_queue queue;
223 const int num_operations = 100000;
224
225 // Measure enqueue performance
226 auto start = std::chrono::high_resolution_clock::now();
227
228 for (int i = 0; i < num_operations; ++i) {
229 auto job = std::make_unique<callback_job>([]() -> kcenon::common::VoidResult {
230 return kcenon::common::ok();
231 });
232
233 while (true) {
234 auto r = queue.enqueue(std::move(job));
235 if (r.is_ok()) break;
236 std::this_thread::yield();
237 job = std::make_unique<callback_job>([]() -> kcenon::common::VoidResult { return kcenon::common::ok(); });
238 }
239 }
240
241 auto enqueue_time = std::chrono::high_resolution_clock::now() - start;
242
243 // Measure dequeue performance
244 start = std::chrono::high_resolution_clock::now();
245
246 for (int i = 0; i < num_operations; ++i) {
247 while (true) {
248 auto result = queue.dequeue();
249 if (result.has_value()) {
250 break;
251 }
252 std::this_thread::yield();
253 }
254 }
255
256 auto dequeue_time = std::chrono::high_resolution_clock::now() - start;
257
258 // Get statistics
259 auto stats = queue.get_statistics();
260
261 write_information(
262 "Enqueue performance: {} ops in {} ms = {} ops/sec",
263 num_operations,
264 std::chrono::duration_cast<std::chrono::milliseconds>(enqueue_time).count(),
265 num_operations * 1000.0 / std::chrono::duration_cast<std::chrono::milliseconds>(enqueue_time).count());
266
267 write_information(
268 "Dequeue performance: {} ops in {} ms = {} ops/sec",
269 num_operations,
270 std::chrono::duration_cast<std::chrono::milliseconds>(dequeue_time).count(),
271 num_operations * 1000.0 / std::chrono::duration_cast<std::chrono::milliseconds>(dequeue_time).count());
272
273 write_information(
274 "Queue statistics:\n"
275 " Enqueued: {}\n"
276 " Dequeued: {}\n"
277 " Retries: {}\n"
278 " Average enqueue latency: {} ns\n"
279 " Average dequeue latency: {} ns",
280 stats.enqueue_count,
281 stats.dequeue_count,
282 stats.retry_count,
283 stats.get_average_enqueue_latency_ns(),
284 stats.get_average_dequeue_latency_ns());
285}
286
287int main()
288{
289 log_module::start();
290 log_module::console_target(log_types::Debug);
291
292 write_information(
293 "Lock-Free MPMC Queue Sample\n"
294 "===========================");
295
296 try {
298 mpmc_example();
301 } catch (const std::exception& e) {
302 write_error(
303 "Exception: {}", e.what());
304 }
305
306 write_information("\nAll examples completed!");
307
308 log_module::stop();
309 return 0;
310}
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
virtual auto do_work(void) -> common::VoidResult
The core task execution method to be overridden by derived classes.
Definition job.cpp:135
A template class representing either a value or an error.
T & value() &
Gets the value.
bool has_value() const noexcept
Checks if the result contains a value.
bool is_ok() const noexcept
Checks if the result is successful.
void basic_spsc_example()
void performance_example()
void mpmc_example()
void batch_operations_example()
int main()
Core threading foundation of the thread system library.
Definition thread_impl.h:17