Container System 0.1.0
High-performance C++20 type-safe container framework with SIMD-accelerated serialization
Loading...
Searching...
No Matches
thread_pool_executor.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
25#pragma once
26
27#include "task.h"
28
29#include <atomic>
30#include <coroutine>
31#include <exception>
32#include <functional>
33#include <future>
34#include <memory>
35#include <optional>
36#include <thread>
37#include <type_traits>
38
39#ifdef KCENON_HAS_COMMON_SYSTEM
40#include <kcenon/common/interfaces/executor_interface.h>
41#include <kcenon/common/interfaces/thread_pool_interface.h>
42#endif
43
45{
52#ifdef KCENON_HAS_COMMON_SYSTEM
53 using executor_ptr = std::shared_ptr<kcenon::common::interfaces::IExecutor>;
54#else
55 using executor_ptr = std::nullptr_t;
56#endif
57
58 namespace detail
59 {
66 template<typename T>
68 {
69 std::function<T()> work_;
70 std::optional<T> result_;
71 std::exception_ptr exception_;
72 std::atomic<bool> ready_{false};
73 std::coroutine_handle<> continuation_;
74
75 explicit executor_state(std::function<T()> work)
76 : work_(std::move(work)) {}
77
82 };
83
90 template<typename T>
92 {
93 std::shared_ptr<executor_state<T>> state_;
95
96 executor_awaitable(std::function<T()> work, executor_ptr executor)
97 : state_(std::make_shared<executor_state<T>>(std::move(work)))
98 , executor_(std::move(executor))
99 {
100 }
101
103 executor_awaitable& operator=(executor_awaitable&&) noexcept = default;
104
106 executor_awaitable& operator=(const executor_awaitable&) = delete;
107
108 [[nodiscard]] bool await_ready() const noexcept
109 {
110 return false;
111 }
112
113 void await_suspend(std::coroutine_handle<> handle)
114 {
115 auto state = state_;
116 state->continuation_ = handle;
117
118#ifdef KCENON_HAS_COMMON_SYSTEM
119 if (executor_ && executor_->is_running())
120 {
121 // Create a job that executes the work
122 class work_job : public kcenon::common::interfaces::IJob
123 {
124 public:
125 work_job(std::shared_ptr<executor_state<T>> s)
126 : state_(std::move(s)) {}
127
128 kcenon::common::VoidResult execute() override
129 {
130 try {
131 state_->result_.emplace(state_->work_());
132 } catch (...) {
133 state_->exception_ = std::current_exception();
134 }
135 state_->ready_.store(true, std::memory_order_release);
136 if (state_->continuation_) {
137 state_->continuation_.resume();
138 }
139 return kcenon::common::ok();
140 }
141
142 std::string get_name() const override {
143 return "async_container_work";
144 }
145
146 private:
147 std::shared_ptr<executor_state<T>> state_;
148 };
149
150 auto job = std::make_unique<work_job>(state);
151 auto result = executor_->execute(std::move(job));
152 if (!result.is_ok()) {
153 // Fallback to thread if executor fails
154 run_in_thread(state, handle);
155 }
156 return;
157 }
158#endif
159 // Fallback: run in a detached thread
160 run_in_thread(state, handle);
161 }
162
164 {
165 while (!state_->ready_.load(std::memory_order_acquire)) {
166 // Spin until ready
167 }
168 if (state_->exception_) {
169 std::rethrow_exception(state_->exception_);
170 }
171 return std::move(*state_->result_);
172 }
173
174 private:
175 static void run_in_thread(
176 std::shared_ptr<executor_state<T>> state,
177 std::coroutine_handle<> handle)
178 {
179 std::thread([state, handle]() mutable {
180 try {
181 state->result_.emplace(state->work_());
182 } catch (...) {
183 state->exception_ = std::current_exception();
184 }
185 state->ready_.store(true, std::memory_order_release);
186 handle.resume();
187 }).detach();
188 }
189 };
190
194 template<typename F>
195 auto make_executor_awaitable(F&& func, executor_ptr executor)
197 {
198 using result_type = std::invoke_result_t<F>;
200 std::forward<F>(func), std::move(executor));
201 }
202
203 } // namespace detail
204
213 {
214 public:
219 {
221 return instance;
222 }
223
229 {
230 executor_ = std::move(executor);
231 }
232
237 [[nodiscard]] executor_ptr get_executor() const
238 {
239 return executor_;
240 }
241
245 [[nodiscard]] bool has_executor() const
246 {
247#ifdef KCENON_HAS_COMMON_SYSTEM
248 return executor_ != nullptr;
249#else
250 return false;
251#endif
252 }
253
258 {
259#ifdef KCENON_HAS_COMMON_SYSTEM
260 executor_ = nullptr;
261#endif
262 }
263
264 private:
267 };
268
285 {
286 public:
288 : previous_(async_executor_context::instance().get_executor())
289 {
290 async_executor_context::instance().set_executor(std::move(executor));
291 }
292
297
300
301 private:
303 };
304
305} // namespace kcenon::container::async
executor_ptr get_executor() const
Get the global executor.
bool has_executor() const
Check if an executor is configured.
void set_executor(executor_ptr executor)
Set the global executor.
static async_executor_context & instance()
Get the singleton instance.
RAII guard for setting executor context.
executor_context_guard & operator=(const executor_context_guard &)=delete
executor_context_guard(const executor_context_guard &)=delete
auto make_executor_awaitable(F &&func, executor_ptr executor) -> executor_awaitable< std::invoke_result_t< F > >
Helper to create executor awaitable.
std::nullptr_t executor_ptr
Executor type for async operations.
Awaitable that runs work using an executor or fallback thread.
executor_awaitable(std::function< T()> work, executor_ptr executor)
static void run_in_thread(std::shared_ptr< executor_state< T > > state, std::coroutine_handle<> handle)
executor_awaitable(executor_awaitable &&) noexcept=default
Shared state for executor-based async operations.
executor_state & operator=(executor_state &&)=delete
executor_state & operator=(const executor_state &)=delete
executor_state(const executor_state &)=delete
C++20 coroutine task type for async operations.