Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
kcenon::logger::async::batch_processor Class Reference

Advanced batch processor with dynamic sizing and back-pressure handling.

#include <batch_processor.h>


Classes

struct  batch_entry
 Batch entry structure.
 
struct  config
 Configuration for batch processor.
 
struct  processing_stats
 Processing statistics.
 

Public Member Functions

 batch_processor (log_writer_ptr writer, const config &cfg=config{})
 Constructor.
 
 ~batch_processor ()
 Destructor.
 
bool start ()
 Start the batch processor.
 
void stop (bool flush_remaining=true)
 Stop the batch processor.
 
bool add_entry (batch_entry &&entry)
 Add entry to the batch queue.
 
bool add_entry (const batch_entry &entry)
 Add entry to the batch queue (copy version)
 
void flush ()
 Force flush current batch.
 
bool is_healthy () const
 Check if processor is healthy.
 
const processing_stats & get_stats () const
 Get current processing statistics.
 
void reset_stats ()
 Reset statistics.
 
size_t get_queue_size () const
 Get current queue size.
 
size_t get_current_batch_size () const
 Get current batch size setting.
 

Private Member Functions

void process_loop_iteration ()
 Single iteration of processing loop.
 
size_t process_batch (std::vector< batch_entry > &batch)
 Process current batch.
 
size_t collect_entries (std::vector< batch_entry > &batch, size_t max_entries, std::chrono::steady_clock::time_point deadline)
 Collect entries for batch processing.
 
void adjust_batch_size ()
 Adjust batch size based on performance metrics.
 
bool handle_back_pressure ()
 Handle back-pressure conditions.
 
bool should_flush_by_time (std::chrono::steady_clock::time_point last_flush_time) const
 Check if batch should be flushed based on time.
 
void update_stats (size_t batch_size, std::chrono::nanoseconds processing_time, const std::string &flush_reason)
 Update processing statistics.
 

Private Attributes

config config_
 
log_writer_ptr writer_
 
std::unique_ptr< lockfree_spsc_queue< batch_entry, queue_size > > queue_
 
std::unique_ptr< batch_processing_jthread_worker > processing_worker_
 
std::atomic< bool > running_ {false}
 
std::atomic< bool > should_stop_ {false}
 
std::mutex notify_mutex_
 
std::condition_variable notify_cv_
 
std::atomic< size_t > current_batch_size_
 
std::atomic< std::chrono::milliseconds > current_wait_time_
 
processing_stats stats_
 
std::atomic< double > recent_processing_time_ms_ {0.0}
 
std::atomic< size_t > recent_queue_size_ {0}
 
std::chrono::steady_clock::time_point last_adjustment_time_
 

Static Private Attributes

static constexpr size_t queue_size = 8192
 

Detailed Description

Advanced batch processor with dynamic sizing and back-pressure handling.

Definition at line 36 of file batch_processor.h.

Constructor & Destructor Documentation

◆ batch_processor()

kcenon::logger::async::batch_processor::batch_processor ( log_writer_ptr writer,
const config & cfg = config{} )
explicit

Constructor.

Parameters
writer  Target writer for batch output
cfg  Configuration

Definition at line 119 of file batch_processor.cpp.

    : config_(cfg)
    , writer_(std::move(writer))
    , queue_(std::make_unique<lockfree_spsc_queue<batch_entry, queue_size>>())
    , current_batch_size_(cfg.initial_batch_size)
    , current_wait_time_(cfg.max_wait_time)
    , last_adjustment_time_(std::chrono::steady_clock::now()) {

    if (!writer_) {
        throw std::invalid_argument("Writer cannot be null");
    }

    // Validate configuration
    if (cfg.min_batch_size > cfg.max_batch_size) {
        throw std::invalid_argument("min_batch_size cannot be greater than max_batch_size");
    }

    if (cfg.initial_batch_size < cfg.min_batch_size || cfg.initial_batch_size > cfg.max_batch_size) {
        throw std::invalid_argument("initial_batch_size must be within min/max range");
    }
}

References kcenon::logger::async::batch_processor::config::initial_batch_size, kcenon::logger::async::batch_processor::config::max_batch_size, kcenon::logger::async::batch_processor::config::min_batch_size, and writer_.
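The validation performed by the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: `batch_limits`, `validate` and `is_valid` are hypothetical names introduced here, and only the three size fields that the constructor checks are modelled.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in for batch_processor::config; only the three size
// fields checked by the constructor are modelled.
struct batch_limits {
    std::size_t initial_batch_size;
    std::size_t min_batch_size;
    std::size_t max_batch_size;
};

// Throws std::invalid_argument under the same two conditions as the
// constructor listing (the null-writer check is left out of this sketch).
inline void validate(const batch_limits& cfg) {
    if (cfg.min_batch_size > cfg.max_batch_size) {
        throw std::invalid_argument("min_batch_size cannot be greater than max_batch_size");
    }
    if (cfg.initial_batch_size < cfg.min_batch_size ||
        cfg.initial_batch_size > cfg.max_batch_size) {
        throw std::invalid_argument("initial_batch_size must be within min/max range");
    }
}

// Convenience wrapper added for this sketch: reports validity as a bool.
inline bool is_valid(const batch_limits& cfg) {
    try {
        validate(cfg);
        return true;
    } catch (const std::invalid_argument&) {
        return false;
    }
}
```

Because the constructor throws, callers that build a configuration from user input may want to run an equivalent check before constructing the processor.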

◆ ~batch_processor()

kcenon::logger::async::batch_processor::~batch_processor ( )

Destructor.

Definition at line 141 of file batch_processor.cpp.

{
    stop(true);
}

References stop().


Member Function Documentation

◆ add_entry() [1/2]

bool kcenon::logger::async::batch_processor::add_entry ( batch_entry && entry)

Add entry to the batch queue.

Parameters
entry  Log entry to add
Returns
true if added successfully; false if the processor is not running or the queue is full

Definition at line 193 of file batch_processor.cpp.

{
    if (!running_.load(std::memory_order_relaxed)) {
        return false;
    }

    if (!queue_->enqueue(std::move(entry))) {
        stats_.dropped_entries.fetch_add(1, std::memory_order_relaxed);
        return false;
    }

    // Wake the worker thread immediately
    notify_cv_.notify_one();

    return true;
}

References kcenon::logger::async::batch_processor::processing_stats::dropped_entries, notify_cv_, queue_, running_, and stats_.

Referenced by add_entry().


◆ add_entry() [2/2]

bool kcenon::logger::async::batch_processor::add_entry ( const batch_entry & entry)

Add entry to the batch queue (copy version)

Parameters
entry  Log entry to add
Returns
true if added successfully; false if the processor is not running or the queue is full

Definition at line 209 of file batch_processor.cpp.

{
    return add_entry(batch_entry(entry));
}

References add_entry().
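The two overloads follow a common pattern: the copy version makes a temporary and forwards it to the move version, and a failed enqueue is counted rather than blocking the caller. A minimal single-threaded sketch of that pattern follows; `drop_counting_queue` is a hypothetical class that uses `std::deque` in place of `lockfree_spsc_queue` and `std::string` in place of `batch_entry`.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for the processor's queue plus its drop counter.
// Not thread-safe: std::deque replaces the lock-free queue for brevity.
class drop_counting_queue {
public:
    explicit drop_counting_queue(std::size_t capacity) : capacity_(capacity) {}

    // Move version: rejects the entry and counts a drop when the queue is full.
    bool add_entry(std::string&& entry) {
        if (items_.size() >= capacity_) {
            dropped_.fetch_add(1, std::memory_order_relaxed);
            return false;
        }
        items_.push_back(std::move(entry));
        return true;
    }

    // Copy version: copies into a temporary and forwards to the move version.
    bool add_entry(const std::string& entry) {
        return add_entry(std::string(entry));
    }

    std::uint64_t dropped() const { return dropped_.load(std::memory_order_relaxed); }
    std::size_t size() const { return items_.size(); }

private:
    std::size_t capacity_;
    std::deque<std::string> items_;
    std::atomic<std::uint64_t> dropped_{0};
};
```

A caller that must not lose entries would check the return value and retry or fall back to a synchronous write; the drop counter only records that a loss happened.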


◆ adjust_batch_size()

void kcenon::logger::async::batch_processor::adjust_batch_size ( )
private

Adjust batch size based on performance metrics.

Definition at line 336 of file batch_processor.cpp.

{
    const auto current_size = current_batch_size_.load(std::memory_order_relaxed);
    const auto queue_size = get_queue_size();
    const auto recent_time = recent_processing_time_ms_.load(std::memory_order_relaxed);

    size_t new_size = current_size;

    // Increase batch size if queue is building up and processing is fast
    if (queue_size > current_size * 2 && recent_time < 10.0) {
        new_size = std::min(config_.max_batch_size,
                            static_cast<size_t>(current_size * config_.size_increase_factor));
    }
    // Decrease batch size if processing is slow or queue is small
    else if (recent_time > 100.0 || queue_size < current_size / 4) {
        new_size = std::max(config_.min_batch_size,
                            static_cast<size_t>(current_size * config_.size_decrease_factor));
    }

    if (new_size != current_size) {
        current_batch_size_.store(new_size, std::memory_order_relaxed);
        stats_.dynamic_size_adjustments.fetch_add(1, std::memory_order_relaxed);
    }
}

References config_, current_batch_size_, kcenon::logger::async::batch_processor::processing_stats::dynamic_size_adjustments, get_queue_size(), kcenon::logger::async::batch_processor::config::max_batch_size, kcenon::logger::async::batch_processor::config::min_batch_size, queue_size, recent_processing_time_ms_, kcenon::logger::async::batch_processor::config::size_decrease_factor, kcenon::logger::async::batch_processor::config::size_increase_factor, and stats_.

Referenced by process_loop_iteration().
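The sizing rule in the listing above can be expressed as a pure function, which makes the thresholds easy to experiment with. This is a sketch under assumptions: `sizing_params` and `next_batch_size` are hypothetical names, and the field names simply mirror those referenced from `batch_processor::config`.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical parameter bundle; field names mirror batch_processor::config.
struct sizing_params {
    std::size_t min_batch_size;
    std::size_t max_batch_size;
    double size_increase_factor;
    double size_decrease_factor;
};

// Returns the next batch size from the current size, the queue depth and the
// smoothed processing time in milliseconds. The thresholds (2x the batch
// size, a quarter of it, 10 ms, 100 ms) are the ones in the listing above.
inline std::size_t next_batch_size(std::size_t current_size, std::size_t queued,
                                   double recent_time_ms, const sizing_params& p) {
    // Grow when the queue is building up and processing is fast.
    if (queued > current_size * 2 && recent_time_ms < 10.0) {
        return std::min(p.max_batch_size,
                        static_cast<std::size_t>(current_size * p.size_increase_factor));
    }
    // Shrink when processing is slow or the queue is nearly empty.
    if (recent_time_ms > 100.0 || queued < current_size / 4) {
        return std::max(p.min_batch_size,
                        static_cast<std::size_t>(current_size * p.size_decrease_factor));
    }
    return current_size;
}
```

With factors of 2.0 and 0.5 and limits of 10 and 1000, a batch size of 100 grows to 200 when 500 entries are queued and processing is fast, and shrinks to 50 when only 10 entries are queued.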


◆ collect_entries()

size_t kcenon::logger::async::batch_processor::collect_entries ( std::vector< batch_entry > & batch,
size_t max_entries,
std::chrono::steady_clock::time_point deadline )
private

Collect entries for batch processing.

Parameters
batch  Vector to fill with entries
max_entries  Maximum entries to collect
deadline  Time deadline for collection
Returns
Number of entries collected

Definition at line 279 of file batch_processor.cpp.

{
    size_t collected = 0;
    batch_entry entry;

    while (collected < max_entries && std::chrono::steady_clock::now() < deadline) {
        if (queue_->dequeue(entry)) {
            batch.push_back(std::move(entry));
            ++collected;
        } else {
            // Queue is empty, wait for notification or remaining deadline
            auto remaining = deadline - std::chrono::steady_clock::now();
            if (remaining <= std::chrono::steady_clock::duration::zero()) {
                break;
            }
            std::unique_lock<std::mutex> lock(notify_mutex_);
            notify_cv_.wait_for(lock, remaining,
                [this]{ return queue_->size() > 0 || should_stop_.load(std::memory_order_relaxed); });
        }
    }

    // Try to collect more entries if we have time left and space
    while (collected < max_entries && queue_->dequeue(entry)) {
        batch.push_back(std::move(entry));
        ++collected;
    }

    return collected;
}

References notify_cv_, notify_mutex_, queue_, and should_stop_.

Referenced by process_loop_iteration().
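The core idea of collect_entries(), taking what is available and otherwise sleeping on a condition variable until either data arrives or the deadline passes, can be sketched without the lock-free queue. The version below is a simplification: `int_inbox` is a hypothetical class, it guards a `std::deque<int>` with the same mutex used for waiting, and it has no stop flag.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical mutex-guarded inbox standing in for lockfree_spsc_queue.
class int_inbox {
public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(value);
        }
        cv_.notify_one();  // wake a collector blocked in collect()
    }

    // Moves up to max_entries items into batch. While the inbox is empty it
    // blocks until an item arrives or the deadline passes.
    std::size_t collect(std::vector<int>& batch, std::size_t max_entries,
                        std::chrono::steady_clock::time_point deadline) {
        std::size_t collected = 0;
        std::unique_lock<std::mutex> lock(mutex_);
        while (collected < max_entries) {
            // wait_until returns the predicate's value, so false means the
            // deadline passed while the inbox was still empty.
            if (items_.empty() &&
                !cv_.wait_until(lock, deadline, [this] { return !items_.empty(); })) {
                break;
            }
            batch.push_back(items_.front());
            items_.pop_front();
            ++collected;
        }
        return collected;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<int> items_;
};
```

Using the predicate overload of the wait call guards against spurious wakeups, which is also why the listing above passes a lambda to `wait_for`.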


◆ flush()

void kcenon::logger::async::batch_processor::flush ( )

Force flush current batch. In the implementation listed below the body is empty: flushing is left to the processing loop.

Definition at line 213 of file batch_processor.cpp.

{
    // This is handled by the processing loop
    // We could add a flush signal mechanism here if needed
}

◆ get_current_batch_size()

size_t kcenon::logger::async::batch_processor::get_current_batch_size ( ) const
inline

Get current batch size setting.

Returns
Current batch size

Definition at line 190 of file batch_processor.h.

{
    return current_batch_size_.load(std::memory_order_relaxed);
}

References current_batch_size_.

◆ get_queue_size()

size_t kcenon::logger::async::batch_processor::get_queue_size ( ) const

Get current queue size.

Returns
Approximate queue size

Definition at line 223 of file batch_processor.cpp.

{
    return queue_->size();
}

References queue_.

Referenced by adjust_batch_size(), handle_back_pressure(), and update_stats().


◆ get_stats()

const processing_stats & kcenon::logger::async::batch_processor::get_stats ( ) const
inline

Get current processing statistics.

Returns
Reference to current stats

Definition at line 173 of file batch_processor.h.

{ return stats_; }

References stats_.

◆ handle_back_pressure()

bool kcenon::logger::async::batch_processor::handle_back_pressure ( )
private

Handle back-pressure conditions.

Returns
true if should continue processing

Definition at line 360 of file batch_processor.cpp.

{
    const auto queue_size = get_queue_size();

    // [line 363 is missing from this listing]
        stats_.back_pressure_events.fetch_add(1, std::memory_order_relaxed);

        // Apply back-pressure delay
        std::this_thread::sleep_for(config_.back_pressure_delay);

        // Both operands are the local queue_size, so this holds for any non-zero value
        return queue_size < queue_size * 1.5; // Continue if queue isn't growing too fast
    }

    return true;
}

References kcenon::logger::async::batch_processor::config::back_pressure_delay, kcenon::logger::async::batch_processor::processing_stats::back_pressure_events, kcenon::logger::async::batch_processor::config::back_pressure_threshold, config_, get_queue_size(), queue_size, and stats_.

Referenced by process_loop_iteration().


◆ is_healthy()

bool kcenon::logger::async::batch_processor::is_healthy ( ) const

Check if processor is healthy.

Returns
true if running and writer is healthy

Definition at line 218 of file batch_processor.cpp.

{
    return running_.load(std::memory_order_relaxed) &&
           writer_ && writer_->is_healthy();
}

References running_, and writer_.

◆ process_batch()

size_t kcenon::logger::async::batch_processor::process_batch ( std::vector< batch_entry > & batch)
private

Process current batch.

Parameters
batch  Vector of entries to process
Returns
Number of entries processed

Definition at line 310 of file batch_processor.cpp.

{
    if (batch.empty() || !writer_) {
        return 0;
    }

    size_t processed = 0;
    for (const auto& entry : batch) {
        // Create log_entry from batch_entry
        log_entry log_ent(entry.level, entry.message, entry.file,
                          entry.line, entry.function, entry.timestamp);

        auto result = writer_->write(log_ent);
        if (result.is_ok()) {
            ++processed;
        }
    }

    // Flush after batch processing
    writer_->flush();

    stats_.total_batches.fetch_add(1, std::memory_order_relaxed);
    stats_.total_entries.fetch_add(processed, std::memory_order_relaxed);

    return processed;
}

References stats_, kcenon::logger::async::batch_processor::processing_stats::total_batches, kcenon::logger::async::batch_processor::processing_stats::total_entries, and writer_.

Referenced by process_loop_iteration(), and stop().


◆ process_loop_iteration()

void kcenon::logger::async::batch_processor::process_loop_iteration ( )
private

Single iteration of processing loop.

Definition at line 227 of file batch_processor.cpp.

{
    static thread_local std::chrono::steady_clock::time_point last_flush_time = std::chrono::steady_clock::now();
    static thread_local std::chrono::steady_clock::time_point last_adjustment_time = std::chrono::steady_clock::now();

    const auto batch_size = current_batch_size_.load(std::memory_order_relaxed);
    const auto wait_time = current_wait_time_.load(std::memory_order_relaxed);

    std::vector<batch_entry> current_batch;
    current_batch.reserve(batch_size);

    const auto deadline = std::chrono::steady_clock::now() + wait_time;
    const auto entries_collected = collect_entries(current_batch, batch_size, deadline);

    if (entries_collected > 0) {
        const auto process_start = std::chrono::steady_clock::now();
        const auto processed = process_batch(current_batch);
        const auto process_end = std::chrono::steady_clock::now();

        const auto processing_time = process_end - process_start;
        const bool flushed_by_size = (entries_collected >= batch_size);
        const bool flushed_by_time = should_flush_by_time(last_flush_time);

        std::string flush_reason;
        if (flushed_by_size) {
            flush_reason = "size";
            stats_.flush_by_size.fetch_add(1, std::memory_order_relaxed);
        } else if (flushed_by_time) {
            flush_reason = "time";
            stats_.flush_by_time.fetch_add(1, std::memory_order_relaxed);
        } else {
            flush_reason = "partial";
        }

        update_stats(processed, processing_time, flush_reason);
        last_flush_time = process_end;

        // Handle back-pressure
        // [lines 264-265 are missing from this listing]
        }

        // Dynamic batch size adjustment
        // [line 269 is missing from this listing]
            const auto now = std::chrono::steady_clock::now();
            if (now - last_adjustment_time > std::chrono::seconds(5)) {
                // [line 272 is missing from this listing]
                last_adjustment_time = now;
            }
        }
    }
}

References adjust_batch_size(), collect_entries(), config_, current_batch_size_, current_wait_time_, kcenon::logger::async::batch_processor::config::enable_back_pressure, kcenon::logger::async::batch_processor::config::enable_dynamic_sizing, kcenon::logger::async::batch_processor::processing_stats::flush_by_size, kcenon::logger::async::batch_processor::processing_stats::flush_by_time, handle_back_pressure(), process_batch(), should_flush_by_time(), stats_, and update_stats().

Referenced by start().
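The flush-reason classification inside the loop gives size priority over time, and labels everything else "partial". A small sketch of just that decision follows; `flush_reason` is a hypothetical free function, and the elapsed time since the last flush is passed in directly instead of being read from a clock.

```cpp
#include <chrono>
#include <cstddef>
#include <string>

// Classifies a flush the way the loop above does: a full batch counts as
// "size" even if the wait time has also elapsed.
inline std::string flush_reason(std::size_t entries_collected, std::size_t batch_size,
                                std::chrono::milliseconds since_last_flush,
                                std::chrono::milliseconds wait_time) {
    if (entries_collected >= batch_size) {
        return "size";
    }
    if (since_last_flush >= wait_time) {
        return "time";
    }
    return "partial";
}
```

Note that in the listing only the "size" and "time" outcomes increment a counter in processing_stats; a "partial" flush is not counted separately.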


◆ reset_stats()

void kcenon::logger::async::batch_processor::reset_stats ( )
inline

Reset statistics.

Definition at line 178 of file batch_processor.h.

References kcenon::logger::async::batch_processor::processing_stats::reset(), and stats_.


◆ should_flush_by_time()

bool kcenon::logger::async::batch_processor::should_flush_by_time ( std::chrono::steady_clock::time_point last_flush_time) const
private

Check if batch should be flushed based on time.

Parameters
last_flush_time  Time of last flush
Returns
true if timeout reached

Definition at line 375 of file batch_processor.cpp.

{
    const auto now = std::chrono::steady_clock::now();
    const auto elapsed = now - last_flush_time;
    const auto current_wait = current_wait_time_.load(std::memory_order_relaxed);

    return elapsed >= current_wait;
}

References current_wait_time_.

Referenced by process_loop_iteration().


◆ start()

bool kcenon::logger::async::batch_processor::start ( )

Start the batch processor.

Returns
true if started successfully

Definition at line 145 of file batch_processor.cpp.

{
    bool expected = false;
    if (!running_.compare_exchange_strong(expected, true)) {
        return false; // Already running
    }

    should_stop_ = false;
    processing_worker_ = std::make_unique<batch_processing_jthread_worker>(
        // [line 153 is missing from this listing]
    processing_worker_->start();
    return true;
}

References notify_cv_, notify_mutex_, process_loop_iteration(), processing_worker_, running_, and should_stop_.
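start() and stop() are made idempotent with two atomic operations: `compare_exchange_strong` lets exactly one caller flip the flag from false to true, and `exchange(false)` tells stop() whether there was anything to stop. A minimal sketch of that guard on its own, with the hypothetical class name `run_flag` and no worker thread:

```cpp
#include <atomic>

// Hypothetical guard showing only the running_ flag handling.
class run_flag {
public:
    // Returns true for the one caller that moves the flag from false to true.
    bool start() {
        bool expected = false;
        return running_.compare_exchange_strong(expected, true);
    }

    // Returns true if the flag was set, i.e. there was something to stop.
    bool stop() {
        return running_.exchange(false);
    }

    bool running() const {
        return running_.load(std::memory_order_relaxed);
    }

private:
    std::atomic<bool> running_{false};
};
```

A second start() while running returns false, a second stop() is a no-op, and the object can be started again after it has been stopped.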


◆ stop()

void kcenon::logger::async::batch_processor::stop ( bool flush_remaining = true)

Stop the batch processor.

Parameters
flush_remaining  Whether to flush remaining entries

Definition at line 158 of file batch_processor.cpp.

{
    if (!running_.exchange(false)) {
        return; // Already stopped
    }

    should_stop_ = true;

    // Wake the worker so it can observe should_stop_ and exit
    notify_cv_.notify_one();

    if (processing_worker_) {
        processing_worker_->stop();
        processing_worker_.reset();
    }

    if (flush_remaining) {
        // Process any remaining entries
        batch_entry entry;
        std::vector<batch_entry> final_batch;
        final_batch.reserve(queue_size);

        while (queue_->dequeue(entry)) {
            final_batch.push_back(std::move(entry));
        }

        if (!final_batch.empty()) {
            process_batch(final_batch);
        }

        if (writer_) {
            writer_->flush();
        }
    }
}

References notify_cv_, process_batch(), processing_worker_, queue_, queue_size, running_, should_stop_, and writer_.

Referenced by ~batch_processor().


◆ update_stats()

void kcenon::logger::async::batch_processor::update_stats ( size_t batch_size,
std::chrono::nanoseconds processing_time,
const std::string & flush_reason )
private

Update processing statistics.

Parameters
batch_size  Size of processed batch
processing_time  Time taken to process
flush_reason  Reason for flush

Definition at line 383 of file batch_processor.cpp.

{
    (void)batch_size; // Suppress unused parameter warning
    (void)flush_reason; // Suppress unused parameter warning
    const double processing_time_ms =
        std::chrono::duration_cast<std::chrono::microseconds>(processing_time).count() / 1000.0;

    // Update recent processing time (exponential moving average)
    const double alpha = 0.1;
    const double current_time = recent_processing_time_ms_.load(std::memory_order_relaxed);
    const double new_time = alpha * processing_time_ms + (1.0 - alpha) * current_time;
    recent_processing_time_ms_.store(new_time, std::memory_order_relaxed);

    // Update average batch size
    const auto total_batches = stats_.total_batches.load(std::memory_order_relaxed);
    if (total_batches > 0) {
        const auto total_entries = stats_.total_entries.load(std::memory_order_relaxed);
        const double avg_size = static_cast<double>(total_entries) / total_batches;
        stats_.average_batch_size.store(avg_size, std::memory_order_relaxed);
    }

    // Update average processing time
    // [line 406 is missing from this listing]
        recent_processing_time_ms_.load(std::memory_order_relaxed),
        std::memory_order_relaxed);

    // Update recent queue size for adjustment algorithm
    recent_queue_size_.store(get_queue_size(), std::memory_order_relaxed);
}

References kcenon::logger::async::batch_processor::processing_stats::average_batch_size, kcenon::logger::async::batch_processor::processing_stats::average_processing_time_ms, get_queue_size(), recent_processing_time_ms_, recent_queue_size_, stats_, kcenon::logger::async::batch_processor::processing_stats::total_batches, and kcenon::logger::async::batch_processor::processing_stats::total_entries.

Referenced by process_loop_iteration().


Member Data Documentation

◆ config_

config kcenon::logger::async::batch_processor::config_
private

◆ current_batch_size_

std::atomic<size_t> kcenon::logger::async::batch_processor::current_batch_size_
private

◆ current_wait_time_

std::atomic<std::chrono::milliseconds> kcenon::logger::async::batch_processor::current_wait_time_
private

Definition at line 267 of file batch_processor.h.

Referenced by process_loop_iteration(), and should_flush_by_time().

◆ last_adjustment_time_

std::chrono::steady_clock::time_point kcenon::logger::async::batch_processor::last_adjustment_time_
private

Definition at line 275 of file batch_processor.h.

◆ notify_cv_

std::condition_variable kcenon::logger::async::batch_processor::notify_cv_
private

Definition at line 263 of file batch_processor.h.

Referenced by add_entry(), collect_entries(), start(), and stop().

◆ notify_mutex_

std::mutex kcenon::logger::async::batch_processor::notify_mutex_
private

Definition at line 262 of file batch_processor.h.

Referenced by collect_entries(), and start().

◆ processing_worker_

std::unique_ptr<batch_processing_jthread_worker> kcenon::logger::async::batch_processor::processing_worker_
private

Definition at line 257 of file batch_processor.h.

Referenced by start(), and stop().

◆ queue_

std::unique_ptr<lockfree_spsc_queue<batch_entry, queue_size> > kcenon::logger::async::batch_processor::queue_
private

Definition at line 254 of file batch_processor.h.

Referenced by add_entry(), collect_entries(), get_queue_size(), and stop().

◆ queue_size

size_t kcenon::logger::async::batch_processor::queue_size = 8192
static constexpr private

Definition at line 253 of file batch_processor.h.

Referenced by adjust_batch_size(), handle_back_pressure(), and stop().

◆ recent_processing_time_ms_

std::atomic<double> kcenon::logger::async::batch_processor::recent_processing_time_ms_ {0.0}
private

Definition at line 273 of file batch_processor.h.

{0.0};

Referenced by adjust_batch_size(), and update_stats().

◆ recent_queue_size_

std::atomic<size_t> kcenon::logger::async::batch_processor::recent_queue_size_ {0}
private

Definition at line 274 of file batch_processor.h.

{0};

Referenced by update_stats().

◆ running_

std::atomic<bool> kcenon::logger::async::batch_processor::running_ {false}
private

Definition at line 258 of file batch_processor.h.

{false};

Referenced by add_entry(), is_healthy(), start(), and stop().

◆ should_stop_

std::atomic<bool> kcenon::logger::async::batch_processor::should_stop_ {false}
private

Definition at line 259 of file batch_processor.h.

{false};

Referenced by collect_entries(), start(), and stop().

◆ stats_

processing_stats kcenon::logger::async::batch_processor::stats_
mutable private

◆ writer_

log_writer_ptr kcenon::logger::async::batch_processor::writer_
private

Definition at line 250 of file batch_processor.h.

Referenced by batch_processor(), is_healthy(), process_batch(), and stop().


The documentation for this class was generated from the following files: