20#include <condition_variable>
26#include <unordered_map>
28#ifdef USE_THREAD_SYSTEM
29 #include <kcenon/thread/core/job.h>
30 #include <kcenon/thread/core/thread_worker.h>
31 #include <kcenon/thread/interfaces/thread_context.h>
32 #include <kcenon/thread/core/error_handling.h>
37#ifdef USE_THREAD_SYSTEM
45 class lambda_job :
public kcenon::thread::job {
47 explicit lambda_job(std::function<
void()> func,
const std::string& name =
"lambda_job")
48 : job(name), func_(std::move(func)) {}
50 kcenon::common::VoidResult do_work()
override {
55 return kcenon::common::ok();
56 }
catch (
const std::exception& e) {
57 return kcenon::common::error_info{
58 static_cast<int>(kcenon::thread::error_code::job_execution_failed),
59 std::string(
"Exception in lambda_job: ") + e.what(),
63 return kcenon::common::error_info{
64 static_cast<int>(kcenon::thread::error_code::job_execution_failed),
65 "Unknown exception in lambda_job",
72 std::function<void()> func_;
77 template<
typename T>
class async_result;
79 class transaction_coordinator;
105 template<concepts::Vo
idCallable<T> Callback>
106 void then(Callback&& callback);
108 template<concepts::ErrorHandler Handler>
109 void on_error(Handler&& error_handler);
112 void then(std::function<
void(T)> callback);
113 void on_error(std::function<
void(
const std::exception&)> error_handler);
126 : future_(std::move(future))
133 std::function<void(T)> on_success;
134 std::function<void(
const std::exception&)> on_error_cb;
136 std::lock_guard<std::mutex> lock(callback_mutex_);
137 on_success = success_callback_;
138 on_error_cb = error_callback_;
142 T value = future_.get();
147 }
catch (
const std::exception& e) {
158 auto status = future_.wait_for(
timeout);
159 if (status == std::future_status::timeout) {
160 throw std::runtime_error(
"async_result::get_for timed out");
168 return future_.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready;
174 return future_.wait_for(
timeout);
178 template<concepts::Vo
idCallable<T> Callback>
181 std::lock_guard<std::mutex> lock(callback_mutex_);
182 success_callback_ = std::forward<Callback>(callback);
186 template<concepts::ErrorHandler Handler>
189 std::lock_guard<std::mutex> lock(callback_mutex_);
190 error_callback_ = std::forward<Handler>(error_handler);
196 std::lock_guard<std::mutex> lock(callback_mutex_);
197 success_callback_ = std::move(callback);
203 std::lock_guard<std::mutex> lock(callback_mutex_);
204 error_callback_ = std::move(error_handler);
216 class database_awaitable
222 std::exception_ptr exception_;
224 database_awaitable get_return_object() {
225 return database_awaitable{std::coroutine_handle<promise_type>::from_promise(*
this)};
228 std::suspend_never initial_suspend() {
return {}; }
229 std::suspend_never final_suspend() noexcept {
return {}; }
231 void return_value(T value) { result_ = std::move(value); }
232 void unhandled_exception() { exception_ = std::current_exception(); }
235 database_awaitable(std::coroutine_handle<promise_type> handle) : handle_(handle) {}
236 ~database_awaitable() {
if (handle_) handle_.destroy(); }
// Copy construction and copy assignment are deleted. The destructor calls
// handle_.destroy(), so two objects sharing one handle would destroy the
// same coroutine twice.
238 database_awaitable(
const database_awaitable&) =
delete;
239 database_awaitable& operator=(
const database_awaitable&) =
delete;
241 database_awaitable(database_awaitable&& other) noexcept : handle_(other.handle_) {
242 other.handle_ =
nullptr;
245 database_awaitable& operator=(database_awaitable&& other)
noexcept {
246 if (
this != &other) {
247 if (handle_) handle_.destroy();
248 handle_ = other.handle_;
249 other.handle_ =
nullptr;
254 bool await_ready()
const {
return handle_.done(); }
255 void await_suspend(std::coroutine_handle<> waiting_coroutine) {
260 if (handle_.promise().exception_) {
261 std::rethrow_exception(handle_.promise().exception_);
263 return std::move(handle_.promise().result_);
267 std::coroutine_handle<promise_type> handle_;
278 async_database(std::shared_ptr<core::database_backend> db, std::shared_ptr<async_executor> executor);
286 database_awaitable<bool> execute_coro(
const std::string& query);
287 database_awaitable<core::database_result> select_coro(
const std::string& query);
304 std::shared_ptr<core::database_backend>
db_;
336#ifdef USE_THREAD_SYSTEM
338 size_t thread_count = std::thread::hardware_concurrency(),
340 : pool_(std::make_shared<thread_pool_type>(
"db_async_executor", context))
343 auto job_queue = pool_->get_job_queue();
345 auto worker = std::make_unique<kcenon::thread::thread_worker>(
true, context);
346 worker->set_job_queue(job_queue);
348 auto add_result = pool_->enqueue(std::move(
worker));
349 if (add_result.is_err()) {
350 throw std::runtime_error(
"Failed to add worker: " +
351 add_result.error().message);
355 auto result = pool_->start();
356 if (result.is_err()) {
357 throw std::runtime_error(
"Failed to start async executor: " +
358 result.error().message);
363 size_t thread_count = std::thread::hardware_concurrency(),
393 template<
typename F,
typename... Args>
395 auto submit(F&& func, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
397 using return_type = std::invoke_result_t<F, Args...>;
399#ifdef USE_THREAD_SYSTEM
400 auto task = std::make_shared<std::packaged_task<return_type()>>(
401 std::bind(std::forward<F>(func), std::forward<Args>(args)...)
404 auto future = task->get_future();
406 auto job = std::make_unique<lambda_job>(
407 [task]() { (*task)(); },
411 auto result = pool_->enqueue(std::move(job));
412 if (result.is_err()) {
413 throw std::runtime_error(
"Failed to enqueue job: " +
414 result.error().message);
419 auto task = std::make_shared<std::packaged_task<return_type()>>(
420 std::bind(std::forward<F>(func), std::forward<Args>(args)...)
423 auto future = task->get_future();
428 throw std::runtime_error(
"Cannot submit task to stopped executor");
430 tasks_.emplace([task]() { (*task)(); });
442#ifdef USE_THREAD_SYSTEM
466#ifdef USE_THREAD_SYSTEM
468 while (pool_->get_job_queue()->size() > 0) {
469 std::this_thread::sleep_for(std::chrono::milliseconds(10));
480 std::this_thread::sleep_for(std::chrono::milliseconds(10));
489#ifdef USE_THREAD_SYSTEM
491 return pool_->get_job_queue()->size();
515#ifdef USE_THREAD_SYSTEM
516 std::shared_ptr<thread_pool_type> pool_;
521 std::function<void()> task;
534 task = std::move(
tasks_.front());
546 std::queue<std::function<void()>>
tasks_;
578 std::unordered_map<std::string, std::string>
metadata;
591 template<concepts::StreamEventHandler<stream_event> Handler>
598 template<concepts::StreamEventHandler<stream_event> Handler>
607 std::function<
void(
const stream_event&)> handler);
612 template<concepts::StreamEventFilter<stream_event> Filter>
621 std::function<
bool(
const stream_event&)> filter);
627 std::shared_ptr<core::database_backend>
db_;
713 template<concepts::TransactionAction Action, concepts::CompensationAction Compensation>
725 std::function<async_result<bool>()>
action;
736 std::shared_ptr<core::database_backend> db,
737 std::shared_ptr<async_executor> executor)
739 , executor_(std::move(executor))
746 auto future =
executor_->submit([db, query]() ->
bool {
747 auto result = db->execute_query(query);
748 if (result.is_err()) {
749 throw std::runtime_error(result.error().message);
760 auto result = db->select_query(query);
761 if (result.is_err()) {
762 throw std::runtime_error(result.error().message);
764 return result.value();
770 const std::vector<std::string>& queries)
773 auto queries_copy = queries;
774 auto future =
executor_->submit([db, queries_copy]() -> std::vector<bool> {
775 std::vector<bool> results;
776 results.reserve(queries_copy.size());
777 for (
const auto& q : queries_copy) {
778 auto result = db->execute_query(q);
779 results.push_back(result.is_ok());
787 const std::vector<std::string>& queries)
790 auto queries_copy = queries;
791 auto future =
executor_->submit([db, queries_copy]() -> std::vector<core::database_result> {
792 std::vector<core::database_result> results;
793 results.reserve(queries_copy.size());
794 for (
const auto& q : queries_copy) {
795 auto result = db->select_query(q);
796 if (result.is_ok()) {
797 results.push_back(result.value());
810 auto future =
executor_->submit([db]() ->
bool {
811 auto result = db->begin_transaction();
812 if (result.is_err()) {
813 throw std::runtime_error(result.error().message);
823 auto future =
executor_->submit([db]() ->
bool {
824 auto result = db->commit_transaction();
825 if (result.is_err()) {
826 throw std::runtime_error(result.error().message);
836 auto future =
executor_->submit([db]() ->
bool {
837 auto result = db->rollback_transaction();
838 if (result.is_err()) {
839 throw std::runtime_error(result.error().message);
849 auto future =
executor_->submit([db, connection_string]() ->
bool {
851 auto result = db->initialize(config);
852 if (result.is_err()) {
853 throw std::runtime_error(result.error().message);
863 auto future =
executor_->submit([db]() ->
bool {
864 auto result = db->shutdown();
865 if (result.is_err()) {
866 throw std::runtime_error(result.error().message);
878 std::promise<T> promise;
879 promise.set_value(std::move(value));
885 std::promise<T> promise;
886 promise.set_exception(std::make_exception_ptr(error));
893 const std::vector<std::shared_ptr<core::database_backend>>& participants)
895 static std::atomic<uint64_t> id_counter{0};
896 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
897 std::chrono::system_clock::now().time_since_epoch()).count();
898 std::string txn_id =
"txn-" + std::to_string(ms) +
"-"
899 + std::to_string(id_counter.fetch_add(1));
906 txn.
start_time = std::chrono::system_clock::now();
913 const std::string& transaction_id)
915 std::vector<std::shared_ptr<core::database_backend>> participants;
921 std::runtime_error(
"Transaction not found: " + transaction_id));
924 it->second.last_activity = std::chrono::system_clock::now();
925 participants = it->second.participants;
928 std::vector<std::shared_ptr<core::database_backend>>
prepared;
929 for (
const auto& participant : participants) {
930 auto result = participant->begin_transaction();
931 if (result.is_err()) {
933 p->rollback_transaction();
956 const std::string& transaction_id)
958 std::vector<std::shared_ptr<core::database_backend>> participants;
964 std::runtime_error(
"Transaction not found: " + transaction_id));
970 it->second.last_activity = std::chrono::system_clock::now();
971 participants = it->second.participants;
974 bool all_committed =
true;
975 for (
const auto& participant : participants) {
976 auto result = participant->commit_transaction();
977 if (result.is_err()) {
978 all_committed =
false;
986 it->second.state = all_committed
995 const std::string& transaction_id)
1001 const std::string& transaction_id)
1003 std::vector<std::shared_ptr<core::database_backend>> participants;
1009 std::runtime_error(
"Transaction not found: " + transaction_id));
1012 it->second.last_activity = std::chrono::system_clock::now();
1013 participants = it->second.participants;
1016 bool all_rolled_back =
true;
1017 for (
const auto& participant : participants) {
1018 auto result = participant->rollback_transaction();
1019 if (result.is_err()) {
1020 all_rolled_back =
false;
1035 const std::string& transaction_id)
1049 inline std::vector<transaction_coordinator::distributed_transaction>
1053 std::vector<distributed_transaction> result;
1056 result.push_back(txn);
1085 : coordinator_(coordinator)
1093 steps_.push_back({std::move(action), std::move(compensation)});
1100 Action&& action, Compensation&& compensation)
1103 std::function<async_result<bool>()>(std::forward<Action>(action)),
1111 std::vector<size_t> completed;
1113 for (
size_t i = 0; i <
steps_.size(); ++i) {
1117 for (
auto it = completed.rbegin();
1118 it != completed.rend(); ++it) {
1119 try {
steps_[*it].compensation().get(); }
1124 completed.push_back(i);
1126 for (
auto it = completed.rbegin();
1127 it != completed.rend(); ++it) {
1128 try {
steps_[*it].compensation().get(); }
1143 std::shared_ptr<core::database_backend> db)
1144 : db_(std::move(db))
1175 t = std::move(it->second);
1188 std::unordered_map<std::string, std::thread> threads;
1193 for (
auto& [channel, t] : threads) {
1202 const std::string& channel,
1217 const std::string& channel,
1228 connected_event.
type = type;
1229 connected_event.
channel = channel;
1230 connected_event.
payload =
"stream_connected";
1231 connected_event.
timestamp = std::chrono::system_clock::now();
1235 std::this_thread::sleep_for(std::chrono::milliseconds(10));
1251 if (!filter_it->second(event)) {
1258 handler_it->second(event);
1269#ifdef HAS_COROUTINES
1270 inline auto when_all(std::vector<database_awaitable<bool>> awaitables) -> database_awaitable<std::vector<bool>> {
1271 std::vector<bool> results;
1272 for (
auto& awaitable : awaitables) {
1273 results.push_back(
co_await awaitable);
1278 inline auto when_any(std::vector<database_awaitable<bool>> awaitables) -> database_awaitable<bool> {
1280 if (!awaitables.empty()) {
1281 co_return co_await awaitables[0];
Asynchronous database interface wrapper.
async_result< bool > begin_transaction_async()
async_result< bool > execute_async(const std::string &query)
async_result< bool > commit_transaction_async()
async_result< bool > disconnect_async()
async_result< core::database_result > select_async(const std::string &query)
async_result< bool > rollback_transaction_async()
async_database(std::shared_ptr< core::database_backend > db, std::shared_ptr< async_executor > executor)
async_result< std::vector< core::database_result > > select_batch_async(const std::vector< std::string > &queries)
async_result< bool > connect_async(const std::string &connection_string)
std::shared_ptr< core::database_backend > db_
async_result< std::vector< bool > > execute_batch_async(const std::vector< std::string > &queries)
std::shared_ptr< async_executor > executor_
High-performance asynchronous executor using thread_system.
auto submit(F &&func, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submits a task for asynchronous execution.
async_executor & operator=(const async_executor &)=delete
async_executor(const async_executor &)=delete
void wait_for_completion()
Waits for all pending tasks to complete.
std::queue< std::function< void()> > tasks_
size_t thread_count() const
Returns the number of worker threads.
async_executor(size_t thread_count=std::thread::hardware_concurrency(), const thread_context_type &=thread_context_type())
Constructs an async executor with specified thread count.
void shutdown()
Gracefully shuts down the executor.
size_t pending_tasks() const
Returns the number of pending tasks.
std::vector< std::thread > workers_
constexpr bool is_using_thread_system() const
Checks if using thread_system implementation.
async_executor(async_executor &&)=delete
std::condition_variable condition_
async_executor & operator=(async_executor &&)=delete
std::atomic< bool > stop_
Template class for asynchronous operation results.
std::function< void(T)> success_callback_
void on_error(Handler &&error_handler)
void then(Callback &&callback)
std::function< void(const std::exception &)> error_callback_
T get_for(std::chrono::milliseconds timeout)
std::future_status wait_for(std::chrono::milliseconds timeout) const
std::mutex callback_mutex_
async_result(std::future< T > future)
Fallback thread context (empty implementation). Provides a no-op context when thread_system is not available.
Builder for Saga pattern transactions.
saga_builder & add_step(Action &&action, Compensation &&compensation)
async_result< bool > execute()
std::vector< saga_step > steps_
transaction_coordinator & coordinator_
saga_builder(transaction_coordinator &coordinator)
Real-time data stream processing.
std::unordered_map< std::string, std::function< bool(const stream_event &)> > event_filters_
std::mutex handlers_mutex_
bool start_stream(stream_type type, const std::string &channel)
void register_global_handler(Handler &&handler)
void stream_thread(const std::string &channel, stream_type type)
void register_event_handler(const std::string &channel, Handler &&handler)
std::unordered_map< std::string, std::thread > stream_threads_
void add_event_filter(const std::string &channel, Filter &&filter)
stream_processor(std::shared_ptr< core::database_backend > db)
std::shared_ptr< core::database_backend > db_
std::mutex threads_mutex_
void process_event(const stream_event &event)
std::vector< std::function< void(const stream_event &)> > global_handlers_
bool stop_stream(const std::string &channel)
std::atomic< bool > running_
std::unordered_map< std::string, std::function< void(const stream_event &)> > event_handlers_
Distributed transaction coordination.
transaction_coordinator()=default
Default constructor - used by database_context.
async_result< bool > prepare_phase(const std::string &transaction_id)
std::vector< distributed_transaction > get_active_transactions() const
std::mutex transactions_mutex_
async_result< bool > commit_phase(const std::string &transaction_id)
void cleanup_completed_transactions()
saga_builder create_saga()
std::shared_ptr< async_executor > executor_
std::string begin_distributed_transaction(const std::vector< std::shared_ptr< core::database_backend > > &participants)
async_result< bool > rollback_distributed_transaction(const std::string &transaction_id)
async_result< bool > commit_distributed_transaction(const std::string &transaction_id)
async_result< bool > two_phase_commit(const std::string &transaction_id)
std::unordered_map< std::string, distributed_transaction > active_transactions_
void recover_transactions()
A callable that represents a compensation (rollback) action.
A callable suitable for submission to an async executor.
A callable that represents a transaction action.
C++20 concepts for database_system type validation.
static void worker(int thread_id, const std::string &connection_string)
Worker function: creates its own connection, inserts data, reads it back, and disconnects.
Abstract interface for database backends.
Defines the enumeration of supported database types.
fallback_context thread_context_type
Fallback thread context (empty)
async_result< T > make_ready_result(T value)
constexpr bool using_thread_system
Compile-time flag indicating fallback mode.
async_result< T > make_error_result(const std::exception &error)
std::vector< database_row > database_result
std::function< async_result< bool >()> action
std::function< async_result< bool >()> compensation
std::unordered_map< std::string, std::string > metadata
std::chrono::system_clock::time_point timestamp
std::vector< std::shared_ptr< core::database_backend > > participants
std::chrono::system_clock::time_point start_time
std::string transaction_id
std::chrono::system_clock::time_point last_activity
static connection_config from_string(const std::string &connect_string)
Construct connection_config from legacy connection string.
Adapter layer for thread_system integration.