25#include <condition_variable>
29using namespace std::chrono_literals;
51 return error_info(1,
"Job is null",
"mock_executor");
54 auto promise = std::make_shared<std::promise<void>>();
55 auto future = promise->get_future();
58 auto shared_job = std::shared_ptr<IJob>(std::move(job));
62 tasks_.emplace([shared_job, promise]() {
64 auto result = shared_job->execute();
65 if (result.is_err()) {
67 promise->set_exception(
68 std::make_exception_ptr(
69 std::runtime_error(
err.message)));
74 promise->set_exception(std::current_exception());
81 return ok(std::move(future));
85 std::unique_ptr<IJob>&& job,
86 std::chrono::milliseconds delay)
override {
88 return error_info(1,
"Job is null",
"mock_executor");
91 auto promise = std::make_shared<std::promise<void>>();
92 auto future = promise->get_future();
95 auto shared_job = std::shared_ptr<IJob>(std::move(job));
99 tasks_.emplace([shared_job, promise, delay]() {
100 std::this_thread::sleep_for(delay);
102 auto result = shared_job->execute();
103 if (result.is_err()) {
105 promise->set_exception(
106 std::make_exception_ptr(
107 std::runtime_error(
err.message)));
109 promise->set_value();
112 promise->set_exception(std::current_exception());
119 return ok(std::move(future));
137 if (wait_for_completion) {
147 if (worker.joinable()) {
156 std::function<void()> task;
169 task = std::move(
tasks_.front());
185 std::queue<std::function<void()>>
tasks_;
/**
 * @brief Construct a job that wraps a callable together with a name.
 *
 * Declared `explicit`, so a bare callable is never implicitly converted
 * into a function_job.
 *
 * @param func Callable taken by value and moved into `func_`.
 *             NOTE(review): presumably invoked by execute() - that body
 *             is not visible here, confirm.
 * @param name Job name taken by value and moved into `name_`; defaults
 *             to "function_job" when omitted.
 *             NOTE(review): presumably what get_name() returns - confirm.
 */
195 explicit function_job(std::function<
void()> func, std::string name =
"function_job")
196 :
func_(std::move(func)),
name_(std::move(name)) {}
202 }
catch (
const std::exception& e) {
225 std::this_thread::sleep_for(10ms);
228 }
catch (
const std::exception& e) {
235 return "calculation_job_" + std::to_string(
value_);
251 std::atomic<int> sum{0};
252 std::vector<std::future<void>> futures;
254 std::cout <<
"Processing " << data.size() <<
" items using "
258 for (
int value : data) {
259 auto job = std::make_unique<function_job>([&sum, value] {
261 std::this_thread::sleep_for(10ms);
262 sum += value * value;
265 auto result = executor.
execute(std::move(job));
266 if (result.is_ok()) {
267 futures.push_back(std::move(result.value()));
272 for (
auto& future : futures) {
276 std::cout <<
"Sum of squares: " << sum <<
"\n";
292 return std::make_shared<mock_executor>(worker_count);
300 std::cout <<
"=== IExecutor Interface Examples ===\n\n";
303 std::cout <<
"1. Basic task execution:\n";
306 auto job1 = std::make_unique<function_job>([] {
307 std::cout <<
" Task 1 executed\n";
309 auto result1 = executor.
execute(std::move(job1));
310 if (result1.is_ok()) {
311 std::move(result1).value().wait();
314 auto job2 = std::make_unique<function_job>([] {
315 std::cout <<
" Task 2 executed\n";
317 auto result2 = executor.
execute(std::move(job2));
318 if (result2.is_ok()) {
319 std::move(result2).value().wait();
323 std::cout <<
"\n2. Executor status:\n";
324 std::cout <<
" Workers: " << executor.
worker_count() <<
"\n";
325 std::cout <<
" Running: " << (executor.
is_running() ?
"yes" :
"no") <<
"\n";
326 std::cout <<
" Pending: " << executor.
pending_tasks() <<
"\n";
329 std::cout <<
"\n3. Batch processing:\n";
330 std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
334 std::cout <<
"\n4. Using executor provider:\n";
338 auto provider_job = std::make_unique<function_job>([] {
339 std::cout <<
" Task from shared executor\n";
341 auto provider_result = shared_executor->execute(std::move(provider_job));
342 if (provider_result.is_ok()) {
343 std::move(provider_result).value().wait();
347 std::cout <<
"\n5. Delayed execution:\n";
348 std::cout <<
" Scheduling delayed task...\n";
349 auto start = std::chrono::steady_clock::now();
351 auto delayed_job = std::make_unique<function_job>([start] {
352 auto elapsed = std::chrono::steady_clock::now() - start;
353 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
354 std::cout <<
" Delayed task executed after " << ms.count() <<
"ms\n";
357 auto delayed_result = executor.
execute_delayed(std::move(delayed_job), 500ms);
358 if (delayed_result.is_ok()) {
359 std::move(delayed_result).value().wait();
363 std::cout <<
"\n6. Error handling:\n";
364 auto error_job = std::make_unique<function_job>([] {
365 throw std::runtime_error(
"Task failed!");
368 auto error_result = executor.
execute(std::move(error_job));
369 if (error_result.is_ok()) {
371 auto error_future = std::move(error_result.value());
373 }
catch (
const std::exception& e) {
374 std::cout <<
" Caught exception: " << e.what() <<
"\n";
379 std::cout <<
"\n7. Custom job execution:\n";
382 std::atomic<int> job_sum{0};
383 std::vector<std::future<void>> job_futures;
385 std::cout <<
" Executing calculation jobs...\n";
386 for (
int i = 1; i <= 5; ++i) {
387 auto job = std::make_unique<calculation_job>(i, job_sum);
388 auto result = job_executor.
execute(std::move(job));
390 if (result.is_ok()) {
391 job_futures.push_back(std::move(result.value()));
394 std::cout <<
" Failed to execute job: "
395 <<
err.message <<
"\n";
400 for (
auto& future : job_futures) {
404 std::cout <<
" Custom job sum of squares: " << job_sum <<
"\n";
408 std::cout <<
"\n8. Graceful shutdown:\n";
411 for (
int i = 0; i < 5; ++i) {
412 auto final_job = std::make_unique<function_job>([i] {
413 std::this_thread::sleep_for(50ms);
414 std::cout <<
" Final task " << i <<
" completed\n";
416 executor.
execute(std::move(final_job));
419 std::cout <<
" Pending tasks before shutdown: "
421 std::cout <<
" Shutting down (waiting for completion)...\n";
424 std::cout <<
" Shutdown complete\n";
426 std::cout <<
"\n=== Examples completed ===\n";
std::string get_name() const override
Get the name of the job (for logging/debugging)
int get_priority() const override
Get the priority of the job (higher = more important)
VoidResult execute() override
Execute the job.
calculation_job(int value, std::atomic< int > &result)
std::atomic< int > & result_
std::shared_ptr< IExecutor > create_executor(size_t worker_count) override
Create a new executor with specific configuration.
std::shared_ptr< IExecutor > default_executor_
std::shared_ptr< IExecutor > get_executor() override
Get the default executor instance.
function_job(std::function< void()> func, std::string name="function_job")
std::string get_name() const override
Get the name of the job (for logging/debugging)
VoidResult execute() override
Execute the job.
std::function< void()> func_
Result type for error handling with member function support.
const error_info & error() const
Get error reference.
Interface for modules that provide executor implementations.
Abstract interface for task execution systems.
virtual size_t worker_count() const =0
Get the number of worker threads.
virtual Result< std::future< void > > execute(std::unique_ptr< IJob > &&job)=0
Execute a job with Result-based error handling.
Abstract job interface for task execution.
size_t worker_count() const override
Get the number of worker threads.
Result< std::future< void > > execute(std::unique_ptr< IJob > &&job) override
Execute a job with Result-based error handling.
Result< std::future< void > > execute_delayed(std::unique_ptr< IJob > &&job, std::chrono::milliseconds delay) override
Execute a job with delay.
std::queue< std::function< void()> > tasks_
std::atomic< size_t > pending_count_
void shutdown(bool wait_for_completion) override
Shutdown the executor gracefully.
size_t pending_tasks() const override
Get the number of pending tasks.
bool is_running() const override
Check if the executor is running.
std::condition_variable queue_cv_
mock_executor(size_t num_workers=4)
std::vector< std::thread > workers_
std::atomic< bool > running_
void process_data_batch(IExecutor &executor, const std::vector< int > &data)
Executor interfaces for task submission and management.
Result< std::monostate > VoidResult
Specialized Result for void operations.
VoidResult err(const error_info &error)
Factory function to create error VoidResult.
VoidResult ok()
Create a successful void result.
Standard error information used by Result<T>.