30#if KCENON_WITH_COMMON_SYSTEM
31#include <kcenon/common/interfaces/executor_interface.h>
32#include <kcenon/common/patterns/result.h>
37#if KCENON_WITH_COMMON_SYSTEM
46class function_job final :
public ::kcenon::common::interfaces::IJob {
53 explicit function_job(std::function<
void()> func, std::string name =
"function_job")
54 : func_(std::move(func)), name_(std::move(name)) {}
60 ::kcenon::common::VoidResult execute()
override {
65 return ::kcenon::common::ok();
66 }
catch (
const std::exception& ex) {
67 return ::kcenon::common::VoidResult(::kcenon::common::error_info{
68 ::kcenon::common::error_codes::INTERNAL_ERROR,
70 "network_system::function_job"});
72 return ::kcenon::common::VoidResult(::kcenon::common::error_info{
73 ::kcenon::common::error_codes::INTERNAL_ERROR,
74 "Unknown exception in function_job",
75 "network_system::function_job"});
83 std::string get_name()
const override {
return name_; }
86 std::function<void()> func_;
107class network_to_common_thread_adapter
108 :
public ::kcenon::common::interfaces::IExecutor {
115 explicit network_to_common_thread_adapter(
116 std::shared_ptr<thread_pool_interface> pool)
117 : pool_(std::move(pool)) {
119 throw std::invalid_argument(
120 "network_to_common_thread_adapter requires non-null pool");
129 ::kcenon::common::Result<std::future<void>> execute(
130 std::unique_ptr<::kcenon::common::interfaces::IJob>&& job)
override {
131 if (!pool_ || !pool_->is_running()) {
132 return ::kcenon::common::Result<std::future<void>>::err(
133 ::kcenon::common::error_codes::INVALID_ARGUMENT,
134 "Thread pool not running",
135 "network_to_common_thread_adapter");
140 auto shared_job = std::shared_ptr<::kcenon::common::interfaces::IJob>(
143 auto future = pool_->submit([shared_job]()
mutable {
144 auto result = shared_job->execute();
145 if (result.is_err()) {
146 throw std::runtime_error(result.error().message);
150 return ::kcenon::common::Result<std::future<void>>::ok(std::move(future));
151 }
catch (
const std::exception& e) {
152 return ::kcenon::common::Result<std::future<void>>::err(
153 ::kcenon::common::error_codes::INTERNAL_ERROR,
155 "network_to_common_thread_adapter");
165 ::kcenon::common::Result<std::future<void>> execute_delayed(
166 std::unique_ptr<::kcenon::common::interfaces::IJob>&& job,
167 std::chrono::milliseconds delay)
override {
168 if (!pool_ || !pool_->is_running()) {
169 return ::kcenon::common::Result<std::future<void>>::err(
170 ::kcenon::common::error_codes::INVALID_ARGUMENT,
171 "Thread pool not running",
172 "network_to_common_thread_adapter");
176 auto shared_job = std::shared_ptr<::kcenon::common::interfaces::IJob>(
179 auto future = pool_->submit_delayed(
180 [shared_job]()
mutable {
181 auto result = shared_job->execute();
182 if (result.is_err()) {
183 throw std::runtime_error(result.error().message);
188 return ::kcenon::common::Result<std::future<void>>::ok(std::move(future));
189 }
catch (
const std::exception& e) {
190 return ::kcenon::common::Result<std::future<void>>::err(
191 ::kcenon::common::error_codes::INTERNAL_ERROR,
193 "network_to_common_thread_adapter");
201 size_t worker_count()
const override {
202 return pool_ ? pool_->worker_count() : 0;
209 bool is_running()
const override {
210 return pool_ ? pool_->is_running() :
false;
217 size_t pending_tasks()
const override {
218 return pool_ ? pool_->pending_tasks() : 0;
228 void shutdown([[maybe_unused]]
bool wait_for_completion =
true)
override {
234 std::shared_ptr<thread_pool_interface> pool_;
256class common_to_network_thread_adapter :
public thread_pool_interface {
263 explicit common_to_network_thread_adapter(
264 std::shared_ptr<::kcenon::common::interfaces::IExecutor> executor)
265 : executor_(std::move(executor)) {
267 throw std::invalid_argument(
268 "common_to_network_thread_adapter requires non-null executor");
277 std::future<void> submit(std::function<
void()> task)
override {
278 if (!executor_ || !executor_->is_running()) {
279 return make_error_future(
"Executor not running");
282 auto result = executor_->execute(
283 std::make_unique<function_job>(std::move(task)));
285 if (result.is_err()) {
286 return make_error_future(result.error().message);
289 return std::move(result.value());
298 std::future<void> submit_delayed(
299 std::function<
void()> task,
300 std::chrono::milliseconds delay)
override {
301 if (!executor_ || !executor_->is_running()) {
302 return make_error_future(
"Executor not running");
305 auto result = executor_->execute_delayed(
306 std::make_unique<function_job>(std::move(task)), delay);
308 if (result.is_err()) {
309 return make_error_future(result.error().message);
312 return std::move(result.value());
319 size_t worker_count()
const override {
320 return executor_ ? executor_->worker_count() : 0;
327 bool is_running()
const override {
328 return executor_ ? executor_->is_running() :
false;
335 size_t pending_tasks()
const override {
336 return executor_ ? executor_->pending_tasks() : 0;
343 void shutdown(
bool wait_for_completion =
true) {
345 executor_->shutdown(wait_for_completion);
350 static std::future<void> make_error_future(
const std::string& message) {
351 std::promise<void> promise;
352 promise.set_exception(std::make_exception_ptr(std::runtime_error(message)));
353 return promise.get_future();
356 std::shared_ptr<::kcenon::common::interfaces::IExecutor> executor_;
Feature flags for network_system.
VoidResult shutdown()
Shutdown the network system.
common_to_network_thread_adapter_unavailable()=delete
function_job_unavailable()=delete
network_to_common_thread_adapter_unavailable()=delete
Thread system integration interface for network_system.