Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
tcp_socket.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#include "tcp_socket.h"
6
7#include <atomic>
8#include <iostream> // for debugging/logging
9#include <type_traits>
10
11// Use nested namespace definition (C++17)
13{
14
15 tcp_socket::tcp_socket(asio::ip::tcp::socket socket)
16 : socket_(std::move(socket)), config_{}
17 {
18 // constructor body empty
19 }
20
    /// Constructs a tcp_socket that takes ownership of a connected socket and
    /// applies the supplied configuration (backpressure water marks, pending
    /// byte limits — see socket_config).
    tcp_socket::tcp_socket(asio::ip::tcp::socket socket, const socket_config& config)
        : socket_(std::move(socket)), config_(config)
    {
        // All state is established in the member-initializer list.
    }
26
        std::function<void(const std::vector<uint8_t>&)> callback) -> void
    {
        // Wrap the callback in a shared_ptr so the read path can swap/read it
        // atomically without holding a lock.
        auto new_cb = std::make_shared<receive_callback_t>(std::move(callback));
        // The mutex only serializes concurrent setters; do_read() accesses the
        // pointer lock-free via std::atomic_load.
        std::lock_guard<std::mutex> lock(callback_mutex_);
        std::atomic_store(&receive_callback_, new_cb);
    }
34
        std::function<void(std::span<const uint8_t>)> callback) -> void
    {
        // Zero-copy variant: do_read() prefers this callback over the
        // vector-based one and passes a span view into read_buffer_.
        auto new_cb = std::make_shared<receive_callback_view_t>(std::move(callback));
        // Mutex serializes setters; readers use std::atomic_load lock-free.
        std::lock_guard<std::mutex> lock(callback_mutex_);
        std::atomic_store(&receive_callback_view_, new_cb);
    }
42
        std::function<void(std::error_code)> callback) -> void
    {
        // Same publication pattern as the receive callbacks: shared_ptr swap
        // under the setter mutex, lock-free std::atomic_load on the read path.
        auto new_cb = std::make_shared<error_callback_t>(std::move(callback));
        std::lock_guard<std::mutex> lock(callback_mutex_);
        std::atomic_store(&error_callback_, new_cb);
    }
50
    {
        // Prevent duplicate read operations - only start if not already reading.
        // compare_exchange_strong flips false -> true exactly once; any
        // concurrent caller that loses the race returns immediately.
        bool expected = false;
        if (!is_reading_.compare_exchange_strong(expected, true))
        {
            // Already reading, don't start another async operation
            return;
        }

        // Post the first read operation to the socket's executor to ensure
        // the executor's internal state (descriptor_state) is fully initialized.
        // This prevents SEGV errors when start_read() is called immediately
        // after socket construction from a moved asio::ip::tcp::socket.
        try
        {
            auto self = shared_from_this();
            asio::post(socket_.get_executor(), [self]() {
                // Re-check both flags on the executor thread: stop_read() or
                // close() may have run between the post and this invocation.
                if (self->is_reading_.load() && !self->is_closed_.load())
                {
                    self->do_read();
                }
            });
        }
        catch (...)
        {
            // post() (or shared_from_this()) failed, e.g. the executor is
            // stopped. No read loop was started, so roll the flag back to let
            // a later start_read() retry. (Note: there is no direct-call
            // fallback here — the loop simply does not start.)
            is_reading_.store(false);
        }
    }
82
83 auto tcp_socket::stop_read() -> void
84 {
85 // Stop further read operations
86 is_reading_.store(false);
87 }
88
89 auto tcp_socket::close() -> void
90 {
91 // Atomically mark socket as closed before actual close
92 // This prevents data races with concurrent async operations
93 is_closed_.store(true);
94 is_reading_.store(false);
95
96 // Post the actual socket close to the socket's executor to ensure
97 // thread-safety. ASIO's epoll_reactor has internal data structures that
98 // must be accessed from the same thread as async operations to avoid
99 // data races and SEGV errors detected by ThreadSanitizer/AddressSanitizer.
100 try
101 {
102 auto self = shared_from_this();
103 asio::post(socket_.get_executor(), [self]() {
104 std::error_code ec;
105 if (self->socket_.is_open())
106 {
107 self->socket_.cancel(ec);
108 self->socket_.close(ec);
109 }
110 // Ignore close errors - socket may already be closed
111 });
112 }
113 catch (...)
114 {
115 // If post fails (e.g., executor not running or shared_from_this fails),
116 // fall back to direct close. This may race but is better than leaking
117 // the socket.
118 std::error_code ec;
119 if (socket_.is_open())
120 {
121 socket_.cancel(ec);
122 socket_.close(ec);
123 }
124 }
125 }
126
127 auto tcp_socket::is_closed() const -> bool
128 {
129 return is_closed_.load();
130 }
131
132 auto tcp_socket::do_read() -> void
133 {
134 // Check if reading has been stopped before initiating new async operation
135 if (!is_reading_.load())
136 {
137 return;
138 }
139
140 // Check if socket has been closed or is no longer open before starting async operation
141 // This prevents data races and UBSAN errors from accessing null descriptor_state
142 // Both checks are needed: is_closed_ for explicit close() calls, is_open() for ASIO state
143 if (is_closed_.load() || !socket_.is_open())
144 {
145 is_reading_.store(false);
146 return;
147 }
148
149 // Additional check: verify native handle is valid before async operation
150 // This prevents SEGV when socket is in transitional state after move construction
151 // On POSIX systems, invalid handles are < 0; on Windows, INVALID_SOCKET
152#ifdef _WIN32
153 if (socket_.native_handle() == INVALID_SOCKET)
154#else
155 if (socket_.native_handle() < 0)
156#endif
157 {
158 is_reading_.store(false);
159 return;
160 }
161
162 // Verify ASIO internal state (descriptor_state) is valid before async operations
163 // by accessing the socket's non-blocking property. This forces ASIO to access
164 // its internal implementation state, which will fail gracefully if invalid
165 // instead of causing UBSAN errors from null pointer dereferencing.
166 std::error_code state_check_ec;
167 socket_.non_blocking(socket_.non_blocking(), state_check_ec);
168 if (state_check_ec)
169 {
170 is_reading_.store(false);
171 // Notify via error callback if available
172 auto error_cb = std::atomic_load(&error_callback_);
173 if (error_cb && *error_cb)
174 {
175 (*error_cb)(state_check_ec);
176 }
177 return;
178 }
179
180 auto self = shared_from_this();
181 try
182 {
183 socket_.async_read_some(
184 asio::buffer(read_buffer_),
185 [this, self](std::error_code ec, std::size_t length)
186 {
187 // Check if reading has been stopped or socket closed at callback time
188 // This prevents accessing invalid socket state after close()
189 if (!is_reading_.load() || is_closed_.load())
190 {
191 return;
192 }
193
194 if (ec)
195 {
196 // On error, invoke the error callback
197 // Lock-free callback access via atomic_load
198 auto error_cb = std::atomic_load(&error_callback_);
199 if (error_cb && *error_cb)
200 {
201 (*error_cb)(ec);
202 }
203 return;
204 }
205
206 // On success, if length > 0, dispatch to the appropriate callback
207 if (length > 0)
208 {
209 // Lock-free callback access via atomic_load
210 // Prefer view callback (zero-copy) over vector callback
211 auto view_cb = std::atomic_load(&receive_callback_view_);
212 if (view_cb && *view_cb)
213 {
214 // Zero-copy path: create span view directly into read_buffer_
215 // No std::vector allocation or copy required
216 std::span<const uint8_t> data_view(read_buffer_.data(), length);
217 (*view_cb)(data_view);
218 }
219 else
220 {
221 // Legacy path: allocate and copy into vector for compatibility
222 auto recv_cb = std::atomic_load(&receive_callback_);
223 if (recv_cb && *recv_cb)
224 {
225 std::vector<uint8_t> chunk(read_buffer_.begin(),
226 read_buffer_.begin() + length);
227 (*recv_cb)(chunk);
228 }
229 }
230 }
231
232 // Continue reading only if still active and socket is not closed
233 // Use atomic is_closed_ flag to prevent data race with close()
234 if (is_reading_.load() && !is_closed_.load())
235 {
236 do_read();
237 }
238 });
239 }
240 catch (const std::exception&)
241 {
242 // Socket may have been invalidated between checks and async operation
243 // This catch prevents crashes from race conditions during socket teardown
244 is_reading_.store(false);
245 auto error_cb = std::atomic_load(&error_callback_);
246 if (error_cb && *error_cb)
247 {
248 (*error_cb)(asio::error::not_connected);
249 }
250 }
251 }
252
253auto tcp_socket::async_send(
254 std::vector<uint8_t>&& data,
255 std::function<void(std::error_code, std::size_t)> handler) -> void
256{
257 // Check if socket has been closed or is no longer open before starting async operation
258 // Both checks are needed: is_closed_ for explicit close() calls, is_open() for ASIO state
259 // This prevents UBSAN errors from accessing null descriptor_state in epoll_reactor
260 if (is_closed_.load() || !socket_.is_open())
261 {
262 if (handler)
263 {
264 handler(asio::error::not_connected, 0);
265 }
266 return;
267 }
268
269 // Additional check: verify native handle is valid before async operation
270 // This prevents SEGV when socket is in transitional state after move construction
271#ifdef _WIN32
272 if (socket_.native_handle() == INVALID_SOCKET)
273#else
274 if (socket_.native_handle() < 0)
275#endif
276 {
277 if (handler)
278 {
279 handler(asio::error::not_connected, 0);
280 }
281 return;
282 }
283
284 // Verify ASIO internal state (descriptor_state) is valid before async operations
285 // by accessing the socket's non-blocking property. This forces ASIO to access
286 // its internal implementation state, which will fail gracefully if invalid.
287 std::error_code state_check_ec;
288 socket_.non_blocking(socket_.non_blocking(), state_check_ec);
289 if (state_check_ec)
290 {
291 if (handler)
292 {
293 handler(state_check_ec, 0);
294 }
295 return;
296 }
297
298 auto self = shared_from_this();
299 std::size_t data_size = data.size();
300
301 // Track pending bytes
302 std::size_t new_pending = pending_bytes_.fetch_add(data_size) + data_size;
303 metrics_.current_pending_bytes.store(new_pending);
304
305 // Update peak pending bytes
306 std::size_t peak = metrics_.peak_pending_bytes.load();
307 while (new_pending > peak &&
308 !metrics_.peak_pending_bytes.compare_exchange_weak(peak, new_pending))
309 {
310 // Loop until we successfully update or find a higher value
311 }
312
313 // Check high water mark for backpressure
314 if (config_.high_water_mark > 0 && !backpressure_active_.load() &&
315 new_pending >= config_.high_water_mark)
316 {
317 backpressure_active_.store(true);
318 metrics_.backpressure_events.fetch_add(1);
319
320 // Invoke backpressure callback
321 auto bp_cb = std::atomic_load(&backpressure_callback_);
322 if (bp_cb && *bp_cb)
323 {
324 (*bp_cb)(true);
325 }
326 }
327
328 // Move data into shared_ptr for lifetime management
329 auto buffer = std::make_shared<std::vector<uint8_t>>(std::move(data));
330 try
331 {
332 asio::async_write(
333 socket_, asio::buffer(*buffer),
334 [handler = std::move(handler), self, buffer, data_size](std::error_code ec, std::size_t bytes_transferred)
335 {
336 // Update pending bytes on completion
337 std::size_t remaining = self->pending_bytes_.fetch_sub(data_size) - data_size;
338 self->metrics_.current_pending_bytes.store(remaining);
339
340 if (!ec)
341 {
342 self->metrics_.total_bytes_sent.fetch_add(bytes_transferred);
343 self->metrics_.send_count.fetch_add(1);
344 }
345
346 // Check low water mark to release backpressure
347 if (self->config_.low_water_mark > 0 && self->backpressure_active_.load() &&
348 remaining <= self->config_.low_water_mark)
349 {
350 self->backpressure_active_.store(false);
351
352 // Invoke backpressure callback
353 auto bp_cb = std::atomic_load(&self->backpressure_callback_);
354 if (bp_cb && *bp_cb)
355 {
356 (*bp_cb)(false);
357 }
358 }
359
360 if constexpr (std::is_invocable_v<decltype(handler), std::error_code, std::size_t>)
361 {
362 if (handler)
363 {
364 handler(ec, bytes_transferred);
365 }
366 }
367 });
368 }
369 catch (const std::exception&)
370 {
371 // Socket may have been closed between our checks and async operation
372 // Rollback pending bytes and notify handler
373 pending_bytes_.fetch_sub(data_size);
374 metrics_.current_pending_bytes.store(pending_bytes_.load());
375 if (handler)
376 {
377 handler(asio::error::not_connected, 0);
378 }
379 }
380}
381
382auto tcp_socket::set_backpressure_callback(backpressure_callback callback) -> void
383{
384 auto new_cb = std::make_shared<backpressure_callback>(std::move(callback));
385 std::lock_guard<std::mutex> lock(callback_mutex_);
386 std::atomic_store(&backpressure_callback_, new_cb);
387}
388
389auto tcp_socket::try_send(
390 std::vector<uint8_t>&& data,
391 std::function<void(std::error_code, std::size_t)> handler) -> bool
392{
393 std::size_t data_size = data.size();
394 std::size_t current = pending_bytes_.load();
395
396 // Check if max_pending_bytes limit would be exceeded
397 if (config_.max_pending_bytes > 0 &&
398 current + data_size > config_.max_pending_bytes)
399 {
400 metrics_.rejected_sends.fetch_add(1);
401 return false;
402 }
403
404 // Proceed with async_send
405 async_send(std::move(data), std::move(handler));
406 return true;
407}
408
409auto tcp_socket::pending_bytes() const -> std::size_t
410{
411 return pending_bytes_.load();
412}
413
414auto tcp_socket::is_backpressure_active() const -> bool
415{
416 return backpressure_active_.load();
417}
418
419auto tcp_socket::metrics() const -> const socket_metrics&
420{
421 return metrics_;
422}
423
424auto tcp_socket::reset_metrics() -> void
425{
426 metrics_.reset();
427}
428
429auto tcp_socket::config() const -> const socket_config&
430{
431 return config_;
432}
433
434} // namespace kcenon::network::internal
tcp_socket(asio::ip::tcp::socket socket)
Constructs a tcp_socket by taking ownership of a moved socket.
auto set_receive_callback(std::function< void(const std::vector< uint8_t > &)> callback) -> void
Sets a callback to receive inbound data chunks.
auto set_receive_callback_view(std::function< void(std::span< const uint8_t >)> callback) -> void
Sets a zero-copy callback to receive inbound data as a view.
auto set_error_callback(std::function< void(std::error_code)> callback) -> void
Sets a callback to handle socket errors (e.g., read/write failures).
auto start_read() -> void
Begins the continuous asynchronous read loop.
std::function< void(bool apply_backpressure)> backpressure_callback
Callback type for backpressure notifications.
Definition tcp_socket.h:52
tracing_config config
Definition exporters.cpp:29
Configuration for TCP socket backpressure control.
Definition common_defs.h:30
Runtime metrics for socket monitoring.
Definition common_defs.h:68