24 #pragma comment(lib, "ws2_32.lib")
25 typedef int socklen_t;
26 typedef SSIZE_T ssize_t;
27 #define close closesocket
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #include <arpa/inet.h>
66 if (
running_.exchange(
true, std::memory_order_acq_rel)) {
73 while (!stop_token.stop_requested()) {
77 std::this_thread::sleep_for(std::chrono::milliseconds(10));
86 while (!
stop.stop_requested()) {
90 std::this_thread::sleep_for(std::chrono::milliseconds(10));
97 if (!
running_.exchange(
false, std::memory_order_acq_rel)) {
107 has_work_.store(
true, std::memory_order_release);
128 std::chrono::seconds interval)
138 if (
running_.exchange(
true, std::memory_order_acq_rel)) {
142#if LOGGER_HAS_JTHREAD
146 while (!stop_token.stop_requested()) {
151 for (
auto elapsed = std::chrono::milliseconds{0};
152 elapsed < std::chrono::duration_cast<std::chrono::milliseconds>(interval)
153 && !stop_token.stop_requested();
154 elapsed += std::chrono::milliseconds(100)) {
155 std::this_thread::sleep_for(std::chrono::milliseconds(100));
166 while (!
stop.stop_requested()) {
171 for (
auto elapsed = std::chrono::milliseconds{0};
172 elapsed < std::chrono::duration_cast<std::chrono::milliseconds>(interval)
173 && !
stop.stop_requested();
174 elapsed += std::chrono::milliseconds(100)) {
175 std::this_thread::sleep_for(std::chrono::milliseconds(100));
183 if (!
running_.exchange(
false, std::memory_order_acq_rel)) {
203 std::chrono::seconds reconnect_interval)
206 , protocol_(protocol)
207 , buffer_size_(buffer_size)
208 , reconnect_interval_(reconnect_interval)
214 if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
215 throw std::runtime_error(
"Failed to initialize Winsock");
222 send_worker_ = std::make_unique<network_send_jthread_worker>(
288 entry.
location->function.to_string(),
306 auto start = std::chrono::steady_clock::now();
307 auto timeout = std::chrono::seconds(5);
310 if (
buffer_cv_.wait_for(lock, timeout, [
this] { return buffer_.empty() || !running_; })) {
313 "Network writer stopped before flush completed");
317 "Network flush timeout");
320 if (std::chrono::steady_clock::now() - start > timeout) {
322 "Network flush exceeded timeout");
341 struct addrinfo hints{}, *
result =
nullptr;
342 hints.ai_family = AF_UNSPEC;
343 hints.ai_socktype = sock_type;
345 auto port_str = std::to_string(
port_);
346 int rv = getaddrinfo(
host_.c_str(), port_str.c_str(), &hints, &
result);
355 for (
auto* rp =
result; rp !=
nullptr; rp = rp->ai_next) {
356 socket_fd_ = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
396 int sent = ::send(
socket_fd_, data.c_str(),
static_cast<int>(data.length()), 0);
398 ssize_t sent = ::send(
socket_fd_, data.c_str(), data.length(), 0);
424 auto entry = std::move(
buffer_.front());
444 std::ostringstream oss;
448 auto time_t = std::chrono::system_clock::to_time_t(entry.
timestamp);
449 oss <<
"\"@timestamp\":\"";
450 oss << std::put_time(std::gmtime(&time_t),
"%Y-%m-%dT%H:%M:%SZ") <<
"\",";
453 auto level =
static_cast<common::interfaces::log_level
>(
static_cast<int>(entry.
level));
461 std::string file = entry.
location->file.to_string();
462 std::string function = entry.
location->function.to_string();
465 oss <<
",\"file\":\"" <<
escape_json(file) <<
"\"";
466 oss <<
",\"line\":" << entry.
location->line;
469 if (!function.empty()) {
470 oss <<
",\"function\":\"" <<
escape_json(function) <<
"\"";
476 if (gethostname(hostname,
sizeof(hostname)) == 0) {
477 oss <<
",\"host\":\"" << hostname <<
"\"";
485 std::string unsigned_body = oss.str() +
"}";
489 oss <<
",\"signature\":\"" << sig <<
"\"";
498 std::shared_ptr<security::integrity_policy> policy) {
506 if (c ==
'"') escaped +=
"\\\"";
507 else if (c ==
'\\') escaped +=
"\\\\";
508 else if (c ==
'\n') escaped +=
"\\n";
509 else if (c ==
'\r') escaped +=
"\\r";
510 else if (c ==
'\t') escaped +=
"\\t";
Wrapper for std::jthread or std::thread with manual stop mechanism.
void request_stop()
Request the thread to stop.
void join()
Wait for thread to complete.
Simple stop source for environments without std::stop_token.
virtual common::VoidResult close()
Close the writer and release resources.
Worker thread for reconnection attempts with jthread compatibility.
reconnect_callback callback_
std::function< void()> reconnect_callback
~network_reconnect_jthread_worker()
std::atomic< bool > running_
network_reconnect_jthread_worker(reconnect_callback callback, std::chrono::seconds interval)
std::chrono::seconds reconnect_interval_
Worker thread for sending buffered logs with jthread compatibility.
std::atomic< bool > running_
std::atomic< bool > has_work_
std::function< void()> send_callback
network_send_jthread_worker(send_callback callback)
~network_send_jthread_worker()
std::atomic< bool > connected_
~network_writer() override
Destructor.
std::shared_ptr< security::integrity_policy > integrity_policy_
std::atomic< bool > running_
common::VoidResult write(const log_entry &entry) override
Write log entry.
common::VoidResult flush() override
Flush pending logs.
std::chrono::seconds reconnect_interval_
std::condition_variable buffer_cv_
network_writer(const std::string &host, uint16_t port, protocol_type protocol=protocol_type::tcp, size_t buffer_size=8192, std::chrono::seconds reconnect_interval=std::chrono::seconds(5))
Constructor.
std::unique_ptr< network_reconnect_jthread_worker > reconnect_worker_
std::string format_for_network(const log_entry &entry)
void set_integrity_policy(std::shared_ptr< security::integrity_policy > policy)
Enable tamper-evident integrity signing for outbound frames.
std::queue< log_entry > buffer_
bool send_data(const std::string &data)
std::unique_ptr< network_send_jthread_worker > send_worker_
std::string escape_json(const std::string &str) const
connection_stats get_stats() const
std::string to_string() const
Convert to std::string.
static std::string level_to_string(log_level level)
Convert log level to human-readable string.
Structured error context for debugging log system failures.
Tamper-evident log signing policies for writers.
Compatibility header for std::jthread and std::stop_token kcenon.
void safe_destructor_operation(const std::string &operation_name, F &&operation) noexcept
Safe operation execution for destructors.
common::VoidResult make_logger_void_result(logger_error_code code, const std::string &message="")
Network writer for sending logs over TCP/UDP.
String utility functions for log formatting and conversion.
Represents a single log entry with all associated metadata.
std::optional< source_location > location
Optional source code location information.
log_level level
Severity level of the log message.
small_string_256 message
The actual log message.
std::chrono::system_clock::time_point timestamp
Timestamp when the log entry was created.
Get connection statistics.
std::chrono::system_clock::time_point last_connected
std::chrono::system_clock::time_point last_error
uint64_t connection_failures