Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
test_async_operations.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
16// Force std::thread fallback for unit testing — avoids external thread_system
17// library dependency while testing the same public API.
18#ifdef USE_THREAD_SYSTEM
19#undef USE_THREAD_SYSTEM
20#endif
21
22#include <gtest/gtest.h>
23#include <atomic>
24#include <chrono>
25#include <future>
26#include <string>
27#include <thread>
28#include <vector>
29
31
32using namespace database::async;
33
34//=============================================================================
35// async_result<T> Tests
36//=============================================================================
37
38class AsyncResultTest : public ::testing::Test {
39protected:
40 // Helper: create a ready async_result with the given value
41 template<typename T>
43 std::promise<T> p;
44 p.set_value(std::move(value));
45 return async_result<T>(p.get_future());
46 }
47
48 // Helper: create an async_result that throws
49 template<typename T>
51 std::promise<T> p;
52 p.set_exception(
53 std::make_exception_ptr(std::runtime_error("test_error")));
54 return async_result<T>(p.get_future());
55 }
56};
57
58// -- Constructor --
59
60TEST_F(AsyncResultTest, ConstructsFromFuture) {
61 std::promise<int> p;
62 auto future = p.get_future();
63 p.set_value(42);
64
65 async_result<int> result(std::move(future));
66 EXPECT_TRUE(result.is_ready());
67}
68
69// -- get() --
70
71TEST_F(AsyncResultTest, GetReturnsValue) {
72 auto result = make_ready<int>(99);
73 EXPECT_EQ(result.get(), 99);
74}
75
76TEST_F(AsyncResultTest, GetReturnsStringValue) {
77 auto result = make_ready<std::string>("hello");
78 EXPECT_EQ(result.get(), "hello");
79}
80
81TEST_F(AsyncResultTest, GetRethrowsException) {
82 auto result = make_failing<int>();
83 EXPECT_THROW(result.get(), std::runtime_error);
84}
85
86TEST_F(AsyncResultTest, GetInvokesSuccessCallback) {
87 auto result = make_ready<int>(42);
88
89 int captured = 0;
90 result.then([&captured](int v) { captured = v; });
91
92 EXPECT_EQ(result.get(), 42);
93 EXPECT_EQ(captured, 42);
94}
95
96TEST_F(AsyncResultTest, GetInvokesErrorCallback) {
97 auto result = make_failing<int>();
98
99 std::string captured_msg;
100 result.on_error([&captured_msg](const std::exception& e) {
101 captured_msg = e.what();
102 });
103
104 EXPECT_THROW(result.get(), std::runtime_error);
105 EXPECT_EQ(captured_msg, "test_error");
106}
107
108// -- get_for() --
109
110TEST_F(AsyncResultTest, GetForReturnsValueWithinTimeout) {
111 auto result = make_ready<int>(77);
112 EXPECT_EQ(result.get_for(std::chrono::milliseconds(100)), 77);
113}
114
115TEST_F(AsyncResultTest, GetForThrowsOnTimeout) {
116 std::promise<int> p;
117 async_result<int> result(p.get_future());
118
119 EXPECT_THROW(
120 result.get_for(std::chrono::milliseconds(10)),
121 std::runtime_error);
122}
123
124// -- is_ready() --
125
126TEST_F(AsyncResultTest, IsReadyReturnsFalseBeforeCompletion) {
127 std::promise<int> p;
128 async_result<int> result(p.get_future());
129 EXPECT_FALSE(result.is_ready());
130
131 p.set_value(1);
132 EXPECT_TRUE(result.is_ready());
133}
134
135TEST_F(AsyncResultTest, IsReadyReturnsTrueWhenReady) {
136 auto result = make_ready<int>(1);
137 EXPECT_TRUE(result.is_ready());
138}
139
140// -- wait_for() --
141
142TEST_F(AsyncResultTest, WaitForReturnsReadyWhenComplete) {
143 auto result = make_ready<int>(1);
144 auto status = result.wait_for(std::chrono::milliseconds(0));
145 EXPECT_EQ(status, std::future_status::ready);
146}
147
148TEST_F(AsyncResultTest, WaitForReturnsTimeoutWhenNotReady) {
149 std::promise<int> p;
150 async_result<int> result(p.get_future());
151
152 auto status = result.wait_for(std::chrono::milliseconds(1));
153 EXPECT_EQ(status, std::future_status::timeout);
154
155 p.set_value(0); // cleanup
156}
157
158// -- then() / on_error() concept-based --
159
160TEST_F(AsyncResultTest, ThenWithLambda) {
161 auto result = make_ready<int>(10);
162
163 int captured = 0;
164 result.then([&captured](int v) { captured = v; });
165 result.get();
166
167 EXPECT_EQ(captured, 10);
168}
169
170TEST_F(AsyncResultTest, OnErrorWithLambda) {
171 auto result = make_failing<int>();
172
173 bool error_caught = false;
174 result.on_error([&error_caught](const std::exception&) {
175 error_caught = true;
176 });
177
178 EXPECT_THROW(result.get(), std::runtime_error);
179 EXPECT_TRUE(error_caught);
180}
181
182// -- then() / on_error() legacy overloads --
183
184TEST_F(AsyncResultTest, ThenLegacyOverload) {
185 auto result = make_ready<int>(5);
186
187 int captured = 0;
188 std::function<void(int)> cb = [&captured](int v) { captured = v; };
189 result.then(cb);
190 result.get();
191
192 EXPECT_EQ(captured, 5);
193}
194
195TEST_F(AsyncResultTest, OnErrorLegacyOverload) {
196 auto result = make_failing<int>();
197
198 std::string msg;
199 std::function<void(const std::exception&)> cb =
200 [&msg](const std::exception& e) { msg = e.what(); };
201 result.on_error(cb);
202
203 EXPECT_THROW(result.get(), std::runtime_error);
204 EXPECT_EQ(msg, "test_error");
205}
206
207// -- Callbacks not invoked when not set --
208
209TEST_F(AsyncResultTest, NoCallbackDoesNotCrashOnSuccess) {
210 auto result = make_ready<int>(42);
211 EXPECT_NO_THROW(result.get());
212}
213
214TEST_F(AsyncResultTest, NoErrorCallbackDoesNotCrashOnFailure) {
215 auto result = make_failing<int>();
216 EXPECT_THROW(result.get(), std::runtime_error);
217}
218
219// -- Deferred completion with thread --
220
221TEST_F(AsyncResultTest, GetBlocksUntilValueAvailable) {
222 std::promise<int> p;
223 async_result<int> result(p.get_future());
224
225 std::thread setter([&p]() {
226 std::this_thread::sleep_for(std::chrono::milliseconds(20));
227 p.set_value(123);
228 });
229
230 EXPECT_EQ(result.get(), 123);
231 setter.join();
232}
233
234//=============================================================================
235// async_executor Tests
236//=============================================================================
237
238class AsyncExecutorTest : public ::testing::Test {
239protected:
240 void SetUp() override {
241 executor_ = std::make_unique<async_executor>(2);
242 }
243
244 void TearDown() override {
245 if (executor_) {
246 executor_->shutdown();
247 }
248 }
249
250 std::unique_ptr<async_executor> executor_;
251};
252
253// -- Construction --
254
255TEST_F(AsyncExecutorTest, ConstructsWithCustomThreadCount) {
256 EXPECT_EQ(executor_->thread_count(), 2u);
257}
258
259TEST_F(AsyncExecutorTest, ConstructsWithDefaultThreadCount) {
260 auto exec = std::make_unique<async_executor>();
261 EXPECT_EQ(exec->thread_count(), std::thread::hardware_concurrency());
262 exec->shutdown();
263}
264
265TEST_F(AsyncExecutorTest, IsNotUsingThreadSystem) {
266 EXPECT_FALSE(executor_->is_using_thread_system());
267}
268
269// -- submit() --
270
271TEST_F(AsyncExecutorTest, SubmitExecutesCallable) {
272 auto future = executor_->submit([]() { return 42; });
273 EXPECT_EQ(future.get(), 42);
274}
275
276TEST_F(AsyncExecutorTest, SubmitWithArguments) {
277 auto future = executor_->submit([](int a, int b) { return a + b; }, 3, 7);
278 EXPECT_EQ(future.get(), 10);
279}
280
281TEST_F(AsyncExecutorTest, SubmitReturnsString) {
282 auto future = executor_->submit([]() -> std::string {
283 return "async_result";
284 });
285 EXPECT_EQ(future.get(), "async_result");
286}
287
288TEST_F(AsyncExecutorTest, SubmitPropagatesException) {
289 auto future = executor_->submit([]() -> int {
290 throw std::runtime_error("task_failed");
291 });
292 EXPECT_THROW(future.get(), std::runtime_error);
293}
294
295// -- Multiple concurrent submissions --
296
297TEST_F(AsyncExecutorTest, MultipleConcurrentSubmissions) {
298 constexpr int NUM_TASKS = 100;
299 std::vector<std::future<int>> futures;
300 futures.reserve(NUM_TASKS);
301
302 for (int i = 0; i < NUM_TASKS; ++i) {
303 futures.push_back(executor_->submit([i]() { return i * 2; }));
304 }
305
306 for (int i = 0; i < NUM_TASKS; ++i) {
307 EXPECT_EQ(futures[i].get(), i * 2);
308 }
309}
310
311// -- wait_for_completion() --
312
313TEST_F(AsyncExecutorTest, WaitForCompletionBlocksUntilDone) {
314 std::atomic<int> counter{0};
315
316 for (int i = 0; i < 10; ++i) {
317 executor_->submit([&counter]() {
318 std::this_thread::sleep_for(std::chrono::milliseconds(5));
319 counter.fetch_add(1);
320 });
321 }
322
323 executor_->wait_for_completion();
324 // Allow brief settling time for tasks that are executing but haven't
325 // decremented the queue yet
326 std::this_thread::sleep_for(std::chrono::milliseconds(50));
327 EXPECT_EQ(counter.load(), 10);
328}
329
330// -- shutdown() --
331
332TEST_F(AsyncExecutorTest, ShutdownAfterSubmitCompletesGracefully) {
333 auto future = executor_->submit([]() { return 1; });
334 executor_->shutdown();
335 EXPECT_EQ(future.get(), 1);
336}
337
338TEST_F(AsyncExecutorTest, SubmitAfterShutdownThrows) {
339 executor_->shutdown();
340 EXPECT_THROW(
341 executor_->submit([]() { return 0; }),
342 std::runtime_error);
343}
344
345// -- thread_count() / pending_tasks() --
346
347TEST_F(AsyncExecutorTest, ThreadCountReturnsConfigured) {
348 async_executor exec4(4);
349 EXPECT_EQ(exec4.thread_count(), 4u);
350 exec4.shutdown();
351}
352
353//=============================================================================
354// Helper Function Tests
355//=============================================================================
356
357TEST(AsyncHelpersTest, MakeReadyResultIsImmediatelyAvailable) {
358 auto result = make_ready_result<int>(42);
359 EXPECT_TRUE(result.is_ready());
360 EXPECT_EQ(result.get(), 42);
361}
362
363TEST(AsyncHelpersTest, MakeReadyResultWithString) {
364 auto result = make_ready_result<std::string>("hello");
365 EXPECT_TRUE(result.is_ready());
366 EXPECT_EQ(result.get(), "hello");
367}
368
369TEST(AsyncHelpersTest, MakeErrorResultThrowsOnGet) {
370 // Note: make_error_result takes const std::exception& which causes
371 // object slicing — the thrown exception is std::exception, not the
372 // derived type. This tests the actual API behavior.
373 auto result = make_error_result<int>(std::runtime_error("err"));
374 EXPECT_TRUE(result.is_ready());
375 EXPECT_THROW(result.get(), std::exception);
376}
377
378TEST(AsyncHelpersTest, MakeErrorResultInvokesOnErrorCallback) {
379 auto result = make_error_result<int>(std::runtime_error("callback_err"));
380
381 bool error_caught = false;
382 result.on_error([&error_caught](const std::exception&) {
383 error_caught = true;
384 });
385
386 EXPECT_THROW(result.get(), std::exception);
387 EXPECT_TRUE(error_caught);
388}
389
390//=============================================================================
391// async_database Tests (#371)
392//=============================================================================
393
395
396namespace {
397
398// Stub backend for async_database testing.
399// Uses global-scope prefix to avoid the namespace alias conflict with
400// 'using database = unified_database_system'.
401class async_stub_backend : public ::database::core::database_backend {
402public:
403 ::database::database_types type() const override {
404 return ::database::database_types::sqlite;
405 }
406
407 kcenon::common::VoidResult initialize(
408 const ::database::core::connection_config&) override
409 {
410 std::lock_guard<std::mutex> lock(mutex_);
411 initialized_ = true;
412 return kcenon::common::ok();
413 }
414
415 kcenon::common::VoidResult shutdown() override {
416 std::lock_guard<std::mutex> lock(mutex_);
417 initialized_ = false;
418 return kcenon::common::ok();
419 }
420
421 bool is_initialized() const override {
422 std::lock_guard<std::mutex> lock(mutex_);
423 return initialized_;
424 }
425
426 kcenon::common::Result<::database::core::database_result> select_query(
427 const std::string&) override
428 {
431 row["id"] = int64_t{1};
432 row["name"] = std::string("stub_row");
433 rows.push_back(row);
434 return kcenon::common::Result<::database::core::database_result>(
435 std::move(rows));
436 }
437
438 kcenon::common::VoidResult execute_query(
439 const std::string& query) override
440 {
441 std::lock_guard<std::mutex> lock(mutex_);
442 if (should_fail_execute_) {
443 return kcenon::common::error_info{1, "execute_failed", "stub"};
444 }
445 last_executed_query_ = query;
446 return kcenon::common::ok();
447 }
448
449 kcenon::common::VoidResult begin_transaction() override {
450 std::lock_guard<std::mutex> lock(mutex_);
451 in_transaction_ = true;
452 return kcenon::common::ok();
453 }
454
455 kcenon::common::VoidResult commit_transaction() override {
456 std::lock_guard<std::mutex> lock(mutex_);
457 in_transaction_ = false;
458 return kcenon::common::ok();
459 }
460
461 kcenon::common::VoidResult rollback_transaction() override {
462 std::lock_guard<std::mutex> lock(mutex_);
463 in_transaction_ = false;
464 return kcenon::common::ok();
465 }
466
467 bool in_transaction() const override {
468 std::lock_guard<std::mutex> lock(mutex_);
469 return in_transaction_;
470 }
471 std::string last_error() const override { return ""; }
472
473 std::map<std::string, std::string> connection_info() const override {
474 return {{"type", "stub"}};
475 }
476
477 // Test helpers
478 void set_fail_execute(bool fail) {
479 std::lock_guard<std::mutex> lock(mutex_);
480 should_fail_execute_ = fail;
481 }
482 std::string get_last_query() const {
483 std::lock_guard<std::mutex> lock(mutex_);
484 return last_executed_query_;
485 }
486
487private:
488 mutable std::mutex mutex_;
489 bool initialized_ = false;
490 bool in_transaction_ = false;
491 bool should_fail_execute_ = false;
492 std::string last_executed_query_;
493};
494
495} // anonymous namespace
496
497class AsyncDatabaseTest : public ::testing::Test {
498protected:
499 void SetUp() override {
500 backend_ = std::make_shared<async_stub_backend>();
501 executor_ = std::make_shared<async_executor>(2);
502 db_ = std::make_unique<async_database>(backend_, executor_);
503 }
504
505 void TearDown() override {
506 db_.reset();
507 if (executor_) {
508 executor_->shutdown();
509 }
510 }
511
512 std::shared_ptr<async_stub_backend> backend_;
513 std::shared_ptr<async_executor> executor_;
514 std::unique_ptr<async_database> db_;
515};
516
517// -- Constructor --
518
519TEST_F(AsyncDatabaseTest, ConstructsWithBackendAndExecutor) {
520 EXPECT_NE(db_, nullptr);
521}
522
523// -- execute_async() --
524
525TEST_F(AsyncDatabaseTest, ExecuteAsyncReturnsTrue) {
526 auto result = db_->execute_async("CREATE TABLE t (id INT)");
527 EXPECT_TRUE(result.get());
528}
529
530TEST_F(AsyncDatabaseTest, ExecuteAsyncDelegatesToBackend) {
531 auto result = db_->execute_async("DROP TABLE IF EXISTS t");
532 result.get();
533 EXPECT_EQ(backend_->get_last_query(), "DROP TABLE IF EXISTS t");
534}
535
536TEST_F(AsyncDatabaseTest, ExecuteAsyncThrowsOnBackendFailure) {
537 backend_->set_fail_execute(true);
538 auto result = db_->execute_async("BAD QUERY");
539 EXPECT_THROW(result.get(), std::runtime_error);
540}
541
542// -- select_async() --
543
544TEST_F(AsyncDatabaseTest, SelectAsyncReturnsRows) {
545 auto result = db_->select_async("SELECT * FROM t");
546 auto rows = result.get();
547 ASSERT_EQ(rows.size(), 1u);
548 EXPECT_EQ(std::get<std::string>(rows[0].at("name")), "stub_row");
549}
550
551// -- execute_batch_async() --
552
553TEST_F(AsyncDatabaseTest, ExecuteBatchAsyncProcessesAllQueries) {
554 std::vector<std::string> queries = {
555 "INSERT INTO t VALUES (1)",
556 "INSERT INTO t VALUES (2)",
557 "INSERT INTO t VALUES (3)"
558 };
559 auto result = db_->execute_batch_async(queries);
560 auto results = result.get();
561
562 ASSERT_EQ(results.size(), 3u);
563 EXPECT_TRUE(results[0]);
564 EXPECT_TRUE(results[1]);
565 EXPECT_TRUE(results[2]);
566}
567
568TEST_F(AsyncDatabaseTest, ExecuteBatchAsyncReportsFailures) {
569 backend_->set_fail_execute(true);
570 std::vector<std::string> queries = {"Q1", "Q2"};
571 auto result = db_->execute_batch_async(queries);
572 auto results = result.get();
573
574 ASSERT_EQ(results.size(), 2u);
575 EXPECT_FALSE(results[0]);
576 EXPECT_FALSE(results[1]);
577}
578
579// -- select_batch_async() --
580
581TEST_F(AsyncDatabaseTest, SelectBatchAsyncReturnsMultipleResults) {
582 std::vector<std::string> queries = {
583 "SELECT * FROM t",
584 "SELECT * FROM t"
585 };
586 auto result = db_->select_batch_async(queries);
587 auto results = result.get();
588
589 ASSERT_EQ(results.size(), 2u);
590 EXPECT_EQ(results[0].size(), 1u);
591 EXPECT_EQ(results[1].size(), 1u);
592}
593
594// -- Transaction methods --
595
596TEST_F(AsyncDatabaseTest, BeginTransactionAsyncSucceeds) {
597 auto result = db_->begin_transaction_async();
598 EXPECT_TRUE(result.get());
599 EXPECT_TRUE(backend_->in_transaction());
600}
601
602TEST_F(AsyncDatabaseTest, CommitTransactionAsyncSucceeds) {
603 db_->begin_transaction_async().get();
604 auto result = db_->commit_transaction_async();
605 EXPECT_TRUE(result.get());
606 EXPECT_FALSE(backend_->in_transaction());
607}
608
609TEST_F(AsyncDatabaseTest, RollbackTransactionAsyncSucceeds) {
610 db_->begin_transaction_async().get();
611 auto result = db_->rollback_transaction_async();
612 EXPECT_TRUE(result.get());
613 EXPECT_FALSE(backend_->in_transaction());
614}
615
616// -- Connection management --
617
618TEST_F(AsyncDatabaseTest, ConnectAsyncInitializesBackend) {
619 auto result = db_->connect_async("host=localhost dbname=test");
620 EXPECT_TRUE(result.get());
621 EXPECT_TRUE(backend_->is_initialized());
622}
623
624TEST_F(AsyncDatabaseTest, DisconnectAsyncShutsDownBackend) {
625 db_->connect_async("host=localhost").get();
626 auto result = db_->disconnect_async();
627 EXPECT_TRUE(result.get());
628 EXPECT_FALSE(backend_->is_initialized());
629}
630
631// -- Concurrent operations --
632
633TEST_F(AsyncDatabaseTest, ConcurrentExecuteAsyncOperations) {
634 // async_result<T> is non-movable (contains std::mutex), so we call
635 // get() immediately per operation rather than collecting into a vector.
636 constexpr int NUM_OPS = 20;
637 std::atomic<int> success_count{0};
638
639 // Submit all operations, each getting its result in a detached context
640 std::vector<std::future<bool>> futures;
641 futures.reserve(NUM_OPS);
642 for (int i = 0; i < NUM_OPS; ++i) {
643 futures.push_back(executor_->submit([this, i]() -> bool {
644 auto result = backend_->execute_query(
645 "INSERT INTO t VALUES (" + std::to_string(i) + ")");
646 return result.is_ok();
647 }));
648 }
649
650 for (auto& f : futures) {
651 if (f.get()) {
652 success_count.fetch_add(1);
653 }
654 }
655 EXPECT_EQ(success_count.load(), NUM_OPS);
656}
657
658// -- Callback integration --
659
660TEST_F(AsyncDatabaseTest, ExecuteAsyncWithThenCallback) {
661 auto result = db_->execute_async("CREATE TABLE t2 (id INT)");
662
663 bool callback_invoked = false;
664 result.then([&callback_invoked](bool success) {
665 callback_invoked = true;
666 EXPECT_TRUE(success);
667 });
668
669 result.get();
670 EXPECT_TRUE(callback_invoked);
671}
672
673TEST_F(AsyncDatabaseTest, ExecuteAsyncWithOnErrorCallback) {
674 backend_->set_fail_execute(true);
675 auto result = db_->execute_async("BAD");
676
677 std::string error_msg;
678 result.on_error([&error_msg](const std::exception& e) {
679 error_msg = e.what();
680 });
681
682 EXPECT_THROW(result.get(), std::runtime_error);
683 EXPECT_EQ(error_msg, "execute_failed");
684}
685
686// ============================================================================
687// transaction_coordinator tests
688// ============================================================================
689
690namespace {
691
692// Controllable stub for transaction coordinator testing.
693// Each instance represents one participant in a distributed transaction.
694class txn_stub_backend : public ::database::core::database_backend {
695public:
696 ::database::database_types type() const override {
697 return ::database::database_types::sqlite;
698 }
699
700 kcenon::common::VoidResult initialize(
701 const ::database::core::connection_config&) override
702 {
703 return kcenon::common::ok();
704 }
705
706 kcenon::common::VoidResult shutdown() override {
707 return kcenon::common::ok();
708 }
709
710 bool is_initialized() const override { return true; }
711
712 kcenon::common::Result<::database::core::database_result> select_query(
713 const std::string&) override
714 {
715 return kcenon::common::Result<::database::core::database_result>(
717 }
718
719 kcenon::common::VoidResult execute_query(const std::string&) override {
720 return kcenon::common::ok();
721 }
722
723 kcenon::common::VoidResult begin_transaction() override {
724 std::lock_guard<std::mutex> lock(mutex_);
725 if (should_fail_begin_) {
726 return kcenon::common::error_info{1, "begin_failed", "stub"};
727 }
728 begin_count_++;
729 in_txn_ = true;
730 return kcenon::common::ok();
731 }
732
733 kcenon::common::VoidResult commit_transaction() override {
734 std::lock_guard<std::mutex> lock(mutex_);
735 if (should_fail_commit_) {
736 return kcenon::common::error_info{1, "commit_failed", "stub"};
737 }
738 commit_count_++;
739 in_txn_ = false;
740 return kcenon::common::ok();
741 }
742
743 kcenon::common::VoidResult rollback_transaction() override {
744 std::lock_guard<std::mutex> lock(mutex_);
745 rollback_count_++;
746 in_txn_ = false;
747 return kcenon::common::ok();
748 }
749
750 bool in_transaction() const override {
751 std::lock_guard<std::mutex> lock(mutex_);
752 return in_txn_;
753 }
754
755 std::string last_error() const override { return ""; }
756 std::map<std::string, std::string> connection_info() const override {
757 return {{"type", "txn_stub"}};
758 }
759
760 // Test controls
761 void set_fail_begin(bool fail) {
762 std::lock_guard<std::mutex> lock(mutex_);
763 should_fail_begin_ = fail;
764 }
765 void set_fail_commit(bool fail) {
766 std::lock_guard<std::mutex> lock(mutex_);
767 should_fail_commit_ = fail;
768 }
769 int begin_count() const {
770 std::lock_guard<std::mutex> lock(mutex_);
771 return begin_count_;
772 }
773 int commit_count() const {
774 std::lock_guard<std::mutex> lock(mutex_);
775 return commit_count_;
776 }
777 int rollback_count() const {
778 std::lock_guard<std::mutex> lock(mutex_);
779 return rollback_count_;
780 }
781
782private:
783 mutable std::mutex mutex_;
784 bool in_txn_ = false;
785 bool should_fail_begin_ = false;
786 bool should_fail_commit_ = false;
787 int begin_count_ = 0;
788 int commit_count_ = 0;
789 int rollback_count_ = 0;
790};
791
792} // anonymous namespace
793
794class TransactionCoordinatorTest : public ::testing::Test {
795protected:
796 void SetUp() override {
797 p1_ = std::make_shared<txn_stub_backend>();
798 p2_ = std::make_shared<txn_stub_backend>();
799 p3_ = std::make_shared<txn_stub_backend>();
800 participants_ = {p1_, p2_, p3_};
801 }
802
804 std::shared_ptr<txn_stub_backend> p1_;
805 std::shared_ptr<txn_stub_backend> p2_;
806 std::shared_ptr<txn_stub_backend> p3_;
807 std::vector<std::shared_ptr<::database::core::database_backend>> participants_;
808};
809
810// -- begin_distributed_transaction --
811
812TEST_F(TransactionCoordinatorTest, BeginCreatesTransaction) {
813 auto txn_id = coord_.begin_distributed_transaction(participants_);
814 EXPECT_FALSE(txn_id.empty());
815
816 auto active = coord_.get_active_transactions();
817 ASSERT_EQ(active.size(), 1u);
818 EXPECT_EQ(active[0].transaction_id, txn_id);
819 EXPECT_EQ(active[0].participants.size(), 3u);
820 EXPECT_EQ(active[0].state, transaction_coordinator::transaction_state::active);
821}
822
823TEST_F(TransactionCoordinatorTest, BeginGeneratesUniqueIds) {
824 auto id1 = coord_.begin_distributed_transaction(participants_);
825 auto id2 = coord_.begin_distributed_transaction(participants_);
826 EXPECT_NE(id1, id2);
827 EXPECT_EQ(coord_.get_active_transactions().size(), 2u);
828}
829
830// -- prepare_phase --
831
832TEST_F(TransactionCoordinatorTest, PreparePhaseSuccess) {
833 auto txn_id = coord_.begin_distributed_transaction(participants_);
834 bool result = coord_.prepare_phase(txn_id).get();
835 EXPECT_TRUE(result);
836
837 auto txns = coord_.get_active_transactions();
838 ASSERT_EQ(txns.size(), 1u);
839 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::prepared);
840 EXPECT_EQ(p1_->begin_count(), 1);
841 EXPECT_EQ(p2_->begin_count(), 1);
842 EXPECT_EQ(p3_->begin_count(), 1);
843}
844
845TEST_F(TransactionCoordinatorTest, PreparePhaseThrowsForUnknownTransaction) {
846 EXPECT_THROW(coord_.prepare_phase("nonexistent").get(), std::exception);
847}
848
849TEST_F(TransactionCoordinatorTest, PreparePhaseRollsBackOnPartialFailure) {
850 p2_->set_fail_begin(true);
851 auto txn_id = coord_.begin_distributed_transaction(participants_);
852 bool result = coord_.prepare_phase(txn_id).get();
853 EXPECT_FALSE(result);
854
855 // p1 was prepared then rolled back; p2 failed; p3 never reached
856 EXPECT_EQ(p1_->begin_count(), 1);
857 EXPECT_EQ(p1_->rollback_count(), 1);
858 EXPECT_EQ(p2_->begin_count(), 0);
859 EXPECT_EQ(p3_->begin_count(), 0);
860
861 auto txns = coord_.get_active_transactions();
862 ASSERT_EQ(txns.size(), 1u);
863 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::aborted);
864}
865
866// -- commit_phase --
867
868TEST_F(TransactionCoordinatorTest, CommitPhaseSuccess) {
869 auto txn_id = coord_.begin_distributed_transaction(participants_);
870 coord_.prepare_phase(txn_id).get();
871
872 bool result = coord_.commit_phase(txn_id).get();
873 EXPECT_TRUE(result);
874
875 EXPECT_EQ(p1_->commit_count(), 1);
876 EXPECT_EQ(p2_->commit_count(), 1);
877 EXPECT_EQ(p3_->commit_count(), 1);
878
879 auto txns = coord_.get_active_transactions();
880 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::committed);
881}
882
883TEST_F(TransactionCoordinatorTest, CommitPhaseFailsIfNotPrepared) {
884 auto txn_id = coord_.begin_distributed_transaction(participants_);
885 // Skip prepare_phase — state is still "active"
886 bool result = coord_.commit_phase(txn_id).get();
887 EXPECT_FALSE(result);
888 EXPECT_EQ(p1_->commit_count(), 0);
889}
890
891// -- commit_distributed_transaction (full 2PC) --
892
893TEST_F(TransactionCoordinatorTest, CommitDistributedTransactionFull2PC) {
894 auto txn_id = coord_.begin_distributed_transaction(participants_);
895 bool result = coord_.commit_distributed_transaction(txn_id).get();
896 EXPECT_TRUE(result);
897
898 EXPECT_EQ(p1_->begin_count(), 1);
899 EXPECT_EQ(p1_->commit_count(), 1);
900 EXPECT_EQ(p2_->begin_count(), 1);
901 EXPECT_EQ(p2_->commit_count(), 1);
902}
903
904TEST_F(TransactionCoordinatorTest, CommitDistributedTransactionFailsOnPrepare) {
905 p3_->set_fail_begin(true);
906 auto txn_id = coord_.begin_distributed_transaction(participants_);
907 bool result = coord_.commit_distributed_transaction(txn_id).get();
908 EXPECT_FALSE(result);
909
910 // p1, p2 prepared then rolled back
911 EXPECT_EQ(p1_->rollback_count(), 1);
912 EXPECT_EQ(p2_->rollback_count(), 1);
913 EXPECT_EQ(p3_->commit_count(), 0);
914}
915
916// -- rollback_distributed_transaction --
917
918TEST_F(TransactionCoordinatorTest, RollbackDistributedTransaction) {
919 auto txn_id = coord_.begin_distributed_transaction(participants_);
920 coord_.prepare_phase(txn_id).get();
921
922 bool result = coord_.rollback_distributed_transaction(txn_id).get();
923 EXPECT_TRUE(result);
924
925 EXPECT_EQ(p1_->rollback_count(), 1);
926 EXPECT_EQ(p2_->rollback_count(), 1);
927 EXPECT_EQ(p3_->rollback_count(), 1);
928
929 auto txns = coord_.get_active_transactions();
930 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::aborted);
931}
932
933TEST_F(TransactionCoordinatorTest, RollbackThrowsForUnknownTransaction) {
934 EXPECT_THROW(
935 coord_.rollback_distributed_transaction("nonexistent").get(),
936 std::exception);
937}
938
939// -- recover_transactions --
940
941TEST_F(TransactionCoordinatorTest, RecoverCleansUpCompletedTransactions) {
942 auto id1 = coord_.begin_distributed_transaction(participants_);
943 auto id2 = coord_.begin_distributed_transaction(participants_);
944 coord_.commit_distributed_transaction(id1).get();
945
946 EXPECT_EQ(coord_.get_active_transactions().size(), 2u);
947 coord_.recover_transactions();
948
949 // id1 is committed → cleaned up; id2 is still active
950 auto remaining = coord_.get_active_transactions();
951 ASSERT_EQ(remaining.size(), 1u);
952 EXPECT_EQ(remaining[0].transaction_id, id2);
953}
954
955// -- create_saga --
956
957TEST_F(TransactionCoordinatorTest, CreateSagaReturnsSagaBuilder) {
958 auto saga = coord_.create_saga();
959 // Verify the saga builder can execute (empty saga succeeds)
960 bool result = saga.execute().get();
961 EXPECT_TRUE(result);
962}
963
964// ============================================================================
965// saga_builder tests
966// ============================================================================
967
968class SagaBuilderTest : public ::testing::Test {
969protected:
971};
972
973TEST_F(SagaBuilderTest, EmptySagaSucceeds) {
974 auto saga = coord_.create_saga();
975 bool result = saga.execute().get();
976 EXPECT_TRUE(result);
977}
978
979TEST_F(SagaBuilderTest, AllStepsSucceed) {
980 std::vector<int> execution_order;
981
982 auto saga = coord_.create_saga();
983 saga.add_step(
984 [&]() -> async_result<bool> {
985 execution_order.push_back(1);
986 return make_ready_result(true);
987 },
988 [&]() -> async_result<bool> {
989 execution_order.push_back(-1);
990 return make_ready_result(true);
991 }
992 ).add_step(
993 [&]() -> async_result<bool> {
994 execution_order.push_back(2);
995 return make_ready_result(true);
996 },
997 [&]() -> async_result<bool> {
998 execution_order.push_back(-2);
999 return make_ready_result(true);
1000 }
1001 ).add_step(
1002 [&]() -> async_result<bool> {
1003 execution_order.push_back(3);
1004 return make_ready_result(true);
1005 },
1006 [&]() -> async_result<bool> {
1007 execution_order.push_back(-3);
1008 return make_ready_result(true);
1009 }
1010 );
1011
1012 bool result = saga.execute().get();
1013 EXPECT_TRUE(result);
1014 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3}));
1015}
1016
1017TEST_F(SagaBuilderTest, CompensatesOnActionFailure) {
1018 std::vector<int> execution_order;
1019
1020 auto saga = coord_.create_saga();
1021 saga.add_step(
1022 [&]() -> async_result<bool> {
1023 execution_order.push_back(1);
1024 return make_ready_result(true);
1025 },
1026 [&]() -> async_result<bool> {
1027 execution_order.push_back(-1);
1028 return make_ready_result(true);
1029 }
1030 ).add_step(
1031 [&]() -> async_result<bool> {
1032 execution_order.push_back(2);
1033 return make_ready_result(true);
1034 },
1035 [&]() -> async_result<bool> {
1036 execution_order.push_back(-2);
1037 return make_ready_result(true);
1038 }
1039 ).add_step(
1040 [&]() -> async_result<bool> {
1041 execution_order.push_back(3);
1042 return make_ready_result(false); // Step 3 fails
1043 },
1044 [&]() -> async_result<bool> {
1045 execution_order.push_back(-3);
1046 return make_ready_result(true);
1047 }
1048 );
1049
1050 bool result = saga.execute().get();
1051 EXPECT_FALSE(result);
1052 // Steps 1,2 executed; step 3 failed; compensate 2,1 in reverse
1053 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3, -2, -1}));
1054}
1055
1056TEST_F(SagaBuilderTest, CompensatesOnException) {
1057 std::vector<int> execution_order;
1058
1059 auto saga = coord_.create_saga();
1060 saga.add_step(
1061 [&]() -> async_result<bool> {
1062 execution_order.push_back(1);
1063 return make_ready_result(true);
1064 },
1065 [&]() -> async_result<bool> {
1066 execution_order.push_back(-1);
1067 return make_ready_result(true);
1068 }
1069 ).add_step(
1070 [&]() -> async_result<bool> {
1071 execution_order.push_back(2);
1072 throw std::runtime_error("step 2 exploded");
1073 return make_ready_result(true);
1074 },
1075 [&]() -> async_result<bool> {
1076 execution_order.push_back(-2);
1077 return make_ready_result(true);
1078 }
1079 );
1080
1081 bool result = saga.execute().get();
1082 EXPECT_FALSE(result);
1083 // Step 1 executed; step 2 threw; compensate 1 in reverse
1084 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, -1}));
1085}
1086
1087TEST_F(SagaBuilderTest, SingleStepSuccess) {
1088 bool action_called = false;
1089 auto saga = coord_.create_saga();
1090 saga.add_step(
1091 [&]() -> async_result<bool> {
1092 action_called = true;
1093 return make_ready_result(true);
1094 },
1095 [&]() -> async_result<bool> {
1096 return make_ready_result(true);
1097 }
1098 );
1099
1100 bool result = saga.execute().get();
1101 EXPECT_TRUE(result);
1102 EXPECT_TRUE(action_called);
1103}
1104
1105TEST_F(SagaBuilderTest, CompensationExceptionDoesNotBreakChain) {
1106 std::vector<int> execution_order;
1107
1108 auto saga = coord_.create_saga();
1109 saga.add_step(
1110 [&]() -> async_result<bool> {
1111 execution_order.push_back(1);
1112 return make_ready_result(true);
1113 },
1114 [&]() -> async_result<bool> {
1115 execution_order.push_back(-1);
1116 return make_ready_result(true);
1117 }
1118 ).add_step(
1119 [&]() -> async_result<bool> {
1120 execution_order.push_back(2);
1121 return make_ready_result(true);
1122 },
1123 [&]() -> async_result<bool> {
1124 execution_order.push_back(-2);
1125 throw std::runtime_error("compensation 2 failed");
1126 return make_ready_result(true);
1127 }
1128 ).add_step(
1129 [&]() -> async_result<bool> {
1130 execution_order.push_back(3);
1131 return make_ready_result(false); // fails
1132 },
1133 [&]() -> async_result<bool> {
1134 execution_order.push_back(-3);
1135 return make_ready_result(true);
1136 }
1137 );
1138
1139 bool result = saga.execute().get();
1140 EXPECT_FALSE(result);
1141 // Compensation -2 throws, but -1 still executes
1142 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3, -2, -1}));
1143}
1144
1145TEST_F(SagaBuilderTest, AddStepReturnsSelfForChaining) {
1146 auto saga = coord_.create_saga();
1147 auto& ref = saga.add_step(
1148 []() -> async_result<bool> { return make_ready_result(true); },
1149 []() -> async_result<bool> { return make_ready_result(true); }
1150 );
1151 EXPECT_EQ(&ref, &saga);
1152}
1153
1154//=============================================================================
1155// stream_processor Tests
1156//=============================================================================
1157
1158class StreamProcessorTest : public ::testing::Test {
1159protected:
1160 void SetUp() override {
1161 backend_ = std::make_shared<async_stub_backend>();
1162 processor_ = std::make_unique<stream_processor>(backend_);
1163 }
1164
1165 void TearDown() override {
1166 processor_.reset();
1167 }
1168
1169 // Wait for an atomic flag to become true, with timeout
1170 bool wait_for_flag(const std::atomic<bool>& flag,
1171 std::chrono::milliseconds timeout = std::chrono::milliseconds(500))
1172 {
1173 auto deadline = std::chrono::steady_clock::now() + timeout;
1174 while (!flag.load() && std::chrono::steady_clock::now() < deadline) {
1175 std::this_thread::sleep_for(std::chrono::milliseconds(5));
1176 }
1177 return flag.load();
1178 }
1179
1180 // Wait for an atomic counter to reach a target value, with timeout
1181 bool wait_for_count(const std::atomic<int>& counter, int target,
1182 std::chrono::milliseconds timeout = std::chrono::milliseconds(500))
1183 {
1184 auto deadline = std::chrono::steady_clock::now() + timeout;
1185 while (counter.load() < target && std::chrono::steady_clock::now() < deadline) {
1186 std::this_thread::sleep_for(std::chrono::milliseconds(5));
1187 }
1188 return counter.load() >= target;
1189 }
1190
1191 std::shared_ptr<async_stub_backend> backend_;
1192 std::unique_ptr<stream_processor> processor_;
1193};
1194
1195// -- Construction --
1196
1197TEST_F(StreamProcessorTest, ConstructsWithBackend) {
1198 EXPECT_NE(processor_, nullptr);
1199}
1200
1201// -- start_stream / stop_stream --
1202
1203TEST_F(StreamProcessorTest, StartStreamReturnsTrue) {
1204 EXPECT_TRUE(processor_->start_stream(
1205 stream_processor::stream_type::custom, "test_channel"));
1206 processor_->stop_all_streams();
1207}
1208
1209TEST_F(StreamProcessorTest, StartStreamDuplicateReturnsFalse) {
1210 EXPECT_TRUE(processor_->start_stream(
1211 stream_processor::stream_type::custom, "ch1"));
1212 EXPECT_FALSE(processor_->start_stream(
1213 stream_processor::stream_type::custom, "ch1"));
1214 processor_->stop_all_streams();
1215}
1216
1217TEST_F(StreamProcessorTest, StopStreamReturnsTrue) {
1218 processor_->start_stream(
1219 stream_processor::stream_type::custom, "ch1");
1220 EXPECT_TRUE(processor_->stop_stream("ch1"));
1221}
1222
1223TEST_F(StreamProcessorTest, StopStreamNonExistentReturnsFalse) {
1224 EXPECT_FALSE(processor_->stop_stream("no_such_channel"));
1225}
1226
1227TEST_F(StreamProcessorTest, StopAllStreamsStopsMultiple) {
1228 processor_->start_stream(
1229 stream_processor::stream_type::custom, "ch1");
1230 processor_->start_stream(
1231 stream_processor::stream_type::redis_pubsub, "ch2");
1232 processor_->start_stream(
1233 stream_processor::stream_type::postgresql_notify, "ch3");
1234
1235 // Should not hang or crash
1236 processor_->stop_all_streams();
1237
1238 // All stopped — stopping again returns false
1239 EXPECT_FALSE(processor_->stop_stream("ch1"));
1240 EXPECT_FALSE(processor_->stop_stream("ch2"));
1241 EXPECT_FALSE(processor_->stop_stream("ch3"));
1242}
1243
1244// -- Event handler --
1245
1246TEST_F(StreamProcessorTest, EventHandlerReceivesConnectedEvent) {
1247 std::atomic<bool> received{false};
1248 std::string captured_payload;
1249 std::mutex capture_mutex;
1250
1251 processor_->register_event_handler("ch1",
1252 [&](const stream_processor::stream_event& event) {
1253 std::lock_guard<std::mutex> lock(capture_mutex);
1254 captured_payload = event.payload;
1255 received.store(true);
1256 });
1257
1258 processor_->start_stream(
1259 stream_processor::stream_type::custom, "ch1");
1260
1261 ASSERT_TRUE(wait_for_flag(received));
1262 {
1263 std::lock_guard<std::mutex> lock(capture_mutex);
1264 EXPECT_EQ(captured_payload, "stream_connected");
1265 }
1266 processor_->stop_all_streams();
1267}
1268
1269TEST_F(StreamProcessorTest, EventHandlerReceivesCorrectChannel) {
1270 std::atomic<bool> received{false};
1271 std::string captured_channel;
1272 std::mutex capture_mutex;
1273
1274 processor_->register_event_handler("notifications",
1275 [&](const stream_processor::stream_event& event) {
1276 std::lock_guard<std::mutex> lock(capture_mutex);
1277 captured_channel = event.channel;
1278 received.store(true);
1279 });
1280
1281 processor_->start_stream(
1282 stream_processor::stream_type::postgresql_notify, "notifications");
1283
1284 ASSERT_TRUE(wait_for_flag(received));
1285 {
1286 std::lock_guard<std::mutex> lock(capture_mutex);
1287 EXPECT_EQ(captured_channel, "notifications");
1288 }
1289 processor_->stop_all_streams();
1290}
1291
1292TEST_F(StreamProcessorTest, EventHandlerReceivesCorrectType) {
1293 std::atomic<bool> received{false};
1294 stream_processor::stream_type captured_type;
1295
1296 processor_->register_event_handler("ch1",
1297 [&](const stream_processor::stream_event& event) {
1298 captured_type = event.type;
1299 received.store(true);
1300 });
1301
1302 processor_->start_stream(
1303 stream_processor::stream_type::mongodb_change_stream, "ch1");
1304
1305 ASSERT_TRUE(wait_for_flag(received));
1306 EXPECT_EQ(captured_type,
1307 stream_processor::stream_type::mongodb_change_stream);
1308 processor_->stop_all_streams();
1309}
1310
1311// -- Global handler --
1312
1313TEST_F(StreamProcessorTest, GlobalHandlerReceivesAllEvents) {
1314 std::atomic<int> event_count{0};
1315
1316 processor_->register_global_handler(
1317 [&](const stream_processor::stream_event&) {
1318 event_count.fetch_add(1);
1319 });
1320
1321 processor_->start_stream(
1322 stream_processor::stream_type::custom, "ch1");
1323 processor_->start_stream(
1324 stream_processor::stream_type::custom, "ch2");
1325
1326 ASSERT_TRUE(wait_for_count(event_count, 2));
1327 EXPECT_GE(event_count.load(), 2);
1328 processor_->stop_all_streams();
1329}
1330
1331// -- Event filter --
1332
1333TEST_F(StreamProcessorTest, EventFilterRejectsEvent) {
1334 std::atomic<bool> handler_called{false};
1335
1336 processor_->add_event_filter("ch1",
1338 return false; // Reject all events
1339 });
1340
1341 processor_->register_event_handler("ch1",
1342 [&](const stream_processor::stream_event&) {
1343 handler_called.store(true);
1344 });
1345
1346 processor_->start_stream(
1347 stream_processor::stream_type::custom, "ch1");
1348
1349 // Give enough time for the event to potentially arrive
1350 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1351 EXPECT_FALSE(handler_called.load());
1352 processor_->stop_all_streams();
1353}
1354
1355TEST_F(StreamProcessorTest, EventFilterAcceptsEvent) {
1356 std::atomic<bool> handler_called{false};
1357
1358 processor_->add_event_filter("ch1",
1359 [](const stream_processor::stream_event& event) {
1360 return event.payload == "stream_connected";
1361 });
1362
1363 processor_->register_event_handler("ch1",
1364 [&](const stream_processor::stream_event&) {
1365 handler_called.store(true);
1366 });
1367
1368 processor_->start_stream(
1369 stream_processor::stream_type::custom, "ch1");
1370
1371 ASSERT_TRUE(wait_for_flag(handler_called));
1372 processor_->stop_all_streams();
1373}
1374
1375TEST_F(StreamProcessorTest, FilterDoesNotAffectGlobalHandler) {
1376 std::atomic<bool> global_called{false};
1377
1378 processor_->add_event_filter("ch1",
1380 return false; // Reject
1381 });
1382
1383 processor_->register_global_handler(
1384 [&](const stream_processor::stream_event&) {
1385 global_called.store(true);
1386 });
1387
1388 processor_->start_stream(
1389 stream_processor::stream_type::custom, "ch1");
1390
1391 // Global handler should NOT be called when filter rejects
1392 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1393 EXPECT_FALSE(global_called.load());
1394 processor_->stop_all_streams();
1395}
1396
1397// -- Per-channel independence --
1398
1399TEST_F(StreamProcessorTest, StopOneStreamDoesNotAffectOthers) {
1400 std::atomic<int> ch1_count{0};
1401 std::atomic<int> ch2_count{0};
1402
1403 processor_->register_event_handler("ch1",
1404 [&](const stream_processor::stream_event&) {
1405 ch1_count.fetch_add(1);
1406 });
1407 processor_->register_event_handler("ch2",
1408 [&](const stream_processor::stream_event&) {
1409 ch2_count.fetch_add(1);
1410 });
1411
1412 processor_->start_stream(
1413 stream_processor::stream_type::custom, "ch1");
1414 processor_->start_stream(
1415 stream_processor::stream_type::custom, "ch2");
1416
1417 // Wait for both connected events
1418 ASSERT_TRUE(wait_for_count(ch1_count, 1));
1419 ASSERT_TRUE(wait_for_count(ch2_count, 1));
1420
1421 // Stop only ch1 — ch2 should still be alive
1422 EXPECT_TRUE(processor_->stop_stream("ch1"));
1423
1424 // Verify ch1 is stopped
1425 EXPECT_FALSE(processor_->stop_stream("ch1"));
1426
1427 // ch2 stream should still be running (can be stopped)
1428 EXPECT_TRUE(processor_->stop_stream("ch2"));
1429}
1430
1431// -- Destructor safety --
1432
1433TEST_F(StreamProcessorTest, DestructorStopsAllStreams) {
1434 auto local_backend = std::make_shared<async_stub_backend>();
1435 {
1436 stream_processor sp(local_backend);
1437 sp.start_stream(
1438 stream_processor::stream_type::custom, "ch1");
1439 sp.start_stream(
1440 stream_processor::stream_type::custom, "ch2");
1441 // Destructor runs here — must not crash or hang
1442 }
1443 SUCCEED();
1444}
1445
1446// -- Template concept overloads --
1447
1448TEST_F(StreamProcessorTest, TemplateEventHandlerWithLambda) {
1449 std::atomic<bool> received{false};
1450
1451 // Uses the concept-constrained template overload
1452 auto handler = [&](const stream_processor::stream_event&) {
1453 received.store(true);
1454 };
1455 processor_->register_event_handler("ch1", handler);
1456
1457 processor_->start_stream(
1458 stream_processor::stream_type::custom, "ch1");
1459
1460 ASSERT_TRUE(wait_for_flag(received));
1461 processor_->stop_all_streams();
1462}
1463
1464TEST_F(StreamProcessorTest, TemplateGlobalHandlerWithLambda) {
1465 std::atomic<int> count{0};
1466
1467 auto handler = [&](const stream_processor::stream_event&) {
1468 count.fetch_add(1);
1469 };
1470 processor_->register_global_handler(handler);
1471
1472 processor_->start_stream(
1473 stream_processor::stream_type::custom, "ch1");
1474
1475 ASSERT_TRUE(wait_for_count(count, 1));
1476 processor_->stop_all_streams();
1477}
1478
1479TEST_F(StreamProcessorTest, TemplateEventFilterWithLambda) {
1480 std::atomic<bool> handler_called{false};
1481
1482 auto filter = [](const stream_processor::stream_event& event) {
1483 return event.payload == "stream_connected";
1484 };
1485 processor_->add_event_filter("ch1", filter);
1486
1487 processor_->register_event_handler("ch1",
1488 [&](const stream_processor::stream_event&) {
1489 handler_called.store(true);
1490 });
1491
1492 processor_->start_stream(
1493 stream_processor::stream_type::custom, "ch1");
1494
1495 ASSERT_TRUE(wait_for_flag(handler_called));
1496 processor_->stop_all_streams();
1497}
std::unique_ptr< async_database > db_
std::shared_ptr< async_executor > executor_
std::shared_ptr< async_stub_backend > backend_
std::unique_ptr< async_executor > executor_
async_result< T > make_ready(T value)
async_result< T > make_failing()
transaction_coordinator coord_
bool wait_for_flag(const std::atomic< bool > &flag, std::chrono::milliseconds timeout=std::chrono::milliseconds(500))
std::unique_ptr< stream_processor > processor_
std::shared_ptr< async_stub_backend > backend_
bool wait_for_count(const std::atomic< int > &counter, int target, std::chrono::milliseconds timeout=std::chrono::milliseconds(500))
std::shared_ptr< txn_stub_backend > p1_
std::shared_ptr< txn_stub_backend > p2_
std::shared_ptr< txn_stub_backend > p3_
std::vector< std::shared_ptr<::database::core::database_backend > > participants_
High-performance asynchronous executor using thread_system.
size_t thread_count() const
Returns the number of worker threads.
void shutdown()
Gracefully shuts down the executor.
Template class for asynchronous operation results.
T get_for(std::chrono::milliseconds timeout)
std::future_status wait_for(std::chrono::milliseconds timeout) const
async_result< bool > execute()
Real-time data stream processing.
bool start_stream(stream_type type, const std::string &channel)
Distributed transaction coordination.
Abstract base class for database backends.
Abstract interface for database backends.
async_result< T > make_ready_result(T value)
async_result< T > make_error_result(const std::exception &error)
std::vector< database_row > database_result
std::map< std::string, database_value > database_row
database_types
Represents various database backends or modes.
TEST_F(AsyncResultTest, ConstructsFromFuture)
#define ASSERT_EQ(expected, actual, message)
#define ASSERT_TRUE(condition, message)
#define TEST(name)