19#include <kcenon/common/interfaces/logger_interface.h>
24#include <condition_variable>
36using log_level = common::interfaces::log_level;
49 std::condition_variable_any
queue_cv;
53 std::vector<std::weak_ptr<log_writer_interface>>
writers;
81 if (
running_.exchange(
true, std::memory_order_acq_rel)) {
103 if (!
running_.exchange(
false, std::memory_order_acq_rel)) {
112 state_->queue_cv.notify_all();
121 state_->queue_cv.notify_one();
126 return running_.load(std::memory_order_acquire);
130#if LOGGER_HAS_JTHREAD
131 static void worker_loop(std::shared_ptr<log_collector_shared_state> state,
132 std::stop_token stop_token) {
137 while (!stop_token.stop_requested()) {
138 std::vector<log_entry> batch;
141 std::unique_lock<std::mutex> lock(state->queue_mutex);
144 bool has_work = state->queue_cv.wait(lock, stop_token, [&state]() {
145 return !state->queue.empty();
149 if (stop_token.stop_requested()) {
154 if (!has_work || state->queue.empty()) {
159 batch.reserve(std::min(state->batch_size, state->queue.size()));
160 while (!state->queue.empty() && batch.size() < state->batch_size) {
161 batch.push_back(std::move(state->queue.front()));
167 for (
const auto& entry : batch) {
173 static void worker_loop(std::shared_ptr<log_collector_shared_state> state,
179 while (!
stop.stop_requested()) {
180 std::vector<log_entry> batch;
183 std::unique_lock<std::mutex> lock(state->queue_mutex);
186 state->queue_cv.wait(lock, [&state, &
stop]() {
187 return stop.stop_requested() || !state->queue.empty();
191 if (
stop.stop_requested()) {
196 if (state->queue.empty()) {
201 batch.reserve(std::min(state->batch_size, state->queue.size()));
202 while (!state->queue.empty() && batch.size() < state->batch_size) {
203 batch.push_back(std::move(state->queue.front()));
209 for (
const auto& entry : batch) {
216 static void write_to_all(
const std::shared_ptr<log_collector_shared_state>& state,
223 std::vector<std::shared_ptr<log_writer_interface>> writers_snapshot;
225 std::lock_guard<std::mutex> lock(state->writers_mutex);
226 writers_snapshot.reserve(state->writers.size());
227 for (
auto& weak_writer : state->writers) {
228 if (
auto writer = weak_writer.lock()) {
229 writers_snapshot.push_back(writer);
235 for (
auto& writer : writers_snapshot) {
237 writer->write(entry);
245 std::shared_ptr<log_collector_shared_state>
state_;
252 explicit impl(std::size_t buffer_size, std::size_t batch_size)
262 const std::string& message,
263 const std::string& file,
265 const std::string& function,
266 const std::chrono::system_clock::time_point& timestamp) {
268 std::lock_guard<std::mutex> lock(
state_->queue_mutex);
273 std::uint64_t dropped =
dropped_messages_.fetch_add(1, std::memory_order_relaxed) + 1;
276 if (dropped % 100 == 1) {
277 std::fprintf(stderr,
"[WARNING] Log queue full: %llu messages dropped total\n",
278 static_cast<unsigned long long>(dropped));
285 log_entry entry(level, message, timestamp);
286 if (!file.empty() || line != 0 || !function.empty()) {
289 state_->queue.push(std::move(entry));
299 void add_writer(std::shared_ptr<log_writer_interface> writer) {
303 std::lock_guard<std::mutex> lock(
state_->writers_mutex);
304 state_->writers.push_back(writer);
308 std::lock_guard<std::mutex> lock(
state_->writers_mutex);
335 std::lock_guard<std::mutex> lock(
state_->queue_mutex);
336 if (
state_->queue.empty()) {
341 std::this_thread::sleep_for(std::chrono::microseconds(100));
349 std::lock_guard<std::mutex> lock(
state_->queue_mutex);
355 std::queue<log_entry> remaining;
357 std::lock_guard<std::mutex> lock(
state_->queue_mutex);
358 std::swap(remaining,
state_->queue);
362 while (!remaining.empty()) {
363 auto entry = std::move(remaining.front());
370 std::vector<std::shared_ptr<log_writer_interface>> writers_snapshot;
372 std::lock_guard<std::mutex> lock(
state_->writers_mutex);
373 writers_snapshot.reserve(
state_->writers.size());
374 for (
auto& weak_writer :
state_->writers) {
375 if (
auto writer = weak_writer.lock()) {
376 writers_snapshot.push_back(writer);
381 for (
auto& writer : writers_snapshot) {
392 std::vector<std::shared_ptr<log_writer_interface>> writers_snapshot;
394 std::lock_guard<std::mutex> lock(
state_->writers_mutex);
395 writers_snapshot.reserve(
state_->writers.size());
396 for (
auto& weak_writer :
state_->writers) {
397 if (
auto writer = weak_writer.lock()) {
398 writers_snapshot.push_back(writer);
404 for (
auto& writer : writers_snapshot) {
406 writer->write(entry);
414 std::shared_ptr<log_collector_shared_state>
state_;
415 std::unique_ptr<log_collector_jthread_worker>
worker_;
422 : pimpl_(std::make_unique<
impl>(buffer_size, batch_size)) {
428 const std::string& message,
429 const std::string& file,
431 const std::string& function,
432 const std::chrono::system_clock::time_point& timestamp) {
433 return pimpl_->enqueue(level, message, file, line, function, timestamp);
437 pimpl_->add_writer(writer);
457 return pimpl_->get_queue_metrics();
Abstract base class for all log output writers kcenon.
Wrapper for std::jthread or std::thread with manual stop mechanism.
void request_stop()
Request the thread to stop.
void join()
Wait for thread to complete.
Simple stop source for environments without std::stop_token.
void add_writer(std::shared_ptr< log_writer_interface > writer)
std::shared_ptr< log_collector_shared_state > state_
std::atomic< std::uint64_t > dropped_messages_
std::pair< std::size_t, std::size_t > get_queue_metrics() const
void write_to_all(const log_entry &entry)
bool enqueue(log_level level, const std::string &message, const std::string &file, int line, const std::string &function, const std::chrono::system_clock::time_point &timestamp)
impl(std::size_t buffer_size, std::size_t batch_size)
std::unique_ptr< log_collector_jthread_worker > worker_
Worker thread for log processing with jthread compatibility.
log_collector_jthread_worker(std::shared_ptr< log_collector_shared_state > state)
async::compat_jthread thread_
static void worker_loop(std::shared_ptr< log_collector_shared_state > state, async::simple_stop_source &stop)
bool is_running() const noexcept
std::shared_ptr< log_collector_shared_state > state_
static void write_to_all(const std::shared_ptr< log_collector_shared_state > &state, const log_entry &entry)
~log_collector_jthread_worker()
std::atomic< bool > running_
~log_collector()
Destructor.
void clear_writers()
Clear all writers.
bool enqueue(common::interfaces::log_level level, const std::string &message, const std::string &file, int line, const std::string &function, const std::chrono::system_clock::time_point &timestamp)
Enqueue a log entry.
log_collector(std::size_t buffer_size=8192, std::size_t batch_size=100)
Constructor.
std::unique_ptr< impl > pimpl_
void start()
Start the background processing thread.
void flush()
Flush all pending log entries.
void stop()
Stop the background processing thread.
std::pair< size_t, size_t > get_queue_metrics() const
Get queue metrics.
void add_writer(std::shared_ptr< log_writer_interface > writer)
Add a writer.
Compatibility header for std::jthread and std::stop_token kcenon.
Asynchronous log collector using C++20 std::jthread kcenon.
Data structures for representing log entries and source locations kcenon.
common::interfaces::log_level log_level
Shared state for log processing - survives impl destruction.
const std::size_t buffer_size
const std::size_t batch_size
std::queue< log_entry > queue
std::vector< std::weak_ptr< log_writer_interface > > writers
std::condition_variable queue_cv
log_collector_shared_state(std::size_t buffer_sz, std::size_t batch_sz)
Represents a single log entry with all associated metadata.
std::optional< source_location > location
Optional source code location information.
Source code location information for debugging.