Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
async_operations_demo.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
22#ifndef HAS_COROUTINES
23#include <iostream>
// Fallback entry point, compiled only when HAS_COROUTINES is not defined.
// Tells the user what is missing and exits with a non-zero status.
int main() {
    std::cout << "This demo requires C++20 coroutines support.\n"
                 "Please build with -DCMAKE_CXX_STANDARD=20 and ensure your compiler supports coroutines.\n";
    return 1;
}
29#else
30
#include <chrono>
#include <coroutine>
#include <cstddef>
#include <exception>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
40
41using namespace database;
42using namespace database::async;
43
44void demonstrate_basic_async_operations() {
45 std::cout << "=== Basic Asynchronous Database Operations ===\n";
46
47 async_executor& executor = async_executor::instance();
48
49 // Configure async executor
50 async_config config;
51 config.thread_pool_size = 8;
52 config.max_concurrent_operations = 100;
53 config.operation_timeout = std::chrono::seconds(30);
54 config.enable_coroutines = true;
55
56 executor.configure(config);
57 std::cout << "Async executor configured with " << config.thread_pool_size << " threads\n";
58
59 // Demonstrate async query execution
60 std::cout << "\nExecuting asynchronous queries...\n";
61
62 // Submit multiple async queries
63 std::vector<std::future<query_result>> futures;
64
65 for (int i = 0; i < 5; ++i) {
66 std::string query = "SELECT * FROM users WHERE department_id = " + std::to_string(i + 1);
67
68 auto future = executor.execute_async([query, i]() -> query_result {
69 // Simulate database query execution
70 std::this_thread::sleep_for(std::chrono::milliseconds(100 + (i * 50)));
71
72 query_result result;
73 result.success = true;
74 result.rows_affected = (i + 1) * 10;
75 result.execution_time = std::chrono::milliseconds(100 + (i * 50));
76 result.query = query;
77
78 return result;
79 });
80
81 futures.push_back(std::move(future));
82 std::cout << " šŸš€ Query " << (i + 1) << " submitted asynchronously\n";
83 }
84
85 // Collect results as they complete
86 std::cout << "\nCollecting async query results:\n";
87 for (size_t i = 0; i < futures.size(); ++i) {
88 auto& future = futures[i];
89 auto result = future.get();
90
91 std::cout << " āœ… Query " << (i + 1) << " completed: "
92 << result.rows_affected << " rows, "
93 << result.execution_time.count() << "ms\n";
94 }
95}
96
97// C++20 Coroutine demonstration
98struct task {
99 struct promise_type {
100 task get_return_object() { return task{std::coroutine_handle<promise_type>::from_promise(*this)}; }
101 std::suspend_never initial_suspend() { return {}; }
102 std::suspend_never final_suspend() noexcept { return {}; }
103 void return_void() {}
104 void unhandled_exception() {}
105 };
106
107 std::coroutine_handle<promise_type> h;
108 task(std::coroutine_handle<promise_type> handle) : h(handle) {}
109 ~task() { if (h) h.destroy(); }
110
111 task(const task&) = delete;
112 task& operator=(const task&) = delete;
113 task(task&& other) noexcept : h(std::exchange(other.h, {})) {}
114 task& operator=(task&& other) noexcept {
115 if (this != &other) {
116 if (h) h.destroy();
117 h = std::exchange(other.h, {});
118 }
119 return *this;
120 }
121};
122
123task async_database_operation(const std::string& operation_name) {
124 std::cout << " šŸ”„ Starting " << operation_name << "\n";
125
126 // Simulate async database work
127 std::this_thread::sleep_for(std::chrono::milliseconds(200));
128
129 std::cout << " āœ… Completed " << operation_name << "\n";
130 co_return;
131}
132
133void demonstrate_coroutine_operations() {
134 std::cout << "\n=== C++20 Coroutine Database Operations ===\n";
135
136 std::cout << "Using coroutines for non-blocking database operations...\n";
137
138 // Execute multiple coroutine-based operations
139 std::vector<task> tasks;
140
141 tasks.push_back(async_database_operation("User authentication"));
142 tasks.push_back(async_database_operation("Data validation"));
143 tasks.push_back(async_database_operation("Cache update"));
144 tasks.push_back(async_database_operation("Audit logging"));
145
146 std::cout << "All coroutine operations initiated and completed.\n";
147
148 std::cout << "\nCoroutine Benefits:\n";
149 std::cout << " • Non-blocking execution\n";
150 std::cout << " • Efficient memory usage\n";
151 std::cout << " • Natural async/await syntax\n";
152 std::cout << " • Better exception handling\n";
153}
154
155void demonstrate_async_connection_pool() {
156 std::cout << "\n=== Asynchronous Connection Pool ===\n";
157
158 async_connection_pool pool;
159
160 // Configure async connection pool
161 async_pool_config config;
162 config.min_connections = 5;
163 config.max_connections = 20;
164 config.acquire_timeout = std::chrono::milliseconds(5000);
165 config.idle_timeout = std::chrono::minutes(10);
166 config.health_check_interval = std::chrono::seconds(30);
167
168 pool.configure(config);
169 std::cout << "Async connection pool configured:\n";
170 std::cout << " Min connections: " << config.min_connections << "\n";
171 std::cout << " Max connections: " << config.max_connections << "\n";
172 std::cout << " Acquire timeout: " << config.acquire_timeout.count() << "ms\n";
173
174 // Simulate concurrent connection requests
175 std::cout << "\nSimulating concurrent connection requests...\n";
176
177 std::vector<std::future<connection_result>> connection_futures;
178
179 for (int i = 0; i < 15; ++i) {
180 auto future = pool.get_connection_async();
181 connection_futures.push_back(std::move(future));
182 std::cout << " šŸ“” Connection request " << (i + 1) << " submitted\n";
183 }
184
185 // Process connection results
186 std::cout << "\nProcessing connection acquisitions:\n";
187 int successful_connections = 0;
188
189 for (size_t i = 0; i < connection_futures.size(); ++i) {
190 auto& future = connection_futures[i];
191
192 try {
193 auto result = future.get();
194 if (result.success) {
195 successful_connections++;
196 std::cout << " āœ… Connection " << (i + 1) << " acquired in "
197 << result.acquisition_time.count() << "ms\n";
198 } else {
199 std::cout << " āŒ Connection " << (i + 1) << " failed: " << result.error_message << "\n";
200 }
201 } catch (const std::exception& e) {
202 std::cout << " āŒ Connection " << (i + 1) << " exception: " << e.what() << "\n";
203 }
204 }
205
206 std::cout << "\nConnection Pool Summary:\n";
207 std::cout << " Successful connections: " << successful_connections << "/" << connection_futures.size() << "\n";
208 std::cout << " Pool utilization: " << pool.get_utilization_percentage() << "%\n";
209}
210
211void demonstrate_real_time_streams() {
212 std::cout << "\n=== Real-Time Data Streams ===\n";
213
214 // PostgreSQL NOTIFY/LISTEN demonstration
215 std::cout << "šŸ”” PostgreSQL NOTIFY/LISTEN Stream:\n";
216
217 postgres_stream_listener listener;
218 listener.subscribe("user_changes", [](const notification& notif) {
219 std::cout << " šŸ“¢ Received notification: " << notif.channel
220 << " → " << notif.payload << "\n";
221 });
222
223 // Simulate notifications
224 std::vector<std::string> notifications = {
225 "User alice.smith logged in",
226 "User bob.jones updated profile",
227 "User carol.wilson changed password",
228 "New user david.brown registered"
229 };
230
231 for (const auto& msg : notifications) {
232 listener.simulate_notification("user_changes", msg);
233 std::this_thread::sleep_for(std::chrono::milliseconds(500));
234 }
235
236 // MongoDB Change Streams demonstration
237 std::cout << "\nšŸ“Š MongoDB Change Streams:\n";
238
239 mongodb_change_stream stream;
240 stream.watch_collection("users", [](const change_event& event) {
241 std::cout << " šŸ”„ Change detected: " << event.operation_type
242 << " on document " << event.document_id << "\n";
243 });
244
245 // Simulate change events
246 std::vector<std::tuple<std::string, std::string>> changes = {
247 {"insert", "user_001"},
248 {"update", "user_002"},
249 {"delete", "user_003"},
250 {"replace", "user_004"}
251 };
252
253 for (const auto& [op, doc_id] : changes) {
254 stream.simulate_change(op, doc_id);
255 std::this_thread::sleep_for(std::chrono::milliseconds(300));
256 }
257
258 std::cout << "\nReal-time stream capabilities:\n";
259 std::cout << " • Low-latency event processing\n";
260 std::cout << " • Automatic reconnection handling\n";
261 std::cout << " • Backpressure management\n";
262 std::cout << " • Event filtering and routing\n";
263}
264
265void demonstrate_distributed_transactions() {
266 std::cout << "\n=== Distributed Transaction Coordination ===\n";
267
268 distributed_transaction_coordinator coordinator;
269
270 // Configure distributed transaction
271 transaction_config config;
272 config.enable_two_phase_commit = true;
273 config.transaction_timeout = std::chrono::seconds(30);
274 config.max_participants = 5;
275 config.isolation_level = isolation_level::serializable;
276
277 coordinator.configure(config);
278 std::cout << "Distributed transaction coordinator configured:\n";
279 std::cout << " Two-phase commit: enabled\n";
280 std::cout << " Timeout: " << config.transaction_timeout.count() << "s\n";
281 std::cout << " Max participants: " << config.max_participants << "\n";
282
283 // Register transaction participants
284 std::vector<std::string> participants = {
285 "postgres_primary",
286 "postgres_replica",
287 "mongodb_cluster",
288 "redis_cache"
289 };
290
291 std::cout << "\nRegistering transaction participants:\n";
292 for (const auto& participant : participants) {
293 coordinator.register_participant(participant);
294 std::cout << " šŸ“ Registered: " << participant << "\n";
295 }
296
297 // Execute distributed transaction
298 std::cout << "\nExecuting distributed transaction...\n";
299
300 auto transaction_future = coordinator.begin_transaction_async();
301
302 // Simulate transaction operations on each participant
303 std::vector<std::future<operation_result>> operation_futures;
304
305 for (const auto& participant : participants) {
306 auto future = coordinator.execute_operation_async(participant, [participant]() -> operation_result {
307 // Simulate operation on this participant
308 std::this_thread::sleep_for(std::chrono::milliseconds(100));
309
310 operation_result result;
311 result.success = true;
312 result.participant = participant;
313 result.operation_time = std::chrono::milliseconds(100);
314
315 return result;
316 });
317
318 operation_futures.push_back(std::move(future));
319 std::cout << " šŸ”„ Operation submitted to " << participant << "\n";
320 }
321
322 // Collect operation results
323 std::cout << "\nCollecting operation results:\n";
324 bool all_successful = true;
325
326 for (auto& future : operation_futures) {
327 auto result = future.get();
328 std::cout << " " << (result.success ? "āœ…" : "āŒ")
329 << " " << result.participant
330 << " (" << result.operation_time.count() << "ms)\n";
331
332 if (!result.success) {
333 all_successful = false;
334 }
335 }
336
337 // Commit or rollback based on results
338 if (all_successful) {
339 auto commit_result = coordinator.commit_transaction_async().get();
340 std::cout << "\nšŸŽ‰ Distributed transaction COMMITTED successfully\n";
341 std::cout << " All " << participants.size() << " participants confirmed\n";
342 } else {
343 auto rollback_result = coordinator.rollback_transaction_async().get();
344 std::cout << "\nšŸ”„ Distributed transaction ROLLED BACK\n";
345 std::cout << " All participants restored to original state\n";
346 }
347}
348
349void demonstrate_saga_pattern() {
350 std::cout << "\n=== Saga Pattern for Long-Running Transactions ===\n";
351
352 saga_coordinator saga;
353
354 std::cout << "Implementing saga pattern for order processing workflow...\n";
355
356 // Define saga steps
357 std::vector<saga_step> steps = {
358 {"validate_payment", "Payment validation and authorization"},
359 {"reserve_inventory", "Reserve products in inventory"},
360 {"create_shipment", "Create shipping label and schedule"},
361 {"update_customer", "Update customer order history"},
362 {"send_confirmation", "Send order confirmation email"}
363 };
364
365 std::cout << "\nSaga workflow steps:\n";
366 for (size_t i = 0; i < steps.size(); ++i) {
367 saga.add_step(steps[i]);
368 std::cout << " " << (i + 1) << ". " << steps[i].description << "\n";
369 }
370
371 // Execute saga
372 std::cout << "\nExecuting saga workflow...\n";
373
374 auto saga_future = saga.execute_async();
375
376 // Simulate step execution
377 for (size_t i = 0; i < steps.size(); ++i) {
378 std::this_thread::sleep_for(std::chrono::milliseconds(200));
379
380 bool step_success = (i != 2); // Simulate failure at step 3
381
382 if (step_success) {
383 std::cout << " āœ… Step " << (i + 1) << " (" << steps[i].name << ") completed\n";
384 } else {
385 std::cout << " āŒ Step " << (i + 1) << " (" << steps[i].name << ") FAILED\n";
386 std::cout << " šŸ”„ Initiating compensating actions...\n";
387
388 // Execute compensating actions for completed steps
389 for (int j = i - 1; j >= 0; --j) {
390 std::this_thread::sleep_for(std::chrono::milliseconds(100));
391 std::cout << " ā†©ļø Compensating step " << (j + 1) << " (" << steps[j].name << ")\n";
392 }
393
394 std::cout << " šŸ”„ Saga compensation completed - system restored to consistent state\n";
395 break;
396 }
397 }
398
399 std::cout << "\nSaga Pattern Benefits:\n";
400 std::cout << " • Eventual consistency for distributed systems\n";
401 std::cout << " • Automatic compensation on failures\n";
402 std::cout << " • Better resilience than distributed transactions\n";
403 std::cout << " • Suitable for long-running business processes\n";
404}
405
406void demonstrate_async_batch_processing() {
407 std::cout << "\n=== Asynchronous Batch Processing ===\n";
408
409 batch_processor processor;
410
411 // Configure batch processing
412 batch_config config;
413 config.batch_size = 100;
414 config.max_parallel_batches = 4;
415 config.processing_timeout = std::chrono::minutes(5);
416 config.retry_attempts = 3;
417
418 processor.configure(config);
419 std::cout << "Batch processor configured:\n";
420 std::cout << " Batch size: " << config.batch_size << " records\n";
421 std::cout << " Parallel batches: " << config.max_parallel_batches << "\n";
422 std::cout << " Timeout: " << config.processing_timeout.count() << " minutes\n";
423
424 // Submit large dataset for processing
425 std::cout << "\nProcessing large dataset asynchronously...\n";
426
427 const int total_records = 1000;
428 auto processing_future = processor.process_async(total_records);
429
430 // Monitor progress
431 std::cout << "Batch processing progress:\n";
432 for (int progress = 0; progress <= 100; progress += 20) {
433 std::this_thread::sleep_for(std::chrono::milliseconds(300));
434 std::cout << " šŸ“Š Progress: " << progress << "% ("
435 << (progress * total_records / 100) << "/" << total_records << " records)\n";
436 }
437
438 auto result = processing_future.get();
439 std::cout << "\nšŸŽ‰ Batch processing completed:\n";
440 std::cout << " Total records: " << result.total_records << "\n";
441 std::cout << " Successful: " << result.successful_records << "\n";
442 std::cout << " Failed: " << result.failed_records << "\n";
443 std::cout << " Processing time: " << result.total_time.count() << "ms\n";
444 std::cout << " Throughput: " << (result.successful_records * 1000 / result.total_time.count()) << " records/sec\n";
445}
446
447int main() {
448 std::cout << "=== Asynchronous Operations Framework Demonstration ===\n";
449 std::cout << "This sample demonstrates C++20 coroutines, async database operations,\n";
450 std::cout << "and distributed transaction patterns for modern applications.\n";
451
452 try {
453 demonstrate_basic_async_operations();
454 demonstrate_coroutine_operations();
455 demonstrate_async_connection_pool();
456 demonstrate_real_time_streams();
457 demonstrate_distributed_transactions();
458 demonstrate_saga_pattern();
459 demonstrate_async_batch_processing();
460
461 std::cout << "\n=== Async Operations Features Summary ===\n";
462 std::cout << "āœ“ C++20 coroutines with co_await support\n";
463 std::cout << "āœ“ std::future-based asynchronous operations\n";
464 std::cout << "āœ“ Non-blocking connection pool management\n";
465 std::cout << "āœ“ Real-time data streams (PostgreSQL NOTIFY, MongoDB Change Streams)\n";
466 std::cout << "āœ“ Distributed transaction coordination with 2PC\n";
467 std::cout << "āœ“ Saga pattern for long-running transactions\n";
468 std::cout << "āœ“ Asynchronous batch processing with progress tracking\n";
469 std::cout << "āœ“ Exception handling and automatic retries\n";
470
471 std::cout << "\nFor production deployment:\n";
472 std::cout << " async_executor::instance().configure(async_config);\n";
473 std::cout << " auto result = async_executor::instance().execute_async(operation);\n";
474 std::cout << " // Use co_await for coroutine-based operations\n";
475
476 } catch (const std::exception& e) {
477 std::cout << "Error: " << e.what() << std::endl;
478 return 1;
479 }
480
481 return 0;
482}
483
484#endif // HAS_COROUTINES
High-performance asynchronous executor using thread_system.