Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
postgresql_backend.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6#include "../core/result.h"
7
8#ifdef USE_POSTGRESQL
9#include <pqxx/pqxx>
10#elif defined(HAVE_LIBPQ)
11#include "libpq-fe.h"
12#endif
13
14#include <sstream>
15#include <iomanip>
16#include <variant>
17#include <iostream>
18
20
21namespace
22{
23const database::utils::backend_logger logger_("PostgreSQL");
24
25// PostgreSQL type OIDs (from pg_type.h)
26constexpr unsigned int PG_INT4OID = 23;
27constexpr unsigned int PG_INT8OID = 20;
28constexpr unsigned int PG_FLOAT4OID = 700;
29constexpr unsigned int PG_FLOAT8OID = 701;
30constexpr unsigned int PG_BOOLOID = 16;
31}
32
33namespace database
34{
35namespace backends
36{
37
39 : connection_(nullptr)
40{
41}
42
43kcenon::common::VoidResult postgresql_backend::do_initialize(const core::connection_config& config)
44{
45 connection_config_ = config;
46 std::string conn_str = build_connection_string(config);
47
48#ifdef USE_POSTGRESQL
49 try {
50 auto conn = std::make_unique<pqxx::connection>(conn_str);
51 if (conn->is_open()) {
52 connection_ = conn.release();
53 last_error_.clear();
54 return kcenon::common::ok();
55 }
56 } catch (const std::exception& e) {
57 last_error_ = std::string("Connection error: ") + sanitize_error(e.what());
58 logger_.error("do_initialize", last_error_);
59 }
60#elif defined(HAVE_LIBPQ)
61 try {
62 connection_ = PQconnectdb(conn_str.c_str());
63 if (PQstatus(static_cast<PGconn*>(connection_)) == CONNECTION_OK) {
64 last_error_.clear();
65 return kcenon::common::ok();
66 }
67 last_error_ = sanitize_error(PQerrorMessage(static_cast<PGconn*>(connection_)));
68 PQfinish(static_cast<PGconn*>(connection_));
69 connection_ = nullptr;
70 } catch (const std::exception& e) {
71 last_error_ = std::string("Connection error: ") + sanitize_error(e.what());
72 logger_.error("do_initialize", last_error_);
73 }
74#else
75 logger_.warning("PostgreSQL support not compiled. Connection: " + build_safe_connection_string(config));
76 // Mock mode for testing without PostgreSQL
77 last_error_.clear();
78 return kcenon::common::ok();
79#endif
80
81 if (last_error_.empty()) {
82 last_error_ = "Failed to connect to PostgreSQL server";
83 }
84 return kcenon::common::error_info{
87 "postgresql_backend"
88 };
89}
90
91kcenon::common::VoidResult postgresql_backend::do_shutdown()
92{
93 // Rollback any active transaction before disconnecting
94 if (in_transaction_) {
96 }
97
98#ifdef USE_POSTGRESQL
99 try {
100 delete static_cast<pqxx::connection*>(connection_);
101 connection_ = nullptr;
102 last_error_.clear();
103 return kcenon::common::ok();
104 } catch (const std::exception& e) {
105 last_error_ = std::string("Disconnect error: ") + e.what();
106 logger_.error("do_shutdown", last_error_);
107 }
108#elif defined(HAVE_LIBPQ)
109 try {
110 PQfinish(static_cast<PGconn*>(connection_));
111 connection_ = nullptr;
112 last_error_.clear();
113 return kcenon::common::ok();
114 } catch (const std::exception& e) {
115 last_error_ = std::string("Disconnect error: ") + e.what();
116 logger_.error("do_shutdown", last_error_);
117 }
118#else
119 connection_ = nullptr;
120 last_error_.clear();
121 return kcenon::common::ok();
122#endif
123
124 return kcenon::common::error_info{
127 "postgresql_backend"
128 };
129}
130
131
132unsigned int postgresql_backend::execute_modification_query(const std::string& query_string)
133{
134#ifdef USE_POSTGRESQL
135 if (!connection_) return 0;
136 try {
137 pqxx::connection* conn = static_cast<pqxx::connection*>(connection_);
138 pqxx::work txn(*conn);
139 pqxx::result result = txn.exec(query_string);
140 txn.commit();
141 return static_cast<unsigned int>(result.affected_rows());
142 } catch (const std::exception& e) {
143 last_error_ = std::string("Modification query error: ") + e.what();
144 logger_.error("execute_modification_query", last_error_);
145 }
146#elif defined(HAVE_LIBPQ)
147 if (!connection_) return 0;
148 try {
149 PGresult* result = PQexec(static_cast<PGconn*>(connection_), query_string.c_str());
150 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
151 last_error_ = PQerrorMessage(static_cast<PGconn*>(connection_));
152 PQclear(result);
153 return 0;
154 }
155 const char* affected_rows = PQcmdTuples(result);
156 unsigned int count = 0;
157 if (affected_rows && *affected_rows) {
158 count = static_cast<unsigned int>(std::stoul(affected_rows));
159 }
160 PQclear(result);
161 return count;
162 } catch (const std::exception& e) {
163 last_error_ = std::string("Modification query error: ") + e.what();
164 logger_.error("execute_modification_query", last_error_);
165 }
166#else
167 logger_.warning("PostgreSQL support not compiled. Modification query: " + query_string.substr(0, 20) + "...");
168 return 1; // Mock: return 1 affected row
169#endif
170 return 0;
171}
172
173
174
175kcenon::common::Result<core::database_result> postgresql_backend::select_query(const std::string& query_string)
176{
177 if (!is_initialized()) {
178 last_error_ = "Backend not initialized";
179 return kcenon::common::error_info{
180 static_cast<int>(database::error_code::invalid_state),
182 "postgresql_backend"
183 };
184 }
185
187
188#ifdef USE_POSTGRESQL
189 if (!connection_) {
190 last_error_ = "No active connection";
191 return kcenon::common::error_info{
194 "postgresql_backend"
195 };
196 }
197 try {
198 pqxx::connection* conn = static_cast<pqxx::connection*>(connection_);
199 pqxx::work txn(*conn);
200 pqxx::result pqxx_result = txn.exec(query_string);
201 txn.commit();
202
203 for (const auto& row : pqxx_result) {
204 core::database_row db_row;
205 for (size_t i = 0; i < row.size(); ++i) {
206 std::string column_name = pqxx_result.column_name(i);
207 if (row[i].is_null()) {
208 db_row[column_name] = nullptr;
209 } else {
210 // Try to convert to appropriate type
211 if (row[i].type() == PG_INT8OID ||
212 row[i].type() == PG_INT4OID) {
213 db_row[column_name] = row[i].as<int64_t>();
214 } else if (row[i].type() == PG_FLOAT8OID ||
215 row[i].type() == PG_FLOAT4OID) {
216 db_row[column_name] = row[i].as<double>();
217 } else if (row[i].type() == PG_BOOLOID) {
218 db_row[column_name] = row[i].as<bool>();
219 } else {
220 db_row[column_name] = row[i].as<std::string>();
221 }
222 }
223 }
224 result.push_back(std::move(db_row));
225 }
226 } catch (const std::exception& e) {
227 last_error_ = std::string("Select query error: ") + e.what();
228 logger_.error("select_query", last_error_);
229 return kcenon::common::error_info{
230 static_cast<int>(database::error_code::query_failed),
232 "postgresql_backend"
233 };
234 }
235#elif defined(HAVE_LIBPQ)
236 if (!connection_) {
237 last_error_ = "No active connection";
238 return kcenon::common::error_info{
241 "postgresql_backend"
242 };
243 }
244 try {
245 PGresult* pg_result = PQexec(static_cast<PGconn*>(connection_), query_string.c_str());
246 if (PQresultStatus(pg_result) != PGRES_TUPLES_OK) {
247 last_error_ = PQerrorMessage(static_cast<PGconn*>(connection_));
248 PQclear(pg_result);
249 return kcenon::common::error_info{
250 static_cast<int>(database::error_code::query_failed),
252 "postgresql_backend"
253 };
254 }
255
256 int rows = PQntuples(pg_result);
257 int cols = PQnfields(pg_result);
258
259 for (int row = 0; row < rows; ++row) {
260 core::database_row db_row;
261 for (int col = 0; col < cols; ++col) {
262 std::string column_name = PQfname(pg_result, col);
263 if (PQgetisnull(pg_result, row, col)) {
264 db_row[column_name] = nullptr;
265 } else {
266 const char* value = PQgetvalue(pg_result, row, col);
267 Oid type = PQftype(pg_result, col);
268
269 // Convert based on PostgreSQL type
270 if (type == 20 || type == 21 || type == 23) { // int8, int2, int4
271 db_row[column_name] = static_cast<int64_t>(std::stoll(value));
272 } else if (type == 700 || type == 701) { // float4, float8
273 db_row[column_name] = std::stod(value);
274 } else if (type == 16) { // bool
275 db_row[column_name] = (*value == 't' || *value == '1');
276 } else {
277 db_row[column_name] = std::string(value);
278 }
279 }
280 }
281 result.push_back(std::move(db_row));
282 }
283 PQclear(pg_result);
284 } catch (const std::exception& e) {
285 last_error_ = std::string("Select query error: ") + e.what();
286 logger_.error("select_query", last_error_);
287 return kcenon::common::error_info{
288 static_cast<int>(database::error_code::query_failed),
290 "postgresql_backend"
291 };
292 }
293#else
294 logger_.warning("PostgreSQL support not compiled. Select query: " + query_string.substr(0, 20) + "...");
295 // Return mock data for testing
296 if (query_string.find("SELECT") != std::string::npos) {
297 core::database_row mock_row;
298 mock_row["id"] = int64_t(1);
299 mock_row["name"] = std::string("mock_data");
300 mock_row["active"] = true;
301 result.push_back(mock_row);
302 }
303#endif
304
305 last_error_.clear();
306 return result;
307}
308
309kcenon::common::VoidResult postgresql_backend::execute_query(const std::string& query_string)
310{
311 if (!is_initialized()) {
312 last_error_ = "Backend not initialized";
313 return kcenon::common::error_info{
314 static_cast<int>(database::error_code::invalid_state),
316 "postgresql_backend"
317 };
318 }
319
320#ifdef USE_POSTGRESQL
321 try {
322 if (!connection_) {
323 last_error_ = "No active PostgreSQL connection";
324 logger_.error("execute_query", last_error_);
325 return kcenon::common::error_info{
328 "postgresql_backend"
329 };
330 }
331
332 pqxx::work txn{*static_cast<pqxx::connection*>(connection_)};
333 txn.exec(query_string);
334 txn.commit();
335 last_error_.clear();
336 return kcenon::common::ok();
337 } catch (const std::exception& e) {
338 last_error_ = std::string("Execute error: ") + e.what();
339 logger_.error("execute_query", last_error_);
340 return kcenon::common::error_info{
341 static_cast<int>(database::error_code::query_failed),
343 "postgresql_backend"
344 };
345 }
346#elif defined(HAVE_LIBPQ)
347 if (!connection_) {
348 last_error_ = "No active PostgreSQL connection";
349 logger_.error("execute_query", last_error_);
350 return kcenon::common::error_info{
353 "postgresql_backend"
354 };
355 }
356
357 PGresult* result = PQexec(static_cast<PGconn*>(connection_), query_string.c_str());
358 if (result == nullptr) {
359 last_error_ = "PostgreSQL execute failed";
360 logger_.error("execute_query", last_error_);
361 return kcenon::common::error_info{
362 static_cast<int>(database::error_code::query_failed),
364 "postgresql_backend"
365 };
366 }
367
368 ExecStatusType status = PQresultStatus(result);
369 bool success = (status == PGRES_COMMAND_OK) || (status == PGRES_TUPLES_OK);
370
371 if (!success) {
372 last_error_ = PQerrorMessage(static_cast<PGconn*>(connection_));
373 logger_.error("execute_query", last_error_);
374 PQclear(result);
375 return kcenon::common::error_info{
376 static_cast<int>(database::error_code::query_failed),
378 "postgresql_backend"
379 };
380 }
381
382 PQclear(result);
383 last_error_.clear();
384 return kcenon::common::ok();
385#else
386 // Mock execution
387 logger_.info("PostgreSQL support not compiled. Mock execute: " + query_string);
388 last_error_.clear();
389 return kcenon::common::ok();
390#endif
391}
392
393kcenon::common::Result<core::database_result> postgresql_backend::select_prepared(
394 const std::string& query,
395 const std::vector<core::database_value>& params)
396{
397 if (!is_initialized()) {
398 last_error_ = "Backend not initialized";
399 return kcenon::common::error_info{
400 static_cast<int>(database::error_code::invalid_state),
402 "postgresql_backend"
403 };
404 }
405
407
408#ifdef USE_POSTGRESQL
409 if (!connection_) {
410 last_error_ = "No active connection";
411 return kcenon::common::error_info{
414 "postgresql_backend"
415 };
416 }
417 try {
418 pqxx::connection* conn = static_cast<pqxx::connection*>(connection_);
419 pqxx::work txn(*conn);
420
421 // Build pqxx::params from database_value vector
422 pqxx::params pq_params;
423 for (const auto& val : params) {
424 std::visit([&pq_params](const auto& v) {
425 using T = std::decay_t<decltype(v)>;
426 if constexpr (std::is_same_v<T, std::nullptr_t>) {
427 pq_params.append();
428 } else if constexpr (std::is_same_v<T, bool>) {
429 pq_params.append(v);
430 } else if constexpr (std::is_same_v<T, int64_t>) {
431 pq_params.append(v);
432 } else if constexpr (std::is_same_v<T, double>) {
433 pq_params.append(v);
434 } else if constexpr (std::is_same_v<T, std::string>) {
435 pq_params.append(v);
436 }
437 }, val);
438 }
439
440 pqxx::result pqxx_result = txn.exec_params(query, pq_params);
441 txn.commit();
442
443 for (const auto& row : pqxx_result) {
444 core::database_row db_row;
445 for (size_t i = 0; i < row.size(); ++i) {
446 std::string column_name = pqxx_result.column_name(i);
447 if (row[i].is_null()) {
448 db_row[column_name] = nullptr;
449 } else {
450 if (row[i].type() == PG_INT8OID ||
451 row[i].type() == PG_INT4OID) {
452 db_row[column_name] = row[i].as<int64_t>();
453 } else if (row[i].type() == PG_FLOAT8OID ||
454 row[i].type() == PG_FLOAT4OID) {
455 db_row[column_name] = row[i].as<double>();
456 } else if (row[i].type() == PG_BOOLOID) {
457 db_row[column_name] = row[i].as<bool>();
458 } else {
459 db_row[column_name] = row[i].as<std::string>();
460 }
461 }
462 }
463 result.push_back(std::move(db_row));
464 }
465 } catch (const std::exception& e) {
466 last_error_ = std::string("Select prepared error: ") + e.what();
467 logger_.error("select_prepared", last_error_);
468 return kcenon::common::error_info{
469 static_cast<int>(database::error_code::query_failed),
471 "postgresql_backend"
472 };
473 }
474#elif defined(HAVE_LIBPQ)
475 if (!connection_) {
476 last_error_ = "No active connection";
477 return kcenon::common::error_info{
480 "postgresql_backend"
481 };
482 }
483 try {
484 // Convert params to C string array for PQexecParams
485 std::vector<std::string> param_strings;
486 std::vector<const char*> param_values;
487 param_strings.reserve(params.size());
488 param_values.reserve(params.size());
489
490 for (const auto& val : params) {
491 std::visit([&param_strings, &param_values](const auto& v) {
492 using T = std::decay_t<decltype(v)>;
493 if constexpr (std::is_same_v<T, std::nullptr_t>) {
494 param_strings.emplace_back();
495 param_values.push_back(nullptr);
496 } else if constexpr (std::is_same_v<T, bool>) {
497 param_strings.push_back(v ? "t" : "f");
498 param_values.push_back(param_strings.back().c_str());
499 } else if constexpr (std::is_same_v<T, std::string>) {
500 param_strings.push_back(v);
501 param_values.push_back(param_strings.back().c_str());
502 } else {
503 param_strings.push_back(std::to_string(v));
504 param_values.push_back(param_strings.back().c_str());
505 }
506 }, val);
507 }
508
509 PGresult* pg_result = PQexecParams(
510 static_cast<PGconn*>(connection_),
511 query.c_str(),
512 static_cast<int>(params.size()),
513 nullptr, // let server infer types
514 param_values.data(),
515 nullptr, // text format lengths
516 nullptr, // text format
517 0 // text result format
518 );
519
520 if (PQresultStatus(pg_result) != PGRES_TUPLES_OK) {
521 last_error_ = PQerrorMessage(static_cast<PGconn*>(connection_));
522 PQclear(pg_result);
523 return kcenon::common::error_info{
524 static_cast<int>(database::error_code::query_failed),
526 "postgresql_backend"
527 };
528 }
529
530 int rows = PQntuples(pg_result);
531 int cols = PQnfields(pg_result);
532
533 for (int row = 0; row < rows; ++row) {
534 core::database_row db_row;
535 for (int col = 0; col < cols; ++col) {
536 std::string column_name = PQfname(pg_result, col);
537 if (PQgetisnull(pg_result, row, col)) {
538 db_row[column_name] = nullptr;
539 } else {
540 const char* value = PQgetvalue(pg_result, row, col);
541 Oid type = PQftype(pg_result, col);
542
543 if (type == 20 || type == 21 || type == 23) {
544 db_row[column_name] = static_cast<int64_t>(std::stoll(value));
545 } else if (type == 700 || type == 701) {
546 db_row[column_name] = std::stod(value);
547 } else if (type == 16) {
548 db_row[column_name] = (*value == 't' || *value == '1');
549 } else {
550 db_row[column_name] = std::string(value);
551 }
552 }
553 }
554 result.push_back(std::move(db_row));
555 }
556 PQclear(pg_result);
557 } catch (const std::exception& e) {
558 last_error_ = std::string("Select prepared error: ") + e.what();
559 logger_.error("select_prepared", last_error_);
560 return kcenon::common::error_info{
561 static_cast<int>(database::error_code::query_failed),
563 "postgresql_backend"
564 };
565 }
566#else
567 // Fallback to string interpolation for mock mode
568 return database_backend::select_prepared(query, params);
569#endif
570
571 last_error_.clear();
572 return result;
573}
574
575kcenon::common::VoidResult postgresql_backend::execute_prepared(
576 const std::string& query,
577 const std::vector<core::database_value>& params)
578{
579 if (!is_initialized()) {
580 last_error_ = "Backend not initialized";
581 return kcenon::common::error_info{
582 static_cast<int>(database::error_code::invalid_state),
584 "postgresql_backend"
585 };
586 }
587
588#ifdef USE_POSTGRESQL
589 try {
590 if (!connection_) {
591 last_error_ = "No active PostgreSQL connection";
592 logger_.error("execute_prepared", last_error_);
593 return kcenon::common::error_info{
596 "postgresql_backend"
597 };
598 }
599
600 pqxx::connection* conn = static_cast<pqxx::connection*>(connection_);
601 pqxx::work txn(*conn);
602
603 pqxx::params pq_params;
604 for (const auto& val : params) {
605 std::visit([&pq_params](const auto& v) {
606 using T = std::decay_t<decltype(v)>;
607 if constexpr (std::is_same_v<T, std::nullptr_t>) {
608 pq_params.append();
609 } else if constexpr (std::is_same_v<T, bool>) {
610 pq_params.append(v);
611 } else if constexpr (std::is_same_v<T, int64_t>) {
612 pq_params.append(v);
613 } else if constexpr (std::is_same_v<T, double>) {
614 pq_params.append(v);
615 } else if constexpr (std::is_same_v<T, std::string>) {
616 pq_params.append(v);
617 }
618 }, val);
619 }
620
621 txn.exec_params(query, pq_params);
622 txn.commit();
623 last_error_.clear();
624 return kcenon::common::ok();
625 } catch (const std::exception& e) {
626 last_error_ = std::string("Execute prepared error: ") + e.what();
627 logger_.error("execute_prepared", last_error_);
628 return kcenon::common::error_info{
629 static_cast<int>(database::error_code::query_failed),
631 "postgresql_backend"
632 };
633 }
634#elif defined(HAVE_LIBPQ)
635 if (!connection_) {
636 last_error_ = "No active PostgreSQL connection";
637 logger_.error("execute_prepared", last_error_);
638 return kcenon::common::error_info{
641 "postgresql_backend"
642 };
643 }
644
645 std::vector<std::string> param_strings;
646 std::vector<const char*> param_values;
647 param_strings.reserve(params.size());
648 param_values.reserve(params.size());
649
650 for (const auto& val : params) {
651 std::visit([&param_strings, &param_values](const auto& v) {
652 using T = std::decay_t<decltype(v)>;
653 if constexpr (std::is_same_v<T, std::nullptr_t>) {
654 param_strings.emplace_back();
655 param_values.push_back(nullptr);
656 } else if constexpr (std::is_same_v<T, bool>) {
657 param_strings.push_back(v ? "t" : "f");
658 param_values.push_back(param_strings.back().c_str());
659 } else if constexpr (std::is_same_v<T, std::string>) {
660 param_strings.push_back(v);
661 param_values.push_back(param_strings.back().c_str());
662 } else {
663 param_strings.push_back(std::to_string(v));
664 param_values.push_back(param_strings.back().c_str());
665 }
666 }, val);
667 }
668
669 PGresult* pg_result = PQexecParams(
670 static_cast<PGconn*>(connection_),
671 query.c_str(),
672 static_cast<int>(params.size()),
673 nullptr,
674 param_values.data(),
675 nullptr,
676 nullptr,
677 0
678 );
679
680 if (pg_result == nullptr) {
681 last_error_ = "PostgreSQL execute prepared failed";
682 logger_.error("execute_prepared", last_error_);
683 return kcenon::common::error_info{
684 static_cast<int>(database::error_code::query_failed),
686 "postgresql_backend"
687 };
688 }
689
690 ExecStatusType status = PQresultStatus(pg_result);
691 bool success = (status == PGRES_COMMAND_OK) || (status == PGRES_TUPLES_OK);
692
693 if (!success) {
694 last_error_ = PQerrorMessage(static_cast<PGconn*>(connection_));
695 logger_.error("execute_prepared", last_error_);
696 PQclear(pg_result);
697 return kcenon::common::error_info{
698 static_cast<int>(database::error_code::query_failed),
700 "postgresql_backend"
701 };
702 }
703
704 PQclear(pg_result);
705 last_error_.clear();
706 return kcenon::common::ok();
707#else
708 return database_backend::execute_prepared(query, params);
709#endif
710}
711
712kcenon::common::VoidResult postgresql_backend::begin_transaction()
713{
714 if (!is_initialized()) {
715 last_error_ = "Backend not initialized";
716 return kcenon::common::error_info{
717 static_cast<int>(database::error_code::invalid_state),
719 "postgresql_backend"
720 };
721 }
722
723 if (in_transaction_) {
724 last_error_ = "Transaction already active";
725 return kcenon::common::error_info{
726 static_cast<int>(database::error_code::invalid_state),
728 "postgresql_backend"
729 };
730 }
731
732 auto result = execute_query("BEGIN");
733 if (result.is_err()) {
734 return result;
735 }
736
737 in_transaction_ = true;
738 last_error_.clear();
739 return kcenon::common::ok();
740}
741
742kcenon::common::VoidResult postgresql_backend::commit_transaction()
743{
744 if (!is_initialized()) {
745 last_error_ = "Backend not initialized";
746 return kcenon::common::error_info{
747 static_cast<int>(database::error_code::invalid_state),
749 "postgresql_backend"
750 };
751 }
752
753 if (!in_transaction_) {
754 last_error_ = "No active transaction";
755 return kcenon::common::error_info{
756 static_cast<int>(database::error_code::invalid_state),
758 "postgresql_backend"
759 };
760 }
761
762 auto result = execute_query("COMMIT");
763 if (result.is_err()) {
764 return result;
765 }
766
767 in_transaction_ = false;
768 last_error_.clear();
769 return kcenon::common::ok();
770}
771
773{
774 if (!is_initialized()) {
775 last_error_ = "Backend not initialized";
776 return kcenon::common::error_info{
777 static_cast<int>(database::error_code::invalid_state),
779 "postgresql_backend"
780 };
781 }
782
783 if (!in_transaction_) {
784 // Not an error - already rolled back or never started
785 return kcenon::common::ok();
786 }
787
788 auto result = execute_query("ROLLBACK");
789 in_transaction_ = false; // Force state reset even on error
790
791 if (result.is_err()) {
792 return result;
793 }
794
795 last_error_.clear();
796 return kcenon::common::ok();
797}
798
799kcenon::common::Result<uint64_t> postgresql_backend::execute_batch(
800 const std::vector<std::string>& queries)
801{
802 if (!is_initialized()) {
803 last_error_ = "Backend not initialized";
804 return kcenon::common::error_info{
805 static_cast<int>(database::error_code::invalid_state),
807 "postgresql_backend"
808 };
809 }
810
811 if (queries.empty()) {
812 return static_cast<uint64_t>(0);
813 }
814
815 // Use existing transaction API for atomicity
816 auto begin_result = begin_transaction();
817 if (begin_result.is_err()) {
818 return kcenon::common::error_info{
819 begin_result.error().code,
820 "Batch begin failed: " + begin_result.error().message,
821 "postgresql_backend"
822 };
823 }
824
825 uint64_t total_affected = 0;
826 for (size_t i = 0; i < queries.size(); ++i) {
827 unsigned int affected = execute_modification_query(queries[i]);
828 if (!last_error_.empty()) {
829 std::string batch_error = "Batch query " + std::to_string(i) +
830 " failed: " + last_error_;
832 last_error_ = batch_error;
833 return kcenon::common::error_info{
834 static_cast<int>(database::error_code::query_failed),
836 "postgresql_backend"
837 };
838 }
839 total_affected += affected;
840 }
841
842 auto commit_result = commit_transaction();
843 if (commit_result.is_err()) {
844 return kcenon::common::error_info{
845 commit_result.error().code,
846 "Batch commit failed: " + commit_result.error().message,
847 "postgresql_backend"
848 };
849 }
850
851 last_error_.clear();
852 return total_affected;
853}
854
856{
857 return in_transaction_;
858}
859
861{
862 return last_error_;
863}
864
865std::map<std::string, std::string> postgresql_backend::connection_info() const
866{
867 std::map<std::string, std::string> info;
868 info["backend"] = "postgresql";
869 info["host"] = connection_config_.host;
870 info["port"] = std::to_string(connection_config_.port);
871 info["database"] = connection_config_.database;
872 info["username"] = connection_config_.username;
873 info["initialized"] = initialized_ ? "true" : "false";
874 info["in_transaction"] = in_transaction_ ? "true" : "false";
875 return info;
876}
877
879{
880 std::ostringstream oss;
881
882 if (!config.host.empty()) {
883 oss << "host=" << config.host << " ";
884 }
885
886 if (config.port > 0) {
887 oss << "port=" << config.port << " ";
888 }
889
890 if (!config.database.empty()) {
891 oss << "dbname=" << config.database << " ";
892 }
893
894 if (!config.username.empty()) {
895 oss << "user=" << config.username << " ";
896 }
897
898 if (!config.password.empty()) {
899 oss << "password=" << config.password << " ";
900 }
901
902 // Append any additional options
903 for (const auto& [key, value] : config.options) {
904 oss << key << "=" << value << " ";
905 }
906
907 return oss.str();
908}
909
911{
912 std::ostringstream oss;
913
914 if (!config.host.empty()) {
915 oss << "host=" << config.host << " ";
916 }
917
918 if (config.port > 0) {
919 oss << "port=" << config.port << " ";
920 }
921
922 if (!config.database.empty()) {
923 oss << "dbname=" << config.database << " ";
924 }
925
926 if (!config.username.empty()) {
927 oss << "user=" << config.username << " ";
928 }
929
930 if (!config.password.empty()) {
931 oss << "password=*** ";
932 }
933
934 for (const auto& [key, value] : config.options) {
935 oss << key << "=" << value << " ";
936 }
937
938 return oss.str();
939}
940
941std::string postgresql_backend::sanitize_error(const std::string& error_message) const
942{
943 if (connection_config_.password.empty()) {
944 return error_message;
945 }
946
947 std::string sanitized = error_message;
948 std::string::size_type pos = 0;
949 while ((pos = sanitized.find(connection_config_.password, pos)) != std::string::npos) {
950 sanitized.replace(pos, connection_config_.password.length(), "***");
951 pos += 3;
952 }
953
954 return sanitized;
955}
956
957} // namespace backends
958} // namespace database
959
960// Auto-registration with backend_registry when PostgreSQL support is compiled in
961#ifdef USE_POSTGRESQL
962namespace {
964}
965#endif
Centralized logging utility for database backends.
unsigned int execute_modification_query(const std::string &query_string)
Execute a modification query (INSERT, UPDATE, DELETE)
kcenon::common::VoidResult begin_transaction() override
Begin a transaction.
std::string sanitize_error(const std::string &error_message) const
Remove password from an error message that may contain the connection string.
kcenon::common::Result< uint64_t > execute_batch(const std::vector< std::string > &queries)
Execute multiple queries in a single transaction (batch mode).
kcenon::common::VoidResult execute_query(const std::string &query_string) override
Execute a general SQL query (DDL, DML)
std::map< std::string, std::string > connection_info() const override
Get backend-specific connection information.
std::string build_connection_string(const core::connection_config &config) const
Convert connection_config to PostgreSQL connection string.
kcenon::common::VoidResult commit_transaction() override
Commit the current transaction.
kcenon::common::VoidResult do_initialize(const core::connection_config &config)
Database-specific initialization logic.
kcenon::common::VoidResult rollback_transaction() override
Rollback the current transaction.
kcenon::common::VoidResult execute_prepared(const std::string &query, const std::vector< core::database_value > &params) override
Execute a parameterized DML/DDL query (prepared statement)
kcenon::common::VoidResult do_shutdown()
Database-specific shutdown logic.
std::string last_error() const override
Get last error message from backend.
std::string build_safe_connection_string(const core::connection_config &config) const
Build a connection string with password masked for safe logging.
bool in_transaction() const override
Check if backend is currently in a transaction.
std::atomic< bool > in_transaction_
Transaction state.
core::connection_config connection_config_
Cached connection config.
kcenon::common::Result< core::database_result > select_prepared(const std::string &query, const std::vector< core::database_value > &params) override
Execute a parameterized SELECT query (prepared statement)
void * connection_
PostgreSQL connection (PGconn* or pqxx::connection*)
kcenon::common::Result< core::database_result > select_query(const std::string &query_string) override
Execute a SELECT query.
std::string last_error_
Last error message.
Helper class for automatic backend registration.
Centralized logging utility for database backends.
std::vector< database_row > database_result
std::map< std::string, database_value > database_row
PostgreSQL database backend plugin implementation.
Result<T> type for database_system error handling.
Configuration for database connection.
std::map< std::string, std::string > options