Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
tcp_socket.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
7#include <array>
8#include <atomic>
9#include <functional>
10#include <memory>
11#include <mutex>
12#include <span>
13#include <system_error>
14#include <vector>
15
16#include <asio.hpp>
17
19#include "kcenon/network-core/interfaces/socket_observer.h"
20
22{
43 class tcp_socket : public std::enable_shared_from_this<tcp_socket>
44 {
45 public:
52 using backpressure_callback = std::function<void(bool apply_backpressure)>;
53
63 tcp_socket(asio::ip::tcp::socket socket);
64
70 tcp_socket(asio::ip::tcp::socket socket, const socket_config& config);
71
75 ~tcp_socket() = default;
76
87 auto attach_observer(std::shared_ptr<network_core::interfaces::socket_observer> observer)
88 -> void;
89
96 auto detach_observer(std::shared_ptr<network_core::interfaces::socket_observer> observer)
97 -> void;
98
111 std::function<void(const std::vector<uint8_t>&)> callback) -> void;
112
146 std::function<void(std::span<const uint8_t>)> callback) -> void;
147
158 auto set_error_callback(std::function<void(std::error_code)> callback)
159 -> void;
160
168 auto start_read() -> void;
169
198 auto async_send(
199 std::vector<uint8_t>&& data,
200 std::function<void(std::error_code, std::size_t)> handler) -> void;
201
212
231 [[nodiscard]] auto try_send(
232 std::vector<uint8_t>&& data,
233 std::function<void(std::error_code, std::size_t)> handler) -> bool;
234
239 [[nodiscard]] auto pending_bytes() const -> std::size_t;
240
245 [[nodiscard]] auto is_backpressure_active() const -> bool;
246
251 [[nodiscard]] auto metrics() const -> const socket_metrics&;
252
256 auto reset_metrics() -> void;
257
262 [[nodiscard]] auto config() const -> const socket_config&;
263
269 auto socket() -> asio::ip::tcp::socket& { return socket_; }
270
274 auto stop_read() -> void;
275
283 auto close() -> void;
284
289 [[nodiscard]] auto is_closed() const -> bool;
290
291 private:
299 auto do_read() -> void;
300
301 private:
303 using receive_callback_t = std::function<void(const std::vector<uint8_t>&)>;
304 using receive_callback_view_t = std::function<void(std::span<const uint8_t>)>;
305 using error_callback_t = std::function<void(std::error_code)>;
306
307 asio::ip::tcp::socket socket_;
309 std::array<uint8_t, 4096>
319
324
327
332 std::vector<std::weak_ptr<network_core::interfaces::socket_observer>> observers_{};
333
334 std::atomic<bool> is_reading_{false};
335 std::atomic<bool> is_closed_{false};
338 std::atomic<std::size_t> pending_bytes_{0};
339
341 std::atomic<bool> backpressure_active_{false};
342
344 std::shared_ptr<backpressure_callback> backpressure_callback_;
345
347 auto notify_observers_receive(std::span<const uint8_t> data) -> void;
348
350 auto notify_observers_error(std::error_code ec) -> void;
351
353 auto notify_observers_backpressure(bool apply) -> void;
354 };
355} // namespace kcenon::network::internal
A lightweight wrapper around asio::ip::tcp::socket, enabling asynchronous read and write operations.
Definition tcp_socket.h:44
std::shared_ptr< receive_callback_t > receive_callback_
Definition tcp_socket.h:316
std::function< void(std::error_code)> error_callback_t
Definition tcp_socket.h:305
auto socket() -> asio::ip::tcp::socket &
Provides direct access to the underlying asio::ip::tcp::socket in case advanced operations are needed...
Definition tcp_socket.h:269
tcp_socket(asio::ip::tcp::socket socket)
Constructs a tcp_socket by taking ownership of a moved socket.
std::function< void(std::span< const uint8_t >)> receive_callback_view_t
Definition tcp_socket.h:304
auto set_receive_callback(std::function< void(const std::vector< uint8_t > &)> callback) -> void
Sets a callback to receive inbound data chunks.
socket_config config_
Backpressure configuration.
Definition tcp_socket.h:323
auto notify_observers_receive(std::span< const uint8_t > data) -> void
Helper to notify all observers of receive events.
auto try_send(std::vector< uint8_t > &&data, std::function< void(std::error_code, std::size_t)> handler) -> bool
Attempts to send data without blocking.
auto notify_observers_backpressure(bool apply) -> void
Helper to notify all observers of backpressure events.
std::array< uint8_t, 4096 > read_buffer_
Definition tcp_socket.h:310
socket_metrics metrics_
Socket runtime metrics.
Definition tcp_socket.h:326
std::vector< std::weak_ptr< network_core::interfaces::socket_observer > > observers_
List of socket observers (using weak_ptr for automatic cleanup). Protected by callback_mutex_.
Definition tcp_socket.h:332
auto stop_read() -> void
Stops the read loop to prevent further async operations.
auto detach_observer(std::shared_ptr< network_core::interfaces::socket_observer > observer) -> void
Detaches a previously attached observer.
auto is_backpressure_active() const -> bool
Checks if backpressure is currently active.
std::atomic< bool > backpressure_active_
Backpressure state flag.
Definition tcp_socket.h:341
auto set_receive_callback_view(std::function< void(std::span< const uint8_t >)> callback) -> void
Sets a zero-copy callback to receive inbound data as a view.
std::atomic< std::size_t > pending_bytes_
Current pending bytes in send buffer.
Definition tcp_socket.h:338
std::function< void(const std::vector< uint8_t > &)> receive_callback_t
Callback type aliases for lock-free storage.
Definition tcp_socket.h:303
auto attach_observer(std::shared_ptr< network_core::interfaces::socket_observer > observer) -> void
Attaches an observer to receive socket events.
auto set_error_callback(std::function< void(std::error_code)> callback) -> void
Sets a callback to handle socket errors (e.g., read/write failures).
auto start_read() -> void
Begins the continuous asynchronous read loop.
auto notify_observers_error(std::error_code ec) -> void
Helper to notify all observers of error events.
auto async_send(std::vector< uint8_t > &&data, std::function< void(std::error_code, std::size_t)> handler) -> void
Initiates an asynchronous write of the given data buffer.
auto set_backpressure_callback(backpressure_callback callback) -> void
Sets a callback for backpressure notifications.
auto pending_bytes() const -> std::size_t
Returns current pending bytes count.
std::shared_ptr< backpressure_callback > backpressure_callback_
Backpressure notification callback.
Definition tcp_socket.h:344
auto reset_metrics() -> void
Resets socket metrics to zero.
std::function< void(bool apply_backpressure)> backpressure_callback
Callback type for backpressure notifications.
Definition tcp_socket.h:52
auto metrics() const -> const socket_metrics &
Returns socket metrics for monitoring.
std::shared_ptr< error_callback_t > error_callback_
Definition tcp_socket.h:318
std::shared_ptr< receive_callback_view_t > receive_callback_view_
Definition tcp_socket.h:317
auto do_read() -> void
Internal function to handle the read logic with async_read_some().
auto config() const -> const socket_config &
Returns the current socket configuration.
~tcp_socket()=default
Default destructor (no special cleanup needed).
auto is_closed() const -> bool
Checks if the socket has been closed.
auto close() -> void
Safely closes the socket and stops all async operations.
std::mutex mutex
Configuration for TCP socket backpressure control.
Definition common_defs.h:30
Runtime metrics for socket monitoring.
Definition common_defs.h:68