Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
database::async::transaction_coordinator Class Reference

Distributed transaction coordination. More...

#include <async_operations.h>

Collaboration diagram for database::async::transaction_coordinator:
Collaboration graph

Classes

struct  distributed_transaction
 

Public Types

enum class  transaction_state {
  active , preparing , prepared , committing ,
  committed , aborting , aborted
}
 

Public Member Functions

 transaction_coordinator ()=default
 Default constructor - used by database_context.
 
std::string begin_distributed_transaction (const std::vector< std::shared_ptr< core::database_backend > > &participants)
 
async_result< bool > commit_distributed_transaction (const std::string &transaction_id)
 
async_result< bool > rollback_distributed_transaction (const std::string &transaction_id)
 
async_result< bool > prepare_phase (const std::string &transaction_id)
 
async_result< bool > commit_phase (const std::string &transaction_id)
 
saga_builder create_saga ()
 
void recover_transactions ()
 
std::vector< distributed_transaction > get_active_transactions () const
 

Private Member Functions

async_result< bool > two_phase_commit (const std::string &transaction_id)
 
void cleanup_completed_transactions ()
 

Private Attributes

std::mutex transactions_mutex_
 
std::unordered_map< std::string, distributed_transaction > active_transactions_
 
std::shared_ptr< async_executor > executor_
 

Detailed Description

Distributed transaction coordination.

Note
This class uses the dependency injection pattern. Access it via database_context::get_transaction_coordinator() (Sprint 3, Task 3.1).

Definition at line 652 of file async_operations.h.

Member Enumeration Documentation

◆ transaction_state

Constructor & Destructor Documentation

◆ transaction_coordinator()

database::async::transaction_coordinator::transaction_coordinator ( )
default

Member Function Documentation

◆ begin_distributed_transaction()

std::string database::async::transaction_coordinator::begin_distributed_transaction ( const std::vector< std::shared_ptr< core::database_backend > > & participants)
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 892 of file async_operations.h.

894 {
895 static std::atomic<uint64_t> id_counter{0};
896 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
897 std::chrono::system_clock::now().time_since_epoch()).count();
898 std::string txn_id = "txn-" + std::to_string(ms) + "-"
899 + std::to_string(id_counter.fetch_add(1));
900
901 std::lock_guard<std::mutex> lock(transactions_mutex_);
902 distributed_transaction txn;
903 txn.transaction_id = txn_id;
904 txn.participants = participants;
905 txn.state = transaction_state::active;
906 txn.start_time = std::chrono::system_clock::now();
907 txn.last_activity = txn.start_time;
908 active_transactions_[txn_id] = std::move(txn);
909 return txn_id;
910 }
std::unordered_map< std::string, distributed_transaction > active_transactions_

References active, active_transactions_, database::async::transaction_coordinator::distributed_transaction::last_activity, database::async::transaction_coordinator::distributed_transaction::participants, database::async::transaction_coordinator::distributed_transaction::start_time, database::async::transaction_coordinator::distributed_transaction::state, database::async::transaction_coordinator::distributed_transaction::transaction_id, and transactions_mutex_.

◆ cleanup_completed_transactions()

void database::async::transaction_coordinator::cleanup_completed_transactions ( )
inline private
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1066 of file async_operations.h.

1067 {
1068 std::lock_guard<std::mutex> lock(transactions_mutex_);
1069 for (auto it = active_transactions_.begin();
1070 it != active_transactions_.end();) {
1071 if (it->second.state == transaction_state::committed
1072 || it->second.state == transaction_state::aborted) {
1073 it = active_transactions_.erase(it);
1074 } else {
1075 ++it;
1076 }
1077 }
1078 }

References aborted, active_transactions_, committed, and transactions_mutex_.

Referenced by recover_transactions().

Here is the caller graph for this function:

◆ commit_distributed_transaction()

async_result< bool > database::async::transaction_coordinator::commit_distributed_transaction ( const std::string & transaction_id)
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 994 of file async_operations.h.

996 {
997 return two_phase_commit(transaction_id);
998 }
async_result< bool > two_phase_commit(const std::string &transaction_id)

References two_phase_commit().

Here is the call graph for this function:

◆ commit_phase()

async_result< bool > database::async::transaction_coordinator::commit_phase ( const std::string & transaction_id)
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 955 of file async_operations.h.

957 {
958 std::vector<std::shared_ptr<core::database_backend>> participants;
959 {
960 std::lock_guard<std::mutex> lock(transactions_mutex_);
961 auto it = active_transactions_.find(transaction_id);
962 if (it == active_transactions_.end()) {
963 return make_error_result(
964 std::runtime_error("Transaction not found: " + transaction_id));
965 }
966 if (it->second.state != transaction_state::prepared) {
967 return make_ready_result(false);
968 }
969 it->second.state = transaction_state::committing;
970 it->second.last_activity = std::chrono::system_clock::now();
971 participants = it->second.participants;
972 }
973
974 bool all_committed = true;
975 for (const auto& participant : participants) {
976 auto result = participant->commit_transaction();
977 if (result.is_err()) {
978 all_committed = false;
979 }
980 }
981
982 {
983 std::lock_guard<std::mutex> lock(transactions_mutex_);
984 auto it = active_transactions_.find(transaction_id);
985 if (it != active_transactions_.end()) {
986 it->second.state = all_committed
987 ? transaction_state::committed
988 : transaction_state::aborted;
989 }
990 }
991 return make_ready_result(all_committed);
992 }
async_result< T > make_ready_result(T value)
async_result< T > make_error_result(const std::exception &error)

References aborted, active_transactions_, committed, committing, database::async::make_error_result(), database::async::make_ready_result(), prepared, and transactions_mutex_.

Referenced by two_phase_commit().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ create_saga()

saga_builder database::async::transaction_coordinator::create_saga ( )
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1061 of file async_operations.h.

1062 {
1063 return saga_builder(*this);
1064 }

Referenced by TEST_F().

Here is the caller graph for this function:

◆ get_active_transactions()

std::vector< transaction_coordinator::distributed_transaction > database::async::transaction_coordinator::get_active_transactions ( ) const
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1050 of file async_operations.h.

1051 {
1052 std::lock_guard<std::mutex> lock(transactions_mutex_);
1053 std::vector<distributed_transaction> result;
1054 result.reserve(active_transactions_.size());
1055 for (const auto& [id, txn] : active_transactions_) {
1056 result.push_back(txn);
1057 }
1058 return result;
1059 }

References active_transactions_, and transactions_mutex_.

◆ prepare_phase()

async_result< bool > database::async::transaction_coordinator::prepare_phase ( const std::string & transaction_id)
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 912 of file async_operations.h.

914 {
915 std::vector<std::shared_ptr<core::database_backend>> participants;
916 {
917 std::lock_guard<std::mutex> lock(transactions_mutex_);
918 auto it = active_transactions_.find(transaction_id);
919 if (it == active_transactions_.end()) {
920 return make_error_result(
921 std::runtime_error("Transaction not found: " + transaction_id));
922 }
923 it->second.state = transaction_state::preparing;
924 it->second.last_activity = std::chrono::system_clock::now();
925 participants = it->second.participants;
926 }
927
928 std::vector<std::shared_ptr<core::database_backend>> prepared;
929 for (const auto& participant : participants) {
930 auto result = participant->begin_transaction();
931 if (result.is_err()) {
932 for (const auto& p : prepared) {
933 p->rollback_transaction();
934 }
935 std::lock_guard<std::mutex> lock(transactions_mutex_);
936 auto it = active_transactions_.find(transaction_id);
937 if (it != active_transactions_.end()) {
938 it->second.state = transaction_state::aborted;
939 }
940 return make_ready_result(false);
941 }
942 prepared.push_back(participant);
943 }
944
945 {
946 std::lock_guard<std::mutex> lock(transactions_mutex_);
947 auto it = active_transactions_.find(transaction_id);
948 if (it != active_transactions_.end()) {
949 it->second.state = transaction_state::prepared;
950 }
951 }
952 return make_ready_result(true);
953 }

References aborted, active_transactions_, database::async::make_error_result(), database::async::make_ready_result(), prepared, preparing, and transactions_mutex_.

Referenced by two_phase_commit().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ recover_transactions()

void database::async::transaction_coordinator::recover_transactions ( )
inline

◆ rollback_distributed_transaction()

async_result< bool > database::async::transaction_coordinator::rollback_distributed_transaction ( const std::string & transaction_id)
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1000 of file async_operations.h.

1002 {
1003 std::vector<std::shared_ptr<core::database_backend>> participants;
1004 {
1005 std::lock_guard<std::mutex> lock(transactions_mutex_);
1006 auto it = active_transactions_.find(transaction_id);
1007 if (it == active_transactions_.end()) {
1008 return make_error_result(
1009 std::runtime_error("Transaction not found: " + transaction_id));
1010 }
1011 it->second.state = transaction_state::aborting;
1012 it->second.last_activity = std::chrono::system_clock::now();
1013 participants = it->second.participants;
1014 }
1015
1016 bool all_rolled_back = true;
1017 for (const auto& participant : participants) {
1018 auto result = participant->rollback_transaction();
1019 if (result.is_err()) {
1020 all_rolled_back = false;
1021 }
1022 }
1023
1024 {
1025 std::lock_guard<std::mutex> lock(transactions_mutex_);
1026 auto it = active_transactions_.find(transaction_id);
1027 if (it != active_transactions_.end()) {
1028 it->second.state = transaction_state::aborted;
1029 }
1030 }
1031 return make_ready_result(all_rolled_back);
1032 }

References aborted, aborting, active_transactions_, database::async::make_error_result(), database::async::make_ready_result(), and transactions_mutex_.

Here is the call graph for this function:

◆ two_phase_commit()

async_result< bool > database::async::transaction_coordinator::two_phase_commit ( const std::string & transaction_id)
inline private
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 1034 of file async_operations.h.

1036 {
1037 bool prepared = prepare_phase(transaction_id).get();
1038 if (!prepared) {
1039 return make_ready_result(false);
1040 }
1041 return commit_phase(transaction_id);
1042 }
async_result< bool > prepare_phase(const std::string &transaction_id)
async_result< bool > commit_phase(const std::string &transaction_id)

References commit_phase(), database::async::async_result< T >::get(), database::async::make_ready_result(), prepare_phase(), and prepared.

Referenced by commit_distributed_transaction().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ active_transactions_

◆ executor_

std::shared_ptr<async_executor> database::async::transaction_coordinator::executor_
private

◆ transactions_mutex_


The documentation for this class was generated from the following file: