Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
thread_integration.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
17
19#include <mutex>
20#include <atomic>
21#include <algorithm>
22#include <stdexcept>
23#include <thread>
24#include <queue>
25#include <condition_variable>
26
27#if KCENON_WITH_THREAD_SYSTEM
28#include <kcenon/thread/core/thread_pool.h>
29#include <kcenon/thread/core/thread_worker.h>
30#endif
31
33
34#if KCENON_WITH_THREAD_SYSTEM
35
36// basic_thread_pool implementation using thread_system::thread_pool
37class basic_thread_pool::impl {
38public:
39 impl(size_t num_threads) : completed_tasks_(0) {
40 if (num_threads == 0) {
41 num_threads = std::thread::hardware_concurrency();
42 if (num_threads == 0) num_threads = 2; // Fallback
43 }
44
45 // Intentional Leak pattern: Use no-op deleter to prevent destruction
46 // during static destruction phase. This avoids heap corruption when
47 // thread_pool's destructor accesses statically destroyed objects.
48 // Memory impact: ~few KB (reclaimed by OS on process termination)
49 auto* pool = new kcenon::thread::thread_pool("network_basic_pool");
50 pool_ = std::shared_ptr<kcenon::thread::thread_pool>(
51 pool,
52 [](kcenon::thread::thread_pool*) { /* no-op deleter - intentional leak */ }
53 );
54
55 // Add workers to the pool
56 for (size_t i = 0; i < num_threads; ++i) {
57 pool_->enqueue(std::make_unique<kcenon::thread::thread_worker>());
58 }
59
60 pool_->start();
61 }
62
63 ~impl() {
64 stop(true);
65 }
66
67 std::future<void> submit(std::function<void()> task) {
68 auto promise = std::make_shared<std::promise<void>>();
69 auto future = promise->get_future();
70
71 if (!pool_ || !pool_->is_running()) {
72 promise->set_exception(
73 std::make_exception_ptr(
74 std::runtime_error("Thread pool is not running")
75 )
76 );
77 return future;
78 }
79
80 // Note: Capture completed_tasks_ by pointer to avoid capturing 'this',
81 // which can cause heap corruption during static destruction.
82 auto* completed_ptr = &completed_tasks_;
83
84 // Use submit which returns a future and throws on failure
85 try {
86 pool_->submit([task = std::move(task), promise, completed_ptr]() mutable {
87 try {
88 if (task) task();
89 promise->set_value();
90 completed_ptr->fetch_add(1, std::memory_order_relaxed);
91 } catch (...) {
92 promise->set_exception(std::current_exception());
93 }
94 });
95 } catch (const std::exception& e) {
96 promise->set_exception(
97 std::make_exception_ptr(
98 std::runtime_error(
99 std::string("Failed to submit task to thread pool: ") + e.what()
100 )
101 )
102 );
103 }
104
105 return future;
106 }
107
108 std::future<void> submit_delayed(
109 std::function<void()> task,
110 std::chrono::milliseconds delay
111 ) {
112 // Use thread_system's submit_delayed if available (via IExecutor interface)
113#if defined(THREAD_HAS_COMMON_EXECUTOR)
114 return pool_->submit_delayed(std::move(task), delay);
115#else
116 // Fallback: submit a task that sleeps then executes
117 // Note: This is not ideal but maintains API compatibility
118 auto promise = std::make_shared<std::promise<void>>();
119 auto future = promise->get_future();
120
121 if (!pool_ || !pool_->is_running()) {
122 promise->set_exception(
123 std::make_exception_ptr(
124 std::runtime_error("Thread pool is not running")
125 )
126 );
127 return future;
128 }
129
130 // Note: Capture completed_tasks_ by pointer to avoid capturing 'this',
131 // which can cause heap corruption during static destruction.
132 auto* completed_ptr = &completed_tasks_;
133
134 // Use submit which returns a future and throws on failure
135 try {
136 pool_->submit([task = std::move(task), delay, promise, completed_ptr]() mutable {
137 try {
138 std::this_thread::sleep_for(delay);
139 if (task) task();
140 promise->set_value();
141 completed_ptr->fetch_add(1, std::memory_order_relaxed);
142 } catch (...) {
143 promise->set_exception(std::current_exception());
144 }
145 });
146 } catch (const std::exception& e) {
147 promise->set_exception(
148 std::make_exception_ptr(
149 std::runtime_error(
150 std::string("Failed to submit delayed task to thread pool: ") + e.what()
151 )
152 )
153 );
154 }
155
156 return future;
157#endif
158 }
159
160 size_t worker_count() const {
161 return pool_ ? pool_->get_active_worker_count() : 0;
162 }
163
164 bool is_running() const {
165 return pool_ && pool_->is_running();
166 }
167
168 size_t pending_tasks() const {
169 return pool_ ? pool_->get_pending_task_count() : 0;
170 }
171
172 void stop(bool wait_for_tasks) {
173 if (pool_) {
174 pool_->stop(!wait_for_tasks);
175 }
176 }
177
178 size_t get_completed_tasks() const {
179 return completed_tasks_.load();
180 }
181
182private:
183 std::shared_ptr<kcenon::thread::thread_pool> pool_;
184 std::atomic<size_t> completed_tasks_;
185};
186
187#else // !KCENON_WITH_THREAD_SYSTEM
188
189// Fallback implementation when thread_system is not available
190// This provides a minimal thread pool using std::thread
192 struct DelayedTask {
193 std::chrono::steady_clock::time_point execute_at;
194 std::function<void()> task;
195
196 bool operator>(const DelayedTask& other) const {
197 return execute_at > other.execute_at;
198 }
199 };
200
201public:
202 impl(size_t num_threads)
203 : running_(true), completed_tasks_(0) {
204
205 if (num_threads == 0) {
206 num_threads = std::thread::hardware_concurrency();
207 if (num_threads == 0) num_threads = 2; // Fallback
208 }
209
210 workers_.reserve(num_threads);
211 for (size_t i = 0; i < num_threads; ++i) {
212 workers_.emplace_back([this] { worker_loop(); });
213 }
214
215 scheduler_thread_ = std::thread([this] { scheduler_loop(); });
216 }
217
219 stop(true);
220 }
221
222 std::future<void> submit(std::function<void()> task) {
223 auto promise = std::make_shared<std::promise<void>>();
224 auto future = promise->get_future();
225
226 {
227 std::unique_lock<std::mutex> lock(queue_mutex_);
228 if (!running_) {
229 promise->set_exception(
230 std::make_exception_ptr(
231 std::runtime_error("Thread pool is not running")
232 )
233 );
234 return future;
235 }
236
237 tasks_.emplace([task, promise]() {
238 try {
239 task();
240 promise->set_value();
241 } catch (...) {
242 promise->set_exception(std::current_exception());
243 }
244 });
245 }
246
247 condition_.notify_one();
248 return future;
249 }
250
251 std::future<void> submit_delayed(
252 std::function<void()> task,
253 std::chrono::milliseconds delay
254 ) {
255 auto promise = std::make_shared<std::promise<void>>();
256 auto future = promise->get_future();
257
258 {
259 std::unique_lock<std::mutex> lock(scheduler_mutex_);
260 if (!running_) {
261 promise->set_exception(
262 std::make_exception_ptr(
263 std::runtime_error("Thread pool is not running")
264 )
265 );
266 return future;
267 }
268
269 auto execute_time = std::chrono::steady_clock::now() + delay;
270 delayed_tasks_.push({execute_time, [this, task, promise]() {
271 // Submit to main pool when due
272 submit([task, promise]() {
273 try {
274 task();
275 promise->set_value();
276 } catch (...) {
277 promise->set_exception(std::current_exception());
278 }
279 });
280 }});
281 }
282
283 scheduler_condition_.notify_one();
284 return future;
285 }
286
    // Number of worker threads (the scheduler thread is not counted).
    size_t worker_count() const {
        return workers_.size();
    }
290
    // True until stop() has been called.
    bool is_running() const {
        return running_.load();
    }
294
    // Tasks queued for the workers but not yet started. Delayed tasks that
    // are not yet due live in the scheduler queue and are not counted here.
    size_t pending_tasks() const {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        return tasks_.size();
    }
299
300 void stop(bool wait_for_tasks) {
301 {
302 std::unique_lock<std::mutex> lock(queue_mutex_);
303 if (!wait_for_tasks) {
304 while (!tasks_.empty()) {
305 tasks_.pop();
306 }
307 }
308 running_ = false;
309 }
310
311 {
312 std::unique_lock<std::mutex> lock(scheduler_mutex_);
313 }
314
315 condition_.notify_all();
316 scheduler_condition_.notify_all();
317
318 for (auto& worker : workers_) {
319 if (worker.joinable()) {
320 worker.join();
321 }
322 }
323 workers_.clear();
324
325 if (scheduler_thread_.joinable()) {
326 scheduler_thread_.join();
327 }
328 }
329
    // Snapshot of the completed-task counter.
    size_t get_completed_tasks() const {
        return completed_tasks_.load();
    }
333
334private:
335 void worker_loop() {
336 while (true) {
337 std::function<void()> task;
338
339 {
340 std::unique_lock<std::mutex> lock(queue_mutex_);
341 condition_.wait(lock, [this] {
342 return !running_ || !tasks_.empty();
343 });
344
345 if (!running_ && tasks_.empty()) {
346 return;
347 }
348
349 if (!tasks_.empty()) {
350 task = std::move(tasks_.front());
351 tasks_.pop();
352 }
353 }
354
355 if (task) {
356 task();
358 }
359 }
360 }
361
363 while (running_) {
364 std::unique_lock<std::mutex> lock(scheduler_mutex_);
365
366 if (delayed_tasks_.empty()) {
367 scheduler_condition_.wait(lock, [this] {
368 return !running_ || !delayed_tasks_.empty();
369 });
370 } else {
371 auto now = std::chrono::steady_clock::now();
372 auto& next_task = delayed_tasks_.top();
373
374 if (now >= next_task.execute_at) {
375 auto task = next_task.task;
376 delayed_tasks_.pop();
377 lock.unlock();
378 task(); // Execute (which submits to main pool)
379 continue;
380 } else {
381 scheduler_condition_.wait_until(lock, next_task.execute_at, [this, &next_task] {
382 return !running_ || delayed_tasks_.empty() || delayed_tasks_.top().execute_at < next_task.execute_at;
383 });
384 }
385 }
386 }
387 }
388
389 std::vector<std::thread> workers_;
390 std::thread scheduler_thread_;
391
392 std::queue<std::function<void()>> tasks_;
393 mutable std::mutex queue_mutex_;
394 std::condition_variable condition_;
395
396 std::priority_queue<DelayedTask, std::vector<DelayedTask>, std::greater<DelayedTask>> delayed_tasks_;
398 std::condition_variable scheduler_condition_;
399
400 std::atomic<bool> running_;
401 std::atomic<size_t> completed_tasks_;
402};
403
404#endif // KCENON_WITH_THREAD_SYSTEM
405
407 // Intentional Leak pattern: Use no-op deleter to prevent destruction
408 // during static destruction phase. This avoids heap corruption when
409 // worker threads may still access the impl's members (queue, mutex, etc.)
410 // Memory impact: ~few KB (reclaimed by OS on process termination)
411 : pimpl_(new impl(num_threads), [](impl*) { /* no-op deleter - intentional leak */ }) {
412}
413
415
416std::future<void> basic_thread_pool::submit(std::function<void()> task) {
417 return pimpl_->submit(task);
418}
419
421 std::function<void()> task,
422 std::chrono::milliseconds delay
423) {
424 return pimpl_->submit_delayed(task, delay);
425}
426
428 return pimpl_->worker_count();
429}
430
432 return pimpl_->is_running();
433}
434
436 return pimpl_->pending_tasks();
437}
438
// Stop the underlying pool; when wait_for_tasks is true, already-queued
// work is allowed to finish before the workers shut down.
void basic_thread_pool::stop(bool wait_for_tasks) {
    pimpl_->stop(wait_for_tasks);
}
442
444 return pimpl_->get_completed_tasks();
445}
446
447// thread_integration_manager implementation
449public:
    impl() = default; // members default-initialize; nothing else to set up
451
    // Replace the pool used for all subsequent submissions (thread-safe).
    void set_thread_pool(std::shared_ptr<thread_pool_interface> pool) {
        std::unique_lock<std::mutex> lock(mutex_);
        thread_pool_ = pool;
    }
456
457 std::shared_ptr<thread_pool_interface> get_thread_pool() {
458 std::unique_lock<std::mutex> lock(mutex_);
459 if (!thread_pool_) {
460 // When thread_system is available, basic_thread_pool now internally
461 // uses thread_system::thread_pool, so creating basic_thread_pool
462 // automatically gets the benefits of thread_system.
463 thread_pool_ = std::make_shared<basic_thread_pool>();
464 }
465 return thread_pool_;
466 }
467
    // Convenience: submit through the current pool (created on demand by
    // get_thread_pool()).
    std::future<void> submit_task(std::function<void()> task) {
        return get_thread_pool()->submit(task);
    }
471
    // Convenience: delayed submit through the current pool (created on
    // demand by get_thread_pool()).
    std::future<void> submit_delayed_task(
        std::function<void()> task,
        std::chrono::milliseconds delay
    ) {
        return get_thread_pool()->submit_delayed(task, delay);
    }
478
480 std::unique_lock<std::mutex> lock(mutex_);
481
482 metrics m;
483 if (thread_pool_) {
484 m.worker_threads = thread_pool_->worker_count();
485 m.pending_tasks = thread_pool_->pending_tasks();
486 m.is_running = thread_pool_->is_running();
487
488 if (auto* basic = dynamic_cast<basic_thread_pool*>(thread_pool_.get())) {
489 m.completed_tasks = basic->completed_tasks();
490 }
491 }
492
493 return m;
494 }
495
496private:
497 mutable std::mutex mutex_;
498 std::shared_ptr<thread_pool_interface> thread_pool_;
499};
500
505
507 // Intentional Leak pattern: Use no-op deleter to prevent destruction
508 // during static destruction phase. This avoids heap corruption when
509 // thread pool tasks may still reference the impl's members.
510 // Memory impact: ~few KB (reclaimed by OS on process termination)
511 : pimpl_(new impl(), [](impl*) { /* no-op deleter - intentional leak */ }) {
512}
513
515
517 std::shared_ptr<thread_pool_interface> pool
518) {
519 pimpl_->set_thread_pool(pool);
520}
521
// Expose the (possibly lazily created) pool held by the PIMPL.
std::shared_ptr<thread_pool_interface> thread_integration_manager::get_thread_pool() {
    return pimpl_->get_thread_pool();
}
525
527 std::function<void()> task
528) {
529 return pimpl_->submit_task(task);
530}
531
533 std::function<void()> task,
534 std::chrono::milliseconds delay
535) {
536 return pimpl_->submit_delayed_task(task, delay);
537}
538
542
543} // namespace kcenon::network::integration
std::future< void > submit_delayed(std::function< void()> task, std::chrono::milliseconds delay)
std::priority_queue< DelayedTask, std::vector< DelayedTask >, std::greater< DelayedTask > > delayed_tasks_
std::future< void > submit(std::function< void()> task)
Basic thread pool implementation for standalone use.
size_t worker_count() const override
Get the number of worker threads.
void stop(bool wait_for_tasks=true)
Stop the thread pool.
bool is_running() const override
Check if the thread pool is running.
size_t completed_tasks() const
Get completed tasks count.
basic_thread_pool(size_t num_threads=0)
Construct with specified number of threads.
std::shared_ptr< impl > pimpl_
PIMPL pointer with intentional leak pattern.
std::future< void > submit_delayed(std::function< void()> task, std::chrono::milliseconds delay) override
Submit a task with delay.
size_t pending_tasks() const override
Get pending task count.
std::future< void > submit(std::function< void()> task) override
Submit a task to the thread pool.
std::future< void > submit_task(std::function< void()> task)
void set_thread_pool(std::shared_ptr< thread_pool_interface > pool)
std::future< void > submit_delayed_task(std::function< void()> task, std::chrono::milliseconds delay)
void set_thread_pool(std::shared_ptr< thread_pool_interface > pool)
Set the thread pool implementation.
std::future< void > submit_delayed_task(std::function< void()> task, std::chrono::milliseconds delay)
Submit a task with delay.
static thread_integration_manager & instance()
Get the singleton instance.
std::shared_ptr< impl > pimpl_
PIMPL pointer with intentional leak pattern.
std::future< void > submit_task(std::function< void()> task)
Submit a task to the thread pool.
std::shared_ptr< thread_pool_interface > get_thread_pool()
Get the current thread pool.
Feature flags for network_system.
Thread system integration interface for network_system.