Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
alert_pipeline.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
15#include <algorithm>
16#include <chrono>
17#include <functional>
18#include <memory>
19#include <mutex>
20#include <string>
21#include <unordered_map>
22#include <vector>
23
24#include "alert_types.h"
26
27namespace kcenon::monitoring {
28
34 std::chrono::milliseconds group_wait{30000};
35 std::chrono::milliseconds group_interval{300000};
36 std::chrono::milliseconds resolve_timeout{300000};
37 std::vector<std::string> group_by_labels;
38
42 bool validate() const {
43 return group_wait.count() > 0 &&
44 group_interval.count() > 0 &&
45 resolve_timeout.count() > 0;
46 }
47};
48
80public:
85 : config_(config) {}
86
92 std::string add_alert(const alert& a) {
93 std::lock_guard<std::mutex> lock(mutex_);
94
95 std::string group_key = compute_group_key(a);
96 auto now = std::chrono::steady_clock::now();
97
98 auto it = groups_.find(group_key);
99 if (it == groups_.end()) {
100 alert_group new_group(group_key);
102 new_group.add_alert(a);
103 groups_[group_key] = std::move(new_group);
104 first_seen_[group_key] = now;
105 } else {
106 // Check for duplicate
107 bool is_duplicate = false;
108 for (auto& existing : it->second.alerts) {
109 if (existing.fingerprint() == a.fingerprint()) {
110 existing = a; // Update existing alert
111 is_duplicate = true;
112 break;
113 }
114 }
115 if (!is_duplicate) {
116 it->second.add_alert(a);
117 }
118 }
119
120 return group_key;
121 }
122
127 std::vector<alert_group> get_ready_groups() {
128 std::lock_guard<std::mutex> lock(mutex_);
129
130 std::vector<alert_group> ready;
131 auto now = std::chrono::steady_clock::now();
132
133 for (auto& [key, group] : groups_) {
134 if (group.empty()) {
135 continue;
136 }
137
138 // Check if first wait has elapsed
139 auto first_seen = first_seen_[key];
140 if (now - first_seen < config_.group_wait) {
141 continue;
142 }
143
144 // Check if enough time since last send
145 auto last_sent_it = last_sent_.find(key);
146 if (last_sent_it != last_sent_.end()) {
147 if (now - last_sent_it->second < config_.group_interval) {
148 continue;
149 }
150 }
151
152 ready.push_back(group);
153 }
154
155 return ready;
156 }
157
162 void mark_sent(const std::string& group_key) {
163 std::lock_guard<std::mutex> lock(mutex_);
164 last_sent_[group_key] = std::chrono::steady_clock::now();
165 }
166
170 void cleanup() {
171 std::lock_guard<std::mutex> lock(mutex_);
172
173 auto now = std::chrono::steady_clock::now();
174
175 for (auto it = groups_.begin(); it != groups_.end(); ) {
176 auto& group = it->second;
177
178 // Remove resolved alerts that have timed out
179 auto alert_it = group.alerts.begin();
180 while (alert_it != group.alerts.end()) {
181 if (alert_it->state == alert_state::resolved) {
182 auto resolved_time = alert_it->resolved_at.value_or(now);
183 if (now - resolved_time > config_.resolve_timeout) {
184 alert_it = group.alerts.erase(alert_it);
185 continue;
186 }
187 }
188 ++alert_it;
189 }
190
191 // Remove empty groups
192 if (group.empty()) {
193 first_seen_.erase(it->first);
194 last_sent_.erase(it->first);
195 it = groups_.erase(it);
196 } else {
197 ++it;
198 }
199 }
200 }
201
205 size_t group_count() const {
206 std::lock_guard<std::mutex> lock(mutex_);
207 return groups_.size();
208 }
209
213 size_t total_alert_count() const {
214 std::lock_guard<std::mutex> lock(mutex_);
215 size_t count = 0;
216 for (const auto& [key, group] : groups_) {
217 count += group.size();
218 }
219 return count;
220 }
221
222private:
223 std::string compute_group_key(const alert& a) const {
224 if (config_.group_by_labels.empty()) {
225 return a.rule_name;
226 }
227
228 std::string key;
229 for (const auto& label : config_.group_by_labels) {
230 key += a.labels.get(label) + ":";
231 }
232 return key;
233 }
234
236 alert_labels common;
237 for (const auto& label : config_.group_by_labels) {
238 std::string val = a.labels.get(label);
239 if (!val.empty()) {
240 common.set(label, val);
241 }
242 }
243 return common;
244 }
245
247 mutable std::mutex mutex_;
248
249 std::unordered_map<std::string, alert_group> groups_;
250 std::unordered_map<std::string, std::chrono::steady_clock::time_point> first_seen_;
251 std::unordered_map<std::string, std::chrono::steady_clock::time_point> last_sent_;
252};
253
262 std::string name;
265 std::vector<std::string> equal;
266
270 bool matches_source(const alert& a) const {
271 for (const auto& [key, value] : source_match.labels) {
272 if (a.labels.get(key) != value) {
273 return false;
274 }
275 }
276 return true;
277 }
278
282 bool should_inhibit(const alert& source, const alert& target) const {
283 // Source must match
284 if (!matches_source(source)) {
285 return false;
286 }
287
288 // Target must match target_match labels
289 for (const auto& [key, value] : target_match.labels) {
290 if (target.labels.get(key) != value) {
291 return false;
292 }
293 }
294
295 // Equal labels must match between source and target
296 for (const auto& label : equal) {
297 if (source.labels.get(label) != target.labels.get(label)) {
298 return false;
299 }
300 }
301
302 return true;
303 }
304};
305
331public:
335 void add_rule(const inhibition_rule& rule) {
336 std::lock_guard<std::mutex> lock(mutex_);
337 rules_.push_back(rule);
338 }
339
343 void remove_rule(const std::string& name) {
344 std::lock_guard<std::mutex> lock(mutex_);
345 rules_.erase(
346 std::remove_if(rules_.begin(), rules_.end(),
347 [&name](const inhibition_rule& r) { return r.name == name; }),
348 rules_.end()
349 );
350 }
351
358 bool is_inhibited(const alert& target,
359 const std::vector<alert>& active_alerts) const {
360 std::lock_guard<std::mutex> lock(mutex_);
361
362 for (const auto& rule : rules_) {
363 for (const auto& source : active_alerts) {
364 // Source must be firing
365 if (source.state != alert_state::firing) {
366 continue;
367 }
368
369 // Don't inhibit self
370 if (source.fingerprint() == target.fingerprint()) {
371 continue;
372 }
373
374 if (rule.should_inhibit(source, target)) {
375 return true;
376 }
377 }
378 }
379
380 return false;
381 }
382
386 std::vector<inhibition_rule> get_rules() const {
387 std::lock_guard<std::mutex> lock(mutex_);
388 return rules_;
389 }
390
391private:
392 mutable std::mutex mutex_;
393 std::vector<inhibition_rule> rules_;
394};
395
404public:
408 explicit cooldown_tracker(std::chrono::milliseconds default_cooldown)
409 : default_cooldown_(default_cooldown) {}
410
416 bool is_in_cooldown(const std::string& fingerprint) const {
417 std::lock_guard<std::mutex> lock(mutex_);
418
419 auto it = last_notification_.find(fingerprint);
420 if (it == last_notification_.end()) {
421 return false;
422 }
423
424 auto now = std::chrono::steady_clock::now();
425 auto cooldown = get_cooldown_for(fingerprint);
426 return (now - it->second) < cooldown;
427 }
428
433 void record_notification(const std::string& fingerprint) {
434 std::lock_guard<std::mutex> lock(mutex_);
435 last_notification_[fingerprint] = std::chrono::steady_clock::now();
436 }
437
443 void set_cooldown(const std::string& fingerprint,
444 std::chrono::milliseconds cooldown) {
445 std::lock_guard<std::mutex> lock(mutex_);
446 custom_cooldowns_[fingerprint] = cooldown;
447 }
448
454 std::chrono::milliseconds remaining_cooldown(const std::string& fingerprint) const {
455 std::lock_guard<std::mutex> lock(mutex_);
456
457 auto it = last_notification_.find(fingerprint);
458 if (it == last_notification_.end()) {
459 return std::chrono::milliseconds::zero();
460 }
461
462 auto now = std::chrono::steady_clock::now();
463 auto cooldown = get_cooldown_for(fingerprint);
464 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
465 now - it->second);
466
467 if (elapsed >= cooldown) {
468 return std::chrono::milliseconds::zero();
469 }
470 return cooldown - elapsed;
471 }
472
476 void clear_cooldown(const std::string& fingerprint) {
477 std::lock_guard<std::mutex> lock(mutex_);
478 last_notification_.erase(fingerprint);
479 }
480
484 void reset() {
485 std::lock_guard<std::mutex> lock(mutex_);
486 last_notification_.clear();
487 }
488
489private:
490 std::chrono::milliseconds get_cooldown_for(const std::string& fingerprint) const {
491 auto it = custom_cooldowns_.find(fingerprint);
492 if (it != custom_cooldowns_.end()) {
493 return it->second;
494 }
495 return default_cooldown_;
496 }
497
498 std::chrono::milliseconds default_cooldown_;
499 mutable std::mutex mutex_;
500 std::unordered_map<std::string, std::chrono::steady_clock::time_point> last_notification_;
501 std::unordered_map<std::string, std::chrono::milliseconds> custom_cooldowns_;
502};
503
512public:
517 explicit alert_deduplicator(std::chrono::milliseconds cache_duration)
518 : cache_duration_(cache_duration) {}
519
525 bool is_duplicate(const alert& a) {
526 std::lock_guard<std::mutex> lock(mutex_);
527
529
530 auto fingerprint = a.fingerprint();
531 auto it = seen_.find(fingerprint);
532
533 if (it == seen_.end()) {
534 seen_[fingerprint] = std::chrono::steady_clock::now();
535 return false;
536 }
537
538 // Same state is duplicate
539 auto state_it = last_state_.find(fingerprint);
540 if (state_it != last_state_.end() && state_it->second == a.state) {
541 return true;
542 }
543
544 // State changed - not duplicate
545 last_state_[fingerprint] = a.state;
546 return false;
547 }
548
552 void mark_seen(const alert& a) {
553 std::lock_guard<std::mutex> lock(mutex_);
554 auto fingerprint = a.fingerprint();
555 seen_[fingerprint] = std::chrono::steady_clock::now();
556 last_state_[fingerprint] = a.state;
557 }
558
562 void reset() {
563 std::lock_guard<std::mutex> lock(mutex_);
564 seen_.clear();
565 last_state_.clear();
566 }
567
568private:
570 auto now = std::chrono::steady_clock::now();
571 for (auto it = seen_.begin(); it != seen_.end(); ) {
572 if (now - it->second > cache_duration_) {
573 last_state_.erase(it->first);
574 it = seen_.erase(it);
575 } else {
576 ++it;
577 }
578 }
579 }
580
581 std::chrono::milliseconds cache_duration_;
582 mutable std::mutex mutex_;
583 std::unordered_map<std::string, std::chrono::steady_clock::time_point> seen_;
584 std::unordered_map<std::string, alert_state> last_state_;
585};
586
592public:
593 virtual ~pipeline_stage() = default;
594
600 virtual bool process(alert& a) = 0;
601
605 virtual std::string name() const = 0;
606};
607
616public:
620 void add_stage(std::shared_ptr<pipeline_stage> stage) {
621 stages_.push_back(std::move(stage));
622 }
623
629 bool process(alert& a) {
630 for (const auto& stage : stages_) {
631 if (!stage->process(a)) {
632 return false;
633 }
634 }
635 return true;
636 }
637
641 std::vector<std::string> stage_names() const {
642 std::vector<std::string> names;
643 names.reserve(stages_.size());
644 for (const auto& stage : stages_) {
645 names.push_back(stage->name());
646 }
647 return names;
648 }
649
650private:
651 std::vector<std::shared_ptr<pipeline_stage>> stages_;
652};
653
654} // namespace kcenon::monitoring
Core alert data structures for the monitoring system.
Groups and deduplicates alerts.
size_t group_count() const
Get current group count.
void cleanup()
Remove resolved alerts and clean up old groups.
std::string add_alert(const alert &a)
Add an alert for aggregation.
void mark_sent(const std::string &group_key)
Mark a group as sent.
alert_labels extract_common_labels(const alert &a) const
std::string compute_group_key(const alert &a) const
std::unordered_map< std::string, std::chrono::steady_clock::time_point > first_seen_
std::unordered_map< std::string, alert_group > groups_
alert_aggregator(const alert_aggregator_config &config)
Construct with configuration.
std::unordered_map< std::string, std::chrono::steady_clock::time_point > last_sent_
std::vector< alert_group > get_ready_groups()
Get groups ready for notification.
size_t total_alert_count() const
Get total alert count across all groups.
Deduplicates alerts based on fingerprint.
std::unordered_map< std::string, alert_state > last_state_
bool is_duplicate(const alert &a)
Check if alert is a duplicate.
void mark_seen(const alert &a)
Mark alert as seen.
std::chrono::milliseconds cache_duration_
alert_deduplicator(std::chrono::milliseconds cache_duration)
Construct with cache duration.
std::unordered_map< std::string, std::chrono::steady_clock::time_point > seen_
void reset()
Clear deduplication cache.
Manages alert inhibition rules.
void remove_rule(const std::string &name)
Remove an inhibition rule by name.
bool is_inhibited(const alert &target, const std::vector< alert > &active_alerts) const
Check if an alert is inhibited by any active alerts.
void add_rule(const inhibition_rule &rule)
Add an inhibition rule.
std::vector< inhibition_rule > get_rules() const
Get all rules.
std::vector< inhibition_rule > rules_
Configurable alert processing pipeline.
void add_stage(std::shared_ptr< pipeline_stage > stage)
Add a processing stage.
bool process(alert &a)
Process an alert through all stages.
std::vector< std::shared_ptr< pipeline_stage > > stages_
std::vector< std::string > stage_names() const
Get stage names.
Tracks cooldown periods for alert notifications.
void record_notification(const std::string &fingerprint)
Record notification time.
bool is_in_cooldown(const std::string &fingerprint) const
Check if alert is in cooldown.
cooldown_tracker(std::chrono::milliseconds default_cooldown)
Construct with default cooldown period.
std::chrono::milliseconds default_cooldown_
std::unordered_map< std::string, std::chrono::steady_clock::time_point > last_notification_
void reset()
Clear all cooldown state.
std::unordered_map< std::string, std::chrono::milliseconds > custom_cooldowns_
std::chrono::milliseconds get_cooldown_for(const std::string &fingerprint) const
std::chrono::milliseconds remaining_cooldown(const std::string &fingerprint) const
Get time remaining in cooldown.
void clear_cooldown(const std::string &fingerprint)
Clear cooldown state for an alert.
void set_cooldown(const std::string &fingerprint, std::chrono::milliseconds cooldown)
Set custom cooldown for specific alert.
Base class for pipeline processing stages.
virtual bool process(alert &a)=0
Process an alert through this stage.
virtual std::string name() const =0
Get stage name.
@ firing
Alert is active and notifications sent.
@ resolved
Alert condition cleared.
Result pattern type definitions for monitoring system.
Configuration for alert aggregation.
std::chrono::milliseconds group_interval
Interval between group sends.
std::chrono::milliseconds group_wait
Initial wait before sending.
std::chrono::milliseconds resolve_timeout
Time before removing resolved.
std::vector< std::string > group_by_labels
Labels to group by.
bool validate() const
Validate configuration.
Group of related alerts for batch notification.
alert_labels common_labels
Labels shared by all alerts.
void add_alert(alert a)
Add an alert to the group.
Key-value labels for alert identification and routing.
std::unordered_map< std::string, std::string > labels
void set(const std::string &key, const std::string &value)
Add or update a label.
std::string get(const std::string &key) const
Get a label value.
Core alert data structure.
alert_state state
Current state.
std::string rule_name
Name of triggering rule.
alert_labels labels
Identifying labels.
std::string fingerprint() const
Get alert fingerprint for deduplication.
Rule for inhibiting alerts based on other alerts.
std::vector< std::string > equal
Labels that must be equal on both.
bool should_inhibit(const alert &source, const alert &target) const
Check if target alert should be inhibited by source.
alert_labels target_match
Labels that target alert must have.
bool matches_source(const alert &a) const
Check if source alert matches this rule.
alert_labels source_match
Labels that source alert must have.