39#if __has_include(<kcenon/common/interfaces/executor_interface.h>)
40#include <kcenon/common/interfaces/executor_interface.h>
41#include <kcenon/common/patterns/result.h>
42#elif __has_include(<common/interfaces/executor_interface.h>)
43#include <common/interfaces/executor_interface.h>
44#include <common/patterns/result.h>
45#ifndef KCENON_COMMON_EXECUTOR_FALLBACK_DEFINED
46#define KCENON_COMMON_EXECUTOR_FALLBACK_DEFINED
49using ::common::Result;
50using ::common::VoidResult;
52using IExecutor = ::common::interfaces::IExecutor;
58#error "Unable to locate common executor interface header."
81inline common::error_info
make_error_info(
int code, std::string message, std::string module =
"thread_system") {
82 return common::error_info{code, std::move(message), std::move(module)};
86 return common::error_info{
87 static_cast<int>(code),
94 std::ostringstream ss;
95 ss <<
"[" <<
info.module <<
"] " <<
info.message <<
" (code=" <<
info.code <<
")";
97 ss <<
": " << *
info.details;
99 return std::make_exception_ptr(std::runtime_error(ss.str()));
103 return common::VoidResult(
info);
118 }
catch (
const std::exception& ex) {
122 "Unknown exception while executing task");
127 const std::shared_ptr<kcenon::thread::thread_pool>& pool,
128 const std::shared_ptr<std::promise<void>>& promise,
129 std::function<common::VoidResult()> body) {
136 auto completion_once = std::make_shared<std::once_flag>();
143 auto captured_promise = promise;
145 auto job = std::make_unique<kcenon::thread::callback_job>(
146 [captured_promise, completion_once, body = std::move(body)]() -> common::VoidResult {
151 std::call_once(*completion_once, [&]() {
156 std::call_once(*completion_once, [&]() {
157 captured_promise->set_value();
160 }
catch (
const std::exception& ex) {
165 std::call_once(*completion_once, [&]() {
172 "Unhandled exception while executing job"
174 std::call_once(*completion_once, [&]() {
181 auto enqueue_result = pool->enqueue(std::move(
job));
182 if (enqueue_result.is_err()) {
183 const auto&
info = enqueue_result.error();
184 std::call_once(*completion_once, [&]() {
194 const std::shared_ptr<kcenon::thread::thread_pool>& pool,
195 std::function<common::VoidResult()> body) {
196 auto promise = std::make_shared<std::promise<void>>();
197 auto future = promise->get_future();
200 return common::Result<std::future<void>>(*error);
203 return common::Result<std::future<void>>::ok(std::move(future));
207 std::shared_ptr<kcenon::thread::thread_pool> pool,
208 std::shared_ptr<std::promise<void>> promise,
209 std::function<common::VoidResult()> body,
210 std::chrono::milliseconds
delay) {
219 auto completion_once = std::make_shared<std::once_flag>();
222 auto captured_promise = promise;
225 auto delayed_job = std::make_unique<kcenon::thread::callback_job>(
226 [pool, captured_promise, completion_once, body = std::move(body),
delay]() -> common::VoidResult {
228 if (
delay.count() > 0) {
229 std::this_thread::sleep_for(
delay);
233 (void)
enqueue_job(pool, captured_promise, std::move(body));
236 std::call_once(*completion_once, [&]() {
237 captured_promise->set_exception(std::current_exception());
240 "Exception during delayed task scheduling");
245 auto enqueue_result = pool->enqueue(std::move(delayed_job));
246 if (enqueue_result.is_err()) {
247 std::call_once(*completion_once, [&]() {
248 promise->set_exception(
to_exception(enqueue_result.error()));
287 std::future<void>
submit(std::function<
void()> task) {
293 return std::move(
result.unwrap());
296 std::promise<void>
failed;
298 return failed.get_future();
302 std::chrono::milliseconds
delay) {
303 auto promise = std::make_shared<std::promise<void>>();
304 auto future = promise->get_future();
307 [task = std::move(task)]()
mutable {
315 common::Result<std::future<void>>
execute(std::unique_ptr<common::interfaces::IJob>&&
job)
override {
316 auto shared_job = std::shared_ptr<common::interfaces::IJob>(std::move(
job));
318 [shared_job]()
mutable -> common::VoidResult {
320 auto result = shared_job->execute();
325 }
catch (
const std::exception& ex) {
332 "Unknown exception while executing common job");
338 std::unique_ptr<common::interfaces::IJob>&&
job,
339 std::chrono::milliseconds
delay)
override {
340 auto promise = std::make_shared<std::promise<void>>();
341 auto future = promise->get_future();
342 auto shared_job = std::shared_ptr<common::interfaces::IJob>(std::move(
job));
345 [shared_job]()
mutable -> common::VoidResult {
347 auto result = shared_job->execute();
352 }
catch (
const std::exception& ex) {
359 "Unknown exception while executing common job");
364 return common::Result<std::future<void>>::ok(std::move(future));
379 void shutdown(
bool wait_for_completion =
true)
override {
384 auto stop_result =
pool_->
stop(!wait_for_completion);
385 if (stop_result.is_err()) {
387 const auto& err = stop_result.error();
388 throw std::runtime_error(err.message);
397 std::shared_ptr<kcenon::thread::thread_pool>
pool_;
420 std::shared_ptr<kcenon::thread::thread_pool> pool) {
421 return std::make_shared<thread_pool_executor_adapter>(std::move(pool));
Specialized job class that encapsulates user-defined callbacks.
Factory for creating IExecutor adapters from thread_pool instances.
static std::shared_ptr< common::interfaces::IExecutor > create_from_thread_pool(std::shared_ptr< kcenon::thread::thread_pool > pool)
Create an IExecutor adapter from a thread_pool.
Adapter exposing thread_pool through common::interfaces::IExecutor.
std::shared_ptr< kcenon::thread::thread_pool > get_thread_pool() const
std::future< void > submit(std::function< void()> task)
common::Result< std::future< void > > execute_delayed(std::unique_ptr< common::interfaces::IJob > &&job, std::chrono::milliseconds delay) override
std::future< void > submit_delayed(std::function< void()> task, std::chrono::milliseconds delay)
size_t pending_tasks() const override
bool is_running() const override
thread_pool_executor_adapter(std::shared_ptr< kcenon::thread::thread_pool > pool)
size_t worker_count() const override
common::Result< std::future< void > > execute(std::unique_ptr< common::interfaces::IJob > &&job) override
std::shared_ptr< kcenon::thread::thread_pool > pool_
void shutdown(bool wait_for_completion=true) override
Represents an error in the thread system.
Represents a unit of work (task) to be executed, typically by a job queue.
A template class representing either a value or an error.
bool is_ok() const noexcept
Checks if the result is successful.
auto get_pending_task_count() const -> std::size_t
Get the number of pending tasks in the queue.
auto get_active_worker_count() const -> std::size_t
Get the current number of active (running) workers.
auto is_running() const -> bool
Check if the thread pool is currently running.
auto stop(const bool &immediately_stop=false) -> common::VoidResult
Stops the thread pool and all worker threads.
Core thread pool implementation with work stealing and auto-scaling.
@ delay
Delay processing (attempt later)
Error codes and utilities for the thread system.
std::optional< common::error_info > enqueue_job(const std::shared_ptr< kcenon::thread::thread_pool > &pool, const std::shared_ptr< std::promise< void > > &promise, std::function< common::VoidResult()> body)
common::VoidResult wrap_user_task(const std::function< void()> &task)
common::VoidResult make_error(const common::error_info &info)
std::exception_ptr to_exception(const common::error_info &info)
common::error_info make_error_info(int code, std::string message, std::string module="thread_system")
void schedule_task_async(std::shared_ptr< kcenon::thread::thread_pool > pool, std::shared_ptr< std::promise< void > > promise, std::function< common::VoidResult()> body, std::chrono::milliseconds delay)
common::Result< std::future< void > > schedule_task(const std::shared_ptr< kcenon::thread::thread_pool > &pool, std::function< common::VoidResult()> body)
common::error_info unexpected_pool_error()
error_code
Strongly typed error codes for thread system operations.
@ failed
Execution failed.
@ info
Informational messages highlighting progress.