Network System 0.1.1
High-performance modular networking library for scalable client-server applications
kcenon::network::internal::tcp_socket Class Reference

A lightweight wrapper around asio::ip::tcp::socket, enabling asynchronous read and write operations.

#include <tcp_socket.h>


Public Types

using backpressure_callback = std::function<void(bool apply_backpressure)>
 Callback type for backpressure notifications.
 

Public Member Functions

 tcp_socket (asio::ip::tcp::socket socket)
 Constructs a tcp_socket by taking ownership of a moved socket.
 
 tcp_socket (asio::ip::tcp::socket socket, const socket_config &config)
 Constructs a tcp_socket with custom configuration.
 
 ~tcp_socket ()=default
 Default destructor (no special cleanup needed).
 
auto attach_observer (std::shared_ptr< network_core::interfaces::socket_observer > observer) -> void
 Attaches an observer to receive socket events.
 
auto detach_observer (std::shared_ptr< network_core::interfaces::socket_observer > observer) -> void
 Detaches a previously attached observer.
 
auto set_receive_callback (std::function< void(const std::vector< uint8_t > &)> callback) -> void
 Sets a callback to receive inbound data chunks.
 
auto set_receive_callback_view (std::function< void(std::span< const uint8_t >)> callback) -> void
 Sets a zero-copy callback to receive inbound data as a view.
 
auto set_error_callback (std::function< void(std::error_code)> callback) -> void
 Sets a callback to handle socket errors (e.g., read/write failures).
 
auto start_read () -> void
 Begins the continuous asynchronous read loop.
 
auto async_send (std::vector< uint8_t > &&data, std::function< void(std::error_code, std::size_t)> handler) -> void
 Initiates an asynchronous write of the given data buffer.
 
auto set_backpressure_callback (backpressure_callback callback) -> void
 Sets a callback for backpressure notifications.
 
auto try_send (std::vector< uint8_t > &&data, std::function< void(std::error_code, std::size_t)> handler) -> bool
 Attempts to send data without blocking.
 
auto pending_bytes () const -> std::size_t
 Returns current pending bytes count.
 
auto is_backpressure_active () const -> bool
 Checks if backpressure is currently active.
 
auto metrics () const -> const socket_metrics &
 Returns socket metrics for monitoring.
 
auto reset_metrics () -> void
 Resets socket metrics to zero.
 
auto config () const -> const socket_config &
 Returns the current socket configuration.
 
auto socket () -> asio::ip::tcp::socket &
 Provides direct access to the underlying asio::ip::tcp::socket in case advanced operations are needed.
 
auto stop_read () -> void
 Stops the read loop to prevent further async operations.
 
auto close () -> void
 Safely closes the socket and stops all async operations.
 
auto is_closed () const -> bool
 Checks if the socket has been closed.
 

Private Types

using receive_callback_t = std::function<void(const std::vector<uint8_t>&)>
 Callback type aliases for lock-free storage.
 
using receive_callback_view_t = std::function<void(std::span<const uint8_t>)>
 
using error_callback_t = std::function<void(std::error_code)>
 

Private Member Functions

auto do_read () -> void
 Internal function to handle the read logic with async_read_some().
 
auto notify_observers_receive (std::span< const uint8_t > data) -> void
 Helper to notify all observers of receive events.
 
auto notify_observers_error (std::error_code ec) -> void
 Helper to notify all observers of error events.
 
auto notify_observers_backpressure (bool apply) -> void
 Helper to notify all observers of backpressure events.
 

Private Attributes

asio::ip::tcp::socket socket_
 
std::array< uint8_t, 4096 > read_buffer_
 
std::shared_ptr< receive_callback_t > receive_callback_
 
std::shared_ptr< receive_callback_view_t > receive_callback_view_
 
std::shared_ptr< error_callback_t > error_callback_
 
std::mutex callback_mutex_
 
socket_config config_
 Backpressure configuration.
 
socket_metrics metrics_
 Socket runtime metrics.
 
std::vector< std::weak_ptr< network_core::interfaces::socket_observer > > observers_ {}
 List of socket observers (using weak_ptr for automatic cleanup). Protected by callback_mutex_.
 
std::atomic< bool > is_reading_ {false}
 
std::atomic< bool > is_closed_ {false}
 
std::atomic< std::size_t > pending_bytes_ {0}
 Current pending bytes in send buffer.
 
std::atomic< bool > backpressure_active_ {false}
 Backpressure state flag.
 
std::shared_ptr< backpressure_callback > backpressure_callback_
 Backpressure notification callback.
 

Detailed Description

A lightweight wrapper around asio::ip::tcp::socket, enabling asynchronous read and write operations.

Key Features

Thread Safety

  • All public methods are thread-safe. Callback registration is protected by callback_mutex_.
  • ASIO operations are serialized through the io_context, ensuring read_buffer_ is only accessed by one async operation at a time.
  • The provided callbacks will be invoked on an ASIO worker thread; ensure that your callback logic is thread-safe if it shares data.
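
The last point above can be illustrated with a small sketch. It does not use the library; `byte_counter`, `make_counting_callback` and `simulate_two_workers` are hypothetical names, and two plain threads stand in for ASIO worker threads. The only thing taken from this page is the callback signature accepted by set_receive_callback().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical application-side state shared between the thread that runs the
// receive callback and any other thread that reads the running total.
class byte_counter {
public:
    // Called from the receive callback; may run on a worker thread.
    void add(std::size_t n) {
        std::lock_guard<std::mutex> lock(mutex_);
        total_ += n;
    }

    // Safe to call from any thread.
    std::size_t total() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return total_;
    }

private:
    mutable std::mutex mutex_;
    std::size_t total_ = 0;
};

// Builds a callback with the signature that set_receive_callback() expects.
inline std::function<void(const std::vector<std::uint8_t>&)>
make_counting_callback(byte_counter& counter) {
    return [&counter](const std::vector<std::uint8_t>& chunk) {
        counter.add(chunk.size());
    };
}

// Stand-in for two worker threads delivering 16-byte chunks concurrently.
inline std::size_t simulate_two_workers(std::size_t chunks_per_worker) {
    byte_counter counter;
    auto callback = make_counting_callback(counter);
    auto worker = [&] {
        const std::vector<std::uint8_t> chunk(16, 0x00);
        for (std::size_t i = 0; i < chunks_per_worker; ++i) callback(chunk);
    };
    std::thread a(worker), b(worker);
    a.join();
    b.join();
    return counter.total();
}
```

The counter must outlive the socket's read loop, since the callback captures it by reference.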

Definition at line 43 of file tcp_socket.h.

Member Typedef Documentation

◆ backpressure_callback

using kcenon::network::internal::tcp_socket::backpressure_callback = std::function<void(bool apply_backpressure)>

Callback type for backpressure notifications.

Parameters
apply_backpressure: True when backpressure should be applied (high water mark reached), false when it can be released (low water mark reached).

Definition at line 52 of file tcp_socket.h.

◆ error_callback_t

using kcenon::network::internal::tcp_socket::error_callback_t = std::function<void(std::error_code)>
private

Definition at line 305 of file tcp_socket.h.

◆ receive_callback_t

using kcenon::network::internal::tcp_socket::receive_callback_t = std::function<void(const std::vector<uint8_t>&)>
private

Callback type aliases for lock-free storage.

Definition at line 303 of file tcp_socket.h.

◆ receive_callback_view_t

using kcenon::network::internal::tcp_socket::receive_callback_view_t = std::function<void(std::span<const uint8_t>)>
private

Definition at line 304 of file tcp_socket.h.

Constructor & Destructor Documentation

◆ tcp_socket() [1/2]

kcenon::network::internal::tcp_socket::tcp_socket ( asio::ip::tcp::socket socket)

Constructs a tcp_socket by taking ownership of a moved socket.

Parameters
socket: An asio::ip::tcp::socket that must be open/connected or at least valid.

After construction, you can immediately call start_read() to begin receiving data. For sending, call async_send().

Definition at line 15 of file tcp_socket.cpp.

    : socket_(std::move(socket)), config_{}
{
    // constructor body empty
}

◆ tcp_socket() [2/2]

kcenon::network::internal::tcp_socket::tcp_socket ( asio::ip::tcp::socket socket,
const socket_config & config )

Constructs a tcp_socket with custom configuration.

Parameters
socket: An asio::ip::tcp::socket that must be open/connected.
config: Socket configuration including backpressure settings.

Definition at line 21 of file tcp_socket.cpp.

    : socket_(std::move(socket)), config_(config)
{
    // constructor body empty
}

References config.

◆ ~tcp_socket()

kcenon::network::internal::tcp_socket::~tcp_socket ( )
default

Default destructor (no special cleanup needed).

Member Function Documentation

◆ async_send()

auto kcenon::network::internal::tcp_socket::async_send ( std::vector< uint8_t > && data,
std::function< void(std::error_code, std::size_t)> handler ) -> void

Initiates an asynchronous write of the given data buffer.

Parameters
data: The buffer to send over TCP (moved for efficiency).
handler: A completion handler with signature void(std::error_code, std::size_t) that is invoked upon success or failure.

The handler receives:

  • ec : the std::error_code from the write operation,
  • bytes_transferred : how many bytes were actually written.

Example

auto sock = std::make_shared<network::tcp_socket>(...);
std::vector<uint8_t> buf = {0x01, 0x02, 0x03};
sock->async_send(std::move(buf), [](std::error_code ec, std::size_t len) {
    if (ec) {
        // handle error
    }
    else {
        // handle success
    }
});
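Note
Data is moved (not copied), so the byte buffer itself is neither reallocated nor copied.
The original vector will be empty after this call if the write was initiated; on the early-return error paths the data is left untouched.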
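
The ownership step behind this note can be sketched in isolation. `hold_for_async` is a hypothetical helper, not part of the library; it mirrors the `std::make_shared<std::vector<uint8_t>>(std::move(data))` line in the listing that follows. Note that `make_shared` still allocates a small object to hold the vector and its reference count; what is avoided is copying the bytes.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical helper mirroring the ownership step in async_send(): the
// caller's vector is move-constructed into a vector owned by a shared_ptr, so
// the bytes stay alive until the last copy of the pointer is released.
inline std::shared_ptr<std::vector<std::uint8_t>>
hold_for_async(std::vector<std::uint8_t>&& data) {
    return std::make_shared<std::vector<std::uint8_t>>(std::move(data));
}
```

A completion handler that captures the returned pointer by value keeps the buffer valid for the whole duration of the write, which is why the listing captures `buffer` in the lambda.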

Definition at line 253 of file tcp_socket.cpp.

{
    // Check if socket has been closed or is no longer open before starting async operation
    // Both checks are needed: is_closed_ for explicit close() calls, is_open() for ASIO state
    // This prevents UBSAN errors from accessing null descriptor_state in epoll_reactor
    if (is_closed_.load() || !socket_.is_open())
    {
        if (handler)
        {
            handler(asio::error::not_connected, 0);
        }
        return;
    }

    // Additional check: verify native handle is valid before async operation
    // This prevents SEGV when socket is in transitional state after move construction
#ifdef _WIN32
    if (socket_.native_handle() == INVALID_SOCKET)
#else
    if (socket_.native_handle() < 0)
#endif
    {
        if (handler)
        {
            handler(asio::error::not_connected, 0);
        }
        return;
    }

    // Verify ASIO internal state (descriptor_state) is valid before async operations
    // by accessing the socket's non-blocking property. This forces ASIO to access
    // its internal implementation state, which will fail gracefully if invalid.
    std::error_code state_check_ec;
    socket_.non_blocking(socket_.non_blocking(), state_check_ec);
    if (state_check_ec)
    {
        if (handler)
        {
            handler(state_check_ec, 0);
        }
        return;
    }

    auto self = shared_from_this();
    std::size_t data_size = data.size();

    // Track pending bytes
    std::size_t new_pending = pending_bytes_.fetch_add(data_size) + data_size;
    metrics_.current_pending_bytes.store(new_pending);

    // Update peak pending bytes
    std::size_t peak = metrics_.peak_pending_bytes.load();
    while (new_pending > peak &&
           !metrics_.peak_pending_bytes.compare_exchange_weak(peak, new_pending))
    {
        // Loop until we successfully update or find a higher value
    }

    // Check high water mark for backpressure
    if (config_.high_water_mark > 0 && !backpressure_active_.load() &&
        new_pending >= config_.high_water_mark)
    {
        backpressure_active_.store(true);
        metrics_.backpressure_events.fetch_add(1);

        // Invoke backpressure callback
        auto bp_cb = std::atomic_load(&backpressure_callback_);
        if (bp_cb && *bp_cb)
        {
            (*bp_cb)(true);
        }
    }

    // Move data into shared_ptr for lifetime management
    auto buffer = std::make_shared<std::vector<uint8_t>>(std::move(data));
    try
    {
        asio::async_write(
            socket_, asio::buffer(*buffer),
            [handler = std::move(handler), self, buffer, data_size](std::error_code ec, std::size_t bytes_transferred)
            {
                // Update pending bytes on completion
                std::size_t remaining = self->pending_bytes_.fetch_sub(data_size) - data_size;
                self->metrics_.current_pending_bytes.store(remaining);

                if (!ec)
                {
                    self->metrics_.total_bytes_sent.fetch_add(bytes_transferred);
                    self->metrics_.send_count.fetch_add(1);
                }

                // Check low water mark to release backpressure
                if (self->config_.low_water_mark > 0 && self->backpressure_active_.load() &&
                    remaining <= self->config_.low_water_mark)
                {
                    self->backpressure_active_.store(false);

                    // Invoke backpressure callback
                    auto bp_cb = std::atomic_load(&self->backpressure_callback_);
                    if (bp_cb && *bp_cb)
                    {
                        (*bp_cb)(false);
                    }
                }

                if constexpr (std::is_invocable_v<decltype(handler), std::error_code, std::size_t>)
                {
                    if (handler)
                    {
                        handler(ec, bytes_transferred);
                    }
                }
            });
    }
    catch (const std::exception&)
    {
        // Socket may have been closed between our checks and async operation
        // Rollback pending bytes and notify handler
        pending_bytes_.fetch_sub(data_size);
        if (handler)
        {
            handler(asio::error::not_connected, 0);
        }
    }
}
std::size_t high_water_mark
High water mark - trigger backpressure callback.
Definition common_defs.h:47
std::size_t low_water_mark
Low water mark - resume sending.
Definition common_defs.h:57
std::atomic< std::size_t > backpressure_events
Definition common_defs.h:73
std::atomic< std::size_t > peak_pending_bytes
Definition common_defs.h:72
std::atomic< std::size_t > current_pending_bytes
Definition common_defs.h:71
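
The peak-tracking loop in the listing above is a standard "atomic maximum" pattern. The following is a minimal standalone sketch of it; `update_peak` is a hypothetical free function, not something the library exposes.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Raises `peak` to `candidate` if candidate is larger; never lowers it.
inline void update_peak(std::atomic<std::size_t>& peak, std::size_t candidate) {
    std::size_t observed = peak.load();
    // On failure compare_exchange_weak reloads `observed` with the current
    // value, so the loop exits as soon as another thread has published a
    // value that is greater than or equal to candidate.
    while (candidate > observed &&
           !peak.compare_exchange_weak(observed, candidate)) {
    }
}
```

Using `compare_exchange_weak` in a loop is appropriate here because a spurious failure simply causes one more iteration.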

◆ attach_observer()

auto kcenon::network::internal::tcp_socket::attach_observer ( std::shared_ptr< network_core::interfaces::socket_observer > observer) -> void

Attaches an observer to receive socket events.

Parameters
observer: Shared pointer to socket_observer implementation.

Multiple observers can be attached to the same socket. All attached observers will be notified of socket events.

See also
socket_observer
detach_observer()
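
How a weak_ptr-based observer list can behave is sketched below. This is an illustration under assumptions, not the library's code: the `observer` interface, `observer_list` and `counting_observer` are hypothetical, since the socket_observer interface and the bodies of attach_observer() and the notify helpers are not shown on this page.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for socket_observer; the real interface is not shown here.
struct observer {
    virtual ~observer() = default;
    virtual void on_backpressure(bool apply) = 0;
};

class observer_list {
public:
    void attach(const std::shared_ptr<observer>& o) {
        std::lock_guard<std::mutex> lock(mutex_);
        observers_.push_back(o);
    }

    // Detaching an observer that was never attached is a no-op.
    void detach(const std::shared_ptr<observer>& o) {
        std::lock_guard<std::mutex> lock(mutex_);
        observers_.erase(
            std::remove_if(observers_.begin(), observers_.end(),
                           [&](const std::weak_ptr<observer>& w) {
                               auto s = w.lock();
                               return !s || s == o;
                           }),
            observers_.end());
    }

    // Notifies every live observer and returns how many were notified.
    std::size_t notify_backpressure(bool apply) {
        std::vector<std::shared_ptr<observer>> live;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            // Drop entries whose observer has already been destroyed.
            observers_.erase(
                std::remove_if(observers_.begin(), observers_.end(),
                               [](const std::weak_ptr<observer>& w) { return w.expired(); }),
                observers_.end());
            for (const auto& w : observers_) {
                if (auto s = w.lock()) live.push_back(std::move(s));
            }
        }
        // Call out without holding the lock so observers may attach or detach.
        for (const auto& s : live) s->on_backpressure(apply);
        return live.size();
    }

private:
    std::mutex mutex_;
    std::vector<std::weak_ptr<observer>> observers_;
};

// Minimal observer used only to exercise the list.
struct counting_observer : observer {
    int calls = 0;
    void on_backpressure(bool) override { ++calls; }
};
```

Because the list holds weak pointers, destroying an observer is enough to stop its notifications; an explicit detach is only needed while the observer is still alive.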

◆ close()

auto kcenon::network::internal::tcp_socket::close ( ) -> void

Safely closes the socket and stops all async operations.

This method atomically sets the closed flag before closing the socket, preventing data races between the close operation and async read operations. Thread-safe with respect to concurrent async operations.

Definition at line 89 of file tcp_socket.cpp.

{
    // Atomically mark socket as closed before actual close
    // This prevents data races with concurrent async operations
    is_closed_.store(true);
    is_reading_.store(false);

    // Post the actual socket close to the socket's executor to ensure
    // thread-safety. ASIO's epoll_reactor has internal data structures that
    // must be accessed from the same thread as async operations to avoid
    // data races and SEGV errors detected by ThreadSanitizer/AddressSanitizer.
    try
    {
        auto self = shared_from_this();
        asio::post(socket_.get_executor(), [self]() {
            std::error_code ec;
            if (self->socket_.is_open())
            {
                self->socket_.cancel(ec);
                self->socket_.close(ec);
            }
            // Ignore close errors - socket may already be closed
        });
    }
    catch (...)
    {
        // If post fails (e.g., executor not running or shared_from_this fails),
        // fall back to direct close. This may race but is better than leaking
        // the socket.
        std::error_code ec;
        if (socket_.is_open())
        {
            socket_.cancel(ec);
            socket_.close(ec);
        }
    }
}

◆ config()

auto kcenon::network::internal::tcp_socket::config ( ) const -> const socket_config&
nodiscard

Returns the current socket configuration.

Returns
Const reference to socket_config struct.

Definition at line 429 of file tcp_socket.cpp.

{
    return config_;
}

◆ detach_observer()

auto kcenon::network::internal::tcp_socket::detach_observer ( std::shared_ptr< network_core::interfaces::socket_observer > observer) -> void

Detaches a previously attached observer.

Parameters
observer: The observer to detach.

If the observer was not attached, this method has no effect.

◆ do_read()

auto kcenon::network::internal::tcp_socket::do_read ( ) -> void
private

Internal function to handle the read logic with async_read_some().

Upon success, it dispatches the data to receive_callback_view_ if set, otherwise to receive_callback_, and then schedules another read. On error, it calls error_callback_ if available and does not schedule another read.

Definition at line 132 of file tcp_socket.cpp.

{
    // Check if reading has been stopped before initiating new async operation
    if (!is_reading_.load())
    {
        return;
    }

    // Check if socket has been closed or is no longer open before starting async operation
    // This prevents data races and UBSAN errors from accessing null descriptor_state
    // Both checks are needed: is_closed_ for explicit close() calls, is_open() for ASIO state
    if (is_closed_.load() || !socket_.is_open())
    {
        is_reading_.store(false);
        return;
    }

    // Additional check: verify native handle is valid before async operation
    // This prevents SEGV when socket is in transitional state after move construction
    // On POSIX systems, invalid handles are < 0; on Windows, INVALID_SOCKET
#ifdef _WIN32
    if (socket_.native_handle() == INVALID_SOCKET)
#else
    if (socket_.native_handle() < 0)
#endif
    {
        is_reading_.store(false);
        return;
    }

    // Verify ASIO internal state (descriptor_state) is valid before async operations
    // by accessing the socket's non-blocking property. This forces ASIO to access
    // its internal implementation state, which will fail gracefully if invalid
    // instead of causing UBSAN errors from null pointer dereferencing.
    std::error_code state_check_ec;
    socket_.non_blocking(socket_.non_blocking(), state_check_ec);
    if (state_check_ec)
    {
        is_reading_.store(false);
        // Notify via error callback if available
        auto error_cb = std::atomic_load(&error_callback_);
        if (error_cb && *error_cb)
        {
            (*error_cb)(state_check_ec);
        }
        return;
    }

    auto self = shared_from_this();
    try
    {
        socket_.async_read_some(
            asio::buffer(read_buffer_),
            [this, self](std::error_code ec, std::size_t length)
            {
                // Check if reading has been stopped or socket closed at callback time
                // This prevents accessing invalid socket state after close()
                if (!is_reading_.load() || is_closed_.load())
                {
                    return;
                }

                if (ec)
                {
                    // On error, invoke the error callback
                    // Lock-free callback access via atomic_load
                    auto error_cb = std::atomic_load(&error_callback_);
                    if (error_cb && *error_cb)
                    {
                        (*error_cb)(ec);
                    }
                    return;
                }

                // On success, if length > 0, dispatch to the appropriate callback
                if (length > 0)
                {
                    // Lock-free callback access via atomic_load
                    // Prefer view callback (zero-copy) over vector callback
                    auto view_cb = std::atomic_load(&receive_callback_view_);
                    if (view_cb && *view_cb)
                    {
                        // Zero-copy path: create span view directly into read_buffer_
                        // No std::vector allocation or copy required
                        std::span<const uint8_t> data_view(read_buffer_.data(), length);
                        (*view_cb)(data_view);
                    }
                    else
                    {
                        // Legacy path: allocate and copy into vector for compatibility
                        auto recv_cb = std::atomic_load(&receive_callback_);
                        if (recv_cb && *recv_cb)
                        {
                            std::vector<uint8_t> chunk(read_buffer_.begin(),
                                                       read_buffer_.begin() + length);
                            (*recv_cb)(chunk);
                        }
                    }
                }

                // Continue reading only if still active and socket is not closed
                // Use atomic is_closed_ flag to prevent data race with close()
                if (is_reading_.load() && !is_closed_.load())
                {
                    do_read();
                }
            });
    }
    catch (const std::exception&)
    {
        // Socket may have been invalidated between checks and async operation
        // This catch prevents crashes from race conditions during socket teardown
        is_reading_.store(false);
        auto error_cb = std::atomic_load(&error_callback_);
        if (error_cb && *error_cb)
        {
            (*error_cb)(asio::error::not_connected);
        }
    }
}

◆ is_backpressure_active()

auto kcenon::network::internal::tcp_socket::is_backpressure_active ( ) const -> bool
nodiscard

Checks if backpressure is currently active.

Returns
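true if backpressure is currently in effect, that is, pending bytes have reached high_water_mark and have not yet dropped to low_water_mark or below.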

Definition at line 414 of file tcp_socket.cpp.

{
    return backpressure_active_.load();
}

◆ is_closed()

auto kcenon::network::internal::tcp_socket::is_closed ( ) const -> bool
nodiscard

Checks if the socket has been closed.

Returns
true if close() has been called on this socket.

Definition at line 127 of file tcp_socket.cpp.

{
    return is_closed_.load();
}

◆ metrics()

auto kcenon::network::internal::tcp_socket::metrics ( ) const -> const socket_metrics&
nodiscard

Returns socket metrics for monitoring.

Returns
Const reference to socket_metrics struct.

Definition at line 419 of file tcp_socket.cpp.

{
    return metrics_;
}

◆ notify_observers_backpressure()

auto kcenon::network::internal::tcp_socket::notify_observers_backpressure ( bool apply) -> void
private

Helper to notify all observers of backpressure events.

◆ notify_observers_error()

auto kcenon::network::internal::tcp_socket::notify_observers_error ( std::error_code ec) -> void
private

Helper to notify all observers of error events.

◆ notify_observers_receive()

auto kcenon::network::internal::tcp_socket::notify_observers_receive ( std::span< const uint8_t > data) -> void
private

Helper to notify all observers of receive events.

◆ pending_bytes()

auto kcenon::network::internal::tcp_socket::pending_bytes ( ) const -> std::size_t
nodiscard

Returns current pending bytes count.

Returns
Number of bytes currently pending in send buffer.

Definition at line 409 of file tcp_socket.cpp.

{
    return pending_bytes_.load();
}

◆ reset_metrics()

auto kcenon::network::internal::tcp_socket::reset_metrics ( ) -> void

Resets socket metrics to zero.

Definition at line 424 of file tcp_socket.cpp.

◆ set_backpressure_callback()

auto kcenon::network::internal::tcp_socket::set_backpressure_callback ( backpressure_callback callback) -> void

Sets a callback for backpressure notifications.

Parameters
callback: Function invoked when backpressure state changes.

The callback receives true when pending bytes reach or exceed high_water_mark (apply backpressure), and false when they drop to or below low_water_mark (release backpressure). It is invoked only on these transitions, not on every send.
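
The two thresholds form a hysteresis band. The sketch below models that behavior in a simplified, single-threaded form; `watermark_tracker` and its members are hypothetical names, whereas the real class uses atomics and reads its thresholds from socket_config. The comparisons (`>=` for the high mark, `<=` for the low mark, and a threshold of 0 meaning "disabled") follow the async_send() listing on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical single-threaded model of the watermark logic.
class watermark_tracker {
public:
    watermark_tracker(std::size_t high, std::size_t low,
                      std::function<void(bool)> on_change)
        : high_(high), low_(low), on_change_(std::move(on_change)) {}

    // Bytes handed to the socket but not yet written.
    void on_queued(std::size_t n) {
        pending_ += n;
        if (high_ > 0 && !active_ && pending_ >= high_) {
            active_ = true;
            if (on_change_) on_change_(true);
        }
    }

    // Bytes whose write has completed.
    void on_completed(std::size_t n) {
        assert(n <= pending_);
        pending_ -= n;
        if (low_ > 0 && active_ && pending_ <= low_) {
            active_ = false;
            if (on_change_) on_change_(false);
        }
    }

    bool active() const { return active_; }
    std::size_t pending() const { return pending_; }

private:
    std::size_t high_, low_, pending_ = 0;
    bool active_ = false;
    std::function<void(bool)> on_change_;
};
```

With a high mark of 100 and a low mark of 20, the callback fires once when pending bytes first reach 100 and once more when they fall back to 20; nothing is reported while the value moves between the two marks.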

Definition at line 382 of file tcp_socket.cpp.

{
    auto new_cb = std::make_shared<backpressure_callback>(std::move(callback));
    std::lock_guard<std::mutex> lock(callback_mutex_);
    std::atomic_store(&backpressure_callback_, new_cb);
}

◆ set_error_callback()

auto kcenon::network::internal::tcp_socket::set_error_callback ( std::function< void(std::error_code)> callback) -> void

Sets a callback to handle socket errors (e.g., read/write failures).

Parameters
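callback: A function with signature void(std::error_code), invoked when an asynchronous read fails. In the listings on this page, write failures are reported through the completion handler passed to async_send() rather than through this callback.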

If no callback is set, errors are not explicitly handled here (beyond stopping reads).

Definition at line 43 of file tcp_socket.cpp.

{
    auto new_cb = std::make_shared<error_callback_t>(std::move(callback));
    std::lock_guard<std::mutex> lock(callback_mutex_);
    std::atomic_store(&error_callback_, new_cb);
}

◆ set_receive_callback()

auto kcenon::network::internal::tcp_socket::set_receive_callback ( std::function< void(const std::vector< uint8_t > &)> callback) -> void

Sets a callback to receive inbound data chunks.

Parameters
callback: A function with signature void(const std::vector<uint8_t>&), called whenever a chunk of data is successfully read.

If no callback is set, received data is effectively discarded.

Note
For better performance, consider using set_receive_callback_view() instead.

Definition at line 27 of file tcp_socket.cpp.

{
    auto new_cb = std::make_shared<receive_callback_t>(std::move(callback));
    std::lock_guard<std::mutex> lock(callback_mutex_);
    std::atomic_store(&receive_callback_, new_cb);
}

◆ set_receive_callback_view()

auto kcenon::network::internal::tcp_socket::set_receive_callback_view ( std::function< void(std::span< const uint8_t >)> callback) -> void

Sets a zero-copy callback to receive inbound data as a view.

Parameters
callback: A function with signature void(std::span<const uint8_t>), called whenever a chunk of data is successfully read.

Zero-Copy Performance

Unlike set_receive_callback(), this callback receives data as a non-owning view directly into the internal read buffer, avoiding per-read std::vector allocations and copies.

Lifetime Contract

  • The span is valid only until the callback returns.
  • Callers must not store, capture, or use the span after returning from the callback.
  • If data must be retained, copy it into your own container within the callback.

Dispatch Priority

  • If both view and vector callbacks are set, the view callback takes priority and the vector callback is not invoked.

Example

sock->set_receive_callback_view([](std::span<const uint8_t> data) {
    // Process data directly (zero-copy)
    process_bytes(data.data(), data.size());
    // If you need to keep the data:
    // my_buffer.insert(my_buffer.end(), data.begin(), data.end());
});
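
The dispatch priority and the lifetime contract can also be modeled without the library. In this sketch `byte_view`, `view_callback`, `vector_callback` and `dispatch_chunk` are all hypothetical; `byte_view` is a stand-in for std::span<const uint8_t> so that the example also builds with pre-C++20 compilers. The ordering (view callback first, vector callback as fallback, data dropped otherwise) follows the do_read() listing on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Stand-in for std::span<const uint8_t>: a non-owning pointer plus length.
struct byte_view {
    const std::uint8_t* data;
    std::size_t size;
};

using view_callback = std::function<void(byte_view)>;
using vector_callback = std::function<void(const std::vector<std::uint8_t>&)>;

// Hypothetical dispatcher: the view callback wins; the vector callback is the
// fallback and is the only path that allocates and copies.
// Returns 'v' (view), 'c' (copy) or 'n' (no callback set, data dropped).
inline char dispatch_chunk(const std::uint8_t* buffer, std::size_t length,
                           const view_callback& on_view,
                           const vector_callback& on_vector) {
    if (on_view) {
        on_view(byte_view{buffer, length});
        return 'v';
    }
    if (on_vector) {
        on_vector(std::vector<std::uint8_t>(buffer, buffer + length));
        return 'c';
    }
    return 'n';
}
```

A view callback that needs the bytes later should copy them into its own container before returning, because the underlying buffer is reused by the next read.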

Definition at line 35 of file tcp_socket.cpp.

{
    auto new_cb = std::make_shared<receive_callback_view_t>(std::move(callback));
    std::lock_guard<std::mutex> lock(callback_mutex_);
    std::atomic_store(&receive_callback_view_, new_cb);
}

◆ socket()

auto kcenon::network::internal::tcp_socket::socket ( ) -> asio::ip::tcp::socket&
inline

Provides direct access to the underlying asio::ip::tcp::socket in case advanced operations are needed.

Returns
A reference to the wrapped asio::ip::tcp::socket.

Definition at line 269 of file tcp_socket.h.

{ return socket_; }

References socket_.

◆ start_read()

auto kcenon::network::internal::tcp_socket::start_read ( ) -> void

Begins the continuous asynchronous read loop.

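Once called, the class repeatedly calls async_read_some(). If an error occurs, the error callback registered with set_error_callback() is invoked and no further reads are scheduled. Calling start_read() while a read loop is already active has no effect.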

Definition at line 51 of file tcp_socket.cpp.

{
    // Prevent duplicate read operations - only start if not already reading
    bool expected = false;
    if (!is_reading_.compare_exchange_strong(expected, true))
    {
        // Already reading, don't start another async operation
        return;
    }

    // Post the first read operation to the socket's executor to ensure
    // the executor's internal state (descriptor_state) is fully initialized.
    // This prevents SEGV errors when start_read() is called immediately
    // after socket construction from a moved asio::ip::tcp::socket.
    try
    {
        auto self = shared_from_this();
        asio::post(socket_.get_executor(), [self]() {
            if (self->is_reading_.load() && !self->is_closed_.load())
            {
                self->do_read();
            }
        });
    }
    catch (...)
    {
        // If post fails, fall back to direct call
        // This may happen if executor is stopped
        is_reading_.store(false);
    }
}
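
The duplicate-start guard at the top of this listing can be isolated as follows. `read_loop_flag` is a hypothetical class used only to show the compare_exchange_strong idiom; it does not start any I/O.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model of the start/stop flag: start() returns true only for
// the caller that flips the flag from false to true.
class read_loop_flag {
public:
    bool start() {
        bool expected = false;
        return reading_.compare_exchange_strong(expected, true);
    }
    void stop() { reading_.store(false); }
    bool reading() const { return reading_.load(); }

private:
    std::atomic<bool> reading_{false};
};
```

If several threads call start() at the same time, exactly one of them observes true, so only one read loop is ever launched.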

◆ stop_read()

auto kcenon::network::internal::tcp_socket::stop_read ( ) -> void

Stops the read loop to prevent further async operations.

Definition at line 83 of file tcp_socket.cpp.

{
    // Stop further read operations
    is_reading_.store(false);
}

◆ try_send()

auto kcenon::network::internal::tcp_socket::try_send ( std::vector< uint8_t > && data,
std::function< void(std::error_code, std::size_t)> handler ) -> bool
nodiscard

Attempts to send data without blocking.

Parameters
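data: The buffer to send (moved for efficiency).
handler: Completion handler for the async operation.
Returns
true if the send was initiated, false if it was rejected.

Unlike async_send(), this method checks the pending-byte limit before initiating the send. It returns false immediately, and increments the rejected_sends metric, if max_pending_bytes is set (greater than zero) and the current pending bytes plus the size of data would exceed it.

Note that the implementation listed below checks only max_pending_bytes; it does not consult the backpressure flag, so a send can still be accepted while is_backpressure_active() returns true.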

Example

if (!sock->try_send(std::move(data), handler)) {
    // Queue data for later or drop it
}
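
The admission check can be expressed as a small pure function. `within_pending_limit` is a hypothetical helper, not part of the library; it accepts exactly when the `current + data_size > max_pending_bytes` test in the listing below would not reject, but is written as a subtraction so the sum cannot overflow.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical free-function version of the limit check. A limit of 0 means
// "no limit", matching the `max_pending_bytes > 0` guard in the listing.
inline bool within_pending_limit(std::size_t pending, std::size_t incoming,
                                 std::size_t max_pending_bytes) {
    if (max_pending_bytes == 0) return true;
    // Equivalent to pending + incoming <= max_pending_bytes, without overflow.
    return pending <= max_pending_bytes &&
           incoming <= max_pending_bytes - pending;
}
```

In the listing the pending count is loaded first and the send is started afterwards, so two threads calling try_send() concurrently could both pass the check; callers that need a hard cap may want to serialize their sends.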

Definition at line 389 of file tcp_socket.cpp.

{
    std::size_t data_size = data.size();
    std::size_t current = pending_bytes_.load();

    // Check if max_pending_bytes limit would be exceeded
    if (config_.max_pending_bytes > 0 &&
        current + data_size > config_.max_pending_bytes)
    {
        metrics_.rejected_sends.fetch_add(1);
        return false;
    }

    // Proceed with async_send
    async_send(std::move(data), std::move(handler));
    return true;
}
std::size_t max_pending_bytes
Maximum bytes allowed in pending send buffer.
Definition common_defs.h:38
std::atomic< std::size_t > rejected_sends
Definition common_defs.h:74

Member Data Documentation

◆ backpressure_active_

std::atomic<bool> kcenon::network::internal::tcp_socket::backpressure_active_ {false}
private

Backpressure state flag.

Definition at line 341 of file tcp_socket.h.

{false};

◆ backpressure_callback_

std::shared_ptr<backpressure_callback> kcenon::network::internal::tcp_socket::backpressure_callback_
private

Backpressure notification callback.

Definition at line 344 of file tcp_socket.h.

◆ callback_mutex_

std::mutex kcenon::network::internal::tcp_socket::callback_mutex_
private

Protects callback registration only.

Definition at line 320 of file tcp_socket.h.

◆ config_

socket_config kcenon::network::internal::tcp_socket::config_
private

Backpressure configuration.

Definition at line 323 of file tcp_socket.h.

◆ error_callback_

std::shared_ptr<error_callback_t> kcenon::network::internal::tcp_socket::error_callback_
private

Definition at line 318 of file tcp_socket.h.

◆ is_closed_

std::atomic<bool> kcenon::network::internal::tcp_socket::is_closed_ {false}
private

Flag to indicate socket is closed.

Definition at line 335 of file tcp_socket.h.

{false};

◆ is_reading_

std::atomic<bool> kcenon::network::internal::tcp_socket::is_reading_ {false}
private

Flag to prevent read after stop.

Definition at line 334 of file tcp_socket.h.

{false};

◆ metrics_

socket_metrics kcenon::network::internal::tcp_socket::metrics_
mutable private

Socket runtime metrics.

Definition at line 326 of file tcp_socket.h.

◆ observers_

std::vector<std::weak_ptr<network_core::interfaces::socket_observer> > kcenon::network::internal::tcp_socket::observers_ {}
private

List of socket observers (using weak_ptr for automatic cleanup). Protected by callback_mutex_.

Definition at line 332 of file tcp_socket.h.

{};

◆ pending_bytes_

std::atomic<std::size_t> kcenon::network::internal::tcp_socket::pending_bytes_ {0}
private

Current pending bytes in send buffer.

Definition at line 338 of file tcp_socket.h.

{0};

◆ read_buffer_

std::array<uint8_t, 4096> kcenon::network::internal::tcp_socket::read_buffer_
private

Buffer for receiving data in do_read().

Definition at line 310 of file tcp_socket.h.

◆ receive_callback_

std::shared_ptr<receive_callback_t> kcenon::network::internal::tcp_socket::receive_callback_
private

Lock-free callback storage using shared_ptr + atomic operations. This eliminates mutex contention on the receive hot path.
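
The storage pattern described here can be sketched as a small generic class. `callback_slot` is a hypothetical name; the sketch follows the setters listed on this page (a mutex on the write side, std::atomic_store to publish) and the readers in do_read() (std::atomic_load to take a snapshot). These free functions for shared_ptr are declared in <memory>; C++20 deprecates them in favor of std::atomic<std::shared_ptr<T>>, so some toolchains may emit a warning.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical generic slot: writers serialize on a mutex and publish with
// atomic_store; readers take a snapshot with atomic_load and never touch the
// mutex.
template <typename Signature>
class callback_slot {
public:
    void set(std::function<Signature> cb) {
        auto fresh = std::make_shared<std::function<Signature>>(std::move(cb));
        std::lock_guard<std::mutex> lock(mutex_);
        std::atomic_store(&current_, std::move(fresh));
    }

    // Invokes the current callback, if any; returns whether one was called.
    template <typename... Args>
    bool invoke(Args&&... args) const {
        auto snapshot = std::atomic_load(&current_);
        if (!snapshot || !*snapshot) return false;
        (*snapshot)(std::forward<Args>(args)...);
        return true;
    }

private:
    std::mutex mutex_;
    std::shared_ptr<std::function<Signature>> current_;
};
```

Because the reader holds its own shared_ptr copy while the callback runs, replacing the callback from another thread cannot destroy a std::function that is still executing.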

Definition at line 316 of file tcp_socket.h.

◆ receive_callback_view_

std::shared_ptr<receive_callback_view_t> kcenon::network::internal::tcp_socket::receive_callback_view_
private

Definition at line 317 of file tcp_socket.h.

◆ socket_

asio::ip::tcp::socket kcenon::network::internal::tcp_socket::socket_
private

The underlying ASIO TCP socket.

Definition at line 307 of file tcp_socket.h.

Referenced by socket().


The documentation for this class was generated from the following files: