Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
common_executor_adapter.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
37#pragma once
38
39#if __has_include(<kcenon/common/interfaces/executor_interface.h>)
40#include <kcenon/common/interfaces/executor_interface.h>
41#include <kcenon/common/patterns/result.h>
42#elif __has_include(<common/interfaces/executor_interface.h>)
43#include <common/interfaces/executor_interface.h>
44#include <common/patterns/result.h>
45#ifndef KCENON_COMMON_EXECUTOR_FALLBACK_DEFINED
46#define KCENON_COMMON_EXECUTOR_FALLBACK_DEFINED
47namespace kcenon {
48namespace common {
49using ::common::Result;
50using ::common::VoidResult;
51namespace interfaces {
52using IExecutor = ::common::interfaces::IExecutor;
53}
54} // namespace common
55} // namespace kcenon
56#endif
57#else
58#error "Unable to locate common executor interface header."
59#endif
60
64
65#include <atomic>
66#include <chrono>
67#include <exception>
68#include <functional>
69#include <future>
70#include <memory>
71#include <optional>
72#include <sstream>
73#include <string>
74#include <thread>
75#include <utility>
76
78
79namespace detail {
80
81inline common::error_info make_error_info(int code, std::string message, std::string module = "thread_system") {
82 return common::error_info{code, std::move(message), std::move(module)};
83}
84
85inline common::error_info make_error_info(kcenon::thread::error_code code, std::string message) {
86 return common::error_info{
87 static_cast<int>(code),
88 std::move(message),
89 "thread_system"
90 };
91}
92
93inline std::exception_ptr to_exception(const common::error_info& info) {
94 std::ostringstream ss;
95 ss << "[" << info.module << "] " << info.message << " (code=" << info.code << ")";
96 if (info.details) {
97 ss << ": " << *info.details;
98 }
99 return std::make_exception_ptr(std::runtime_error(ss.str()));
100}
101
102inline common::VoidResult make_error(const common::error_info& info) {
103 return common::VoidResult(info);
104}
105
106inline common::VoidResult make_error(kcenon::thread::error_code code, std::string message) {
107 return common::VoidResult(make_error_info(code, std::move(message)));
108}
109
110inline common::error_info unexpected_pool_error() {
111 return make_error_info(-1, "Thread pool unavailable");
112}
113
/// Runs a user-supplied callable and reports the outcome as a common::VoidResult.
///
/// @param task Callable to invoke.
/// @return common::ok() when task() returns without throwing.
///
/// NOTE(review): this listing is incomplete. Its embedded numbering skips 119
/// and 121, so the body of the std::exception handler and the start of the
/// statement in the catch-all handler are not visible here. Presumably both
/// handlers return an error result; confirm against the real header.
114inline common::VoidResult wrap_user_task(const std::function<void()>& task) {
115 try {
116 task();
117 return common::ok();
118 } catch (const std::exception& ex) {
 // (listing line 119 is not shown)
120 } catch (...) {
 // (listing line 121 is not shown; the string below is the tail of its call)
122 "Unknown exception while executing task");
123 }
124}
125
/// Wraps `body` in a callback_job, enqueues it on `pool`, and ties the outcome
/// to `promise`.
///
/// @param pool    Target pool; when null the promise receives an exception and
///                the error is returned.
/// @param promise Completed by the job: set_value() when body() returns a
///                non-error result, set_exception() when body() returns an
///                error or throws. Also receives an exception if enqueueing fails.
/// @param body    Work to run; moved into the job's lambda.
/// @return std::nullopt when the job was enqueued, otherwise the error_info
///         describing why it was not.
///
/// NOTE(review): this listing is incomplete. Its embedded numbering skips 131,
/// 162 and 171, so the definition of `info` in the null-pool branch and the
/// first argument of both make_error_info(...) calls are not visible here.
126inline std::optional<common::error_info> enqueue_job(
127 const std::shared_ptr<kcenon::thread::thread_pool>& pool,
128 const std::shared_ptr<std::promise<void>>& promise,
129 std::function<common::VoidResult()> body) {
130 if (!pool) {
 // (listing line 131 is not shown; `info` is presumably defined there)
 // No job exists yet on this path, so the promise is completed directly
 // rather than through the once_flag created below.
132 promise->set_exception(to_exception(info));
133 return info;
134 }
135
 // Shared by the job lambda and the enqueue-failure path below so that only
 // one of them completes the promise.
136 auto completion_once = std::make_shared<std::once_flag>();
137 // Capture promise by value so it stays alive until the lambda is
138 // destroyed by the worker thread (after do_work() returns and the
139 // callback_job is cleaned up). The worker goes through several
140 // steps between set_value()/set_exception() and lambda destruction,
141 // which gives future::get()'s condition_variable::wait ample time
142 // to complete – avoiding the data-race that early p.reset() caused.
143 auto captured_promise = promise;
144
145 auto job = std::make_unique<kcenon::thread::callback_job>(
146 [captured_promise, completion_once, body = std::move(body)]() -> common::VoidResult {
147 try {
148 auto result = body();
149 if (result.is_err()) {
 // Copy the error out so the same result can still be returned as-is.
150 auto info = result.error();
151 std::call_once(*completion_once, [&]() {
152 captured_promise->set_exception(to_exception(info));
153 });
154 return result;
155 }
156 std::call_once(*completion_once, [&]() {
157 captured_promise->set_value();
158 });
159 return result;
160 } catch (const std::exception& ex) {
 // An exception escaping body() is reported both through the promise
 // and as the job's own error result.
161 auto info = make_error_info(
 // (listing line 162 is not shown; first argument missing here)
163 ex.what()
164 );
165 std::call_once(*completion_once, [&]() {
166 captured_promise->set_exception(to_exception(info));
167 });
168 return make_error(info);
169 } catch (...) {
170 auto info = make_error_info(
 // (listing line 171 is not shown; first argument missing here)
172 "Unhandled exception while executing job"
173 );
174 std::call_once(*completion_once, [&]() {
175 captured_promise->set_exception(to_exception(info));
176 });
177 return make_error(info);
178 }
179 });
180
181 auto enqueue_result = pool->enqueue(std::move(job));
182 if (enqueue_result.is_err()) {
 // The job was not accepted: fail the promise here (unless it was already
 // completed) and hand the error back to the caller.
183 const auto& info = enqueue_result.error();
184 std::call_once(*completion_once, [&]() {
185 promise->set_exception(to_exception(info));
186 });
187 return info;
188 }
189
190 return std::nullopt;
191}
192
193inline common::Result<std::future<void>> schedule_task(
194 const std::shared_ptr<kcenon::thread::thread_pool>& pool,
195 std::function<common::VoidResult()> body) {
196 auto promise = std::make_shared<std::promise<void>>();
197 auto future = promise->get_future();
198
199 if (auto error = enqueue_job(pool, promise, std::move(body))) {
200 return common::Result<std::future<void>>(*error);
201 }
202
203 return common::Result<std::future<void>>::ok(std::move(future));
204}
205
// Delayed scheduling helper: enqueues a wrapper job that sleeps for `delay`
// and then hands `body` to enqueue_job(). Nothing is returned; every failure
// is reported by setting an exception on `promise`.
//
// NOTE(review): this listing is incomplete. The line carrying the function's
// return type and name (listing line 206) is not shown; the symbol index of
// this listing gives it as `void schedule_task_async(...)`. Listing line 239
// (the start of the return statement in the catch-all handler) is also missing.
207 std::shared_ptr<kcenon::thread::thread_pool> pool,
208 std::shared_ptr<std::promise<void>> promise,
209 std::function<common::VoidResult()> body,
210 std::chrono::milliseconds delay) {
211 // Use the thread pool itself to handle delayed execution
212 // This avoids creating detached threads or extra std::async threads
213 if (!pool) {
214 promise->set_exception(to_exception(unexpected_pool_error()));
215 return;
216 }
217
218 // Create once_flag to protect against race between delayed_job exception and enqueue failure
 // NOTE(review): this flag is separate from the one enqueue_job() creates
 // internally, so the two do not coordinate with each other.
219 auto completion_once = std::make_shared<std::once_flag>();
220
221 // Capture promise by value (same approach as enqueue_job – no early reset)
222 auto captured_promise = promise;
223
224 // Create a wrapper job that handles delay and then enqueues the actual task
225 auto delayed_job = std::make_unique<kcenon::thread::callback_job>(
226 [pool, captured_promise, completion_once, body = std::move(body), delay]() -> common::VoidResult {
227 try {
 // The sleep happens inside the job, so whichever thread runs this
 // job is blocked for the whole delay.
228 if (delay.count() > 0) {
229 std::this_thread::sleep_for(delay);
230 }
231 // Enqueue the actual job after the delay
232 // Note: enqueue_job has its own once_flag protection internally
 // The returned error is discarded: on failure enqueue_job() has
 // already set the exception on the promise.
 // NOTE(review): the lambda is not `mutable`, so `body` is const here
 // and std::move(body) yields a const rvalue, which is copied.
233 (void)enqueue_job(pool, captured_promise, std::move(body));
234 return common::ok();
235 } catch (...) {
236 std::call_once(*completion_once, [&]() {
237 captured_promise->set_exception(std::current_exception());
238 });
 // (listing line 239 is not shown; the string below is the tail of its call)
240 "Exception during delayed task scheduling");
241 }
242 });
243
244 // Enqueue the delayed job to the pool
245 auto enqueue_result = pool->enqueue(std::move(delayed_job));
246 if (enqueue_result.is_err()) {
 // The wrapper job was rejected; the caller only sees this through the promise.
247 std::call_once(*completion_once, [&]() {
248 promise->set_exception(to_exception(enqueue_result.error()));
249 });
250 }
251}
252
253} // namespace detail
254
// Adapter that exposes a kcenon::thread::thread_pool through the
// common::interfaces::IExecutor interface. The pool is held by shared_ptr and
// may be null; the query methods then report 0 / false.
//
// NOTE(review): this listing is incomplete. Its embedded numbering skips 306,
// 317, 327, 331, 344, 354 and 358, so the calls that open the scheduling
// statements in submit_delayed(), execute() and execute_delayed(), and the
// first argument of several detail::make_error(...) calls, are not visible.
282class thread_pool_executor_adapter : public common::interfaces::IExecutor {
283public:
 // Takes shared ownership of `pool`.
284 explicit thread_pool_executor_adapter(std::shared_ptr<kcenon::thread::thread_pool> pool)
285 : pool_(std::move(pool)) {}
286
 // Schedules `task` on the pool and returns a future for its completion.
 // If scheduling fails, the returned future already holds the error as an
 // exception instead of the failure being returned separately.
287 std::future<void> submit(std::function<void()> task) {
288 auto result = detail::schedule_task(pool_, [task = std::move(task)]() mutable {
289 return detail::wrap_user_task(task);
290 });
291
292 if (result.is_ok()) {
293 return std::move(result.unwrap());
294 }
295
 // Scheduling failed: hand back a future whose shared state is already
 // completed with the error.
296 std::promise<void> failed;
297 failed.set_exception(detail::to_exception(result.error()));
298 return failed.get_future();
299 }
300
 // Delayed variant of submit(): the promise/future pair is created here and
 // the future is returned unconditionally.
301 std::future<void> submit_delayed(std::function<void()> task,
302 std::chrono::milliseconds delay) {
303 auto promise = std::make_shared<std::promise<void>>();
304 auto future = promise->get_future();
305
 // (listing line 306 is not shown; the lambda and `delay` below are the
 // trailing arguments of the call that starts there)
307 [task = std::move(task)]() mutable {
308 return detail::wrap_user_task(task);
309 },
310 delay);
311
312 return future;
313 }
314
 // IExecutor override: runs an IJob, translating an error result or a
 // thrown exception into a VoidResult error.
 // NOTE(review): `job` is not checked for null before shared_job->execute().
315 common::Result<std::future<void>> execute(std::unique_ptr<common::interfaces::IJob>&& job) override {
 // Converted to shared_ptr so the lambda capturing it is copyable
 // (presumably because it is stored in a std::function).
316 auto shared_job = std::shared_ptr<common::interfaces::IJob>(std::move(job));
 // (listing line 317 is not shown; the lambda below is an argument of the
 // call that starts there)
318 [shared_job]() mutable -> common::VoidResult {
319 try {
320 auto result = shared_job->execute();
321 if (result.is_err()) {
322 return detail::make_error(result.error());
323 }
324 return common::ok();
325 } catch (const std::exception& ex) {
326 return detail::make_error(
 // (listing line 327 is not shown; first argument missing here)
328 ex.what());
329 } catch (...) {
330 return detail::make_error(
 // (listing line 331 is not shown; first argument missing here)
332 "Unknown exception while executing common job");
333 }
334 });
335 }
336
 // IExecutor override: delayed variant of execute().
 // As written, the visible return statement always yields ok(future), so any
 // scheduling failure is observable only through the future.
 // NOTE(review): `job` is not checked for null before shared_job->execute().
337 common::Result<std::future<void>> execute_delayed(
338 std::unique_ptr<common::interfaces::IJob>&& job,
339 std::chrono::milliseconds delay) override {
340 auto promise = std::make_shared<std::promise<void>>();
341 auto future = promise->get_future();
342 auto shared_job = std::shared_ptr<common::interfaces::IJob>(std::move(job));
343
 // (listing line 344 is not shown; the lambda and `delay` below are the
 // trailing arguments of the call that starts there)
345 [shared_job]() mutable -> common::VoidResult {
346 try {
347 auto result = shared_job->execute();
348 if (result.is_err()) {
349 return detail::make_error(result.error());
350 }
351 return common::ok();
352 } catch (const std::exception& ex) {
353 return detail::make_error(
 // (listing line 354 is not shown; first argument missing here)
355 ex.what());
356 } catch (...) {
357 return detail::make_error(
 // (listing line 358 is not shown; first argument missing here)
359 "Unknown exception while executing common job");
360 }
361 },
362 delay);
363
364 return common::Result<std::future<void>>::ok(std::move(future));
365 }
366
 // Returns the pool's get_active_worker_count(), or 0 when there is no pool.
367 size_t worker_count() const override {
368 return pool_ ? pool_->get_active_worker_count() : 0U;
369 }
370
 // True only when a pool is set and it reports is_running().
371 bool is_running() const override {
372 return pool_ && pool_->is_running();
373 }
374
 // Returns the pool's get_pending_task_count(), or 0 when there is no pool.
375 size_t pending_tasks() const override {
376 return pool_ ? pool_->get_pending_task_count() : 0U;
377 }
378
 // Stops the pool; does nothing when there is no pool.
 // The flag is inverted before being passed to stop().
 // Throws std::runtime_error with the error's message if stop() fails.
379 void shutdown(bool wait_for_completion = true) override {
380 if (!pool_) {
381 return;
382 }
383
384 auto stop_result = pool_->stop(!wait_for_completion);
385 if (stop_result.is_err()) {
386 // Best effort: surface error via exception to aid debugging.
387 const auto& err = stop_result.error();
388 throw std::runtime_error(err.message);
389 }
390 }
391
 // Returns the wrapped pool (may be null).
392 std::shared_ptr<kcenon::thread::thread_pool> get_thread_pool() const {
393 return pool_;
394 }
395
396private:
 // Wrapped pool; every method above checks it for null before use.
397 std::shared_ptr<kcenon::thread::thread_pool> pool_;
398};
399
// NOTE(review): the header of the enclosing class (listing lines before 412)
// is not shown in this listing; only its public section and closing brace are
// visible.
412public:
 // Wraps `pool` in a thread_pool_executor_adapter and returns it as a
 // shared_ptr to the IExecutor interface. `pool` is moved into the adapter.
419 static std::shared_ptr<common::interfaces::IExecutor> create_from_thread_pool(
420 std::shared_ptr<kcenon::thread::thread_pool> pool) {
421 return std::make_shared<thread_pool_executor_adapter>(std::move(pool));
422 }
423};
424
425} // namespace kcenon::thread::adapters
Specialized job class that encapsulates user-defined callbacks.
Factory for creating IExecutor adapters from thread_pool instances.
static std::shared_ptr< common::interfaces::IExecutor > create_from_thread_pool(std::shared_ptr< kcenon::thread::thread_pool > pool)
Create an IExecutor adapter from a thread_pool.
Adapter exposing thread_pool through common::interfaces::IExecutor.
std::shared_ptr< kcenon::thread::thread_pool > get_thread_pool() const
std::future< void > submit(std::function< void()> task)
common::Result< std::future< void > > execute_delayed(std::unique_ptr< common::interfaces::IJob > &&job, std::chrono::milliseconds delay) override
std::future< void > submit_delayed(std::function< void()> task, std::chrono::milliseconds delay)
thread_pool_executor_adapter(std::shared_ptr< kcenon::thread::thread_pool > pool)
common::Result< std::future< void > > execute(std::unique_ptr< common::interfaces::IJob > &&job) override
std::shared_ptr< kcenon::thread::thread_pool > pool_
void shutdown(bool wait_for_completion=true) override
Represents an error in the thread system.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
bool is_ok() const noexcept
Checks if the result is successful.
auto get_pending_task_count() const -> std::size_t
Get the number of pending tasks in the queue.
auto get_active_worker_count() const -> std::size_t
Get the current number of active (running) workers.
auto is_running() const -> bool
Check if the thread pool is currently running.
auto stop(const bool &immediately_stop=false) -> common::VoidResult
Stops the thread pool and all worker threads.
Core thread pool implementation with work stealing and auto-scaling.
@ delay
Delay processing (attempt later)
Error codes and utilities for the thread system.
std::optional< common::error_info > enqueue_job(const std::shared_ptr< kcenon::thread::thread_pool > &pool, const std::shared_ptr< std::promise< void > > &promise, std::function< common::VoidResult()> body)
common::VoidResult wrap_user_task(const std::function< void()> &task)
common::VoidResult make_error(const common::error_info &info)
std::exception_ptr to_exception(const common::error_info &info)
common::error_info make_error_info(int code, std::string message, std::string module="thread_system")
void schedule_task_async(std::shared_ptr< kcenon::thread::thread_pool > pool, std::shared_ptr< std::promise< void > > promise, std::function< common::VoidResult()> body, std::chrono::milliseconds delay)
common::Result< std::future< void > > schedule_task(const std::shared_ptr< kcenon::thread::thread_pool > &pool, std::function< common::VoidResult()> body)
error_code
Strongly typed error codes for thread system operations.
@ failed
Execution failed.
@ info
Informational messages highlighting progress.
STL namespace.