Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
network_writer.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
19
20#ifdef _WIN32
21 #include <winsock2.h>
22 #include <ws2tcpip.h>
23 #pragma comment(lib, "ws2_32.lib")
24 typedef int socklen_t;
25 typedef SSIZE_T ssize_t; // Define ssize_t for Windows
26 #define close closesocket
27#else
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <netdb.h>
32 #include <unistd.h>
33#endif
34
35#include <cerrno>
36#include <cstring>
37#include <functional>
38#include <iomanip>
39#include <sstream>
40#include <thread>
41
42namespace kcenon::logger {
43
44using namespace async;
45
53public:
54 using send_callback = std::function<void()>;
55
57 : callback_(std::move(callback))
58 {}
59
63
64 void start() {
65 if (running_.exchange(true, std::memory_order_acq_rel)) {
66 return; // Already started
67 }
68
69#if LOGGER_HAS_JTHREAD
70 auto callback = callback_;
71 thread_ = compat_jthread([callback](std::stop_token stop_token) {
72 while (!stop_token.stop_requested()) {
73 if (callback) {
74 callback();
75 }
76 std::this_thread::sleep_for(std::chrono::milliseconds(10));
77 }
78 });
79#else
80 // Create worker thread with manual stop source
81 // Note: We use the stop_source created by compat_jthread to ensure
82 // request_stop() correctly signals the worker loop
83 auto callback = callback_;
85 while (!stop.stop_requested()) {
86 if (callback) {
87 callback();
88 }
89 std::this_thread::sleep_for(std::chrono::milliseconds(10));
90 }
91 });
92#endif
93 }
94
95 void stop() {
96 if (!running_.exchange(false, std::memory_order_acq_rel)) {
97 return; // Already stopped
98 }
99
100 // Request stop and join thread
102 thread_.join();
103 }
104
/**
 * @brief Flag that new data is waiting to be sent.
 *
 * Stores `true` into has_work_ with release ordering.
 *
 * NOTE(review): no reader of has_work_ is visible in this listing; the
 * worker loop in start() invokes the callback on every 10 ms tick regardless
 * of this flag. Confirm whether the flag is consumed elsewhere.
 */
105 void notify_work() {
106 has_work_.store(true, std::memory_order_release);
107 }
108
109private:
112 std::atomic<bool> running_{false};
113 std::atomic<bool> has_work_{false};
114};
115
123public:
124 using reconnect_callback = std::function<void()>;
125
127 std::chrono::seconds interval)
128 : callback_(std::move(callback))
129 , reconnect_interval_(interval)
130 {}
131
135
136 void start() {
137 if (running_.exchange(true, std::memory_order_acq_rel)) {
138 return; // Already started
139 }
140
141#if LOGGER_HAS_JTHREAD
142 auto callback = callback_;
143 auto interval = reconnect_interval_;
144 thread_ = compat_jthread([callback, interval](std::stop_token stop_token) {
145 while (!stop_token.stop_requested()) {
146 if (callback) {
147 callback();
148 }
149 // Sleep with stop checking
150 for (auto elapsed = std::chrono::milliseconds{0};
151 elapsed < std::chrono::duration_cast<std::chrono::milliseconds>(interval)
152 && !stop_token.stop_requested();
153 elapsed += std::chrono::milliseconds(100)) {
154 std::this_thread::sleep_for(std::chrono::milliseconds(100));
155 }
156 }
157 });
158#else
159 // Create worker thread with manual stop source
160 // Note: We use the stop_source created by compat_jthread to ensure
161 // request_stop() correctly signals the worker loop
162 auto callback = callback_;
163 auto interval = reconnect_interval_;
164 thread_ = compat_jthread([callback, interval](simple_stop_source& stop) {
165 while (!stop.stop_requested()) {
166 if (callback) {
167 callback();
168 }
169 // Sleep with stop checking
170 for (auto elapsed = std::chrono::milliseconds{0};
171 elapsed < std::chrono::duration_cast<std::chrono::milliseconds>(interval)
172 && !stop.stop_requested();
173 elapsed += std::chrono::milliseconds(100)) {
174 std::this_thread::sleep_for(std::chrono::milliseconds(100));
175 }
176 }
177 });
178#endif
179 }
180
181 void stop() {
182 if (!running_.exchange(false, std::memory_order_acq_rel)) {
183 return; // Already stopped
184 }
185
186 // Request stop and join thread
188 thread_.join();
189 }
190
191private:
193 std::chrono::seconds reconnect_interval_;
195 std::atomic<bool> running_{false};
196};
197
198network_writer::network_writer(const std::string& host,
199 uint16_t port,
200 protocol_type protocol,
201 size_t buffer_size,
202 std::chrono::seconds reconnect_interval)
203 : host_(host)
204 , port_(port)
205 , protocol_(protocol)
206 , buffer_size_(buffer_size)
207 , reconnect_interval_(reconnect_interval)
208 , socket_fd_(-1) {
209
210#ifdef _WIN32
211 // Initialize Winsock
212 WSADATA wsaData;
213 if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
214 throw std::runtime_error("Failed to initialize Winsock");
215 }
216#endif
217
218 running_ = true;
219
220 // Create and start send worker
221 send_worker_ = std::make_unique<network_send_jthread_worker>(
222 [this] { process_buffer(); });
223 send_worker_->start();
224
225 // Create and start reconnect worker for TCP
227 reconnect_worker_ = std::make_unique<network_reconnect_jthread_worker>(
229 reconnect_worker_->start();
230 }
231
232 // Initial connection attempt
233 connect();
234}
235
237 running_ = false;
238
239 // Stop workers with error handling
240 // IMPORTANT: Reset workers after stop to join their threads before accessing any members
241 utils::safe_destructor_operation("send_worker_stop", [this]() {
242 if (send_worker_) {
243 send_worker_->stop();
244 send_worker_.reset();
245 }
246 });
247
248 utils::safe_destructor_operation("reconnect_worker_stop", [this]() {
249 if (reconnect_worker_) {
250 reconnect_worker_->stop();
251 reconnect_worker_.reset();
252 }
253 });
254
255 // Disconnect with error handling
256 utils::safe_destructor_operation("network_disconnect", [this]() {
257 disconnect();
258 });
259
260#ifdef _WIN32
261 // Cleanup Winsock with error handling
262 utils::safe_destructor_operation("winsock_cleanup", []() {
263 WSACleanup();
264 });
265#endif
266}
267
269
270 std::lock_guard<std::mutex> lock(buffer_mutex_);
271
272 // Check buffer size
273 if (buffer_.size() >= buffer_size_) {
274 // Drop oldest message
275 buffer_.pop();
276 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
278 // Note: We still accept the new message after dropping the oldest
279 }
280
281 // Create a copy of the entry since log_entry is move-only
282 if (entry.location) {
283 buffer_.emplace(entry.level,
284 entry.message.to_string(),
285 entry.location->file.to_string(),
286 entry.location->line,
287 entry.location->function.to_string(),
288 entry.timestamp);
289 } else {
290 buffer_.emplace(entry.level,
291 entry.message.to_string(),
292 entry.timestamp);
293 }
294
295 // Notify send worker
296 if (send_worker_) {
297 send_worker_->notify_work();
298 }
299
300 return common::ok();
301}
302
304 std::unique_lock<std::mutex> lock(buffer_mutex_);
305 auto start = std::chrono::steady_clock::now();
306 auto timeout = std::chrono::seconds(5); // 5 second timeout
307
308 while (!buffer_.empty()) {
309 if (buffer_cv_.wait_for(lock, timeout, [this] { return buffer_.empty() || !running_; })) {
310 if (!buffer_.empty() && !running_) {
312 "Network writer stopped before flush completed");
313 }
314 } else {
316 "Network flush timeout");
317 }
318
319 if (std::chrono::steady_clock::now() - start > timeout) {
321 "Network flush exceeded timeout");
322 }
323 }
324 return common::ok();
325}
326
328 std::lock_guard<std::mutex> lock(stats_mutex_);
329 return stats_;
330}
331
333 if (connected_) {
334 return true;
335 }
336
337 // Resolve hostname using getaddrinfo (thread-safe, IPv4/IPv6)
338 int sock_type = (protocol_ == protocol_type::tcp) ? SOCK_STREAM : SOCK_DGRAM;
339
340 struct addrinfo hints{}, *result = nullptr;
341 hints.ai_family = AF_UNSPEC;
342 hints.ai_socktype = sock_type;
343
344 auto port_str = std::to_string(port_);
345 int rv = getaddrinfo(host_.c_str(), port_str.c_str(), &hints, &result);
346 if (rv != 0) {
347 std::lock_guard<std::mutex> lock(stats_mutex_);
349 stats_.last_error = std::chrono::system_clock::now();
350 return false;
351 }
352
353 // Try each resolved address until one succeeds
354 for (auto* rp = result; rp != nullptr; rp = rp->ai_next) {
355 socket_fd_ = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
356 if (socket_fd_ < 0) {
357 continue;
358 }
359
360 if (::connect(socket_fd_, rp->ai_addr, static_cast<socklen_t>(rp->ai_addrlen)) == 0) {
361 freeaddrinfo(result);
362 connected_ = true;
363
364 std::lock_guard<std::mutex> lock(stats_mutex_);
365 stats_.last_connected = std::chrono::system_clock::now();
366 return true;
367 }
368
370 socket_fd_ = -1;
371 }
372
373 freeaddrinfo(result);
374
375 std::lock_guard<std::mutex> lock(stats_mutex_);
377 stats_.last_error = std::chrono::system_clock::now();
378 return false;
379}
380
382 if (socket_fd_ >= 0) {
384 socket_fd_ = -1;
385 }
386 connected_ = false;
387}
388
389bool network_writer::send_data(const std::string& data) {
390 if (!connected_ || socket_fd_ < 0) {
391 return false;
392 }
393
394#ifdef _WIN32
395 int sent = ::send(socket_fd_, data.c_str(), static_cast<int>(data.length()), 0);
396#else
397 ssize_t sent = ::send(socket_fd_, data.c_str(), data.length(), 0);
398#endif
399 if (sent < 0) {
401 // TCP connection lost
402 disconnect();
403
404 std::lock_guard<std::mutex> lock(stats_mutex_);
406 stats_.last_error = std::chrono::system_clock::now();
407 }
408 return false;
409 }
410
411 std::lock_guard<std::mutex> lock(stats_mutex_);
413 stats_.bytes_sent += sent;
414
415 return true;
416}
417
419 std::unique_lock<std::mutex> lock(buffer_mutex_);
420
421 // Process buffered logs
422 while (!buffer_.empty() && running_) {
423 auto entry = std::move(buffer_.front());
424 buffer_.pop();
425 lock.unlock();
426
427 // Format and send
428 std::string formatted = format_for_network(entry);
429 send_data(formatted);
430
431 lock.lock();
432 }
433}
434
436 if (!connected_ && running_) {
437 connect();
438 }
439}
440
442 // Format as JSON for network transmission
443 std::ostringstream oss;
444 oss << "{";
445
446 // Timestamp
447 auto time_t = std::chrono::system_clock::to_time_t(entry.timestamp);
448 oss << "\"@timestamp\":\"";
449 oss << std::put_time(std::gmtime(&time_t), "%Y-%m-%dT%H:%M:%SZ") << "\",";
450
451 // Level - convert logger_system::log_level to common::interfaces::log_level
452 auto level = static_cast<common::interfaces::log_level>(static_cast<int>(entry.level));
453 oss << "\"level\":\"" << utils::string_utils::level_to_string(level) << "\",";
454
455 // Message
456 oss << "\"message\":\"" << escape_json(entry.message.to_string()) << "\"";
457
458 // Optional fields from source_location
459 if (entry.location) {
460 std::string file = entry.location->file.to_string();
461 std::string function = entry.location->function.to_string();
462
463 if (!file.empty()) {
464 oss << ",\"file\":\"" << escape_json(file) << "\"";
465 oss << ",\"line\":" << entry.location->line;
466 }
467
468 if (!function.empty()) {
469 oss << ",\"function\":\"" << escape_json(function) << "\"";
470 }
471 }
472
473 // Add hostname
474 char hostname[256];
475 if (gethostname(hostname, sizeof(hostname)) == 0) {
476 oss << ",\"host\":\"" << hostname << "\"";
477 }
478
479 oss << "}\n";
480 return oss.str();
481}
482
483std::string network_writer::escape_json(const std::string& str) const {
484 std::string escaped;
485 for (char c : str) {
486 if (c == '"') escaped += "\\\"";
487 else if (c == '\\') escaped += "\\\\";
488 else if (c == '\n') escaped += "\\n";
489 else if (c == '\r') escaped += "\\r";
490 else if (c == '\t') escaped += "\\t";
491 else escaped += c;
492 }
493 return escaped;
494}
495
496} // namespace kcenon::logger
Wrapper for std::jthread or std::thread with manual stop mechanism.
void request_stop()
Request the thread to stop.
void join()
Wait for thread to complete.
Simple stop source for environments without std::stop_token.
virtual common::VoidResult close()
Close the writer and release resources.
Worker thread for reconnection attempts with jthread compatibility.
network_reconnect_jthread_worker(reconnect_callback callback, std::chrono::seconds interval)
Worker thread for sending buffered logs with jthread compatibility.
network_send_jthread_worker(send_callback callback)
~network_writer() override
Destructor.
common::VoidResult write(const log_entry &entry) override
Write log entry.
common::VoidResult flush() override
Flush pending logs.
std::chrono::seconds reconnect_interval_
std::condition_variable buffer_cv_
network_writer(const std::string &host, uint16_t port, protocol_type protocol=protocol_type::tcp, size_t buffer_size=8192, std::chrono::seconds reconnect_interval=std::chrono::seconds(5))
Constructor.
std::unique_ptr< network_reconnect_jthread_worker > reconnect_worker_
std::string format_for_network(const log_entry &entry)
std::queue< log_entry > buffer_
bool send_data(const std::string &data)
std::unique_ptr< network_send_jthread_worker > send_worker_
std::string escape_json(const std::string &str) const
connection_stats get_stats() const
std::string to_string() const
Convert to std::string.
static std::string level_to_string(log_level level)
Convert log level to human-readable string.
Structured error context for debugging log system failures.
Compatibility header for std::jthread and std::stop_token kcenon.
VoidResult ok()
void safe_destructor_operation(const std::string &operation_name, F &&operation) noexcept
Safe operation execution for destructors.
common::VoidResult make_logger_void_result(logger_error_code code, const std::string &message="")
Network writer for sending logs over TCP/UDP.
String utility functions for log formatting and conversion.
Represents a single log entry with all associated metadata.
Definition log_entry.h:155
std::optional< source_location > location
Optional source code location information.
Definition log_entry.h:183
log_level level
Severity level of the log message.
Definition log_entry.h:162
small_string_256 message
The actual log message.
Definition log_entry.h:169
std::chrono::system_clock::time_point timestamp
Timestamp when the log entry was created.
Definition log_entry.h:175
std::chrono::system_clock::time_point last_connected
std::chrono::system_clock::time_point last_error