18#include <unordered_map>
38 std::lock_guard<std::mutex> lock(
mutex_);
45 auto pool_size = std::max(32u, std::thread::hardware_concurrency() * 4);
46 thread_pool_ = std::make_shared<basic_thread_pool>(pool_size);
53 std::shared_ptr<asio::io_context> io_context,
54 const std::string& component_name
58 auto promise = std::make_shared<std::promise<void>>();
59 promise->set_exception(
60 std::make_exception_ptr(
61 std::runtime_error(
"Thread pool not available")
64 return promise->get_future();
68 auto ctx_weak = std::weak_ptr<asio::io_context>(io_context);
69 auto name = component_name.empty() ?
"unnamed" : component_name;
72 auto caller_promise = std::make_shared<std::promise<void>>();
73 auto caller_future = caller_promise->get_future();
77 std::lock_guard<std::mutex> lock(
mutex_);
81 entries_.push_back(std::move(entry));
95 pool->submit([ctx_weak, caller_promise, completed_ptr]() {
96 auto ctx = ctx_weak.lock();
99 caller_promise->set_value();
105 caller_promise->set_value();
107 caller_promise->set_exception(std::current_exception());
110 completed_ptr->fetch_add(1, std::memory_order_relaxed);
116 return caller_future;
130 std::lock_guard<std::mutex> lock(
mutex_);
132 auto ctx = entry.context.lock();
143 auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
145 while (std::chrono::steady_clock::now() < deadline) {
147 std::lock_guard<std::mutex> lock(
mutex_);
148 bool all_stopped =
true;
149 for (
const auto& entry :
entries_) {
150 auto ctx = entry.context.lock();
151 if (ctx && !ctx->stopped()) {
161 std::this_thread::sleep_for(std::chrono::milliseconds(10));
165 std::lock_guard<std::mutex> lock(
mutex_);
170 std::lock_guard<std::mutex> lock(
mutex_);
172 for (
const auto& entry :
entries_) {
173 auto ctx = entry.context.lock();
174 if (ctx && !ctx->stopped()) {
181 bool is_active(std::shared_ptr<asio::io_context> io_context)
const {
186 std::lock_guard<std::mutex> lock(
mutex_);
187 for (
const auto& entry :
entries_) {
188 auto ctx = entry.context.lock();
189 if (ctx && ctx.get() == io_context.get() && !ctx->stopped()) {
197 std::lock_guard<std::mutex> lock(
mutex_);
211 std::lock_guard<std::mutex> lock(
mutex_);
215 return entry.context.expired();
238 : pimpl_(new
impl(), [](
impl*) { }) {
244 std::shared_ptr<asio::io_context> io_context,
245 const std::string& component_name
247 return pimpl_->run_io_context(io_context, component_name);
251 std::shared_ptr<asio::io_context> io_context
253 pimpl_->stop_io_context(io_context);
265 return pimpl_->active_count();
269 std::shared_ptr<asio::io_context> io_context
271 return pimpl_->is_active(io_context);
275 std::shared_ptr<thread_pool_interface> pool
277 pimpl_->set_thread_pool(pool);
281 return pimpl_->get_metrics();
std::shared_ptr< kcenon::network::integration::thread_pool_interface > get_thread_pool()
Get current thread pool.
static network_context & instance()
Get the singleton instance.
bool is_active(std::shared_ptr< asio::io_context > io_context) const
io_context_thread_manager::metrics get_metrics() const
std::atomic< size_t > total_completed_
std::vector< context_entry > entries_
size_t active_count() const
std::shared_ptr< thread_pool_interface > thread_pool_
std::future< void > run_io_context(std::shared_ptr< asio::io_context > io_context, const std::string &component_name)
std::shared_ptr< thread_pool_interface > get_thread_pool()
std::atomic< size_t > total_started_
void set_thread_pool(std::shared_ptr< thread_pool_interface > pool)
void stop_io_context(std::shared_ptr< asio::io_context > io_context)
Manages io_context execution on shared thread pools.
void stop_all()
Stop all managed io_contexts.
std::shared_ptr< impl > pimpl_
PIMPL pointer with intentional leak pattern.
metrics get_metrics() const
Get current metrics.
bool is_active(std::shared_ptr< asio::io_context > io_context) const
Check if an io_context is managed and running.
size_t active_count() const
Get the number of active io_contexts.
void stop_io_context(std::shared_ptr< asio::io_context > io_context)
Stop an io_context managed by this manager.
static io_context_thread_manager & instance()
Get the singleton instance.
void set_thread_pool(std::shared_ptr< thread_pool_interface > pool)
Set a custom thread pool.
void wait_all()
Wait for all managed io_contexts to complete.
io_context_thread_manager()
~io_context_thread_manager()
std::future< void > run_io_context(std::shared_ptr< asio::io_context > io_context, const std::string &component_name="")
Run an io_context on the shared thread pool.
Unified io_context thread management for network components.
Logger system integration interface for network_system.
Global context for shared network system resources.
std::string component_name
std::weak_ptr< asio::io_context > context
size_t total_started
Total io_contexts started.
size_t total_completed
Total io_contexts completed.
size_t active_contexts
Number of running io_contexts.