Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
batch_processor.cpp
// BSD 3-Clause License
// Copyright (c) 2025, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

/**
 * @file batch_processor.cpp
 * @brief Optimized batch processing engine for log entries.
 */

#include "batch_processor.h"

#include "jthread_compat.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>
namespace kcenon::logger::async {

/**
 * @brief Worker thread for batch processing with jthread compatibility.
 */
class batch_processing_jthread_worker {
public:
    using process_callback = std::function<void()>;

    batch_processing_jthread_worker(process_callback callback,
                                    std::mutex& notify_mutex,
                                    std::condition_variable& notify_cv)
        : callback_(std::move(callback))
        , notify_mutex_(notify_mutex)
        , notify_cv_(notify_cv)
#if !LOGGER_HAS_JTHREAD
        , stop_source_(std::make_shared<simple_stop_source>())
#endif
    {}

    ~batch_processing_jthread_worker() { stop(); }

    void start() {
        if (running_.exchange(true, std::memory_order_acq_rel)) {
            return; // Already started
        }

#if LOGGER_HAS_JTHREAD
        auto callback = callback_;
        auto& cv = notify_cv_;
        auto& mtx = notify_mutex_;
        thread_ = compat_jthread([callback, &cv, &mtx](std::stop_token stop_token) {
            while (!stop_token.stop_requested()) {
                if (callback) {
                    callback();
                }
                // Wait for notification or timeout instead of polling
                std::unique_lock<std::mutex> lock(mtx);
                cv.wait_for(lock, std::chrono::milliseconds(10),
                            [&stop_token] { return stop_token.stop_requested(); });
            }
        });
#else
        // Reset stop source for new start
        stop_source_->reset();

        auto callback = callback_;
        auto stop = stop_source_;
        auto& cv = notify_cv_;
        auto& mtx = notify_mutex_;
        thread_ = compat_jthread([callback, stop, &cv, &mtx](simple_stop_source& /*unused*/) {
            while (!stop->stop_requested()) {
                if (callback) {
                    callback();
                }
                // Wait for notification or timeout instead of polling
                std::unique_lock<std::mutex> lock(mtx);
                cv.wait_for(lock, std::chrono::milliseconds(10),
                            [&stop] { return stop->stop_requested(); });
            }
        });
#endif
    }

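    // Note: the 10 ms wait_for above bounds wakeup latency: even if a
    // notification is missed, the loop re-checks the stop flag and runs the
    // callback again within one timeout period.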
    void stop() {
        if (!running_.exchange(false, std::memory_order_acq_rel)) {
            return; // Already stopped
        }

        // Request stop and join thread
#if !LOGGER_HAS_JTHREAD
        // The fallback loop watches the shared stop source, so signal it as
        // well (assumes simple_stop_source provides request_stop())
        stop_source_->request_stop();
#endif
        thread_.request_stop();
        thread_.join();
    }

    [[nodiscard]] bool is_running() const noexcept {
        return running_.load(std::memory_order_acquire);
    }

private:
    process_callback callback_;
    std::mutex& notify_mutex_;
    std::condition_variable& notify_cv_;
    compat_jthread thread_;
    std::atomic<bool> running_{false};
#if !LOGGER_HAS_JTHREAD
    std::shared_ptr<simple_stop_source> stop_source_;
#endif
};

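// The worker above is driven by batch_processor: start() installs a callback
// that runs one process_loop_iteration(), and the shared condition variable
// lets add_entry() wake the worker as soon as a new entry is queued.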
batch_processor::batch_processor(log_writer_ptr writer, const config& cfg)
    : config_(cfg)
    , writer_(std::move(writer))
    , queue_(std::make_unique<lockfree_spsc_queue<batch_entry, queue_size>>())
    , current_batch_size_(cfg.initial_batch_size)
    , current_wait_time_(cfg.max_wait_time)
    , last_adjustment_time_(std::chrono::steady_clock::now()) {

    if (!writer_) {
        throw std::invalid_argument("Writer cannot be null");
    }

    // Validate configuration
    if (cfg.min_batch_size > cfg.max_batch_size) {
        throw std::invalid_argument("min_batch_size cannot be greater than max_batch_size");
    }

    if (cfg.initial_batch_size < cfg.min_batch_size ||
        cfg.initial_batch_size > cfg.max_batch_size) {
        throw std::invalid_argument("initial_batch_size must be within min/max range");
    }
}

batch_processor::~batch_processor() {
    stop();
}

bool batch_processor::start() {
    bool expected = false;
    if (!running_.compare_exchange_strong(expected, true)) {
        return false; // Already running
    }

    should_stop_ = false;
    processing_worker_ = std::make_unique<batch_processing_jthread_worker>(
        [this] { process_loop_iteration(); }, notify_mutex_, notify_cv_);
    processing_worker_->start();
    return true;
}

void batch_processor::stop(bool flush_remaining) {
    if (!running_.exchange(false)) {
        return; // Already stopped
    }

    should_stop_ = true;

    // Wake the worker so it can observe should_stop_ and exit
    notify_cv_.notify_one();

    if (processing_worker_) {
        processing_worker_->stop();
        processing_worker_.reset();
    }

    if (flush_remaining) {
        // Process any remaining entries
        batch_entry entry;
        std::vector<batch_entry> final_batch;
        final_batch.reserve(queue_size);

        while (queue_->dequeue(entry)) {
            final_batch.push_back(std::move(entry));
        }

        if (!final_batch.empty()) {
            process_batch(final_batch);
        }

        if (writer_) {
            writer_->flush();
        }
    }
}

bool batch_processor::add_entry(batch_entry&& entry) {
    if (!running_.load(std::memory_order_relaxed)) {
        return false;
    }

    if (!queue_->enqueue(std::move(entry))) {
        stats_.dropped_entries.fetch_add(1, std::memory_order_relaxed);
        return false;
    }

    // Wake the worker thread immediately
    notify_cv_.notify_one();

    return true;
}

bool batch_processor::add_entry(const log_entry& entry) {
    return add_entry(batch_entry(entry));
}

void batch_processor::flush() {
    // This is handled by the processing loop
    // We could add a flush signal mechanism here if needed
}

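// One possible flush-signal mechanism (hypothetical sketch, not implemented
// here): an atomic flag that the worker checks each iteration.
//
//   std::atomic<bool> flush_requested_{false};          // hypothetical member
//   void batch_processor::flush() {
//       flush_requested_.store(true, std::memory_order_release);
//       notify_cv_.notify_one();                        // wake the worker now
//   }
//   // process_loop_iteration() would then drain the queue regardless of
//   // batch size whenever it observes the flag, and clear it afterwards.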
bool batch_processor::is_healthy() const {
    return running_.load(std::memory_order_relaxed) &&
           writer_ && writer_->is_healthy();
}

size_t batch_processor::get_queue_size() const {
    return queue_->size();
}

void batch_processor::process_loop_iteration() {
    static thread_local std::chrono::steady_clock::time_point last_flush_time = std::chrono::steady_clock::now();
    static thread_local std::chrono::steady_clock::time_point last_adjustment_time = std::chrono::steady_clock::now();

    const auto batch_size = current_batch_size_.load(std::memory_order_relaxed);
    const auto wait_time = current_wait_time_.load(std::memory_order_relaxed);

    std::vector<batch_entry> current_batch;
    current_batch.reserve(batch_size);

    const auto deadline = std::chrono::steady_clock::now() + wait_time;
    const auto entries_collected = collect_entries(current_batch, batch_size, deadline);

    if (entries_collected > 0) {
        const auto process_start = std::chrono::steady_clock::now();
        const auto processed = process_batch(current_batch);
        const auto process_end = std::chrono::steady_clock::now();

        const auto processing_time = process_end - process_start;
        const bool flushed_by_size = (entries_collected >= batch_size);
        const bool flushed_by_time = should_flush_by_time(last_flush_time);

        std::string flush_reason;
        if (flushed_by_size) {
            flush_reason = "size";
            stats_.flush_by_size.fetch_add(1, std::memory_order_relaxed);
        } else if (flushed_by_time) {
            flush_reason = "time";
            stats_.flush_by_time.fetch_add(1, std::memory_order_relaxed);
        } else {
            flush_reason = "partial";
        }

        update_stats(processed, processing_time, flush_reason);
        last_flush_time = process_end;

        // Handle back-pressure
        if (config_.enable_back_pressure) {
            handle_back_pressure();
        }
    }

    // Dynamic batch size adjustment
    if (config_.enable_dynamic_sizing) {
        const auto now = std::chrono::steady_clock::now();
        if (now - last_adjustment_time > std::chrono::seconds(5)) {
            adjust_batch_size();
            last_adjustment_time = now;
        }
    }
}

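// collect_entries() below works in two phases: a timed phase that blocks on
// the condition variable while the queue is empty (up to the deadline), and a
// non-blocking drain that opportunistically grabs whatever else is queued.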
size_t batch_processor::collect_entries(std::vector<batch_entry>& batch,
                                        size_t max_entries,
                                        std::chrono::steady_clock::time_point deadline) {
    size_t collected = 0;
    batch_entry entry;

    while (collected < max_entries && std::chrono::steady_clock::now() < deadline) {
        if (queue_->dequeue(entry)) {
            batch.push_back(std::move(entry));
            ++collected;
        } else {
            // Queue is empty, wait for notification or remaining deadline
            auto remaining = deadline - std::chrono::steady_clock::now();
            if (remaining <= std::chrono::steady_clock::duration::zero()) {
                break;
            }
            std::unique_lock<std::mutex> lock(notify_mutex_);
            notify_cv_.wait_for(lock, remaining,
                                [this] { return queue_->size() > 0 || should_stop_.load(std::memory_order_relaxed); });
        }
    }

    // Try to collect more entries if we have time left and space
    while (collected < max_entries && queue_->dequeue(entry)) {
        batch.push_back(std::move(entry));
        ++collected;
    }

    return collected;
}

size_t batch_processor::process_batch(std::vector<batch_entry>& batch) {
    if (batch.empty() || !writer_) {
        return 0;
    }

    size_t processed = 0;
    for (const auto& entry : batch) {
        // Create log_entry from batch_entry
        log_entry log_ent(entry.level, entry.message, entry.file,
                          entry.line, entry.function, entry.timestamp);

        auto result = writer_->write(log_ent);
        if (result.is_ok()) {
            ++processed;
        }
    }

    // Flush after batch processing
    writer_->flush();

    stats_.total_batches.fetch_add(1, std::memory_order_relaxed);
    stats_.total_entries.fetch_add(processed, std::memory_order_relaxed);

    return processed;
}

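// Worked example for adjust_batch_size() below: with current_size = 100,
// queue_size = 250 and a recent processing time of 4 ms, the queue is more
// than twice the batch size and processing is fast, so the new size becomes
// min(max_batch_size, 100 * size_increase_factor), e.g. 150 for a factor of
// 1.5 (the factor value is configuration-dependent).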
void batch_processor::adjust_batch_size() {
    const auto current_size = current_batch_size_.load(std::memory_order_relaxed);
    const auto queue_size = get_queue_size();
    const auto recent_time = recent_processing_time_ms_.load(std::memory_order_relaxed);

    size_t new_size = current_size;

    // Increase batch size if queue is building up and processing is fast
    if (queue_size > current_size * 2 && recent_time < 10.0) {
        new_size = std::min(config_.max_batch_size,
                            static_cast<size_t>(current_size * config_.size_increase_factor));
    }
    // Decrease batch size if processing is slow or queue is small
    else if (recent_time > 100.0 || queue_size < current_size / 4) {
        new_size = std::max(config_.min_batch_size,
                            static_cast<size_t>(current_size * config_.size_decrease_factor));
    }

    if (new_size != current_size) {
        current_batch_size_.store(new_size, std::memory_order_relaxed);
        stats_.dynamic_size_adjustments.fetch_add(1, std::memory_order_relaxed);
    }
}

bool batch_processor::handle_back_pressure() {
    const auto queue_size = get_queue_size();

    if (queue_size > config_.back_pressure_threshold) {
        stats_.back_pressure_events.fetch_add(1, std::memory_order_relaxed);

        // Apply back-pressure delay
        std::this_thread::sleep_for(config_.back_pressure_delay);

        // Continue only if the queue did not grow by more than 50% during the
        // delay (re-read the size; comparing the stale value to itself would
        // always be true)
        return get_queue_size() < queue_size * 1.5;
    }

    return true;
}

bool batch_processor::should_flush_by_time(std::chrono::steady_clock::time_point last_flush_time) const {
    const auto now = std::chrono::steady_clock::now();
    const auto elapsed = now - last_flush_time;
    const auto current_wait = current_wait_time_.load(std::memory_order_relaxed);

    return elapsed >= current_wait;
}

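// update_stats() below smooths the per-batch processing time with an
// exponential moving average: new = alpha * sample + (1 - alpha) * old.
// With alpha = 0.1, old = 10.0 ms and sample = 20.0 ms, the stored value
// becomes 0.1 * 20.0 + 0.9 * 10.0 = 11.0 ms.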
void batch_processor::update_stats(size_t batch_size,
                                   std::chrono::nanoseconds processing_time,
                                   const std::string& flush_reason) {
    (void)batch_size;   // Suppress unused parameter warning
    (void)flush_reason; // Suppress unused parameter warning
    const double processing_time_ms =
        std::chrono::duration_cast<std::chrono::microseconds>(processing_time).count() / 1000.0;

    // Update recent processing time (exponential moving average)
    const double alpha = 0.1;
    const double current_time = recent_processing_time_ms_.load(std::memory_order_relaxed);
    const double new_time = alpha * processing_time_ms + (1.0 - alpha) * current_time;
    recent_processing_time_ms_.store(new_time, std::memory_order_relaxed);

    // Update average batch size
    const auto total_batches = stats_.total_batches.load(std::memory_order_relaxed);
    if (total_batches > 0) {
        const auto total_entries = stats_.total_entries.load(std::memory_order_relaxed);
        const double avg_size = static_cast<double>(total_entries) / total_batches;
        stats_.average_batch_size.store(avg_size, std::memory_order_relaxed);
    }

    // Update average processing time (the stats member name is assumed here)
    stats_.average_processing_time_ms.store(
        recent_processing_time_ms_.load(std::memory_order_relaxed),
        std::memory_order_relaxed);

    // Update recent queue size for adjustment algorithm
    recent_queue_size_.store(get_queue_size(), std::memory_order_relaxed);
}

std::unique_ptr<batch_processor> make_batch_processor(
    log_writer_ptr writer,
    const batch_processor::config& cfg) {
    return std::make_unique<batch_processor>(std::move(writer), cfg);
}

} // namespace kcenon::logger::async
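A minimal usage sketch, assuming a concrete writer type (console_writer here
is illustrative; any log_writer_interface implementation works), default
configuration values, and a pre-built log_entry named some_log_entry:

    using namespace kcenon::logger;

    auto writer = std::make_unique<console_writer>();   // hypothetical writer type
    async::batch_processor::config cfg;                 // defaults assumed valid
    auto processor = async::make_batch_processor(std::move(writer), cfg);

    processor->start();
    processor->add_entry(some_log_entry);               // enqueued and worker woken
    processor->stop(/*flush_remaining=*/true);          // drains the queue and flushes

Note that stop(true) (the default) drains any entries still queued before
flushing the writer, so no accepted entry is silently dropped on shutdown.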