15 const std::string& host,
18 std::chrono::milliseconds initial_backoff
19#ifdef WITH_COMMON_SYSTEM
20 , common::resilience::circuit_breaker_config cb_config
25 , max_retries_(max_retries)
26 , initial_backoff_(initial_backoff)
27#ifdef WITH_COMMON_SYSTEM
28 , circuit_breaker_(std::make_unique<common::resilience::circuit_breaker>(cb_config))
31 client_ = std::make_shared<core::messaging_client>(client_id);
32#ifdef WITH_COMMON_SYSTEM
34 std::to_string(max_retries) +
", initial_backoff=" +
35 std::to_string(initial_backoff.count()) +
"ms" +
36 ", circuit_breaker failure_threshold=" +
37 std::to_string(cb_config.failure_threshold));
40 std::to_string(max_retries) +
", initial_backoff=" +
41 std::to_string(initial_backoff.count()) +
"ms");
62 if (is_connected_.load())
67 for (
size_t attempt = 1; attempt <= max_retries_; ++attempt)
70 std::to_string(attempt) +
"/" + std::to_string(max_retries_));
73 if (reconnect_callback_)
75 reconnect_callback_(attempt);
79 auto result = client_->start_client(host_, port_);
82 is_connected_.store(
true);
84 std::to_string(attempt));
89 std::to_string(attempt) +
" failed: " + result.error().message);
92 if (attempt < max_retries_)
94 auto backoff = calculate_backoff(attempt);
96 std::to_string(backoff.count()) +
"ms");
97 std::this_thread::sleep_for(backoff);
103 "Failed to connect after " + std::to_string(max_retries_) +
" attempts",
104 "resilient_client::connect",
105 "Host: " + host_ +
", Port: " + std::to_string(port_)
111 if (!is_connected_.load())
116 auto result = client_->stop_client();
117 if (!result.is_err())
119 is_connected_.store(
false);
122 if (disconnect_callback_)
124 disconnect_callback_();
132 result.error().message);
140#ifdef WITH_COMMON_SYSTEM
142 if (!circuit_breaker_->allow_request())
144 NETWORK_LOG_WARN(
"[resilient_client] Circuit breaker is open, failing fast");
147 "Circuit breaker is open",
148 "resilient_client::send_with_retry",
149 "Circuit state: " + common::resilience::to_string(circuit_breaker_->get_state())
155 auto data_copy = data;
157 for (
size_t attempt = 1; attempt <= max_retries_; ++attempt)
160 if (!is_connected_.load() || !client_->is_connected())
162 NETWORK_LOG_WARN(
"[resilient_client] Not connected, attempting reconnection");
165 auto reconnect_result = reconnect();
166 if (reconnect_result.is_err())
169 reconnect_result.error().message);
171#ifdef WITH_COMMON_SYSTEM
173 circuit_breaker_->record_failure();
177 if (attempt < max_retries_)
179 auto backoff = calculate_backoff(attempt);
181 std::to_string(backoff.count()) +
"ms");
182 std::this_thread::sleep_for(backoff);
189 std::vector<uint8_t> send_data = data_copy;
190 auto result = client_->send_packet(std::move(send_data));
192 if (!result.is_err())
194#ifdef WITH_COMMON_SYSTEM
196 circuit_breaker_->record_success();
200 std::to_string(data_copy.size()) +
" bytes successfully");
205 std::to_string(attempt) +
" failed: " + result.error().message);
207#ifdef WITH_COMMON_SYSTEM
209 circuit_breaker_->record_failure();
213 is_connected_.store(
false);
216 (void)client_->stop_client();
219 if (disconnect_callback_)
221 disconnect_callback_();
225 if (attempt < max_retries_)
227 auto backoff = calculate_backoff(attempt);
229 std::to_string(backoff.count()) +
"ms");
230 std::this_thread::sleep_for(backoff);
236 "Failed to send after " + std::to_string(max_retries_) +
" attempts",
237 "resilient_client::send_with_retry",
238 "Data size: " + std::to_string(data_copy.size()) +
" bytes"
248 std::function<
void(
size_t)> callback) ->
void
250 reconnect_callback_ = std::move(callback);
254 std::function<
void()> callback) ->
void
256 disconnect_callback_ = std::move(callback);
260 -> std::shared_ptr<core::messaging_client>
270 if (client_->is_connected())
272 (void)client_->stop_client();
276 for (
size_t attempt = 1; attempt <= max_retries_; ++attempt)
278 if (reconnect_callback_)
280 reconnect_callback_(attempt);
283 auto result = client_->start_client(host_, port_);
284 if (!result.is_err())
286 is_connected_.store(
true);
287 NETWORK_LOG_INFO(
"[resilient_client] Reconnected successfully on attempt " +
288 std::to_string(attempt));
293 std::to_string(attempt) +
" failed: " + result.error().message);
295 if (attempt < max_retries_)
297 auto backoff = calculate_backoff(attempt);
298 std::this_thread::sleep_for(backoff);
304 "Failed to reconnect after " + std::to_string(max_retries_) +
" attempts",
305 "resilient_client::reconnect",
306 "Host: " + host_ +
", Port: " + std::to_string(port_)
311 -> std::chrono::milliseconds
315 auto backoff = initial_backoff_ * (1 << (attempt - 1));
316 auto max_backoff = std::chrono::duration_cast<std::chrono::milliseconds>(
317 std::chrono::seconds(30));
319 return std::min(backoff, max_backoff);
322#ifdef WITH_COMMON_SYSTEM
// Returns the state currently reported by the circuit breaker held in
// circuit_breaker_ (the result of its get_state() call).
323 auto resilient_client::circuit_state() const -> common::resilience::circuit_state
325 return circuit_breaker_->get_state();
auto reconnect() -> VoidResult
Attempts to reconnect with exponential backoff.
auto set_reconnect_callback(std::function< void(size_t attempt)> callback) -> void
Sets callback for reconnection events.
std::shared_ptr< core::messaging_client > client_
~resilient_client() noexcept
Destructor - disconnects client if still connected.
auto calculate_backoff(size_t attempt) const -> std::chrono::milliseconds
Calculates backoff duration for given attempt.
resilient_client(const std::string &client_id, const std::string &host, unsigned short port, size_t max_retries=3, std::chrono::milliseconds initial_backoff=std::chrono::seconds(1))
Constructs a resilient client with reconnection support.
auto disconnect() -> VoidResult
Disconnects from the server.
auto connect() -> VoidResult
Connects to the server with retry logic.
auto send_with_retry(std::vector< uint8_t > &&data) -> VoidResult
Sends data with automatic reconnection on failure.
std::atomic< bool > is_connected_
auto is_connected() const noexcept -> bool
Checks if currently connected to server.
auto get_client() const -> std::shared_ptr< core::messaging_client >
Gets the underlying messaging client.
auto set_disconnect_callback(std::function< void()> callback) -> void
Sets callback for connection loss events.
Logger system integration interface for network_system.
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_INFO(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
constexpr int send_failed
constexpr int connection_failed
constexpr int circuit_open
Utility components for network_system.
VoidResult error_void(int code, const std::string &message, const std::string &source="network_system", const std::string &details="")