Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
realtime_log_analyzer.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
42#pragma once
43
44#include <kcenon/common/interfaces/logger_interface.h>
46
47#include <atomic>
48#include <chrono>
49#include <cstdint>
50#include <deque>
51#include <functional>
52#include <memory>
53#include <mutex>
54#include <regex>
55#include <shared_mutex>
56#include <string>
57#include <unordered_set>
58#include <vector>
59
61
62// Type alias for log_level
63using log_level = common::interfaces::log_level;
64
73 enum class type : std::uint8_t {
78 };
79
81 std::chrono::system_clock::time_point detected_at;
82 std::string description;
83 std::vector<analyzed_log_entry> related_entries;
84 std::string pattern;
85 size_t current_count = 0;
86 size_t threshold = 0;
87};
88
94 size_t error_spike_threshold = 100;
97 std::chrono::seconds window_duration{60};
98 std::chrono::seconds baseline_duration{300};
99 bool track_new_errors = true;
103};
104
110 std::string pattern;
111 log_level min_level;
112 std::regex compiled_pattern;
113
114 pattern_alert(const std::string& p, log_level level)
115 : pattern(p), min_level(level), compiled_pattern(p, std::regex::optimize) {}
116};
117
139public:
143 using anomaly_callback = std::function<void(const anomaly_event&)>;
144
149
155 : config_(config) {}
156
165 std::unique_lock lock(callback_mutex_);
166 callback_ = std::move(cb);
167 }
168
178 void analyze(const analyzed_log_entry& entry) {
179 auto now = std::chrono::system_clock::now();
180
181 // Add to sliding window
182 add_to_window(entry, now);
183
184 // Check for error spike
185 if (entry.level == log_level::error ||
186 entry.level == log_level::fatal) {
187 check_error_spike(entry, now);
188 }
189
190 // Check pattern alerts
191 check_pattern_alerts(entry, now);
192
193 // Check rate anomaly
196 }
197
198 // Track new error types
200 (entry.level == log_level::error ||
201 entry.level == log_level::fatal)) {
202 check_new_error_type(entry, now);
203 }
204 }
205
210 void set_error_spike_threshold(size_t errors_per_minute) {
211 config_.error_spike_threshold = errors_per_minute;
212 }
213
219 void add_pattern_alert(const std::string& pattern, log_level min_level) {
220 std::unique_lock lock(patterns_mutex_);
221 patterns_.emplace_back(pattern, min_level);
222 }
223
229 bool remove_pattern_alert(const std::string& pattern) {
230 std::unique_lock lock(patterns_mutex_);
231 auto it = std::remove_if(patterns_.begin(), patterns_.end(),
232 [&pattern](const pattern_alert& alert) {
233 return alert.pattern == pattern;
234 });
235 if (it != patterns_.end()) {
236 patterns_.erase(it, patterns_.end());
237 return true;
238 }
239 return false;
240 }
241
246 std::unique_lock lock(patterns_mutex_);
247 patterns_.clear();
248 }
249
255 void set_rate_thresholds(size_t high_threshold, size_t low_threshold = 0) {
256 config_.rate_anomaly_high_threshold = high_threshold;
257 config_.rate_anomaly_low_threshold = low_threshold;
258 }
259
264 void set_track_new_errors(bool enable) {
265 config_.track_new_errors = enable;
266 }
267
272 double get_error_rate() const {
273 std::shared_lock lock(window_mutex_);
275 }
276
281 double get_log_rate() const {
282 std::shared_lock lock(window_mutex_);
284 }
285
290 struct statistics {
291 size_t total_analyzed = 0;
292 size_t total_errors = 0;
294 size_t error_spikes = 0;
295 size_t pattern_matches = 0;
296 size_t rate_anomalies = 0;
297 size_t new_error_types = 0;
298 double current_log_rate = 0.0;
299 double current_error_rate = 0.0;
300 };
301
303 std::shared_lock lock(stats_mutex_);
304 statistics stats;
305 stats.total_analyzed = total_analyzed_.load();
306 stats.total_errors = total_errors_.load();
308 stats.error_spikes = error_spikes_.load();
309 stats.pattern_matches = pattern_matches_.load();
310 stats.rate_anomalies = rate_anomalies_.load();
311 stats.new_error_types = new_error_types_.load();
314 return stats;
315 }
316
320 void reset() {
321 {
322 std::unique_lock lock(window_mutex_);
323 log_window_.clear();
324 error_window_.clear();
325 baseline_rates_.clear();
326 }
327 {
328 std::unique_lock lock(errors_mutex_);
329 known_errors_.clear();
330 }
331 {
332 std::unique_lock lock(stats_mutex_);
333 total_analyzed_ = 0;
334 total_errors_ = 0;
336 error_spikes_ = 0;
338 rate_anomalies_ = 0;
340 }
341 last_rate_check_ = std::chrono::system_clock::time_point{};
342 last_spike_alert_ = std::chrono::system_clock::time_point{};
343 }
344
350 return config_;
351 }
352
358 config_ = config;
359 }
360
361private:
363 std::chrono::system_clock::time_point timestamp;
365 };
366
368 std::chrono::system_clock::time_point now) {
369 std::unique_lock lock(window_mutex_);
370
371 // Add to log window
372 log_window_.push_back({now, entry});
373
374 // Add to error window if error/fatal
375 if (entry.level == log_level::error ||
376 entry.level == log_level::fatal) {
377 error_window_.push_back({now, entry});
378 }
379
380 // Clean up old entries
381 auto cutoff = now - config_.window_duration;
384
385 // Update statistics
386 total_analyzed_.fetch_add(1, std::memory_order_relaxed);
387 if (entry.level == log_level::error ||
388 entry.level == log_level::fatal) {
389 total_errors_.fetch_add(1, std::memory_order_relaxed);
390 }
391 }
392
393 static void cleanup_window(std::deque<timestamped_entry>& window,
394 std::chrono::system_clock::time_point cutoff) {
395 while (!window.empty() && window.front().timestamp < cutoff) {
396 window.pop_front();
397 }
398 }
399
400 double calculate_rate(const std::deque<timestamped_entry>& window) const {
401 if (window.empty()) return 0.0;
402
403 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
404 config_.window_duration).count();
405 if (duration == 0) duration = 60;
406
407 return static_cast<double>(window.size()) * 60.0 / duration;
408 }
409
411 std::chrono::system_clock::time_point now) {
412 std::shared_lock lock(window_mutex_);
413
414 double current_rate = calculate_rate(error_window_);
415
416 if (current_rate >= static_cast<double>(config_.error_spike_threshold)) {
417 // Rate limit: don't alert more than once per minute
418 {
419 std::shared_lock rate_lock(rate_limit_mutex_);
420 if (now - last_spike_alert_ < std::chrono::minutes(1)) {
421 return;
422 }
423 }
424
425 lock.unlock(); // Release before callback
426
427 // Update last alert time
428 {
429 std::unique_lock rate_lock(rate_limit_mutex_);
430 last_spike_alert_ = now;
431 }
432
433 anomaly_event event;
435 event.detected_at = now;
436 event.description = "Error spike detected: " +
437 std::to_string(static_cast<size_t>(current_rate)) +
438 " errors/minute (threshold: " +
439 std::to_string(config_.error_spike_threshold) + ")";
440 event.current_count = static_cast<size_t>(current_rate);
441 event.threshold = config_.error_spike_threshold;
442
443 // Collect related entries
444 {
445 std::shared_lock entries_lock(window_mutex_);
447 }
448
449 notify_anomaly(event);
450 error_spikes_.fetch_add(1, std::memory_order_relaxed);
451 }
452 }
453
455 std::chrono::system_clock::time_point now) {
456 std::shared_lock lock(patterns_mutex_);
457
458 for (const auto& alert : patterns_) {
459 // Check level threshold
460 if (static_cast<int>(entry.level) < static_cast<int>(alert.min_level)) {
461 continue;
462 }
463
464 // Check pattern match
465 if (std::regex_search(entry.message, alert.compiled_pattern)) {
466 lock.unlock(); // Release before callback
467
468 anomaly_event event;
470 event.detected_at = now;
471 event.pattern = alert.pattern;
472 event.description = "Pattern '" + alert.pattern +
473 "' matched in log message: " + entry.message;
474 event.related_entries.push_back(entry);
475
476 notify_anomaly(event);
477 pattern_matches_.fetch_add(1, std::memory_order_relaxed);
478 return; // Only report first match per entry
479 }
480 }
481 }
482
483 void check_rate_anomaly(std::chrono::system_clock::time_point now) {
484 // Rate limit rate anomaly checks to once per 10 seconds
485 {
486 std::shared_lock rate_lock(rate_limit_mutex_);
487 if (now - last_rate_check_ < std::chrono::seconds(10)) {
488 return;
489 }
490 }
491 {
492 std::unique_lock rate_lock(rate_limit_mutex_);
493 last_rate_check_ = now;
494 }
495
496 std::shared_lock lock(window_mutex_);
497 double current_rate = calculate_rate(log_window_);
498 lock.unlock();
499
500 // Check high rate
501 if (current_rate >= static_cast<double>(config_.rate_anomaly_high_threshold)) {
502 anomaly_event event;
504 event.detected_at = now;
505 event.description = "High log rate detected: " +
506 std::to_string(static_cast<size_t>(current_rate)) +
507 " logs/minute (threshold: " +
508 std::to_string(config_.rate_anomaly_high_threshold) + ")";
509 event.current_count = static_cast<size_t>(current_rate);
510 event.threshold = config_.rate_anomaly_high_threshold;
511
512 notify_anomaly(event);
513 rate_anomalies_.fetch_add(1, std::memory_order_relaxed);
514 }
515 // Check low rate (if enabled)
516 else if (config_.rate_anomaly_low_threshold > 0 &&
517 current_rate < static_cast<double>(config_.rate_anomaly_low_threshold) &&
518 total_analyzed_.load() > 100) { // Only after enough data
519 anomaly_event event;
521 event.detected_at = now;
522 event.description = "Low log rate detected: " +
523 std::to_string(static_cast<size_t>(current_rate)) +
524 " logs/minute (threshold: " +
525 std::to_string(config_.rate_anomaly_low_threshold) + ")";
526 event.current_count = static_cast<size_t>(current_rate);
527 event.threshold = config_.rate_anomaly_low_threshold;
528
529 notify_anomaly(event);
530 rate_anomalies_.fetch_add(1, std::memory_order_relaxed);
531 }
532 }
533
535 std::chrono::system_clock::time_point now) {
536 // Normalize error message (remove numbers, timestamps, etc.)
537 std::string normalized = normalize_error_message(entry.message);
538
539 {
540 std::shared_lock lock(errors_mutex_);
541 if (known_errors_.contains(normalized)) {
542 return; // Already seen this error type
543 }
544 }
545
546 // New error type detected
547 {
548 std::unique_lock lock(errors_mutex_);
549 known_errors_.insert(normalized);
550 }
551
552 anomaly_event event;
554 event.detected_at = now;
555 event.description = "New error type detected: " + entry.message;
556 event.related_entries.push_back(entry);
557
558 notify_anomaly(event);
559 new_error_types_.fetch_add(1, std::memory_order_relaxed);
560 }
561
/**
 * @brief Reduce an error message to a canonical form for "same error type"
 *        comparison by replacing variable parts with fixed placeholders.
 *
 * Replacements, in this order:
 *   1. UUIDs (8-4-4-4-12 hex digits)  -> "UUID"
 *   2. 0x-prefixed hex literals       -> "HEX"
 *   3. remaining runs of digits       -> "N"
 *
 * The order matters: the digit rule must run last. If it ran first it would
 * rewrite the leading '0' of every "0x..." literal and the digits inside
 * every UUID, so the more specific rules could no longer match.
 *
 * @param message Raw log message.
 * @return The message with the variable parts replaced.
 */
static std::string normalize_error_message(const std::string& message) {
    // Compiled once and reused; constructing a std::regex on every call is
    // expensive. Function-local statics are initialised thread-safely.
    static const std::regex uuid_re(
        R"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})",
        std::regex::optimize);
    static const std::regex hex_re(R"(0x[0-9a-fA-F]+)", std::regex::optimize);
    static const std::regex digits_re(R"(\d+)", std::regex::optimize);

    // Most specific pattern first, generic digit run last.
    std::string normalized = std::regex_replace(message, uuid_re, "UUID");
    normalized = std::regex_replace(normalized, hex_re, "HEX");
    normalized = std::regex_replace(normalized, digits_re, "N");

    return normalized;
}
578
580 const std::deque<timestamped_entry>& window) const {
581 size_t count = 0;
582 for (auto it = window.rbegin();
583 it != window.rend() && count < config_.max_related_entries;
584 ++it, ++count) {
585 event.related_entries.push_back(it->entry);
586 }
587 }
588
589 void notify_anomaly(const anomaly_event& event) {
590 anomalies_detected_.fetch_add(1, std::memory_order_relaxed);
591
592 std::shared_lock lock(callback_mutex_);
593 if (callback_) {
594 callback_(event);
595 }
596 }
597
598 // Configuration
600
601 // Callback
603 mutable std::shared_mutex callback_mutex_;
604
605 // Sliding windows
606 std::deque<timestamped_entry> log_window_;
607 std::deque<timestamped_entry> error_window_;
608 std::deque<double> baseline_rates_;
609 mutable std::shared_mutex window_mutex_;
610
611 // Pattern alerts
612 std::vector<pattern_alert> patterns_;
613 mutable std::shared_mutex patterns_mutex_;
614
615 // Known error types for new error detection
616 std::unordered_set<std::string> known_errors_;
617 mutable std::shared_mutex errors_mutex_;
618
619 // Rate limiting
620 std::chrono::system_clock::time_point last_rate_check_;
621 std::chrono::system_clock::time_point last_spike_alert_;
622 mutable std::shared_mutex rate_limit_mutex_;
623
624 // Statistics (atomic for lock-free reads)
625 std::atomic<size_t> total_analyzed_{0};
626 std::atomic<size_t> total_errors_{0};
627 std::atomic<size_t> anomalies_detected_{0};
628 std::atomic<size_t> error_spikes_{0};
629 std::atomic<size_t> pattern_matches_{0};
630 std::atomic<size_t> rate_anomalies_{0};
631 std::atomic<size_t> new_error_types_{0};
632 mutable std::shared_mutex stats_mutex_;
633};
634
640public:
644 static std::unique_ptr<realtime_log_analyzer> create_basic() {
645 return std::make_unique<realtime_log_analyzer>();
646 }
647
652 static std::unique_ptr<realtime_log_analyzer> create(
653 const realtime_analysis_config& config) {
654 return std::make_unique<realtime_log_analyzer>(config);
655 }
656
662 static std::unique_ptr<realtime_log_analyzer> create_production(
663 size_t error_threshold = 50,
664 realtime_log_analyzer::anomaly_callback callback = nullptr) {
665
667 config.error_spike_threshold = error_threshold;
668 config.rate_anomaly_high_threshold = 1000;
669 config.rate_anomaly_low_threshold = 10;
670 config.track_new_errors = true;
671 config.enable_rate_anomaly_detection = true;
672
673 auto analyzer = std::make_unique<realtime_log_analyzer>(config);
674 if (callback) {
675 analyzer->set_anomaly_callback(std::move(callback));
676 }
677 return analyzer;
678 }
679};
680
681} // namespace kcenon::logger::analysis
Factory for creating configured realtime log analyzers.
static std::unique_ptr< realtime_log_analyzer > create(const realtime_analysis_config &config)
Create a realtime analyzer with custom configuration.
static std::unique_ptr< realtime_log_analyzer > create_production(size_t error_threshold=50, realtime_log_analyzer::anomaly_callback callback=nullptr)
Create a production-ready analyzer with sensible defaults.
static std::unique_ptr< realtime_log_analyzer > create_basic()
Create a basic realtime analyzer with default settings.
Real-time log analyzer with anomaly detection.
bool remove_pattern_alert(const std::string &pattern)
Remove a pattern alert.
void collect_related_entries(anomaly_event &event, const std::deque< timestamped_entry > &window) const
std::chrono::system_clock::time_point last_rate_check_
double get_log_rate() const
Get current log rate (logs per minute)
void reset()
Reset all statistics and tracked state.
void check_rate_anomaly(std::chrono::system_clock::time_point now)
void set_config(const realtime_analysis_config &config)
Set the configuration.
double calculate_rate(const std::deque< timestamped_entry > &window) const
double get_error_rate() const
Get current error rate (errors per minute)
realtime_log_analyzer(const realtime_analysis_config &config)
Constructor with configuration.
void set_error_spike_threshold(size_t errors_per_minute)
Set error spike threshold.
void add_to_window(const analyzed_log_entry &entry, std::chrono::system_clock::time_point now)
void set_track_new_errors(bool enable)
Enable or disable new error tracking.
const realtime_analysis_config & get_config() const
Get the configuration.
void check_pattern_alerts(const analyzed_log_entry &entry, std::chrono::system_clock::time_point now)
static std::string normalize_error_message(const std::string &message)
void add_pattern_alert(const std::string &pattern, log_level min_level)
Add a pattern-based alert.
void check_new_error_type(const analyzed_log_entry &entry, std::chrono::system_clock::time_point now)
realtime_log_analyzer()=default
Default constructor.
void check_error_spike(const analyzed_log_entry &entry, std::chrono::system_clock::time_point now)
void analyze(const analyzed_log_entry &entry)
Analyze a log entry in real-time.
std::function< void(const anomaly_event &)> anomaly_callback
Callback type for anomaly notifications.
void set_anomaly_callback(anomaly_callback cb)
Set the anomaly callback.
std::chrono::system_clock::time_point last_spike_alert_
void set_rate_thresholds(size_t high_threshold, size_t low_threshold=0)
Set rate anomaly thresholds.
static void cleanup_window(std::deque< timestamped_entry > &window, std::chrono::system_clock::time_point cutoff)
Log analysis and metrics functionality.
common::interfaces::log_level log_level
Log entry for analysis.
Definition analysis.cppm:43
log_level level
std::string message
Represents an anomaly event detected during real-time analysis.
std::vector< analyzed_log_entry > related_entries
Log entries related to this anomaly.
std::chrono::system_clock::time_point detected_at
When the anomaly was detected.
std::string description
Human-readable description.
@ rate_anomaly
Unusual log rate (too high or too low)
@ error_spike
Sudden increase in errors.
@ pattern_match
Configured pattern detected.
@ new_error_type
Previously unseen error message.
size_t threshold
Threshold that was exceeded.
size_t current_count
Current count (for spike/rate anomalies)
std::string pattern
Pattern that triggered (for pattern_match)
log_level min_level
Minimum log level to trigger.
std::regex compiled_pattern
Pre-compiled regex for efficiency.
std::string pattern
Regex pattern to match.
pattern_alert(const std::string &p, log_level level)
Configuration for real-time log analysis.
bool enable_rate_anomaly_detection
Enable rate anomaly detection.
size_t error_spike_threshold
Errors per minute to trigger spike alert.
size_t max_related_entries
Max entries to store per anomaly.
size_t rate_anomaly_low_threshold
Logs per minute considered low (0 = disabled)
double rate_deviation_factor
Factor for dynamic rate anomaly detection.
std::chrono::seconds baseline_duration
Duration for baseline rate calculation.
size_t rate_anomaly_high_threshold
Logs per minute considered high.
bool track_new_errors
Enable new error type detection.
std::chrono::seconds window_duration
Sliding window duration for rate calculation.
analyzed_log_entry entry
std::chrono::system_clock::time_point timestamp