Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
kcenon::thread::adapters::detail Namespace Reference

Functions

common::error_info make_error_info (int code, std::string message, std::string module="thread_system")
 
common::error_info make_error_info (kcenon::thread::error_code code, std::string message)
 
std::exception_ptr to_exception (const common::error_info &info)
 
common::VoidResult make_error (const common::error_info &info)
 
common::VoidResult make_error (kcenon::thread::error_code code, std::string message)
 
common::error_info unexpected_pool_error ()
 
common::VoidResult wrap_user_task (const std::function< void()> &task)
 
std::optional< common::error_info > enqueue_job (const std::shared_ptr< kcenon::thread::thread_pool > &pool, const std::shared_ptr< std::promise< void > > &promise, std::function< common::VoidResult()> body)
 
common::Result< std::future< void > > schedule_task (const std::shared_ptr< kcenon::thread::thread_pool > &pool, std::function< common::VoidResult()> body)
 
void schedule_task_async (std::shared_ptr< kcenon::thread::thread_pool > pool, std::shared_ptr< std::promise< void > > promise, std::function< common::VoidResult()> body, std::chrono::milliseconds delay)
 

Function Documentation

◆ enqueue_job()

std::optional< common::error_info > kcenon::thread::adapters::detail::enqueue_job ( const std::shared_ptr< kcenon::thread::thread_pool > & pool,
const std::shared_ptr< std::promise< void > > & promise,
std::function< common::VoidResult()> body )
inline

Definition at line 126 of file common_executor_adapter.h.

129 {
130 if (!pool) {
132 promise->set_exception(to_exception(info));
133 return info;
134 }
135
136 auto completion_once = std::make_shared<std::once_flag>();
137 // Capture promise by value so it stays alive until the lambda is
138 // destroyed by the worker thread (after do_work() returns and the
139 // callback_job is cleaned up). The worker goes through several
140 // steps between set_value()/set_exception() and lambda destruction,
141 // which gives future::get()'s condition_variable::wait ample time
142 // to complete – avoiding the data-race that early p.reset() caused.
143 auto captured_promise = promise;
144
145 auto job = std::make_unique<kcenon::thread::callback_job>(
146 [captured_promise, completion_once, body = std::move(body)]() -> common::VoidResult {
147 try {
148 auto result = body();
149 if (result.is_err()) {
150 auto info = result.error();
151 std::call_once(*completion_once, [&]() {
152 captured_promise->set_exception(to_exception(info));
153 });
154 return result;
155 }
156 std::call_once(*completion_once, [&]() {
157 captured_promise->set_value();
158 });
159 return result;
160 } catch (const std::exception& ex) {
161 auto info = make_error_info(
163 ex.what()
164 );
165 std::call_once(*completion_once, [&]() {
166 captured_promise->set_exception(to_exception(info));
167 });
168 return make_error(info);
169 } catch (...) {
170 auto info = make_error_info(
172 "Unhandled exception while executing job"
173 );
174 std::call_once(*completion_once, [&]() {
175 captured_promise->set_exception(to_exception(info));
176 });
177 return make_error(info);
178 }
179 });
180
181 auto enqueue_result = pool->enqueue(std::move(job));
182 if (enqueue_result.is_err()) {
183 const auto& info = enqueue_result.error();
184 std::call_once(*completion_once, [&]() {
185 promise->set_exception(to_exception(info));
186 });
187 return info;
188 }
189
190 return std::nullopt;
191}
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
auto enqueue(std::unique_ptr< job > &&job) -> common::VoidResult
Enqueues a new job into the shared job_queue.
common::VoidResult make_error(const common::error_info &info)
std::exception_ptr to_exception(const common::error_info &info)
common::error_info make_error_info(int code, std::string message, std::string module="thread_system")
@ info
Informational messages highlighting progress.

References kcenon::thread::info, kcenon::thread::job_execution_failed, make_error(), make_error_info(), to_exception(), and unexpected_pool_error().

Referenced by schedule_task(), and schedule_task_async().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ make_error() [1/2]

common::VoidResult kcenon::thread::adapters::detail::make_error ( const common::error_info & info)
inline

Definition at line 102 of file common_executor_adapter.h.

102 {
103 return common::VoidResult(info);
104}

References kcenon::thread::info.

Referenced by enqueue_job(), kcenon::thread::adapters::thread_pool_executor_adapter::execute(), kcenon::thread::adapters::thread_pool_executor_adapter::execute_delayed(), schedule_task_async(), and wrap_user_task().

Here is the caller graph for this function:

◆ make_error() [2/2]

common::VoidResult kcenon::thread::adapters::detail::make_error ( kcenon::thread::error_code code,
std::string message )
inline

Definition at line 106 of file common_executor_adapter.h.

106 {
107 return common::VoidResult(make_error_info(code, std::move(message)));
108}

References make_error_info().

Here is the call graph for this function:

◆ make_error_info() [1/2]

common::error_info kcenon::thread::adapters::detail::make_error_info ( int code,
std::string message,
std::string module = "thread_system" )
inline

Definition at line 81 of file common_executor_adapter.h.

81 {
82 return common::error_info{code, std::move(message), std::move(module)};
83}

Referenced by enqueue_job(), make_error(), and unexpected_pool_error().

Here is the caller graph for this function:

◆ make_error_info() [2/2]

common::error_info kcenon::thread::adapters::detail::make_error_info ( kcenon::thread::error_code code,
std::string message )
inline

Definition at line 85 of file common_executor_adapter.h.

85 {
86 return common::error_info{
87 static_cast<int>(code),
88 std::move(message),
89 "thread_system"
90 };
91}

◆ schedule_task()

common::Result< std::future< void > > kcenon::thread::adapters::detail::schedule_task ( const std::shared_ptr< kcenon::thread::thread_pool > & pool,
std::function< common::VoidResult()> body )
inline

Definition at line 193 of file common_executor_adapter.h.

195 {
196 auto promise = std::make_shared<std::promise<void>>();
197 auto future = promise->get_future();
198
199 if (auto error = enqueue_job(pool, promise, std::move(body))) {
200 return common::Result<std::future<void>>(*error);
201 }
202
203 return common::Result<std::future<void>>::ok(std::move(future));
204}
Represents an error in the thread system.
std::optional< common::error_info > enqueue_job(const std::shared_ptr< kcenon::thread::thread_pool > &pool, const std::shared_ptr< std::promise< void > > &promise, std::function< common::VoidResult()> body)

References enqueue_job().

Referenced by kcenon::thread::adapters::thread_pool_executor_adapter::execute(), and kcenon::thread::adapters::thread_pool_executor_adapter::submit().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ schedule_task_async()

void kcenon::thread::adapters::detail::schedule_task_async ( std::shared_ptr< kcenon::thread::thread_pool > pool,
std::shared_ptr< std::promise< void > > promise,
std::function< common::VoidResult()> body,
std::chrono::milliseconds delay )
inline

Definition at line 206 of file common_executor_adapter.h.

210 {
211 // Use the thread pool itself to handle delayed execution
212 // This avoids creating detached threads or extra std::async threads
213 if (!pool) {
214 promise->set_exception(to_exception(unexpected_pool_error()));
215 return;
216 }
217
218 // Create once_flag to protect against race between delayed_job exception and enqueue failure
219 auto completion_once = std::make_shared<std::once_flag>();
220
221 // Capture promise by value (same approach as enqueue_job – no early reset)
222 auto captured_promise = promise;
223
224 // Create a wrapper job that handles delay and then enqueues the actual task
225 auto delayed_job = std::make_unique<kcenon::thread::callback_job>(
226 [pool, captured_promise, completion_once, body = std::move(body), delay]() -> common::VoidResult {
227 try {
228 if (delay.count() > 0) {
229 std::this_thread::sleep_for(delay);
230 }
231 // Enqueue the actual job after the delay
232 // Note: enqueue_job has its own once_flag protection internally
233 (void)enqueue_job(pool, captured_promise, std::move(body));
234 return common::ok();
235 } catch (...) {
236 std::call_once(*completion_once, [&]() {
237 captured_promise->set_exception(std::current_exception());
238 });
240 "Exception during delayed task scheduling");
241 }
242 });
243
244 // Enqueue the delayed job to the pool
245 auto enqueue_result = pool->enqueue(std::move(delayed_job));
246 if (enqueue_result.is_err()) {
247 std::call_once(*completion_once, [&]() {
248 promise->set_exception(to_exception(enqueue_result.error()));
249 });
250 }
251}
@ delay
Delay processing (attempt later)

References kcenon::thread::delay, enqueue_job(), kcenon::thread::job_execution_failed, make_error(), to_exception(), and unexpected_pool_error().

Referenced by kcenon::thread::adapters::thread_pool_executor_adapter::execute_delayed(), and kcenon::thread::adapters::thread_pool_executor_adapter::submit_delayed().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ to_exception()

std::exception_ptr kcenon::thread::adapters::detail::to_exception ( const common::error_info & info)
inline

Definition at line 93 of file common_executor_adapter.h.

93 {
94 std::ostringstream ss;
95 ss << "[" << info.module << "] " << info.message << " (code=" << info.code << ")";
96 if (info.details) {
97 ss << ": " << *info.details;
98 }
99 return std::make_exception_ptr(std::runtime_error(ss.str()));
100}

References kcenon::thread::info.

Referenced by enqueue_job(), schedule_task_async(), and kcenon::thread::adapters::thread_pool_executor_adapter::submit().

Here is the caller graph for this function:

◆ unexpected_pool_error()

common::error_info kcenon::thread::adapters::detail::unexpected_pool_error ( )
inline

Definition at line 110 of file common_executor_adapter.h.

110 {
111 return make_error_info(-1, "Thread pool unavailable");
112}

References make_error_info().

Referenced by enqueue_job(), and schedule_task_async().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ wrap_user_task()

common::VoidResult kcenon::thread::adapters::detail::wrap_user_task ( const std::function< void()> & task)
inline

Definition at line 114 of file common_executor_adapter.h.

114 {
115 try {
116 task();
117 return common::ok();
118 } catch (const std::exception& ex) {
119 return make_error(kcenon::thread::error_code::job_execution_failed, ex.what());
120 } catch (...) {
122 "Unknown exception while executing task");
123 }
124}

References kcenon::thread::job_execution_failed, and make_error().

Referenced by kcenon::thread::adapters::thread_pool_executor_adapter::submit(), and kcenon::thread::adapters::thread_pool_executor_adapter::submit_delayed().

Here is the call graph for this function:
Here is the caller graph for this function: