137 pqxx::connection* conn =
static_cast<pqxx::connection*
>(
connection_);
138 pqxx::work txn(*conn);
139 pqxx::result result = txn.exec(query_string);
141 return static_cast<unsigned int>(result.affected_rows());
142 }
catch (
const std::exception& e) {
143 last_error_ = std::string(
"Modification query error: ") + e.what();
144 logger_.error(
"execute_modification_query",
last_error_);
146#elif defined(HAVE_LIBPQ)
149 PGresult* result = PQexec(
static_cast<PGconn*
>(
connection_), query_string.c_str());
150 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
155 const char* affected_rows = PQcmdTuples(result);
156 unsigned int count = 0;
157 if (affected_rows && *affected_rows) {
158 count =
static_cast<unsigned int>(std::stoul(affected_rows));
162 }
catch (
const std::exception& e) {
163 last_error_ = std::string(
"Modification query error: ") + e.what();
164 logger_.error(
"execute_modification_query",
last_error_);
167 logger_.warning(
"PostgreSQL support not compiled. Modification query: " + query_string.substr(0, 20) +
"...");
179 return kcenon::common::error_info{
191 return kcenon::common::error_info{
198 pqxx::connection* conn =
static_cast<pqxx::connection*
>(
connection_);
199 pqxx::work txn(*conn);
200 pqxx::result pqxx_result = txn.exec(query_string);
203 for (
const auto& row : pqxx_result) {
205 for (
size_t i = 0; i < row.size(); ++i) {
206 std::string column_name = pqxx_result.column_name(i);
207 if (row[i].is_null()) {
208 db_row[column_name] =
nullptr;
211 if (row[i].
type() == PG_INT8OID ||
212 row[i].
type() == PG_INT4OID) {
213 db_row[column_name] = row[i].as<int64_t>();
214 }
else if (row[i].
type() == PG_FLOAT8OID ||
215 row[i].
type() == PG_FLOAT4OID) {
216 db_row[column_name] = row[i].as<
double>();
217 }
else if (row[i].
type() == PG_BOOLOID) {
218 db_row[column_name] = row[i].as<
bool>();
220 db_row[column_name] = row[i].as<std::string>();
224 result.push_back(std::move(db_row));
226 }
catch (
const std::exception& e) {
227 last_error_ = std::string(
"Select query error: ") + e.what();
229 return kcenon::common::error_info{
235#elif defined(HAVE_LIBPQ)
238 return kcenon::common::error_info{
245 PGresult* pg_result = PQexec(
static_cast<PGconn*
>(
connection_), query_string.c_str());
246 if (PQresultStatus(pg_result) != PGRES_TUPLES_OK) {
249 return kcenon::common::error_info{
256 int rows = PQntuples(pg_result);
257 int cols = PQnfields(pg_result);
259 for (
int row = 0; row < rows; ++row) {
261 for (
int col = 0; col < cols; ++col) {
262 std::string column_name = PQfname(pg_result, col);
263 if (PQgetisnull(pg_result, row, col)) {
264 db_row[column_name] =
nullptr;
266 const char* value = PQgetvalue(pg_result, row, col);
267 Oid
type = PQftype(pg_result, col);
271 db_row[column_name] =
static_cast<int64_t
>(std::stoll(value));
272 }
else if (
type == 700 ||
type == 701) {
273 db_row[column_name] = std::stod(value);
274 }
else if (
type == 16) {
275 db_row[column_name] = (*value ==
't' || *value ==
'1');
277 db_row[column_name] = std::string(value);
281 result.push_back(std::move(db_row));
284 }
catch (
const std::exception& e) {
285 last_error_ = std::string(
"Select query error: ") + e.what();
287 return kcenon::common::error_info{
294 logger_.warning(
"PostgreSQL support not compiled. Select query: " + query_string.substr(0, 20) +
"...");
296 if (query_string.find(
"SELECT") != std::string::npos) {
298 mock_row[
"id"] = int64_t(1);
299 mock_row[
"name"] = std::string(
"mock_data");
300 mock_row[
"active"] =
true;
301 result.push_back(mock_row);
313 return kcenon::common::error_info{
325 return kcenon::common::error_info{
332 pqxx::work txn{*
static_cast<pqxx::connection*
>(
connection_)};
333 txn.exec(query_string);
336 return kcenon::common::ok();
337 }
catch (
const std::exception& e) {
338 last_error_ = std::string(
"Execute error: ") + e.what();
340 return kcenon::common::error_info{
346#elif defined(HAVE_LIBPQ)
350 return kcenon::common::error_info{
357 PGresult* result = PQexec(
static_cast<PGconn*
>(
connection_), query_string.c_str());
358 if (result ==
nullptr) {
361 return kcenon::common::error_info{
368 ExecStatusType status = PQresultStatus(result);
369 bool success = (status == PGRES_COMMAND_OK) || (status == PGRES_TUPLES_OK);
375 return kcenon::common::error_info{
384 return kcenon::common::ok();
387 logger_.info(
"PostgreSQL support not compiled. Mock execute: " + query_string);
389 return kcenon::common::ok();
394 const std::string& query,
395 const std::vector<core::database_value>& params)
399 return kcenon::common::error_info{
411 return kcenon::common::error_info{
418 pqxx::connection* conn =
static_cast<pqxx::connection*
>(
connection_);
419 pqxx::work txn(*conn);
422 pqxx::params pq_params;
423 for (
const auto& val : params) {
424 std::visit([&pq_params](
const auto& v) {
425 using T = std::decay_t<
decltype(v)>;
426 if constexpr (std::is_same_v<T, std::nullptr_t>) {
428 }
else if constexpr (std::is_same_v<T, bool>) {
430 }
else if constexpr (std::is_same_v<T, int64_t>) {
432 }
else if constexpr (std::is_same_v<T, double>) {
434 }
else if constexpr (std::is_same_v<T, std::string>) {
440 pqxx::result pqxx_result = txn.exec_params(query, pq_params);
443 for (
const auto& row : pqxx_result) {
445 for (
size_t i = 0; i < row.size(); ++i) {
446 std::string column_name = pqxx_result.column_name(i);
447 if (row[i].is_null()) {
448 db_row[column_name] =
nullptr;
450 if (row[i].
type() == PG_INT8OID ||
451 row[i].
type() == PG_INT4OID) {
452 db_row[column_name] = row[i].as<int64_t>();
453 }
else if (row[i].
type() == PG_FLOAT8OID ||
454 row[i].
type() == PG_FLOAT4OID) {
455 db_row[column_name] = row[i].as<
double>();
456 }
else if (row[i].
type() == PG_BOOLOID) {
457 db_row[column_name] = row[i].as<
bool>();
459 db_row[column_name] = row[i].as<std::string>();
463 result.push_back(std::move(db_row));
465 }
catch (
const std::exception& e) {
466 last_error_ = std::string(
"Select prepared error: ") + e.what();
468 return kcenon::common::error_info{
474#elif defined(HAVE_LIBPQ)
477 return kcenon::common::error_info{
485 std::vector<std::string> param_strings;
486 std::vector<const char*> param_values;
487 param_strings.reserve(params.size());
488 param_values.reserve(params.size());
490 for (
const auto& val : params) {
491 std::visit([&param_strings, &param_values](
const auto& v) {
492 using T = std::decay_t<
decltype(v)>;
493 if constexpr (std::is_same_v<T, std::nullptr_t>) {
494 param_strings.emplace_back();
495 param_values.push_back(
nullptr);
496 }
else if constexpr (std::is_same_v<T, bool>) {
497 param_strings.push_back(v ?
"t" :
"f");
498 param_values.push_back(param_strings.back().c_str());
499 }
else if constexpr (std::is_same_v<T, std::string>) {
500 param_strings.push_back(v);
501 param_values.push_back(param_strings.back().c_str());
503 param_strings.push_back(std::to_string(v));
504 param_values.push_back(param_strings.back().c_str());
509 PGresult* pg_result = PQexecParams(
512 static_cast<int>(params.size()),
520 if (PQresultStatus(pg_result) != PGRES_TUPLES_OK) {
523 return kcenon::common::error_info{
530 int rows = PQntuples(pg_result);
531 int cols = PQnfields(pg_result);
533 for (
int row = 0; row < rows; ++row) {
535 for (
int col = 0; col < cols; ++col) {
536 std::string column_name = PQfname(pg_result, col);
537 if (PQgetisnull(pg_result, row, col)) {
538 db_row[column_name] =
nullptr;
540 const char* value = PQgetvalue(pg_result, row, col);
541 Oid
type = PQftype(pg_result, col);
544 db_row[column_name] =
static_cast<int64_t
>(std::stoll(value));
545 }
else if (
type == 700 ||
type == 701) {
546 db_row[column_name] = std::stod(value);
547 }
else if (
type == 16) {
548 db_row[column_name] = (*value ==
't' || *value ==
'1');
550 db_row[column_name] = std::string(value);
554 result.push_back(std::move(db_row));
557 }
catch (
const std::exception& e) {
558 last_error_ = std::string(
"Select prepared error: ") + e.what();
560 return kcenon::common::error_info{
568 return database_backend::select_prepared(query, params);
576 const std::string& query,
577 const std::vector<core::database_value>& params)
581 return kcenon::common::error_info{
593 return kcenon::common::error_info{
600 pqxx::connection* conn =
static_cast<pqxx::connection*
>(
connection_);
601 pqxx::work txn(*conn);
603 pqxx::params pq_params;
604 for (
const auto& val : params) {
605 std::visit([&pq_params](
const auto& v) {
606 using T = std::decay_t<
decltype(v)>;
607 if constexpr (std::is_same_v<T, std::nullptr_t>) {
609 }
else if constexpr (std::is_same_v<T, bool>) {
611 }
else if constexpr (std::is_same_v<T, int64_t>) {
613 }
else if constexpr (std::is_same_v<T, double>) {
615 }
else if constexpr (std::is_same_v<T, std::string>) {
621 txn.exec_params(query, pq_params);
624 return kcenon::common::ok();
625 }
catch (
const std::exception& e) {
626 last_error_ = std::string(
"Execute prepared error: ") + e.what();
628 return kcenon::common::error_info{
634#elif defined(HAVE_LIBPQ)
638 return kcenon::common::error_info{
645 std::vector<std::string> param_strings;
646 std::vector<const char*> param_values;
647 param_strings.reserve(params.size());
648 param_values.reserve(params.size());
650 for (
const auto& val : params) {
651 std::visit([&param_strings, &param_values](
const auto& v) {
652 using T = std::decay_t<
decltype(v)>;
653 if constexpr (std::is_same_v<T, std::nullptr_t>) {
654 param_strings.emplace_back();
655 param_values.push_back(
nullptr);
656 }
else if constexpr (std::is_same_v<T, bool>) {
657 param_strings.push_back(v ?
"t" :
"f");
658 param_values.push_back(param_strings.back().c_str());
659 }
else if constexpr (std::is_same_v<T, std::string>) {
660 param_strings.push_back(v);
661 param_values.push_back(param_strings.back().c_str());
663 param_strings.push_back(std::to_string(v));
664 param_values.push_back(param_strings.back().c_str());
669 PGresult* pg_result = PQexecParams(
672 static_cast<int>(params.size()),
680 if (pg_result ==
nullptr) {
681 last_error_ =
"PostgreSQL execute prepared failed";
683 return kcenon::common::error_info{
690 ExecStatusType status = PQresultStatus(pg_result);
691 bool success = (status == PGRES_COMMAND_OK) || (status == PGRES_TUPLES_OK);
697 return kcenon::common::error_info{
706 return kcenon::common::ok();
708 return database_backend::execute_prepared(query, params);
800 const std::vector<std::string>& queries)
804 return kcenon::common::error_info{
811 if (queries.empty()) {
812 return static_cast<uint64_t
>(0);
817 if (begin_result.is_err()) {
818 return kcenon::common::error_info{
819 begin_result.error().code,
820 "Batch begin failed: " + begin_result.error().message,
825 uint64_t total_affected = 0;
826 for (
size_t i = 0; i < queries.size(); ++i) {
829 std::string batch_error =
"Batch query " + std::to_string(i) +
833 return kcenon::common::error_info{
839 total_affected += affected;
843 if (commit_result.is_err()) {
844 return kcenon::common::error_info{
845 commit_result.error().code,
846 "Batch commit failed: " + commit_result.error().message,
852 return total_affected;
unsigned int execute_modification_query(const std::string &query_string)
Execute a modification query (INSERT, UPDATE, DELETE)
kcenon::common::VoidResult begin_transaction() override
Begin a transaction.
std::string sanitize_error(const std::string &error_message) const
Remove password from an error message that may contain the connection string.
kcenon::common::Result< uint64_t > execute_batch(const std::vector< std::string > &queries)
Execute multiple queries in a single transaction (batch mode).
kcenon::common::VoidResult execute_query(const std::string &query_string) override
Execute a general SQL query (DDL, DML)
std::map< std::string, std::string > connection_info() const override
Get backend-specific connection information.
std::string build_connection_string(const core::connection_config &config) const
Convert connection_config to PostgreSQL connection string.
kcenon::common::VoidResult commit_transaction() override
Commit the current transaction.
kcenon::common::VoidResult do_initialize(const core::connection_config &config)
Database-specific initialization logic.
kcenon::common::VoidResult rollback_transaction() override
Roll back the current transaction.
kcenon::common::VoidResult execute_prepared(const std::string &query, const std::vector< core::database_value > &params) override
Execute a parameterized DML/DDL query (prepared statement)
kcenon::common::VoidResult do_shutdown()
Database-specific shutdown logic.
std::string last_error() const override
Get last error message from backend.
std::string build_safe_connection_string(const core::connection_config &config) const
Build a connection string with password masked for safe logging.
bool in_transaction() const override
Check if backend is currently in a transaction.
std::atomic< bool > in_transaction_
Transaction state.
core::connection_config connection_config_
Cached connection config.
kcenon::common::Result< core::database_result > select_prepared(const std::string &query, const std::vector< core::database_value > &params) override
Execute a parameterized SELECT query (prepared statement)
void * connection_
PostgreSQL connection (PGconn* or pqxx::connection*)
kcenon::common::Result< core::database_result > select_query(const std::string &query_string) override
Execute a SELECT query.
PostgreSQL database backend plugin implementation.