Demonstrates the IExecutor and IJob interfaces for task execution.
Demonstrates the IExecutor and IJob interfaces for task execution.Covers basic task execution, batch processing, delayed execution, error handling, custom job implementations, the executor provider pattern, and graceful shutdown. Implements a mock_executor to illustrate how to build concrete executor back-ends.
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <atomic>
#include <queue>
#include <mutex>
#include <condition_variable>
using namespace std::chrono_literals;
public:
}
}
}
if (!job) {
return error_info(1,
"Job is null",
"mock_executor");
}
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto shared_job = std::shared_ptr<IJob>(std::move(job));
{
tasks_.emplace([shared_job, promise]() {
try {
auto result = shared_job->execute();
if (result.is_err()) {
promise->set_exception(
std::make_exception_ptr(
std::runtime_error(
err.message)));
} else {
promise->set_value();
}
} catch (...) {
promise->set_exception(std::current_exception());
}
});
}
return ok(std::move(future));
}
std::unique_ptr<IJob>&& job,
std::chrono::milliseconds delay) override {
if (!job) {
return error_info(1,
"Job is null",
"mock_executor");
}
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto shared_job = std::shared_ptr<IJob>(std::move(job));
{
tasks_.emplace([shared_job, promise, delay]() {
std::this_thread::sleep_for(delay);
try {
auto result = shared_job->execute();
if (result.is_err()) {
promise->set_exception(
std::make_exception_ptr(
std::runtime_error(
err.message)));
} else {
promise->set_value();
}
} catch (...) {
promise->set_exception(std::current_exception());
}
});
}
return ok(std::move(future));
}
}
}
}
void shutdown(
bool wait_for_completion)
override {
if (wait_for_completion) {
}
if (worker.joinable()) {
worker.join();
}
}
}
private:
std::function<void()> task;
{
});
break;
}
task = std::move(
tasks_.front());
}
}
if (task) {
task();
}
}
}
std::queue<std::function<void()>>
tasks_;
};
public:
explicit function_job(std::function<
void()> func, std::string name =
"function_job")
:
func_(std::move(func)),
name_(std::move(name)) {}
try {
} catch (const std::exception& e) {
}
}
private:
std::function<void()>
func_;
};
public:
try {
std::this_thread::sleep_for(10ms);
} catch (const std::exception& e) {
}
}
return "calculation_job_" + std::to_string(
value_);
}
}
private:
};
std::atomic<int> sum{0};
std::vector<std::future<void>> futures;
std::cout << "Processing " << data.size() << " items using "
for (int value : data) {
auto job = std::make_unique<function_job>([&sum, value] {
std::this_thread::sleep_for(10ms);
sum += value * value;
});
auto result = executor.
execute(std::move(job));
if (result.is_ok()) {
futures.push_back(std::move(result.value()));
}
}
for (auto& future : futures) {
future.wait();
}
std::cout << "Sum of squares: " << sum << "\n";
}
public:
}
}
return std::make_shared<mock_executor>(worker_count);
}
private:
};
std::cout << "=== IExecutor Interface Examples ===\n\n";
std::cout << "1. Basic task execution:\n";
auto job1 = std::make_unique<function_job>([] {
std::cout << " Task 1 executed\n";
});
auto result1 = executor.
execute(std::move(job1));
if (result1.is_ok()) {
std::move(result1).value().wait();
}
auto job2 = std::make_unique<function_job>([] {
std::cout << " Task 2 executed\n";
});
auto result2 = executor.
execute(std::move(job2));
if (result2.is_ok()) {
std::move(result2).value().wait();
}
std::cout << "\n2. Executor status:\n";
std::cout <<
" Workers: " << executor.
worker_count() <<
"\n";
std::cout <<
" Running: " << (executor.
is_running() ?
"yes" :
"no") <<
"\n";
std::cout << "\n3. Batch processing:\n";
std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
std::cout << "\n4. Using executor provider:\n";
auto provider_job = std::make_unique<function_job>([] {
std::cout << " Task from shared executor\n";
});
auto provider_result = shared_executor->execute(std::move(provider_job));
if (provider_result.is_ok()) {
std::move(provider_result).value().wait();
}
std::cout << "\n5. Delayed execution:\n";
std::cout << " Scheduling delayed task...\n";
auto start = std::chrono::steady_clock::now();
auto delayed_job = std::make_unique<function_job>([start] {
auto elapsed = std::chrono::steady_clock::now() - start;
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
std::cout << " Delayed task executed after " << ms.count() << "ms\n";
});
auto delayed_result = executor.
execute_delayed(std::move(delayed_job), 500ms);
if (delayed_result.is_ok()) {
std::move(delayed_result).value().wait();
}
std::cout << "\n6. Error handling:\n";
auto error_job = std::make_unique<function_job>([] {
throw std::runtime_error("Task failed!");
});
auto error_result = executor.
execute(std::move(error_job));
if (error_result.is_ok()) {
try {
auto error_future = std::move(error_result.value());
error_future.get();
} catch (const std::exception& e) {
std::cout << " Caught exception: " << e.what() << "\n";
}
}
std::cout << "\n7. Custom job execution:\n";
{
std::atomic<int> job_sum{0};
std::vector<std::future<void>> job_futures;
std::cout << " Executing calculation jobs...\n";
for (int i = 1; i <= 5; ++i) {
auto job = std::make_unique<calculation_job>(i, job_sum);
auto result = job_executor.execute(std::move(job));
if (result.is_ok()) {
job_futures.push_back(std::move(result.value()));
} else {
std::cout << " Failed to execute job: "
}
}
for (auto& future : job_futures) {
future.wait();
}
std::cout << " Custom job sum of squares: " << job_sum << "\n";
}
std::cout << "\n8. Graceful shutdown:\n";
for (int i = 0; i < 5; ++i) {
auto final_job = std::make_unique<function_job>([i] {
std::this_thread::sleep_for(50ms);
std::cout << " Final task " << i << " completed\n";
});
executor.
execute(std::move(final_job));
}
std::cout << " Pending tasks before shutdown: "
std::cout << " Shutting down (waiting for completion)...\n";
std::cout << " Shutdown complete\n";
std::cout << "\n=== Examples completed ===\n";
return 0;
}
std::string get_name() const override
Get the name of the job (for logging/debugging)
int get_priority() const override
Get the priority of the job (higher = more important)
VoidResult execute() override
Execute the job.
calculation_job(int value, std::atomic< int > &result)
std::atomic< int > & result_
std::shared_ptr< IExecutor > create_executor(size_t worker_count) override
Create a new executor with specific configuration.
std::shared_ptr< IExecutor > default_executor_
std::shared_ptr< IExecutor > get_executor() override
Get the default executor instance.
function_job(std::function< void()> func, std::string name="function_job")
std::string get_name() const override
Get the name of the job (for logging/debugging)
VoidResult execute() override
Execute the job.
std::function< void()> func_
Result type for error handling with member function support.
const error_info & error() const
Get error reference.
Interface for modules that provide executor implementations.
Abstract interface for task execution systems.
virtual void shutdown(bool wait_for_completion=true)=0
Shutdown the executor gracefully.
virtual size_t worker_count() const =0
Get the number of worker threads.
virtual Result< std::future< void > > execute_delayed(std::unique_ptr< IJob > &&job, std::chrono::milliseconds delay)=0
Execute a job with delay.
virtual bool is_running() const =0
Check if the executor is running.
virtual Result< std::future< void > > execute(std::unique_ptr< IJob > &&job)=0
Execute a job with Result-based error handling.
virtual size_t pending_tasks() const =0
Get the number of pending tasks.
Abstract job interface for task execution.
size_t worker_count() const override
Get the number of worker threads.
Result< std::future< void > > execute(std::unique_ptr< IJob > &&job) override
Execute a job with Result-based error handling.
Result< std::future< void > > execute_delayed(std::unique_ptr< IJob > &&job, std::chrono::milliseconds delay) override
Execute a job with delay.
std::queue< std::function< void()> > tasks_
std::atomic< size_t > pending_count_
void shutdown(bool wait_for_completion) override
Shutdown the executor gracefully.
size_t pending_tasks() const override
Get the number of pending tasks.
bool is_running() const override
Check if the executor is running.
std::condition_variable queue_cv_
mock_executor(size_t num_workers=4)
std::vector< std::thread > workers_
std::atomic< bool > running_
void process_data_batch(IExecutor &executor, const std::vector< int > &data)
Executor interfaces for task submission and management.
Result< std::monostate > VoidResult
Specialized Result for void operations.
VoidResult err(const error_info &error)
Factory function to create error VoidResult.
VoidResult ok()
Create a successful void result.
Standard error information used by Result<T>.