18#ifdef USE_THREAD_SYSTEM
19#undef USE_THREAD_SYSTEM
22#include <gtest/gtest.h>
44 p.set_value(std::move(value));
53 std::make_exception_ptr(std::runtime_error(
"test_error")));
62 auto future = p.get_future();
72 auto result = make_ready<int>(99);
73 EXPECT_EQ(result.get(), 99);
77 auto result = make_ready<std::string>(
"hello");
78 EXPECT_EQ(result.get(),
"hello");
82 auto result = make_failing<int>();
83 EXPECT_THROW(result.get(), std::runtime_error);
87 auto result = make_ready<int>(42);
90 result.then([&captured](
int v) { captured = v; });
92 EXPECT_EQ(result.get(), 42);
93 EXPECT_EQ(captured, 42);
97 auto result = make_failing<int>();
99 std::string captured_msg;
100 result.on_error([&captured_msg](
const std::exception& e) {
101 captured_msg = e.what();
104 EXPECT_THROW(result.get(), std::runtime_error);
105 EXPECT_EQ(captured_msg,
"test_error");
111 auto result = make_ready<int>(77);
112 EXPECT_EQ(result.get_for(std::chrono::milliseconds(100)), 77);
120 result.
get_for(std::chrono::milliseconds(10)),
136 auto result = make_ready<int>(1);
137 EXPECT_TRUE(result.is_ready());
143 auto result = make_ready<int>(1);
144 auto status = result.wait_for(std::chrono::milliseconds(0));
145 EXPECT_EQ(status, std::future_status::ready);
152 auto status = result.
wait_for(std::chrono::milliseconds(1));
153 EXPECT_EQ(status, std::future_status::timeout);
161 auto result = make_ready<int>(10);
164 result.then([&captured](
int v) { captured = v; });
167 EXPECT_EQ(captured, 10);
171 auto result = make_failing<int>();
173 bool error_caught =
false;
174 result.on_error([&error_caught](
const std::exception&) {
178 EXPECT_THROW(result.get(), std::runtime_error);
179 EXPECT_TRUE(error_caught);
185 auto result = make_ready<int>(5);
188 std::function<void(
int)> cb = [&captured](
int v) { captured = v; };
192 EXPECT_EQ(captured, 5);
196 auto result = make_failing<int>();
199 std::function<void(
const std::exception&)> cb =
200 [&msg](
const std::exception& e) { msg = e.what(); };
203 EXPECT_THROW(result.get(), std::runtime_error);
204 EXPECT_EQ(msg,
"test_error");
210 auto result = make_ready<int>(42);
211 EXPECT_NO_THROW(result.get());
215 auto result = make_failing<int>();
216 EXPECT_THROW(result.get(), std::runtime_error);
225 std::thread setter([&p]() {
226 std::this_thread::sleep_for(std::chrono::milliseconds(20));
230 EXPECT_EQ(result.
get(), 123);
241 executor_ = std::make_unique<async_executor>(2);
256 EXPECT_EQ(executor_->thread_count(), 2u);
260 auto exec = std::make_unique<async_executor>();
261 EXPECT_EQ(exec->thread_count(), std::thread::hardware_concurrency());
266 EXPECT_FALSE(executor_->is_using_thread_system());
272 auto future = executor_->submit([]() {
return 42; });
273 EXPECT_EQ(future.get(), 42);
277 auto future = executor_->submit([](
int a,
int b) {
return a + b; }, 3, 7);
278 EXPECT_EQ(future.get(), 10);
282 auto future = executor_->submit([]() -> std::string {
283 return "async_result";
285 EXPECT_EQ(future.get(),
"async_result");
289 auto future = executor_->submit([]() ->
int {
290 throw std::runtime_error(
"task_failed");
292 EXPECT_THROW(future.get(), std::runtime_error);
298 constexpr int NUM_TASKS = 100;
299 std::vector<std::future<int>> futures;
300 futures.reserve(NUM_TASKS);
302 for (
int i = 0; i < NUM_TASKS; ++i) {
303 futures.push_back(executor_->submit([i]() { return i * 2; }));
306 for (
int i = 0; i < NUM_TASKS; ++i) {
307 EXPECT_EQ(futures[i].get(), i * 2);
314 std::atomic<int> counter{0};
316 for (
int i = 0; i < 10; ++i) {
317 executor_->submit([&counter]() {
318 std::this_thread::sleep_for(std::chrono::milliseconds(5));
319 counter.fetch_add(1);
323 executor_->wait_for_completion();
326 std::this_thread::sleep_for(std::chrono::milliseconds(50));
327 EXPECT_EQ(counter.load(), 10);
333 auto future = executor_->submit([]() {
return 1; });
334 executor_->shutdown();
335 EXPECT_EQ(future.get(), 1);
339 executor_->shutdown();
341 executor_->submit([]() { return 0; }),
357TEST(AsyncHelpersTest, MakeReadyResultIsImmediatelyAvailable) {
359 EXPECT_TRUE(result.is_ready());
360 EXPECT_EQ(result.get(), 42);
363TEST(AsyncHelpersTest, MakeReadyResultWithString) {
365 EXPECT_TRUE(result.is_ready());
366 EXPECT_EQ(result.get(),
"hello");
369TEST(AsyncHelpersTest, MakeErrorResultThrowsOnGet) {
374 EXPECT_TRUE(result.is_ready());
375 EXPECT_THROW(result.get(), std::exception);
378TEST(AsyncHelpersTest, MakeErrorResultInvokesOnErrorCallback) {
381 bool error_caught =
false;
382 result.on_error([&error_caught](
const std::exception&) {
386 EXPECT_THROW(result.get(), std::exception);
387 EXPECT_TRUE(error_caught);
404 return ::database::database_types::sqlite;
407 kcenon::common::VoidResult initialize(
408 const ::database::core::connection_config&)
override
410 std::lock_guard<std::mutex> lock(mutex_);
412 return kcenon::common::ok();
415 kcenon::common::VoidResult shutdown()
override {
416 std::lock_guard<std::mutex> lock(mutex_);
417 initialized_ =
false;
418 return kcenon::common::ok();
421 bool is_initialized()
const override {
422 std::lock_guard<std::mutex> lock(mutex_);
426 kcenon::common::Result<::database::core::database_result> select_query(
427 const std::string&)
override
431 row[
"id"] = int64_t{1};
432 row[
"name"] = std::string(
"stub_row");
434 return kcenon::common::Result<::database::core::database_result>(
438 kcenon::common::VoidResult execute_query(
439 const std::string& query)
override
441 std::lock_guard<std::mutex> lock(mutex_);
442 if (should_fail_execute_) {
443 return kcenon::common::error_info{1,
"execute_failed",
"stub"};
445 last_executed_query_ = query;
446 return kcenon::common::ok();
449 kcenon::common::VoidResult begin_transaction()
override {
450 std::lock_guard<std::mutex> lock(mutex_);
451 in_transaction_ =
true;
452 return kcenon::common::ok();
455 kcenon::common::VoidResult commit_transaction()
override {
456 std::lock_guard<std::mutex> lock(mutex_);
457 in_transaction_ =
false;
458 return kcenon::common::ok();
461 kcenon::common::VoidResult rollback_transaction()
override {
462 std::lock_guard<std::mutex> lock(mutex_);
463 in_transaction_ =
false;
464 return kcenon::common::ok();
467 bool in_transaction()
const override {
468 std::lock_guard<std::mutex> lock(mutex_);
469 return in_transaction_;
471 std::string last_error()
const override {
return ""; }
473 std::map<std::string, std::string> connection_info()
const override {
474 return {{
"type",
"stub"}};
478 void set_fail_execute(
bool fail) {
479 std::lock_guard<std::mutex> lock(mutex_);
480 should_fail_execute_ = fail;
482 std::string get_last_query()
const {
483 std::lock_guard<std::mutex> lock(mutex_);
484 return last_executed_query_;
488 mutable std::mutex mutex_;
489 bool initialized_ =
false;
490 bool in_transaction_ =
false;
491 bool should_fail_execute_ =
false;
492 std::string last_executed_query_;
500 backend_ = std::make_shared<async_stub_backend>();
501 executor_ = std::make_shared<async_executor>(2);
514 std::unique_ptr<async_database>
db_;
520 EXPECT_NE(db_,
nullptr);
526 auto result = db_->execute_async(
"CREATE TABLE t (id INT)");
527 EXPECT_TRUE(result.get());
531 auto result = db_->execute_async(
"DROP TABLE IF EXISTS t");
533 EXPECT_EQ(backend_->get_last_query(),
"DROP TABLE IF EXISTS t");
537 backend_->set_fail_execute(
true);
538 auto result = db_->execute_async(
"BAD QUERY");
539 EXPECT_THROW(result.get(), std::runtime_error);
545 auto result = db_->select_async(
"SELECT * FROM t");
546 auto rows = result.get();
548 EXPECT_EQ(std::get<std::string>(rows[0].at(
"name")),
"stub_row");
554 std::vector<std::string> queries = {
555 "INSERT INTO t VALUES (1)",
556 "INSERT INTO t VALUES (2)",
557 "INSERT INTO t VALUES (3)"
559 auto result = db_->execute_batch_async(queries);
560 auto results = result.get();
563 EXPECT_TRUE(results[0]);
564 EXPECT_TRUE(results[1]);
565 EXPECT_TRUE(results[2]);
569 backend_->set_fail_execute(
true);
570 std::vector<std::string> queries = {
"Q1",
"Q2"};
571 auto result = db_->execute_batch_async(queries);
572 auto results = result.get();
575 EXPECT_FALSE(results[0]);
576 EXPECT_FALSE(results[1]);
582 std::vector<std::string> queries = {
586 auto result = db_->select_batch_async(queries);
587 auto results = result.get();
590 EXPECT_EQ(results[0].size(), 1u);
591 EXPECT_EQ(results[1].size(), 1u);
597 auto result = db_->begin_transaction_async();
598 EXPECT_TRUE(result.get());
599 EXPECT_TRUE(backend_->in_transaction());
603 db_->begin_transaction_async().get();
604 auto result = db_->commit_transaction_async();
605 EXPECT_TRUE(result.get());
606 EXPECT_FALSE(backend_->in_transaction());
610 db_->begin_transaction_async().get();
611 auto result = db_->rollback_transaction_async();
612 EXPECT_TRUE(result.get());
613 EXPECT_FALSE(backend_->in_transaction());
619 auto result = db_->connect_async(
"host=localhost dbname=test");
620 EXPECT_TRUE(result.get());
621 EXPECT_TRUE(backend_->is_initialized());
625 db_->connect_async(
"host=localhost").get();
626 auto result = db_->disconnect_async();
627 EXPECT_TRUE(result.get());
628 EXPECT_FALSE(backend_->is_initialized());
636 constexpr int NUM_OPS = 20;
637 std::atomic<int> success_count{0};
640 std::vector<std::future<bool>> futures;
641 futures.reserve(NUM_OPS);
642 for (
int i = 0; i < NUM_OPS; ++i) {
643 futures.push_back(executor_->submit([
this, i]() ->
bool {
644 auto result = backend_->execute_query(
645 "INSERT INTO t VALUES (" + std::to_string(i) +
")");
646 return result.is_ok();
650 for (
auto& f : futures) {
652 success_count.fetch_add(1);
655 EXPECT_EQ(success_count.load(), NUM_OPS);
661 auto result = db_->execute_async(
"CREATE TABLE t2 (id INT)");
663 bool callback_invoked =
false;
664 result.then([&callback_invoked](
bool success) {
665 callback_invoked =
true;
666 EXPECT_TRUE(success);
670 EXPECT_TRUE(callback_invoked);
674 backend_->set_fail_execute(
true);
675 auto result = db_->execute_async(
"BAD");
677 std::string error_msg;
678 result.on_error([&error_msg](
const std::exception& e) {
679 error_msg = e.what();
682 EXPECT_THROW(result.get(), std::runtime_error);
683 EXPECT_EQ(error_msg,
"execute_failed");
697 return ::database::database_types::sqlite;
700 kcenon::common::VoidResult initialize(
701 const ::database::core::connection_config&)
override
703 return kcenon::common::ok();
706 kcenon::common::VoidResult shutdown()
override {
707 return kcenon::common::ok();
710 bool is_initialized()
const override {
return true; }
712 kcenon::common::Result<::database::core::database_result> select_query(
713 const std::string&)
override
715 return kcenon::common::Result<::database::core::database_result>(
719 kcenon::common::VoidResult execute_query(
const std::string&)
override {
720 return kcenon::common::ok();
723 kcenon::common::VoidResult begin_transaction()
override {
724 std::lock_guard<std::mutex> lock(mutex_);
725 if (should_fail_begin_) {
726 return kcenon::common::error_info{1,
"begin_failed",
"stub"};
730 return kcenon::common::ok();
733 kcenon::common::VoidResult commit_transaction()
override {
734 std::lock_guard<std::mutex> lock(mutex_);
735 if (should_fail_commit_) {
736 return kcenon::common::error_info{1,
"commit_failed",
"stub"};
740 return kcenon::common::ok();
743 kcenon::common::VoidResult rollback_transaction()
override {
744 std::lock_guard<std::mutex> lock(mutex_);
747 return kcenon::common::ok();
750 bool in_transaction()
const override {
751 std::lock_guard<std::mutex> lock(mutex_);
755 std::string last_error()
const override {
return ""; }
756 std::map<std::string, std::string> connection_info()
const override {
757 return {{
"type",
"txn_stub"}};
761 void set_fail_begin(
bool fail) {
762 std::lock_guard<std::mutex> lock(mutex_);
763 should_fail_begin_ = fail;
765 void set_fail_commit(
bool fail) {
766 std::lock_guard<std::mutex> lock(mutex_);
767 should_fail_commit_ = fail;
769 int begin_count()
const {
770 std::lock_guard<std::mutex> lock(mutex_);
773 int commit_count()
const {
774 std::lock_guard<std::mutex> lock(mutex_);
775 return commit_count_;
777 int rollback_count()
const {
778 std::lock_guard<std::mutex> lock(mutex_);
779 return rollback_count_;
783 mutable std::mutex mutex_;
784 bool in_txn_ =
false;
785 bool should_fail_begin_ =
false;
786 bool should_fail_commit_ =
false;
787 int begin_count_ = 0;
788 int commit_count_ = 0;
789 int rollback_count_ = 0;
797 p1_ = std::make_shared<txn_stub_backend>();
798 p2_ = std::make_shared<txn_stub_backend>();
799 p3_ = std::make_shared<txn_stub_backend>();
804 std::shared_ptr<txn_stub_backend>
p1_;
805 std::shared_ptr<txn_stub_backend>
p2_;
806 std::shared_ptr<txn_stub_backend>
p3_;
807 std::vector<std::shared_ptr<::database::core::database_backend>>
participants_;
813 auto txn_id = coord_.begin_distributed_transaction(participants_);
814 EXPECT_FALSE(txn_id.empty());
816 auto active = coord_.get_active_transactions();
818 EXPECT_EQ(active[0].transaction_id, txn_id);
819 EXPECT_EQ(active[0].participants.size(), 3u);
820 EXPECT_EQ(active[0].state, transaction_coordinator::transaction_state::active);
824 auto id1 = coord_.begin_distributed_transaction(participants_);
825 auto id2 = coord_.begin_distributed_transaction(participants_);
827 EXPECT_EQ(coord_.get_active_transactions().size(), 2u);
833 auto txn_id = coord_.begin_distributed_transaction(participants_);
834 bool result = coord_.prepare_phase(txn_id).get();
837 auto txns = coord_.get_active_transactions();
839 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::prepared);
840 EXPECT_EQ(p1_->begin_count(), 1);
841 EXPECT_EQ(p2_->begin_count(), 1);
842 EXPECT_EQ(p3_->begin_count(), 1);
846 EXPECT_THROW(coord_.prepare_phase(
"nonexistent").get(), std::exception);
850 p2_->set_fail_begin(
true);
851 auto txn_id = coord_.begin_distributed_transaction(participants_);
852 bool result = coord_.prepare_phase(txn_id).get();
853 EXPECT_FALSE(result);
856 EXPECT_EQ(p1_->begin_count(), 1);
857 EXPECT_EQ(p1_->rollback_count(), 1);
858 EXPECT_EQ(p2_->begin_count(), 0);
859 EXPECT_EQ(p3_->begin_count(), 0);
861 auto txns = coord_.get_active_transactions();
863 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::aborted);
869 auto txn_id = coord_.begin_distributed_transaction(participants_);
870 coord_.prepare_phase(txn_id).get();
872 bool result = coord_.commit_phase(txn_id).get();
875 EXPECT_EQ(p1_->commit_count(), 1);
876 EXPECT_EQ(p2_->commit_count(), 1);
877 EXPECT_EQ(p3_->commit_count(), 1);
879 auto txns = coord_.get_active_transactions();
880 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::committed);
884 auto txn_id = coord_.begin_distributed_transaction(participants_);
886 bool result = coord_.commit_phase(txn_id).get();
887 EXPECT_FALSE(result);
888 EXPECT_EQ(p1_->commit_count(), 0);
894 auto txn_id = coord_.begin_distributed_transaction(participants_);
895 bool result = coord_.commit_distributed_transaction(txn_id).get();
898 EXPECT_EQ(p1_->begin_count(), 1);
899 EXPECT_EQ(p1_->commit_count(), 1);
900 EXPECT_EQ(p2_->begin_count(), 1);
901 EXPECT_EQ(p2_->commit_count(), 1);
905 p3_->set_fail_begin(
true);
906 auto txn_id = coord_.begin_distributed_transaction(participants_);
907 bool result = coord_.commit_distributed_transaction(txn_id).get();
908 EXPECT_FALSE(result);
911 EXPECT_EQ(p1_->rollback_count(), 1);
912 EXPECT_EQ(p2_->rollback_count(), 1);
913 EXPECT_EQ(p3_->commit_count(), 0);
919 auto txn_id = coord_.begin_distributed_transaction(participants_);
920 coord_.prepare_phase(txn_id).get();
922 bool result = coord_.rollback_distributed_transaction(txn_id).get();
925 EXPECT_EQ(p1_->rollback_count(), 1);
926 EXPECT_EQ(p2_->rollback_count(), 1);
927 EXPECT_EQ(p3_->rollback_count(), 1);
929 auto txns = coord_.get_active_transactions();
930 EXPECT_EQ(txns[0].state, transaction_coordinator::transaction_state::aborted);
935 coord_.rollback_distributed_transaction(
"nonexistent").get(),
942 auto id1 = coord_.begin_distributed_transaction(participants_);
943 auto id2 = coord_.begin_distributed_transaction(participants_);
944 coord_.commit_distributed_transaction(id1).get();
946 EXPECT_EQ(coord_.get_active_transactions().size(), 2u);
947 coord_.recover_transactions();
950 auto remaining = coord_.get_active_transactions();
952 EXPECT_EQ(remaining[0].transaction_id, id2);
958 auto saga = coord_.create_saga();
960 bool result = saga.execute().get();
975 bool result = saga.
execute().get();
980 std::vector<int> execution_order;
982 auto saga = coord_.create_saga();
985 execution_order.push_back(1);
989 execution_order.push_back(-1);
994 execution_order.push_back(2);
998 execution_order.push_back(-2);
1003 execution_order.push_back(3);
1007 execution_order.push_back(-3);
1012 bool result = saga.execute().get();
1013 EXPECT_TRUE(result);
1014 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3}));
1018 std::vector<int> execution_order;
1020 auto saga = coord_.create_saga();
1023 execution_order.push_back(1);
1027 execution_order.push_back(-1);
1032 execution_order.push_back(2);
1036 execution_order.push_back(-2);
1041 execution_order.push_back(3);
1045 execution_order.push_back(-3);
1050 bool result = saga.execute().get();
1051 EXPECT_FALSE(result);
1053 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3, -2, -1}));
1057 std::vector<int> execution_order;
1059 auto saga = coord_.create_saga();
1062 execution_order.push_back(1);
1066 execution_order.push_back(-1);
1071 execution_order.push_back(2);
1072 throw std::runtime_error(
"step 2 exploded");
1076 execution_order.push_back(-2);
1081 bool result = saga.execute().get();
1082 EXPECT_FALSE(result);
1084 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, -1}));
1088 bool action_called =
false;
1089 auto saga = coord_.create_saga();
1092 action_called =
true;
1100 bool result = saga.execute().get();
1101 EXPECT_TRUE(result);
1102 EXPECT_TRUE(action_called);
1106 std::vector<int> execution_order;
1108 auto saga = coord_.create_saga();
1111 execution_order.push_back(1);
1115 execution_order.push_back(-1);
1120 execution_order.push_back(2);
1124 execution_order.push_back(-2);
1125 throw std::runtime_error(
"compensation 2 failed");
1130 execution_order.push_back(3);
1134 execution_order.push_back(-3);
1139 bool result = saga.execute().get();
1140 EXPECT_FALSE(result);
1142 EXPECT_EQ(execution_order, (std::vector<int>{1, 2, 3, -2, -1}));
1146 auto saga = coord_.create_saga();
1147 auto& ref = saga.add_step(
1151 EXPECT_EQ(&ref, &saga);
1161 backend_ = std::make_shared<async_stub_backend>();
1171 std::chrono::milliseconds timeout = std::chrono::milliseconds(500))
1173 auto deadline = std::chrono::steady_clock::now() + timeout;
1174 while (!flag.load() && std::chrono::steady_clock::now() < deadline) {
1175 std::this_thread::sleep_for(std::chrono::milliseconds(5));
1182 std::chrono::milliseconds timeout = std::chrono::milliseconds(500))
1184 auto deadline = std::chrono::steady_clock::now() + timeout;
1185 while (counter.load() < target && std::chrono::steady_clock::now() < deadline) {
1186 std::this_thread::sleep_for(std::chrono::milliseconds(5));
1188 return counter.load() >= target;
1198 EXPECT_NE(processor_,
nullptr);
1204 EXPECT_TRUE(processor_->start_stream(
1205 stream_processor::stream_type::custom,
"test_channel"));
1206 processor_->stop_all_streams();
1210 EXPECT_TRUE(processor_->start_stream(
1211 stream_processor::stream_type::custom,
"ch1"));
1212 EXPECT_FALSE(processor_->start_stream(
1213 stream_processor::stream_type::custom,
"ch1"));
1214 processor_->stop_all_streams();
1218 processor_->start_stream(
1219 stream_processor::stream_type::custom,
"ch1");
1220 EXPECT_TRUE(processor_->stop_stream(
"ch1"));
1224 EXPECT_FALSE(processor_->stop_stream(
"no_such_channel"));
1228 processor_->start_stream(
1229 stream_processor::stream_type::custom,
"ch1");
1230 processor_->start_stream(
1231 stream_processor::stream_type::redis_pubsub,
"ch2");
1232 processor_->start_stream(
1233 stream_processor::stream_type::postgresql_notify,
"ch3");
1236 processor_->stop_all_streams();
1239 EXPECT_FALSE(processor_->stop_stream(
"ch1"));
1240 EXPECT_FALSE(processor_->stop_stream(
"ch2"));
1241 EXPECT_FALSE(processor_->stop_stream(
"ch3"));
1247 std::atomic<bool> received{
false};
1248 std::string captured_payload;
1249 std::mutex capture_mutex;
1251 processor_->register_event_handler(
"ch1",
1253 std::lock_guard<std::mutex> lock(capture_mutex);
1254 captured_payload =
event.payload;
1255 received.store(
true);
1258 processor_->start_stream(
1259 stream_processor::stream_type::custom,
"ch1");
1263 std::lock_guard<std::mutex> lock(capture_mutex);
1264 EXPECT_EQ(captured_payload,
"stream_connected");
1266 processor_->stop_all_streams();
1270 std::atomic<bool> received{
false};
1271 std::string captured_channel;
1272 std::mutex capture_mutex;
1274 processor_->register_event_handler(
"notifications",
1276 std::lock_guard<std::mutex> lock(capture_mutex);
1277 captured_channel =
event.channel;
1278 received.store(
true);
1281 processor_->start_stream(
1282 stream_processor::stream_type::postgresql_notify,
"notifications");
1286 std::lock_guard<std::mutex> lock(capture_mutex);
1287 EXPECT_EQ(captured_channel,
"notifications");
1289 processor_->stop_all_streams();
1293 std::atomic<bool> received{
false};
1296 processor_->register_event_handler(
"ch1",
1298 captured_type =
event.type;
1299 received.store(
true);
1302 processor_->start_stream(
1303 stream_processor::stream_type::mongodb_change_stream,
"ch1");
1306 EXPECT_EQ(captured_type,
1307 stream_processor::stream_type::mongodb_change_stream);
1308 processor_->stop_all_streams();
1314 std::atomic<int> event_count{0};
1316 processor_->register_global_handler(
1318 event_count.fetch_add(1);
1321 processor_->start_stream(
1322 stream_processor::stream_type::custom,
"ch1");
1323 processor_->start_stream(
1324 stream_processor::stream_type::custom,
"ch2");
1327 EXPECT_GE(event_count.load(), 2);
1328 processor_->stop_all_streams();
1334 std::atomic<bool> handler_called{
false};
1336 processor_->add_event_filter(
"ch1",
1341 processor_->register_event_handler(
"ch1",
1343 handler_called.store(
true);
1346 processor_->start_stream(
1347 stream_processor::stream_type::custom,
"ch1");
1350 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1351 EXPECT_FALSE(handler_called.load());
1352 processor_->stop_all_streams();
1356 std::atomic<bool> handler_called{
false};
1358 processor_->add_event_filter(
"ch1",
1360 return event.payload ==
"stream_connected";
1363 processor_->register_event_handler(
"ch1",
1365 handler_called.store(
true);
1368 processor_->start_stream(
1369 stream_processor::stream_type::custom,
"ch1");
1372 processor_->stop_all_streams();
1376 std::atomic<bool> global_called{
false};
1378 processor_->add_event_filter(
"ch1",
1383 processor_->register_global_handler(
1385 global_called.store(
true);
1388 processor_->start_stream(
1389 stream_processor::stream_type::custom,
"ch1");
1392 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1393 EXPECT_FALSE(global_called.load());
1394 processor_->stop_all_streams();
1400 std::atomic<int> ch1_count{0};
1401 std::atomic<int> ch2_count{0};
1403 processor_->register_event_handler(
"ch1",
1405 ch1_count.fetch_add(1);
1407 processor_->register_event_handler(
"ch2",
1409 ch2_count.fetch_add(1);
1412 processor_->start_stream(
1413 stream_processor::stream_type::custom,
"ch1");
1414 processor_->start_stream(
1415 stream_processor::stream_type::custom,
"ch2");
1422 EXPECT_TRUE(processor_->stop_stream(
"ch1"));
1425 EXPECT_FALSE(processor_->stop_stream(
"ch1"));
1428 EXPECT_TRUE(processor_->stop_stream(
"ch2"));
1434 auto local_backend = std::make_shared<async_stub_backend>();
1438 stream_processor::stream_type::custom,
"ch1");
1440 stream_processor::stream_type::custom,
"ch2");
1449 std::atomic<bool> received{
false};
1453 received.store(
true);
1455 processor_->register_event_handler(
"ch1", handler);
1457 processor_->start_stream(
1458 stream_processor::stream_type::custom,
"ch1");
1461 processor_->stop_all_streams();
1465 std::atomic<int> count{0};
1470 processor_->register_global_handler(handler);
1472 processor_->start_stream(
1473 stream_processor::stream_type::custom,
"ch1");
1476 processor_->stop_all_streams();
1480 std::atomic<bool> handler_called{
false};
1483 return event.payload ==
"stream_connected";
1485 processor_->add_event_filter(
"ch1", filter);
1487 processor_->register_event_handler(
"ch1",
1489 handler_called.store(
true);
1492 processor_->start_stream(
1493 stream_processor::stream_type::custom,
"ch1");
1496 processor_->stop_all_streams();
std::unique_ptr< async_database > db_
std::shared_ptr< async_executor > executor_
std::shared_ptr< async_stub_backend > backend_
std::unique_ptr< async_executor > executor_
async_result< T > make_ready(T value)
async_result< T > make_failing()
transaction_coordinator coord_
bool wait_for_flag(const std::atomic< bool > &flag, std::chrono::milliseconds timeout=std::chrono::milliseconds(500))
std::unique_ptr< stream_processor > processor_
std::shared_ptr< async_stub_backend > backend_
bool wait_for_count(const std::atomic< int > &counter, int target, std::chrono::milliseconds timeout=std::chrono::milliseconds(500))
std::shared_ptr< txn_stub_backend > p1_
std::shared_ptr< txn_stub_backend > p2_
transaction_coordinator coord_
std::shared_ptr< txn_stub_backend > p3_
std::vector< std::shared_ptr<::database::core::database_backend > > participants_
High-performance asynchronous executor using thread_system.
size_t thread_count() const
Returns the number of worker threads.
void shutdown()
Gracefully shuts down the executor.
Template class for asynchronous operation results.
T get_for(std::chrono::milliseconds timeout)
std::future_status wait_for(std::chrono::milliseconds timeout) const
async_result< bool > execute()
Real-time data stream processing.
bool start_stream(stream_type type, const std::string &channel)
Distributed transaction coordination.
saga_builder create_saga()
Abstract base class for database backends.
Abstract interface for database backends.
async_result< T > make_ready_result(T value)
async_result< T > make_error_result(const std::exception &error)
std::vector< database_row > database_result
std::map< std::string, database_value > database_row
database_types
Represents various database backends or modes.
TEST_F(AsyncResultTest, ConstructsFromFuture)
#define ASSERT_EQ(expected, actual, message)
#define ASSERT_TRUE(condition, message)