Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
log_collector.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
19#include <kcenon/common/interfaces/logger_interface.h>
20
22
23#include <atomic>
24#include <condition_variable>
25#include <cstdio>
26#include <functional>
27#include <memory>
28#include <mutex>
29#include <queue>
30#include <thread>
31#include <vector>
32
33namespace kcenon::logger {
34
35// Type alias for log_level
36using log_level = common::interfaces::log_level;
37
    // Entries waiting to be drained by the worker thread; guarded by queue_mutex.
    std::queue<log_entry> queue;
    // mutable so const members (e.g. metrics queries) can lock it.
    mutable std::mutex queue_mutex;
#if LOGGER_HAS_JTHREAD
    std::condition_variable_any queue_cv; // Works with stop_token
#else
    std::condition_variable queue_cv; // Standard condition variable
#endif
    // Output sinks, held weakly: writer lifetime is owned by the registrar.
    std::vector<std::weak_ptr<log_writer_interface>> writers;
    std::mutex writers_mutex;
    const std::size_t batch_size;   // Max entries processed per worker iteration.
    const std::size_t buffer_size;  // Queue capacity; entries beyond this are dropped.

    // Note: parameter order (buffer, batch) is the reverse of member order above.
    explicit log_collector_shared_state(std::size_t buffer_sz, std::size_t batch_sz)
        : batch_size(batch_sz)
        , buffer_size(buffer_sz) {}
};
62
70public:
71 explicit log_collector_jthread_worker(std::shared_ptr<log_collector_shared_state> state)
72 : state_(std::move(state))
73 , running_(false)
74 {}
75
79
    // Launch the background worker thread. Idempotent: a second call while
    // already running is a no-op (guarded by the atomic exchange below).
    void start() {
        if (running_.exchange(true, std::memory_order_acq_rel)) {
            return; // Already started
        }

#if LOGGER_HAS_JTHREAD
        // Capture state by shared_ptr - survives worker destruction
        auto state = state_;
        thread_ = async::compat_jthread([state](std::stop_token stop_token) {
            worker_loop(state, stop_token);
        });
#else
        // Create worker thread with manual stop source
        // Note: We use the stop_source created by compat_jthread to ensure
        // request_stop() correctly signals the worker loop
        // NOTE(review): the compat_jthread construction line is elided in this
        // view of the file (it should open the lambda taking `stop`) -- confirm
        // against the repository before editing this branch.
        auto state = state_;
            worker_loop(state, stop);
        });
#endif
    }
101
    // Stop and join the worker thread. Idempotent: a second call while
    // already stopped returns immediately.
    void stop() {
        if (!running_.exchange(false, std::memory_order_acq_rel)) {
            return; // Already stopped
        }

        // Request stop
        // NOTE(review): the actual stop-request call is elided in this view of
        // the file (presumably thread_.request_stop()) -- confirm in the repo.

        // Wake up the worker in case it's waiting
        if (state_) {
            state_->queue_cv.notify_all();
        }

        // Wait for thread to finish
        thread_.join();
    }
118
119 void notify_work() {
120 if (state_) {
121 state_->queue_cv.notify_one();
122 }
123 }
124
125 [[nodiscard]] bool is_running() const noexcept {
126 return running_.load(std::memory_order_acquire);
127 }
128
129private:
130#if LOGGER_HAS_JTHREAD
131 static void worker_loop(std::shared_ptr<log_collector_shared_state> state,
132 std::stop_token stop_token) {
133 if (!state) {
134 return;
135 }
136
137 while (!stop_token.stop_requested()) {
138 std::vector<log_entry> batch;
139
140 {
141 std::unique_lock<std::mutex> lock(state->queue_mutex);
142
143 // Wait for work or stop signal using condition_variable_any
144 bool has_work = state->queue_cv.wait(lock, stop_token, [&state]() {
145 return !state->queue.empty();
146 });
147
148 // Check if stop was requested
149 if (stop_token.stop_requested()) {
150 break;
151 }
152
153 // Check if we actually have work
154 if (!has_work || state->queue.empty()) {
155 continue;
156 }
157
158 // Extract batch from queue
159 batch.reserve(std::min(state->batch_size, state->queue.size()));
160 while (!state->queue.empty() && batch.size() < state->batch_size) {
161 batch.push_back(std::move(state->queue.front()));
162 state->queue.pop();
163 }
164 }
165
166 // Process batch outside the lock
167 for (const auto& entry : batch) {
168 write_to_all(state, entry);
169 }
170 }
171 }
172#else
173 static void worker_loop(std::shared_ptr<log_collector_shared_state> state,
175 if (!state) {
176 return;
177 }
178
179 while (!stop.stop_requested()) {
180 std::vector<log_entry> batch;
181
182 {
183 std::unique_lock<std::mutex> lock(state->queue_mutex);
184
185 // Wait for work or stop signal
186 state->queue_cv.wait(lock, [&state, &stop]() {
187 return stop.stop_requested() || !state->queue.empty();
188 });
189
190 // Check if stop was requested
191 if (stop.stop_requested()) {
192 break;
193 }
194
195 // Check if we actually have work
196 if (state->queue.empty()) {
197 continue;
198 }
199
200 // Extract batch from queue
201 batch.reserve(std::min(state->batch_size, state->queue.size()));
202 while (!state->queue.empty() && batch.size() < state->batch_size) {
203 batch.push_back(std::move(state->queue.front()));
204 state->queue.pop();
205 }
206 }
207
208 // Process batch outside the lock
209 for (const auto& entry : batch) {
210 write_to_all(state, entry);
211 }
212 }
213 }
214#endif
215
216 static void write_to_all(const std::shared_ptr<log_collector_shared_state>& state,
217 const log_entry& entry) {
218 if (!state) {
219 return;
220 }
221
222 // Snapshot writers under lock
223 std::vector<std::shared_ptr<log_writer_interface>> writers_snapshot;
224 {
225 std::lock_guard<std::mutex> lock(state->writers_mutex);
226 writers_snapshot.reserve(state->writers.size());
227 for (auto& weak_writer : state->writers) {
228 if (auto writer = weak_writer.lock()) {
229 writers_snapshot.push_back(writer);
230 }
231 }
232 }
233
234 // Write to all writers without holding mutex
235 for (auto& writer : writers_snapshot) {
236 try {
237 writer->write(entry);
238 } catch (...) {
239 // Swallow exceptions to prevent thread termination
240 }
241 }
242 }
243
244private:
245 std::shared_ptr<log_collector_shared_state> state_;
247 std::atomic<bool> running_;
248};
249
251public:
252 explicit impl(std::size_t buffer_size, std::size_t batch_size)
253 : state_(std::make_shared<log_collector_shared_state>(buffer_size, batch_size))
254 , worker_(std::make_unique<log_collector_jthread_worker>(state_)) {
255 }
256
258 stop();
259 }
260
261 bool enqueue(log_level level,
262 const std::string& message,
263 const std::string& file,
264 int line,
265 const std::string& function,
266 const std::chrono::system_clock::time_point& timestamp) {
267 {
268 std::lock_guard<std::mutex> lock(state_->queue_mutex);
269
270 // Check if queue is full
271 if (state_->queue.size() >= state_->buffer_size) {
272 // Track dropped message
273 std::uint64_t dropped = dropped_messages_.fetch_add(1, std::memory_order_relaxed) + 1;
274
275 // Log warning periodically (every 100 dropped messages) to avoid spam
276 if (dropped % 100 == 1) {
277 std::fprintf(stderr, "[WARNING] Log queue full: %llu messages dropped total\n",
278 static_cast<unsigned long long>(dropped));
279 }
280
281 return false;
282 }
283
284 // Create log_entry with optional source location
285 log_entry entry(level, message, timestamp);
286 if (!file.empty() || line != 0 || !function.empty()) {
287 entry.location = source_location{file, line, function};
288 }
289 state_->queue.push(std::move(entry));
290 }
291
292 // Notify worker thread
293 if (worker_) {
294 worker_->notify_work();
295 }
296 return true;
297 }
298
299 void add_writer(std::shared_ptr<log_writer_interface> writer) {
300 if (!writer) {
301 return;
302 }
303 std::lock_guard<std::mutex> lock(state_->writers_mutex);
304 state_->writers.push_back(writer);
305 }
306
308 std::lock_guard<std::mutex> lock(state_->writers_mutex);
309 state_->writers.clear();
310 }
311
312 void start() {
313 if (worker_) {
314 worker_->start();
315 }
316 }
317
    // Stop the worker thread, then synchronously process and flush whatever
    // is still queued so no accepted entry is lost on shutdown.
    void stop() {
        // Stop the worker thread first
        if (worker_) {
            worker_->stop();
        }

        // Drain remaining entries
        drain_queue();

        // Flush all writers
        // NOTE(review): the flush call itself is elided in this view of the
        // file (presumably flush_all_writers()) -- confirm against the repo.
    }
330
    // Block until the worker has emptied the queue, then flush all writers.
    //
    // NOTE(review): this spins (100us sleeps) while the queue is non-empty.
    // If the worker is not running, a non-empty queue makes this loop
    // forever -- consider checking worker_->is_running() first.
    void flush() {
        // Wait for queue to be empty
        while (true) {
            {
                std::lock_guard<std::mutex> lock(state_->queue_mutex);
                if (state_->queue.empty()) {
                    break;
                }
            }
            // Brief yield to allow worker to process
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        }

        // Flush all writers
        // NOTE(review): the flush call is elided in this view of the file.
    }
347
348 [[nodiscard]] std::pair<std::size_t, std::size_t> get_queue_metrics() const {
349 std::lock_guard<std::mutex> lock(state_->queue_mutex);
350 return {state_->queue.size(), state_->buffer_size};
351 }
352
353private:
354 void drain_queue() {
355 std::queue<log_entry> remaining;
356 {
357 std::lock_guard<std::mutex> lock(state_->queue_mutex);
358 std::swap(remaining, state_->queue);
359 }
360
361 // Process remaining entries
362 while (!remaining.empty()) {
363 auto entry = std::move(remaining.front());
364 remaining.pop();
365 write_to_all(entry);
366 }
367 }
368
        // (Function header elided in this view of the file; by the preceding
        // call sites this is likely `void flush_all_writers()` -- confirm.)
        // Snapshot live writers under the lock so flush() below runs with
        // writers_mutex released.
        std::vector<std::shared_ptr<log_writer_interface>> writers_snapshot;
        {
            std::lock_guard<std::mutex> lock(state_->writers_mutex);
            writers_snapshot.reserve(state_->writers.size());
            for (auto& weak_writer : state_->writers) {
                if (auto writer = weak_writer.lock()) {
                    writers_snapshot.push_back(writer);
                }
            }
        }

        // Flush each live writer; failures are deliberately ignored so one
        // bad sink cannot abort shutdown/flush of the others.
        for (auto& writer : writers_snapshot) {
            try {
                writer->flush();
            } catch (...) {
                // Swallow exceptions during flush
            }
        }
    }
389
390 void write_to_all(const log_entry& entry) {
391 // Snapshot writers under lock
392 std::vector<std::shared_ptr<log_writer_interface>> writers_snapshot;
393 {
394 std::lock_guard<std::mutex> lock(state_->writers_mutex);
395 writers_snapshot.reserve(state_->writers.size());
396 for (auto& weak_writer : state_->writers) {
397 if (auto writer = weak_writer.lock()) {
398 writers_snapshot.push_back(writer);
399 }
400 }
401 }
402
403 // Write to all writers without holding mutex
404 for (auto& writer : writers_snapshot) {
405 try {
406 writer->write(entry);
407 } catch (...) {
408 // Swallow exceptions to prevent issues
409 }
410 }
411 }
412
private:
    std::shared_ptr<log_collector_shared_state> state_;    // Shared with the worker thread.
    std::unique_ptr<log_collector_jthread_worker> worker_; // Background queue consumer.
    std::atomic<std::uint64_t> dropped_messages_{0};       // Entries rejected because the queue was full.
417};
418
419// log_collector public interface implementation
420
// Construct the collector; the worker thread is not launched until start().
log_collector::log_collector(std::size_t buffer_size, std::size_t batch_size)
    : pimpl_(std::make_unique<impl>(buffer_size, batch_size)) {
}
424
426
// Forward to the pimpl; returns false when the entry was dropped (queue full).
bool log_collector::enqueue(log_level level,
                            const std::string& message,
                            const std::string& file,
                            int line,
                            const std::string& function,
                            const std::chrono::system_clock::time_point& timestamp) {
    return pimpl_->enqueue(level, message, file, line, function, timestamp);
}
435
// Forward writer registration to the pimpl (null writers are ignored there).
void log_collector::add_writer(std::shared_ptr<log_writer_interface> writer) {
    pimpl_->add_writer(writer);
}
439
441 pimpl_->clear_writers();
442}
443
445 pimpl_->start();
446}
447
449 pimpl_->stop();
450}
451
453 pimpl_->flush();
454}
455
// Returns {current queue depth, configured capacity} from the pimpl.
std::pair<std::size_t, std::size_t> log_collector::get_queue_metrics() const {
    return pimpl_->get_queue_metrics();
}
459
460} // namespace kcenon::logger
Abstract base class for all log output writers.
Wrapper for std::jthread or std::thread with manual stop mechanism.
void request_stop()
Request the thread to stop.
void join()
Wait for thread to complete.
Simple stop source for environments without std::stop_token.
void add_writer(std::shared_ptr< log_writer_interface > writer)
std::shared_ptr< log_collector_shared_state > state_
std::atomic< std::uint64_t > dropped_messages_
std::pair< std::size_t, std::size_t > get_queue_metrics() const
void write_to_all(const log_entry &entry)
bool enqueue(log_level level, const std::string &message, const std::string &file, int line, const std::string &function, const std::chrono::system_clock::time_point &timestamp)
impl(std::size_t buffer_size, std::size_t batch_size)
std::unique_ptr< log_collector_jthread_worker > worker_
Worker thread for log processing with jthread compatibility.
log_collector_jthread_worker(std::shared_ptr< log_collector_shared_state > state)
static void worker_loop(std::shared_ptr< log_collector_shared_state > state, async::simple_stop_source &stop)
std::shared_ptr< log_collector_shared_state > state_
static void write_to_all(const std::shared_ptr< log_collector_shared_state > &state, const log_entry &entry)
void clear_writers()
Clear all writers.
bool enqueue(common::interfaces::log_level level, const std::string &message, const std::string &file, int line, const std::string &function, const std::chrono::system_clock::time_point &timestamp)
Enqueue a log entry.
log_collector(std::size_t buffer_size=8192, std::size_t batch_size=100)
Constructor.
std::unique_ptr< impl > pimpl_
void start()
Start the background processing thread.
void flush()
Flush all pending log entries.
void stop()
Stop the background processing thread.
std::pair< size_t, size_t > get_queue_metrics() const
Get queue metrics.
void add_writer(std::shared_ptr< log_writer_interface > writer)
Add a writer.
Compatibility header for std::jthread and std::stop_token.
Asynchronous log collector using C++20 std::jthread.
Data structures for representing log entries and source locations.
common::interfaces::log_level log_level
Shared state for log processing - survives impl destruction.
std::vector< std::weak_ptr< log_writer_interface > > writers
log_collector_shared_state(std::size_t buffer_sz, std::size_t batch_sz)
Represents a single log entry with all associated metadata.
Definition log_entry.h:155
std::optional< source_location > location
Optional source code location information.
Definition log_entry.h:183
Source code location information for debugging.
Definition log_entry.h:93