Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
thread_system_adapter.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
19
21
22#if KCENON_WITH_THREAD_SYSTEM
23
24// Suppress deprecation warnings from thread_system headers
25#pragma clang diagnostic push
26#pragma clang diagnostic ignored "-Wdeprecated-declarations"
27
28#include <stdexcept>
29#include <thread> // For std::thread::hardware_concurrency and std::this_thread::sleep_for (fallback)
30
31#include <kcenon/thread/core/thread_worker.h>
32
34
35thread_system_pool_adapter::thread_system_pool_adapter(
36 std::shared_ptr<kcenon::thread::thread_pool> pool)
37 : pool_(std::move(pool)) {
38 if (!pool_) {
39 throw std::invalid_argument("thread_system_pool_adapter: pool is null");
40 }
41 // No scheduler thread needed - delayed tasks are handled by thread_pool::submit_delayed
42}
43
44thread_system_pool_adapter::~thread_system_pool_adapter() {
45 // No scheduler thread to stop - cleanup handled by thread_pool
46}
47
48std::future<void> thread_system_pool_adapter::submit(std::function<void()> task) {
49 auto promise = std::make_shared<std::promise<void>>();
50 auto future = promise->get_future();
51
52 // Use submit which returns a future and throws on failure
53 try {
54 pool_->submit([task = std::move(task), promise]() mutable {
55 try {
56 if (task) task();
57 promise->set_value();
58 } catch (...) {
59 promise->set_exception(std::current_exception());
60 }
61 });
62 } catch (const std::exception& e) {
63 promise->set_exception(std::make_exception_ptr(
64 std::runtime_error(
65 std::string("thread_system_pool_adapter: submit failed: ") + e.what()
66 )));
67 }
68
69 return future;
70}
71
72std::future<void> thread_system_pool_adapter::submit_delayed(
73 std::function<void()> task,
74 std::chrono::milliseconds delay
75) {
76#if defined(THREAD_HAS_COMMON_EXECUTOR)
77 // Delegate directly to thread_pool::submit_delayed when IExecutor is available
78 // This eliminates the need for a separate scheduler thread
79 return pool_->submit_delayed(std::move(task), delay);
80#else
81 // Fallback: submit a task that sleeps then executes
82 // Note: This blocks a worker thread during the delay period
83 auto promise = std::make_shared<std::promise<void>>();
84 auto future = promise->get_future();
85
86 if (!pool_->is_running()) {
87 promise->set_exception(std::make_exception_ptr(
88 std::runtime_error("thread_system_pool_adapter: pool is not running")));
89 return future;
90 }
91
92 // Use submit which returns a future and throws on failure
93 try {
94 pool_->submit([task = std::move(task), delay, promise]() mutable {
95 try {
96 std::this_thread::sleep_for(delay);
97 if (task) task();
98 promise->set_value();
99 } catch (...) {
100 promise->set_exception(std::current_exception());
101 }
102 });
103 } catch (const std::exception& e) {
104 promise->set_exception(std::make_exception_ptr(
105 std::runtime_error(
106 std::string("thread_system_pool_adapter: delayed submit failed: ") + e.what()
107 )));
108 }
109
110 return future;
111#endif
112}
113
114size_t thread_system_pool_adapter::worker_count() const {
115 return pool_->get_active_worker_count();
116}
117
118bool thread_system_pool_adapter::is_running() const {
119 return pool_->is_running();
120}
121
122size_t thread_system_pool_adapter::pending_tasks() const {
123 return pool_->get_pending_task_count();
124}
125
126std::shared_ptr<thread_system_pool_adapter> thread_system_pool_adapter::create_default(
127 const std::string& pool_name
128) {
129 kcenon::thread::thread_context ctx; // default resolves logger/monitoring if registered
130 auto pool = std::make_shared<kcenon::thread::thread_pool>(pool_name, ctx);
131
132 // Add default workers based on hardware concurrency
133 size_t num_threads = std::thread::hardware_concurrency();
134 if (num_threads == 0) {
135 num_threads = 2; // Fallback
136 }
137
138 for (size_t i = 0; i < num_threads; ++i) {
139 pool->enqueue(std::make_unique<kcenon::thread::thread_worker>());
140 }
141
142 (void)pool->start(); // best-effort start; ignore error to keep adapter usable
143 return std::make_shared<thread_system_pool_adapter>(std::move(pool));
144}
145
146std::shared_ptr<thread_system_pool_adapter> thread_system_pool_adapter::from_service_or_default(
147 const std::string& pool_name
148) {
149 try {
150 auto& sc = kcenon::thread::service_container::global();
151 if (auto existing = sc.resolve<kcenon::thread::thread_pool>()) {
152 return std::make_shared<thread_system_pool_adapter>(std::move(existing));
153 }
154 } catch (...) {
155 // ignore and fallback
156 }
157 return create_default(pool_name);
158}
159
160bool bind_thread_system_pool_into_manager(const std::string& pool_name) {
161 try {
162 auto adapter = thread_system_pool_adapter::from_service_or_default(pool_name);
163 if (!adapter) return false;
164 thread_integration_manager::instance().set_thread_pool(adapter);
165 return true;
166 } catch (...) {
167 return false;
168 }
169}
170
171} // namespace kcenon::network::integration
172
173#pragma clang diagnostic pop
174
175#endif // KCENON_WITH_THREAD_SYSTEM
176
Feature flags for network_system.
Adapter that bridges thread_system::thread_pool to thread_pool_interface.