Network System 0.1.1
High-performance modular networking library for scalable client-server applications
io_context_thread_manager.cpp
// BSD 3-Clause License
// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

// Project headers are included here in the original source: the io_context
// thread manager's own header and the logger-integration interface for
// network_system (exact paths omitted in this listing).

#include <algorithm>
#include <atomic>
#include <chrono>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace kcenon::network::integration {

class io_context_thread_manager::impl {
public:
    struct context_entry {
        std::weak_ptr<asio::io_context> context;
        std::string component_name;
    };

    impl() = default;

    ~impl() {
        stop_all();
        wait_all();
    }
    std::shared_ptr<thread_pool_interface> get_thread_pool() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!thread_pool_) {
            // Prefer the shared pool from the global network context
            thread_pool_ = network_context::instance().get_thread_pool();
            if (!thread_pool_) {
                // Fall back to a basic thread pool sized for concurrent
                // io_contexts. Tests such as ConnectionScaling need 20+
                // clients plus a server, so use a generous size to avoid
                // thread exhaustion.
                auto pool_size = std::max(32u, std::thread::hardware_concurrency() * 4);
                thread_pool_ = std::make_shared<basic_thread_pool>(pool_size);
            }
        }
        return thread_pool_;
    }
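
    // Usage note: a custom pool implementing thread_pool_interface can be
    // injected before first use via set_thread_pool() (defined below), in
    // which case the fallback above never runs. A minimal sketch, illustrative
    // rather than taken from this file:
    //
    //   io_context_thread_manager::instance().set_thread_pool(
    //       std::make_shared<basic_thread_pool>(8));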

    std::future<void> run_io_context(
        std::shared_ptr<asio::io_context> io_context,
        const std::string& component_name
    ) {
        auto pool = get_thread_pool();
        if (!pool) {
            auto promise = std::make_shared<std::promise<void>>();
            promise->set_exception(
                std::make_exception_ptr(
                    std::runtime_error("Thread pool not available")
                )
            );
            return promise->get_future();
        }

        // Capture the io_context weakly so the pooled task does not keep it
        // alive during shutdown.
        auto ctx_weak = std::weak_ptr<asio::io_context>(io_context);
        std::string name = component_name.empty() ? "unnamed" : component_name;

        // Create a shared promise backing the caller's future
        auto caller_promise = std::make_shared<std::promise<void>>();
        auto caller_future = caller_promise->get_future();

        // Track the entry for metrics and stop_all() functionality
        {
            std::lock_guard<std::mutex> lock(mutex_);
            context_entry entry;
            entry.context = io_context;
            entry.component_name = name;
            entries_.push_back(std::move(entry));
            total_started_.fetch_add(1, std::memory_order_relaxed);
        }

        // Submit the io_context::run task - this is the ONLY task we submit;
        // there is no separate monitoring task, to avoid thread pool exhaustion.
        // Note: capture total_completed_ by pointer to avoid capturing 'this',
        // which can cause heap corruption during static destruction.
        auto* completed_ptr = &total_completed_;
        // IMPORTANT: avoid logging inside this task to prevent heap corruption
        // during static destruction. If common_system's GlobalLoggerRegistry is
        // destroyed before this thread pool task completes, logging corrupts
        // the heap. Use detail::static_destruction_guard::is_logging_safe()
        // for any necessary logging.
        pool->submit([ctx_weak, caller_promise, completed_ptr]() {
            auto ctx = ctx_weak.lock();
            if (!ctx) {
                // io_context expired - this can happen during rapid shutdown
                caller_promise->set_value();
                return;
            }

            try {
                ctx->run();
                caller_promise->set_value();
            } catch (...) {
                caller_promise->set_exception(std::current_exception());
            }

            completed_ptr->fetch_add(1, std::memory_order_relaxed);
        });

        // Clean up expired entries on each submission
        cleanup_expired_entries();

        return caller_future;
    }
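
    // Semantics of the returned future (summary of the task submitted above):
    // it is set immediately when the io_context has already expired, set
    // normally when io_context::run() returns, and carries the exception if
    // run() throws. Waiting on it therefore observes run() completion, not
    // task submission.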

    void stop_io_context(std::shared_ptr<asio::io_context> io_context) {
        // NOTE: No logging here to prevent heap corruption during static
        // destruction. This may be called when GlobalLoggerRegistry is destroyed.
        if (io_context) {
            io_context->stop();
        }
    }

    void stop_all() {
        // NOTE: No logging here to prevent heap corruption during static
        // destruction. This may be called when GlobalLoggerRegistry is destroyed.
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto& entry : entries_) {
            auto ctx = entry.context.lock();
            if (ctx) {
                ctx->stop();
            }
        }
    }

    void wait_all() {
        // Since futures are returned directly to callers, wait_all just
        // waits for all tracked io_contexts to be stopped.
        // The callers should wait on their own futures.
        auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);

        while (std::chrono::steady_clock::now() < deadline) {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                bool all_stopped = true;
                for (const auto& entry : entries_) {
                    auto ctx = entry.context.lock();
                    if (ctx && !ctx->stopped()) {
                        all_stopped = false;
                        break;
                    }
                }
                if (all_stopped) {
                    entries_.clear();
                    break;
                }
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }

        // Clear any remaining expired entries
        std::lock_guard<std::mutex> lock(mutex_);
        entries_.clear();
    }

    size_t active_count() const {
        std::lock_guard<std::mutex> lock(mutex_);
        size_t count = 0;
        for (const auto& entry : entries_) {
            auto ctx = entry.context.lock();
            if (ctx && !ctx->stopped()) {
                ++count;
            }
        }
        return count;
    }

    bool is_active(std::shared_ptr<asio::io_context> io_context) const {
        if (!io_context) {
            return false;
        }

        std::lock_guard<std::mutex> lock(mutex_);
        for (const auto& entry : entries_) {
            auto ctx = entry.context.lock();
            if (ctx && ctx.get() == io_context.get() && !ctx->stopped()) {
                return true;
            }
        }
        return false;
    }

    void set_thread_pool(std::shared_ptr<thread_pool_interface> pool) {
        std::lock_guard<std::mutex> lock(mutex_);
        thread_pool_ = pool;
    }
    metrics get_metrics() const {
        metrics m;
        m.total_started = total_started_.load(std::memory_order_relaxed);
        m.total_completed = total_completed_.load(std::memory_order_relaxed);
        return m;
    }

private:
    // Remove entries whose io_context has already been destroyed
    void cleanup_expired_entries() {
        std::lock_guard<std::mutex> lock(mutex_);
        entries_.erase(
            std::remove_if(entries_.begin(), entries_.end(),
                [](const context_entry& entry) {
                    return entry.context.expired();
                }),
            entries_.end()
        );
    }

    mutable std::mutex mutex_;
    std::shared_ptr<thread_pool_interface> thread_pool_;
    std::vector<context_entry> entries_;
    std::atomic<size_t> total_started_{0};
    std::atomic<size_t> total_completed_{0};
};

io_context_thread_manager& io_context_thread_manager::instance() {
    static io_context_thread_manager manager;
    return manager;
}

io_context_thread_manager::io_context_thread_manager()
    // Intentional leak pattern: use a no-op deleter to prevent destruction
    // during the static destruction phase. This avoids heap corruption when
    // thread pool tasks still reference impl's members (e.g., the completed
    // counter). Memory impact: a few KB, reclaimed by the OS on process
    // termination.
    : pimpl_(new impl(), [](impl*) { /* no-op deleter - intentional leak */ }) {
}

io_context_thread_manager::~io_context_thread_manager() = default;

std::future<void> io_context_thread_manager::run_io_context(
    std::shared_ptr<asio::io_context> io_context,
    const std::string& component_name
) {
    return pimpl_->run_io_context(io_context, component_name);
}

void io_context_thread_manager::stop_io_context(
    std::shared_ptr<asio::io_context> io_context
) {
    pimpl_->stop_io_context(io_context);
}

void io_context_thread_manager::stop_all() {
    pimpl_->stop_all();
}

void io_context_thread_manager::wait_all() {
    pimpl_->wait_all();
}

size_t io_context_thread_manager::active_count() const {
    return pimpl_->active_count();
}

bool io_context_thread_manager::is_active(
    std::shared_ptr<asio::io_context> io_context
) const {
    return pimpl_->is_active(io_context);
}

void io_context_thread_manager::set_thread_pool(
    std::shared_ptr<thread_pool_interface> pool
) {
    pimpl_->set_thread_pool(pool);
}

io_context_thread_manager::metrics io_context_thread_manager::get_metrics() const {
    return pimpl_->get_metrics();
}

} // namespace kcenon::network::integration
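
A minimal end-to-end usage sketch (illustrative only, not part of this file; it assumes standalone Asio and the public io_context_thread_manager header):

// hypothetical_caller.cpp
#include <asio.hpp>
#include <memory>

int main() {
    using kcenon::network::integration::io_context_thread_manager;

    auto& mgr = io_context_thread_manager::instance();

    auto ctx = std::make_shared<asio::io_context>();
    auto guard = asio::make_work_guard(*ctx);  // keep run() from returning early

    // run() executes on the shared thread pool; the future completes
    // when run() returns.
    auto done = mgr.run_io_context(ctx, "example_component");

    asio::post(*ctx, [] { /* handler runs on a pool thread */ });

    guard.reset();             // allow run() to drain and return
    mgr.stop_io_context(ctx);  // or mgr.stop_all();
    done.wait();

    auto m = mgr.get_metrics();
    return m.total_completed == 1 ? 0 : 1;
}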