Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
batch_processor.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
15#include "lockfree_queue.h"
19#include <kcenon/common/interfaces/logger_interface.h>
20#include <vector>
21#include <chrono>
22#include <atomic>
23#include <memory>
24#include <functional>
25#include <mutex> // Added for synchronization primitives
26#include <condition_variable> // Added for threading coordination
27
28namespace kcenon::logger::async {
29
30// Forward declaration for processing worker (std::jthread-based)
31class batch_processing_jthread_worker;
32
37public:
/// Configuration for batch processor.
///
/// Every field visible here has a default member initializer, so a
/// default-constructed config is usable without further setup.
///
/// NOTE(review): this listing skips several numbered lines inside the struct.
/// The generated member index for config also names size_increase_factor,
/// size_decrease_factor, enable_dynamic_sizing, enable_back_pressure and
/// back_pressure_threshold, so those fields presumably live on the skipped
/// lines — confirm against the original header.
41 struct config {
// Starting batch size and the lower/upper bounds it is kept between
// (presumably counted in entries — confirm).
42 size_t initial_batch_size{100};
43 size_t min_batch_size{10};
44 size_t max_batch_size{1000};
45
/// Maximum wait time.
46 std::chrono::milliseconds max_wait_time{1000};
/// Minimum wait time.
47 std::chrono::milliseconds min_wait_time{10};
48
51
54
/// Delay when under back-pressure.
56 std::chrono::microseconds back_pressure_delay{100};
57
// User-provided empty constructor rather than `= default`.
// NOTE(review): presumably required so that `config{}` can appear as a
// default argument inside the enclosing class while the member initializers
// are still being parsed — confirm before changing it to `= default`.
58 config() noexcept {}
59 };
60
64 struct batch_entry {
65 common::interfaces::log_level level;
66 std::string message;
67 std::string file;
68 int line;
69 std::string function;
70 std::chrono::system_clock::time_point timestamp;
71
72 batch_entry() = default;
73
74 batch_entry(common::interfaces::log_level lvl,
75 std::string msg,
76 std::string f,
77 int l,
78 std::string func,
79 std::chrono::system_clock::time_point ts)
80 : level(lvl)
81 , message(std::move(msg))
82 , file(std::move(f))
83 , line(l)
84 , function(std::move(func))
85 , timestamp(ts) {}
86 };
87
// Statistics fields (the generated index refers to their type as
// processing_stats; the struct's opening line is not part of this listing).
// Each field is a std::atomic that starts at zero, so individual reads and
// writes are atomic; no ordering between different fields is implied.

// Event counters.
92 std::atomic<uint64_t> total_batches{0};
93 std::atomic<uint64_t> total_entries{0};
94 std::atomic<uint64_t> dropped_entries{0};
95 std::atomic<uint64_t> back_pressure_events{0};
96 std::atomic<uint64_t> dynamic_size_adjustments{0};
97
// Flush counters, one per flush reason (size, time, manual).
98 std::atomic<uint64_t> flush_by_size{0};
99 std::atomic<uint64_t> flush_by_time{0};
100 std::atomic<uint64_t> flush_by_manual{0};
101
// Running averages.
// NOTE(review): how these are updated is not visible in this header —
// see update_stats in the implementation.
102 std::atomic<double> average_batch_size{0.0};
103 std::atomic<double> average_processing_time_ms{0.0};
104
105 void reset() {
106 total_batches = 0;
107 total_entries = 0;
108 dropped_entries = 0;
111 flush_by_size = 0;
112 flush_by_time = 0;
113 flush_by_manual = 0;
114 average_batch_size = 0.0;
116 }
117 };
118
/// Constructor.
/// @param writer Log writer, passed by value.
/// @param cfg    Configuration; a default-constructed config when omitted.
124 explicit batch_processor(log_writer_ptr writer,
125 const config& cfg = config{});
126
131
/// Start the batch processor.
/// @return NOTE(review): the meaning of the bool is not visible in this
///         header — presumably true on success; confirm in the implementation.
136 bool start();
137
/// Stop the batch processor.
/// @param flush_remaining Defaults to true. NOTE(review): presumably controls
///        whether still-queued entries are processed before stopping — confirm.
142 void stop(bool flush_remaining = true);
143
/// Add entry to the batch queue (rvalue overload).
/// @return NOTE(review): presumably false when the entry was not accepted —
///         confirm in the implementation.
149 bool add_entry(batch_entry&& entry);
150
/// Add entry to the batch queue (const-reference overload).
156 bool add_entry(const batch_entry& entry);
157
/// Force flush current batch.
161 void flush();
162
/// Check if processor is healthy.
167 bool is_healthy() const;
168
/// Get current processing statistics.
/// @return Const reference to the stats_ member; valid only while this
///         object is alive.
173 const processing_stats& get_stats() const { return stats_; }
174
179
/// Get current queue size.
184 size_t get_queue_size() const;
185
/// Get current batch size setting.
/// @return The value of current_batch_size_, read with a relaxed atomic load.
190 size_t get_current_batch_size() const {
191 return current_batch_size_.load(std::memory_order_relaxed);
192 }
193
194private:
199
/// Process current batch.
/// @param batch Batch to process, passed by non-const reference.
/// @return NOTE(review): presumably the number of entries processed — confirm.
205 size_t process_batch(std::vector<batch_entry>& batch);
206
/// Collect entries for batch processing.
/// @param batch       Output vector, passed by non-const reference.
/// @param max_entries Upper bound argument for the collection.
/// @param deadline    steady_clock time point bounding the collection.
/// @return NOTE(review): presumably the number of entries collected — confirm.
214 size_t collect_entries(std::vector<batch_entry>& batch,
215 size_t max_entries,
216 std::chrono::steady_clock::time_point deadline);
217
/// Adjust batch size based on performance metrics.
221 void adjust_batch_size();
222
228
/// Check if batch should be flushed based on time.
/// @param last_flush_time steady_clock time point of the previous flush.
234 bool should_flush_by_time(std::chrono::steady_clock::time_point last_flush_time) const;
235
/// Update processing statistics.
/// @param batch_size      Size of the batch being recorded.
/// @param processing_time Duration being recorded, in nanoseconds.
/// @param flush_reason    Reason string. NOTE(review): the accepted values are
///                        not visible in this header — confirm in the
///                        implementation.
242 void update_stats(size_t batch_size,
243 std::chrono::nanoseconds processing_time,
244 const std::string& flush_reason);
245
246 // Configuration
248
249 // Writer
251
252 // Queue
// Compile-time capacity passed to lockfree_spsc_queue as its size argument.
253 static constexpr size_t queue_size = 8192; // Must be power of 2
// Heap-allocated queue of batch_entry. The type name says single-producer/
// single-consumer. NOTE(review): confirm that add_entry is only ever called
// from one thread.
254 std::unique_ptr<lockfree_spsc_queue<batch_entry, queue_size>> queue_;
255
256 // Processing worker (using std::jthread)
// Only forward-declared in this header, so the complete type must be visible
// wherever this unique_ptr is destroyed.
257 std::unique_ptr<batch_processing_jthread_worker> processing_worker_;
// Lifecycle flags; both start out false.
258 std::atomic<bool> running_{false};
259 std::atomic<bool> should_stop_{false};
260
261 // Condition variable for waking the worker thread when new entries arrive
262 std::mutex notify_mutex_;
263 std::condition_variable notify_cv_;
264
265 // Dynamic sizing
// Neither member has an initializer here.
// NOTE(review): presumably both are set from the config in the constructor —
// confirm.
266 std::atomic<size_t> current_batch_size_;
267 std::atomic<std::chrono::milliseconds> current_wait_time_;
268
269 // Statistics
271
272 // Performance metrics for dynamic adjustment
273 std::atomic<double> recent_processing_time_ms_{0.0};
274 std::atomic<size_t> recent_queue_size_{0};
// Plain (non-atomic) time point with no initializer here.
// NOTE(review): presumably touched only by the worker thread — confirm.
275 std::chrono::steady_clock::time_point last_adjustment_time_;
276};
277
284std::unique_ptr<batch_processor> make_batch_processor(
285 log_writer_ptr writer,
287
288} // namespace kcenon::logger::async
Abstract base class for all log output writers kcenon.
Advanced batch processor with dynamic sizing and back-pressure handling.
void stop(bool flush_remaining=true)
Stop the batch processor.
std::atomic< std::chrono::milliseconds > current_wait_time_
batch_processor(log_writer_ptr writer, const config &cfg=config{})
Constructor.
std::chrono::steady_clock::time_point last_adjustment_time_
const processing_stats & get_stats() const
Get current processing statistics.
size_t get_current_batch_size() const
Get current batch size setting.
std::unique_ptr< batch_processing_jthread_worker > processing_worker_
void update_stats(size_t batch_size, std::chrono::nanoseconds processing_time, const std::string &flush_reason)
Update processing statistics.
bool start()
Start the batch processor.
bool add_entry(batch_entry &&entry)
Add entry to the batch queue.
size_t collect_entries(std::vector< batch_entry > &batch, size_t max_entries, std::chrono::steady_clock::time_point deadline)
Collect entries for batch processing.
std::unique_ptr< lockfree_spsc_queue< batch_entry, queue_size > > queue_
size_t process_batch(std::vector< batch_entry > &batch)
Process current batch.
void adjust_batch_size()
Adjust batch size based on performance metrics.
bool is_healthy() const
Check if processor is healthy.
void process_loop_iteration()
Single iteration of processing loop.
void flush()
Force flush current batch.
bool should_flush_by_time(std::chrono::steady_clock::time_point last_flush_time) const
Check if batch should be flushed based on time.
bool handle_back_pressure()
Handle back-pressure conditions.
size_t get_queue_size() const
Get current queue size.
std::atomic< double > recent_processing_time_ms_
Error codes specific to the logger system.
High-performance lock-free queue implementation.
Data structures for representing log entries and source locations kcenon.
std::unique_ptr< batch_processor > make_batch_processor(log_writer_ptr writer, const batch_processor::config &cfg)
Factory function to create a batch processor.
std::unique_ptr< log_writer_interface > log_writer_ptr
Type alias for writer unique pointer.
Batch entry structure.
std::string message
int line
batch_entry(common::interfaces::log_level lvl, std::string msg, std::string f, int l, std::string func, std::chrono::system_clock::time_point ts)
batch_entry()=default
std::string function
std::string file
std::chrono::system_clock::time_point timestamp
common::interfaces::log_level level
Configuration for batch processor.
std::chrono::milliseconds min_wait_time
Minimum wait time.
double size_decrease_factor
Factor for decreasing batch size.
std::chrono::microseconds back_pressure_delay
Delay when under back-pressure.
std::chrono::milliseconds max_wait_time
Maximum wait time.
bool enable_back_pressure
Enable back-pressure handling.
bool enable_dynamic_sizing
Enable dynamic batch sizing.
size_t back_pressure_threshold
Queue size threshold for back-pressure.
double size_increase_factor
Factor for increasing batch size.