19#include <kcenon/common/interfaces/logger_interface.h>
26#include <condition_variable>
31class batch_processing_jthread_worker;
65 common::interfaces::log_level
level;
79 std::chrono::system_clock::time_point ts)
125 const config& cfg = config{});
142 void stop(
bool flush_remaining =
true);
156 bool add_entry(
const batch_entry& entry);
216 std::chrono::steady_clock::time_point deadline);
243 std::chrono::nanoseconds processing_time,
244 const std::string& flush_reason);
254 std::unique_ptr<lockfree_spsc_queue<batch_entry, queue_size>>
queue_;
Abstract base class for all log output writers kcenon.
Advanced batch processor with dynamic sizing and back-pressure handling.
void stop(bool flush_remaining=true)
Stop the batch processor.
std::atomic< std::chrono::milliseconds > current_wait_time_
batch_processor(log_writer_ptr writer, const config &cfg=config{})
Constructor.
std::chrono::steady_clock::time_point last_adjustment_time_
const processing_stats & get_stats() const
Get current processing statistics.
size_t get_current_batch_size() const
Get current batch size setting.
~batch_processor()
Destructor.
std::unique_ptr< batch_processing_jthread_worker > processing_worker_
std::atomic< size_t > current_batch_size_
void update_stats(size_t batch_size, std::chrono::nanoseconds processing_time, const std::string &flush_reason)
Update processing statistics.
bool start()
Start the batch processor.
void reset_stats()
Reset statistics.
bool add_entry(batch_entry &&entry)
Add entry to the batch queue.
size_t collect_entries(std::vector< batch_entry > &batch, size_t max_entries, std::chrono::steady_clock::time_point deadline)
Collect entries for batch processing.
std::unique_ptr< lockfree_spsc_queue< batch_entry, queue_size > > queue_
size_t process_batch(std::vector< batch_entry > &batch)
Process current batch.
std::atomic< bool > should_stop_
void adjust_batch_size()
Adjust batch size based on performance metrics.
bool is_healthy() const
Check if processor is healthy.
std::atomic< size_t > recent_queue_size_
std::condition_variable notify_cv_
void process_loop_iteration()
Single iteration of processing loop.
static constexpr size_t queue_size
void flush()
Force flush current batch.
bool should_flush_by_time(std::chrono::steady_clock::time_point last_flush_time) const
Check if batch should be flushed based on time.
std::atomic< bool > running_
bool handle_back_pressure()
Handle back-pressure conditions.
size_t get_queue_size() const
Get current queue size.
std::atomic< double > recent_processing_time_ms_
Error codes specific to the logger system.
High-performance lock-free queue implementation.
Data structures for representing log entries and source locations kcenon.
std::unique_ptr< batch_processor > make_batch_processor(log_writer_ptr writer, const batch_processor::config &cfg)
Factory function to create a batch processor.
std::unique_ptr< log_writer_interface > log_writer_ptr
Type alias for writer unique pointer.
batch_entry(common::interfaces::log_level lvl, std::string msg, std::string f, int l, std::string func, std::chrono::system_clock::time_point ts)
std::chrono::system_clock::time_point timestamp
common::interfaces::log_level level
Configuration for batch processor.
std::chrono::milliseconds min_wait_time
Minimum wait time.
double size_decrease_factor
Factor for decreasing batch size.
std::chrono::microseconds back_pressure_delay
Delay when under back-pressure.
size_t max_batch_size
Maximum batch size.
std::chrono::milliseconds max_wait_time
Maximum wait time.
size_t min_batch_size
Minimum batch size.
bool enable_back_pressure
Enable back-pressure handling.
size_t initial_batch_size
Initial batch size.
bool enable_dynamic_sizing
Enable dynamic batch sizing.
size_t back_pressure_threshold
Queue size threshold for back-pressure.
double size_increase_factor
Factor for increasing batch size.
std::atomic< uint64_t > dynamic_size_adjustments
std::atomic< uint64_t > flush_by_time
std::atomic< uint64_t > flush_by_size
std::atomic< uint64_t > dropped_entries
std::atomic< double > average_batch_size
std::atomic< uint64_t > flush_by_manual
std::atomic< double > average_processing_time_ms
std::atomic< uint64_t > back_pressure_events
std::atomic< uint64_t > total_batches
std::atomic< uint64_t > total_entries