Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
data_consistency.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
11#pragma once
12
13#include <atomic>
14#include <chrono>
15#include <condition_variable>
16#include <functional>
17#include <memory>
18#include <mutex>
19#include <shared_mutex>
20#include <string>
21#include <thread>
22#include <unordered_map>
23#include <vector>
24
26
27namespace kcenon::monitoring {
28
33 active,
36};
37
42 valid,
44};
45
50 std::chrono::milliseconds timeout{std::chrono::milliseconds(30000)};
51 std::chrono::milliseconds lock_timeout{std::chrono::milliseconds(10000)};
52 size_t max_retries{3};
53
54 [[nodiscard]] bool validate() const {
55 if (timeout.count() <= 0) {
56 return false;
57 }
58 if (lock_timeout.count() <= 0) {
59 return false;
60 }
61 if (max_retries == 0) {
62 return false;
63 }
64 return true;
65 }
66};
67
72 std::chrono::milliseconds validation_interval{std::chrono::milliseconds(60000)};
75 bool enable_auto_repair{false};
76
77 [[nodiscard]] bool validate() const {
78 if (validation_interval.count() <= 0) {
79 return false;
80 }
81 if (max_validation_failures == 0) {
82 return false;
83 }
85 return false;
86 }
87 return true;
88 }
89};
90
95 std::atomic<size_t> total_transactions{0};
96 std::atomic<size_t> committed_transactions{0};
97 std::atomic<size_t> aborted_transactions{0};
98 std::atomic<size_t> deadlocks_detected{0};
99
100 double get_abort_rate() const {
101 size_t total = total_transactions.load();
102 if (total == 0) {
103 return 0.0;
104 }
105 return static_cast<double>(aborted_transactions.load()) / static_cast<double>(total);
106 }
107};
108
113 std::atomic<size_t> validation_runs{0};
114 std::atomic<size_t> repair_operations{0};
115};
116
121public:
122 using execute_func_t = std::function<common::VoidResult()>;
123 using rollback_func_t = std::function<common::VoidResult()>;
124
125 transaction_operation(const std::string& name,
126 execute_func_t execute_func,
127 rollback_func_t rollback_func = nullptr)
128 : name_(name)
129 , execute_func_(std::move(execute_func))
130 , rollback_func_(std::move(rollback_func))
131 , executed_(false) {}
132
133 std::string name() const { return name_; }
134 bool is_executed() const { return executed_; }
135
136 common::VoidResult execute() {
137 if (execute_func_) {
138 auto result = execute_func_();
139 if (result.is_ok()) {
140 executed_ = true;
141 }
142 return result;
143 }
144 executed_ = true;
145 return common::ok();
146 }
147
148 bool rollback() {
149 if (rollback_func_) {
150 auto result = rollback_func_();
151 return result.is_ok();
152 }
153 return true;
154 }
155
156private:
157 std::string name_;
161};
162
167public:
168 transaction(const std::string& id, const transaction_config& config)
169 : id_(id)
170 , config_(config)
172 , creation_time_(std::chrono::steady_clock::now()) {}
173
174 std::string id() const { return id_; }
175 transaction_state state() const { return state_; }
176 size_t operation_count() const { return operations_.size(); }
177
178 bool add_operation(std::unique_ptr<transaction_operation> op) {
180 return false;
181 }
182 operations_.push_back(std::move(op));
183 return true;
184 }
185
186 bool commit() {
188 return false;
189 }
190
191 // Check for timeout
192 auto now = std::chrono::steady_clock::now();
193 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - creation_time_);
194 if (elapsed > config_.timeout) {
195 abort();
196 return false;
197 }
198
199 // Execute all operations
200 size_t executed_count = 0;
201 for (auto& op : operations_) {
202 auto result = op->execute();
203 if (!result.is_ok()) {
204 // Rollback executed operations in reverse order
205 for (size_t i = executed_count; i > 0; --i) {
206 operations_[i - 1]->rollback();
207 }
209 return false;
210 }
211 ++executed_count;
212 }
213
215 return true;
216 }
217
218 bool abort() {
220 return false;
221 }
223 return true;
224 }
225
226 std::chrono::steady_clock::time_point creation_time() const {
227 return creation_time_;
228 }
229
230private:
231 std::string id_;
234 std::chrono::steady_clock::time_point creation_time_;
235 std::vector<std::unique_ptr<transaction_operation>> operations_;
236};
237
242public:
243 transaction_manager(const std::string& name, const transaction_config& config)
244 : name_(name)
245 , config_(config) {}
246
247 std::string get_name() const { return name_; }
248
249 common::Result<std::shared_ptr<transaction>> begin_transaction(const std::string& id) {
250 std::unique_lock<std::shared_mutex> lock(mutex_);
251
252 // Check for duplicate transaction
253 if (active_transactions_.find(id) != active_transactions_.end()) {
254 return common::make_error<std::shared_ptr<transaction>>(
256 "Transaction with ID '" + id + "' already exists");
257 }
258
259 auto tx = std::make_shared<transaction>(id, config_);
260 active_transactions_[id] = tx;
262 return common::ok(tx);
263 }
264
265 bool commit_transaction(const std::string& id) {
266 std::unique_lock<std::shared_mutex> lock(mutex_);
267
268 auto it = active_transactions_.find(id);
269 if (it == active_transactions_.end()) {
270 return false;
271 }
272
273 auto tx = it->second;
274 active_transactions_.erase(it);
275
276 bool success = tx->commit();
277 if (success) {
280 } else {
282 }
283 return success;
284 }
285
286 bool abort_transaction(const std::string& id) {
287 std::unique_lock<std::shared_mutex> lock(mutex_);
288
289 auto it = active_transactions_.find(id);
290 if (it == active_transactions_.end()) {
291 return false;
292 }
293
294 auto tx = it->second;
295 active_transactions_.erase(it);
296
297 tx->abort();
299 return true;
300 }
301
303 std::shared_lock<std::shared_mutex> lock(mutex_);
304 return active_transactions_.size();
305 }
306
308 std::shared_lock<std::shared_mutex> lock(mutex_);
309 return completed_transactions_.size();
310 }
311
312 common::Result<std::vector<std::string>> detect_deadlocks() {
313 std::shared_lock<std::shared_mutex> lock(mutex_);
314
315 std::vector<std::string> deadlocked;
316 auto now = std::chrono::steady_clock::now();
317
318 for (const auto& [id, tx] : active_transactions_) {
319 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
320 now - tx->creation_time());
321 if (elapsed > config_.timeout) {
322 deadlocked.push_back(id);
324 }
325 }
326
327 return common::ok(deadlocked);
328 }
329
330 void cleanup_completed_transactions(std::chrono::milliseconds /*max_age*/) {
331 std::unique_lock<std::shared_mutex> lock(mutex_);
333 }
334
336 const transaction_metrics& get_metrics() const { return metrics_; }
337
338private:
339 std::string name_;
341 mutable std::shared_mutex mutex_;
342 std::unordered_map<std::string, std::shared_ptr<transaction>> active_transactions_;
343 std::unordered_map<std::string, std::shared_ptr<transaction>> completed_transactions_;
345};
346
351public:
352 using validation_func_t = std::function<validation_result()>;
353 using repair_func_t = std::function<common::VoidResult()>;
354
355 state_validator(const std::string& name, const validation_config& config)
356 : name_(name)
357 , config_(config)
358 , running_(false) {}
359
361 stop();
362 }
363
364 // Non-copyable and non-movable due to thread ownership
369
370 std::string get_name() const { return name_; }
371
372 bool add_validation_rule(const std::string& name,
373 validation_func_t validation_func,
374 repair_func_t repair_func = nullptr) {
375 std::unique_lock<std::mutex> lock(mutex_);
376 validation_rules_[name] = {std::move(validation_func), std::move(repair_func)};
377 return true;
378 }
379
380 common::Result<std::unordered_map<std::string, validation_result>> validate() {
381 std::unique_lock<std::mutex> lock(mutex_);
382
383 std::unordered_map<std::string, validation_result> results;
385
386 for (const auto& [name, rule] : validation_rules_) {
387 auto validation_result_value = rule.validation_func();
388 results[name] = validation_result_value;
389
390 // If validation failed and auto-repair is enabled
391 if (validation_result_value == validation_result::invalid &&
392 config_.enable_auto_repair && rule.repair_func) {
393 auto repair_result = rule.repair_func();
394 if (repair_result.is_ok()) {
396 // Re-validate after repair
397 auto after_repair = rule.validation_func();
398 results[name + "_after_repair"] = after_repair;
399 }
400 }
401 }
402
403 return common::ok(results);
404 }
405
406 common::VoidResult start() {
407 if (running_.exchange(true)) {
408 return common::VoidResult::err(error_info(monitoring_error_code::already_started, "Validator already running").to_common_error());
409 }
410
411 validation_thread_ = std::thread([this]() {
412 while (running_) {
413 {
414 std::unique_lock<std::mutex> lock(cv_mutex_);
415 cv_.wait_for(lock, config_.validation_interval, [this]() {
416 return !running_.load();
417 });
418 }
419
420 if (!running_) {
421 break;
422 }
423
424 validate();
425 }
426 });
427
428 return common::ok();
429 }
430
431 common::VoidResult stop() {
432 if (!running_.exchange(false)) {
433 return common::ok();
434 }
435
436 {
437 std::unique_lock<std::mutex> lock(cv_mutex_);
438 cv_.notify_all();
439 }
440
441 if (validation_thread_.joinable()) {
442 validation_thread_.join();
443 }
444
445 return common::ok();
446 }
447
448 common::Result<bool> is_healthy() const {
449 // Simple health check - all validation rules should pass
450 return common::ok(true);
451 }
452
454 const validation_metrics& get_metrics() const { return metrics_; }
455
456private:
461
462 std::string name_;
464 std::mutex mutex_;
465 std::unordered_map<std::string, validation_rule> validation_rules_;
467
468 std::atomic<bool> running_;
470 std::mutex cv_mutex_;
471 std::condition_variable cv_;
472};
473
478public:
479 explicit data_consistency_manager(const std::string& name)
480 : name_(name) {}
481
482 common::VoidResult add_transaction_manager(const std::string& name,
483 const transaction_config& config) {
484 std::unique_lock<std::mutex> lock(mutex_);
485
486 if (transaction_managers_.find(name) != transaction_managers_.end()) {
487 return common::VoidResult::err(common::error_info(
489 "Transaction manager '" + name + "' already exists",
490 "monitoring_system"));
491 }
492
493 transaction_managers_[name] = std::make_shared<transaction_manager>(name, config);
494 return common::ok();
495 }
496
498 std::unique_lock<std::mutex> lock(mutex_);
499
500 auto it = transaction_managers_.find(name);
501 if (it == transaction_managers_.end()) {
502 return nullptr;
503 }
504 return it->second.get();
505 }
506
507 common::VoidResult add_state_validator(const std::string& name,
508 const validation_config& config) {
509 std::unique_lock<std::mutex> lock(mutex_);
510
511 if (state_validators_.find(name) != state_validators_.end()) {
512 return common::VoidResult::err(common::error_info(
514 "State validator '" + name + "' already exists",
515 "monitoring_system"));
516 }
517
518 state_validators_[name] = std::make_shared<state_validator>(name, config);
519 return common::ok();
520 }
521
522 state_validator* get_state_validator(const std::string& name) {
523 std::unique_lock<std::mutex> lock(mutex_);
524
525 auto it = state_validators_.find(name);
526 if (it == state_validators_.end()) {
527 return nullptr;
528 }
529 return it->second.get();
530 }
531
532 common::VoidResult start_all_validators() {
533 std::unique_lock<std::mutex> lock(mutex_);
534
535 for (auto& [name, validator] : state_validators_) {
536 auto result = validator->start();
537 if (!result.is_ok()) {
538 return result;
539 }
540 }
541 return common::ok();
542 }
543
544 common::VoidResult stop_all_validators() {
545 std::unique_lock<std::mutex> lock(mutex_);
546
547 for (auto& [name, validator] : state_validators_) {
548 validator->stop();
549 }
550 return common::ok();
551 }
552
553 common::Result<bool> is_healthy() const {
554 return common::ok(true);
555 }
556
557 std::unordered_map<std::string, std::string> get_all_metrics() const {
558 std::unordered_map<std::string, std::string> all_metrics;
559
560 for (const auto& [name, manager] : transaction_managers_) {
561 all_metrics[name + "_transactions"] = std::to_string(manager->get_metrics().total_transactions.load());
562 }
563
564 for (const auto& [name, validator] : state_validators_) {
565 all_metrics[name + "_validation"] = std::to_string(validator->get_metrics().validation_runs.load());
566 }
567
568 return all_metrics;
569 }
570
571private:
572 std::string name_;
573 std::mutex mutex_;
574 std::unordered_map<std::string, std::shared_ptr<transaction_manager>> transaction_managers_;
575 std::unordered_map<std::string, std::shared_ptr<state_validator>> state_validators_;
576};
577
581inline std::shared_ptr<transaction_manager> create_transaction_manager(const std::string& name) {
582 transaction_config config;
583 return std::make_shared<transaction_manager>(name, config);
584}
585
589inline std::shared_ptr<state_validator> create_state_validator(const std::string& name) {
590 validation_config config;
591 return std::make_shared<state_validator>(name, config);
592}
593
597inline std::shared_ptr<data_consistency_manager> create_data_consistency_manager(const std::string& name) {
598 return std::make_shared<data_consistency_manager>(name);
599}
600
601} // namespace kcenon::monitoring
Data consistency manager coordinating transaction managers and validators.
std::unordered_map< std::string, std::string > get_all_metrics() const
transaction_manager * get_transaction_manager(const std::string &name)
common::VoidResult add_transaction_manager(const std::string &name, const transaction_config &config)
state_validator * get_state_validator(const std::string &name)
common::Result< bool > is_healthy() const
common::VoidResult add_state_validator(const std::string &name, const validation_config &config)
std::unordered_map< std::string, std::shared_ptr< state_validator > > state_validators_
std::unordered_map< std::string, std::shared_ptr< transaction_manager > > transaction_managers_
State validator for validating system state.
state_validator & operator=(state_validator &&)=delete
std::function< validation_result()> validation_func_t
std::function< common::VoidResult()> repair_func_t
std::unordered_map< std::string, validation_rule > validation_rules_
bool add_validation_rule(const std::string &name, validation_func_t validation_func, repair_func_t repair_func=nullptr)
const validation_metrics & get_metrics() const
state_validator(const state_validator &)=delete
state_validator & operator=(const state_validator &)=delete
state_validator(const std::string &name, const validation_config &config)
common::Result< bool > is_healthy() const
state_validator(state_validator &&)=delete
common::Result< std::unordered_map< std::string, validation_result > > validate()
Transaction manager for coordinating transactions.
std::unordered_map< std::string, std::shared_ptr< transaction > > active_transactions_
const transaction_metrics & get_metrics() const
common::Result< std::shared_ptr< transaction > > begin_transaction(const std::string &id)
bool commit_transaction(const std::string &id)
std::unordered_map< std::string, std::shared_ptr< transaction > > completed_transactions_
void cleanup_completed_transactions(std::chrono::milliseconds)
common::Result< std::vector< std::string > > detect_deadlocks()
transaction_manager(const std::string &name, const transaction_config &config)
bool abort_transaction(const std::string &id)
Single transaction operation with execute and rollback capabilities.
std::function< common::VoidResult()> rollback_func_t
transaction_operation(const std::string &name, execute_func_t execute_func, rollback_func_t rollback_func=nullptr)
std::function< common::VoidResult()> execute_func_t
Transaction containing multiple operations.
std::chrono::steady_clock::time_point creation_time() const
bool add_operation(std::unique_ptr< transaction_operation > op)
std::chrono::steady_clock::time_point creation_time_
transaction(const std::string &id, const transaction_config &config)
std::vector< std::unique_ptr< transaction_operation > > operations_
transaction_state state() const
std::shared_ptr< state_validator > create_state_validator(const std::string &name)
Factory function to create a state validator.
validation_result
Validation result states.
std::shared_ptr< data_consistency_manager > create_data_consistency_manager(const std::string &name)
Factory function to create a data consistency manager.
transaction_state
Transaction states.
std::shared_ptr< transaction_manager > create_transaction_manager(const std::string &name)
Factory function to create a transaction manager.
Result pattern type definitions for monitoring system.
Extended error information with context.
std::chrono::milliseconds lock_timeout
std::chrono::milliseconds validation_interval