Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
thread_pool_adapters.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
22
23#include <chrono>
24#include <functional>
25#include <future>
26#include <memory>
27#include <stdexcept>
28#include <string>
29
30#if KCENON_WITH_COMMON_SYSTEM
31#include <kcenon/common/interfaces/executor_interface.h>
32#include <kcenon/common/patterns/result.h>
33#endif
34
36
37#if KCENON_WITH_COMMON_SYSTEM
38
46class function_job final : public ::kcenon::common::interfaces::IJob {
47public:
53 explicit function_job(std::function<void()> func, std::string name = "function_job")
54 : func_(std::move(func)), name_(std::move(name)) {}
55
60 ::kcenon::common::VoidResult execute() override {
61 try {
62 if (func_) {
63 func_();
64 }
65 return ::kcenon::common::ok();
66 } catch (const std::exception& ex) {
67 return ::kcenon::common::VoidResult(::kcenon::common::error_info{
68 ::kcenon::common::error_codes::INTERNAL_ERROR,
69 ex.what(),
70 "network_system::function_job"});
71 } catch (...) {
72 return ::kcenon::common::VoidResult(::kcenon::common::error_info{
73 ::kcenon::common::error_codes::INTERNAL_ERROR,
74 "Unknown exception in function_job",
75 "network_system::function_job"});
76 }
77 }
78
83 std::string get_name() const override { return name_; }
84
85private:
86 std::function<void()> func_;
87 std::string name_;
88};
89
107class network_to_common_thread_adapter
108 : public ::kcenon::common::interfaces::IExecutor {
109public:
115 explicit network_to_common_thread_adapter(
116 std::shared_ptr<thread_pool_interface> pool)
117 : pool_(std::move(pool)) {
118 if (!pool_) {
119 throw std::invalid_argument(
120 "network_to_common_thread_adapter requires non-null pool");
121 }
122 }
123
129 ::kcenon::common::Result<std::future<void>> execute(
130 std::unique_ptr<::kcenon::common::interfaces::IJob>&& job) override {
131 if (!pool_ || !pool_->is_running()) {
132 return ::kcenon::common::Result<std::future<void>>::err(
133 ::kcenon::common::error_codes::INVALID_ARGUMENT,
134 "Thread pool not running",
135 "network_to_common_thread_adapter");
136 }
137
138 try {
139 // Wrap the job in a shared_ptr for safe capture in lambda
140 auto shared_job = std::shared_ptr<::kcenon::common::interfaces::IJob>(
141 std::move(job));
142
143 auto future = pool_->submit([shared_job]() mutable {
144 auto result = shared_job->execute();
145 if (result.is_err()) {
146 throw std::runtime_error(result.error().message);
147 }
148 });
149
150 return ::kcenon::common::Result<std::future<void>>::ok(std::move(future));
151 } catch (const std::exception& e) {
152 return ::kcenon::common::Result<std::future<void>>::err(
153 ::kcenon::common::error_codes::INTERNAL_ERROR,
154 e.what(),
155 "network_to_common_thread_adapter");
156 }
157 }
158
165 ::kcenon::common::Result<std::future<void>> execute_delayed(
166 std::unique_ptr<::kcenon::common::interfaces::IJob>&& job,
167 std::chrono::milliseconds delay) override {
168 if (!pool_ || !pool_->is_running()) {
169 return ::kcenon::common::Result<std::future<void>>::err(
170 ::kcenon::common::error_codes::INVALID_ARGUMENT,
171 "Thread pool not running",
172 "network_to_common_thread_adapter");
173 }
174
175 try {
176 auto shared_job = std::shared_ptr<::kcenon::common::interfaces::IJob>(
177 std::move(job));
178
179 auto future = pool_->submit_delayed(
180 [shared_job]() mutable {
181 auto result = shared_job->execute();
182 if (result.is_err()) {
183 throw std::runtime_error(result.error().message);
184 }
185 },
186 delay);
187
188 return ::kcenon::common::Result<std::future<void>>::ok(std::move(future));
189 } catch (const std::exception& e) {
190 return ::kcenon::common::Result<std::future<void>>::err(
191 ::kcenon::common::error_codes::INTERNAL_ERROR,
192 e.what(),
193 "network_to_common_thread_adapter");
194 }
195 }
196
201 size_t worker_count() const override {
202 return pool_ ? pool_->worker_count() : 0;
203 }
204
209 bool is_running() const override {
210 return pool_ ? pool_->is_running() : false;
211 }
212
217 size_t pending_tasks() const override {
218 return pool_ ? pool_->pending_tasks() : 0;
219 }
220
228 void shutdown([[maybe_unused]] bool wait_for_completion = true) override {
229 // thread_pool_interface doesn't expose shutdown
230 // Lifecycle management should be done on the underlying pool directly
231 }
232
233private:
234 std::shared_ptr<thread_pool_interface> pool_;
235};
236
256class common_to_network_thread_adapter : public thread_pool_interface {
257public:
263 explicit common_to_network_thread_adapter(
264 std::shared_ptr<::kcenon::common::interfaces::IExecutor> executor)
265 : executor_(std::move(executor)) {
266 if (!executor_) {
267 throw std::invalid_argument(
268 "common_to_network_thread_adapter requires non-null executor");
269 }
270 }
271
277 std::future<void> submit(std::function<void()> task) override {
278 if (!executor_ || !executor_->is_running()) {
279 return make_error_future("Executor not running");
280 }
281
282 auto result = executor_->execute(
283 std::make_unique<function_job>(std::move(task)));
284
285 if (result.is_err()) {
286 return make_error_future(result.error().message);
287 }
288
289 return std::move(result.value());
290 }
291
298 std::future<void> submit_delayed(
299 std::function<void()> task,
300 std::chrono::milliseconds delay) override {
301 if (!executor_ || !executor_->is_running()) {
302 return make_error_future("Executor not running");
303 }
304
305 auto result = executor_->execute_delayed(
306 std::make_unique<function_job>(std::move(task)), delay);
307
308 if (result.is_err()) {
309 return make_error_future(result.error().message);
310 }
311
312 return std::move(result.value());
313 }
314
319 size_t worker_count() const override {
320 return executor_ ? executor_->worker_count() : 0;
321 }
322
327 bool is_running() const override {
328 return executor_ ? executor_->is_running() : false;
329 }
330
335 size_t pending_tasks() const override {
336 return executor_ ? executor_->pending_tasks() : 0;
337 }
338
343 void shutdown(bool wait_for_completion = true) {
344 if (executor_) {
345 executor_->shutdown(wait_for_completion);
346 }
347 }
348
349private:
350 static std::future<void> make_error_future(const std::string& message) {
351 std::promise<void> promise;
352 promise.set_exception(std::make_exception_ptr(std::runtime_error(message)));
353 return promise.get_future();
354 }
355
356 std::shared_ptr<::kcenon::common::interfaces::IExecutor> executor_;
357};
358
359#else // !KCENON_WITH_COMMON_SYSTEM
360
361// Placeholder types when common_system is not available
365
369
373
374#endif // KCENON_WITH_COMMON_SYSTEM
375
376} // namespace kcenon::network::integration
Feature flags for network_system.
VoidResult shutdown()
Shutdown the network system.
Thread system integration interface for network_system.