Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
async_writer.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
15#include "queued_writer_base.h"
16#include <queue>
17#include <thread>
18#include <condition_variable>
19#include <iostream>
20
21namespace kcenon::logger {
22
37class async_writer : public queued_writer_base<std::queue<log_entry>> {
39
40public:
47 explicit async_writer(std::unique_ptr<log_writer_interface> wrapped_writer,
48 std::size_t queue_size = 10000,
49 std::chrono::seconds flush_timeout = std::chrono::seconds(5))
50 : base_type(std::move(wrapped_writer), queue_size, "async")
52 , running_(false) {
53 }
54
58 ~async_writer() override {
59 stop();
60 }
61
65 void start() {
66 // Use compare_exchange to safely check and set running_ flag
67 bool expected = false;
68 if (!running_.compare_exchange_strong(expected, true)) {
69 return; // Already running
70 }
71
72 // Try to create the worker thread with proper error handling
73 try {
74 worker_thread_ = std::thread([this]() {
76 });
77 } catch (const std::exception&) {
78 // Thread creation failed, rollback the running_ flag
79 running_.store(false);
80 throw; // Re-throw to notify caller
81 }
82 }
83
88 void stop(bool force_flush = true) {
89 if (!running_.exchange(false)) {
90 return; // Already stopped
91 }
92
93 // Process remaining messages if requested
94 if (force_flush) {
95 std::lock_guard<std::mutex> lock(queue_mutex_);
96 size_t remaining_count = queue_.size();
97 if (remaining_count > 0) {
98 std::cerr << "[async_writer] Info: Processing " << remaining_count
99 << " remaining messages before shutdown.\n";
100 }
101 }
102
103 // Signal the worker thread to stop
104 {
105 std::lock_guard<std::mutex> lock(queue_mutex_);
106 queue_cv_.notify_all();
107 }
108
109 // Wait for the worker thread to finish (infinite wait - shutdown prioritizes safety)
110 if (worker_thread_.joinable()) {
111 worker_thread_.join();
112 }
113
114 // Verify all messages were processed
115 if (!queue_.empty()) {
116 std::cerr << "[async_writer] Warning: " << queue_.size()
117 << " messages were not processed during shutdown.\n";
118 }
119 }
120
127 common::VoidResult write(const log_entry& entry) override {
128 if (!running_) {
129 // If not running, write directly
130 return wrapped().write(entry);
131 }
132
133 return try_enqueue(entry);
134 }
135
141 if (!running_) {
142 return wrapped().flush();
143 }
144
145 // Wait for the queue to be empty with a timeout to prevent indefinite blocking
146 std::unique_lock<std::mutex> lock(queue_mutex_);
147 bool flushed = flush_cv_.wait_for(lock, flush_timeout_, [this]() {
148 return queue_.empty();
149 });
150
151 if (!flushed) {
152 // Timeout occurred - worker thread may have exited or is blocked
154 "Flush operation timed out after " +
155 std::to_string(flush_timeout_.count()) + " seconds");
156 }
157
158 // Flush the wrapped writer
159 return wrapped().flush();
160 }
161
166 bool is_healthy() const override {
167 return wrapped().is_healthy() && running_;
168 }
169
174 std::string get_name() const override {
175 return "async_" + wrapped().get_name();
176 }
177
178protected:
184 common::VoidResult handle_overflow(const log_entry& /*entry*/) override {
185 return make_logger_void_result(logger_error_code::queue_full, "Async writer queue is full");
186 }
187
191 void on_entry_enqueued() override {
192 queue_cv_.notify_one();
193 }
194
195private:
200 while (running_) {
201 std::unique_lock<std::mutex> lock(queue_mutex_);
202
203 // Wait for messages or stop signal
204 queue_cv_.wait(lock, [this]() {
205 return !queue_.empty() || !running_;
206 });
207
208 // Process all available messages
209 while (!queue_.empty()) {
210 auto entry = std::move(queue_.front());
211 queue_.pop();
212
213 // Unlock while writing
214 lock.unlock();
215 wrapped().write(entry);
216 lock.lock();
217 }
218
219 // Notify flush waiters
220 flush_cv_.notify_all();
221 }
222 }
223
228 std::lock_guard<std::mutex> lock(queue_mutex_);
229 while (!queue_.empty()) {
230 auto entry = std::move(queue_.front());
231 queue_.pop();
232 wrapped().write(entry);
233 }
234 wrapped().flush();
235 }
236
237 std::chrono::seconds flush_timeout_; // Configurable flush timeout
238
239 std::condition_variable queue_cv_;
240 std::condition_variable flush_cv_;
241
242 std::atomic<bool> running_;
243 std::thread worker_thread_;
244};
245
246} // namespace kcenon::logger
Asynchronous wrapper for log writers.
~async_writer() override
Destructor.
void on_entry_enqueued() override
Called after entry is enqueued - notifies worker thread.
std::string get_name() const override
Get the name of this writer.
bool is_healthy() const override
Check if the writer is healthy.
void stop(bool force_flush=true)
Stop the async writer thread.
std::atomic< bool > running_
void start()
Start the async writer thread.
void process_messages()
Process messages from the queue.
common::VoidResult flush() override
Flush all pending messages.
std::chrono::seconds flush_timeout_
std::condition_variable flush_cv_
std::condition_variable queue_cv_
async_writer(std::unique_ptr< log_writer_interface > wrapped_writer, std::size_t queue_size=10000, std::chrono::seconds flush_timeout=std::chrono::seconds(5))
Constructor accepting log_writer_interface (Decorator pattern)
common::VoidResult write(const log_entry &entry) override
Write a log entry asynchronously.
common::VoidResult handle_overflow(const log_entry &) override
Handle queue overflow.
void flush_remaining()
Flush any remaining messages after stopping.
log_writer_interface & wrapped() noexcept
Access the wrapped writer (non-const)
virtual std::string get_name() const =0
virtual bool is_healthy() const =0
Check if the writer is healthy.
virtual common::VoidResult flush()=0
Flush any buffered data.
virtual common::VoidResult write(const log_entry &entry)=0
Write a log entry.
Abstract base class for queue-based log writers.
common::VoidResult make_logger_void_result(logger_error_code code, const std::string &message="")
Base template for queue-based log writers.
Represents a single log entry with all associated metadata.
Definition log_entry.h:155