Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
async_queries.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, πŸ€β˜€πŸŒ•πŸŒ₯ 🌊
3// See the LICENSE file in the project root for full license information.
4
21#include <iostream>
22#include <vector>
23#include <future>
24#include <chrono>
25#include <iomanip>
26
27using namespace database::integrated;
28using namespace std::chrono;
29
30void print_header(const std::string& title) {
31 std::cout << "\n" << std::string(60, '=') << "\n";
32 std::cout << title << "\n";
33 std::cout << std::string(60, '=') << "\n\n";
34}
35
40 print_header("Example 1: Single Async Query");
41
42 std::cout << "Submitting async query...\n";
43
44 auto start = steady_clock::now();
45
46 // Submit query asynchronously
47 auto future = db.execute_async("SELECT 1 as test_value");
48
49 std::cout << "Query submitted, continuing with other work...\n";
50
51 // Do other work while query executes
52 std::this_thread::sleep_for(milliseconds(10));
53 std::cout << "Other work completed\n";
54
55 // Get result
56 std::cout << "Waiting for query result...\n";
57 auto result = future.get();
58
59 auto duration = duration_cast<milliseconds>(steady_clock::now() - start);
60
61 if (result.is_ok()) {
62 std::cout << "βœ… Query succeeded\n";
63 std::cout << " Rows: " << result.value().size() << "\n";
64 std::cout << " Duration: " << duration.count() << " ms\n";
65 } else {
66 std::cout << "❌ Query failed: " << result.error().message << "\n";
67 }
68}
69
74 print_header("Example 2: Multiple Concurrent Queries");
75
76 const int num_queries = 5;
77 std::vector<std::future<kcenon::common::Result<query_result>>> futures;
78
79 std::cout << "Submitting " << num_queries << " concurrent queries...\n";
80
81 auto start = steady_clock::now();
82
83 // Submit all queries concurrently
84 for (int i = 0; i < num_queries; ++i) {
85 std::string query = "SELECT " + std::to_string(i) + " as query_id";
86 futures.push_back(db.execute_async(query));
87 }
88
89 std::cout << "All queries submitted\n";
90 std::cout << "Waiting for results...\n";
91
92 // Collect all results
93 int succeeded = 0;
94 int failed = 0;
95
96 for (size_t i = 0; i < futures.size(); ++i) {
97 auto result = futures[i].get();
98
99 if (result.is_ok()) {
100 succeeded++;
101 std::cout << " Query " << (i + 1) << ": βœ… Success\n";
102 } else {
103 failed++;
104 std::cout << " Query " << (i + 1) << ": ❌ Failed - "
105 << result.error().message << "\n";
106 }
107 }
108
109 auto duration = duration_cast<milliseconds>(steady_clock::now() - start);
110
111 std::cout << "\nResults:\n";
112 std::cout << " Succeeded: " << succeeded << "\n";
113 std::cout << " Failed: " << failed << "\n";
114 std::cout << " Total Duration: " << duration.count() << " ms\n";
115 std::cout << " Avg Duration: " << (duration.count() / num_queries) << " ms/query\n";
116}
117
122 print_header("Example 3: Query with Timeout");
123
124 std::cout << "Submitting query with 2 second timeout...\n";
125
126 auto future = db.execute_async("SELECT pg_sleep(1)"); // Sleeps for 1 second
127
128 // Wait for result with timeout
129 auto status = future.wait_for(seconds(2));
130
131 if (status == std::future_status::ready) {
132 std::cout << "βœ… Query completed within timeout\n";
133 auto result = future.get();
134 if (result.is_ok()) {
135 std::cout << " Result: Success\n";
136 } else {
137 std::cout << " Result: " << result.error().message << "\n";
138 }
139 } else if (status == std::future_status::timeout) {
140 std::cout << "⏱️ Query timed out (still executing in background)\n";
141 } else {
142 std::cout << "⏸️ Query deferred\n";
143 }
144}
145
150 print_header("Example 4: Batch Async Operations");
151
152 struct QueryTask {
153 std::string name;
154 std::string query;
155 std::future<kcenon::common::Result<query_result>> future;
156 };
157
158 std::vector<QueryTask> tasks;
159
160 // Prepare batch of queries
161 tasks.push_back({"User count", "SELECT COUNT(*) FROM users", {}});
162 tasks.push_back({"Active sessions", "SELECT COUNT(*) FROM sessions WHERE active = true", {}});
163 tasks.push_back({"Recent orders", "SELECT COUNT(*) FROM orders WHERE created_at > NOW() - INTERVAL '1 hour'", {}});
164 tasks.push_back({"System status", "SELECT 1", {}});
165
166 std::cout << "Submitting batch of " << tasks.size() << " queries...\n";
167
168 auto start = steady_clock::now();
169
170 // Submit all queries
171 for (auto& task : tasks) {
172 task.future = db.execute_async(task.query);
173 }
174
175 std::cout << "All queries submitted, processing results...\n\n";
176
177 // Process results
178 for (auto& task : tasks) {
179 std::cout << " " << std::setw(20) << std::left << task.name << ": ";
180
181 auto result = task.future.get();
182
183 if (result.is_ok()) {
184 std::cout << "βœ… " << result.value().size() << " rows\n";
185 } else {
186 std::cout << "❌ " << result.error().message << "\n";
187 }
188 }
189
190 auto duration = duration_cast<milliseconds>(steady_clock::now() - start);
191 std::cout << "\nTotal Duration: " << duration.count() << " ms\n";
192}
193
198 print_header("Example 5: Error Handling in Async Context");
199
200 std::cout << "Submitting intentionally invalid query...\n";
201
202 // Submit an invalid query
203 auto future = db.execute_async("SELECT * FROM nonexistent_table");
204
205 std::cout << "Query submitted, waiting for result...\n";
206
207 auto result = future.get();
208
209 if (result.is_ok()) {
210 std::cout << "βœ… Query succeeded (unexpected)\n";
211 } else {
212 std::cout << "❌ Query failed (expected):\n";
213 std::cout << " Error Code: " << result.error().code << "\n";
214 std::cout << " Error Message: " << result.error().message << "\n";
215 }
216}
217
218int main(int argc, char* argv[]) {
219 print_header("Unified Database System - Async Queries Example");
220
221 // Create database instance
222 std::cout << "Creating database instance with async support...\n";
223
225 .enable_logging(db_log_level::info, "./logs")
226 .enable_monitoring(true)
227 .enable_async(8) // 8 async worker threads
228 .set_pool_size(2, 10)
229 .build();
230
231 if (db_result.is_err()) {
232 std::cerr << "Failed to create database instance: " << db_result.error().message << "\n";
233 return 1;
234 }
235 auto db = std::move(db_result.value());
236
237 std::cout << "βœ… Database instance created with async support\n";
238
239 // Connect to database (if connection string provided)
240 if (argc > 1) {
241 std::string conn_string = argv[1];
242 std::cout << "Connecting to database...\n";
243
244 auto connect_result = db->connect(conn_string);
245
246 if (connect_result.is_ok()) {
247 std::cout << "βœ… Connected successfully\n";
248
249 // Run examples
255
256 // Show final metrics
257 print_header("Final Metrics");
258 auto metrics = db->get_metrics();
259 std::cout << "Total Queries: " << metrics.total_queries << "\n";
260 std::cout << "Successful: " << metrics.successful_queries << "\n";
261 std::cout << "Failed: " << metrics.failed_queries << "\n";
262 std::cout << "Avg Latency: " << std::fixed << std::setprecision(3)
263 << (metrics.average_latency.count() / 1000.0) << " ms\n";
264
265 // Disconnect
266 db->disconnect();
267 std::cout << "\nβœ… Disconnected\n";
268
269 } else {
270 std::cout << "❌ Connection failed: " << connect_result.error().message << "\n";
271 std::cout << "\nNote: This example requires a real database connection.\n";
272 std::cout << "Usage: " << argv[0] << " \"host=localhost dbname=test user=test password=test\"\n";
273 return 1;
274 }
275
276 } else {
277 std::cout << "\n❌ No connection string provided.\n";
278 std::cout << "Usage: " << argv[0] << " [connection_string]\n";
279 std::cout << "Example: " << argv[0]
280 << " \"host=localhost dbname=test user=test password=test\"\n";
281 return 1;
282 }
283
284 print_header("Example Complete");
285 std::cout << "βœ… All async query examples completed!\n\n";
286
287 return 0;
288}
void example_query_with_timeout(unified_database_system &db)
void example_concurrent_queries(unified_database_system &db)
void print_header(const std::string &title)
void example_error_handling(unified_database_system &db)
void example_single_async_query(unified_database_system &db)
void example_batch_operations(unified_database_system &db)
builder & enable_monitoring(bool enable=true)
Enable monitoring and metrics collection.
builder & set_pool_size(size_t min_size, size_t max_size)
Set connection pool size.
builder & enable_async(size_t worker_threads=4)
Enable async operations.
kcenon::common::Result< std::unique_ptr< unified_database_system > > build()
Build and return the configured database system.
builder & enable_logging(db_log_level level, const std::string &log_dir="./logs")
Enable logging.
std::future< kcenon::common::Result< query_result > > execute_async(const std::string &query, const std::vector< query_param > &params={})
static builder create_builder()
Create a builder for custom configuration.
Zero-configuration database system with integrated adapters (Phase 6)