Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
test_async_operations.cpp File Reference
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <future>
#include <string>
#include <thread>
#include <vector>
#include "database/async/async_operations.h"
#include "database/core/database_backend.h"
Include dependency graph for test_async_operations.cpp:

Go to the source code of this file.

Classes

class  AsyncResultTest
 
class  AsyncExecutorTest
 
class  AsyncDatabaseTest
 
class  TransactionCoordinatorTest
 
class  SagaBuilderTest
 
class  StreamProcessorTest
 

Functions

 TEST_F (AsyncResultTest, ConstructsFromFuture)
 
 TEST_F (AsyncResultTest, GetReturnsValue)
 
 TEST_F (AsyncResultTest, GetReturnsStringValue)
 
 TEST_F (AsyncResultTest, GetRethrowsException)
 
 TEST_F (AsyncResultTest, GetInvokesSuccessCallback)
 
 TEST_F (AsyncResultTest, GetInvokesErrorCallback)
 
 TEST_F (AsyncResultTest, GetForReturnsValueWithinTimeout)
 
 TEST_F (AsyncResultTest, GetForThrowsOnTimeout)
 
 TEST_F (AsyncResultTest, IsReadyReturnsFalseBeforeCompletion)
 
 TEST_F (AsyncResultTest, IsReadyReturnsTrueWhenReady)
 
 TEST_F (AsyncResultTest, WaitForReturnsReadyWhenComplete)
 
 TEST_F (AsyncResultTest, WaitForReturnsTimeoutWhenNotReady)
 
 TEST_F (AsyncResultTest, ThenWithLambda)
 
 TEST_F (AsyncResultTest, OnErrorWithLambda)
 
 TEST_F (AsyncResultTest, ThenLegacyOverload)
 
 TEST_F (AsyncResultTest, OnErrorLegacyOverload)
 
 TEST_F (AsyncResultTest, NoCallbackDoesNotCrashOnSuccess)
 
 TEST_F (AsyncResultTest, NoErrorCallbackDoesNotCrashOnFailure)
 
 TEST_F (AsyncResultTest, GetBlocksUntilValueAvailable)
 
 TEST_F (AsyncExecutorTest, ConstructsWithCustomThreadCount)
 
 TEST_F (AsyncExecutorTest, ConstructsWithDefaultThreadCount)
 
 TEST_F (AsyncExecutorTest, IsNotUsingThreadSystem)
 
 TEST_F (AsyncExecutorTest, SubmitExecutesCallable)
 
 TEST_F (AsyncExecutorTest, SubmitWithArguments)
 
 TEST_F (AsyncExecutorTest, SubmitReturnsString)
 
 TEST_F (AsyncExecutorTest, SubmitPropagatesException)
 
 TEST_F (AsyncExecutorTest, MultipleConcurrentSubmissions)
 
 TEST_F (AsyncExecutorTest, WaitForCompletionBlocksUntilDone)
 
 TEST_F (AsyncExecutorTest, ShutdownAfterSubmitCompletesGracefully)
 
 TEST_F (AsyncExecutorTest, SubmitAfterShutdownThrows)
 
 TEST_F (AsyncExecutorTest, ThreadCountReturnsConfigured)
 
 TEST (AsyncHelpersTest, MakeReadyResultIsImmediatelyAvailable)
 
 TEST (AsyncHelpersTest, MakeReadyResultWithString)
 
 TEST (AsyncHelpersTest, MakeErrorResultThrowsOnGet)
 
 TEST (AsyncHelpersTest, MakeErrorResultInvokesOnErrorCallback)
 
 TEST_F (AsyncDatabaseTest, ConstructsWithBackendAndExecutor)
 
 TEST_F (AsyncDatabaseTest, ExecuteAsyncReturnsTrue)
 
 TEST_F (AsyncDatabaseTest, ExecuteAsyncDelegatesToBackend)
 
 TEST_F (AsyncDatabaseTest, ExecuteAsyncThrowsOnBackendFailure)
 
 TEST_F (AsyncDatabaseTest, SelectAsyncReturnsRows)
 
 TEST_F (AsyncDatabaseTest, ExecuteBatchAsyncProcessesAllQueries)
 
 TEST_F (AsyncDatabaseTest, ExecuteBatchAsyncReportsFailures)
 
 TEST_F (AsyncDatabaseTest, SelectBatchAsyncReturnsMultipleResults)
 
 TEST_F (AsyncDatabaseTest, BeginTransactionAsyncSucceeds)
 
 TEST_F (AsyncDatabaseTest, CommitTransactionAsyncSucceeds)
 
 TEST_F (AsyncDatabaseTest, RollbackTransactionAsyncSucceeds)
 
 TEST_F (AsyncDatabaseTest, ConnectAsyncInitializesBackend)
 
 TEST_F (AsyncDatabaseTest, DisconnectAsyncShutsDownBackend)
 
 TEST_F (AsyncDatabaseTest, ConcurrentExecuteAsyncOperations)
 
 TEST_F (AsyncDatabaseTest, ExecuteAsyncWithThenCallback)
 
 TEST_F (AsyncDatabaseTest, ExecuteAsyncWithOnErrorCallback)
 
 TEST_F (TransactionCoordinatorTest, BeginCreatesTransaction)
 
 TEST_F (TransactionCoordinatorTest, BeginGeneratesUniqueIds)
 
 TEST_F (TransactionCoordinatorTest, PreparePhaseSuccess)
 
 TEST_F (TransactionCoordinatorTest, PreparePhaseThrowsForUnknownTransaction)
 
 TEST_F (TransactionCoordinatorTest, PreparePhaseRollsBackOnPartialFailure)
 
 TEST_F (TransactionCoordinatorTest, CommitPhaseSuccess)
 
 TEST_F (TransactionCoordinatorTest, CommitPhaseFailsIfNotPrepared)
 
 TEST_F (TransactionCoordinatorTest, CommitDistributedTransactionFull2PC)
 
 TEST_F (TransactionCoordinatorTest, CommitDistributedTransactionFailsOnPrepare)
 
 TEST_F (TransactionCoordinatorTest, RollbackDistributedTransaction)
 
 TEST_F (TransactionCoordinatorTest, RollbackThrowsForUnknownTransaction)
 
 TEST_F (TransactionCoordinatorTest, RecoverCleansUpCompletedTransactions)
 
 TEST_F (TransactionCoordinatorTest, CreateSagaReturnsSagaBuilder)
 
 TEST_F (SagaBuilderTest, EmptySagaSucceeds)
 
 TEST_F (SagaBuilderTest, AllStepsSucceed)
 
 TEST_F (SagaBuilderTest, CompensatesOnActionFailure)
 
 TEST_F (SagaBuilderTest, CompensatesOnException)
 
 TEST_F (SagaBuilderTest, SingleStepSuccess)
 
 TEST_F (SagaBuilderTest, CompensationExceptionDoesNotBreakChain)
 
 TEST_F (SagaBuilderTest, AddStepReturnsSelfForChaining)
 
 TEST_F (StreamProcessorTest, ConstructsWithBackend)
 
 TEST_F (StreamProcessorTest, StartStreamReturnsTrue)
 
 TEST_F (StreamProcessorTest, StartStreamDuplicateReturnsFalse)
 
 TEST_F (StreamProcessorTest, StopStreamReturnsTrue)
 
 TEST_F (StreamProcessorTest, StopStreamNonExistentReturnsFalse)
 
 TEST_F (StreamProcessorTest, StopAllStreamsStopsMultiple)
 
 TEST_F (StreamProcessorTest, EventHandlerReceivesConnectedEvent)
 
 TEST_F (StreamProcessorTest, EventHandlerReceivesCorrectChannel)
 
 TEST_F (StreamProcessorTest, EventHandlerReceivesCorrectType)
 
 TEST_F (StreamProcessorTest, GlobalHandlerReceivesAllEvents)
 
 TEST_F (StreamProcessorTest, EventFilterRejectsEvent)
 
 TEST_F (StreamProcessorTest, EventFilterAcceptsEvent)
 
 TEST_F (StreamProcessorTest, FilterDoesNotAffectGlobalHandler)
 
 TEST_F (StreamProcessorTest, StopOneStreamDoesNotAffectOthers)
 
 TEST_F (StreamProcessorTest, DestructorStopsAllStreams)
 
 TEST_F (StreamProcessorTest, TemplateEventHandlerWithLambda)
 
 TEST_F (StreamProcessorTest, TemplateGlobalHandlerWithLambda)
 
 TEST_F (StreamProcessorTest, TemplateEventFilterWithLambda)
 

Function Documentation

◆ TEST() [1/4]

TEST ( AsyncHelpersTest ,
MakeErrorResultInvokesOnErrorCallback  )

Definition at line 378 of file test_async_operations.cpp.

378 {
379 auto result = make_error_result<int>(std::runtime_error("callback_err"));
380
381 bool error_caught = false;
382 result.on_error([&error_caught](const std::exception&) {
383 error_caught = true;
384 });
385
386 EXPECT_THROW(result.get(), std::exception);
387 EXPECT_TRUE(error_caught);
388}
async_result< T > make_error_result(const std::exception &error)

References database::async::make_error_result().

Here is the call graph for this function:

◆ TEST() [2/4]

TEST ( AsyncHelpersTest ,
MakeErrorResultThrowsOnGet  )

Definition at line 369 of file test_async_operations.cpp.

369 {
370 // Note: make_error_result takes const std::exception& which causes
371 // object slicing — the thrown exception is std::exception, not the
372 // derived type. This tests the actual API behavior.
373 auto result = make_error_result<int>(std::runtime_error("err"));
374 EXPECT_TRUE(result.is_ready());
375 EXPECT_THROW(result.get(), std::exception);
376}

References database::async::make_error_result().

Here is the call graph for this function:

◆ TEST() [3/4]

TEST ( AsyncHelpersTest ,
MakeReadyResultIsImmediatelyAvailable  )

Definition at line 357 of file test_async_operations.cpp.

357 {
358 auto result = make_ready_result<int>(42);
359 EXPECT_TRUE(result.is_ready());
360 EXPECT_EQ(result.get(), 42);
361}
async_result< T > make_ready_result(T value)

References database::async::make_ready_result().

Here is the call graph for this function:

◆ TEST() [4/4]

TEST ( AsyncHelpersTest ,
MakeReadyResultWithString  )

Definition at line 363 of file test_async_operations.cpp.

363 {
364 auto result = make_ready_result<std::string>("hello");
365 EXPECT_TRUE(result.is_ready());
366 EXPECT_EQ(result.get(), "hello");
367}

References database::async::make_ready_result().

Here is the call graph for this function:

◆ TEST_F() [1/85]

TEST_F ( AsyncDatabaseTest ,
BeginTransactionAsyncSucceeds  )

Definition at line 596 of file test_async_operations.cpp.

596 {
597 auto result = db_->begin_transaction_async();
598 EXPECT_TRUE(result.get());
599 EXPECT_TRUE(backend_->in_transaction());
600}

◆ TEST_F() [2/85]

TEST_F ( AsyncDatabaseTest ,
CommitTransactionAsyncSucceeds  )

Definition at line 602 of file test_async_operations.cpp.

602 {
603 db_->begin_transaction_async().get();
604 auto result = db_->commit_transaction_async();
605 EXPECT_TRUE(result.get());
606 EXPECT_FALSE(backend_->in_transaction());
607}

◆ TEST_F() [3/85]

TEST_F ( AsyncDatabaseTest ,
ConcurrentExecuteAsyncOperations  )

Definition at line 633 of file test_async_operations.cpp.

633 {
634 // async_result<T> is non-movable (contains std::mutex), so it cannot be
635 // stored in a vector; std::future<bool> handles are collected instead.
636 constexpr int NUM_OPS = 20;
637 std::atomic<int> success_count{0};
638
639 // Submit all operations through the executor; results are gathered from the futures below
640 std::vector<std::future<bool>> futures;
641 futures.reserve(NUM_OPS);
642 for (int i = 0; i < NUM_OPS; ++i) {
643 futures.push_back(executor_->submit([this, i]() -> bool {
644 auto result = backend_->execute_query(
645 "INSERT INTO t VALUES (" + std::to_string(i) + ")");
646 return result.is_ok();
647 }));
648 }
649
650 for (auto& f : futures) {
651 if (f.get()) {
652 success_count.fetch_add(1);
653 }
654 }
655 EXPECT_EQ(success_count.load(), NUM_OPS);
656}

◆ TEST_F() [4/85]

TEST_F ( AsyncDatabaseTest ,
ConnectAsyncInitializesBackend  )

Definition at line 618 of file test_async_operations.cpp.

618 {
619 auto result = db_->connect_async("host=localhost dbname=test");
620 EXPECT_TRUE(result.get());
621 EXPECT_TRUE(backend_->is_initialized());
622}

◆ TEST_F() [5/85]

TEST_F ( AsyncDatabaseTest ,
ConstructsWithBackendAndExecutor  )

Definition at line 519 of file test_async_operations.cpp.

519 {
520 EXPECT_NE(db_, nullptr);
521}

◆ TEST_F() [6/85]

TEST_F ( AsyncDatabaseTest ,
DisconnectAsyncShutsDownBackend  )

Definition at line 624 of file test_async_operations.cpp.

624 {
625 db_->connect_async("host=localhost").get();
626 auto result = db_->disconnect_async();
627 EXPECT_TRUE(result.get());
628 EXPECT_FALSE(backend_->is_initialized());
629}

◆ TEST_F() [7/85]

TEST_F ( AsyncDatabaseTest ,
ExecuteAsyncDelegatesToBackend  )

Definition at line 530 of file test_async_operations.cpp.

530 {
531 auto result = db_->execute_async("DROP TABLE IF EXISTS t");
532 result.get();
533 EXPECT_EQ(backend_->get_last_query(), "DROP TABLE IF EXISTS t");
534}

◆ TEST_F() [8/85]

TEST_F ( AsyncDatabaseTest ,
ExecuteAsyncReturnsTrue  )

Definition at line 525 of file test_async_operations.cpp.

525 {
526 auto result = db_->execute_async("CREATE TABLE t (id INT)");
527 EXPECT_TRUE(result.get());
528}

◆ TEST_F() [9/85]

TEST_F ( AsyncDatabaseTest ,
ExecuteAsyncThrowsOnBackendFailure  )

Definition at line 536 of file test_async_operations.cpp.

536 {
537 backend_->set_fail_execute(true);
538 auto result = db_->execute_async("BAD QUERY");
539 EXPECT_THROW(result.get(), std::runtime_error);
540}

◆ TEST_F() [10/85]

TEST_F ( AsyncDatabaseTest ,
ExecuteAsyncWithOnErrorCallback  )

Definition at line 673 of file test_async_operations.cpp.

673 {
674 backend_->set_fail_execute(true);
675 auto result = db_->execute_async("BAD");
676
677 std::string error_msg;
678 result.on_error([&error_msg](const std::exception& e) {
679 error_msg = e.what();
680 });
681
682 EXPECT_THROW(result.get(), std::runtime_error);
683 EXPECT_EQ(error_msg, "execute_failed");
684}

◆ TEST_F() [11/85]

TEST_F ( AsyncDatabaseTest ,
ExecuteAsyncWithThenCallback  )

Definition at line 660 of file test_async_operations.cpp.

660 {
661 auto result = db_->execute_async("CREATE TABLE t2 (id INT)");
662
663 bool callback_invoked = false;
664 result.then([&callback_invoked](bool success) {
665 callback_invoked = true;
666 EXPECT_TRUE(success);
667 });
668
669 result.get();
670 EXPECT_TRUE(callback_invoked);
671}

◆ TEST_F() [12/85]

TEST_F ( AsyncDatabaseTest ,
ExecuteBatchAsyncProcessesAllQueries  )

Definition at line 553 of file test_async_operations.cpp.

553 {
554 std::vector<std::string> queries = {
555 "INSERT INTO t VALUES (1)",
556 "INSERT INTO t VALUES (2)",
557 "INSERT INTO t VALUES (3)"
558 };
559 auto result = db_->execute_batch_async(queries);
560 auto results = result.get();
561
562 ASSERT_EQ(results.size(), 3u);
563 EXPECT_TRUE(results[0]);
564 EXPECT_TRUE(results[1]);
565 EXPECT_TRUE(results[2]);
566}
#define ASSERT_EQ(expected, actual, message)

References ASSERT_EQ.

◆ TEST_F() [13/85]

TEST_F ( AsyncDatabaseTest ,
ExecuteBatchAsyncReportsFailures  )

Definition at line 568 of file test_async_operations.cpp.

568 {
569 backend_->set_fail_execute(true);
570 std::vector<std::string> queries = {"Q1", "Q2"};
571 auto result = db_->execute_batch_async(queries);
572 auto results = result.get();
573
574 ASSERT_EQ(results.size(), 2u);
575 EXPECT_FALSE(results[0]);
576 EXPECT_FALSE(results[1]);
577}

References ASSERT_EQ.

◆ TEST_F() [14/85]

TEST_F ( AsyncDatabaseTest ,
RollbackTransactionAsyncSucceeds  )

Definition at line 609 of file test_async_operations.cpp.

609 {
610 db_->begin_transaction_async().get();
611 auto result = db_->rollback_transaction_async();
612 EXPECT_TRUE(result.get());
613 EXPECT_FALSE(backend_->in_transaction());
614}

◆ TEST_F() [15/85]

TEST_F ( AsyncDatabaseTest ,
SelectAsyncReturnsRows  )

Definition at line 544 of file test_async_operations.cpp.

544 {
545 auto result = db_->select_async("SELECT * FROM t");
546 auto rows = result.get();
547 ASSERT_EQ(rows.size(), 1u);
548 EXPECT_EQ(std::get<std::string>(rows[0].at("name")), "stub_row");
549}

References ASSERT_EQ.

◆ TEST_F() [16/85]

TEST_F ( AsyncDatabaseTest ,
SelectBatchAsyncReturnsMultipleResults  )

Definition at line 581 of file test_async_operations.cpp.

581 {
582 std::vector<std::string> queries = {
583 "SELECT * FROM t",
584 "SELECT * FROM t"
585 };
586 auto result = db_->select_batch_async(queries);
587 auto results = result.get();
588
589 ASSERT_EQ(results.size(), 2u);
590 EXPECT_EQ(results[0].size(), 1u);
591 EXPECT_EQ(results[1].size(), 1u);
592}

References ASSERT_EQ.

◆ TEST_F() [17/85]

TEST_F ( AsyncExecutorTest ,
ConstructsWithCustomThreadCount  )

Definition at line 255 of file test_async_operations.cpp.

255 {
256 EXPECT_EQ(executor_->thread_count(), 2u);
257}

◆ TEST_F() [18/85]

TEST_F ( AsyncExecutorTest ,
ConstructsWithDefaultThreadCount  )

Definition at line 259 of file test_async_operations.cpp.

259 {
260 auto exec = std::make_unique<async_executor>();
261 EXPECT_EQ(exec->thread_count(), std::thread::hardware_concurrency());
262 exec->shutdown();
263}

◆ TEST_F() [19/85]

TEST_F ( AsyncExecutorTest ,
IsNotUsingThreadSystem  )

Definition at line 265 of file test_async_operations.cpp.

265 {
266 EXPECT_FALSE(executor_->is_using_thread_system());
267}

◆ TEST_F() [20/85]

TEST_F ( AsyncExecutorTest ,
MultipleConcurrentSubmissions  )

Definition at line 297 of file test_async_operations.cpp.

297 {
298 constexpr int NUM_TASKS = 100;
299 std::vector<std::future<int>> futures;
300 futures.reserve(NUM_TASKS);
301
302 for (int i = 0; i < NUM_TASKS; ++i) {
303 futures.push_back(executor_->submit([i]() { return i * 2; }));
304 }
305
306 for (int i = 0; i < NUM_TASKS; ++i) {
307 EXPECT_EQ(futures[i].get(), i * 2);
308 }
309}

◆ TEST_F() [21/85]

TEST_F ( AsyncExecutorTest ,
ShutdownAfterSubmitCompletesGracefully  )

Definition at line 332 of file test_async_operations.cpp.

332 {
333 auto future = executor_->submit([]() { return 1; });
334 executor_->shutdown();
335 EXPECT_EQ(future.get(), 1);
336}

◆ TEST_F() [22/85]

TEST_F ( AsyncExecutorTest ,
SubmitAfterShutdownThrows  )

Definition at line 338 of file test_async_operations.cpp.

338 {
339 executor_->shutdown();
340 EXPECT_THROW(
341 executor_->submit([]() { return 0; }),
342 std::runtime_error);
343}

◆ TEST_F() [23/85]

TEST_F ( AsyncExecutorTest ,
SubmitExecutesCallable  )

Definition at line 271 of file test_async_operations.cpp.

271 {
272 auto future = executor_->submit([]() { return 42; });
273 EXPECT_EQ(future.get(), 42);
274}

◆ TEST_F() [24/85]

TEST_F ( AsyncExecutorTest ,
SubmitPropagatesException  )

Definition at line 288 of file test_async_operations.cpp.

288 {
289 auto future = executor_->submit([]() -> int {
290 throw std::runtime_error("task_failed");
291 });
292 EXPECT_THROW(future.get(), std::runtime_error);
293}

◆ TEST_F() [25/85]

TEST_F ( AsyncExecutorTest ,
SubmitReturnsString  )

Definition at line 281 of file test_async_operations.cpp.

281 {
282 auto future = executor_->submit([]() -> std::string {
283 return "async_result";
284 });
285 EXPECT_EQ(future.get(), "async_result");
286}

◆ TEST_F() [26/85]

TEST_F ( AsyncExecutorTest ,
SubmitWithArguments  )

Definition at line 276 of file test_async_operations.cpp.

276 {
277 auto future = executor_->submit([](int a, int b) { return a + b; }, 3, 7);
278 EXPECT_EQ(future.get(), 10);
279}

◆ TEST_F() [27/85]

TEST_F ( AsyncExecutorTest ,
ThreadCountReturnsConfigured  )

Definition at line 347 of file test_async_operations.cpp.

347 {
348 async_executor exec4(4);
349 EXPECT_EQ(exec4.thread_count(), 4u);
350 exec4.shutdown();
351}
High-performance asynchronous executor using thread_system.

References database::async::async_executor::shutdown(), and database::async::async_executor::thread_count().

Here is the call graph for this function:

◆ TEST_F() [28/85]

TEST_F ( AsyncExecutorTest ,
WaitForCompletionBlocksUntilDone  )

Definition at line 313 of file test_async_operations.cpp.

313 {
314 std::atomic<int> counter{0};
315
316 for (int i = 0; i < 10; ++i) {
317 executor_->submit([&counter]() {
318 std::this_thread::sleep_for(std::chrono::milliseconds(5));
319 counter.fetch_add(1);
320 });
321 }
322
323 executor_->wait_for_completion();
324 // Allow brief settling time for tasks that are executing but haven't
325 // decremented the queue yet
326 std::this_thread::sleep_for(std::chrono::milliseconds(50));
327 EXPECT_EQ(counter.load(), 10);
328}

◆ TEST_F() [29/85]

TEST_F ( AsyncResultTest ,
ConstructsFromFuture  )

Definition at line 60 of file test_async_operations.cpp.

60 {
61 std::promise<int> p;
62 auto future = p.get_future();
63 p.set_value(42);
64
65 async_result<int> result(std::move(future));
66 EXPECT_TRUE(result.is_ready());
67}
Template class for asynchronous operation results.

References database::async::async_result< T >::is_ready().

Here is the call graph for this function:

◆ TEST_F() [30/85]

TEST_F ( AsyncResultTest ,
GetBlocksUntilValueAvailable  )

Definition at line 221 of file test_async_operations.cpp.

221 {
222 std::promise<int> p;
223 async_result<int> result(p.get_future());
224
225 std::thread setter([&p]() {
226 std::this_thread::sleep_for(std::chrono::milliseconds(20));
227 p.set_value(123);
228 });
229
230 EXPECT_EQ(result.get(), 123);
231 setter.join();
232}

References database::async::async_result< T >::get().

Here is the call graph for this function:

◆ TEST_F() [31/85]

TEST_F ( AsyncResultTest ,
GetForReturnsValueWithinTimeout  )

Definition at line 110 of file test_async_operations.cpp.

110 {
111 auto result = make_ready<int>(77);
112 EXPECT_EQ(result.get_for(std::chrono::milliseconds(100)), 77);
113}

◆ TEST_F() [32/85]

TEST_F ( AsyncResultTest ,
GetForThrowsOnTimeout  )

Definition at line 115 of file test_async_operations.cpp.

115 {
116 std::promise<int> p;
117 async_result<int> result(p.get_future());
118
119 EXPECT_THROW(
120 result.get_for(std::chrono::milliseconds(10)),
121 std::runtime_error);
122}

References database::async::async_result< T >::get_for().

Here is the call graph for this function:

◆ TEST_F() [33/85]

TEST_F ( AsyncResultTest ,
GetInvokesErrorCallback  )

Definition at line 96 of file test_async_operations.cpp.

96 {
97 auto result = make_failing<int>();
98
99 std::string captured_msg;
100 result.on_error([&captured_msg](const std::exception& e) {
101 captured_msg = e.what();
102 });
103
104 EXPECT_THROW(result.get(), std::runtime_error);
105 EXPECT_EQ(captured_msg, "test_error");
106}

◆ TEST_F() [34/85]

TEST_F ( AsyncResultTest ,
GetInvokesSuccessCallback  )

Definition at line 86 of file test_async_operations.cpp.

86 {
87 auto result = make_ready<int>(42);
88
89 int captured = 0;
90 result.then([&captured](int v) { captured = v; });
91
92 EXPECT_EQ(result.get(), 42);
93 EXPECT_EQ(captured, 42);
94}

◆ TEST_F() [35/85]

TEST_F ( AsyncResultTest ,
GetRethrowsException  )

Definition at line 81 of file test_async_operations.cpp.

81 {
82 auto result = make_failing<int>();
83 EXPECT_THROW(result.get(), std::runtime_error);
84}

◆ TEST_F() [36/85]

TEST_F ( AsyncResultTest ,
GetReturnsStringValue  )

Definition at line 76 of file test_async_operations.cpp.

76 {
77 auto result = make_ready<std::string>("hello");
78 EXPECT_EQ(result.get(), "hello");
79}

◆ TEST_F() [37/85]

TEST_F ( AsyncResultTest ,
GetReturnsValue  )

Definition at line 71 of file test_async_operations.cpp.

71 {
72 auto result = make_ready<int>(99);
73 EXPECT_EQ(result.get(), 99);
74}

◆ TEST_F() [38/85]

TEST_F ( AsyncResultTest ,
IsReadyReturnsFalseBeforeCompletion  )

Definition at line 126 of file test_async_operations.cpp.

126 {
127 std::promise<int> p;
128 async_result<int> result(p.get_future());
129 EXPECT_FALSE(result.is_ready());
130
131 p.set_value(1);
132 EXPECT_TRUE(result.is_ready());
133}

References database::async::async_result< T >::is_ready().

Here is the call graph for this function:

◆ TEST_F() [39/85]

TEST_F ( AsyncResultTest ,
IsReadyReturnsTrueWhenReady  )

Definition at line 135 of file test_async_operations.cpp.

135 {
136 auto result = make_ready<int>(1);
137 EXPECT_TRUE(result.is_ready());
138}

◆ TEST_F() [40/85]

TEST_F ( AsyncResultTest ,
NoCallbackDoesNotCrashOnSuccess  )

Definition at line 209 of file test_async_operations.cpp.

209 {
210 auto result = make_ready<int>(42);
211 EXPECT_NO_THROW(result.get());
212}

◆ TEST_F() [41/85]

TEST_F ( AsyncResultTest ,
NoErrorCallbackDoesNotCrashOnFailure  )

Definition at line 214 of file test_async_operations.cpp.

214 {
215 auto result = make_failing<int>();
216 EXPECT_THROW(result.get(), std::runtime_error);
217}

◆ TEST_F() [42/85]

TEST_F ( AsyncResultTest ,
OnErrorLegacyOverload  )

Definition at line 195 of file test_async_operations.cpp.

195 {
196 auto result = make_failing<int>();
197
198 std::string msg;
199 std::function<void(const std::exception&)> cb =
200 [&msg](const std::exception& e) { msg = e.what(); };
201 result.on_error(cb);
202
203 EXPECT_THROW(result.get(), std::runtime_error);
204 EXPECT_EQ(msg, "test_error");
205}

◆ TEST_F() [43/85]

TEST_F ( AsyncResultTest ,
OnErrorWithLambda  )

Definition at line 170 of file test_async_operations.cpp.

170 {
171 auto result = make_failing<int>();
172
173 bool error_caught = false;
174 result.on_error([&error_caught](const std::exception&) {
175 error_caught = true;
176 });
177
178 EXPECT_THROW(result.get(), std::runtime_error);
179 EXPECT_TRUE(error_caught);
180}

◆ TEST_F() [44/85]

TEST_F ( AsyncResultTest ,
ThenLegacyOverload  )

Definition at line 184 of file test_async_operations.cpp.

184 {
185 auto result = make_ready<int>(5);
186
187 int captured = 0;
188 std::function<void(int)> cb = [&captured](int v) { captured = v; };
189 result.then(cb);
190 result.get();
191
192 EXPECT_EQ(captured, 5);
193}

◆ TEST_F() [45/85]

TEST_F ( AsyncResultTest ,
ThenWithLambda  )

Definition at line 160 of file test_async_operations.cpp.

160 {
161 auto result = make_ready<int>(10);
162
163 int captured = 0;
164 result.then([&captured](int v) { captured = v; });
165 result.get();
166
167 EXPECT_EQ(captured, 10);
168}

◆ TEST_F() [46/85]

TEST_F ( AsyncResultTest ,
WaitForReturnsReadyWhenComplete  )

Definition at line 142 of file test_async_operations.cpp.

142 {
143 auto result = make_ready<int>(1);
144 auto status = result.wait_for(std::chrono::milliseconds(0));
145 EXPECT_EQ(status, std::future_status::ready);
146}

◆ TEST_F() [47/85]

TEST_F ( AsyncResultTest ,
WaitForReturnsTimeoutWhenNotReady  )

Definition at line 148 of file test_async_operations.cpp.

148 {
149 std::promise<int> p;
150 async_result<int> result(p.get_future());
151
152 auto status = result.wait_for(std::chrono::milliseconds(1));
153 EXPECT_EQ(status, std::future_status::timeout);
154
155 p.set_value(0); // cleanup
156}

References database::async::async_result< T >::wait_for().

Here is the call graph for this function:

◆ TEST_F() [48/85]

TEST_F ( SagaBuilderTest ,
AddStepReturnsSelfForChaining  )

Definition at line 1145 of file test_async_operations.cpp.

1145 {
1146 auto saga = coord_.create_saga();
1147 auto& ref = saga.add_step(
1148 []() -> async_result<bool> { return make_ready_result(true); },
1149 []() -> async_result<bool> { return make_ready_result(true); }
1150 );
1151 EXPECT_EQ(&ref, &saga);
1152}

References database::async::make_ready_result().

Here is the call graph for this function:

◆ TEST_F() [49/85]

TEST_F ( SagaBuilderTest ,
AllStepsSucceed  )

Definition at line 979 of file test_async_operations.cpp.

979 {
980 std::vector<int> execution_order;
981
982 auto saga = coord_.create_saga();
983 saga.add_step(
984 [&]() -> async_result<bool> {
985 execution_order.push_back(1);
986 return make_ready_result(true);
987 },
988 [&]() -> async_result<bool> {
989 execution_order.push_back(-1);
990 return make_ready_result(true);
991 }
992 ).add_step(
993 [&]() -> async_result<bool> {
994 execution_order.push_back(2);
995 return make_ready_result(true);
996 },
997 [&]() -> async_result<bool> {
998 execution_order.push_back(-2);
999 return make_ready_result(true);
1000 }
1001 ).add_step(
1002 [&]() -> async_result<bool> {
1003 execution_order.push_back(3);
1004 return make_ready_result(true);
1005 },
1006 [&]() -> async_result<bool> {
1007 execution_order.push_back(-3);
1008 return make_ready_result(true);
1009 }
1010 );
1011
1012 bool result = saga.execute().get();
1013 EXPECT_TRUE(result);
1014 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3}));
1015}

References database::async::make_ready_result().

Here is the call graph for this function:

◆ TEST_F() [50/85]

TEST_F ( SagaBuilderTest ,
CompensatesOnActionFailure  )

Definition at line 1017 of file test_async_operations.cpp.

1017 {
1018 std::vector<int> execution_order;
1019
1020 auto saga = coord_.create_saga();
1021 saga.add_step(
1022 [&]() -> async_result<bool> {
1023 execution_order.push_back(1);
1024 return make_ready_result(true);
1025 },
1026 [&]() -> async_result<bool> {
1027 execution_order.push_back(-1);
1028 return make_ready_result(true);
1029 }
1030 ).add_step(
1031 [&]() -> async_result<bool> {
1032 execution_order.push_back(2);
1033 return make_ready_result(true);
1034 },
1035 [&]() -> async_result<bool> {
1036 execution_order.push_back(-2);
1037 return make_ready_result(true);
1038 }
1039 ).add_step(
1040 [&]() -> async_result<bool> {
1041 execution_order.push_back(3);
1042 return make_ready_result(false); // Step 3 fails
1043 },
1044 [&]() -> async_result<bool> {
1045 execution_order.push_back(-3);
1046 return make_ready_result(true);
1047 }
1048 );
1049
1050 bool result = saga.execute().get();
1051 EXPECT_FALSE(result);
1052 // Steps 1,2 executed; step 3 failed; compensate 2,1 in reverse
1053 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3, -2, -1}));
1054}

References database::async::make_ready_result().

Here is the call graph for this function:

◆ TEST_F() [51/85]

TEST_F ( SagaBuilderTest ,
CompensatesOnException  )

Definition at line 1056 of file test_async_operations.cpp.

1056 {
1057 std::vector<int> execution_order;
1058
1059 auto saga = coord_.create_saga();
1060 saga.add_step(
1061 [&]() -> async_result<bool> {
1062 execution_order.push_back(1);
1063 return make_ready_result(true);
1064 },
1065 [&]() -> async_result<bool> {
1066 execution_order.push_back(-1);
1067 return make_ready_result(true);
1068 }
1069 ).add_step(
1070 [&]() -> async_result<bool> {
1071 execution_order.push_back(2);
1072 throw std::runtime_error("step 2 exploded");
1073 return make_ready_result(true);
1074 },
1075 [&]() -> async_result<bool> {
1076 execution_order.push_back(-2);
1077 return make_ready_result(true);
1078 }
1079 );
1080
1081 bool result = saga.execute().get();
1082 EXPECT_FALSE(result);
1083 // Step 1 executed; step 2 threw; compensate 1 in reverse
1084 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, -1}));
1085}

References database::async::make_ready_result().

Here is the call graph for this function:

◆ TEST_F() [52/85]

TEST_F ( SagaBuilderTest ,
CompensationExceptionDoesNotBreakChain  )

Definition at line 1105 of file test_async_operations.cpp.

1105 {
1106 std::vector<int> execution_order;
1107
1108 auto saga = coord_.create_saga();
1109 saga.add_step(
1110 [&]() -> async_result<bool> {
1111 execution_order.push_back(1);
1112 return make_ready_result(true);
1113 },
1114 [&]() -> async_result<bool> {
1115 execution_order.push_back(-1);
1116 return make_ready_result(true);
1117 }
1118 ).add_step(
1119 [&]() -> async_result<bool> {
1120 execution_order.push_back(2);
1121 return make_ready_result(true);
1122 },
1123 [&]() -> async_result<bool> {
1124 execution_order.push_back(-2);
1125 throw std::runtime_error("compensation 2 failed");
1126 return make_ready_result(true);
1127 }
1128 ).add_step(
1129 [&]() -> async_result<bool> {
1130 execution_order.push_back(3);
1131 return make_ready_result(false); // fails
1132 },
1133 [&]() -> async_result<bool> {
1134 execution_order.push_back(-3);
1135 return make_ready_result(true);
1136 }
1137 );
1138
1139 bool result = saga.execute().get();
1140 EXPECT_FALSE(result);
1141 // Compensation -2 throws, but -1 still executes
1142 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3, -2, -1}));
1143}

References database::async::make_ready_result().

Here is the call graph for this function:

◆ TEST_F() [53/85]

TEST_F ( SagaBuilderTest ,
EmptySagaSucceeds  )

Definition at line 973 of file test_async_operations.cpp.

973 {
974 auto saga = coord_.create_saga();
975 bool result = saga.execute().get();
976 EXPECT_TRUE(result);
977}

References database::async::transaction_coordinator::create_saga(), and database::async::saga_builder::execute().

Here is the call graph for this function:

◆ TEST_F() [54/85]

TEST_F ( SagaBuilderTest ,
SingleStepSuccess  )

Definition at line 1087 of file test_async_operations.cpp.

1087 {
1088 bool action_called = false;
1089 auto saga = coord_.create_saga();
1090 saga.add_step(
1091 [&]() -> async_result<bool> {
1092 action_called = true;
1093 return make_ready_result(true);
1094 },
1095 [&]() -> async_result<bool> {
1096 return make_ready_result(true);
1097 }
1098 );
1099
1100 bool result = saga.execute().get();
1101 EXPECT_TRUE(result);
1102 EXPECT_TRUE(action_called);
1103}

References database::async::make_ready_result().

Here is the call graph for this function:

◆ TEST_F() [55/85]

TEST_F ( StreamProcessorTest ,
ConstructsWithBackend  )

Definition at line 1197 of file test_async_operations.cpp.

1197 {
1198 EXPECT_NE(processor_, nullptr);
1199}

◆ TEST_F() [56/85]

TEST_F ( StreamProcessorTest ,
DestructorStopsAllStreams  )

Definition at line 1433 of file test_async_operations.cpp.

1433 {
1434 auto local_backend = std::make_shared<async_stub_backend>();
1435 {
1436 stream_processor sp(local_backend);
1437 sp.start_stream(
1438 stream_processor::stream_type::custom, "ch1");
1439 sp.start_stream(
1440 stream_processor::stream_type::custom, "ch2");
1441 // Destructor runs here — must not crash or hang
1442 }
1443 SUCCEED();
1444}
Real-time data stream processing.

References database::async::stream_processor::start_stream().

Here is the call graph for this function:

◆ TEST_F() [57/85]

TEST_F ( StreamProcessorTest ,
EventFilterAcceptsEvent  )

Definition at line 1355 of file test_async_operations.cpp.

1355 {
1356 std::atomic<bool> handler_called{false};
1357
1358 processor_->add_event_filter("ch1",
1359 [](const stream_processor::stream_event& event) {
1360 return event.payload == "stream_connected";
1361 });
1362
1363 processor_->register_event_handler("ch1",
1364 [&](const stream_processor::stream_event&) {
1365 handler_called.store(true);
1366 });
1367
1368 processor_->start_stream(
1369 stream_processor::stream_type::custom, "ch1");
1370
1371 ASSERT_TRUE(wait_for_flag(handler_called));
1372 processor_->stop_all_streams();
1373}
#define ASSERT_TRUE(condition, message)

References ASSERT_TRUE.

◆ TEST_F() [58/85]

TEST_F ( StreamProcessorTest ,
EventFilterRejectsEvent  )

Definition at line 1333 of file test_async_operations.cpp.

1333 {
1334 std::atomic<bool> handler_called{false};
1335
1336 processor_->add_event_filter("ch1",
1337 [](const stream_processor::stream_event&) {
1338 return false; // Reject all events
1339 });
1340
1341 processor_->register_event_handler("ch1",
1342 [&](const stream_processor::stream_event&) {
1343 handler_called.store(true);
1344 });
1345
1346 processor_->start_stream(
1347 stream_processor::stream_type::custom, "ch1");
1348
1349 // Give enough time for the event to potentially arrive
1350 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1351 EXPECT_FALSE(handler_called.load());
1352 processor_->stop_all_streams();
1353}

◆ TEST_F() [59/85]

TEST_F ( StreamProcessorTest ,
EventHandlerReceivesConnectedEvent  )

Definition at line 1246 of file test_async_operations.cpp.

1246 {
1247 std::atomic<bool> received{false};
1248 std::string captured_payload;
1249 std::mutex capture_mutex;
1250
1251 processor_->register_event_handler("ch1",
1252 [&](const stream_processor::stream_event& event) {
1253 std::lock_guard<std::mutex> lock(capture_mutex);
1254 captured_payload = event.payload;
1255 received.store(true);
1256 });
1257
1258 processor_->start_stream(
1259 stream_processor::stream_type::custom, "ch1");
1260
1261 ASSERT_TRUE(wait_for_flag(received));
1262 {
1263 std::lock_guard<std::mutex> lock(capture_mutex);
1264 EXPECT_EQ(captured_payload, "stream_connected");
1265 }
1266 processor_->stop_all_streams();
1267}

References ASSERT_TRUE.

◆ TEST_F() [60/85]

TEST_F ( StreamProcessorTest ,
EventHandlerReceivesCorrectChannel  )

Definition at line 1269 of file test_async_operations.cpp.

1269 {
1270 std::atomic<bool> received{false};
1271 std::string captured_channel;
1272 std::mutex capture_mutex;
1273
1274 processor_->register_event_handler("notifications",
1275 [&](const stream_processor::stream_event& event) {
1276 std::lock_guard<std::mutex> lock(capture_mutex);
1277 captured_channel = event.channel;
1278 received.store(true);
1279 });
1280
1281 processor_->start_stream(
1282 stream_processor::stream_type::postgresql_notify, "notifications");
1283
1284 ASSERT_TRUE(wait_for_flag(received));
1285 {
1286 std::lock_guard<std::mutex> lock(capture_mutex);
1287 EXPECT_EQ(captured_channel, "notifications");
1288 }
1289 processor_->stop_all_streams();
1290}

References ASSERT_TRUE.

◆ TEST_F() [61/85]

TEST_F ( StreamProcessorTest ,
EventHandlerReceivesCorrectType  )

Definition at line 1292 of file test_async_operations.cpp.

1292 {
1293 std::atomic<bool> received{false};
1294 stream_processor::stream_type captured_type;
1295
1296 processor_->register_event_handler("ch1",
1297 [&](const stream_processor::stream_event& event) {
1298 captured_type = event.type;
1299 received.store(true);
1300 });
1301
1302 processor_->start_stream(
1303 stream_processor::stream_type::mongodb_change_stream, "ch1");
1304
1305 ASSERT_TRUE(wait_for_flag(received));
1306 EXPECT_EQ(captured_type,
1307 stream_processor::stream_type::mongodb_change_stream);
1308 processor_->stop_all_streams();
1309}

References ASSERT_TRUE.

◆ TEST_F() [62/85]

TEST_F ( StreamProcessorTest ,
FilterDoesNotAffectGlobalHandler  )

Definition at line 1375 of file test_async_operations.cpp.

1375 {
1376 std::atomic<bool> global_called{false};
1377
1378 processor_->add_event_filter("ch1",
1379 [](const stream_processor::stream_event&) {
1380 return false; // Reject
1381 });
1382
1383 processor_->register_global_handler(
1384 [&](const stream_processor::stream_event&) {
1385 global_called.store(true);
1386 });
1387
1388 processor_->start_stream(
1389 stream_processor::stream_type::custom, "ch1");
1390
1391 // Global handler should NOT be called when filter rejects
1392 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1393 EXPECT_FALSE(global_called.load());
1394 processor_->stop_all_streams();
1395}

◆ TEST_F() [63/85]

TEST_F ( StreamProcessorTest ,
GlobalHandlerReceivesAllEvents  )

Definition at line 1313 of file test_async_operations.cpp.

1313 {
1314 std::atomic<int> event_count{0};
1315
1316 processor_->register_global_handler(
1317 [&](const stream_processor::stream_event&) {
1318 event_count.fetch_add(1);
1319 });
1320
1321 processor_->start_stream(
1322 stream_processor::stream_type::custom, "ch1");
1323 processor_->start_stream(
1324 stream_processor::stream_type::custom, "ch2");
1325
1326 ASSERT_TRUE(wait_for_count(event_count, 2));
1327 EXPECT_GE(event_count.load(), 2);
1328 processor_->stop_all_streams();
1329}

References ASSERT_TRUE.

◆ TEST_F() [64/85]

TEST_F ( StreamProcessorTest ,
StartStreamDuplicateReturnsFalse  )

Definition at line 1209 of file test_async_operations.cpp.

1209 {
1210 EXPECT_TRUE(processor_->start_stream(
1211 stream_processor::stream_type::custom, "ch1"));
1212 EXPECT_FALSE(processor_->start_stream(
1213 stream_processor::stream_type::custom, "ch1"));
1214 processor_->stop_all_streams();
1215}

◆ TEST_F() [65/85]

TEST_F ( StreamProcessorTest ,
StartStreamReturnsTrue  )

Definition at line 1203 of file test_async_operations.cpp.

1203 {
1204 EXPECT_TRUE(processor_->start_stream(
1205 stream_processor::stream_type::custom, "test_channel"));
1206 processor_->stop_all_streams();
1207}

◆ TEST_F() [66/85]

TEST_F ( StreamProcessorTest ,
StopAllStreamsStopsMultiple  )

Definition at line 1227 of file test_async_operations.cpp.

1227 {
1228 processor_->start_stream(
1229 stream_processor::stream_type::custom, "ch1");
1230 processor_->start_stream(
1231 stream_processor::stream_type::redis_pubsub, "ch2");
1232 processor_->start_stream(
1233 stream_processor::stream_type::postgresql_notify, "ch3");
1234
1235 // Should not hang or crash
1236 processor_->stop_all_streams();
1237
1238 // All stopped — stopping again returns false
1239 EXPECT_FALSE(processor_->stop_stream("ch1"));
1240 EXPECT_FALSE(processor_->stop_stream("ch2"));
1241 EXPECT_FALSE(processor_->stop_stream("ch3"));
1242}

◆ TEST_F() [67/85]

TEST_F ( StreamProcessorTest ,
StopOneStreamDoesNotAffectOthers  )

Definition at line 1399 of file test_async_operations.cpp.

1399 {
1400 std::atomic<int> ch1_count{0};
1401 std::atomic<int> ch2_count{0};
1402
1403 processor_->register_event_handler("ch1",
1404 [&](const stream_processor::stream_event&) {
1405 ch1_count.fetch_add(1);
1406 });
1407 processor_->register_event_handler("ch2",
1408 [&](const stream_processor::stream_event&) {
1409 ch2_count.fetch_add(1);
1410 });
1411
1412 processor_->start_stream(
1413 stream_processor::stream_type::custom, "ch1");
1414 processor_->start_stream(
1415 stream_processor::stream_type::custom, "ch2");
1416
1417 // Wait for both connected events
1418 ASSERT_TRUE(wait_for_count(ch1_count, 1));
1419 ASSERT_TRUE(wait_for_count(ch2_count, 1));
1420
1421 // Stop only ch1 — ch2 should still be alive
1422 EXPECT_TRUE(processor_->stop_stream("ch1"));
1423
1424 // Verify ch1 is stopped
1425 EXPECT_FALSE(processor_->stop_stream("ch1"));
1426
1427 // ch2 stream should still be running (can be stopped)
1428 EXPECT_TRUE(processor_->stop_stream("ch2"));
1429}

References ASSERT_TRUE.

◆ TEST_F() [68/85]

TEST_F ( StreamProcessorTest ,
StopStreamNonExistentReturnsFalse  )

Definition at line 1223 of file test_async_operations.cpp.

1223 {
1224 EXPECT_FALSE(processor_->stop_stream("no_such_channel"));
1225}

◆ TEST_F() [69/85]

TEST_F ( StreamProcessorTest ,
StopStreamReturnsTrue  )

Definition at line 1217 of file test_async_operations.cpp.

1217 {
1218 processor_->start_stream(
1219 stream_processor::stream_type::custom, "ch1");
1220 EXPECT_TRUE(processor_->stop_stream("ch1"));
1221}

◆ TEST_F() [70/85]

TEST_F ( StreamProcessorTest ,
TemplateEventFilterWithLambda  )

Definition at line 1479 of file test_async_operations.cpp.

1479 {
1480 std::atomic<bool> handler_called{false};
1481
1482 auto filter = [](const stream_processor::stream_event& event) {
1483 return event.payload == "stream_connected";
1484 };
1485 processor_->add_event_filter("ch1", filter);
1486
1487 processor_->register_event_handler("ch1",
1488 [&](const stream_processor::stream_event&) {
1489 handler_called.store(true);
1490 });
1491
1492 processor_->start_stream(
1493 stream_processor::stream_type::custom, "ch1");
1494
1495 ASSERT_TRUE(wait_for_flag(handler_called));
1496 processor_->stop_all_streams();
1497}

References ASSERT_TRUE.

◆ TEST_F() [71/85]

TEST_F ( StreamProcessorTest ,
TemplateEventHandlerWithLambda  )

Definition at line 1448 of file test_async_operations.cpp.

1448 {
1449 std::atomic<bool> received{false};
1450
1451 // Uses the concept-constrained template overload
1452 auto handler = [&](const stream_processor::stream_event&) {
1453 received.store(true);
1454 };
1455 processor_->register_event_handler("ch1", handler);
1456
1457 processor_->start_stream(
1458 stream_processor::stream_type::custom, "ch1");
1459
1460 ASSERT_TRUE(wait_for_flag(received));
1461 processor_->stop_all_streams();
1462}

References ASSERT_TRUE.

◆ TEST_F() [72/85]

TEST_F ( StreamProcessorTest ,
TemplateGlobalHandlerWithLambda  )

Definition at line 1464 of file test_async_operations.cpp.

1464 {
1465 std::atomic<int> count{0};
1466
1467 auto handler = [&](const stream_processor::stream_event&) {
1468 count.fetch_add(1);
1469 };
1470 processor_->register_global_handler(handler);
1471
1472 processor_->start_stream(
1473 stream_processor::stream_type::custom, "ch1");
1474
1475 ASSERT_TRUE(wait_for_count(count, 1));
1476 processor_->stop_all_streams();
1477}

References ASSERT_TRUE.

◆ TEST_F() [73/85]

TEST_F ( TransactionCoordinatorTest ,
BeginCreatesTransaction  )

Definition at line 812 of file test_async_operations.cpp.

812 {
813 auto txn_id = coord_.begin_distributed_transaction(participants_);
814 EXPECT_FALSE(txn_id.empty());
815
816 auto active = coord_.get_active_transactions();
817 ASSERT_EQ(active.size(), 1u);
818 EXPECT_EQ(active[0].transaction_id, txn_id);
819 EXPECT_EQ(active[0].participants.size(), 3u);
820 EXPECT_EQ(active[0].state, transaction_coordinator::transaction_state::active);
821}

References ASSERT_EQ.

◆ TEST_F() [74/85]

TEST_F ( TransactionCoordinatorTest ,
BeginGeneratesUniqueIds  )

Definition at line 823 of file test_async_operations.cpp.

823 {
824 auto id1 = coord_.begin_distributed_transaction(participants_);
825 auto id2 = coord_.begin_distributed_transaction(participants_);
826 EXPECT_NE(id1, id2);
827 EXPECT_EQ(coord_.get_active_transactions().size(), 2u);
828}

◆ TEST_F() [75/85]

TEST_F ( TransactionCoordinatorTest ,
CommitDistributedTransactionFailsOnPrepare  )

Definition at line 904 of file test_async_operations.cpp.

904 {
905 p3_->set_fail_begin(true);
906 auto txn_id = coord_.begin_distributed_transaction(participants_);
907 bool result = coord_.commit_distributed_transaction(txn_id).get();
908 EXPECT_FALSE(result);
909
910 // p1, p2 prepared then rolled back
911 EXPECT_EQ(p1_->rollback_count(), 1);
912 EXPECT_EQ(p2_->rollback_count(), 1);
913 EXPECT_EQ(p3_->commit_count(), 0);
914}

◆ TEST_F() [76/85]

TEST_F ( TransactionCoordinatorTest ,
CommitDistributedTransactionFull2PC  )

Definition at line 893 of file test_async_operations.cpp.

893 {
894 auto txn_id = coord_.begin_distributed_transaction(participants_);
895 bool result = coord_.commit_distributed_transaction(txn_id).get();
896 EXPECT_TRUE(result);
897
898 EXPECT_EQ(p1_->begin_count(), 1);
899 EXPECT_EQ(p1_->commit_count(), 1);
900 EXPECT_EQ(p2_->begin_count(), 1);
901 EXPECT_EQ(p2_->commit_count(), 1);
902}

◆ TEST_F() [77/85]

TEST_F ( TransactionCoordinatorTest ,
CommitPhaseFailsIfNotPrepared  )

Definition at line 883 of file test_async_operations.cpp.

883 {
884 auto txn_id = coord_.begin_distributed_transaction(participants_);
885 // Skip prepare_phase — state is still "active"
886 bool result = coord_.commit_phase(txn_id).get();
887 EXPECT_FALSE(result);
888 EXPECT_EQ(p1_->commit_count(), 0);
889}

◆ TEST_F() [78/85]

TEST_F ( TransactionCoordinatorTest ,
CommitPhaseSuccess  )

Definition at line 868 of file test_async_operations.cpp.

868 {
869 auto txn_id = coord_.begin_distributed_transaction(participants_);
870 coord_.prepare_phase(txn_id).get();
871
872 bool result = coord_.commit_phase(txn_id).get();
873 EXPECT_TRUE(result);
874
875 EXPECT_EQ(p1_->commit_count(), 1);
876 EXPECT_EQ(p2_->commit_count(), 1);
877 EXPECT_EQ(p3_->commit_count(), 1);
878
879 auto txns = coord_.get_active_transactions();
880 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::committed);
881}

◆ TEST_F() [79/85]

TEST_F ( TransactionCoordinatorTest ,
CreateSagaReturnsSagaBuilder  )

Definition at line 957 of file test_async_operations.cpp.

957 {
958 auto saga = coord_.create_saga();
959 // Verify the saga builder can execute (empty saga succeeds)
960 bool result = saga.execute().get();
961 EXPECT_TRUE(result);
962}

◆ TEST_F() [80/85]

TEST_F ( TransactionCoordinatorTest ,
PreparePhaseRollsBackOnPartialFailure  )

Definition at line 849 of file test_async_operations.cpp.

849 {
850 p2_->set_fail_begin(true);
851 auto txn_id = coord_.begin_distributed_transaction(participants_);
852 bool result = coord_.prepare_phase(txn_id).get();
853 EXPECT_FALSE(result);
854
855 // p1 was prepared then rolled back; p2 failed; p3 never reached
856 EXPECT_EQ(p1_->begin_count(), 1);
857 EXPECT_EQ(p1_->rollback_count(), 1);
858 EXPECT_EQ(p2_->begin_count(), 0);
859 EXPECT_EQ(p3_->begin_count(), 0);
860
861 auto txns = coord_.get_active_transactions();
862 ASSERT_EQ(txns.size(), 1u);
863 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::aborted);
864}

References ASSERT_EQ.

◆ TEST_F() [81/85]

TEST_F ( TransactionCoordinatorTest ,
PreparePhaseSuccess  )

Definition at line 832 of file test_async_operations.cpp.

832 {
833 auto txn_id = coord_.begin_distributed_transaction(participants_);
834 bool result = coord_.prepare_phase(txn_id).get();
835 EXPECT_TRUE(result);
836
837 auto txns = coord_.get_active_transactions();
838 ASSERT_EQ(txns.size(), 1u);
839 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::prepared);
840 EXPECT_EQ(p1_->begin_count(), 1);
841 EXPECT_EQ(p2_->begin_count(), 1);
842 EXPECT_EQ(p3_->begin_count(), 1);
843}

References ASSERT_EQ.

◆ TEST_F() [82/85]

TEST_F ( TransactionCoordinatorTest ,
PreparePhaseThrowsForUnknownTransaction  )

Definition at line 845 of file test_async_operations.cpp.

845 {
846 EXPECT_THROW(coord_.prepare_phase("nonexistent").get(), std::exception);
847}

◆ TEST_F() [83/85]

TEST_F ( TransactionCoordinatorTest ,
RecoverCleansUpCompletedTransactions  )

Definition at line 941 of file test_async_operations.cpp.

941 {
942 auto id1 = coord_.begin_distributed_transaction(participants_);
943 auto id2 = coord_.begin_distributed_transaction(participants_);
944 coord_.commit_distributed_transaction(id1).get();
945
946 EXPECT_EQ(coord_.get_active_transactions().size(), 2u);
947 coord_.recover_transactions();
948
949 // id1 is committed → cleaned up; id2 is still active
950 auto remaining = coord_.get_active_transactions();
951 ASSERT_EQ(remaining.size(), 1u);
952 EXPECT_EQ(remaining[0].transaction_id, id2);
953}

References ASSERT_EQ.

◆ TEST_F() [84/85]

TEST_F ( TransactionCoordinatorTest ,
RollbackDistributedTransaction  )

Definition at line 918 of file test_async_operations.cpp.

918 {
919 auto txn_id = coord_.begin_distributed_transaction(participants_);
920 coord_.prepare_phase(txn_id).get();
921
922 bool result = coord_.rollback_distributed_transaction(txn_id).get();
923 EXPECT_TRUE(result);
924
925 EXPECT_EQ(p1_->rollback_count(), 1);
926 EXPECT_EQ(p2_->rollback_count(), 1);
927 EXPECT_EQ(p3_->rollback_count(), 1);
928
929 auto txns = coord_.get_active_transactions();
930 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::aborted);
931}

◆ TEST_F() [85/85]

TEST_F ( TransactionCoordinatorTest ,
RollbackThrowsForUnknownTransaction  )

Definition at line 933 of file test_async_operations.cpp.

933 {
934 EXPECT_THROW(
935 coord_.rollback_distributed_transaction("nonexistent").get(),
936 std::exception);
937}