#ifndef HAS_COROUTINES
#include <iostream>
std::cout << "This demo requires C++20 coroutines support.\n";
std::cout << "Please build with -DCMAKE_CXX_STANDARD=20 and ensure your compiler supports coroutines.\n";
return 1;
}
#else
#include <iostream>
#include <string>
#include <chrono>
#include <future>
#include <memory>
#include <vector>
#include <coroutine>
void demonstrate_basic_async_operations() {
std::cout << "=== Basic Asynchronous Database Operations ===\n";
async_config config;
config.thread_pool_size = 8;
config.max_concurrent_operations = 100;
config.operation_timeout = std::chrono::seconds(30);
config.enable_coroutines = true;
executor.configure(config);
std::cout << "Async executor configured with " << config.thread_pool_size << " threads\n";
std::cout << "\nExecuting asynchronous queries...\n";
std::vector<std::future<query_result>> futures;
for (int i = 0; i < 5; ++i) {
std::string query = "SELECT * FROM users WHERE department_id = " + std::to_string(i + 1);
auto future = executor.execute_async([query, i]() -> query_result {
std::this_thread::sleep_for(std::chrono::milliseconds(100 + (i * 50)));
query_result result;
result.success = true;
result.rows_affected = (i + 1) * 10;
result.execution_time = std::chrono::milliseconds(100 + (i * 50));
result.query = query;
return result;
});
futures.push_back(std::move(future));
std::cout << " š Query " << (i + 1) << " submitted asynchronously\n";
}
std::cout << "\nCollecting async query results:\n";
for (size_t i = 0; i < futures.size(); ++i) {
auto& future = futures[i];
auto result = future.get();
std::cout << " ā
Query " << (i + 1) << " completed: "
<< result.rows_affected << " rows, "
<< result.execution_time.count() << "ms\n";
}
}
struct task {
struct promise_type {
task get_return_object() { return task{std::coroutine_handle<promise_type>::from_promise(*this)}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
std::coroutine_handle<promise_type> h;
task(std::coroutine_handle<promise_type> handle) : h(handle) {}
~task() { if (h) h.destroy(); }
task(const task&) = delete;
task& operator=(const task&) = delete;
task(task&& other) noexcept : h(std::exchange(other.h, {})) {}
task& operator=(task&& other) noexcept {
if (this != &other) {
if (h) h.destroy();
h = std::exchange(other.h, {});
}
return *this;
}
};
task async_database_operation(const std::string& operation_name) {
std::cout << " š Starting " << operation_name << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << " ā
Completed " << operation_name << "\n";
co_return;
}
void demonstrate_coroutine_operations() {
std::cout << "\n=== C++20 Coroutine Database Operations ===\n";
std::cout << "Using coroutines for non-blocking database operations...\n";
std::vector<task> tasks;
tasks.push_back(async_database_operation("User authentication"));
tasks.push_back(async_database_operation("Data validation"));
tasks.push_back(async_database_operation("Cache update"));
tasks.push_back(async_database_operation("Audit logging"));
std::cout << "All coroutine operations initiated and completed.\n";
std::cout << "\nCoroutine Benefits:\n";
std::cout << " ⢠Non-blocking execution\n";
std::cout << " ⢠Efficient memory usage\n";
std::cout << " ⢠Natural async/await syntax\n";
std::cout << " ⢠Better exception handling\n";
}
void demonstrate_async_connection_pool() {
std::cout << "\n=== Asynchronous Connection Pool ===\n";
async_connection_pool pool;
async_pool_config config;
config.min_connections = 5;
config.max_connections = 20;
config.acquire_timeout = std::chrono::milliseconds(5000);
config.idle_timeout = std::chrono::minutes(10);
config.health_check_interval = std::chrono::seconds(30);
pool.configure(config);
std::cout << "Async connection pool configured:\n";
std::cout << " Min connections: " << config.min_connections << "\n";
std::cout << " Max connections: " << config.max_connections << "\n";
std::cout << " Acquire timeout: " << config.acquire_timeout.count() << "ms\n";
std::cout << "\nSimulating concurrent connection requests...\n";
std::vector<std::future<connection_result>> connection_futures;
for (int i = 0; i < 15; ++i) {
auto future = pool.get_connection_async();
connection_futures.push_back(std::move(future));
std::cout << " š” Connection request " << (i + 1) << " submitted\n";
}
std::cout << "\nProcessing connection acquisitions:\n";
int successful_connections = 0;
for (size_t i = 0; i < connection_futures.size(); ++i) {
auto& future = connection_futures[i];
try {
auto result = future.get();
if (result.success) {
successful_connections++;
std::cout << " ā
Connection " << (i + 1) << " acquired in "
<< result.acquisition_time.count() << "ms\n";
} else {
std::cout << " ā Connection " << (i + 1) << " failed: " << result.error_message << "\n";
}
} catch (const std::exception& e) {
std::cout << " ā Connection " << (i + 1) << " exception: " << e.what() << "\n";
}
}
std::cout << "\nConnection Pool Summary:\n";
std::cout << " Successful connections: " << successful_connections << "/" << connection_futures.size() << "\n";
std::cout << " Pool utilization: " << pool.get_utilization_percentage() << "%\n";
}
void demonstrate_real_time_streams() {
std::cout << "\n=== Real-Time Data Streams ===\n";
std::cout << "š PostgreSQL NOTIFY/LISTEN Stream:\n";
postgres_stream_listener listener;
listener.subscribe("user_changes", [](const notification& notif) {
std::cout << " š¢ Received notification: " << notif.channel
<< " ā " << notif.payload << "\n";
});
std::vector<std::string> notifications = {
"User alice.smith logged in",
"User bob.jones updated profile",
"User carol.wilson changed password",
"New user david.brown registered"
};
for (const auto& msg : notifications) {
listener.simulate_notification("user_changes", msg);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
std::cout << "\nš MongoDB Change Streams:\n";
mongodb_change_stream stream;
stream.watch_collection("users", [](const change_event& event) {
std::cout << " š Change detected: " << event.operation_type
<< " on document " << event.document_id << "\n";
});
std::vector<std::tuple<std::string, std::string>> changes = {
{"insert", "user_001"},
{"update", "user_002"},
{"delete", "user_003"},
{"replace", "user_004"}
};
for (const auto& [op, doc_id] : changes) {
stream.simulate_change(op, doc_id);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
std::cout << "\nReal-time stream capabilities:\n";
std::cout << " ⢠Low-latency event processing\n";
std::cout << " ⢠Automatic reconnection handling\n";
std::cout << " ⢠Backpressure management\n";
std::cout << " ⢠Event filtering and routing\n";
}
void demonstrate_distributed_transactions() {
std::cout << "\n=== Distributed Transaction Coordination ===\n";
distributed_transaction_coordinator coordinator;
transaction_config config;
config.enable_two_phase_commit = true;
config.transaction_timeout = std::chrono::seconds(30);
config.max_participants = 5;
config.isolation_level = isolation_level::serializable;
coordinator.configure(config);
std::cout << "Distributed transaction coordinator configured:\n";
std::cout << " Two-phase commit: enabled\n";
std::cout << " Timeout: " << config.transaction_timeout.count() << "s\n";
std::cout << " Max participants: " << config.max_participants << "\n";
std::vector<std::string> participants = {
"postgres_primary",
"postgres_replica",
"mongodb_cluster",
"redis_cache"
};
std::cout << "\nRegistering transaction participants:\n";
for (const auto& participant : participants) {
coordinator.register_participant(participant);
std::cout << " š Registered: " << participant << "\n";
}
std::cout << "\nExecuting distributed transaction...\n";
auto transaction_future = coordinator.begin_transaction_async();
std::vector<std::future<operation_result>> operation_futures;
for (const auto& participant : participants) {
auto future = coordinator.execute_operation_async(participant, [participant]() -> operation_result {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
operation_result result;
result.success = true;
result.participant = participant;
result.operation_time = std::chrono::milliseconds(100);
return result;
});
operation_futures.push_back(std::move(future));
std::cout << " š Operation submitted to " << participant << "\n";
}
std::cout << "\nCollecting operation results:\n";
bool all_successful = true;
for (auto& future : operation_futures) {
auto result = future.get();
std::cout << " " << (result.success ? "ā
" : "ā")
<< " " << result.participant
<< " (" << result.operation_time.count() << "ms)\n";
if (!result.success) {
all_successful = false;
}
}
if (all_successful) {
auto commit_result = coordinator.commit_transaction_async().get();
std::cout << "\nš Distributed transaction COMMITTED successfully\n";
std::cout << " All " << participants.size() << " participants confirmed\n";
} else {
auto rollback_result = coordinator.rollback_transaction_async().get();
std::cout << "\nš Distributed transaction ROLLED BACK\n";
std::cout << " All participants restored to original state\n";
}
}
void demonstrate_saga_pattern() {
std::cout << "\n=== Saga Pattern for Long-Running Transactions ===\n";
saga_coordinator saga;
std::cout << "Implementing saga pattern for order processing workflow...\n";
std::vector<saga_step> steps = {
{"validate_payment", "Payment validation and authorization"},
{"reserve_inventory", "Reserve products in inventory"},
{"create_shipment", "Create shipping label and schedule"},
{"update_customer", "Update customer order history"},
{"send_confirmation", "Send order confirmation email"}
};
std::cout << "\nSaga workflow steps:\n";
for (size_t i = 0; i < steps.size(); ++i) {
saga.add_step(steps[i]);
std::cout << " " << (i + 1) << ". " << steps[i].description << "\n";
}
std::cout << "\nExecuting saga workflow...\n";
auto saga_future = saga.execute_async();
for (size_t i = 0; i < steps.size(); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
bool step_success = (i != 2);
if (step_success) {
std::cout << " ā
Step " << (i + 1) << " (" << steps[i].name << ") completed\n";
} else {
std::cout << " ā Step " << (i + 1) << " (" << steps[i].name << ") FAILED\n";
std::cout << " š Initiating compensating actions...\n";
for (int j = i - 1; j >= 0; --j) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << " ā©ļø Compensating step " << (j + 1) << " (" << steps[j].name << ")\n";
}
std::cout << " š Saga compensation completed - system restored to consistent state\n";
break;
}
}
std::cout << "\nSaga Pattern Benefits:\n";
std::cout << " ⢠Eventual consistency for distributed systems\n";
std::cout << " ⢠Automatic compensation on failures\n";
std::cout << " ⢠Better resilience than distributed transactions\n";
std::cout << " ⢠Suitable for long-running business processes\n";
}
void demonstrate_async_batch_processing() {
std::cout << "\n=== Asynchronous Batch Processing ===\n";
batch_processor processor;
batch_config config;
config.batch_size = 100;
config.max_parallel_batches = 4;
config.processing_timeout = std::chrono::minutes(5);
config.retry_attempts = 3;
processor.configure(config);
std::cout << "Batch processor configured:\n";
std::cout << " Batch size: " << config.batch_size << " records\n";
std::cout << " Parallel batches: " << config.max_parallel_batches << "\n";
std::cout << " Timeout: " << config.processing_timeout.count() << " minutes\n";
std::cout << "\nProcessing large dataset asynchronously...\n";
const int total_records = 1000;
auto processing_future = processor.process_async(total_records);
std::cout << "Batch processing progress:\n";
for (int progress = 0; progress <= 100; progress += 20) {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::cout << " š Progress: " << progress << "% ("
<< (progress * total_records / 100) << "/" << total_records << " records)\n";
}
auto result = processing_future.get();
std::cout << "\nš Batch processing completed:\n";
std::cout << " Total records: " << result.total_records << "\n";
std::cout << " Successful: " << result.successful_records << "\n";
std::cout << " Failed: " << result.failed_records << "\n";
std::cout << " Processing time: " << result.total_time.count() << "ms\n";
std::cout << " Throughput: " << (result.successful_records * 1000 / result.total_time.count()) << " records/sec\n";
}
std::cout << "=== Asynchronous Operations Framework Demonstration ===\n";
std::cout << "This sample demonstrates C++20 coroutines, async database operations,\n";
std::cout << "and distributed transaction patterns for modern applications.\n";
try {
demonstrate_basic_async_operations();
demonstrate_coroutine_operations();
demonstrate_async_connection_pool();
demonstrate_real_time_streams();
demonstrate_distributed_transactions();
demonstrate_saga_pattern();
demonstrate_async_batch_processing();
std::cout << "\n=== Async Operations Features Summary ===\n";
std::cout << "ā C++20 coroutines with co_await support\n";
std::cout << "ā std::future-based asynchronous operations\n";
std::cout << "ā Non-blocking connection pool management\n";
std::cout << "ā Real-time data streams (PostgreSQL NOTIFY, MongoDB Change Streams)\n";
std::cout << "ā Distributed transaction coordination with 2PC\n";
std::cout << "ā Saga pattern for long-running transactions\n";
std::cout << "ā Asynchronous batch processing with progress tracking\n";
std::cout << "ā Exception handling and automatic retries\n";
std::cout << "\nFor production deployment:\n";
std::cout << " async_executor::instance().configure(async_config);\n";
std::cout << " auto result = async_executor::instance().execute_async(operation);\n";
std::cout << " // Use co_await for coroutine-based operations\n";
} catch (const std::exception& e) {
std::cout << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
#endif
High-performance asynchronous executor using thread_system.