Common System 0.2.0
Common interfaces and patterns for system integration
Loading...
Searching...
No Matches
executor_example.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
16
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
26
27using namespace kcenon::common;
28using namespace kcenon::common::interfaces;
29using namespace std::chrono_literals;
30
34class mock_executor : public IExecutor {
35public:
36 mock_executor(size_t num_workers = 4)
37 : num_workers_(num_workers), running_(true) {
38 // Start worker threads
39 for (size_t i = 0; i < num_workers_; ++i) {
40 workers_.emplace_back([this] { work_loop(); });
41 }
42 }
43
45 shutdown(true);
46 }
47
48 // Job-based execution support
49 Result<std::future<void>> execute(std::unique_ptr<IJob>&& job) override {
50 if (!job) {
51 return error_info(1, "Job is null", "mock_executor");
52 }
53
54 auto promise = std::make_shared<std::promise<void>>();
55 auto future = promise->get_future();
56
57 // Use shared_ptr to make lambda copy-constructible
58 auto shared_job = std::shared_ptr<IJob>(std::move(job));
59
60 {
61 std::lock_guard<std::mutex> lock(queue_mutex_);
62 tasks_.emplace([shared_job, promise]() {
63 try {
64 auto result = shared_job->execute();
65 if (result.is_err()) {
66 const auto& err = result.error();
67 promise->set_exception(
68 std::make_exception_ptr(
69 std::runtime_error(err.message)));
70 } else {
71 promise->set_value();
72 }
73 } catch (...) {
74 promise->set_exception(std::current_exception());
75 }
76 });
78 }
79 queue_cv_.notify_one();
80
81 return ok(std::move(future));
82 }
83
85 std::unique_ptr<IJob>&& job,
86 std::chrono::milliseconds delay) override {
87 if (!job) {
88 return error_info(1, "Job is null", "mock_executor");
89 }
90
91 auto promise = std::make_shared<std::promise<void>>();
92 auto future = promise->get_future();
93
94 // Use shared_ptr to make lambda copy-constructible
95 auto shared_job = std::shared_ptr<IJob>(std::move(job));
96
97 {
98 std::lock_guard<std::mutex> lock(queue_mutex_);
99 tasks_.emplace([shared_job, promise, delay]() {
100 std::this_thread::sleep_for(delay);
101 try {
102 auto result = shared_job->execute();
103 if (result.is_err()) {
104 const auto& err = result.error();
105 promise->set_exception(
106 std::make_exception_ptr(
107 std::runtime_error(err.message)));
108 } else {
109 promise->set_value();
110 }
111 } catch (...) {
112 promise->set_exception(std::current_exception());
113 }
114 });
116 }
117 queue_cv_.notify_one();
118
119 return ok(std::move(future));
120 }
121
122 size_t worker_count() const override {
123 return num_workers_;
124 }
125
126 bool is_running() const override {
127 return running_;
128 }
129
130 size_t pending_tasks() const override {
131 return pending_count_;
132 }
133
134 void shutdown(bool wait_for_completion) override {
135 if (!running_) return;
136
137 if (wait_for_completion) {
138 // Wait for all tasks to complete
139 std::unique_lock<std::mutex> lock(queue_mutex_);
140 queue_cv_.wait(lock, [this] { return tasks_.empty(); });
141 }
142
143 running_ = false;
144 queue_cv_.notify_all();
145
146 for (auto& worker : workers_) {
147 if (worker.joinable()) {
148 worker.join();
149 }
150 }
151 }
152
153private:
154 void work_loop() {
155 while (running_) {
156 std::function<void()> task;
157
158 {
159 std::unique_lock<std::mutex> lock(queue_mutex_);
160 queue_cv_.wait(lock, [this] {
161 return !tasks_.empty() || !running_;
162 });
163
164 if (!running_ && tasks_.empty()) {
165 break;
166 }
167
168 if (!tasks_.empty()) {
169 task = std::move(tasks_.front());
170 tasks_.pop();
172 }
173 }
174
175 if (task) {
176 task();
177 }
178 }
179 }
180
182 std::atomic<bool> running_;
183 std::atomic<size_t> pending_count_{0};
184 std::vector<std::thread> workers_;
185 std::queue<std::function<void()>> tasks_;
186 std::mutex queue_mutex_;
187 std::condition_variable queue_cv_;
188};
189
193class function_job : public IJob {
194public:
195 explicit function_job(std::function<void()> func, std::string name = "function_job")
196 : func_(std::move(func)), name_(std::move(name)) {}
197
198 VoidResult execute() override {
199 try {
200 func_();
201 return VoidResult(std::monostate{});
202 } catch (const std::exception& e) {
203 return VoidResult(error_info(1, e.what(), "function_job"));
204 }
205 }
206
207 std::string get_name() const override { return name_; }
208
209private:
210 std::function<void()> func_;
211 std::string name_;
212};
213
217class calculation_job : public IJob {
218public:
219 calculation_job(int value, std::atomic<int>& result)
220 : value_(value), result_(result) {}
221
222 VoidResult execute() override {
223 try {
224 // Simulate some work
225 std::this_thread::sleep_for(10ms);
226 result_ += value_ * value_;
227 return VoidResult(std::monostate{});
228 } catch (const std::exception& e) {
229 return VoidResult(
230 error_info(1, e.what(), "calculation_job"));
231 }
232 }
233
234 std::string get_name() const override {
235 return "calculation_job_" + std::to_string(value_);
236 }
237
238 int get_priority() const override {
239 return value_; // Higher values = higher priority
240 }
241
242private:
244 std::atomic<int>& result_;
245};
246
250void process_data_batch(IExecutor& executor, const std::vector<int>& data) {
251 std::atomic<int> sum{0};
252 std::vector<std::future<void>> futures;
253
254 std::cout << "Processing " << data.size() << " items using "
255 << executor.worker_count() << " workers\n";
256
257 // Submit tasks using job-based API
258 for (int value : data) {
259 auto job = std::make_unique<function_job>([&sum, value] {
260 // Simulate some work
261 std::this_thread::sleep_for(10ms);
262 sum += value * value;
263 });
264
265 auto result = executor.execute(std::move(job));
266 if (result.is_ok()) {
267 futures.push_back(std::move(result.value()));
268 }
269 }
270
271 // Wait for completion
272 for (auto& future : futures) {
273 future.wait();
274 }
275
276 std::cout << "Sum of squares: " << sum << "\n";
277}
278
283public:
284 std::shared_ptr<IExecutor> get_executor() override {
285 if (!default_executor_) {
287 }
288 return default_executor_;
289 }
290
291 std::shared_ptr<IExecutor> create_executor(size_t worker_count) override {
292 return std::make_shared<mock_executor>(worker_count);
293 }
294
295private:
296 std::shared_ptr<IExecutor> default_executor_;
297};
298
299int main() {
300 std::cout << "=== IExecutor Interface Examples ===\n\n";
301
302 // Example 1: Basic usage
303 std::cout << "1. Basic task execution:\n";
304 mock_executor executor(2);
305
306 auto job1 = std::make_unique<function_job>([] {
307 std::cout << " Task 1 executed\n";
308 });
309 auto result1 = executor.execute(std::move(job1));
310 if (result1.is_ok()) {
311 std::move(result1).value().wait();
312 }
313
314 auto job2 = std::make_unique<function_job>([] {
315 std::cout << " Task 2 executed\n";
316 });
317 auto result2 = executor.execute(std::move(job2));
318 if (result2.is_ok()) {
319 std::move(result2).value().wait();
320 }
321
322 // Example 2: Check executor status
323 std::cout << "\n2. Executor status:\n";
324 std::cout << " Workers: " << executor.worker_count() << "\n";
325 std::cout << " Running: " << (executor.is_running() ? "yes" : "no") << "\n";
326 std::cout << " Pending: " << executor.pending_tasks() << "\n";
327
328 // Example 3: Batch processing
329 std::cout << "\n3. Batch processing:\n";
330 std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
331 process_data_batch(executor, data);
332
333 // Example 4: Using executor provider
334 std::cout << "\n4. Using executor provider:\n";
336 auto shared_executor = provider.get_executor();
337
338 auto provider_job = std::make_unique<function_job>([] {
339 std::cout << " Task from shared executor\n";
340 });
341 auto provider_result = shared_executor->execute(std::move(provider_job));
342 if (provider_result.is_ok()) {
343 std::move(provider_result).value().wait();
344 }
345
346 // Example 5: Delayed execution
347 std::cout << "\n5. Delayed execution:\n";
348 std::cout << " Scheduling delayed task...\n";
349 auto start = std::chrono::steady_clock::now();
350
351 auto delayed_job = std::make_unique<function_job>([start] {
352 auto elapsed = std::chrono::steady_clock::now() - start;
353 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
354 std::cout << " Delayed task executed after " << ms.count() << "ms\n";
355 });
356
357 auto delayed_result = executor.execute_delayed(std::move(delayed_job), 500ms);
358 if (delayed_result.is_ok()) {
359 std::move(delayed_result).value().wait();
360 }
361
362 // Example 6: Error handling
363 std::cout << "\n6. Error handling:\n";
364 auto error_job = std::make_unique<function_job>([] {
365 throw std::runtime_error("Task failed!");
366 });
367
368 auto error_result = executor.execute(std::move(error_job));
369 if (error_result.is_ok()) {
370 try {
371 auto error_future = std::move(error_result.value());
372 error_future.get();
373 } catch (const std::exception& e) {
374 std::cout << " Caught exception: " << e.what() << "\n";
375 }
376 }
377
378 // Example 7: Custom job execution
379 std::cout << "\n7. Custom job execution:\n";
380 {
381 mock_executor job_executor(2);
382 std::atomic<int> job_sum{0};
383 std::vector<std::future<void>> job_futures;
384
385 std::cout << " Executing calculation jobs...\n";
386 for (int i = 1; i <= 5; ++i) {
387 auto job = std::make_unique<calculation_job>(i, job_sum);
388 auto result = job_executor.execute(std::move(job));
389
390 if (result.is_ok()) {
391 job_futures.push_back(std::move(result.value()));
392 } else {
393 const auto& err = result.error();
394 std::cout << " Failed to execute job: "
395 << err.message << "\n";
396 }
397 }
398
399 // Wait for all jobs to complete
400 for (auto& future : job_futures) {
401 future.wait();
402 }
403
404 std::cout << " Custom job sum of squares: " << job_sum << "\n";
405 }
406
407 // Example 8: Graceful shutdown
408 std::cout << "\n8. Graceful shutdown:\n";
409
410 // Execute some tasks
411 for (int i = 0; i < 5; ++i) {
412 auto final_job = std::make_unique<function_job>([i] {
413 std::this_thread::sleep_for(50ms);
414 std::cout << " Final task " << i << " completed\n";
415 });
416 executor.execute(std::move(final_job));
417 }
418
419 std::cout << " Pending tasks before shutdown: "
420 << executor.pending_tasks() << "\n";
421 std::cout << " Shutting down (waiting for completion)...\n";
422
423 executor.shutdown(true);
424 std::cout << " Shutdown complete\n";
425
426 std::cout << "\n=== Examples completed ===\n";
427 return 0;
428}
std::string get_name() const override
Get the name of the job (for logging/debugging)
int get_priority() const override
Get the priority of the job (higher = more important)
VoidResult execute() override
Execute the job.
calculation_job(int value, std::atomic< int > &result)
std::atomic< int > & result_
std::shared_ptr< IExecutor > create_executor(size_t worker_count) override
Create a new executor with specific configuration.
std::shared_ptr< IExecutor > default_executor_
std::shared_ptr< IExecutor > get_executor() override
Get the default executor instance.
function_job(std::function< void()> func, std::string name="function_job")
std::string get_name() const override
Get the name of the job (for logging/debugging)
VoidResult execute() override
Execute the job.
std::function< void()> func_
Result type for error handling with member function support.
Definition core.cppm:165
const error_info & error() const
Get error reference.
Definition core.h:405
Interface for modules that provide executor implementations.
Abstract interface for task execution systems.
Definition executor.cppm:80
virtual size_t worker_count() const =0
Get the number of worker threads.
virtual Result< std::future< void > > execute(std::unique_ptr< IJob > &&job)=0
Execute a job with Result-based error handling.
Abstract job interface for task execution.
Definition executor.cppm:49
size_t worker_count() const override
Get the number of worker threads.
Result< std::future< void > > execute(std::unique_ptr< IJob > &&job) override
Execute a job with Result-based error handling.
Result< std::future< void > > execute_delayed(std::unique_ptr< IJob > &&job, std::chrono::milliseconds delay) override
Execute a job with delay.
std::queue< std::function< void()> > tasks_
std::atomic< size_t > pending_count_
std::mutex queue_mutex_
void shutdown(bool wait_for_completion) override
Shutdown the executor gracefully.
size_t pending_tasks() const override
Get the number of pending tasks.
bool is_running() const override
Check if the executor is running.
std::condition_variable queue_cv_
mock_executor(size_t num_workers=4)
std::vector< std::thread > workers_
std::atomic< bool > running_
void process_data_batch(IExecutor &executor, const std::vector< int > &data)
int main()
Executor interfaces for task submission and management.
Core interfaces.
Definition adapter.h:21
Result< std::monostate > VoidResult
Specialized Result for void operations.
Definition core.h:70
VoidResult err(const error_info &error)
Factory function to create error VoidResult.
Definition core.cppm:432
VoidResult ok()
Create a successful void result.
Definition utilities.h:71
Standard error information used by Result<T>.
Definition core.cppm:106