Common System 0.2.0
Common interfaces and patterns for system integration
Loading...
Searching...
No Matches
executor_example.cpp

Demonstrates the IExecutor and IJob interfaces for task execution.

Covers basic task execution, batch processing, delayed execution, error handling, custom job implementations, the executor provider pattern, and graceful shutdown. Implements a mock_executor to illustrate how to build concrete executor back-ends.

See also
kcenon::common::interfaces::IExecutor
kcenon::common::interfaces::IJob
// BSD 3-Clause License
// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
using namespace kcenon::common;
using namespace std::chrono_literals;
class mock_executor : public IExecutor {
public:
mock_executor(size_t num_workers = 4)
: num_workers_(num_workers), running_(true) {
// Start worker threads
for (size_t i = 0; i < num_workers_; ++i) {
workers_.emplace_back([this] { work_loop(); });
}
}
shutdown(true);
}
// Job-based execution support
Result<std::future<void>> execute(std::unique_ptr<IJob>&& job) override {
if (!job) {
return error_info(1, "Job is null", "mock_executor");
}
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
// Use shared_ptr to make lambda copy-constructible
auto shared_job = std::shared_ptr<IJob>(std::move(job));
{
std::lock_guard<std::mutex> lock(queue_mutex_);
tasks_.emplace([shared_job, promise]() {
try {
auto result = shared_job->execute();
if (result.is_err()) {
const auto& err = result.error();
promise->set_exception(
std::make_exception_ptr(
std::runtime_error(err.message)));
} else {
promise->set_value();
}
} catch (...) {
promise->set_exception(std::current_exception());
}
});
}
queue_cv_.notify_one();
return ok(std::move(future));
}
std::unique_ptr<IJob>&& job,
std::chrono::milliseconds delay) override {
if (!job) {
return error_info(1, "Job is null", "mock_executor");
}
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
// Use shared_ptr to make lambda copy-constructible
auto shared_job = std::shared_ptr<IJob>(std::move(job));
{
std::lock_guard<std::mutex> lock(queue_mutex_);
tasks_.emplace([shared_job, promise, delay]() {
std::this_thread::sleep_for(delay);
try {
auto result = shared_job->execute();
if (result.is_err()) {
const auto& err = result.error();
promise->set_exception(
std::make_exception_ptr(
std::runtime_error(err.message)));
} else {
promise->set_value();
}
} catch (...) {
promise->set_exception(std::current_exception());
}
});
}
queue_cv_.notify_one();
return ok(std::move(future));
}
size_t worker_count() const override {
return num_workers_;
}
bool is_running() const override {
return running_;
}
size_t pending_tasks() const override {
}
void shutdown(bool wait_for_completion) override {
if (!running_) return;
if (wait_for_completion) {
// Wait for all tasks to complete
std::unique_lock<std::mutex> lock(queue_mutex_);
queue_cv_.wait(lock, [this] { return tasks_.empty(); });
}
running_ = false;
queue_cv_.notify_all();
for (auto& worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
}
private:
void work_loop() {
while (running_) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
queue_cv_.wait(lock, [this] {
return !tasks_.empty() || !running_;
});
if (!running_ && tasks_.empty()) {
break;
}
if (!tasks_.empty()) {
task = std::move(tasks_.front());
tasks_.pop();
}
}
if (task) {
task();
}
}
}
size_t num_workers_;
std::atomic<bool> running_;
std::atomic<size_t> pending_count_{0};
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable queue_cv_;
};
/// Adapts an arbitrary callable into the IJob interface.
///
/// Exceptions thrown by the callable are converted into error results so the
/// executor's Result-based contract is honoured.
class function_job : public IJob {
public:
    /// @param func callable to run when the job executes
    /// @param name label reported by get_name() for logging/debugging
    explicit function_job(std::function<void()> func, std::string name = "function_job")
        : func_(std::move(func)), name_(std::move(name)) {}

    /// Invoke the wrapped callable; translate std::exception into an error.
    VoidResult execute() override {
        try {
            func_();
        } catch (const std::exception& e) {
            return VoidResult(error_info(1, e.what(), "function_job"));
        }
        return VoidResult(std::monostate{});
    }

    /// Name used for logging/debugging.
    std::string get_name() const override { return name_; }

private:
    std::function<void()> func_;  // the work to perform
    std::string name_;            // human-readable identifier
};
/// Custom IJob that squares its value and accumulates it into a shared sum.
///
/// Used by Example 7 in main(), which prints the accumulated
/// "sum of squares" — so execute() must actually write to result_.
class calculation_job : public IJob {
public:
    /// @param value input whose square is accumulated
    /// @param result shared accumulator; must outlive this job
    calculation_job(int value, std::atomic<int>& result)
        : value_(value), result_(result) {}

    VoidResult execute() override {
        try {
            // Simulate some work
            std::this_thread::sleep_for(10ms);
            // Fix: accumulate the square into the shared result. Previously
            // the job only slept, so the caller's sum always stayed 0.
            result_ += value_ * value_;
            return VoidResult(std::monostate{});
        } catch (const std::exception& e) {
            return VoidResult(
                error_info(1, e.what(), "calculation_job"));
        }
    }

    std::string get_name() const override {
        return "calculation_job_" + std::to_string(value_);
    }

    int get_priority() const override {
        return value_; // Higher values = higher priority
    }

private:
    int value_;                  // the value to square
    std::atomic<int>& result_;   // shared accumulator owned by the caller
};
/// Submit one squaring job per input item to @p executor, wait for all of
/// them, and print the resulting sum of squares.
void process_data_batch(IExecutor& executor, const std::vector<int>& data) {
    std::atomic<int> sum{0};
    std::vector<std::future<void>> futures;
    futures.reserve(data.size());

    std::cout << "Processing " << data.size() << " items using "
              << executor.worker_count() << " workers\n";

    // Submit tasks using job-based API; each task squares its value into sum.
    for (const int value : data) {
        auto submitted = executor.execute(
            std::make_unique<function_job>([&sum, value] {
                // Simulate some work
                std::this_thread::sleep_for(10ms);
                sum += value * value;
            }));
        if (submitted.is_ok()) {
            futures.push_back(std::move(submitted.value()));
        }
    }

    // Block until every accepted job has finished.
    for (auto& future : futures) {
        future.wait();
    }
    std::cout << "Sum of squares: " << sum << "\n";
}
public:
std::shared_ptr<IExecutor> get_executor() override {
}
}
std::shared_ptr<IExecutor> create_executor(size_t worker_count) override {
return std::make_shared<mock_executor>(worker_count);
}
private:
std::shared_ptr<IExecutor> default_executor_;
};
int main() {
std::cout << "=== IExecutor Interface Examples ===\n\n";
// Example 1: Basic usage
std::cout << "1. Basic task execution:\n";
mock_executor executor(2);
auto job1 = std::make_unique<function_job>([] {
std::cout << " Task 1 executed\n";
});
auto result1 = executor.execute(std::move(job1));
if (result1.is_ok()) {
std::move(result1).value().wait();
}
auto job2 = std::make_unique<function_job>([] {
std::cout << " Task 2 executed\n";
});
auto result2 = executor.execute(std::move(job2));
if (result2.is_ok()) {
std::move(result2).value().wait();
}
// Example 2: Check executor status
std::cout << "\n2. Executor status:\n";
std::cout << " Workers: " << executor.worker_count() << "\n";
std::cout << " Running: " << (executor.is_running() ? "yes" : "no") << "\n";
std::cout << " Pending: " << executor.pending_tasks() << "\n";
// Example 3: Batch processing
std::cout << "\n3. Batch processing:\n";
std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
process_data_batch(executor, data);
// Example 4: Using executor provider
std::cout << "\n4. Using executor provider:\n";
auto shared_executor = provider.get_executor();
auto provider_job = std::make_unique<function_job>([] {
std::cout << " Task from shared executor\n";
});
auto provider_result = shared_executor->execute(std::move(provider_job));
if (provider_result.is_ok()) {
std::move(provider_result).value().wait();
}
// Example 5: Delayed execution
std::cout << "\n5. Delayed execution:\n";
std::cout << " Scheduling delayed task...\n";
auto start = std::chrono::steady_clock::now();
auto delayed_job = std::make_unique<function_job>([start] {
auto elapsed = std::chrono::steady_clock::now() - start;
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
std::cout << " Delayed task executed after " << ms.count() << "ms\n";
});
auto delayed_result = executor.execute_delayed(std::move(delayed_job), 500ms);
if (delayed_result.is_ok()) {
std::move(delayed_result).value().wait();
}
// Example 6: Error handling
std::cout << "\n6. Error handling:\n";
auto error_job = std::make_unique<function_job>([] {
throw std::runtime_error("Task failed!");
});
auto error_result = executor.execute(std::move(error_job));
if (error_result.is_ok()) {
try {
auto error_future = std::move(error_result.value());
error_future.get();
} catch (const std::exception& e) {
std::cout << " Caught exception: " << e.what() << "\n";
}
}
// Example 7: Custom job execution
std::cout << "\n7. Custom job execution:\n";
{
mock_executor job_executor(2);
std::atomic<int> job_sum{0};
std::vector<std::future<void>> job_futures;
std::cout << " Executing calculation jobs...\n";
for (int i = 1; i <= 5; ++i) {
auto job = std::make_unique<calculation_job>(i, job_sum);
auto result = job_executor.execute(std::move(job));
if (result.is_ok()) {
job_futures.push_back(std::move(result.value()));
} else {
const auto& err = result.error();
std::cout << " Failed to execute job: "
<< err.message << "\n";
}
}
// Wait for all jobs to complete
for (auto& future : job_futures) {
future.wait();
}
std::cout << " Custom job sum of squares: " << job_sum << "\n";
}
// Example 8: Graceful shutdown
std::cout << "\n8. Graceful shutdown:\n";
// Execute some tasks
for (int i = 0; i < 5; ++i) {
auto final_job = std::make_unique<function_job>([i] {
std::this_thread::sleep_for(50ms);
std::cout << " Final task " << i << " completed\n";
});
executor.execute(std::move(final_job));
}
std::cout << " Pending tasks before shutdown: "
<< executor.pending_tasks() << "\n";
std::cout << " Shutting down (waiting for completion)...\n";
executor.shutdown(true);
std::cout << " Shutdown complete\n";
std::cout << "\n=== Examples completed ===\n";
return 0;
}
int main()
std::string get_name() const override
Get the name of the job (for logging/debugging)
int get_priority() const override
Get the priority of the job (higher = more important)
VoidResult execute() override
Execute the job.
calculation_job(int value, std::atomic< int > &result)
std::atomic< int > & result_
std::shared_ptr< IExecutor > create_executor(size_t worker_count) override
Create a new executor with specific configuration.
std::shared_ptr< IExecutor > default_executor_
std::shared_ptr< IExecutor > get_executor() override
Get the default executor instance.
function_job(std::function< void()> func, std::string name="function_job")
std::string get_name() const override
Get the name of the job (for logging/debugging)
VoidResult execute() override
Execute the job.
std::function< void()> func_
Result type for error handling with member function support.
Definition core.cppm:165
const error_info & error() const
Get error reference.
Definition core.h:405
Interface for modules that provide executor implementations.
Abstract interface for task execution systems.
Definition executor.cppm:80
virtual void shutdown(bool wait_for_completion=true)=0
Shutdown the executor gracefully.
virtual size_t worker_count() const =0
Get the number of worker threads.
virtual Result< std::future< void > > execute_delayed(std::unique_ptr< IJob > &&job, std::chrono::milliseconds delay)=0
Execute a job with delay.
virtual bool is_running() const =0
Check if the executor is running.
virtual Result< std::future< void > > execute(std::unique_ptr< IJob > &&job)=0
Execute a job with Result-based error handling.
virtual size_t pending_tasks() const =0
Get the number of pending tasks.
Abstract job interface for task execution.
Definition executor.cppm:49
size_t worker_count() const override
Get the number of worker threads.
Result< std::future< void > > execute(std::unique_ptr< IJob > &&job) override
Execute a job with Result-based error handling.
Result< std::future< void > > execute_delayed(std::unique_ptr< IJob > &&job, std::chrono::milliseconds delay) override
Execute a job with delay.
std::queue< std::function< void()> > tasks_
std::atomic< size_t > pending_count_
std::mutex queue_mutex_
void shutdown(bool wait_for_completion) override
Shutdown the executor gracefully.
size_t pending_tasks() const override
Get the number of pending tasks.
bool is_running() const override
Check if the executor is running.
std::condition_variable queue_cv_
mock_executor(size_t num_workers=4)
std::vector< std::thread > workers_
std::atomic< bool > running_
void process_data_batch(IExecutor &executor, const std::vector< int > &data)
Executor interfaces for task submission and management.
Core interfaces.
Definition adapter.h:21
Result< std::monostate > VoidResult
Specialized Result for void operations.
Definition core.h:70
VoidResult err(const error_info &error)
Factory function to create error VoidResult.
Definition core.cppm:432
VoidResult ok()
Create a successful void result.
Definition utilities.h:71
Standard error information used by Result<T>.
Definition core.cppm:106