25 std::cout <<
"This demo requires C++20 coroutines support.\n";
26 std::cout <<
"Please build with -DCMAKE_CXX_STANDARD=20 and ensure your compiler supports coroutines.\n";
44void demonstrate_basic_async_operations() {
45 std::cout <<
"=== Basic Asynchronous Database Operations ===\n";
51 config.thread_pool_size = 8;
52 config.max_concurrent_operations = 100;
53 config.operation_timeout = std::chrono::seconds(30);
54 config.enable_coroutines =
true;
56 executor.configure(config);
57 std::cout <<
"Async executor configured with " << config.thread_pool_size <<
" threads\n";
60 std::cout <<
"\nExecuting asynchronous queries...\n";
63 std::vector<std::future<query_result>> futures;
65 for (
int i = 0; i < 5; ++i) {
66 std::string query =
"SELECT * FROM users WHERE department_id = " + std::to_string(i + 1);
68 auto future = executor.execute_async([query, i]() -> query_result {
70 std::this_thread::sleep_for(std::chrono::milliseconds(100 + (i * 50)));
73 result.success =
true;
74 result.rows_affected = (i + 1) * 10;
75 result.execution_time = std::chrono::milliseconds(100 + (i * 50));
81 futures.push_back(std::move(future));
82 std::cout <<
" š Query " << (i + 1) <<
" submitted asynchronously\n";
86 std::cout <<
"\nCollecting async query results:\n";
87 for (
size_t i = 0; i < futures.size(); ++i) {
88 auto& future = futures[i];
89 auto result = future.get();
91 std::cout <<
" ā
Query " << (i + 1) <<
" completed: "
92 << result.rows_affected <<
" rows, "
93 << result.execution_time.count() <<
"ms\n";
100 task get_return_object() {
return task{std::coroutine_handle<promise_type>::from_promise(*
this)}; }
101 std::suspend_never initial_suspend() {
return {}; }
102 std::suspend_never final_suspend() noexcept {
return {}; }
103 void return_void() {}
104 void unhandled_exception() {}
107 std::coroutine_handle<promise_type> h;
108 task(std::coroutine_handle<promise_type> handle) : h(handle) {}
109 ~task() {
if (h) h.destroy(); }
111 task(
const task&) =
delete;
112 task& operator=(
const task&) =
delete;
113 task(task&& other) noexcept : h(std::exchange(other.h, {})) {}
114 task& operator=(task&& other)
noexcept {
115 if (
this != &other) {
117 h = std::exchange(other.h, {});
123task async_database_operation(
const std::string& operation_name) {
124 std::cout <<
" š Starting " << operation_name <<
"\n";
127 std::this_thread::sleep_for(std::chrono::milliseconds(200));
129 std::cout <<
" ā
Completed " << operation_name <<
"\n";
133void demonstrate_coroutine_operations() {
134 std::cout <<
"\n=== C++20 Coroutine Database Operations ===\n";
136 std::cout <<
"Using coroutines for non-blocking database operations...\n";
139 std::vector<task> tasks;
141 tasks.push_back(async_database_operation(
"User authentication"));
142 tasks.push_back(async_database_operation(
"Data validation"));
143 tasks.push_back(async_database_operation(
"Cache update"));
144 tasks.push_back(async_database_operation(
"Audit logging"));
146 std::cout <<
"All coroutine operations initiated and completed.\n";
148 std::cout <<
"\nCoroutine Benefits:\n";
149 std::cout <<
" ⢠Non-blocking execution\n";
150 std::cout <<
" ⢠Efficient memory usage\n";
151 std::cout <<
" ⢠Natural async/await syntax\n";
152 std::cout <<
" ⢠Better exception handling\n";
155void demonstrate_async_connection_pool() {
156 std::cout <<
"\n=== Asynchronous Connection Pool ===\n";
158 async_connection_pool pool;
161 async_pool_config config;
162 config.min_connections = 5;
163 config.max_connections = 20;
164 config.acquire_timeout = std::chrono::milliseconds(5000);
165 config.idle_timeout = std::chrono::minutes(10);
166 config.health_check_interval = std::chrono::seconds(30);
168 pool.configure(config);
169 std::cout <<
"Async connection pool configured:\n";
170 std::cout <<
" Min connections: " << config.min_connections <<
"\n";
171 std::cout <<
" Max connections: " << config.max_connections <<
"\n";
172 std::cout <<
" Acquire timeout: " << config.acquire_timeout.count() <<
"ms\n";
175 std::cout <<
"\nSimulating concurrent connection requests...\n";
177 std::vector<std::future<connection_result>> connection_futures;
179 for (
int i = 0; i < 15; ++i) {
180 auto future = pool.get_connection_async();
181 connection_futures.push_back(std::move(future));
182 std::cout <<
" š” Connection request " << (i + 1) <<
" submitted\n";
186 std::cout <<
"\nProcessing connection acquisitions:\n";
187 int successful_connections = 0;
189 for (
size_t i = 0; i < connection_futures.size(); ++i) {
190 auto& future = connection_futures[i];
193 auto result = future.get();
194 if (result.success) {
195 successful_connections++;
196 std::cout <<
" ā
Connection " << (i + 1) <<
" acquired in "
197 << result.acquisition_time.count() <<
"ms\n";
199 std::cout <<
" ā Connection " << (i + 1) <<
" failed: " << result.error_message <<
"\n";
201 }
catch (
const std::exception& e) {
202 std::cout <<
" ā Connection " << (i + 1) <<
" exception: " << e.what() <<
"\n";
206 std::cout <<
"\nConnection Pool Summary:\n";
207 std::cout <<
" Successful connections: " << successful_connections <<
"/" << connection_futures.size() <<
"\n";
208 std::cout <<
" Pool utilization: " << pool.get_utilization_percentage() <<
"%\n";
211void demonstrate_real_time_streams() {
212 std::cout <<
"\n=== Real-Time Data Streams ===\n";
215 std::cout <<
"š PostgreSQL NOTIFY/LISTEN Stream:\n";
217 postgres_stream_listener listener;
218 listener.subscribe(
"user_changes", [](
const notification& notif) {
219 std::cout <<
" š¢ Received notification: " << notif.channel
220 <<
" ā " << notif.payload <<
"\n";
224 std::vector<std::string> notifications = {
225 "User alice.smith logged in",
226 "User bob.jones updated profile",
227 "User carol.wilson changed password",
228 "New user david.brown registered"
231 for (
const auto& msg : notifications) {
232 listener.simulate_notification(
"user_changes", msg);
233 std::this_thread::sleep_for(std::chrono::milliseconds(500));
237 std::cout <<
"\nš MongoDB Change Streams:\n";
239 mongodb_change_stream stream;
240 stream.watch_collection(
"users", [](
const change_event& event) {
241 std::cout <<
" š Change detected: " <<
event.operation_type
242 <<
" on document " <<
event.document_id <<
"\n";
246 std::vector<std::tuple<std::string, std::string>> changes = {
247 {
"insert",
"user_001"},
248 {
"update",
"user_002"},
249 {
"delete",
"user_003"},
250 {
"replace",
"user_004"}
253 for (
const auto& [op, doc_id] : changes) {
254 stream.simulate_change(op, doc_id);
255 std::this_thread::sleep_for(std::chrono::milliseconds(300));
258 std::cout <<
"\nReal-time stream capabilities:\n";
259 std::cout <<
" ⢠Low-latency event processing\n";
260 std::cout <<
" ⢠Automatic reconnection handling\n";
261 std::cout <<
" ⢠Backpressure management\n";
262 std::cout <<
" ⢠Event filtering and routing\n";
265void demonstrate_distributed_transactions() {
266 std::cout <<
"\n=== Distributed Transaction Coordination ===\n";
268 distributed_transaction_coordinator coordinator;
271 transaction_config config;
272 config.enable_two_phase_commit =
true;
273 config.transaction_timeout = std::chrono::seconds(30);
274 config.max_participants = 5;
275 config.isolation_level = isolation_level::serializable;
277 coordinator.configure(config);
278 std::cout <<
"Distributed transaction coordinator configured:\n";
279 std::cout <<
" Two-phase commit: enabled\n";
280 std::cout <<
" Timeout: " << config.transaction_timeout.count() <<
"s\n";
281 std::cout <<
" Max participants: " << config.max_participants <<
"\n";
284 std::vector<std::string> participants = {
291 std::cout <<
"\nRegistering transaction participants:\n";
292 for (
const auto& participant : participants) {
293 coordinator.register_participant(participant);
294 std::cout <<
" š Registered: " << participant <<
"\n";
298 std::cout <<
"\nExecuting distributed transaction...\n";
300 auto transaction_future = coordinator.begin_transaction_async();
303 std::vector<std::future<operation_result>> operation_futures;
305 for (
const auto& participant : participants) {
306 auto future = coordinator.execute_operation_async(participant, [participant]() -> operation_result {
308 std::this_thread::sleep_for(std::chrono::milliseconds(100));
310 operation_result result;
311 result.success =
true;
312 result.participant = participant;
313 result.operation_time = std::chrono::milliseconds(100);
318 operation_futures.push_back(std::move(future));
319 std::cout <<
" š Operation submitted to " << participant <<
"\n";
323 std::cout <<
"\nCollecting operation results:\n";
324 bool all_successful =
true;
326 for (
auto& future : operation_futures) {
327 auto result = future.get();
328 std::cout <<
" " << (result.success ?
"ā
" :
"ā")
329 <<
" " << result.participant
330 <<
" (" << result.operation_time.count() <<
"ms)\n";
332 if (!result.success) {
333 all_successful =
false;
338 if (all_successful) {
339 auto commit_result = coordinator.commit_transaction_async().get();
340 std::cout <<
"\nš Distributed transaction COMMITTED successfully\n";
341 std::cout <<
" All " << participants.size() <<
" participants confirmed\n";
343 auto rollback_result = coordinator.rollback_transaction_async().get();
344 std::cout <<
"\nš Distributed transaction ROLLED BACK\n";
345 std::cout <<
" All participants restored to original state\n";
349void demonstrate_saga_pattern() {
350 std::cout <<
"\n=== Saga Pattern for Long-Running Transactions ===\n";
352 saga_coordinator saga;
354 std::cout <<
"Implementing saga pattern for order processing workflow...\n";
357 std::vector<saga_step> steps = {
358 {
"validate_payment",
"Payment validation and authorization"},
359 {
"reserve_inventory",
"Reserve products in inventory"},
360 {
"create_shipment",
"Create shipping label and schedule"},
361 {
"update_customer",
"Update customer order history"},
362 {
"send_confirmation",
"Send order confirmation email"}
365 std::cout <<
"\nSaga workflow steps:\n";
366 for (
size_t i = 0; i < steps.size(); ++i) {
367 saga.add_step(steps[i]);
368 std::cout <<
" " << (i + 1) <<
". " << steps[i].description <<
"\n";
372 std::cout <<
"\nExecuting saga workflow...\n";
374 auto saga_future = saga.execute_async();
377 for (
size_t i = 0; i < steps.size(); ++i) {
378 std::this_thread::sleep_for(std::chrono::milliseconds(200));
380 bool step_success = (i != 2);
383 std::cout <<
" ā
Step " << (i + 1) <<
" (" << steps[i].name <<
") completed\n";
385 std::cout <<
" ā Step " << (i + 1) <<
" (" << steps[i].name <<
") FAILED\n";
386 std::cout <<
" š Initiating compensating actions...\n";
389 for (
int j = i - 1; j >= 0; --j) {
390 std::this_thread::sleep_for(std::chrono::milliseconds(100));
391 std::cout <<
" ā©ļø Compensating step " << (j + 1) <<
" (" << steps[j].name <<
")\n";
394 std::cout <<
" š Saga compensation completed - system restored to consistent state\n";
399 std::cout <<
"\nSaga Pattern Benefits:\n";
400 std::cout <<
" ⢠Eventual consistency for distributed systems\n";
401 std::cout <<
" ⢠Automatic compensation on failures\n";
402 std::cout <<
" ⢠Better resilience than distributed transactions\n";
403 std::cout <<
" ⢠Suitable for long-running business processes\n";
406void demonstrate_async_batch_processing() {
407 std::cout <<
"\n=== Asynchronous Batch Processing ===\n";
409 batch_processor processor;
413 config.batch_size = 100;
414 config.max_parallel_batches = 4;
415 config.processing_timeout = std::chrono::minutes(5);
416 config.retry_attempts = 3;
418 processor.configure(config);
419 std::cout <<
"Batch processor configured:\n";
420 std::cout <<
" Batch size: " << config.batch_size <<
" records\n";
421 std::cout <<
" Parallel batches: " << config.max_parallel_batches <<
"\n";
422 std::cout <<
" Timeout: " << config.processing_timeout.count() <<
" minutes\n";
425 std::cout <<
"\nProcessing large dataset asynchronously...\n";
427 const int total_records = 1000;
428 auto processing_future = processor.process_async(total_records);
431 std::cout <<
"Batch processing progress:\n";
432 for (
int progress = 0; progress <= 100; progress += 20) {
433 std::this_thread::sleep_for(std::chrono::milliseconds(300));
434 std::cout <<
" š Progress: " << progress <<
"% ("
435 << (progress * total_records / 100) <<
"/" << total_records <<
" records)\n";
438 auto result = processing_future.get();
439 std::cout <<
"\nš Batch processing completed:\n";
440 std::cout <<
" Total records: " << result.total_records <<
"\n";
441 std::cout <<
" Successful: " << result.successful_records <<
"\n";
442 std::cout <<
" Failed: " << result.failed_records <<
"\n";
443 std::cout <<
" Processing time: " << result.total_time.count() <<
"ms\n";
444 std::cout <<
" Throughput: " << (result.successful_records * 1000 / result.total_time.count()) <<
" records/sec\n";
448 std::cout <<
"=== Asynchronous Operations Framework Demonstration ===\n";
449 std::cout <<
"This sample demonstrates C++20 coroutines, async database operations,\n";
450 std::cout <<
"and distributed transaction patterns for modern applications.\n";
453 demonstrate_basic_async_operations();
454 demonstrate_coroutine_operations();
455 demonstrate_async_connection_pool();
456 demonstrate_real_time_streams();
457 demonstrate_distributed_transactions();
458 demonstrate_saga_pattern();
459 demonstrate_async_batch_processing();
461 std::cout <<
"\n=== Async Operations Features Summary ===\n";
462 std::cout <<
"ā C++20 coroutines with co_await support\n";
463 std::cout <<
"ā std::future-based asynchronous operations\n";
464 std::cout <<
"ā Non-blocking connection pool management\n";
465 std::cout <<
"ā Real-time data streams (PostgreSQL NOTIFY, MongoDB Change Streams)\n";
466 std::cout <<
"ā Distributed transaction coordination with 2PC\n";
467 std::cout <<
"ā Saga pattern for long-running transactions\n";
468 std::cout <<
"ā Asynchronous batch processing with progress tracking\n";
469 std::cout <<
"ā Exception handling and automatic retries\n";
471 std::cout <<
"\nFor production deployment:\n";
472 std::cout <<
" async_executor::instance().configure(async_config);\n";
473 std::cout <<
" auto result = async_executor::instance().execute_async(operation);\n";
474 std::cout <<
" // Use co_await for coroutine-based operations\n";
476 }
catch (
const std::exception& e) {
477 std::cout <<
"Error: " << e.what() << std::endl;
High-performance asynchronous executor using thread_system.