Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
resilient_client.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#include "resilient_client.h"
6
7#include <thread>
8
10
12{
13
14 resilient_client::resilient_client(const std::string& client_id,
15 const std::string& host,
16 unsigned short port,
17 size_t max_retries,
18 std::chrono::milliseconds initial_backoff
19#ifdef WITH_COMMON_SYSTEM
20 , common::resilience::circuit_breaker_config cb_config
21#endif
22 )
23 : host_(host)
24 , port_(port)
25 , max_retries_(max_retries)
26 , initial_backoff_(initial_backoff)
27#ifdef WITH_COMMON_SYSTEM
28 , circuit_breaker_(std::make_unique<common::resilience::circuit_breaker>(cb_config))
29#endif
30 {
31 client_ = std::make_shared<core::messaging_client>(client_id);
32#ifdef WITH_COMMON_SYSTEM
33 NETWORK_LOG_INFO("[resilient_client] Created with max_retries=" +
34 std::to_string(max_retries) + ", initial_backoff=" +
35 std::to_string(initial_backoff.count()) + "ms" +
36 ", circuit_breaker failure_threshold=" +
37 std::to_string(cb_config.failure_threshold));
38#else
39 NETWORK_LOG_INFO("[resilient_client] Created with max_retries=" +
40 std::to_string(max_retries) + ", initial_backoff=" +
41 std::to_string(initial_backoff.count()) + "ms");
42#endif
43 }
44
46 {
47 try
48 {
49 if (is_connected_.load())
50 {
51 (void)disconnect();
52 }
53 }
54 catch (...)
55 {
56 // Destructor must not throw
57 }
58 }
59
61 {
62 if (is_connected_.load())
63 {
64 return ok();
65 }
66
67 for (size_t attempt = 1; attempt <= max_retries_; ++attempt)
68 {
69 NETWORK_LOG_INFO("[resilient_client] Connection attempt " +
70 std::to_string(attempt) + "/" + std::to_string(max_retries_));
71
72 // Invoke reconnect callback if set
73 if (reconnect_callback_)
74 {
75 reconnect_callback_(attempt);
76 }
77
78 // Attempt to connect
79 auto result = client_->start_client(host_, port_);
80 if (!result.is_err())
81 {
82 is_connected_.store(true);
83 NETWORK_LOG_INFO("[resilient_client] Connected successfully on attempt " +
84 std::to_string(attempt));
85 return ok();
86 }
87
88 NETWORK_LOG_WARN("[resilient_client] Connection attempt " +
89 std::to_string(attempt) + " failed: " + result.error().message);
90
91 // Don't sleep after last failed attempt
92 if (attempt < max_retries_)
93 {
94 auto backoff = calculate_backoff(attempt);
95 NETWORK_LOG_INFO("[resilient_client] Backing off for " +
96 std::to_string(backoff.count()) + "ms");
97 std::this_thread::sleep_for(backoff);
98 }
99 }
100
101 return error_void(
103 "Failed to connect after " + std::to_string(max_retries_) + " attempts",
104 "resilient_client::connect",
105 "Host: " + host_ + ", Port: " + std::to_string(port_)
106 );
107 }
108
110 {
111 if (!is_connected_.load())
112 {
113 return ok();
114 }
115
116 auto result = client_->stop_client();
117 if (!result.is_err())
118 {
119 is_connected_.store(false);
120
121 // Invoke disconnect callback if set
122 if (disconnect_callback_)
123 {
124 disconnect_callback_();
125 }
126
127 NETWORK_LOG_INFO("[resilient_client] Disconnected successfully");
128 }
129 else
130 {
131 NETWORK_LOG_ERROR("[resilient_client] Failed to disconnect: " +
132 result.error().message);
133 }
134
135 return result;
136 }
137
138 auto resilient_client::send_with_retry(std::vector<uint8_t>&& data) -> VoidResult
139 {
140#ifdef WITH_COMMON_SYSTEM
141 // Check circuit breaker first
142 if (!circuit_breaker_->allow_request())
143 {
144 NETWORK_LOG_WARN("[resilient_client] Circuit breaker is open, failing fast");
145 return error_void(
147 "Circuit breaker is open",
148 "resilient_client::send_with_retry",
149 "Circuit state: " + common::resilience::to_string(circuit_breaker_->get_state())
150 );
151 }
152#endif
153
154 // Keep a copy of the data for retries
155 auto data_copy = data;
156
157 for (size_t attempt = 1; attempt <= max_retries_; ++attempt)
158 {
159 // Check if connected
160 if (!is_connected_.load() || !client_->is_connected())
161 {
162 NETWORK_LOG_WARN("[resilient_client] Not connected, attempting reconnection");
163
164 // Attempt to reconnect
165 auto reconnect_result = reconnect();
166 if (reconnect_result.is_err())
167 {
168 NETWORK_LOG_ERROR("[resilient_client] Reconnection failed: " +
169 reconnect_result.error().message);
170
171#ifdef WITH_COMMON_SYSTEM
172 // Record failure to circuit breaker
173 circuit_breaker_->record_failure();
174#endif
175
176 // Exponential backoff before next retry
177 if (attempt < max_retries_)
178 {
179 auto backoff = calculate_backoff(attempt);
180 NETWORK_LOG_INFO("[resilient_client] Backing off for " +
181 std::to_string(backoff.count()) + "ms");
182 std::this_thread::sleep_for(backoff);
183 }
184 continue;
185 }
186 }
187
188 // Try to send
189 std::vector<uint8_t> send_data = data_copy; // Make a copy for this attempt
190 auto result = client_->send_packet(std::move(send_data));
191
192 if (!result.is_err())
193 {
194#ifdef WITH_COMMON_SYSTEM
195 // Record success to circuit breaker
196 circuit_breaker_->record_success();
197#endif
198
199 NETWORK_LOG_DEBUG("[resilient_client] Sent " +
200 std::to_string(data_copy.size()) + " bytes successfully");
201 return ok();
202 }
203
204 NETWORK_LOG_WARN("[resilient_client] Send attempt " +
205 std::to_string(attempt) + " failed: " + result.error().message);
206
207#ifdef WITH_COMMON_SYSTEM
208 // Record failure to circuit breaker
209 circuit_breaker_->record_failure();
210#endif
211
212 // Mark as disconnected if send failed
213 is_connected_.store(false);
214
215 // Disconnect the client
216 (void)client_->stop_client();
217
218 // Invoke disconnect callback if set
219 if (disconnect_callback_)
220 {
221 disconnect_callback_();
222 }
223
224 // Don't sleep after last failed attempt
225 if (attempt < max_retries_)
226 {
227 auto backoff = calculate_backoff(attempt);
228 NETWORK_LOG_INFO("[resilient_client] Backing off for " +
229 std::to_string(backoff.count()) + "ms");
230 std::this_thread::sleep_for(backoff);
231 }
232 }
233
234 return error_void(
236 "Failed to send after " + std::to_string(max_retries_) + " attempts",
237 "resilient_client::send_with_retry",
238 "Data size: " + std::to_string(data_copy.size()) + " bytes"
239 );
240 }
241
242 auto resilient_client::is_connected() const noexcept -> bool
243 {
244 return is_connected_.load() && client_->is_connected();
245 }
246
248 std::function<void(size_t)> callback) -> void
249 {
250 reconnect_callback_ = std::move(callback);
251 }
252
254 std::function<void()> callback) -> void
255 {
256 disconnect_callback_ = std::move(callback);
257 }
258
260 -> std::shared_ptr<core::messaging_client>
261 {
262 return client_;
263 }
264
266 {
267 NETWORK_LOG_INFO("[resilient_client] Attempting reconnection");
268
269 // Ensure client is disconnected first
270 if (client_->is_connected())
271 {
272 (void)client_->stop_client();
273 }
274
275 // Try to reconnect with retry logic
276 for (size_t attempt = 1; attempt <= max_retries_; ++attempt)
277 {
278 if (reconnect_callback_)
279 {
280 reconnect_callback_(attempt);
281 }
282
283 auto result = client_->start_client(host_, port_);
284 if (!result.is_err())
285 {
286 is_connected_.store(true);
287 NETWORK_LOG_INFO("[resilient_client] Reconnected successfully on attempt " +
288 std::to_string(attempt));
289 return ok();
290 }
291
292 NETWORK_LOG_WARN("[resilient_client] Reconnection attempt " +
293 std::to_string(attempt) + " failed: " + result.error().message);
294
295 if (attempt < max_retries_)
296 {
297 auto backoff = calculate_backoff(attempt);
298 std::this_thread::sleep_for(backoff);
299 }
300 }
301
302 return error_void(
304 "Failed to reconnect after " + std::to_string(max_retries_) + " attempts",
305 "resilient_client::reconnect",
306 "Host: " + host_ + ", Port: " + std::to_string(port_)
307 );
308 }
309
310 auto resilient_client::calculate_backoff(size_t attempt) const
311 -> std::chrono::milliseconds
312 {
313 // Exponential backoff: initial_backoff * 2^(attempt-1)
314 // Capped at 30 seconds to prevent excessive delays
315 auto backoff = initial_backoff_ * (1 << (attempt - 1));
316 auto max_backoff = std::chrono::duration_cast<std::chrono::milliseconds>(
317 std::chrono::seconds(30));
318
319 return std::min(backoff, max_backoff);
320 }
321
322#ifdef WITH_COMMON_SYSTEM
323 auto resilient_client::circuit_state() const -> common::resilience::circuit_state
324 {
325 return circuit_breaker_->get_state();
326 }
327#endif
328
329} // namespace kcenon::network::utils
auto reconnect() -> VoidResult
Attempts to reconnect with exponential backoff.
auto set_reconnect_callback(std::function< void(size_t attempt)> callback) -> void
Sets callback for reconnection events.
std::shared_ptr< core::messaging_client > client_
~resilient_client() noexcept
Destructor - disconnects client if still connected.
auto calculate_backoff(size_t attempt) const -> std::chrono::milliseconds
Calculates backoff duration for given attempt.
resilient_client(const std::string &client_id, const std::string &host, unsigned short port, size_t max_retries=3, std::chrono::milliseconds initial_backoff=std::chrono::seconds(1))
Constructs a resilient client with reconnection support.
auto disconnect() -> VoidResult
Disconnects from the server.
auto connect() -> VoidResult
Connects to the server with retry logic.
auto send_with_retry(std::vector< uint8_t > &&data) -> VoidResult
Sends data with automatic reconnection on failure.
auto is_connected() const noexcept -> bool
Checks if currently connected to server.
auto get_client() const -> std::shared_ptr< core::messaging_client >
Gets the underlying messaging client.
auto set_disconnect_callback(std::function< void()> callback) -> void
Sets callback for connection loss events.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
Utility components for network_system.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")
VoidResult ok()