35inline kcenon::common::VoidResult make_error(
const std::string& msg,
int code = -1,
const std::string& context =
"")
37 return kcenon::common::VoidResult(kcenon::common::error_info{code, msg, context});
42inline kcenon::common::Result<T>
make_error_result(
const std::string& msg,
int code = -1,
const std::string& context =
"")
44 return kcenon::common::Result<T>(kcenon::common::error_info{code, msg, context});
72 return std::make_shared<postgres_manager>();
79 return std::shared_ptr<core::database_backend>(std::move(backend));
91 return std::visit([](
const auto& v) -> std::string {
92 using T = std::decay_t<
decltype(v)>;
93 if constexpr (std::is_same_v<T, std::string>) {
95 }
else if constexpr (std::is_same_v<T, int64_t>) {
96 return std::to_string(v);
97 }
else if constexpr (std::is_same_v<T, double>) {
98 return std::to_string(v);
99 }
else if constexpr (std::is_same_v<T, bool>) {
100 return v ?
"true" :
"false";
101 }
else if constexpr (std::is_same_v<T, std::nullptr_t>) {
113 std::chrono::microseconds exec_time) {
116 result.
rows.reserve(db_result.size());
119 for (
const auto& db_row : db_result) {
121 for (
const auto& [key, value] : db_row) {
124 result.
rows.push_back(std::move(converted_row));
142std::vector<core::database_value> convert_params(
const std::vector<query_param>& params) {
143 std::vector<core::database_value> values;
144 values.reserve(params.size());
145 for (
const auto& p : params) {
147 values.emplace_back(
nullptr);
149 values.emplace_back(p.get_value());
/// Heuristically decide whether a SQL statement is a read (row-returning) query.
///
/// Leading whitespace (space, tab, newline, carriage return) is skipped, then
/// the first token is compared case-insensitively against the keywords
/// SELECT and WITH. The keyword must end at a token boundary: either the end
/// of the string or a character that is not a letter, digit or underscore.
/// So "select*from t" and "SELECT(1)" match, while identifiers that merely
/// begin with the same letters ("selection", "within") do not.
///
/// No SQL parsing is performed: a statement beginning with WITH is always
/// reported as a read query, and leading comments or parentheses are not
/// skipped.
///
/// @param query SQL text to inspect.
/// @return true if the first keyword is SELECT or WITH, false otherwise
///         (including for empty or whitespace-only input).
bool is_select_query(const std::string& query) {
    const auto start = query.find_first_not_of(" \t\n\r");
    if (start == std::string::npos) {
        return false;
    }

    // Matches `keyword` (already lowercase) at `start` without allocating,
    // and requires a token boundary right after it.
    const auto starts_with_keyword = [&query, start](const char* keyword,
                                                     std::size_t length) {
        if (query.size() - start < length) {
            return false;
        }
        for (std::size_t i = 0; i < length; ++i) {
            // Cast through unsigned char: passing a negative char to
            // std::tolower is undefined behaviour.
            const auto ch = static_cast<unsigned char>(query[start + i]);
            if (std::tolower(ch) != keyword[i]) {
                return false;
            }
        }
        const std::size_t next = start + length;
        if (next == query.size()) {
            return true;
        }
        const auto tail = static_cast<unsigned char>(query[next]);
        return !(std::isalnum(tail) || tail == '_');
    };

    return starts_with_keyword("select", 6) || starts_with_keyword("with", 4);
}
181 auto result =
backend_->begin_transaction();
182 if (result.is_ok()) {
196 const std::string& query,
197 const std::vector<query_param>& params)
override {
200 return make_error_result<query_result>(
"Transaction not active", -1,
"transaction");
203 auto start = std::chrono::steady_clock::now();
205 kcenon::common::Result<core::database_result> db_result = [&]() {
206 if (params.empty()) {
207 if (is_select_query(query)) {
208 return backend_->select_query(query);
210 auto exec_res =
backend_->execute_query(query);
211 if (exec_res.is_err()) {
212 return kcenon::common::Result<core::database_result>(exec_res.error());
216 auto values = convert_params(params);
217 if (is_select_query(query)) {
218 return backend_->select_prepared(query, values);
220 auto exec_res =
backend_->execute_prepared(query, values);
221 if (exec_res.is_err()) {
222 return kcenon::common::Result<core::database_result>(exec_res.error());
227 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
228 std::chrono::steady_clock::now() - start);
230 if (db_result.is_err()) {
231 return db_result.error();
238 if (!is_select_query(query) && qr.affected_rows == 0) {
239 qr.affected_rows = 1;
244 kcenon::common::VoidResult
commit()
override {
246 return make_error(
"Transaction not active", -1,
"transaction");
249 auto result =
backend_->commit_transaction();
250 if (result.is_err()) {
251 return make_error(
"Commit failed: " + result.error().message, -1,
"transaction");
255 return kcenon::common::ok();
260 return make_error(
"Transaction not active", -1,
"transaction");
263 auto result =
backend_->rollback_transaction();
264 if (result.is_err()) {
265 return make_error(
"Rollback failed: " + result.error().message, -1,
"transaction");
269 return kcenon::common::ok();
307 std::lock_guard<std::mutex> lock(
mutex_);
310 if (!init_result.is_ok()) {
315 return make_error(
"Already connected", -1,
"unified_database_system");
324 return make_error(
"Unsupported backend type", -2,
"unified_database_system");
329 auto result =
backend_->initialize(config);
330 if (result.is_err()) {
331 return make_error(
"Connection failed: " + result.error().message, -3,
"unified_database_system");
338 logger->log_connection_event(
340 "Connection established to: " + connection_string
344 return kcenon::common::ok();
348 std::lock_guard<std::mutex> lock(
mutex_);
351 return kcenon::common::ok();
364 logger->log_connection_event(
370 return kcenon::common::ok();
374 std::lock_guard<std::mutex> lock(
mutex_);
381 const std::string& query,
382 const std::vector<query_param>& params) {
384 std::lock_guard<std::mutex> lock(
mutex_);
387 return make_error_result<query_result>(
"Not connected to database", -1,
"unified_database_system");
391 auto start = std::chrono::steady_clock::now();
394 kcenon::common::Result<core::database_result> db_result = [&]() {
395 if (params.empty()) {
396 if (is_select_query(query)) {
397 return backend_->select_query(query);
399 auto exec_res =
backend_->execute_query(query);
400 if (exec_res.is_err()) {
401 return kcenon::common::Result<core::database_result>(exec_res.error());
405 auto values = convert_params(params);
406 if (is_select_query(query)) {
407 return backend_->select_prepared(query, values);
409 auto exec_res =
backend_->execute_prepared(query, values);
410 if (exec_res.is_err()) {
411 return kcenon::common::Result<core::database_result>(exec_res.error());
417 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
418 std::chrono::steady_clock::now() - start);
420 if (db_result.is_err()) {
422 return db_result.error();
432 "Query executed: " + query +
" (" + std::to_string(duration.count()) +
"us)"
438 monitor->record_query_execution(duration,
true);
445 if (!is_select_query(query) && qr.affected_rows == 0) {
446 qr.affected_rows = 1;
454 const std::string& query,
455 const std::vector<query_param>& params) {
460 std::promise<kcenon::common::Result<query_result>> promise;
461 promise.set_value(make_error_result<query_result>(
"Thread pool not available", -1,
"unified_database_system"));
462 return promise.get_future();
466 return thread_pool->submit([
this, query, params]() -> kcenon::common::Result<query_result> {
467 return this->
execute(query, params);
472 const std::string& query,
474 const std::vector<query_param>& params) {
478 std::promise<kcenon::common::Result<query_result>> promise;
479 promise.set_value(make_error_result<query_result>(
"Thread pool not available", -1,
"unified_database_system"));
480 return promise.get_future();
487 return thread_pool->submit([
this, query, params]() -> kcenon::common::Result<query_result> {
488 return this->
execute(query, params);
495 std::lock_guard<std::mutex> lock(
mutex_);
498 return make_error_result<std::unique_ptr<transaction>>(
"Not connected to database", -1,
"unified_database_system");
503 auto tx_impl = std::make_unique<transaction_impl>(
backend_);
504 std::unique_ptr<transaction> tx = std::move(tx_impl);
510 if (!tx_result.is_ok()) {
511 return tx_result.error();
514 auto tx = std::move(tx_result.value());
516 for (
const auto& query : queries) {
517 auto result = tx->execute(query, {});
518 if (!result.is_ok()) {
521 return result.error();
525 auto commit_result = tx->commit();
526 if (commit_result.is_ok()) {
532 return commit_result;
538 std::lock_guard<std::mutex> lock(
mutex_);
543 std::lock_guard<std::mutex> lock(
mutex_);
546 health.
last_check = std::chrono::steady_clock::now();
551 if (coord_health.is_ok() && coord_health.value()) {
564 health.
issues.push_back(
"Not connected to database");
573 std::lock_guard<std::mutex> lock(
mutex_);
589 std::lock_guard<std::mutex> lock(
mutex_);
612 return kcenon::common::ok();
615 if (!init_result.is_ok()) {
616 return make_error(
"Failed to initialize database coordinator: " +
617 init_result.error().message, -10,
"unified_database_system");
620 return kcenon::common::ok();
644 constexpr double alpha = 0.1;
651 auto new_latency_us = latency.count();
652 auto ema_us =
static_cast<int64_t
>(alpha * new_latency_us + (1.0 - alpha) * current_avg_us);
658 auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration).count();
695 : pimpl_(std::make_unique<
impl>(config)) {
711 const std::string& connection_string) {
712 return pimpl_->connect(backend, connection_string);
716 return pimpl_->disconnect();
720 return pimpl_->is_connected();
726 const std::string& query,
727 const std::vector<query_param>& params) {
728 return pimpl_->execute(query, params);
732 const std::string& query,
733 const std::vector<query_param>& params) {
734 return pimpl_->execute(query, params);
738 const std::string& query,
739 const std::vector<query_param>& params) {
740 auto result =
pimpl_->execute(query, params);
741 if (result.is_ok()) {
742 return result.value().affected_rows;
744 return result.error();
748 const std::string& query,
749 const std::vector<query_param>& params) {
750 auto result =
pimpl_->execute(query, params);
751 if (result.is_ok()) {
752 return result.value().affected_rows;
754 return result.error();
758 const std::string& query,
759 const std::vector<query_param>& params) {
760 auto result =
pimpl_->execute(query, params);
761 if (result.is_ok()) {
762 return result.value().affected_rows;
764 return result.error();
770 const std::string& query,
771 const std::vector<query_param>& params) {
772 return pimpl_->execute_async(query, params);
776 const std::string& query,
778 const std::vector<query_param>& params) {
779 return pimpl_->execute_async_priority(query, priority, params);
785 return pimpl_->begin_transaction();
789 const std::vector<std::string>& queries) {
790 return pimpl_->execute_transaction(queries);
796 return pimpl_->get_metrics();
800 return pimpl_->check_health();
810 return pimpl_->get_config();
814 return pimpl_->get_backend_type();
818 return pimpl_->get_pool_stats();
824 return pimpl_->create_query_builder();
850 config_.database.type = type;
855 const std::string& conn_str) {
856 connection_string_ = conn_str;
863 config_.connection_pool.min_connections = min_size;
864 config_.connection_pool.max_connections = max_size;
870 const std::string& log_dir) {
871 config_.logger.min_log_level = level;
872 config_.logger.log_directory = log_dir;
873 config_.logger.enable_file_logging =
true;
879 config_.monitoring.enable_metrics = enable;
884 size_t worker_threads) {
885 config_.thread.thread_count = worker_threads;
890 std::chrono::milliseconds threshold) {
891 config_.logger.slow_query_threshold = threshold;
896 auto system = std::make_unique<unified_database_system>(config_);
899 if (!connection_string_.empty()) {
900 auto result = system->connect(config_.database.type, connection_string_);
901 if (!result.is_ok()) {
902 return make_error_result<std::unique_ptr<unified_database_system>>(
903 "Failed to connect: " + result.error().message,
905 "unified_database_system"
910 return std::move(system);
Registry for database backend plugins.
static backend_registry & instance()
Get the singleton instance.
std::unique_ptr< database_backend > create(const std::string &name) const
Create a backend instance by name.
Manages lifecycle and coordination of all database system adapters.
bool is_active() const override
Check if transaction is active.
kcenon::common::VoidResult commit() override
Commit the transaction.
transaction_impl(std::shared_ptr< core::database_backend > backend)
kcenon::common::Result< query_result > execute(const std::string &query, const std::vector< query_param > &params) override
Execute a query within the transaction.
~transaction_impl() override
kcenon::common::VoidResult rollback() override
Rollback the transaction.
std::shared_ptr< core::database_backend > backend_
Transaction interface for ACID operations.
Builder class for custom configuration.
builder & enable_monitoring(bool enable=true)
Enable monitoring and metrics collection.
builder & set_connection_string(const std::string &conn_str)
Set the connection string.
builder & set_pool_size(size_t min_size, size_t max_size)
Set connection pool size.
unified_db_config config_
builder & enable_async(size_t worker_threads=4)
Enable async operations.
builder & set_slow_query_threshold(std::chrono::milliseconds threshold)
Set slow query threshold.
kcenon::common::Result< std::unique_ptr< unified_database_system > > build()
Build and return the configured database system.
builder & enable_logging(db_log_level level, const std::string &log_dir="./logs")
Enable logging.
builder & set_backend(backend_type type)
Set the database backend type.
std::future< kcenon::common::Result< query_result > > execute_async(const std::string &query, const std::vector< query_param > &params)
backend_type backend_type_
database_metrics get_metrics() const
const unified_db_config & get_config() const
database_metrics metrics_
std::string connection_string_
impl(const unified_db_config &config)
bool is_connected() const
query_builder create_query_builder() const
kcenon::common::VoidResult connect(backend_type backend, const std::string &connection_string)
kcenon::common::VoidResult execute_transaction(const std::vector< std::string > &queries)
kcenon::common::Result< query_result > execute(const std::string &query, const std::vector< query_param > &params)
std::unique_ptr< database_coordinator > coordinator_
backend_type get_backend_type() const
void update_metrics(std::chrono::microseconds latency, bool success)
unified_db_config config_
std::shared_ptr< core::database_backend > backend_
unified_database_system::pool_stats get_pool_stats() const
kcenon::common::VoidResult disconnect()
kcenon::common::Result< std::unique_ptr< transaction > > begin_transaction()
kcenon::common::VoidResult ensure_initialized()
std::future< kcenon::common::Result< query_result > > execute_async_priority(const std::string &query, int priority, const std::vector< query_param > &params)
health_check check_health() const
Main unified database system class.
const unified_db_config & get_config() const
Get current configuration.
~unified_database_system()
Destructor - automatically disconnects and cleans up.
kcenon::common::Result< std::unique_ptr< transaction > > begin_transaction()
std::unique_ptr< impl > pimpl_
std::future< kcenon::common::Result< query_result > > execute_async(const std::string &query, const std::vector< query_param > &params={})
void reset_metrics()
Reset metrics counters.
backend_type get_backend_type() const
Get database backend type.
database_metrics get_metrics() const
Get current performance metrics.
kcenon::common::Result< size_t > update(const std::string &query, const std::vector< query_param > &params={})
Execute an UPDATE query.
kcenon::common::Result< query_result > execute(const std::string &query, const std::vector< query_param > &params={})
health_check check_health() const
Perform health check.
kcenon::common::VoidResult execute_transaction(const std::vector< std::string > &queries)
Execute multiple queries in a transaction.
kcenon::common::VoidResult connect(const std::string &connection_string)
Connect to database.
static builder create_builder()
Create a builder for custom configuration.
kcenon::common::Result< size_t > insert(const std::string &query, const std::vector< query_param > &params={})
Execute an INSERT query.
kcenon::common::Result< query_result > select(const std::string &query, const std::vector< query_param > &params={})
Execute a SELECT query.
kcenon::common::Result< size_t > remove(const std::string &query, const std::vector< query_param > &params={})
Execute a DELETE query.
unified_database_system()
Default constructor (zero-config)
pool_stats get_pool_stats() const
kcenon::common::VoidResult disconnect()
Disconnect from database.
query_builder create_query_builder() const
std::future< kcenon::common::Result< query_result > > execute_async_priority(const std::string &query, int priority, const std::vector< query_param > &params={})
Execute a SQL query asynchronously with priority.
bool is_connected() const
Check if connected to database.
Universal query builder that adapts to different database types.
Abstract interface for database backends.
Lifecycle manager for all database system adapters (Phase 5)
Database logging adapter with runtime backend selection.
Database monitoring adapter with runtime backend selection.
async_result< T > make_error_result(const std::exception &error)
std::vector< database_row > database_result
std::variant< std::string, int64_t, double, bool, std::nullptr_t > database_value
static std::string backend_type_to_name(backend_type type)
Convert backend_type to registry name string.
db_log_level
Database logging level enumeration.
@ info
Informational messages (default)
static query_result convert_result(const core::database_result &db_result, std::chrono::microseconds exec_time)
Convert core::database_result to query_result.
std::map< std::string, std::string > row_data
Database row representation.
static std::shared_ptr< core::database_backend > create_backend(backend_type type)
Create database backend instance.
static std::string value_to_string(const core::database_value &value)
Convert database_value (variant) to string.
backend_type
Database backend type enumeration.
@ mongodb
MongoDB NoSQL database.
@ sqlite
SQLite embedded database.
@ redis
Redis key-value store.
@ postgres
PostgreSQL database.
kcenon::common::VoidResult VoidResult
Primary VoidResult type - use this for void operations.
@ sqlite
Indicates a SQLite database.
static connection_config from_string(const std::string &connect_string)
Construct connection_config from legacy connection string.
Database performance metrics.
std::chrono::microseconds max_latency
size_t successful_queries
size_t transactions_committed
size_t transactions_started
std::chrono::microseconds min_latency
size_t transactions_rolled_back
double queries_per_second
std::chrono::microseconds average_latency
std::chrono::steady_clock::time_point measurement_start
bool enable_file_logging
Enable logging to file (in addition to console)
std::string log_directory
Directory for log files.
std::chrono::milliseconds slow_query_threshold
Threshold for considering a query "slow".
db_log_level min_log_level
Minimum log level to output.
bool enable_metrics
Enable metrics collection.
std::chrono::seconds metrics_interval
Interval for collecting metrics.
std::size_t thread_count
Number of worker threads (0 = auto-detect from hardware)
bool connection_pool_healthy
double connection_pool_utilization
std::vector< std::string > issues
std::chrono::steady_clock::time_point last_check
std::size_t min_connections
Minimum number of connections to maintain.
std::chrono::seconds connection_timeout
Timeout for acquiring a connection from the pool.
std::size_t max_connections
Maximum number of connections allowed.
std::chrono::microseconds execution_time
std::vector< row_data > rows
Get connection pool statistics.
double utilization_percent
size_t active_connections
db_logger_config logger
Logger configuration.
pool_config connection_pool
Connection pool configuration.
db_monitoring_config monitoring
Monitoring configuration.
db_thread_config thread
Thread pool configuration.
Thread pool adapter with runtime backend selection.
Zero-configuration database system with integrated adapters (Phase 6)