Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
test_data_consistency.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#include <gtest/gtest.h>
6#include <atomic>
7#include <chrono>
8#include <thread>
9#include <vector>
11
12using namespace kcenon::monitoring;
13using namespace kcenon;
14
15class DataConsistencyTest : public ::testing::Test {
16protected:
17 void SetUp() override {
18 call_count = 0;
19 success_count = 0;
21 }
22
23 void TearDown() override {
24 // Clean up any resources if needed
25 }
26
27 std::atomic<int> call_count{0};
28 std::atomic<int> success_count{0};
29 std::atomic<int> rollback_count{0};
30
31 // Helper function for testing operations
32 kcenon::common::VoidResult test_operation() {
33 ++call_count;
35 return kcenon::common::ok();
36 }
37
38 // Helper function that fails
39 kcenon::common::VoidResult failing_operation() {
40 ++call_count;
41 return kcenon::common::VoidResult::err(static_cast<int>(monitoring_error_code::operation_failed), "Simulated failure");
42 }
43
44 // Helper function for rollback
45 kcenon::common::VoidResult rollback_operation() {
47 return kcenon::common::ok();
48 }
49
50 // Helper validation function
52 return validation_result::valid;
53 }
54
56 return validation_result::invalid;
57 }
58
59 kcenon::common::VoidResult test_repair() {
60 return kcenon::common::ok();
61 }
62};
63
64// Transaction Operation Tests
65TEST_F(DataConsistencyTest, TransactionOperationBasic) {
66 auto op = std::make_unique<transaction_operation>(
67 "test_op",
68 [this]() { return test_operation(); },
69 [this]() { return rollback_operation(); }
70 );
71
72 EXPECT_EQ(op->name(), "test_op");
73 EXPECT_FALSE(op->is_executed());
74
75 auto result = op->execute();
76 EXPECT_TRUE(result.is_ok());
77 EXPECT_TRUE(op->is_executed());
78 EXPECT_EQ(call_count.load(), 1);
79 EXPECT_EQ(success_count.load(), 1);
80
81 auto rollback_result = op->rollback();
82 EXPECT_TRUE(rollback_result);
83 EXPECT_EQ(rollback_count.load(), 1);
84}
85
86// Transaction Tests
87TEST_F(DataConsistencyTest, TransactionCommitSuccess) {
88 transaction_config config;
89 transaction tx("test_tx", config);
90
91 // Add operations
92 auto op1 = std::make_unique<transaction_operation>(
93 "op1", [this]() { return test_operation(); });
94 auto op2 = std::make_unique<transaction_operation>(
95 "op2", [this]() { return test_operation(); });
96
97 auto add_result1 = tx.add_operation(std::move(op1));
98 auto add_result2 = tx.add_operation(std::move(op2));
99 EXPECT_TRUE(add_result1);
100 EXPECT_TRUE(add_result2);
101
102 EXPECT_EQ(tx.operation_count(), 2);
103 EXPECT_EQ(tx.state(), transaction_state::active);
104
105 // Commit transaction
106 auto commit_result = tx.commit();
107 EXPECT_TRUE(commit_result);
108 EXPECT_EQ(tx.state(), transaction_state::committed);
109 EXPECT_EQ(call_count.load(), 2);
110 EXPECT_EQ(success_count.load(), 2);
111}
112
113TEST_F(DataConsistencyTest, TransactionRollbackOnFailure) {
114 transaction_config config;
115 transaction tx("test_tx", config);
116
117 // Add operations - second one will fail
118 auto op1 = std::make_unique<transaction_operation>(
119 "op1",
120 [this]() { return test_operation(); },
121 [this]() { return rollback_operation(); });
122 auto op2 = std::make_unique<transaction_operation>(
123 "op2", [this]() { return failing_operation(); });
124
125 tx.add_operation(std::move(op1));
126 tx.add_operation(std::move(op2));
127
128 // Commit should fail and rollback
129 auto commit_result = tx.commit();
130 EXPECT_FALSE(commit_result);
131 EXPECT_EQ(tx.state(), transaction_state::aborted);
132 EXPECT_EQ(call_count.load(), 2); // Both operations attempted
133 EXPECT_EQ(success_count.load(), 1); // Only first succeeded
134 EXPECT_EQ(rollback_count.load(), 1); // First operation rolled back
135}
136
137TEST_F(DataConsistencyTest, TransactionManualAbort) {
138 transaction_config config;
139 transaction tx("test_tx", config);
140
141 auto op = std::make_unique<transaction_operation>(
142 "op",
143 [this]() { return test_operation(); },
144 [this]() { return rollback_operation(); });
145
146 tx.add_operation(std::move(op));
147
148 // Manually abort
149 auto abort_result = tx.abort();
150 EXPECT_TRUE(abort_result);
151 EXPECT_EQ(tx.state(), transaction_state::aborted);
152
153 // Should not be able to add more operations
154 auto op2 = std::make_unique<transaction_operation>("op2", [this]() { return test_operation(); });
155 auto add_result = tx.add_operation(std::move(op2));
156 EXPECT_FALSE(add_result);
157}
158
159TEST_F(DataConsistencyTest, TransactionTimeout) {
160 transaction_config config;
161 config.timeout = std::chrono::milliseconds(50);
162 transaction tx("test_tx", config);
163
164 auto op = std::make_unique<transaction_operation>("op", [this]() { return test_operation(); });
165 tx.add_operation(std::move(op));
166
167 // Wait for timeout
168 std::this_thread::sleep_for(std::chrono::milliseconds(100));
169
170 auto commit_result = tx.commit();
171 EXPECT_FALSE(commit_result);
172 EXPECT_EQ(tx.state(), transaction_state::aborted);
173}
174
175// State Validator Tests
176TEST_F(DataConsistencyTest, StateValidatorBasicValidation) {
177 validation_config config;
178 config.validation_interval = std::chrono::milliseconds(100);
179 state_validator validator("test_validator", config);
180
181 // Add validation rule
182 auto add_result = validator.add_validation_rule(
183 "test_rule",
184 [this]() { return test_validation(); },
185 [this]() { return test_repair(); }
186 );
187 EXPECT_TRUE(add_result);
188
189 // Manual validation
190 auto validation_res = validator.validate();
191 EXPECT_TRUE(validation_res.is_ok());
192
193 auto results = validation_res.value();
194 EXPECT_EQ(results.size(), 1);
195 EXPECT_EQ(results["test_rule"], validation_result::valid);
196
197 auto health = validator.is_healthy();
198 EXPECT_TRUE(health.is_ok());
199 EXPECT_TRUE(health.value());
200}
201
202TEST_F(DataConsistencyTest, StateValidatorFailureAndRepair) {
203 validation_config config;
204 config.enable_auto_repair = true;
205 state_validator validator("test_validator", config);
206
207 // Add validation rule that fails initially
208 std::atomic<bool> should_fail{true};
209 validator.add_validation_rule(
210 "failing_rule",
211 [&should_fail]() {
212 return should_fail.load() ? validation_result::invalid : validation_result::valid;
213 },
214 [&should_fail]() {
215 should_fail = false; // Repair fixes the issue
216 return common::ok();
217 }
218 );
219
220 // First validation should fail and trigger repair
221 auto validation_res = validator.validate();
222 EXPECT_TRUE(validation_res.is_ok());
223
224 auto results = validation_res.value();
225 EXPECT_EQ(results["failing_rule"], validation_result::invalid);
226 EXPECT_EQ(results["failing_rule_after_repair"], validation_result::valid);
227
228 const auto& metrics = validator.get_metrics();
229 EXPECT_EQ(metrics.validation_runs.load(), 1);
230 EXPECT_EQ(metrics.repair_operations.load(), 1);
231}
232
233TEST_F(DataConsistencyTest, StateValidatorContinuousValidation) {
234 validation_config config;
235 config.validation_interval = std::chrono::milliseconds(50);
236 state_validator validator("test_validator", config);
237
238 std::atomic<int> validation_calls{0};
239 validator.add_validation_rule(
240 "continuous_rule",
241 [&validation_calls]() {
242 validation_calls++;
243 return validation_result::valid;
244 }
245 );
246
247 // Start continuous validation
248 auto start_result = validator.start();
249 EXPECT_TRUE(start_result.is_ok());
250
251 // Wait for several validation cycles (400ms with 50ms interval = at least 6-7 cycles)
252 // Using 400ms to account for scheduling delays on various platforms (especially macOS)
253 std::this_thread::sleep_for(std::chrono::milliseconds(400));
254
255 // Stop validation
256 auto stop_result = validator.stop();
257 EXPECT_TRUE(stop_result.is_ok());
258
259 // Should have run multiple validations (at least 2 with 400ms wait)
260 // Using >= 2 to be robust against platform-specific scheduling delays
261 EXPECT_GE(validation_calls.load(), 2);
262}
263
264// Transaction Manager Tests
265TEST_F(DataConsistencyTest, TransactionManagerBasicOperations) {
266 transaction_config config;
267 transaction_manager manager("test_manager", config);
268
269 // Begin transaction
270 auto begin_result = manager.begin_transaction("tx1");
271 EXPECT_TRUE(begin_result.is_ok());
272
273 auto tx = begin_result.value();
274 EXPECT_EQ(tx->id(), "tx1");
275 EXPECT_EQ(tx->state(), transaction_state::active);
276 EXPECT_EQ(manager.active_transaction_count(), 1);
277
278 // Add operation and commit
279 auto op = std::make_unique<transaction_operation>("op", [this]() { return test_operation(); });
280 tx->add_operation(std::move(op));
281
282 auto commit_result = manager.commit_transaction("tx1");
283 EXPECT_TRUE(commit_result);
284 EXPECT_EQ(manager.active_transaction_count(), 0);
285 EXPECT_EQ(manager.completed_transaction_count(), 1);
286
287 const auto& metrics = manager.get_metrics();
288 EXPECT_EQ(metrics.total_transactions.load(), 1);
289 EXPECT_EQ(metrics.committed_transactions.load(), 1);
290 EXPECT_EQ(metrics.aborted_transactions.load(), 0);
291}
292
293TEST_F(DataConsistencyTest, TransactionManagerAbort) {
294 transaction_config config;
295 transaction_manager manager("test_manager", config);
296
297 auto begin_result = manager.begin_transaction("tx1");
298 EXPECT_TRUE(begin_result.is_ok());
299
300 auto abort_result = manager.abort_transaction("tx1");
301 EXPECT_TRUE(abort_result);
302
303 const auto& metrics = manager.get_metrics();
304 EXPECT_EQ(metrics.total_transactions.load(), 1);
305 EXPECT_EQ(metrics.committed_transactions.load(), 0);
306 EXPECT_EQ(metrics.aborted_transactions.load(), 1);
307 EXPECT_NEAR(metrics.get_abort_rate(), 1.0, 0.01);
308}
309
310TEST_F(DataConsistencyTest, TransactionManagerDuplicateTransaction) {
311 transaction_config config;
312 transaction_manager manager("test_manager", config);
313
314 auto begin_result1 = manager.begin_transaction("tx1");
315 EXPECT_TRUE(begin_result1.is_ok());
316
317 // Should fail with duplicate ID
318 auto begin_result2 = manager.begin_transaction("tx1");
319 EXPECT_FALSE(begin_result2.is_ok());
320 EXPECT_EQ(begin_result2.error().code, static_cast<int>(monitoring_error_code::already_exists));
321}
322
323TEST_F(DataConsistencyTest, TransactionManagerDeadlockDetection) {
324 transaction_config config;
325 config.timeout = std::chrono::milliseconds(100);
326 transaction_manager manager("test_manager", config);
327
328 // Create long-running transaction
329 auto begin_result = manager.begin_transaction("long_tx");
330 EXPECT_TRUE(begin_result.is_ok());
331
332 // Wait longer than timeout
333 std::this_thread::sleep_for(std::chrono::milliseconds(250));
334
335 auto deadlocks = manager.detect_deadlocks();
336 EXPECT_TRUE(deadlocks.is_ok());
337 EXPECT_EQ(deadlocks.value().size(), 1);
338 EXPECT_EQ(deadlocks.value()[0], "long_tx");
339
340 const auto& metrics = manager.get_metrics();
341 EXPECT_EQ(metrics.deadlocks_detected.load(), 1);
342}
343
344TEST_F(DataConsistencyTest, TransactionManagerCleanup) {
345 transaction_config config;
346 transaction_manager manager("test_manager", config);
347
348 // Create and commit transaction
349 auto begin_result = manager.begin_transaction("tx1");
350 EXPECT_TRUE(begin_result.is_ok());
351
352 auto op = std::make_unique<transaction_operation>("op", [this]() { return test_operation(); });
353 begin_result.value()->add_operation(std::move(op));
354
355 manager.commit_transaction("tx1");
356 EXPECT_EQ(manager.completed_transaction_count(), 1);
357
358 // Cleanup should remove old transactions
359 manager.cleanup_completed_transactions(std::chrono::milliseconds(0));
360 EXPECT_EQ(manager.completed_transaction_count(), 0);
361}
362
363// Data Consistency Manager Tests
364TEST_F(DataConsistencyTest, DataConsistencyManagerTransactionManagers) {
365 data_consistency_manager consistency_manager("test_consistency");
366
367 transaction_config tx_config;
368 auto add_result = consistency_manager.add_transaction_manager("tx_manager", tx_config);
369 EXPECT_TRUE(add_result.is_ok());
370
371 auto* manager = consistency_manager.get_transaction_manager("tx_manager");
372 EXPECT_NE(manager, nullptr);
373 EXPECT_EQ(manager->get_name(), "tx_manager");
374
375 // Should fail to add duplicate
376 auto duplicate_result = consistency_manager.add_transaction_manager("tx_manager", tx_config);
377 EXPECT_FALSE(duplicate_result.is_ok());
378 EXPECT_EQ(duplicate_result.error().code, static_cast<int>(monitoring_error_code::already_exists));
379}
380
381TEST_F(DataConsistencyTest, DataConsistencyManagerStateValidators) {
382 data_consistency_manager consistency_manager("test_consistency");
383
384 validation_config val_config;
385 auto add_result = consistency_manager.add_state_validator("validator", val_config);
386 EXPECT_TRUE(add_result.is_ok());
387
388 auto* validator = consistency_manager.get_state_validator("validator");
389 EXPECT_NE(validator, nullptr);
390 EXPECT_EQ(validator->get_name(), "validator");
391}
392
393TEST_F(DataConsistencyTest, DataConsistencyManagerGlobalOperations) {
394 data_consistency_manager consistency_manager("test_consistency");
395
396 // Add validators
397 validation_config config;
398 config.validation_interval = std::chrono::milliseconds(100);
399
400 consistency_manager.add_state_validator("validator1", config);
401 consistency_manager.add_state_validator("validator2", config);
402
403 // Start all validators
404 auto start_result = consistency_manager.start_all_validators();
405 EXPECT_TRUE(start_result.is_ok());
406
407 std::this_thread::sleep_for(std::chrono::milliseconds(50));
408
409 // Stop all validators
410 auto stop_result = consistency_manager.stop_all_validators();
411 EXPECT_TRUE(stop_result.is_ok());
412}
413
414TEST_F(DataConsistencyTest, DataConsistencyManagerHealthCheck) {
415 data_consistency_manager consistency_manager("test_consistency");
416
417 // Add components
418 transaction_config tx_config;
419 consistency_manager.add_transaction_manager("tx_manager", tx_config);
420
421 validation_config val_config;
422 consistency_manager.add_state_validator("validator", val_config);
423
424 // Should be healthy initially
425 auto health = consistency_manager.is_healthy();
426 EXPECT_TRUE(health.is_ok());
427 EXPECT_TRUE(health.value());
428}
429
430TEST_F(DataConsistencyTest, DataConsistencyManagerMetrics) {
431 data_consistency_manager consistency_manager("test_consistency");
432
433 // Add components
434 transaction_config tx_config;
435 consistency_manager.add_transaction_manager("tx_manager", tx_config);
436
437 validation_config val_config;
438 consistency_manager.add_state_validator("validator", val_config);
439
440 // Get all metrics
441 auto all_metrics = consistency_manager.get_all_metrics();
442 EXPECT_EQ(all_metrics.size(), 2);
443 EXPECT_TRUE(all_metrics.find("tx_manager_transactions") != all_metrics.end());
444 EXPECT_TRUE(all_metrics.find("validator_validation") != all_metrics.end());
445}
446
447// Configuration Validation Tests
448TEST_F(DataConsistencyTest, TransactionConfigValidation) {
449 transaction_config config;
450
451 // Valid config
452 config.timeout = std::chrono::seconds(30);
453 config.lock_timeout = std::chrono::seconds(10);
454 config.max_retries = 3;
455 EXPECT_TRUE(config.validate());
456
457 // Invalid timeout
458 config.timeout = std::chrono::milliseconds(0);
459 EXPECT_FALSE(config.validate());
460
461 // Invalid lock timeout
462 config.timeout = std::chrono::seconds(30);
463 config.lock_timeout = std::chrono::milliseconds(0);
464 EXPECT_FALSE(config.validate());
465
466 // Invalid max retries
467 config.lock_timeout = std::chrono::seconds(10);
468 config.max_retries = 0;
469 EXPECT_FALSE(config.validate());
470}
471
472TEST_F(DataConsistencyTest, ValidationConfigValidation) {
473 validation_config config;
474
475 // Valid config
476 config.validation_interval = std::chrono::seconds(60);
477 config.max_validation_failures = 5;
478 config.corruption_threshold = 0.1;
479 EXPECT_TRUE(config.validate());
480
481 // Invalid validation interval
482 config.validation_interval = std::chrono::milliseconds(0);
483 EXPECT_FALSE(config.validate());
484
485 // Invalid max failures
486 config.validation_interval = std::chrono::seconds(60);
487 config.max_validation_failures = 0;
488 EXPECT_FALSE(config.validate());
489
490 // Invalid corruption threshold
491 config.max_validation_failures = 5;
492 config.corruption_threshold = -0.1;
493 EXPECT_FALSE(config.validate());
494
495 config.corruption_threshold = 1.1;
496 EXPECT_FALSE(config.validate());
497}
498
499// Concurrency Tests
500TEST_F(DataConsistencyTest, ConcurrentTransactions) {
501 transaction_config config;
502 transaction_manager manager("concurrent_manager", config);
503
504 const int num_threads = 5;
505 const int transactions_per_thread = 10;
506 std::atomic<int> successful_transactions{0};
507
508 std::vector<std::thread> threads;
509 for (int i = 0; i < num_threads; ++i) {
510 threads.emplace_back([&, i]() {
511 for (int j = 0; j < transactions_per_thread; ++j) {
512 std::string tx_id = "tx_" + std::to_string(i) + "_" + std::to_string(j);
513
514 auto begin_result = manager.begin_transaction(tx_id);
515 if (begin_result.is_ok()) {
516 auto tx = begin_result.value();
517
518 auto op = std::make_unique<transaction_operation>(
519 "op", [this]() { return test_operation(); });
520 tx->add_operation(std::move(op));
521
522 auto commit_result = manager.commit_transaction(tx_id);
523 if (commit_result) {
524 successful_transactions++;
525 }
526 }
527 }
528 });
529 }
530
531 for (auto& thread : threads) {
532 thread.join();
533 }
534
535 EXPECT_EQ(successful_transactions.load(), num_threads * transactions_per_thread);
536
537 const auto& metrics = manager.get_metrics();
538 EXPECT_EQ(metrics.total_transactions.load(), num_threads * transactions_per_thread);
539 EXPECT_EQ(metrics.committed_transactions.load(), num_threads * transactions_per_thread);
540}
541
542// Factory Function Tests
543TEST_F(DataConsistencyTest, FactoryFunctions) {
544 auto tx_manager = create_transaction_manager("factory_tx_manager");
545 EXPECT_NE(tx_manager, nullptr);
546 EXPECT_EQ(tx_manager->get_name(), "factory_tx_manager");
547
548 auto validator = create_state_validator("factory_validator");
549 EXPECT_NE(validator, nullptr);
550 EXPECT_EQ(validator->get_name(), "factory_validator");
551
552 auto consistency_manager = create_data_consistency_manager("factory_consistency");
553 EXPECT_NE(consistency_manager, nullptr);
554}
std::atomic< int > rollback_count
kcenon::common::VoidResult failing_operation()
std::atomic< int > call_count
kcenon::common::VoidResult test_operation()
std::atomic< int > success_count
kcenon::common::VoidResult test_repair()
validation_result test_validation()
kcenon::common::VoidResult rollback_operation()
validation_result failing_validation()
Data consistency manager coordinating transaction managers and validators.
std::unordered_map< std::string, std::string > get_all_metrics() const
transaction_manager * get_transaction_manager(const std::string &name)
common::VoidResult add_transaction_manager(const std::string &name, const transaction_config &config)
state_validator * get_state_validator(const std::string &name)
common::Result< bool > is_healthy() const
common::VoidResult add_state_validator(const std::string &name, const validation_config &config)
State validator for validating system state.
bool add_validation_rule(const std::string &name, validation_func_t validation_func, repair_func_t repair_func=nullptr)
common::Result< bool > is_healthy() const
common::Result< std::unordered_map< std::string, validation_result > > validate()
Transaction manager for coordinating transactions.
common::Result< std::shared_ptr< transaction > > begin_transaction(const std::string &id)
bool commit_transaction(const std::string &id)
void cleanup_completed_transactions(std::chrono::milliseconds)
common::Result< std::vector< std::string > > detect_deadlocks()
bool abort_transaction(const std::string &id)
Transaction containing multiple operations.
bool add_operation(std::unique_ptr< transaction_operation > op)
transaction_state state() const
Transaction states and consistency validation for metric storage.
std::shared_ptr< state_validator > create_state_validator(const std::string &name)
Factory function to create a state validator.
validation_result
Validation result states.
std::shared_ptr< data_consistency_manager > create_data_consistency_manager(const std::string &name)
Factory function to create a data consistency manager.
std::shared_ptr< transaction_manager > create_transaction_manager(const std::string &name)
Factory function to create a transaction manager.
std::chrono::milliseconds lock_timeout
std::chrono::milliseconds validation_interval
TEST_F(DataConsistencyTest, TransactionOperationBasic)