Logger System 1.0.0
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
network_writer.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
20
21#ifdef _WIN32
22 #include <winsock2.h>
23 #include <ws2tcpip.h>
24 #pragma comment(lib, "ws2_32.lib")
25 typedef int socklen_t;
26 typedef SSIZE_T ssize_t; // Define ssize_t for Windows
27 #define close closesocket
28#else
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #include <arpa/inet.h>
32 #include <netdb.h>
33 #include <unistd.h>
34#endif
35
36#include <cerrno>
37#include <cstring>
38#include <functional>
39#include <iomanip>
40#include <sstream>
41#include <thread>
42
43namespace kcenon::logger {
44
45using namespace async;
46
54public:
55 using send_callback = std::function<void()>;
56
58 : callback_(std::move(callback))
59 {}
60
64
65 void start() {
66 if (running_.exchange(true, std::memory_order_acq_rel)) {
67 return; // Already started
68 }
69
70#if LOGGER_HAS_JTHREAD
71 auto callback = callback_;
72 thread_ = compat_jthread([callback](std::stop_token stop_token) {
73 while (!stop_token.stop_requested()) {
74 if (callback) {
75 callback();
76 }
77 std::this_thread::sleep_for(std::chrono::milliseconds(10));
78 }
79 });
80#else
81 // Create worker thread with manual stop source
82 // Note: We use the stop_source created by compat_jthread to ensure
83 // request_stop() correctly signals the worker loop
84 auto callback = callback_;
86 while (!stop.stop_requested()) {
87 if (callback) {
88 callback();
89 }
90 std::this_thread::sleep_for(std::chrono::milliseconds(10));
91 }
92 });
93#endif
94 }
95
96 void stop() {
97 if (!running_.exchange(false, std::memory_order_acq_rel)) {
98 return; // Already stopped
99 }
100
101 // Request stop and join thread
103 thread_.join();
104 }
105
106 void notify_work() {
107 has_work_.store(true, std::memory_order_release);
108 }
109
110private:
113 std::atomic<bool> running_{false};
114 std::atomic<bool> has_work_{false};
115};
116
124public:
125 using reconnect_callback = std::function<void()>;
126
128 std::chrono::seconds interval)
129 : callback_(std::move(callback))
130 , reconnect_interval_(interval)
131 {}
132
136
137 void start() {
138 if (running_.exchange(true, std::memory_order_acq_rel)) {
139 return; // Already started
140 }
141
142#if LOGGER_HAS_JTHREAD
143 auto callback = callback_;
144 auto interval = reconnect_interval_;
145 thread_ = compat_jthread([callback, interval](std::stop_token stop_token) {
146 while (!stop_token.stop_requested()) {
147 if (callback) {
148 callback();
149 }
150 // Sleep with stop checking
151 for (auto elapsed = std::chrono::milliseconds{0};
152 elapsed < std::chrono::duration_cast<std::chrono::milliseconds>(interval)
153 && !stop_token.stop_requested();
154 elapsed += std::chrono::milliseconds(100)) {
155 std::this_thread::sleep_for(std::chrono::milliseconds(100));
156 }
157 }
158 });
159#else
160 // Create worker thread with manual stop source
161 // Note: We use the stop_source created by compat_jthread to ensure
162 // request_stop() correctly signals the worker loop
163 auto callback = callback_;
164 auto interval = reconnect_interval_;
165 thread_ = compat_jthread([callback, interval](simple_stop_source& stop) {
166 while (!stop.stop_requested()) {
167 if (callback) {
168 callback();
169 }
170 // Sleep with stop checking
171 for (auto elapsed = std::chrono::milliseconds{0};
172 elapsed < std::chrono::duration_cast<std::chrono::milliseconds>(interval)
173 && !stop.stop_requested();
174 elapsed += std::chrono::milliseconds(100)) {
175 std::this_thread::sleep_for(std::chrono::milliseconds(100));
176 }
177 }
178 });
179#endif
180 }
181
182 void stop() {
183 if (!running_.exchange(false, std::memory_order_acq_rel)) {
184 return; // Already stopped
185 }
186
187 // Request stop and join thread
189 thread_.join();
190 }
191
192private:
194 std::chrono::seconds reconnect_interval_;
196 std::atomic<bool> running_{false};
197};
198
199network_writer::network_writer(const std::string& host,
200 uint16_t port,
201 protocol_type protocol,
202 size_t buffer_size,
203 std::chrono::seconds reconnect_interval)
204 : host_(host)
205 , port_(port)
206 , protocol_(protocol)
207 , buffer_size_(buffer_size)
208 , reconnect_interval_(reconnect_interval)
209 , socket_fd_(-1) {
210
211#ifdef _WIN32
212 // Initialize Winsock
213 WSADATA wsaData;
214 if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
215 throw std::runtime_error("Failed to initialize Winsock");
216 }
217#endif
218
219 running_ = true;
220
221 // Create and start send worker
222 send_worker_ = std::make_unique<network_send_jthread_worker>(
223 [this] { process_buffer(); });
224 send_worker_->start();
225
226 // Create and start reconnect worker for TCP
228 reconnect_worker_ = std::make_unique<network_reconnect_jthread_worker>(
230 reconnect_worker_->start();
231 }
232
233 // Initial connection attempt
234 connect();
235}
236
238 running_ = false;
239
240 // Stop workers with error handling
241 // IMPORTANT: Reset workers after stop to join their threads before accessing any members
242 utils::safe_destructor_operation("send_worker_stop", [this]() {
243 if (send_worker_) {
244 send_worker_->stop();
245 send_worker_.reset();
246 }
247 });
248
249 utils::safe_destructor_operation("reconnect_worker_stop", [this]() {
250 if (reconnect_worker_) {
251 reconnect_worker_->stop();
252 reconnect_worker_.reset();
253 }
254 });
255
256 // Disconnect with error handling
257 utils::safe_destructor_operation("network_disconnect", [this]() {
258 disconnect();
259 });
260
261#ifdef _WIN32
262 // Cleanup Winsock with error handling
263 utils::safe_destructor_operation("winsock_cleanup", []() {
264 WSACleanup();
265 });
266#endif
267}
268
270
271 std::lock_guard<std::mutex> lock(buffer_mutex_);
272
273 // Check buffer size
274 if (buffer_.size() >= buffer_size_) {
275 // Drop oldest message
276 buffer_.pop();
277 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
279 // Note: We still accept the new message after dropping the oldest
280 }
281
282 // Create a copy of the entry since log_entry is move-only
283 if (entry.location) {
284 buffer_.emplace(entry.level,
285 entry.message.to_string(),
286 entry.location->file.to_string(),
287 entry.location->line,
288 entry.location->function.to_string(),
289 entry.timestamp);
290 } else {
291 buffer_.emplace(entry.level,
292 entry.message.to_string(),
293 entry.timestamp);
294 }
295
296 // Notify send worker
297 if (send_worker_) {
298 send_worker_->notify_work();
299 }
300
301 return common::ok();
302}
303
305 std::unique_lock<std::mutex> lock(buffer_mutex_);
306 auto start = std::chrono::steady_clock::now();
307 auto timeout = std::chrono::seconds(5); // 5 second timeout
308
309 while (!buffer_.empty()) {
310 if (buffer_cv_.wait_for(lock, timeout, [this] { return buffer_.empty() || !running_; })) {
311 if (!buffer_.empty() && !running_) {
313 "Network writer stopped before flush completed");
314 }
315 } else {
317 "Network flush timeout");
318 }
319
320 if (std::chrono::steady_clock::now() - start > timeout) {
322 "Network flush exceeded timeout");
323 }
324 }
325 return common::ok();
326}
327
329 std::lock_guard<std::mutex> lock(stats_mutex_);
330 return stats_;
331}
332
334 if (connected_) {
335 return true;
336 }
337
338 // Resolve hostname using getaddrinfo (thread-safe, IPv4/IPv6)
339 int sock_type = (protocol_ == protocol_type::tcp) ? SOCK_STREAM : SOCK_DGRAM;
340
341 struct addrinfo hints{}, *result = nullptr;
342 hints.ai_family = AF_UNSPEC;
343 hints.ai_socktype = sock_type;
344
345 auto port_str = std::to_string(port_);
346 int rv = getaddrinfo(host_.c_str(), port_str.c_str(), &hints, &result);
347 if (rv != 0) {
348 std::lock_guard<std::mutex> lock(stats_mutex_);
350 stats_.last_error = std::chrono::system_clock::now();
351 return false;
352 }
353
354 // Try each resolved address until one succeeds
355 for (auto* rp = result; rp != nullptr; rp = rp->ai_next) {
356 socket_fd_ = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
357 if (socket_fd_ < 0) {
358 continue;
359 }
360
361 if (::connect(socket_fd_, rp->ai_addr, static_cast<socklen_t>(rp->ai_addrlen)) == 0) {
362 freeaddrinfo(result);
363 connected_ = true;
364
365 std::lock_guard<std::mutex> lock(stats_mutex_);
366 stats_.last_connected = std::chrono::system_clock::now();
367 return true;
368 }
369
371 socket_fd_ = -1;
372 }
373
374 freeaddrinfo(result);
375
376 std::lock_guard<std::mutex> lock(stats_mutex_);
378 stats_.last_error = std::chrono::system_clock::now();
379 return false;
380}
381
383 if (socket_fd_ >= 0) {
385 socket_fd_ = -1;
386 }
387 connected_ = false;
388}
389
390bool network_writer::send_data(const std::string& data) {
391 if (!connected_ || socket_fd_ < 0) {
392 return false;
393 }
394
395#ifdef _WIN32
396 int sent = ::send(socket_fd_, data.c_str(), static_cast<int>(data.length()), 0);
397#else
398 ssize_t sent = ::send(socket_fd_, data.c_str(), data.length(), 0);
399#endif
400 if (sent < 0) {
402 // TCP connection lost
403 disconnect();
404
405 std::lock_guard<std::mutex> lock(stats_mutex_);
407 stats_.last_error = std::chrono::system_clock::now();
408 }
409 return false;
410 }
411
412 std::lock_guard<std::mutex> lock(stats_mutex_);
414 stats_.bytes_sent += sent;
415
416 return true;
417}
418
420 std::unique_lock<std::mutex> lock(buffer_mutex_);
421
422 // Process buffered logs
423 while (!buffer_.empty() && running_) {
424 auto entry = std::move(buffer_.front());
425 buffer_.pop();
426 lock.unlock();
427
428 // Format and send
429 std::string formatted = format_for_network(entry);
430 send_data(formatted);
431
432 lock.lock();
433 }
434}
435
437 if (!connected_ && running_) {
438 connect();
439 }
440}
441
443 // Format as JSON for network transmission
444 std::ostringstream oss;
445 oss << "{";
446
447 // Timestamp
448 auto time_t = std::chrono::system_clock::to_time_t(entry.timestamp);
449 oss << "\"@timestamp\":\"";
450 oss << std::put_time(std::gmtime(&time_t), "%Y-%m-%dT%H:%M:%SZ") << "\",";
451
452 // Level - convert logger_system::log_level to common::interfaces::log_level
453 auto level = static_cast<common::interfaces::log_level>(static_cast<int>(entry.level));
454 oss << "\"level\":\"" << utils::string_utils::level_to_string(level) << "\",";
455
456 // Message
457 oss << "\"message\":\"" << escape_json(entry.message.to_string()) << "\"";
458
459 // Optional fields from source_location
460 if (entry.location) {
461 std::string file = entry.location->file.to_string();
462 std::string function = entry.location->function.to_string();
463
464 if (!file.empty()) {
465 oss << ",\"file\":\"" << escape_json(file) << "\"";
466 oss << ",\"line\":" << entry.location->line;
467 }
468
469 if (!function.empty()) {
470 oss << ",\"function\":\"" << escape_json(function) << "\"";
471 }
472 }
473
474 // Add hostname
475 char hostname[256];
476 if (gethostname(hostname, sizeof(hostname)) == 0) {
477 oss << ",\"host\":\"" << hostname << "\"";
478 }
479
480 // Tamper-evident signature (Issue #612). The signature covers the
481 // unsigned JSON body (closed with '}'); verifiers recompute by
482 // stripping the trailing ,"sig_alg":"..","signature":".." suffix,
483 // which is a stable canonical form on the wire.
484 if (integrity_policy_) {
485 std::string unsigned_body = oss.str() + "}";
486 std::string sig = integrity_policy_->sign(unsigned_body);
487 if (!sig.empty()) {
488 oss << ",\"sig_alg\":\"" << integrity_policy_->name() << "\"";
489 oss << ",\"signature\":\"" << sig << "\"";
490 }
491 }
492
493 oss << "}\n";
494 return oss.str();
495}
496
498 std::shared_ptr<security::integrity_policy> policy) {
499 std::lock_guard<std::mutex> lock(buffer_mutex_);
500 integrity_policy_ = std::move(policy);
501}
502
503std::string network_writer::escape_json(const std::string& str) const {
504 std::string escaped;
505 for (char c : str) {
506 if (c == '"') escaped += "\\\"";
507 else if (c == '\\') escaped += "\\\\";
508 else if (c == '\n') escaped += "\\n";
509 else if (c == '\r') escaped += "\\r";
510 else if (c == '\t') escaped += "\\t";
511 else escaped += c;
512 }
513 return escaped;
514}
515
516} // namespace kcenon::logger
Wrapper for std::jthread or std::thread with manual stop mechanism.
void request_stop()
Request the thread to stop.
void join()
Wait for thread to complete.
Simple stop source for environments without std::stop_token.
virtual common::VoidResult close()
Close the writer and release resources.
Worker thread for reconnection attempts with jthread compatibility.
network_reconnect_jthread_worker(reconnect_callback callback, std::chrono::seconds interval)
Worker thread for sending buffered logs with jthread compatibility.
network_send_jthread_worker(send_callback callback)
~network_writer() override
Destructor.
std::shared_ptr< security::integrity_policy > integrity_policy_
common::VoidResult write(const log_entry &entry) override
Write log entry.
common::VoidResult flush() override
Flush pending logs.
std::chrono::seconds reconnect_interval_
std::condition_variable buffer_cv_
network_writer(const std::string &host, uint16_t port, protocol_type protocol=protocol_type::tcp, size_t buffer_size=8192, std::chrono::seconds reconnect_interval=std::chrono::seconds(5))
Constructor.
std::unique_ptr< network_reconnect_jthread_worker > reconnect_worker_
std::string format_for_network(const log_entry &entry)
void set_integrity_policy(std::shared_ptr< security::integrity_policy > policy)
Enable tamper-evident integrity signing for outbound frames.
std::queue< log_entry > buffer_
bool send_data(const std::string &data)
std::unique_ptr< network_send_jthread_worker > send_worker_
std::string escape_json(const std::string &str) const
connection_stats get_stats() const
std::string to_string() const
Convert to std::string.
static std::string level_to_string(log_level level)
Convert log level to human-readable string.
Structured error context for debugging log system failures.
Tamper-evident log signing policies for writers.
Compatibility header for std::jthread and std::stop_token kcenon.
VoidResult ok()
void safe_destructor_operation(const std::string &operation_name, F &&operation) noexcept
Safe operation execution for destructors.
common::VoidResult make_logger_void_result(logger_error_code code, const std::string &message="")
Network writer for sending logs over TCP/UDP.
String utility functions for log formatting and conversion.
Represents a single log entry with all associated metadata.
Definition log_entry.h:155
std::optional< source_location > location
Optional source code location information.
Definition log_entry.h:183
log_level level
Severity level of the log message.
Definition log_entry.h:162
small_string_256 message
The actual log message.
Definition log_entry.h:169
std::chrono::system_clock::time_point timestamp
Timestamp when the log entry was created.
Definition log_entry.h:175
std::chrono::system_clock::time_point last_connected
std::chrono::system_clock::time_point last_error