#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
private:
// NOTE(review): the member declarations of Statistics are not visible at this
// point in the file. Later code reads stats_.start_time, stats_.created,
// stats_.errors and stats_.serialized — confirm the full field list.
struct Statistics {
public:
// Print the example's three-line start-up banner to stdout.
// NOTE(review): the signature of the function that owns these statements is
// not visible here — confirm which member they belong to.
std::cout << "=== Advanced Container System Example ===" << std::endl;
std::cout << "Using variant-based API (set_value/get_value)" << std::endl;
std::cout << "===========================================" << std::endl;
}
}
std::cout << "\n--- Basic Container Operations ---" << std::endl;
auto container = std::make_shared<value_container>();
container->set_source("example_client", "session_001");
container->set_target("example_server", "processor_main");
container->set_message_type("user_profile_update");
container->set("username", std::string("john_doe"));
container->set("user_id", static_cast<int32_t>(12345));
container->set("account_balance", 1500.75);
container->set("is_premium", true);
container->set("last_login", static_cast<int64_t>(
std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count()));
std::cout << "Container created:" << std::endl;
std::cout << " Source: " << container->source_id() << "/" << container->source_sub_id() << std::endl;
std::cout << " Target: " << container->target_id() << "/" << container->target_sub_id() << std::endl;
std::cout << " Type: " << container->message_type() << std::endl;
if (auto username_value = container->get("username")) {
if (auto* str = std::get_if<std::string>(&username_value->data)) {
std::cout << " Username: " << *str << std::endl;
}
}
if (auto balance_value = container->get("account_balance")) {
if (auto* val = std::get_if<double>(&balance_value->data)) {
std::cout << " Balance: $" << std::fixed << std::setprecision(2) << *val << std::endl;
}
}
std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
std::cout << " Serialized size: " << serialized.size() << " bytes" << std::endl;
auto deserialized = std::make_shared<value_container>(serialized);
std::cout << " Deserialization successful" << std::endl;
}
std::cout << "\n--- Multi-threaded Operations ---" << std::endl;
const int num_producers = 2;
const int num_consumers = 2;
const int items_per_producer = 500;
std::vector<std::shared_ptr<value_container>> shared_queue;
std::mutex queue_mutex;
std::condition_variable cv;
std::atomic<bool> producers_done{false};
std::vector<std::thread> producer_threads;
for (int p = 0; p < num_producers; ++p) {
producer_threads.emplace_back([&, p]() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 1000);
for (int i = 0; i < items_per_producer; ++i) {
auto container = std::make_shared<value_container>();
container->set_source("producer_" + std::to_string(p), "thread_" + std::to_string(p));
container->set_target("consumer_pool", "any_available");
container->set_message_type("work_item");
container->set("producer_id", static_cast<int32_t>(p));
container->set("item_id", static_cast<int32_t>(i));
container->set("random_value", static_cast<int32_t>(dis(gen)));
container->set("timestamp", static_cast<int64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count()));
{
std::lock_guard<std::mutex> lock(queue_mutex);
shared_queue.push_back(container);
}
cv.notify_one();
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
});
}
std::vector<std::thread> consumer_threads;
for (int c = 0; c < num_consumers; ++c) {
consumer_threads.emplace_back([&, c]() {
int items_processed = 0;
std::shared_ptr<value_container> container;
{
std::unique_lock<std::mutex> lock(queue_mutex);
cv.wait(lock, [&]() { return !shared_queue.empty() || producers_done.load(); });
if (!shared_queue.empty()) {
container = shared_queue.back();
shared_queue.pop_back();
} else if (producers_done.load()) {
break;
}
}
if (container) {
std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
auto processed = std::make_shared<value_container>(serialized);
items_processed++;
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
}
std::cout << " Consumer " << c << " processed " << items_processed << " items" << std::endl;
});
}
for (auto& thread : producer_threads) {
thread.join();
}
producers_done = true;
cv.notify_all();
for (auto& thread : consumer_threads) {
thread.join();
}
std::cout << "Multi-threaded processing completed:" << std::endl;
std::cout <<
" Total bytes processed: " <<
processed_bytes_.load() << std::endl;
std::cout << " Remaining in queue: " << shared_queue.size() << std::endl;
}
std::cout << "\n--- Error Handling ---" << std::endl;
std::cout << "Testing edge cases:" << std::endl;
auto empty_container = std::make_shared<value_container>();
std::string empty_serialized = empty_container->serialize_string(value_container::serialization_format::binary).value();
auto empty_deserialized = std::make_shared<value_container>(empty_serialized);
std::cout << " - Empty container serialization/deserialization works" << std::endl;
std::string large_string(10000, 'A');
auto large_container = std::make_shared<value_container>();
large_container->set_message_type("large_data_test");
large_container->set("large_data", large_string);
std::string large_serialized = large_container->serialize_string(value_container::serialization_format::binary).value();
auto large_deserialized = std::make_shared<value_container>(large_serialized);
if (auto recovered_value = large_deserialized->get("large_data")) {
if (auto* str = std::get_if<std::string>(&recovered_value->data)) {
if (*str == large_string) {
std::cout << " - Large data handling works (" << large_string.size() << " bytes)" << std::endl;
}
}
}
}
// Throughput scenarios: time a burst of small "ping" containers and a smaller
// batch of 50000-byte "file_upload" containers, building and binary-serializing
// each one, then print the achieved message rate for both.
std::cout << "\n--- Performance Scenarios ---" << std::endl;

// Converts a message count and an elapsed interval into messages per second.
// The interval is taken as floating-point seconds, so the result does not
// depend on the tick unit of whatever duration type was measured. (Previously
// a milliseconds value was stored into a microseconds variable, which made
// the large-message rate 1000x too small.) A non-positive interval yields 0
// rather than dividing by zero.
const auto per_second = [](int message_count, std::chrono::duration<double> elapsed) {
    return elapsed.count() > 0.0 ? message_count / elapsed.count() : 0.0;
};

std::cout << "Scenario 1: High-frequency small messages" << std::endl;
const int small_message_count = 10000;
const auto small_start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < small_message_count; ++i) {
    auto container = std::make_shared<value_container>();
    // Cycle through 100 distinct session ids.
    container->set_source("high_freq_client", "session_" + std::to_string(i % 100));
    container->set_target("high_freq_server", "handler");
    container->set_message_type("ping");
    container->set("sequence", static_cast<int32_t>(i));
    container->set("timestamp", static_cast<int64_t>(
        std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::system_clock::now().time_since_epoch()).count()));
    // Only the cost of serializing matters here; the result is discarded.
    container->serialize_string(value_container::serialization_format::binary).value();
}
const auto small_end = std::chrono::high_resolution_clock::now();
const double rate = per_second(small_message_count, small_end - small_start);
std::cout << " Rate: " << std::fixed << std::setprecision(2) << rate << " messages/second" << std::endl;

std::cout << "Scenario 2: Low-frequency large messages" << std::endl;
const int large_message_count = 100;
const auto large_start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < large_message_count; ++i) {
    auto container = std::make_shared<value_container>();
    container->set_source("large_msg_client", "upload_session");
    container->set_target("large_msg_server", "file_handler");
    container->set_message_type("file_upload");
    // 50000 copies of one byte value that varies per message.
    std::string file_data(50000, static_cast<char>(i % 256));
    container->set("file_content", file_data);
    container->set("filename", std::string("large_file_" + std::to_string(i) + ".dat"));
    container->set("file_size", static_cast<int32_t>(file_data.size()));
    std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
}
const auto large_end = std::chrono::high_resolution_clock::now();
const double large_rate = per_second(large_message_count, large_end - large_start);
std::cout << " Rate: " << std::fixed << std::setprecision(2) << large_rate << " large messages/second" << std::endl;

// Integer division: the byte counter is reported in whole mebibytes.
std::cout <<
    " Data processed: " << (
    processed_bytes_.load() / 1024 / 1024) <<
    " MB" << std::endl;
}
// Final report: total runtime since stats_.start_time, the created/error
// counters, and — when at least one millisecond has elapsed — the average
// creation and serialization rates per second.
const auto finished_at = std::chrono::steady_clock::now();
const auto total_duration =
    std::chrono::duration_cast<std::chrono::milliseconds>(finished_at - stats_.start_time);
const auto elapsed_ms = total_duration.count();

std::cout << "\n=== Final Statistics ===" << std::endl;
std::cout << "Total runtime: " << elapsed_ms << " ms" << std::endl;
std::cout << "Containers created: " << stats_.created.load() << std::endl;
std::cout << "Errors encountered: " << stats_.errors.load() << std::endl;

// Rates are skipped for a sub-millisecond run to avoid dividing by zero.
if (elapsed_ms > 0) {
    // Scales a counter from "per elapsed milliseconds" to "per second".
    const auto per_second = [elapsed_ms](auto count) {
        return (count * 1000.0) / elapsed_ms;
    };
    const double containers_per_sec = per_second(stats_.created.load());
    const double serializations_per_sec = per_second(stats_.serialized.load());

    // std::fixed / setprecision stay in effect on std::cout, so the second
    // rate is printed with the same two-decimal formatting.
    std::cout << "Average creation rate: " << std::fixed << std::setprecision(2)
              << containers_per_sec << " containers/second" << std::endl;
    std::cout << "Average serialization rate: " << serializations_per_sec
              << " operations/second" << std::endl;
}

std::cout << "========================" << std::endl;
}
// NOTE(review): this try block is empty as written, so the handler below can
// never be reached. The guarded statements appear to be missing — confirm
// which calls belong here.
try {
} catch (const std::exception& e) {
// Report the failure on stderr; the exception is not rethrown.
std::cerr << "Exception in demonstration: " << e.what() << std::endl;
}
}
};
try {
std::cout << "\nAdvanced Container System Example completed successfully!" << std::endl;
return 0;
} catch (const std::exception& e) {
std::cerr << "Fatal error: " << e.what() << std::endl;
return 1;
}
}
void run_all_demonstrations()
Runs all demonstrations.
std::atomic< bool > running_
void demonstrate_performance_scenarios()
Demonstrates performance scenarios.
struct AdvancedContainerExample::Statistics stats_
~AdvancedContainerExample()
AdvancedContainerExample()
void demonstrate_multithreaded_operations()
Demonstrates multi-threaded producer-consumer pattern.
void print_final_statistics()
Prints final statistics.
void demonstrate_error_handling()
Demonstrates error handling scenarios.
void demonstrate_basic_operations()
Demonstrates basic container operations using the new API.
std::atomic< int > processed_containers_
std::atomic< int > processed_bytes_
std::atomic< int > errors
std::atomic< int > deserialized
std::atomic< int > created
std::chrono::steady_clock::time_point start_time
std::atomic< int > serialized