Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
async_operations.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
7#include "../database_types.h"
9#include "../core/concepts.h"
11#include <future>
12#include <memory>
13#include <stdexcept>
14#ifdef HAS_COROUTINES
15#include <coroutine>
16#endif
17#include <functional>
18#include <thread>
19#include <queue>
20#include <condition_variable>
21#include <atomic>
22#include <chrono>
23#include <string>
24#include <exception>
25#include <vector>
26#include <unordered_map>
27
28#ifdef USE_THREAD_SYSTEM
29 #include <kcenon/thread/core/job.h>
30 #include <kcenon/thread/core/thread_worker.h>
31 #include <kcenon/thread/interfaces/thread_context.h>
32 #include <kcenon/thread/core/error_handling.h>
33#endif
34
35namespace database::async
36{
37#ifdef USE_THREAD_SYSTEM
45 class lambda_job : public kcenon::thread::job {
46 public:
47 explicit lambda_job(std::function<void()> func, const std::string& name = "lambda_job")
48 : job(name), func_(std::move(func)) {}
49
50 kcenon::common::VoidResult do_work() override {
51 try {
52 if (func_) {
53 func_();
54 }
55 return kcenon::common::ok();
56 } catch (const std::exception& e) {
57 return kcenon::common::error_info{
58 static_cast<int>(kcenon::thread::error_code::job_execution_failed),
59 std::string("Exception in lambda_job: ") + e.what(),
60 "async_executor"
61 };
62 } catch (...) {
63 return kcenon::common::error_info{
64 static_cast<int>(kcenon::thread::error_code::job_execution_failed),
65 "Unknown exception in lambda_job",
66 "async_executor"
67 };
68 }
69 }
70
71 private:
72 std::function<void()> func_;
73 };
74#endif
75
76 // Forward declarations
77 template<typename T> class async_result;
78 class async_executor;
79 class transaction_coordinator;
80 class saga_builder;
81
89 template<typename T>
91 {
92 public:
93 async_result(std::future<T> future);
94
95 // Blocking operations
96 T get();
97 T get_for(std::chrono::milliseconds timeout);
98
99 // Non-blocking operations
100 bool is_ready() const;
101 std::future_status wait_for(std::chrono::milliseconds timeout) const;
102
103 // Callback support - thread-safe
104 // Uses C++20 concepts for type safety
105 template<concepts::VoidCallable<T> Callback>
106 void then(Callback&& callback);
107
108 template<concepts::ErrorHandler Handler>
109 void on_error(Handler&& error_handler);
110
111 // Legacy overloads for backward compatibility
112 void then(std::function<void(T)> callback);
113 void on_error(std::function<void(const std::exception&)> error_handler);
114
115 private:
116 std::future<T> future_;
117 mutable std::mutex callback_mutex_;
118 std::function<void(T)> success_callback_;
119 std::function<void(const std::exception&)> error_callback_;
120 };
121
122 // ── async_result<T> template method implementations ──────────────────
123
124 template<typename T>
125 async_result<T>::async_result(std::future<T> future)
126 : future_(std::move(future))
127 {
128 }
129
130 template<typename T>
132 {
133 std::function<void(T)> on_success;
134 std::function<void(const std::exception&)> on_error_cb;
135 {
136 std::lock_guard<std::mutex> lock(callback_mutex_);
137 on_success = success_callback_;
138 on_error_cb = error_callback_;
139 }
140
141 try {
142 T value = future_.get();
143 if (on_success) {
144 on_success(value);
145 }
146 return value;
147 } catch (const std::exception& e) {
148 if (on_error_cb) {
149 on_error_cb(e);
150 }
151 throw;
152 }
153 }
154
155 template<typename T>
156 T async_result<T>::get_for(std::chrono::milliseconds timeout)
157 {
158 auto status = future_.wait_for(timeout);
159 if (status == std::future_status::timeout) {
160 throw std::runtime_error("async_result::get_for timed out");
161 }
162 return get();
163 }
164
165 template<typename T>
167 {
168 return future_.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready;
169 }
170
171 template<typename T>
172 std::future_status async_result<T>::wait_for(std::chrono::milliseconds timeout) const
173 {
174 return future_.wait_for(timeout);
175 }
176
177 template<typename T>
178 template<concepts::VoidCallable<T> Callback>
179 void async_result<T>::then(Callback&& callback)
180 {
181 std::lock_guard<std::mutex> lock(callback_mutex_);
182 success_callback_ = std::forward<Callback>(callback);
183 }
184
185 template<typename T>
186 template<concepts::ErrorHandler Handler>
187 void async_result<T>::on_error(Handler&& error_handler)
188 {
189 std::lock_guard<std::mutex> lock(callback_mutex_);
190 error_callback_ = std::forward<Handler>(error_handler);
191 }
192
193 template<typename T>
194 void async_result<T>::then(std::function<void(T)> callback)
195 {
196 std::lock_guard<std::mutex> lock(callback_mutex_);
197 success_callback_ = std::move(callback);
198 }
199
200 template<typename T>
201 void async_result<T>::on_error(std::function<void(const std::exception&)> error_handler)
202 {
203 std::lock_guard<std::mutex> lock(callback_mutex_);
204 error_callback_ = std::move(error_handler);
205 }
206
207 // ── end async_result<T> implementations ──────────────────────────────
208
209#ifdef HAS_COROUTINES
215 template<typename T>
216 class database_awaitable
217 {
218 public:
219 struct promise_type
220 {
221 T result_;
222 std::exception_ptr exception_;
223
224 database_awaitable get_return_object() {
225 return database_awaitable{std::coroutine_handle<promise_type>::from_promise(*this)};
226 }
227
228 std::suspend_never initial_suspend() { return {}; }
229 std::suspend_never final_suspend() noexcept { return {}; }
230
231 void return_value(T value) { result_ = std::move(value); }
232 void unhandled_exception() { exception_ = std::current_exception(); }
233 };
234
235 database_awaitable(std::coroutine_handle<promise_type> handle) : handle_(handle) {}
236 ~database_awaitable() { if (handle_) handle_.destroy(); }
237
238 database_awaitable(const database_awaitable&) = delete;
239 database_awaitable& operator=(const database_awaitable&) = delete;
240
241 database_awaitable(database_awaitable&& other) noexcept : handle_(other.handle_) {
242 other.handle_ = nullptr;
243 }
244
245 database_awaitable& operator=(database_awaitable&& other) noexcept {
246 if (this != &other) {
247 if (handle_) handle_.destroy();
248 handle_ = other.handle_;
249 other.handle_ = nullptr;
250 }
251 return *this;
252 }
253
254 bool await_ready() const { return handle_.done(); }
255 void await_suspend(std::coroutine_handle<> waiting_coroutine) {
256 // Resume waiting coroutine when this one completes
257 }
258
259 T await_resume() {
260 if (handle_.promise().exception_) {
261 std::rethrow_exception(handle_.promise().exception_);
262 }
263 return std::move(handle_.promise().result_);
264 }
265
266 private:
267 std::coroutine_handle<promise_type> handle_;
268 };
269#endif // HAS_COROUTINES
270
276 {
277 public:
278 async_database(std::shared_ptr<core::database_backend> db, std::shared_ptr<async_executor> executor);
279
280 // Asynchronous query operations
281 async_result<bool> execute_async(const std::string& query);
282 async_result<core::database_result> select_async(const std::string& query);
283
284#ifdef HAS_COROUTINES
285 // Coroutine support (C++20 only)
286 database_awaitable<bool> execute_coro(const std::string& query);
287 database_awaitable<core::database_result> select_coro(const std::string& query);
288#endif
289
290 // Batch operations
291 async_result<std::vector<bool>> execute_batch_async(const std::vector<std::string>& queries);
292 async_result<std::vector<core::database_result>> select_batch_async(const std::vector<std::string>& queries);
293
294 // Transaction support
298
299 // Connection management
300 async_result<bool> connect_async(const std::string& connection_string);
302
303 private:
304 std::shared_ptr<core::database_backend> db_;
305 std::shared_ptr<async_executor> executor_;
306 };
307
329 {
330 public:
336#ifdef USE_THREAD_SYSTEM
337 explicit async_executor(
338 size_t thread_count = std::thread::hardware_concurrency(),
339 const thread_context_type& context = thread_context_type())
340 : pool_(std::make_shared<thread_pool_type>("db_async_executor", context))
342 {
343 auto job_queue = pool_->get_job_queue();
344 for (size_t i = 0; i < thread_count_; ++i) {
345 auto worker = std::make_unique<kcenon::thread::thread_worker>(true, context);
346 worker->set_job_queue(job_queue);
347
348 auto add_result = pool_->enqueue(std::move(worker));
349 if (add_result.is_err()) {
350 throw std::runtime_error("Failed to add worker: " +
351 add_result.error().message);
352 }
353 }
354
355 auto result = pool_->start();
356 if (result.is_err()) {
357 throw std::runtime_error("Failed to start async executor: " +
358 result.error().message);
359 }
360 }
361#else
363 size_t thread_count = std::thread::hardware_concurrency(),
366 , stop_(false)
367 {
368 workers_.reserve(thread_count_);
369 for (size_t i = 0; i < thread_count_; ++i) {
370 workers_.emplace_back([this] { worker_thread(); });
371 }
372 }
373#endif
374
376 shutdown();
377 }
378
379 // Prevent copying and moving
384
393 template<typename F, typename... Args>
394 requires concepts::SubmittableTask<F, Args...>
395 auto submit(F&& func, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
396 {
397 using return_type = std::invoke_result_t<F, Args...>;
398
399#ifdef USE_THREAD_SYSTEM
400 auto task = std::make_shared<std::packaged_task<return_type()>>(
401 std::bind(std::forward<F>(func), std::forward<Args>(args)...)
402 );
403
404 auto future = task->get_future();
405
406 auto job = std::make_unique<lambda_job>(
407 [task]() { (*task)(); },
408 "async_task"
409 );
410
411 auto result = pool_->enqueue(std::move(job));
412 if (result.is_err()) {
413 throw std::runtime_error("Failed to enqueue job: " +
414 result.error().message);
415 }
416
417 return future;
418#else
419 auto task = std::make_shared<std::packaged_task<return_type()>>(
420 std::bind(std::forward<F>(func), std::forward<Args>(args)...)
421 );
422
423 auto future = task->get_future();
424
425 {
426 std::unique_lock<std::mutex> lock(queue_mutex_);
427 if (stop_) {
428 throw std::runtime_error("Cannot submit task to stopped executor");
429 }
430 tasks_.emplace([task]() { (*task)(); });
431 }
432
433 condition_.notify_one();
434 return future;
435#endif
436 }
437
441 void shutdown() {
442#ifdef USE_THREAD_SYSTEM
443 if (pool_) {
444 pool_->stop(false);
445 }
446#else
447 {
448 std::unique_lock<std::mutex> lock(queue_mutex_);
449 stop_ = true;
450 }
451 condition_.notify_all();
452
453 for (auto& worker : workers_) {
454 if (worker.joinable()) {
455 worker.join();
456 }
457 }
458 workers_.clear();
459#endif
460 }
461
466#ifdef USE_THREAD_SYSTEM
467 if (pool_) {
468 while (pool_->get_job_queue()->size() > 0) {
469 std::this_thread::sleep_for(std::chrono::milliseconds(10));
470 }
471 }
472#else
473 while (true) {
474 {
475 std::unique_lock<std::mutex> lock(queue_mutex_);
476 if (tasks_.empty()) {
477 break;
478 }
479 }
480 std::this_thread::sleep_for(std::chrono::milliseconds(10));
481 }
482#endif
483 }
484
488 size_t pending_tasks() const {
489#ifdef USE_THREAD_SYSTEM
490 if (pool_) {
491 return pool_->get_job_queue()->size();
492 }
493 return 0;
494#else
495 std::unique_lock<std::mutex> lock(queue_mutex_);
496 return tasks_.size();
497#endif
498 }
499
503 size_t thread_count() const {
504 return thread_count_;
505 }
506
510 constexpr bool is_using_thread_system() const {
511 return using_thread_system;
512 }
513
514 private:
515#ifdef USE_THREAD_SYSTEM
516 std::shared_ptr<thread_pool_type> pool_;
517 size_t thread_count_;
518#else
520 while (true) {
521 std::function<void()> task;
522
523 {
524 std::unique_lock<std::mutex> lock(queue_mutex_);
525 condition_.wait(lock, [this] {
526 return stop_ || !tasks_.empty();
527 });
528
529 if (stop_ && tasks_.empty()) {
530 return;
531 }
532
533 if (!tasks_.empty()) {
534 task = std::move(tasks_.front());
535 tasks_.pop();
536 }
537 }
538
539 if (task) {
540 task();
541 }
542 }
543 }
544
545 std::vector<std::thread> workers_;
546 std::queue<std::function<void()>> tasks_;
547 mutable std::mutex queue_mutex_;
548 std::condition_variable condition_;
549 std::atomic<bool> stop_;
551#endif
552 };
553
564 {
565 public:
572
575 std::string channel;
576 std::string payload;
577 std::chrono::system_clock::time_point timestamp;
578 std::unordered_map<std::string, std::string> metadata;
579 };
580
581 stream_processor(std::shared_ptr<core::database_backend> db);
583
584 // Stream management - thread-safe
585 bool start_stream(stream_type type, const std::string& channel);
586 bool stop_stream(const std::string& channel);
587 void stop_all_streams();
588
589 // Event handling - thread-safe with C++20 concepts
590 // Defined inline to avoid MSVC C2244 with out-of-class concept constraints
591 template<concepts::StreamEventHandler<stream_event> Handler>
592 void register_event_handler(const std::string& channel, Handler&& handler)
593 {
594 std::lock_guard<std::mutex> lock(handlers_mutex_);
595 event_handlers_[channel] = std::forward<Handler>(handler);
596 }
597
598 template<concepts::StreamEventHandler<stream_event> Handler>
599 void register_global_handler(Handler&& handler)
600 {
601 std::lock_guard<std::mutex> lock(handlers_mutex_);
602 global_handlers_.push_back(std::forward<Handler>(handler));
603 }
604
605 // Legacy overloads for backward compatibility
606 void register_event_handler(const std::string& channel,
607 std::function<void(const stream_event&)> handler);
608 void register_global_handler(std::function<void(const stream_event&)> handler);
609
610 // Filter support - thread-safe with C++20 concepts
611 // Defined inline to avoid MSVC C2244 with out-of-class concept constraints
612 template<concepts::StreamEventFilter<stream_event> Filter>
613 void add_event_filter(const std::string& channel, Filter&& filter)
614 {
615 std::lock_guard<std::mutex> lock(handlers_mutex_);
616 event_filters_[channel] = std::forward<Filter>(filter);
617 }
618
619 // Legacy overload for backward compatibility
620 void add_event_filter(const std::string& channel,
621 std::function<bool(const stream_event&)> filter);
622
623 private:
624 void stream_thread(const std::string& channel, stream_type type);
625 void process_event(const stream_event& event);
626
627 std::shared_ptr<core::database_backend> db_;
628 std::mutex threads_mutex_; // Protects stream_threads_
629 std::unordered_map<std::string, std::thread> stream_threads_;
630 std::mutex handlers_mutex_; // Protects all handler/filter containers
631 std::unordered_map<std::string, std::function<void(const stream_event&)>> event_handlers_;
632 std::vector<std::function<void(const stream_event&)>> global_handlers_;
633 std::unordered_map<std::string, std::function<bool(const stream_event&)>> event_filters_;
634 std::atomic<bool> running_{true};
635 };
636
653 {
654 public:
655 enum class transaction_state {
656 active,
657 preparing,
658 prepared,
660 committed,
661 aborting,
662 aborted
663 };
664
666 std::string transaction_id;
667 std::vector<std::shared_ptr<core::database_backend>> participants;
669 std::chrono::system_clock::time_point start_time;
670 std::chrono::system_clock::time_point last_activity;
671 };
672
677
678 // Transaction management
679 std::string begin_distributed_transaction(const std::vector<std::shared_ptr<core::database_backend>>& participants);
680 async_result<bool> commit_distributed_transaction(const std::string& transaction_id);
681 async_result<bool> rollback_distributed_transaction(const std::string& transaction_id);
682
683 // Two-phase commit protocol
684 async_result<bool> prepare_phase(const std::string& transaction_id);
685 async_result<bool> commit_phase(const std::string& transaction_id);
686
687 // Saga pattern support
689
690 // Transaction recovery
692 std::vector<distributed_transaction> get_active_transactions() const;
693
694 private:
695 async_result<bool> two_phase_commit(const std::string& transaction_id);
697
698 mutable std::mutex transactions_mutex_;
699 std::unordered_map<std::string, distributed_transaction> active_transactions_;
700 std::shared_ptr<async_executor> executor_;
701 };
702
708 {
709 public:
711
712 // Saga step definition with C++20 concepts
713 template<concepts::TransactionAction Action, concepts::CompensationAction Compensation>
714 saga_builder& add_step(Action&& action, Compensation&& compensation);
715
716 // Legacy overload for backward compatibility
717 saga_builder& add_step(std::function<async_result<bool>()> action,
718 std::function<async_result<bool>()> compensation);
719
720 // Execution
722
723 private:
724 struct saga_step {
725 std::function<async_result<bool>()> action;
726 std::function<async_result<bool>()> compensation;
727 };
728
730 std::vector<saga_step> steps_;
731 };
732
733 // ── async_database inline implementations ───────────────────────────
734
736 std::shared_ptr<core::database_backend> db,
737 std::shared_ptr<async_executor> executor)
738 : db_(std::move(db))
739 , executor_(std::move(executor))
740 {
741 }
742
743 inline async_result<bool> async_database::execute_async(const std::string& query)
744 {
745 auto db = db_;
746 auto future = executor_->submit([db, query]() -> bool {
747 auto result = db->execute_query(query);
748 if (result.is_err()) {
749 throw std::runtime_error(result.error().message);
750 }
751 return true;
752 });
753 return async_result<bool>(std::move(future));
754 }
755
757 {
758 auto db = db_;
759 auto future = executor_->submit([db, query]() -> core::database_result {
760 auto result = db->select_query(query);
761 if (result.is_err()) {
762 throw std::runtime_error(result.error().message);
763 }
764 return result.value();
765 });
766 return async_result<core::database_result>(std::move(future));
767 }
768
770 const std::vector<std::string>& queries)
771 {
772 auto db = db_;
773 auto queries_copy = queries;
774 auto future = executor_->submit([db, queries_copy]() -> std::vector<bool> {
775 std::vector<bool> results;
776 results.reserve(queries_copy.size());
777 for (const auto& q : queries_copy) {
778 auto result = db->execute_query(q);
779 results.push_back(result.is_ok());
780 }
781 return results;
782 });
783 return async_result<std::vector<bool>>(std::move(future));
784 }
785
787 const std::vector<std::string>& queries)
788 {
789 auto db = db_;
790 auto queries_copy = queries;
791 auto future = executor_->submit([db, queries_copy]() -> std::vector<core::database_result> {
792 std::vector<core::database_result> results;
793 results.reserve(queries_copy.size());
794 for (const auto& q : queries_copy) {
795 auto result = db->select_query(q);
796 if (result.is_ok()) {
797 results.push_back(result.value());
798 } else {
799 results.push_back(core::database_result{});
800 }
801 }
802 return results;
803 });
804 return async_result<std::vector<core::database_result>>(std::move(future));
805 }
806
808 {
809 auto db = db_;
810 auto future = executor_->submit([db]() -> bool {
811 auto result = db->begin_transaction();
812 if (result.is_err()) {
813 throw std::runtime_error(result.error().message);
814 }
815 return true;
816 });
817 return async_result<bool>(std::move(future));
818 }
819
821 {
822 auto db = db_;
823 auto future = executor_->submit([db]() -> bool {
824 auto result = db->commit_transaction();
825 if (result.is_err()) {
826 throw std::runtime_error(result.error().message);
827 }
828 return true;
829 });
830 return async_result<bool>(std::move(future));
831 }
832
834 {
835 auto db = db_;
836 auto future = executor_->submit([db]() -> bool {
837 auto result = db->rollback_transaction();
838 if (result.is_err()) {
839 throw std::runtime_error(result.error().message);
840 }
841 return true;
842 });
843 return async_result<bool>(std::move(future));
844 }
845
846 inline async_result<bool> async_database::connect_async(const std::string& connection_string)
847 {
848 auto db = db_;
849 auto future = executor_->submit([db, connection_string]() -> bool {
850 auto config = core::connection_config::from_string(connection_string);
851 auto result = db->initialize(config);
852 if (result.is_err()) {
853 throw std::runtime_error(result.error().message);
854 }
855 return true;
856 });
857 return async_result<bool>(std::move(future));
858 }
859
861 {
862 auto db = db_;
863 auto future = executor_->submit([db]() -> bool {
864 auto result = db->shutdown();
865 if (result.is_err()) {
866 throw std::runtime_error(result.error().message);
867 }
868 return true;
869 });
870 return async_result<bool>(std::move(future));
871 }
872
873 // ── end async_database implementations ───────────────────────────────
874
875 // Helper functions for async operations
876 template<typename T>
878 std::promise<T> promise;
879 promise.set_value(std::move(value));
880 return async_result<T>(promise.get_future());
881 }
882
883 template<typename T>
884 async_result<T> make_error_result(const std::exception& error) {
885 std::promise<T> promise;
886 promise.set_exception(std::make_exception_ptr(error));
887 return async_result<T>(promise.get_future());
888 }
889
890 // ── transaction_coordinator inline implementations ───────────────────
891
893 const std::vector<std::shared_ptr<core::database_backend>>& participants)
894 {
895 static std::atomic<uint64_t> id_counter{0};
896 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
897 std::chrono::system_clock::now().time_since_epoch()).count();
898 std::string txn_id = "txn-" + std::to_string(ms) + "-"
899 + std::to_string(id_counter.fetch_add(1));
900
901 std::lock_guard<std::mutex> lock(transactions_mutex_);
903 txn.transaction_id = txn_id;
904 txn.participants = participants;
906 txn.start_time = std::chrono::system_clock::now();
907 txn.last_activity = txn.start_time;
908 active_transactions_[txn_id] = std::move(txn);
909 return txn_id;
910 }
911
913 const std::string& transaction_id)
914 {
915 std::vector<std::shared_ptr<core::database_backend>> participants;
916 {
917 std::lock_guard<std::mutex> lock(transactions_mutex_);
918 auto it = active_transactions_.find(transaction_id);
919 if (it == active_transactions_.end()) {
921 std::runtime_error("Transaction not found: " + transaction_id));
922 }
923 it->second.state = transaction_state::preparing;
924 it->second.last_activity = std::chrono::system_clock::now();
925 participants = it->second.participants;
926 }
927
928 std::vector<std::shared_ptr<core::database_backend>> prepared;
929 for (const auto& participant : participants) {
930 auto result = participant->begin_transaction();
931 if (result.is_err()) {
932 for (const auto& p : prepared) {
933 p->rollback_transaction();
934 }
935 std::lock_guard<std::mutex> lock(transactions_mutex_);
936 auto it = active_transactions_.find(transaction_id);
937 if (it != active_transactions_.end()) {
938 it->second.state = transaction_state::aborted;
939 }
940 return make_ready_result(false);
941 }
942 prepared.push_back(participant);
943 }
944
945 {
946 std::lock_guard<std::mutex> lock(transactions_mutex_);
947 auto it = active_transactions_.find(transaction_id);
948 if (it != active_transactions_.end()) {
949 it->second.state = transaction_state::prepared;
950 }
951 }
952 return make_ready_result(true);
953 }
954
956 const std::string& transaction_id)
957 {
958 std::vector<std::shared_ptr<core::database_backend>> participants;
959 {
960 std::lock_guard<std::mutex> lock(transactions_mutex_);
961 auto it = active_transactions_.find(transaction_id);
962 if (it == active_transactions_.end()) {
964 std::runtime_error("Transaction not found: " + transaction_id));
965 }
966 if (it->second.state != transaction_state::prepared) {
967 return make_ready_result(false);
968 }
969 it->second.state = transaction_state::committing;
970 it->second.last_activity = std::chrono::system_clock::now();
971 participants = it->second.participants;
972 }
973
974 bool all_committed = true;
975 for (const auto& participant : participants) {
976 auto result = participant->commit_transaction();
977 if (result.is_err()) {
978 all_committed = false;
979 }
980 }
981
982 {
983 std::lock_guard<std::mutex> lock(transactions_mutex_);
984 auto it = active_transactions_.find(transaction_id);
985 if (it != active_transactions_.end()) {
986 it->second.state = all_committed
989 }
990 }
991 return make_ready_result(all_committed);
992 }
993
995 const std::string& transaction_id)
996 {
997 return two_phase_commit(transaction_id);
998 }
999
1001 const std::string& transaction_id)
1002 {
1003 std::vector<std::shared_ptr<core::database_backend>> participants;
1004 {
1005 std::lock_guard<std::mutex> lock(transactions_mutex_);
1006 auto it = active_transactions_.find(transaction_id);
1007 if (it == active_transactions_.end()) {
1009 std::runtime_error("Transaction not found: " + transaction_id));
1010 }
1011 it->second.state = transaction_state::aborting;
1012 it->second.last_activity = std::chrono::system_clock::now();
1013 participants = it->second.participants;
1014 }
1015
1016 bool all_rolled_back = true;
1017 for (const auto& participant : participants) {
1018 auto result = participant->rollback_transaction();
1019 if (result.is_err()) {
1020 all_rolled_back = false;
1021 }
1022 }
1023
1024 {
1025 std::lock_guard<std::mutex> lock(transactions_mutex_);
1026 auto it = active_transactions_.find(transaction_id);
1027 if (it != active_transactions_.end()) {
1028 it->second.state = transaction_state::aborted;
1029 }
1030 }
1031 return make_ready_result(all_rolled_back);
1032 }
1033
1035 const std::string& transaction_id)
1036 {
1037 bool prepared = prepare_phase(transaction_id).get();
1038 if (!prepared) {
1039 return make_ready_result(false);
1040 }
1041 return commit_phase(transaction_id);
1042 }
1043
1048
1049 inline std::vector<transaction_coordinator::distributed_transaction>
1051 {
1052 std::lock_guard<std::mutex> lock(transactions_mutex_);
1053 std::vector<distributed_transaction> result;
1054 result.reserve(active_transactions_.size());
1055 for (const auto& [id, txn] : active_transactions_) {
1056 result.push_back(txn);
1057 }
1058 return result;
1059 }
1060
1062 {
1063 return saga_builder(*this);
1064 }
1065
1067 {
1068 std::lock_guard<std::mutex> lock(transactions_mutex_);
1069 for (auto it = active_transactions_.begin();
1070 it != active_transactions_.end();) {
1071 if (it->second.state == transaction_state::committed
1072 || it->second.state == transaction_state::aborted) {
1073 it = active_transactions_.erase(it);
1074 } else {
1075 ++it;
1076 }
1077 }
1078 }
1079
1080 // ── end transaction_coordinator implementations ──────────────────────
1081
1082 // ── saga_builder inline implementations ──────────────────────────────
1083
1085 : coordinator_(coordinator)
1086 {
1087 }
1088
1090 std::function<async_result<bool>()> action,
1091 std::function<async_result<bool>()> compensation)
1092 {
1093 steps_.push_back({std::move(action), std::move(compensation)});
1094 return *this;
1095 }
1096
1097 template<concepts::TransactionAction Action,
1098 concepts::CompensationAction Compensation>
1100 Action&& action, Compensation&& compensation)
1101 {
1102 steps_.push_back({
1103 std::function<async_result<bool>()>(std::forward<Action>(action)),
1104 std::function<async_result<bool>()>(std::forward<Compensation>(compensation))
1105 });
1106 return *this;
1107 }
1108
1110 {
1111 std::vector<size_t> completed;
1112
1113 for (size_t i = 0; i < steps_.size(); ++i) {
1114 try {
1115 bool success = steps_[i].action().get();
1116 if (!success) {
1117 for (auto it = completed.rbegin();
1118 it != completed.rend(); ++it) {
1119 try { steps_[*it].compensation().get(); }
1120 catch (...) {}
1121 }
1122 return make_ready_result(false);
1123 }
1124 completed.push_back(i);
1125 } catch (...) {
1126 for (auto it = completed.rbegin();
1127 it != completed.rend(); ++it) {
1128 try { steps_[*it].compensation().get(); }
1129 catch (...) {}
1130 }
1131 return make_ready_result(false);
1132 }
1133 }
1134
1135 return make_ready_result(true);
1136 }
1137
1138 // ── end saga_builder implementations ─────────────────────────────────
1139
1140 // ── stream_processor inline implementations ──────────────────────────
1141
1143 std::shared_ptr<core::database_backend> db)
1144 : db_(std::move(db))
1145 {
1146 }
1147
1152
1154 stream_type type, const std::string& channel)
1155 {
1156 std::lock_guard<std::mutex> lock(threads_mutex_);
1157 if (stream_threads_.count(channel) > 0) {
1158 return false;
1159 }
1160 stream_threads_.emplace(
1161 channel,
1162 std::thread(&stream_processor::stream_thread, this, channel, type));
1163 return true;
1164 }
1165
1166 inline bool stream_processor::stop_stream(const std::string& channel)
1167 {
1168 std::thread t;
1169 {
1170 std::lock_guard<std::mutex> lock(threads_mutex_);
1171 auto it = stream_threads_.find(channel);
1172 if (it == stream_threads_.end()) {
1173 return false;
1174 }
1175 t = std::move(it->second);
1176 stream_threads_.erase(it);
1177 }
1178 // Thread detects its channel removal via map-check and exits
1179 if (t.joinable()) {
1180 t.join();
1181 }
1182 return true;
1183 }
1184
1186 {
1187 running_.store(false);
1188 std::unordered_map<std::string, std::thread> threads;
1189 {
1190 std::lock_guard<std::mutex> lock(threads_mutex_);
1191 threads.swap(stream_threads_);
1192 }
1193 for (auto& [channel, t] : threads) {
1194 if (t.joinable()) {
1195 t.join();
1196 }
1197 }
1198 running_.store(true);
1199 }
1200
1202 const std::string& channel,
1203 std::function<void(const stream_event&)> handler)
1204 {
1205 std::lock_guard<std::mutex> lock(handlers_mutex_);
1206 event_handlers_[channel] = std::move(handler);
1207 }
1208
1210 std::function<void(const stream_event&)> handler)
1211 {
1212 std::lock_guard<std::mutex> lock(handlers_mutex_);
1213 global_handlers_.push_back(std::move(handler));
1214 }
1215
1217 const std::string& channel,
1218 std::function<bool(const stream_event&)> filter)
1219 {
1220 std::lock_guard<std::mutex> lock(handlers_mutex_);
1221 event_filters_[channel] = std::move(filter);
1222 }
1223
1225 const std::string& channel, stream_type type)
1226 {
1227 stream_event connected_event;
1228 connected_event.type = type;
1229 connected_event.channel = channel;
1230 connected_event.payload = "stream_connected";
1231 connected_event.timestamp = std::chrono::system_clock::now();
1232 process_event(connected_event);
1233
1234 while (running_.load()) {
1235 std::this_thread::sleep_for(std::chrono::milliseconds(10));
1236 {
1237 std::lock_guard<std::mutex> lock(threads_mutex_);
1238 if (stream_threads_.find(channel) == stream_threads_.end()) {
1239 return;
1240 }
1241 }
1242 }
1243 }
1244
1246 {
1247 std::lock_guard<std::mutex> lock(handlers_mutex_);
1248
1249 auto filter_it = event_filters_.find(event.channel);
1250 if (filter_it != event_filters_.end()) {
1251 if (!filter_it->second(event)) {
1252 return;
1253 }
1254 }
1255
1256 auto handler_it = event_handlers_.find(event.channel);
1257 if (handler_it != event_handlers_.end()) {
1258 handler_it->second(event);
1259 }
1260
1261 for (const auto& handler : global_handlers_) {
1262 handler(event);
1263 }
1264 }
1265
1266 // ── end stream_processor implementations ─────────────────────────────
1267
1268 // Coroutine helpers (C++20 only)
1269#ifdef HAS_COROUTINES
1270 inline auto when_all(std::vector<database_awaitable<bool>> awaitables) -> database_awaitable<std::vector<bool>> {
1271 std::vector<bool> results;
1272 for (auto& awaitable : awaitables) {
1273 results.push_back(co_await awaitable);
1274 }
1275 co_return results;
1276 }
1277
1278 inline auto when_any(std::vector<database_awaitable<bool>> awaitables) -> database_awaitable<bool> {
1279 // In a real implementation, this would race the awaitables
1280 if (!awaitables.empty()) {
1281 co_return co_await awaitables[0];
1282 }
1283 co_return false;
1284 }
1285#endif // HAS_COROUTINES
1286
1287} // namespace database::async
Asynchronous database interface wrapper.
async_result< bool > begin_transaction_async()
async_result< bool > execute_async(const std::string &query)
async_result< bool > commit_transaction_async()
async_result< bool > disconnect_async()
async_result< core::database_result > select_async(const std::string &query)
async_result< bool > rollback_transaction_async()
async_database(std::shared_ptr< core::database_backend > db, std::shared_ptr< async_executor > executor)
async_result< std::vector< core::database_result > > select_batch_async(const std::vector< std::string > &queries)
async_result< bool > connect_async(const std::string &connection_string)
std::shared_ptr< core::database_backend > db_
async_result< std::vector< bool > > execute_batch_async(const std::vector< std::string > &queries)
std::shared_ptr< async_executor > executor_
High-performance asynchronous executor using thread_system.
auto submit(F &&func, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submits a task for asynchronous execution.
async_executor & operator=(const async_executor &)=delete
async_executor(const async_executor &)=delete
void wait_for_completion()
Waits for all pending tasks to complete.
std::queue< std::function< void()> > tasks_
size_t thread_count() const
Returns the number of worker threads.
async_executor(size_t thread_count=std::thread::hardware_concurrency(), const thread_context_type &=thread_context_type())
Constructs an async executor with specified thread count.
void shutdown()
Gracefully shuts down the executor.
size_t pending_tasks() const
Returns the number of pending tasks.
std::vector< std::thread > workers_
constexpr bool is_using_thread_system() const
Checks if using thread_system implementation.
async_executor(async_executor &&)=delete
std::condition_variable condition_
async_executor & operator=(async_executor &&)=delete
Template class for asynchronous operation results.
std::function< void(T)> success_callback_
void on_error(Handler &&error_handler)
void then(Callback &&callback)
std::function< void(const std::exception &)> error_callback_
T get_for(std::chrono::milliseconds timeout)
std::future_status wait_for(std::chrono::milliseconds timeout) const
async_result(std::future< T > future)
Fallback thread context (empty implementation) Provides a no-op context when thread_system is not ava...
Builder for Saga pattern transactions.
saga_builder & add_step(Action &&action, Compensation &&compensation)
async_result< bool > execute()
std::vector< saga_step > steps_
transaction_coordinator & coordinator_
saga_builder(transaction_coordinator &coordinator)
Real-time data stream processing.
std::unordered_map< std::string, std::function< bool(const stream_event &)> > event_filters_
bool start_stream(stream_type type, const std::string &channel)
void register_global_handler(Handler &&handler)
void stream_thread(const std::string &channel, stream_type type)
void register_event_handler(const std::string &channel, Handler &&handler)
std::unordered_map< std::string, std::thread > stream_threads_
void add_event_filter(const std::string &channel, Filter &&filter)
stream_processor(std::shared_ptr< core::database_backend > db)
std::shared_ptr< core::database_backend > db_
void process_event(const stream_event &event)
std::vector< std::function< void(const stream_event &)> > global_handlers_
bool stop_stream(const std::string &channel)
std::unordered_map< std::string, std::function< void(const stream_event &)> > event_handlers_
Distributed transaction coordination.
transaction_coordinator()=default
Default constructor - used by database_context.
async_result< bool > prepare_phase(const std::string &transaction_id)
std::vector< distributed_transaction > get_active_transactions() const
async_result< bool > commit_phase(const std::string &transaction_id)
std::shared_ptr< async_executor > executor_
std::string begin_distributed_transaction(const std::vector< std::shared_ptr< core::database_backend > > &participants)
async_result< bool > rollback_distributed_transaction(const std::string &transaction_id)
async_result< bool > commit_distributed_transaction(const std::string &transaction_id)
async_result< bool > two_phase_commit(const std::string &transaction_id)
std::unordered_map< std::string, distributed_transaction > active_transactions_
A callable that represents a compensation (rollback) action.
Definition concepts.h:262
A callable suitable for submission to an async executor.
Definition concepts.h:327
A callable that represents a transaction action.
Definition concepts.h:247
C++20 concepts for database_system type validation.
static void worker(int thread_id, const std::string &connection_string)
Worker function: creates its own connection, inserts data, reads it back, and disconnects.
Abstract interface for database backends.
Defines the enumeration of supported database types.
fallback_context thread_context_type
Fallback thread context (empty)
async_result< T > make_ready_result(T value)
constexpr bool using_thread_system
Compile-time flag indicating fallback mode.
async_result< T > make_error_result(const std::exception &error)
std::vector< database_row > database_result
std::function< async_result< bool >()> action
std::function< async_result< bool >()> compensation
std::unordered_map< std::string, std::string > metadata
std::chrono::system_clock::time_point timestamp
std::vector< std::shared_ptr< core::database_backend > > participants
static connection_config from_string(const std::string &connect_string)
Construct connection_config from legacy connection string.
Adapter layer for thread_system integration.