52 std::to_string(pool_size_) +
" connections to " + host_ +
":" +
53 std::to_string(port_));
58 NETWORK_LOG_INFO(
"[connection_pool] Pool size is 0, no connections to create");
65 std::unique_ptr<messaging_client> client;
67 std::string error_message;
70 std::vector<std::future<connect_result>> futures;
71 futures.reserve(pool_size_);
73 for (
size_t i = 0; i < pool_size_; ++i)
75 futures.push_back(std::async(std::launch::async,
76 [
this, i]() -> connect_result {
78 cr.client = std::make_unique<messaging_client>(
79 "pool_client_" + std::to_string(i));
81 auto result = cr.client->start_client(host_, port_);
85 cr.error_message = result.error().message;
97 size_t success_count = 0;
98 size_t fail_count = 0;
100 std::lock_guard<std::mutex> lock(mutex_);
101 for (
size_t i = 0; i < futures.size(); ++i)
103 auto cr = futures[i].get();
104 if (cr.success && cr.client)
106 available_.push(std::move(cr.client));
113 "[connection_pool] Connection " + std::to_string(i) +
114 " failed: " + cr.error_message);
119 if (success_count == 0)
123 "All " + std::to_string(pool_size_) +
124 " pool connections failed",
125 "connection_pool::initialize",
126 "Host: " + host_ +
", Port: " + std::to_string(port_)
133 std::to_string(success_count) +
"/" +
134 std::to_string(pool_size_) +
135 " connections (" + std::to_string(fail_count) +
" failed)");
140 std::to_string(success_count) +
" connections");
148 std::unique_lock<std::mutex> lock(mutex_);
151 if (acquire_timeout_.count() > 0)
153 if (!cv_.wait_for(lock, acquire_timeout_, [
this] {
154 return !available_.empty() || is_shutdown_.load();
158 "[connection_pool] Acquire timed out after " +
159 std::to_string(acquire_timeout_.count()) +
161 std::to_string(active_count_.load()) +
"/" +
162 std::to_string(pool_size_));
168 cv_.wait(lock, [
this] {
169 return !available_.empty() || is_shutdown_.load();
174 if (is_shutdown_.load())
180 auto client = std::move(available_.front());
182 active_count_.fetch_add(1, std::memory_order_relaxed);
185 std::to_string(active_count_.load()) +
"/" +
186 std::to_string(pool_size_));
200 if (!client->is_connected())
204 auto result = client->start_client(host_, port_);
208 result.error().message);
210 active_count_.fetch_sub(1, std::memory_order_relaxed);
217 std::lock_guard<std::mutex> lock(mutex_);
218 available_.push(std::move(client));
221 active_count_.fetch_sub(1, std::memory_order_relaxed);
227 std::to_string(active_count_.load()) +
"/" +
228 std::to_string(pool_size_));