34#include <condition_variable>
39#if __has_include(<asio.hpp>)
40 #define HAS_STANDALONE_ASIO 1
42#elif __has_include(<boost/asio.hpp>)
43 #define HAS_BOOST_ASIO 1
44 #include <boost/asio.hpp>
45 namespace asio = boost::asio;
50#if CONTAINER_HAS_COROUTINES
// Section banner: prints a newline, a 60-character '=' rule, the title
// prefixed with a space, and a closing 60-character '=' rule.
// NOTE(review): presumably the body of print_section(title); the signature
// line is not present in this listing.
60 std::cout <<
"\n" << std::string(60,
'=') << std::endl;
61 std::cout <<
" " << title << std::endl;
62 std::cout << std::string(60,
'=') << std::endl;
// Writes `message` to stdout with an "[OK] " prefix.
// NOTE(review): presumably the body of print_success(message); the signature
// line is not present in this listing.
69 std::cout <<
"[OK] " << message << std::endl;
// Writes `message` to stdout with an "[INFO] " prefix.
// NOTE(review): presumably the body of print_info(message); the signature
// line is not present in this listing.
76 std::cout <<
"[INFO] " << message << std::endl;
79#if defined(HAS_STANDALONE_ASIO) || defined(HAS_BOOST_ASIO)
// message_processor: posts container serialization and deserialization tasks
// onto an Asio strand built from the io_context passed to the constructor.
// NOTE(review): this listing omits several original lines (access specifiers,
// closing braces, part of the constructor's initializer list), so the comments
// below describe only the statements that are visible.
87class message_processor {
// Constructor: strand_ is created from io_ctx via asio::make_strand.
89 explicit message_processor(asio::io_context& io_ctx)
91 , strand_(asio::make_strand(io_ctx))
// process_async: posts a task to strand_ that serializes `container` in the
// binary format, logs the byte count through print_info, and then passes the
// bytes to `callback`.
//   container - shared pointer copied into the posted task.
//   callback  - invoked from the posted task with the serialized bytes.
98 void process_async(std::shared_ptr<value_container> container,
99 std::function<
void(std::vector<uint8_t>)> callback) {
// NOTE(review): `this` is captured, but none of the visible statements in the
// task use a member of the processor — confirm whether the capture is needed.
101 asio::post(strand_, [
this, container, callback]() {
// Falls back to an empty byte vector (value_or) when serialize() has no value.
102 auto serialized = container->serialize(value_container::serialization_format::binary).value_or(std::vector<uint8_t>{});
103 print_info(
"Processed container: " + std::to_string(serialized.size()) +
" bytes");
// The bytes are moved into the callback; `serialized` is not used afterwards
// in the visible lines.
105 callback(std::move(serialized));
// deserialize_async: posts a task to strand_ that constructs a value_container
// from the bytes moved into the task.
//   data     - bytes, moved into the lambda's capture.
//   callback - captured by the task; the lines that presumably invoke it with
//              the new container are not visible in this listing.
113 void deserialize_async(std::vector<uint8_t> data,
114 std::function<
void(std::shared_ptr<value_container>)> callback) {
115 asio::post(strand_, [data = std::move(data), callback]() {
// NOTE(review): the meaning of the second constructor argument (false) is not
// shown in this file — confirm against the value_container declaration.
116 auto container = std::make_shared<value_container>(data,
false);
// Reference to the io_context supplied at construction.
125 asio::io_context& io_ctx_;
// Strand that process_async and deserialize_async post their tasks to.
126 asio::strand<asio::io_context::executor_type> strand_;
// demonstrate_basic_asio_integration: builds a sample container, hands it to a
// message_processor for asynchronous serialization, and reports the resulting
// byte count once the completion flag has been set by the callback.
// NOTE(review): several original lines are absent from this listing (closing
// braces, and the call that runs io_ctx); comments cover visible lines only.
132void demonstrate_basic_asio_integration() {
135 asio::io_context io_ctx;
136 message_processor processor(io_ctx);
// Populate the request container: message type plus three key/value entries.
139 auto container = std::make_shared<value_container>();
140 container->set_message_type(
"asio_demo");
141 container->set(
"request_id",
static_cast<int32_t
>(12345));
142 container->set(
"action", std::string(
"process_data"));
143 container->set(
"payload", std::string(
"Hello from Asio!"));
145 std::cout <<
"Created container with request data" << std::endl;
// Both variables are captured by reference by the callback below, which
// writes them; they are read again after the io_context has been run.
148 std::atomic<bool> completed{
false};
149 std::vector<uint8_t> result_data;
// The callback stores the serialized bytes first and sets the flag second.
151 processor.process_async(container,
152 [&completed, &result_data](std::vector<uint8_t> serialized) {
153 result_data = std::move(serialized);
154 completed.store(
true);
// NOTE(review): the statement that actually runs io_ctx is not visible here.
158 std::cout <<
"Running Asio io_context..." << std::endl;
// Only report the size when the callback has run.
161 if (completed.load()) {
163 std::to_string(result_data.size()) +
" bytes");
// demonstrate_scheduled_processing: arms a steady_timer for 100 ms and, in the
// timer handler, serializes a container that was prepared beforehand.
// NOTE(review): lines are missing from this listing (closing braces, any check
// of `ec`, and the call that runs io_ctx); comments cover visible lines only.
170void demonstrate_scheduled_processing() {
173 asio::io_context io_ctx;
174 asio::steady_timer timer(io_ctx);
176 auto container = std::make_shared<value_container>();
177 container->set_message_type(
"scheduled_task");
// "timestamp" holds whole seconds since the system_clock epoch, as int64_t.
178 container->set(
"timestamp",
static_cast<int64_t
>(
179 std::chrono::duration_cast<std::chrono::seconds>(
180 std::chrono::system_clock::now().time_since_epoch()).count()));
182 std::cout <<
"Scheduling container processing in 100ms..." << std::endl;
184 timer.expires_after(std::chrono::milliseconds(100));
// The handler captures the shared_ptr by value, so it holds its own reference
// to the container.
185 timer.async_wait([container](
const asio::error_code& ec) {
// Falls back to an empty byte vector (value_or) when serialize() has no value.
187 auto serialized = container->serialize(value_container::serialization_format::binary).value_or(std::vector<uint8_t>{});
189 std::to_string(serialized.size()) +
" bytes");
// demonstrate_concurrent_processing: starts four worker threads that share one
// io_context, posts ten tasks to it, and has each task build and serialize its
// own container and bump a shared atomic counter.
// NOTE(review): lines are missing from this listing (thread bodies, closing
// braces, the join loop body); comments cover visible lines only.
199void demonstrate_concurrent_processing() {
202 asio::io_context io_ctx;
// Work guard obtained from asio::make_work_guard; per the Asio documentation
// this keeps io_context::run() from returning while the guard is alive.
203 auto work_guard = asio::make_work_guard(io_ctx);
206 std::vector<std::thread> workers;
207 for (
int i = 0; i < 4; ++i) {
// Each worker captures io_ctx by reference.
208 workers.emplace_back([&io_ctx]() {
213 std::cout <<
"Started 4 worker threads" << std::endl;
216 std::atomic<int> completed_count{0};
217 const int total_tasks = 10;
219 for (
int i = 0; i < total_tasks; ++i) {
// i and total_tasks are copied into the task; the counter is shared by reference.
220 asio::post(io_ctx, [i, &completed_count, total_tasks]() {
221 auto container = std::make_shared<value_container>();
222 container->set_message_type(
"concurrent_task");
223 container->set(
"task_id",
static_cast<int32_t
>(i));
224 container->set(
"data", std::string(
"Task data " + std::to_string(i)));
226 auto serialized = container->serialize(value_container::serialization_format::binary).value_or(std::vector<uint8_t>{});
// fetch_add returns the previous value, hence the + 1 for this task's ordinal.
228 int count = completed_count.fetch_add(1) + 1;
229 std::cout <<
" Task " << i <<
" completed (" << count <<
"/" << total_tasks <<
")" << std::endl;
// NOTE(review): fixed 100 ms pause before the join loop — presumably to let
// the posted tasks run; confirm whether a fixed delay is intended.
234 std::this_thread::sleep_for(std::chrono::milliseconds(100));
237 for (
auto& t : workers) {
// NOTE(review): no check of completed_count against total_tasks is visible
// before this message — confirm.
241 print_success(
"All " + std::to_string(total_tasks) +
" tasks completed");
// demonstrate_message_queue: a producer thread serializes five containers into
// a mutex-protected std::queue, while consume_next (posted to io_ctx) takes
// messages off the queue, deserializes them, prints their "seq" value, and
// reposts itself.
// NOTE(review): lines are missing from this listing (scope braces, the queue
// pop, thread joins, the call that runs io_ctx); comments cover visible lines.
247void demonstrate_message_queue() {
250 asio::io_context io_ctx;
251 auto work_guard = asio::make_work_guard(io_ctx);
// State shared between the producer thread and the consumer task.
254 std::queue<std::vector<uint8_t>> message_queue;
255 std::mutex queue_mutex;
256 std::atomic<bool> producer_done{
false};
257 std::atomic<int> consumed_count{0};
// Producer: captures all locals by reference.
260 std::thread producer([&]() {
261 for (
int i = 0; i < 5; ++i) {
262 auto container = std::make_shared<value_container>();
263 container->set_message_type(
"queue_message");
264 container->set(
"seq",
static_cast<int32_t
>(i));
265 container->set(
"body", std::string(
"Message " + std::to_string(i)));
267 auto serialized = container->serialize(value_container::serialization_format::binary).value_or(std::vector<uint8_t>{});
// The push happens with queue_mutex held.
270 std::lock_guard<std::mutex> lock(queue_mutex);
271 message_queue.push(std::move(serialized));
274 std::cout <<
" Produced message " << i << std::endl;
// 20 ms pause between produced messages.
275 std::this_thread::sleep_for(std::chrono::milliseconds(20));
// Set after the loop so the consumer can tell no more messages will arrive.
277 producer_done.store(
true);
// Declared as a std::function before assignment so the lambda, which captures
// by reference, can post consume_next again from inside itself.
281 std::function<void()> consume_next;
282 consume_next = [&]() {
283 std::vector<uint8_t> data;
// The emptiness check and the move out of front() happen with queue_mutex held.
// NOTE(review): the pop of the moved-from front element is not visible here.
285 std::lock_guard<std::mutex> lock(queue_mutex);
286 if (!message_queue.empty()) {
287 data = std::move(message_queue.front());
293 auto container = std::make_shared<value_container>(data,
false);
294 auto seq = container->get(
"seq");
// Print only when "seq" was found and holds an int32_t.
295 if (seq && std::holds_alternative<int32_t>(seq->data)) {
296 std::cout <<
" Consumed message " << std::get<int32_t>(seq->data) << std::endl;
298 consumed_count.fetch_add(1);
// Repost while the producer is still running or messages remain.
// NOTE(review): if the lock_guard from line 285 is already out of scope here
// (the scope braces are not visible in this listing), this empty() call reads
// the queue without queue_mutex while the producer may be pushing — confirm.
302 if (!producer_done.load() || !message_queue.empty()) {
303 asio::post(io_ctx, consume_next);
// Initial post of the consumer task.
309 asio::post(io_ctx, consume_next);
// Consumer thread captures io_ctx by reference.
312 std::thread consumer([&io_ctx]() {
319 print_success(
"Processed " + std::to_string(consumed_count.load()) +
" messages");
324#if CONTAINER_HAS_COROUTINES && (defined(HAS_STANDALONE_ASIO) || defined(HAS_BOOST_ASIO))
// demonstrate_coroutine_asio_hybrid: serializes a container through the
// coroutine-based serialize_async() API, then posts a task to an Asio
// io_context that reports how many bytes would be sent.
// NOTE(review): lines are missing from this listing (the declaration of
// async_cont, the #else/#endif directives, closing braces, the call that runs
// io_ctx); comments cover visible lines only.
334void demonstrate_coroutine_asio_hybrid() {
337 std::cout <<
"This example shows how to use container_system's" << std::endl;
338 std::cout <<
"coroutine-based async API alongside Asio." << std::endl;
339 std::cout << std::endl;
342 auto container = std::make_shared<value_container>();
343 container->set_message_type(
"hybrid_demo");
344 container->set(
"mode", std::string(
"coroutine_asio"));
// async_cont is declared on a line that is not visible in this listing.
349 auto serialize_task = async_cont.serialize_async();
// Poll done() every 100 microseconds until the task reports completion.
350 while (!serialize_task.done()) {
351 std::this_thread::sleep_for(std::chrono::microseconds(100));
// With CONTAINER_HAS_COMMON_RESULT, get() yields a result object that is
// checked with is_ok() and read with value().
354#if CONTAINER_HAS_COMMON_RESULT
355 auto result = std::move(serialize_task).get();
356 if (result.is_ok()) {
357 std::cout <<
"Coroutine serialization: " << result.value().size() <<
" bytes" << std::endl;
360 asio::io_context io_ctx;
// The task's capture is initialized from result.value().
361 asio::post(io_ctx, [data = result.value()]() {
363 print_info(
"Would send " + std::to_string(data.size()) +
" bytes over network");
// Presumably the alternative (#else) branch — the directive itself is not
// visible: here get() yields the bytes directly.
368 auto bytes = std::move(serialize_task).get();
369 std::cout <<
"Coroutine serialization: " << bytes.size() <<
" bytes" << std::endl;
372 asio::io_context io_ctx;
// The bytes are moved into the task's capture.
373 asio::post(io_ctx, [bytes = std::move(bytes)]() {
374 print_info(
"Would send " + std::to_string(bytes.size()) +
" bytes over network");
// Program entry point body: prints a banner, reports which Asio flavor was
// detected at compile time, runs the demos, and prints closing remarks.
// NOTE(review): presumably the body of main(); the signature line and several
// preprocessor directives (#else/#endif) and any return statements are not
// present in this listing.
385 std::cout << std::string(60,
'*') << std::endl;
386 std::cout <<
" Container System - Asio Integration Examples" << std::endl;
387 std::cout << std::string(60,
'*') << std::endl;
// Report the Asio backend selected by the HAS_STANDALONE_ASIO / HAS_BOOST_ASIO macros.
389#if defined(HAS_STANDALONE_ASIO)
390 std::cout <<
"Using: Standalone Asio" << std::endl;
391#elif defined(HAS_BOOST_ASIO)
392 std::cout <<
"Using: Boost.Asio" << std::endl;
// Presumably the fallback branch when neither macro is defined (the #else
// directive is not visible): print installation hints.
394 std::cout <<
"Asio not available!" << std::endl;
395 std::cout <<
"Install standalone Asio or Boost.Asio to run this example." << std::endl;
396 std::cout << std::endl;
397 std::cout <<
"Install via vcpkg:" << std::endl;
398 std::cout <<
" vcpkg install asio" << std::endl;
399 std::cout <<
" # or" << std::endl;
400 std::cout <<
" vcpkg install boost-asio" << std::endl;
// The Asio-based demos are compiled and run only when an Asio flavor is available.
404#if defined(HAS_STANDALONE_ASIO) || defined(HAS_BOOST_ASIO)
405 demonstrate_basic_asio_integration();
406 demonstrate_scheduled_processing();
407 demonstrate_concurrent_processing();
408 demonstrate_message_queue();
// The coroutine demo additionally requires CONTAINER_HAS_COROUTINES.
410#if CONTAINER_HAS_COROUTINES
411 demonstrate_coroutine_asio_hybrid();
413 std::cout <<
"\nNote: C++20 coroutine demos skipped (compiler support required)" << std::endl;
417 std::cout <<
"These patterns can be adapted for real network applications" << std::endl;
418 std::cout <<
"using TCP/UDP sockets with Asio." << std::endl;
void print_section(const std::string &title)
Helper to print section headers.
void print_info(const std::string &message)
Helper to print info message.
void print_success(const std::string &message)
Helper to print success message.
Main header for C++20 coroutine async support.
Async wrapper for value_container operations.