Container System 0.1.0
High-performance C++20 type-safe container framework with SIMD-accelerated serialization
Loading...
Searching...
No Matches
advanced_container_example.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
23#include <iostream>
24#include <memory>
25#include <vector>
26#include <thread>
27#include <chrono>
28#include <random>
29#include <atomic>
30#include <map>
31#include <sstream>
32#include <iomanip>
33#include <condition_variable>
34
35#include "container.h"
36
37using namespace kcenon::container;
39private:
40 std::atomic<bool> running_{true};
41 std::atomic<int> processed_containers_{0};
42 std::atomic<int> processed_bytes_{0};
43
44 // Statistics
45 struct Statistics {
46 std::atomic<int> created{0};
47 std::atomic<int> serialized{0};
48 std::atomic<int> deserialized{0};
49 std::atomic<int> errors{0};
50 std::chrono::steady_clock::time_point start_time;
52
53public:
55 stats_.start_time = std::chrono::steady_clock::now();
56 std::cout << "=== Advanced Container System Example ===" << std::endl;
57 std::cout << "Using variant-based API (set_value/get_value)" << std::endl;
58 std::cout << "===========================================" << std::endl;
59 }
60
65
70 std::cout << "\n--- Basic Container Operations ---" << std::endl;
71
72 // Create a container with various value types using set_value API
73 auto container = std::make_shared<value_container>();
74 container->set_source("example_client", "session_001");
75 container->set_target("example_server", "processor_main");
76 container->set_message_type("user_profile_update");
77
78 // Add different types of values using set_value
79 container->set("username", std::string("john_doe"));
80 container->set("user_id", static_cast<int32_t>(12345));
81 container->set("account_balance", 1500.75);
82 container->set("is_premium", true);
83 container->set("last_login", static_cast<int64_t>(
84 std::chrono::duration_cast<std::chrono::seconds>(
85 std::chrono::system_clock::now().time_since_epoch()).count()));
86
87 // Display container information
88 std::cout << "Container created:" << std::endl;
89 std::cout << " Source: " << container->source_id() << "/" << container->source_sub_id() << std::endl;
90 std::cout << " Target: " << container->target_id() << "/" << container->target_sub_id() << std::endl;
91 std::cout << " Type: " << container->message_type() << std::endl;
92
93 // Demonstrate value access using get_value
94 if (auto username_value = container->get("username")) {
95 if (auto* str = std::get_if<std::string>(&username_value->data)) {
96 std::cout << " Username: " << *str << std::endl;
97 }
98 }
99
100 if (auto balance_value = container->get("account_balance")) {
101 if (auto* val = std::get_if<double>(&balance_value->data)) {
102 std::cout << " Balance: $" << std::fixed << std::setprecision(2) << *val << std::endl;
103 }
104 }
105
106 // Demonstrate serialization
107 std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
108 std::cout << " Serialized size: " << serialized.size() << " bytes" << std::endl;
109
110 // Demonstrate deserialization
111 auto deserialized = std::make_shared<value_container>(serialized);
112 std::cout << " Deserialization successful" << std::endl;
113
114 stats_.created++;
117 }
118
123 std::cout << "\n--- Multi-threaded Operations ---" << std::endl;
124
125 const int num_producers = 2;
126 const int num_consumers = 2;
127 const int items_per_producer = 500;
128
129 // Shared queue simulation using vector with mutex
130 std::vector<std::shared_ptr<value_container>> shared_queue;
131 std::mutex queue_mutex;
132 std::condition_variable cv;
133 std::atomic<bool> producers_done{false};
134
135 // Producer threads
136 std::vector<std::thread> producer_threads;
137 for (int p = 0; p < num_producers; ++p) {
138 producer_threads.emplace_back([&, p]() {
139 std::random_device rd;
140 std::mt19937 gen(rd());
141 std::uniform_int_distribution<> dis(1, 1000);
142
143 for (int i = 0; i < items_per_producer; ++i) {
144 // Create container using set_value API
145 auto container = std::make_shared<value_container>();
146 container->set_source("producer_" + std::to_string(p), "thread_" + std::to_string(p));
147 container->set_target("consumer_pool", "any_available");
148 container->set_message_type("work_item");
149
150 container->set("producer_id", static_cast<int32_t>(p));
151 container->set("item_id", static_cast<int32_t>(i));
152 container->set("random_value", static_cast<int32_t>(dis(gen)));
153 container->set("timestamp", static_cast<int64_t>(
154 std::chrono::duration_cast<std::chrono::milliseconds>(
155 std::chrono::system_clock::now().time_since_epoch()).count()));
156
157 // Add to queue
158 {
159 std::lock_guard<std::mutex> lock(queue_mutex);
160 shared_queue.push_back(container);
161 }
162 cv.notify_one();
163
164 stats_.created++;
165
166 // Small delay to simulate work
167 std::this_thread::sleep_for(std::chrono::microseconds(100));
168 }
169 });
170 }
171
172 // Consumer threads
173 std::vector<std::thread> consumer_threads;
174 for (int c = 0; c < num_consumers; ++c) {
175 consumer_threads.emplace_back([&, c]() {
176 int items_processed = 0;
177
178 while (running_) {
179 std::shared_ptr<value_container> container;
180
181 // Get item from queue
182 {
183 std::unique_lock<std::mutex> lock(queue_mutex);
184 cv.wait(lock, [&]() { return !shared_queue.empty() || producers_done.load(); });
185
186 if (!shared_queue.empty()) {
187 container = shared_queue.back();
188 shared_queue.pop_back();
189 } else if (producers_done.load()) {
190 break;
191 }
192 }
193
194 if (container) {
195 // Process container (serialize/deserialize simulation)
196 std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
197 processed_bytes_ += serialized.size();
198
199 auto processed = std::make_shared<value_container>(serialized);
200 items_processed++;
204
205 // Simulate processing time
206 std::this_thread::sleep_for(std::chrono::microseconds(50));
207 }
208 }
209
210 std::cout << " Consumer " << c << " processed " << items_processed << " items" << std::endl;
211 });
212 }
213
214 // Wait for producers to finish
215 for (auto& thread : producer_threads) {
216 thread.join();
217 }
218
219 producers_done = true;
220 cv.notify_all();
221
222 // Wait for consumers to finish
223 for (auto& thread : consumer_threads) {
224 thread.join();
225 }
226
227 std::cout << "Multi-threaded processing completed:" << std::endl;
228 std::cout << " Total items processed: " << processed_containers_.load() << std::endl;
229 std::cout << " Total bytes processed: " << processed_bytes_.load() << std::endl;
230 std::cout << " Remaining in queue: " << shared_queue.size() << std::endl;
231 }
232
237 std::cout << "\n--- Error Handling ---" << std::endl;
238
239 // Test edge cases
240 std::cout << "Testing edge cases:" << std::endl;
241
242 // Empty container serialization
243 auto empty_container = std::make_shared<value_container>();
244 std::string empty_serialized = empty_container->serialize_string(value_container::serialization_format::binary).value();
245 auto empty_deserialized = std::make_shared<value_container>(empty_serialized);
246 std::cout << " - Empty container serialization/deserialization works" << std::endl;
247
248 // Large value handling
249 std::string large_string(10000, 'A');
250 auto large_container = std::make_shared<value_container>();
251 large_container->set_message_type("large_data_test");
252 large_container->set("large_data", large_string);
253
254 std::string large_serialized = large_container->serialize_string(value_container::serialization_format::binary).value();
255 auto large_deserialized = std::make_shared<value_container>(large_serialized);
256
257 if (auto recovered_value = large_deserialized->get("large_data")) {
258 if (auto* str = std::get_if<std::string>(&recovered_value->data)) {
259 if (*str == large_string) {
260 std::cout << " - Large data handling works (" << large_string.size() << " bytes)" << std::endl;
261 }
262 }
263 }
264
265 stats_.created += 2;
266 stats_.serialized += 2;
267 stats_.deserialized += 2;
268 }
269
274 std::cout << "\n--- Performance Scenarios ---" << std::endl;
275
276 // Scenario 1: High-frequency small messages
277 std::cout << "Scenario 1: High-frequency small messages" << std::endl;
278
279 const int small_message_count = 10000;
280 auto start_time = std::chrono::high_resolution_clock::now();
281
282 for (int i = 0; i < small_message_count; ++i) {
283 auto container = std::make_shared<value_container>();
284 container->set_source("high_freq_client", "session_" + std::to_string(i % 100));
285 container->set_target("high_freq_server", "handler");
286 container->set_message_type("ping");
287
288 container->set("sequence", static_cast<int32_t>(i));
289 container->set("timestamp", static_cast<int64_t>(
290 std::chrono::duration_cast<std::chrono::microseconds>(
291 std::chrono::system_clock::now().time_since_epoch()).count()));
292
293 // Quick serialization test
294 container->serialize_string(value_container::serialization_format::binary).value();
295 }
296
297 auto end_time = std::chrono::high_resolution_clock::now();
298 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
299
300 double rate = (small_message_count * 1000000.0) / duration.count();
301 std::cout << " Rate: " << std::fixed << std::setprecision(2) << rate << " messages/second" << std::endl;
302
303 // Scenario 2: Low-frequency large messages
304 std::cout << "Scenario 2: Low-frequency large messages" << std::endl;
305
306 const int large_message_count = 100;
307 start_time = std::chrono::high_resolution_clock::now();
308
309 for (int i = 0; i < large_message_count; ++i) {
310 auto container = std::make_shared<value_container>();
311 container->set_source("large_msg_client", "upload_session");
312 container->set_target("large_msg_server", "file_handler");
313 container->set_message_type("file_upload");
314
315 // Simulate large file data using string
316 std::string file_data(50000, static_cast<char>(i % 256));
317 container->set("file_content", file_data);
318 container->set("filename", std::string("large_file_" + std::to_string(i) + ".dat"));
319 container->set("file_size", static_cast<int32_t>(file_data.size()));
320
321 // Serialization test
322 std::string serialized = container->serialize_string(value_container::serialization_format::binary).value();
323 processed_bytes_ += serialized.size();
324 }
325
326 end_time = std::chrono::high_resolution_clock::now();
327 duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
328
329 double large_rate = (large_message_count * 1000.0) / duration.count();
330 std::cout << " Rate: " << std::fixed << std::setprecision(2) << large_rate << " large messages/second" << std::endl;
331 std::cout << " Data processed: " << (processed_bytes_.load() / 1024 / 1024) << " MB" << std::endl;
332
333 stats_.created += small_message_count + large_message_count;
334 stats_.serialized += small_message_count + large_message_count;
335 }
336
341 auto end_time = std::chrono::steady_clock::now();
342 auto total_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - stats_.start_time);
343
344 std::cout << "\n=== Final Statistics ===" << std::endl;
345 std::cout << "Total runtime: " << total_duration.count() << " ms" << std::endl;
346 std::cout << "Containers created: " << stats_.created.load() << std::endl;
347 std::cout << "Serializations: " << stats_.serialized.load() << std::endl;
348 std::cout << "Deserializations: " << stats_.deserialized.load() << std::endl;
349 std::cout << "Errors encountered: " << stats_.errors.load() << std::endl;
350 std::cout << "Total bytes processed: " << processed_bytes_.load() << std::endl;
351
352 if (total_duration.count() > 0) {
353 double containers_per_sec = (stats_.created.load() * 1000.0) / total_duration.count();
354 double serializations_per_sec = (stats_.serialized.load() * 1000.0) / total_duration.count();
355
356 std::cout << "Average creation rate: " << std::fixed << std::setprecision(2)
357 << containers_per_sec << " containers/second" << std::endl;
358 std::cout << "Average serialization rate: " << serializations_per_sec
359 << " operations/second" << std::endl;
360 }
361 std::cout << "========================" << std::endl;
362 }
363
368 try {
373
374 } catch (const std::exception& e) {
375 std::cerr << "Exception in demonstration: " << e.what() << std::endl;
376 stats_.errors++;
377 }
378 }
379};
380
381int main() {
382 try {
384 example.run_all_demonstrations();
385
386 std::cout << "\nAdvanced Container System Example completed successfully!" << std::endl;
387 return 0;
388
389 } catch (const std::exception& e) {
390 std::cerr << "Fatal error: " << e.what() << std::endl;
391 return 1;
392 }
393}
void run_all_demonstrations()
Runs all demonstrations.
void demonstrate_performance_scenarios()
Demonstrates performance scenarios.
struct AdvancedContainerExample::Statistics stats_
void demonstrate_multithreaded_operations()
Demonstrates multi-threaded producer-consumer pattern.
void print_final_statistics()
Prints final statistics.
void demonstrate_error_handling()
Demonstrates error handling scenarios.
void demonstrate_basic_operations()
Demonstrates basic container operations using new API.
std::chrono::steady_clock::time_point start_time