// Issues an asynchronous read on the socket and dispatches received bytes to
// the registered callbacks, stopping when is_reading_ is cleared or the socket
// is closed. NOTE(review): this chunk is elided — several interior lines
// (braces, returns, early exits) are not visible; comments below cover only
// what the visible statements establish.
132 auto tcp_socket::do_read() ->
void
// Bail out if reading has been stopped.
135 if (!is_reading_.load())
// Guard: socket closed or never opened — clear the reading flag.
143 if (is_closed_.load() || !socket_.is_open())
145 is_reading_.store(
false);
// Platform-specific invalid-handle checks (Windows INVALID_SOCKET vs POSIX < 0).
153 if (socket_.native_handle() == INVALID_SOCKET)
155 if (socket_.native_handle() < 0)
158 is_reading_.store(
false);
// Health probe: re-applying the current non-blocking mode is a no-op that
// surfaces a latent socket error in state_check_ec without changing state.
166 std::error_code state_check_ec;
167 socket_.non_blocking(socket_.non_blocking(), state_check_ec);
170 is_reading_.store(
false);
// Report the probe failure through the error callback; the shared_ptr is
// loaded atomically so the callback can be swapped from another thread.
172 auto error_cb = std::atomic_load(&error_callback_);
173 if (error_cb && *error_cb)
175 (*error_cb)(state_check_ec);
// Keep *this alive for the duration of the async operation.
180 auto self = shared_from_this();
183 socket_.async_read_some(
184 asio::buffer(read_buffer_),
185 [
this, self](std::error_code ec, std::size_t length)
// Drop the result if reading was cancelled or the socket closed meanwhile.
189 if (!is_reading_.load() || is_closed_.load())
// Error path: notify via the (atomically loaded) error callback.
198 auto error_cb = std::atomic_load(&error_callback_);
199 if (error_cb && *error_cb)
// Zero-copy path: hand the callback a span over the internal read buffer.
// The view is only valid for the duration of the callback invocation.
211 auto view_cb = std::atomic_load(&receive_callback_view_);
212 if (view_cb && *view_cb)
216 std::span<const uint8_t> data_view(read_buffer_.data(), length);
217 (*view_cb)(data_view);
// Owning path: copy the received bytes into a fresh vector for the callback.
222 auto recv_cb = std::atomic_load(&receive_callback_);
223 if (recv_cb && *recv_cb)
225 std::vector<uint8_t> chunk(read_buffer_.begin(),
226 read_buffer_.begin() + length);
// Re-arm the next read only while still active and not closed.
234 if (is_reading_.load() && !is_closed_.load())
// A callback threw: stop reading and surface a generic not_connected error.
// NOTE(review): the original exception type is swallowed here — presumably
// deliberate best-effort; confirm with the file's error-handling policy.
240 catch (
const std::exception&)
244 is_reading_.store(
false);
245 auto error_cb = std::atomic_load(&error_callback_);
246 if (error_cb && *error_cb)
248 (*error_cb)(asio::error::not_connected);
// Queues `data` for asynchronous transmission; `handler(ec, bytes)` runs on
// completion. Maintains pending-byte accounting and metrics, and raises/clears
// backpressure around config_.high_water_mark / low_water_mark. NOTE(review):
// interior lines are elided in this view; comments cover only what is shown.
253auto tcp_socket::async_send(
254 std::vector<uint8_t>&& data,
255 std::function<
void(std::error_code, std::size_t)> handler) ->
void
// Fail fast with not_connected if the socket is closed or never opened.
260 if (is_closed_.load() || !socket_.is_open())
264 handler(asio::error::not_connected, 0);
// Platform-specific invalid-handle checks (Windows / POSIX).
272 if (socket_.native_handle() == INVALID_SOCKET)
274 if (socket_.native_handle() < 0)
279 handler(asio::error::not_connected, 0);
// Health probe: re-apply the current non-blocking mode to surface a latent
// socket error without changing socket state.
287 std::error_code state_check_ec;
288 socket_.non_blocking(socket_.non_blocking(), state_check_ec);
293 handler(state_check_ec, 0);
// Keep the socket alive across the async write; capture the size before the
// payload is moved out of `data` below.
298 auto self = shared_from_this();
299 std::size_t data_size = data.size();
// Account for the new bytes: fetch_add returns the OLD value, so data_size is
// added back to obtain the updated total.
302 std::size_t new_pending = pending_bytes_.fetch_add(data_size) + data_size;
303 metrics_.current_pending_bytes.store(new_pending);
// Lock-free peak tracking: the CAS loop retries until peak >= new_pending or
// this thread successfully publishes the new maximum.
306 std::size_t peak = metrics_.peak_pending_bytes.load();
307 while (new_pending > peak &&
308 !metrics_.peak_pending_bytes.compare_exchange_weak(peak, new_pending))
// Crossing the high water mark activates backpressure (guarded so it fires
// only on the transition) and notifies the backpressure callback.
314 if (config_.high_water_mark > 0 && !backpressure_active_.load() &&
315 new_pending >= config_.high_water_mark)
317 backpressure_active_.store(
true);
318 metrics_.backpressure_events.fetch_add(1);
321 auto bp_cb = std::atomic_load(&backpressure_callback_);
// Move the payload into a shared_ptr so the buffer outlives this call until
// the async write completes (asio::buffer does not own the memory).
329 auto buffer = std::make_shared<std::vector<uint8_t>>(std::move(data));
333 socket_, asio::buffer(*buffer),
334 [handler = std::move(handler), self, buffer, data_size](std::error_code ec, std::size_t bytes_transferred)
// Completion: release the accounted bytes (fetch_sub returns the old value,
// so subtracting data_size yields the remaining total).
337 std::size_t remaining = self->pending_bytes_.fetch_sub(data_size) - data_size;
338 self->metrics_.current_pending_bytes.store(remaining);
342 self->metrics_.total_bytes_sent.fetch_add(bytes_transferred);
343 self->metrics_.send_count.fetch_add(1);
// Dropping to or below the low water mark clears backpressure and notifies
// the backpressure callback of the transition.
347 if (self->config_.low_water_mark > 0 && self->backpressure_active_.load() &&
348 remaining <= self->config_.low_water_mark)
350 self->backpressure_active_.store(
false);
353 auto bp_cb = std::atomic_load(&self->backpressure_callback_);
// Invoke the user handler; invocability is verified at compile time.
360 if constexpr (std::is_invocable_v<
decltype(handler), std::error_code, std::size_t>)
364 handler(ec, bytes_transferred);
// Exception during send setup: roll back the pending-byte accounting and
// report not_connected. NOTE(review): the load after fetch_sub is a separate
// read, so current_pending_bytes may briefly reflect concurrent updates —
// confirm this is acceptable for the metrics consumer.
369 catch (
const std::exception&)
373 pending_bytes_.fetch_sub(data_size);
374 metrics_.current_pending_bytes.store(pending_bytes_.load());
377 handler(asio::error::not_connected, 0);
// Bounded admission variant of async_send: refuses the send when accepting
// `data` would push pending bytes past config_.max_pending_bytes (0 disables
// the cap). NOTE(review): the tail of this function (return statements,
// closing brace) lies beyond this chunk — presumably returns false on
// rejection and true after delegating; confirm against the full file.
389auto tcp_socket::try_send(
390 std::vector<uint8_t>&& data,
391 std::function<
void(std::error_code, std::size_t)> handler) ->
bool
393 std::size_t data_size = data.size();
394 std::size_t current = pending_bytes_.load();
// Reject when the cap is configured and would be exceeded. NOTE(review): this
// check-then-act on pending_bytes_ is not atomic, so concurrent try_send
// calls can jointly overshoot the cap — confirm whether callers serialize
// sends or whether a small overshoot is acceptable.
397 if (config_.max_pending_bytes > 0 &&
398 current + data_size > config_.max_pending_bytes)
400 metrics_.rejected_sends.fetch_add(1);
// Within budget: delegate to async_send, which performs the real accounting
// and backpressure handling.
405 async_send(std::move(data), std::move(handler));