21#include <condition_variable>
38 std::mutex& notify_mutex,
39 std::condition_variable& notify_cv)
53 if (
running_.exchange(
true, std::memory_order_acq_rel)) {
62 while (!stop_token.stop_requested()) {
67 std::unique_lock<std::mutex> lock(mtx);
68 cv.wait_for(lock, std::chrono::milliseconds(10),
69 [&stop_token]{
return stop_token.stop_requested(); });
81 while (!
stop->stop_requested()) {
86 std::unique_lock<std::mutex> lock(mtx);
87 cv.wait_for(lock, std::chrono::milliseconds(10),
88 [&
stop]{
return stop->stop_requested(); });
95 if (!
running_.exchange(
false, std::memory_order_acq_rel)) {
105 return running_.load(std::memory_order_acquire);
114#if !LOGGER_HAS_JTHREAD
121 , writer_(std::move(writer))
123 , current_batch_size_(cfg.initial_batch_size)
124 , current_wait_time_(cfg.max_wait_time)
125 , last_adjustment_time_(std::chrono::steady_clock::now()) {
128 throw std::invalid_argument(
"Writer cannot be null");
133 throw std::invalid_argument(
"min_batch_size cannot be greater than max_batch_size");
137 throw std::invalid_argument(
"initial_batch_size must be within min/max range");
146 bool expected =
false;
147 if (!
running_.compare_exchange_strong(expected,
true)) {
173 if (flush_remaining) {
176 std::vector<batch_entry> final_batch;
179 while (
queue_->dequeue(entry)) {
180 final_batch.push_back(std::move(entry));
183 if (!final_batch.empty()) {
194 if (!
running_.load(std::memory_order_relaxed)) {
198 if (!
queue_->enqueue(std::move(entry))) {
219 return running_.load(std::memory_order_relaxed) &&
228 static thread_local std::chrono::steady_clock::time_point last_flush_time = std::chrono::steady_clock::now();
229 static thread_local std::chrono::steady_clock::time_point last_adjustment_time = std::chrono::steady_clock::now();
234 std::vector<batch_entry> current_batch;
235 current_batch.reserve(batch_size);
237 const auto deadline = std::chrono::steady_clock::now() + wait_time;
238 const auto entries_collected =
collect_entries(current_batch, batch_size, deadline);
240 if (entries_collected > 0) {
241 const auto process_start = std::chrono::steady_clock::now();
243 const auto process_end = std::chrono::steady_clock::now();
245 const auto processing_time = process_end - process_start;
246 const bool flushed_by_size = (entries_collected >= batch_size);
249 std::string flush_reason;
250 if (flushed_by_size) {
251 flush_reason =
"size";
253 }
else if (flushed_by_time) {
254 flush_reason =
"time";
257 flush_reason =
"partial";
261 last_flush_time = process_end;
270 const auto now = std::chrono::steady_clock::now();
271 if (now - last_adjustment_time > std::chrono::seconds(5)) {
273 last_adjustment_time = now;
281 std::chrono::steady_clock::time_point deadline) {
282 size_t collected = 0;
285 while (collected < max_entries && std::chrono::steady_clock::now() < deadline) {
286 if (
queue_->dequeue(entry)) {
287 batch.push_back(std::move(entry));
291 auto remaining = deadline - std::chrono::steady_clock::now();
292 if (remaining <= std::chrono::steady_clock::duration::zero()) {
297 [
this]{
return queue_->size() > 0 ||
should_stop_.load(std::memory_order_relaxed); });
302 while (collected < max_entries && queue_->dequeue(entry)) {
303 batch.push_back(std::move(entry));
311 if (batch.empty() || !
writer_) {
315 size_t processed = 0;
316 for (
const auto& entry : batch) {
318 log_entry log_ent(entry.level, entry.message, entry.file,
319 entry.line, entry.function, entry.timestamp);
341 size_t new_size = current_size;
344 if (
queue_size > current_size * 2 && recent_time < 10.0) {
349 else if (recent_time > 100.0 ||
queue_size < current_size / 4) {
354 if (new_size != current_size) {
376 const auto now = std::chrono::steady_clock::now();
377 const auto elapsed = now - last_flush_time;
380 return elapsed >= current_wait;
384 std::chrono::nanoseconds processing_time,
385 const std::string& flush_reason) {
388 const double processing_time_ms =
389 std::chrono::duration_cast<std::chrono::microseconds>(processing_time).count() / 1000.0;
392 const double alpha = 0.1;
394 const double new_time = alpha * processing_time_ms + (1.0 - alpha) * current_time;
399 if (total_batches > 0) {
401 const double avg_size =
static_cast<double>(total_entries) / total_batches;
408 std::memory_order_relaxed);
417 return std::make_unique<batch_processor>(std::move(writer), cfg);
Optimized batch processing engine for log entries.
Worker thread for batch processing with jthread compatibility.
std::condition_variable & notify_cv_
std::function< void()> process_callback
std::shared_ptr< simple_stop_source > stop_source_
~batch_processing_jthread_worker()
batch_processing_jthread_worker(process_callback callback, std::mutex &notify_mutex, std::condition_variable &notify_cv)
std::mutex & notify_mutex_
bool is_running() const noexcept
std::atomic< bool > running_
process_callback callback_
void stop(bool flush_remaining=true)
Stop the batch processor.
std::atomic< std::chrono::milliseconds > current_wait_time_
batch_processor(log_writer_ptr writer, const config &cfg=config{})
Constructor.
~batch_processor()
Destructor.
std::unique_ptr< batch_processing_jthread_worker > processing_worker_
std::atomic< size_t > current_batch_size_
void update_stats(size_t batch_size, std::chrono::nanoseconds processing_time, const std::string &flush_reason)
Update processing statistics.
bool start()
Start the batch processor.
bool add_entry(batch_entry &&entry)
Add entry to the batch queue.
size_t collect_entries(std::vector< batch_entry > &batch, size_t max_entries, std::chrono::steady_clock::time_point deadline)
Collect entries for batch processing.
std::unique_ptr< lockfree_spsc_queue< batch_entry, queue_size > > queue_
size_t process_batch(std::vector< batch_entry > &batch)
Process current batch.
std::atomic< bool > should_stop_
void adjust_batch_size()
Adjust batch size based on performance metrics.
bool is_healthy() const
Check if processor is healthy.
std::atomic< size_t > recent_queue_size_
std::condition_variable notify_cv_
void process_loop_iteration()
Single iteration of processing loop.
static constexpr size_t queue_size
void flush()
Force flush current batch.
bool should_flush_by_time(std::chrono::steady_clock::time_point last_flush_time) const
Check if batch should be flushed based on time.
std::atomic< bool > running_
bool handle_back_pressure()
Handle back-pressure conditions.
size_t get_queue_size() const
Get current queue size.
std::atomic< double > recent_processing_time_ms_
Wrapper for std::jthread or std::thread with manual stop mechanism.
void request_stop()
Request the thread to stop.
void join()
Wait for thread to complete.
Lock-free single-producer single-consumer queue.
Simple stop source for environments without std::stop_token.
Compatibility header for std::jthread and std::stop_token (kcenon namespace).
#define LOGGER_HAS_JTHREAD
std::unique_ptr< batch_processor > make_batch_processor(log_writer_ptr writer, const batch_processor::config &cfg)
Factory function to create a batch processor.
std::unique_ptr< log_writer_interface > log_writer_ptr
Type alias for writer unique pointer.
Object pool implementation for high-performance memory management.
Configuration for batch processor.
double size_decrease_factor
Factor for decreasing batch size.
std::chrono::microseconds back_pressure_delay
Delay when under back-pressure.
size_t max_batch_size
Maximum batch size.
size_t min_batch_size
Minimum batch size.
bool enable_back_pressure
Enable back-pressure handling.
size_t initial_batch_size
Initial batch size.
bool enable_dynamic_sizing
Enable dynamic batch sizing.
size_t back_pressure_threshold
Queue size threshold for back-pressure.
double size_increase_factor
Factor for increasing batch size.
std::atomic< uint64_t > dynamic_size_adjustments
std::atomic< uint64_t > flush_by_time
std::atomic< uint64_t > flush_by_size
std::atomic< uint64_t > dropped_entries
std::atomic< double > average_batch_size
std::atomic< double > average_processing_time_ms
std::atomic< uint64_t > back_pressure_events
std::atomic< uint64_t > total_batches
std::atomic< uint64_t > total_entries
Represents a single log entry with all associated metadata.