// Basic container demo: builds one value_container, sets its source, target
// and message type, stores a handful of typed values, prints them back, and
// finally round-trips the container through binary serialization.
// NOTE(review): the leading integers on these lines look like line numbers
// from a source listing, and gaps in that numbering suggest some lines (for
// example closing braces) are not shown here — confirm against the full file.
70 std::cout <<
"\n--- Basic Container Operations ---" << std::endl;
73 auto container = std::make_shared<value_container>();
// Source and target are each given as a pair of strings; they are read back
// below through source_id()/source_sub_id() and target_id()/target_sub_id().
74 container->set_source(
"example_client",
"session_001");
75 container->set_target(
"example_server",
"processor_main");
76 container->set_message_type(
"user_profile_update");
// One value per C++ type: std::string, int32_t, double, bool and int64_t.
// The explicit std::string(...) and static_cast<...> presumably select which
// type the container stores — confirm against value_container::set.
79 container->set(
"username", std::string(
"john_doe"));
80 container->set(
"user_id",
static_cast<int32_t
>(12345));
81 container->set(
"account_balance", 1500.75);
82 container->set(
"is_premium",
true);
// last_login is the current system_clock time in whole seconds since the
// clock's epoch.
83 container->set(
"last_login",
static_cast<int64_t
>(
84 std::chrono::duration_cast<std::chrono::seconds>(
85 std::chrono::system_clock::now().time_since_epoch()).count()));
// Echo the header fields that were just set.
88 std::cout <<
"Container created:" << std::endl;
89 std::cout <<
" Source: " << container->source_id() <<
"/" << container->source_sub_id() << std::endl;
90 std::cout <<
" Target: " << container->target_id() <<
"/" << container->target_sub_id() << std::endl;
91 std::cout <<
" Type: " << container->message_type() << std::endl;
// Look up "username"; the result of get() is tested in the if-condition.
// std::get_if yields a null pointer unless ->data currently holds a
// std::string, so the print is skipped on a missing value or a type mismatch.
94 if (
auto username_value = container->get(
"username")) {
95 if (
auto* str = std::get_if<std::string>(&username_value->data)) {
96 std::cout <<
" Username: " << *str << std::endl;
// Same pattern for "account_balance", expecting a double; printed with two
// decimals. std::fixed and std::setprecision stay set on std::cout afterwards.
100 if (
auto balance_value = container->get(
"account_balance")) {
101 if (
auto* val = std::get_if<double>(&balance_value->data)) {
102 std::cout <<
" Balance: $" << std::fixed << std::setprecision(2) << *val << std::endl;
// Serialize to the binary format. .value() is called without first checking
// the result.
// NOTE(review): if serialize_string can report failure, this unchecked
// .value() presumably throws or is invalid — confirm its return type.
107 std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
108 std::cout <<
" Serialized size: " << serialized.size() <<
" bytes" << std::endl;
// Build a second container from the serialized string.
// NOTE(review): no inspection of `deserialized` is visible before the success
// message is printed — confirm whether construction can fail silently.
111 auto deserialized = std::make_shared<value_container>(serialized);
112 std::cout <<
" Deserialization successful" << std::endl;
// Producer/consumer demo: num_producers threads each build
// items_per_producer containers and push them into shared_queue; num_consumers
// threads wait on a condition variable, pop containers, and round-trip each
// one through binary serialization.
123 std::cout <<
"\n--- Multi-threaded Operations ---" << std::endl;
125 const int num_producers = 2;
126 const int num_consumers = 2;
127 const int items_per_producer = 500;
// Shared state: a vector used as the work queue, the mutex that the visible
// push and pop code holds while touching it, the condition variable consumers
// wait on, and a flag that tells consumers no more items will arrive.
130 std::vector<std::shared_ptr<value_container>> shared_queue;
131 std::mutex queue_mutex;
132 std::condition_variable cv;
133 std::atomic<bool> producers_done{
false};
136 std::vector<std::thread> producer_threads;
137 for (
int p = 0; p < num_producers; ++p) {
// [&, p]: the shared state is captured by reference, the producer index by
// value so each thread keeps its own copy.
138 producer_threads.emplace_back([&, p]() {
// Per-thread random generator seeded from std::random_device; values are
// uniform integers in [1, 1000].
139 std::random_device rd;
140 std::mt19937 gen(rd());
141 std::uniform_int_distribution<> dis(1, 1000);
143 for (
int i = 0; i < items_per_producer; ++i) {
// Build one work item tagged with the producer index, the item index, a
// random value and a millisecond wall-clock timestamp.
145 auto container = std::make_shared<value_container>();
146 container->set_source(
"producer_" + std::to_string(p),
"thread_" + std::to_string(p));
147 container->set_target(
"consumer_pool",
"any_available");
148 container->set_message_type(
"work_item");
150 container->set(
"producer_id",
static_cast<int32_t
>(p));
151 container->set(
"item_id",
static_cast<int32_t
>(i));
152 container->set(
"random_value",
static_cast<int32_t
>(dis(gen)));
153 container->set(
"timestamp",
static_cast<int64_t
>(
154 std::chrono::duration_cast<std::chrono::milliseconds>(
155 std::chrono::system_clock::now().time_since_epoch()).count()));
// The push happens while holding queue_mutex.
// NOTE(review): no cv.notify_one()/notify_all() is visible in this listing;
// consumers block in cv.wait, so confirm the producer notifies after pushing.
159 std::lock_guard<std::mutex> lock(queue_mutex);
160 shared_queue.push_back(container);
// Brief pause (100 microseconds).
167 std::this_thread::sleep_for(std::chrono::microseconds(100));
173 std::vector<std::thread> consumer_threads;
174 for (
int c = 0; c < num_consumers; ++c) {
// [&, c]: shared state by reference, consumer index by value.
175 consumer_threads.emplace_back([&, c]() {
176 int items_processed = 0;
179 std::shared_ptr<value_container> container;
// Block until there is something to pop or the producers have finished.
// cv.wait releases queue_mutex while blocked and re-evaluates the predicate
// each time it wakes.
183 std::unique_lock<std::mutex> lock(queue_mutex);
184 cv.wait(lock, [&]() {
return !shared_queue.empty() || producers_done.load(); });
// Items are taken from the back of the vector, so the most recently pushed
// container is consumed first (LIFO order).
186 if (!shared_queue.empty()) {
187 container = shared_queue.back();
188 shared_queue.pop_back();
189 }
else if (producers_done.load()) {
// Round-trip the popped container: serialize to binary, then construct a new
// container from the bytes. .value() is called without a prior check.
196 std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
199 auto processed = std::make_shared<value_container>(serialized);
// Brief pause (50 microseconds).
206 std::this_thread::sleep_for(std::chrono::microseconds(50));
210 std::cout <<
" Consumer " << c <<
" processed " << items_processed <<
" items" << std::endl;
// Loop over the producer threads (loop body not visible in this listing;
// presumably joins each thread — confirm).
215 for (
auto& thread : producer_threads) {
// Signal consumers that no further items will be produced.
// NOTE(review): the flag is stored without queue_mutex held, as far as this
// listing shows. A consumer that has evaluated the wait predicate but not yet
// blocked could miss the wake-up unless the store happens under the mutex and
// is followed by cv.notify_all() — confirm against the full file.
219 producers_done =
true;
// Loop over the consumer threads (loop body not visible; presumably joins).
223 for (
auto& thread : consumer_threads) {
227 std::cout <<
"Multi-threaded processing completed:" << std::endl;
229 std::cout <<
" Total bytes processed: " <<
processed_bytes_.load() << std::endl;
// A non-zero size here means some items were left unconsumed in the queue.
230 std::cout <<
" Remaining in queue: " << shared_queue.size() << std::endl;
// Edge-case checks: round-trips an empty container and a container holding a
// 10000-character string through binary serialization.
237 std::cout <<
"\n--- Error Handling ---" << std::endl;
240 std::cout <<
"Testing edge cases:" << std::endl;
// Case 1: a container with nothing set is serialized and a new container is
// constructed from the result. .value() is called without a prior check, and
// the success line below is printed without inspecting empty_deserialized.
243 auto empty_container = std::make_shared<value_container>();
244 std::string empty_serialized = empty_container->serialize_string(value_container::serialization_format::binary).value();
245 auto empty_deserialized = std::make_shared<value_container>(empty_serialized);
246 std::cout <<
" - Empty container serialization/deserialization works" << std::endl;
// Case 2: a single string value of 10000 'A' characters.
249 std::string large_string(10000,
'A');
250 auto large_container = std::make_shared<value_container>();
251 large_container->set_message_type(
"large_data_test");
252 large_container->set(
"large_data", large_string);
254 std::string large_serialized = large_container->serialize_string(value_container::serialization_format::binary).value();
255 auto large_deserialized = std::make_shared<value_container>(large_serialized);
// The success line is printed only when all three checks pass: the key is
// found, the stored value holds a std::string (std::get_if returns non-null),
// and the recovered text equals the original string.
257 if (
auto recovered_value = large_deserialized->get(
"large_data")) {
258 if (
auto* str = std::get_if<std::string>(&recovered_value->data)) {
259 if (*str == large_string) {
260 std::cout <<
" - Large data handling works (" << large_string.size() <<
" bytes)" << std::endl;
// Performance demo. Scenario 1 times the creation and serialization of many
// small messages; Scenario 2 repeats the exercise with a few large payloads.
274 std::cout <<
"\n--- Performance Scenarios ---" << std::endl;
277 std::cout <<
"Scenario 1: High-frequency small messages" << std::endl;
279 const int small_message_count = 10000;
// Timer start for Scenario 1; start_time is reassigned later for Scenario 2.
280 auto start_time = std::chrono::high_resolution_clock::now();
282 for (
int i = 0; i < small_message_count; ++i) {
283 auto container = std::make_shared<value_container>();
// i % 100 cycles the session suffix through 0..99.
284 container->set_source(
"high_freq_client",
"session_" + std::to_string(i % 100));
285 container->set_target(
"high_freq_server",
"handler");
286 container->set_message_type(
"ping");
288 container->set(
"sequence",
static_cast<int32_t
>(i));
// Timestamp is the system_clock time in microseconds since the clock's epoch.
289 container->set(
"timestamp",
static_cast<int64_t
>(
290 std::chrono::duration_cast<std::chrono::microseconds>(
291 std::chrono::system_clock::now().time_since_epoch()).count()));
// The serialized result is obtained via .value() and then discarded; only the
// cost of producing it matters for the timing.
294 container->serialize_string(value_container::serialization_format::binary).value();
297 auto end_time = std::chrono::high_resolution_clock::now();
// `duration` is deduced here as std::chrono::microseconds; any later
// assignment to it is converted to that unit.
298 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// messages per second = count * 1'000'000 / elapsed microseconds.
300 double rate = (small_message_count * 1000000.0) / duration.count();
301 std::cout <<
" Rate: " << std::fixed << std::setprecision(2) << rate <<
" messages/second" << std::endl;
304 std::cout <<
"Scenario 2: Low-frequency large messages" << std::endl;
306 const int large_message_count = 100;
// Restart the timer for Scenario 2.
307 start_time = std::chrono::high_resolution_clock::now();
309 for (
int i = 0; i < large_message_count; ++i) {
310 auto container = std::make_shared<value_container>();
311 container->set_source(
"large_msg_client",
"upload_session");
312 container->set_target(
"large_msg_server",
"file_handler");
313 container->set_message_type(
"file_upload");
// 50000-byte payload filled with one repeated byte whose value depends on i.
316 std::string file_data(50000,
static_cast<char>(i % 256));
317 container->set(
"file_content", file_data);
318 container->set(
"filename", std::string(
"large_file_" + std::to_string(i) +
".dat"));
// size() is narrowed to int32_t; the payload size is a fixed 50000 here.
319 container->set(
"file_size",
static_cast<int32_t
>(file_data.size()));
322 std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
326 end_time = std::chrono::high_resolution_clock::now();
327 duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
329 double large_rate = (large_message_count * 1000.0) / duration.count();
330 std::cout <<
" Rate: " << std::fixed << std::setprecision(2) << large_rate <<
" large messages/second" << std::endl;
331 std::cout <<
" Data processed: " << (
processed_bytes_.load() / 1024 / 1024) <<
" MB" << std::endl;
// Final report: prints total runtime, the counters held in stats_ and
// processed_bytes_, and average per-second rates derived from them.
// Runtime is measured with steady_clock from stats_.start_time to now, in
// whole milliseconds.
341 auto end_time = std::chrono::steady_clock::now();
342 auto total_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time -
stats_.
start_time);
344 std::cout <<
"\n=== Final Statistics ===" << std::endl;
345 std::cout <<
"Total runtime: " << total_duration.count() <<
" ms" << std::endl;
346 std::cout <<
"Containers created: " <<
stats_.
created.load() << std::endl;
349 std::cout <<
"Errors encountered: " <<
stats_.
errors.load() << std::endl;
350 std::cout <<
"Total bytes processed: " <<
processed_bytes_.load() << std::endl;
// Rates are only computed when at least one millisecond has elapsed, which
// avoids dividing by zero.
352 if (total_duration.count() > 0) {
// per-second rate = counter * 1000 / elapsed milliseconds.
353 double containers_per_sec = (
stats_.
created.load() * 1000.0) / total_duration.count();
354 double serializations_per_sec = (
stats_.
serialized.load() * 1000.0) / total_duration.count();
// std::fixed and std::setprecision(2) remain in effect on std::cout, so the
// second rate is printed with the same formatting as the first.
356 std::cout <<
"Average creation rate: " << std::fixed << std::setprecision(2)
357 << containers_per_sec <<
" containers/second" << std::endl;
358 std::cout <<
"Average serialization rate: " << serializations_per_sec
359 <<
" operations/second" << std::endl;
361 std::cout <<
"========================" << std::endl;