Four examples: basic single-producer/single-consumer, multi-producer/ multi-consumer, batch enqueue/dequeue, and enqueue/dequeue performance measurement with statistics.
#include "thread_base/lockfree/queues/lockfree_job_queue.h"
#include "thread_base/jobs/callback_job.h"
#include "logger/core/logger.h"
#include <thread>
#include <vector>
#include <atomic>
#include <chrono>
#include <random>
using namespace log_module;
using namespace std::chrono_literals;
{
write_information("[Example 1] Basic SPSC Pattern");
lockfree_job_queue queue;
std::atomic<int> counter{0};
std::thread producer([&queue, &counter]() {
for (int i = 0; i < 10; ++i) {
auto job = std::make_unique<callback_job>([&counter, i]() -> kcenon::common::VoidResult {
counter.fetch_add(1);
write_information("Processed job {}", i);
return kcenon::common::ok();
});
write_error(
"Failed to enqueue job {}: {}", i,
result.error().message);
}
std::this_thread::sleep_for(10ms);
}
write_information("Producer finished");
});
std::thread consumer([&queue, &counter]() {
int consumed = 0;
while (consumed < 10) {
auto result = queue.dequeue();
if (work_result.is_ok()) {
consumed++;
} else {
write_error("Job failed: {}", work_result.error().message);
}
} else {
std::this_thread::sleep_for(5ms);
}
}
write_information("Consumer finished");
});
producer.join();
consumer.join();
write_information(
"Total jobs processed: {}", counter.load());
}
{
write_information("\n[Example 2] MPMC Pattern");
lockfree_job_queue queue;
std::atomic<int> produced{0};
std::atomic<int> consumed{0};
const int num_producers = 3;
const int num_consumers = 2;
const int jobs_per_producer = 20;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int p = 0; p < num_producers; ++p) {
producers.emplace_back([&queue, &produced, p, jobs_per_producer]() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> delay_dist(1, 10);
for (int i = 0; i < jobs_per_producer; ++i) {
auto job = std::make_unique<callback_job>(
[p, i]() -> kcenon::common::VoidResult {
write_information("Job from producer {} #{}", p, i);
return kcenon::common::ok();
});
while (true) {
produced.fetch_add(1);
break;
}
std::this_thread::yield();
}
std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
}
write_information(
"Producer {} finished", p);
});
}
for (int c = 0; c < num_consumers; ++c) {
consumers.emplace_back([&queue, &consumed, c, num_producers, jobs_per_producer]() {
const int total_jobs = num_producers * jobs_per_producer;
while (consumed.load() < total_jobs) {
auto result = queue.dequeue();
if (work_result.is_ok()) {
consumed.fetch_add(1);
} else {
write_error("Consumer {} job failed: {}", c, work_result.error().message);
}
} else {
std::this_thread::sleep_for(1ms);
}
}
write_information(
"Consumer {} finished", c);
});
}
for (auto& t : producers) t.join();
for (auto& t : consumers) t.join();
write_information(
"Total produced: {}, consumed: {}",
produced.load(), consumed.load());
}
{
write_information("\n[Example 3] Batch Operations");
lockfree_job_queue queue;
std::atomic<int> processed{0};
std::vector<std::unique_ptr<job>> batch;
for (int i = 0; i < 50; ++i) {
batch.push_back(std::make_unique<callback_job>(
[&processed, i]() -> kcenon::common::VoidResult {
processed.fetch_add(1);
write_information("Batch job {}", i);
return kcenon::common::ok();
}));
}
write_information(
"Enqueueing {} jobs in batch", batch.size());
auto enqueue_result = queue.enqueue_batch(std::move(batch));
if (enqueue_result.is_err()) {
write_error(
"Batch enqueue failed: {}", enqueue_result.error().message);
return;
}
write_information(
"Dequeued {} jobs in batch",
dequeued.size());
for (
auto&
job : dequeued) {
} else {
write_error(
"Batch job failed: {}",
result.error().message);
}
}
write_information(
"Total processed: {}", processed.load());
}
{
write_information("\n[Example 4] Performance Measurement");
lockfree_job_queue queue;
const int num_operations = 100000;
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_operations; ++i) {
auto job = std::make_unique<callback_job>([]() -> kcenon::common::VoidResult {
return kcenon::common::ok();
});
while (true) {
auto r = queue.enqueue(std::move(
job));
if (r.is_ok()) break;
std::this_thread::yield();
job = std::make_unique<callback_job>([]() -> kcenon::common::VoidResult {
return kcenon::common::ok(); });
}
}
auto enqueue_time = std::chrono::high_resolution_clock::now() - start;
start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_operations; ++i) {
while (true) {
auto result = queue.dequeue();
break;
}
std::this_thread::yield();
}
}
auto dequeue_time = std::chrono::high_resolution_clock::now() - start;
auto stats = queue.get_statistics();
write_information(
"Enqueue performance: {} ops in {} ms = {} ops/sec",
num_operations,
std::chrono::duration_cast<std::chrono::milliseconds>(enqueue_time).count(),
num_operations * 1000.0 / std::chrono::duration_cast<std::chrono::milliseconds>(enqueue_time).count());
write_information(
"Dequeue performance: {} ops in {} ms = {} ops/sec",
num_operations,
std::chrono::duration_cast<std::chrono::milliseconds>(dequeue_time).count(),
num_operations * 1000.0 / std::chrono::duration_cast<std::chrono::milliseconds>(dequeue_time).count());
write_information(
"Queue statistics:\n"
" Enqueued: {}\n"
" Dequeued: {}\n"
" Retries: {}\n"
" Average enqueue latency: {} ns\n"
" Average dequeue latency: {} ns",
stats.enqueue_count,
stats.dequeue_count,
stats.retry_count,
stats.get_average_enqueue_latency_ns(),
stats.get_average_dequeue_latency_ns());
}
{
log_module::start();
log_module::console_target(log_types::Debug);
write_information(
"Lock-Free MPMC Queue Sample\n"
"===========================");
try {
} catch (const std::exception& e) {
write_error(
"Exception: {}", e.what());
}
write_information("\nAll examples completed!");
log_module::stop();
return 0;
}
Represents a unit of work (task) to be executed, typically by a job queue.
virtual auto do_work(void) -> common::VoidResult
The core task execution method to be overridden by derived classes.
A template class representing either a value or an error.
T & value() &
Gets the value.
bool has_value() const noexcept
Checks if the result contains a value.
bool is_ok() const noexcept
Checks if the result is successful.
@ dequeued
Job was taken from queue by a worker.
void basic_spsc_example()
void performance_example()
void batch_operations_example()
Core threading foundation of the thread system library.