Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
connection_pool.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7
8#include <future>
9
11{
12
13 connection_pool::connection_pool(std::string host, unsigned short port,
14 size_t pool_size,
15 std::chrono::seconds acquire_timeout)
16 : host_(std::move(host)), port_(port), pool_size_(pool_size),
17 acquire_timeout_(acquire_timeout)
18 {
19 }
20
22 {
23 try
24 {
25 // Set shutdown flag
26 is_shutdown_.store(true);
27
28 // Notify all waiting threads
29 cv_.notify_all();
30
31 // Stop all connections
32 std::lock_guard<std::mutex> lock(mutex_);
33 while (!available_.empty())
34 {
35 auto client = std::move(available_.front());
36 available_.pop();
37 if (client)
38 {
39 (void)client->stop_client();
40 }
41 }
42 }
43 catch (...)
44 {
45 // Destructor must not throw
46 }
47 }
48
50 {
51 NETWORK_LOG_INFO("[connection_pool] Initializing pool with " +
52 std::to_string(pool_size_) + " connections to " + host_ + ":" +
53 std::to_string(port_));
54
55 // Zero-size pool requires no connections — succeed immediately
56 if (pool_size_ == 0)
57 {
58 NETWORK_LOG_INFO("[connection_pool] Pool size is 0, no connections to create");
59 return ok();
60 }
61
62 // Launch all connections in parallel using std::async
63 struct connect_result
64 {
65 std::unique_ptr<messaging_client> client;
66 bool success = false;
67 std::string error_message;
68 };
69
70 std::vector<std::future<connect_result>> futures;
71 futures.reserve(pool_size_);
72
73 for (size_t i = 0; i < pool_size_; ++i)
74 {
75 futures.push_back(std::async(std::launch::async,
76 [this, i]() -> connect_result {
77 connect_result cr;
78 cr.client = std::make_unique<messaging_client>(
79 "pool_client_" + std::to_string(i));
80
81 auto result = cr.client->start_client(host_, port_);
82 if (result.is_err())
83 {
84 cr.success = false;
85 cr.error_message = result.error().message;
86 cr.client.reset();
87 }
88 else
89 {
90 cr.success = true;
91 }
92 return cr;
93 }));
94 }
95
96 // Collect results
97 size_t success_count = 0;
98 size_t fail_count = 0;
99 {
100 std::lock_guard<std::mutex> lock(mutex_);
101 for (size_t i = 0; i < futures.size(); ++i)
102 {
103 auto cr = futures[i].get();
104 if (cr.success && cr.client)
105 {
106 available_.push(std::move(cr.client));
107 ++success_count;
108 }
109 else
110 {
111 ++fail_count;
113 "[connection_pool] Connection " + std::to_string(i) +
114 " failed: " + cr.error_message);
115 }
116 }
117 }
118
119 if (success_count == 0)
120 {
121 return error_void(
122 -1,
123 "All " + std::to_string(pool_size_) +
124 " pool connections failed",
125 "connection_pool::initialize",
126 "Host: " + host_ + ", Port: " + std::to_string(port_)
127 );
128 }
129
130 if (fail_count > 0)
131 {
132 NETWORK_LOG_WARN("[connection_pool] Initialized with " +
133 std::to_string(success_count) + "/" +
134 std::to_string(pool_size_) +
135 " connections (" + std::to_string(fail_count) + " failed)");
136 }
137 else
138 {
139 NETWORK_LOG_INFO("[connection_pool] Successfully initialized " +
140 std::to_string(success_count) + " connections");
141 }
142
143 return ok();
144 }
145
146 auto connection_pool::acquire() -> std::unique_ptr<messaging_client>
147 {
148 std::unique_lock<std::mutex> lock(mutex_);
149
150 // Wait until a connection is available, shutdown, or timeout
151 if (acquire_timeout_.count() > 0)
152 {
153 if (!cv_.wait_for(lock, acquire_timeout_, [this] {
154 return !available_.empty() || is_shutdown_.load();
155 }))
156 {
158 "[connection_pool] Acquire timed out after " +
159 std::to_string(acquire_timeout_.count()) +
160 "s. Active: " +
161 std::to_string(active_count_.load()) + "/" +
162 std::to_string(pool_size_));
163 return nullptr;
164 }
165 }
166 else
167 {
168 cv_.wait(lock, [this] {
169 return !available_.empty() || is_shutdown_.load();
170 });
171 }
172
173 // Check if shutdown
174 if (is_shutdown_.load())
175 {
176 return nullptr;
177 }
178
179 // Get connection from queue
180 auto client = std::move(available_.front());
181 available_.pop();
182 active_count_.fetch_add(1, std::memory_order_relaxed);
183
184 NETWORK_LOG_DEBUG("[connection_pool] Acquired connection. Active: " +
185 std::to_string(active_count_.load()) + "/" +
186 std::to_string(pool_size_));
187
188 return client;
189 }
190
191 auto connection_pool::release(std::unique_ptr<messaging_client> client)
192 -> void
193 {
194 if (!client)
195 {
196 return;
197 }
198
199 // Check if client is still connected, reconnect if necessary
200 if (!client->is_connected())
201 {
202 NETWORK_LOG_WARN("[connection_pool] Connection lost, reconnecting...");
203
204 auto result = client->start_client(host_, port_);
205 if (result.is_err())
206 {
207 NETWORK_LOG_ERROR("[connection_pool] Failed to reconnect: " +
208 result.error().message);
209 // Drop this connection and decrement active count
210 active_count_.fetch_sub(1, std::memory_order_relaxed);
211 return;
212 }
213 }
214
215 // Return to pool
216 {
217 std::lock_guard<std::mutex> lock(mutex_);
218 available_.push(std::move(client));
219 }
220
221 active_count_.fetch_sub(1, std::memory_order_relaxed);
222
223 // Notify one waiting thread
224 cv_.notify_one();
225
226 NETWORK_LOG_DEBUG("[connection_pool] Released connection. Active: " +
227 std::to_string(active_count_.load()) + "/" +
228 std::to_string(pool_size_));
229 }
230
231} // namespace kcenon::network::core
~connection_pool() noexcept
Destructor. Stops all connections and cleans up resources.
connection_pool(std::string host, unsigned short port, size_t pool_size=10, std::chrono::seconds acquire_timeout=std::chrono::seconds(30))
Constructs a connection pool.
auto acquire() -> std::unique_ptr< messaging_client >
Acquires a connection from the pool.
std::queue< std::unique_ptr< messaging_client > > available_
auto initialize() -> VoidResult
Initializes the pool by creating and connecting all clients.
auto release(std::unique_ptr< messaging_client > client) -> void
Releases a connection back to the pool.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()