Container System 0.1.0
High-performance C++20 type-safe container framework with SIMD-accelerated serialization
Loading...
Searching...
No Matches
asio_integration_example.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
26#include <iostream>
27#include <memory>
28#include <vector>
29#include <chrono>
30#include <thread>
31#include <functional>
32#include <queue>
33#include <mutex>
34#include <condition_variable>
35
36#include "container.h"
37
38// Check for Asio availability
39#if __has_include(<asio.hpp>)
40 #define HAS_STANDALONE_ASIO 1
41 #include <asio.hpp>
42#elif __has_include(<boost/asio.hpp>)
43 #define HAS_BOOST_ASIO 1
44 #include <boost/asio.hpp>
45 namespace asio = boost::asio;
46#else
47 #define HAS_ASIO 0
48#endif
49
50#if CONTAINER_HAS_COROUTINES
52#endif
53
54using namespace kcenon::container;
55
59void print_section(const std::string& title) {
60 std::cout << "\n" << std::string(60, '=') << std::endl;
61 std::cout << " " << title << std::endl;
62 std::cout << std::string(60, '=') << std::endl;
63}
64
68void print_success(const std::string& message) {
69 std::cout << "[OK] " << message << std::endl;
70}
71
75void print_info(const std::string& message) {
76 std::cout << "[INFO] " << message << std::endl;
77}
78
79#if defined(HAS_STANDALONE_ASIO) || defined(HAS_BOOST_ASIO)
80
87class message_processor {
88public:
89 explicit message_processor(asio::io_context& io_ctx)
90 : io_ctx_(io_ctx)
91 , strand_(asio::make_strand(io_ctx))
92 {
93 }
94
98 void process_async(std::shared_ptr<value_container> container,
99 std::function<void(std::vector<uint8_t>)> callback) {
100 // Post work to the strand for thread-safe execution
101 asio::post(strand_, [this, container, callback]() {
102 auto serialized = container->serialize(value_container::serialization_format::binary).value_or(std::vector<uint8_t>{});
103 print_info("Processed container: " + std::to_string(serialized.size()) + " bytes");
104 if (callback) {
105 callback(std::move(serialized));
106 }
107 });
108 }
109
113 void deserialize_async(std::vector<uint8_t> data,
114 std::function<void(std::shared_ptr<value_container>)> callback) {
115 asio::post(strand_, [data = std::move(data), callback]() {
116 auto container = std::make_shared<value_container>(data, false);
117 print_info("Deserialized container");
118 if (callback) {
119 callback(container);
120 }
121 });
122 }
123
124private:
125 asio::io_context& io_ctx_;
126 asio::strand<asio::io_context::executor_type> strand_;
127};
128
132void demonstrate_basic_asio_integration() {
133 print_section("Basic Asio Integration");
134
135 asio::io_context io_ctx;
136 message_processor processor(io_ctx);
137
138 // Create a container
139 auto container = std::make_shared<value_container>();
140 container->set_message_type("asio_demo");
141 container->set("request_id", static_cast<int32_t>(12345));
142 container->set("action", std::string("process_data"));
143 container->set("payload", std::string("Hello from Asio!"));
144
145 std::cout << "Created container with request data" << std::endl;
146
147 // Process asynchronously
148 std::atomic<bool> completed{false};
149 std::vector<uint8_t> result_data;
150
151 processor.process_async(container,
152 [&completed, &result_data](std::vector<uint8_t> serialized) {
153 result_data = std::move(serialized);
154 completed.store(true);
155 });
156
157 // Run the io_context
158 std::cout << "Running Asio io_context..." << std::endl;
159 io_ctx.run();
160
161 if (completed.load()) {
162 print_success("Async processing completed: " +
163 std::to_string(result_data.size()) + " bytes");
164 }
165}
166
170void demonstrate_scheduled_processing() {
171 print_section("Scheduled Processing with Asio Timer");
172
173 asio::io_context io_ctx;
174 asio::steady_timer timer(io_ctx);
175
176 auto container = std::make_shared<value_container>();
177 container->set_message_type("scheduled_task");
178 container->set("timestamp", static_cast<int64_t>(
179 std::chrono::duration_cast<std::chrono::seconds>(
180 std::chrono::system_clock::now().time_since_epoch()).count()));
181
182 std::cout << "Scheduling container processing in 100ms..." << std::endl;
183
184 timer.expires_after(std::chrono::milliseconds(100));
185 timer.async_wait([container](const asio::error_code& ec) {
186 if (!ec) {
187 auto serialized = container->serialize(value_container::serialization_format::binary).value_or(std::vector<uint8_t>{});
188 print_success("Timer triggered! Serialized " +
189 std::to_string(serialized.size()) + " bytes");
190 }
191 });
192
193 io_ctx.run();
194}
195
199void demonstrate_concurrent_processing() {
200 print_section("Concurrent Processing with Thread Pool");
201
202 asio::io_context io_ctx;
203 auto work_guard = asio::make_work_guard(io_ctx);
204
205 // Start worker threads
206 std::vector<std::thread> workers;
207 for (int i = 0; i < 4; ++i) {
208 workers.emplace_back([&io_ctx]() {
209 io_ctx.run();
210 });
211 }
212
213 std::cout << "Started 4 worker threads" << std::endl;
214
215 // Submit multiple containers for processing
216 std::atomic<int> completed_count{0};
217 const int total_tasks = 10;
218
219 for (int i = 0; i < total_tasks; ++i) {
220 asio::post(io_ctx, [i, &completed_count, total_tasks]() {
221 auto container = std::make_shared<value_container>();
222 container->set_message_type("concurrent_task");
223 container->set("task_id", static_cast<int32_t>(i));
224 container->set("data", std::string("Task data " + std::to_string(i)));
225
226 auto serialized = container->serialize(value_container::serialization_format::binary).value_or(std::vector<uint8_t>{});
227
228 int count = completed_count.fetch_add(1) + 1;
229 std::cout << " Task " << i << " completed (" << count << "/" << total_tasks << ")" << std::endl;
230 });
231 }
232
233 // Wait for completion
234 std::this_thread::sleep_for(std::chrono::milliseconds(100));
235 work_guard.reset();
236
237 for (auto& t : workers) {
238 t.join();
239 }
240
241 print_success("All " + std::to_string(total_tasks) + " tasks completed");
242}
243
247void demonstrate_message_queue() {
248 print_section("Message Queue Pattern");
249
250 asio::io_context io_ctx;
251 auto work_guard = asio::make_work_guard(io_ctx);
252
253 // Message queue with mutex protection
254 std::queue<std::vector<uint8_t>> message_queue;
255 std::mutex queue_mutex;
256 std::atomic<bool> producer_done{false};
257 std::atomic<int> consumed_count{0};
258
259 // Producer thread
260 std::thread producer([&]() {
261 for (int i = 0; i < 5; ++i) {
262 auto container = std::make_shared<value_container>();
263 container->set_message_type("queue_message");
264 container->set("seq", static_cast<int32_t>(i));
265 container->set("body", std::string("Message " + std::to_string(i)));
266
267 auto serialized = container->serialize(value_container::serialization_format::binary).value_or(std::vector<uint8_t>{});
268
269 {
270 std::lock_guard<std::mutex> lock(queue_mutex);
271 message_queue.push(std::move(serialized));
272 }
273
274 std::cout << " Produced message " << i << std::endl;
275 std::this_thread::sleep_for(std::chrono::milliseconds(20));
276 }
277 producer_done.store(true);
278 });
279
280 // Consumer in Asio context
281 std::function<void()> consume_next;
282 consume_next = [&]() {
283 std::vector<uint8_t> data;
284 {
285 std::lock_guard<std::mutex> lock(queue_mutex);
286 if (!message_queue.empty()) {
287 data = std::move(message_queue.front());
288 message_queue.pop();
289 }
290 }
291
292 if (!data.empty()) {
293 auto container = std::make_shared<value_container>(data, false);
294 auto seq = container->get("seq");
295 if (seq && std::holds_alternative<int32_t>(seq->data)) {
296 std::cout << " Consumed message " << std::get<int32_t>(seq->data) << std::endl;
297 }
298 consumed_count.fetch_add(1);
299 }
300
301 // Check if we should continue
302 if (!producer_done.load() || !message_queue.empty()) {
303 asio::post(io_ctx, consume_next);
304 } else {
305 work_guard.reset();
306 }
307 };
308
309 asio::post(io_ctx, consume_next);
310
311 // Run consumer
312 std::thread consumer([&io_ctx]() {
313 io_ctx.run();
314 });
315
316 producer.join();
317 consumer.join();
318
319 print_success("Processed " + std::to_string(consumed_count.load()) + " messages");
320}
321
322#endif // HAS_ASIO
323
324#if CONTAINER_HAS_COROUTINES && (defined(HAS_STANDALONE_ASIO) || defined(HAS_BOOST_ASIO))
325
326using namespace kcenon::container::async;
327
334void demonstrate_coroutine_asio_hybrid() {
335 print_section("Coroutine + Asio Hybrid Approach");
336
337 std::cout << "This example shows how to use container_system's" << std::endl;
338 std::cout << "coroutine-based async API alongside Asio." << std::endl;
339 std::cout << std::endl;
340
341 // Create container and use async API
342 auto container = std::make_shared<value_container>();
343 container->set_message_type("hybrid_demo");
344 container->set("mode", std::string("coroutine_asio"));
345
346 async_container async_cont(container);
347
348 // Run coroutine-based serialization
349 auto serialize_task = async_cont.serialize_async();
350 while (!serialize_task.done()) {
351 std::this_thread::sleep_for(std::chrono::microseconds(100));
352 }
353
354#if CONTAINER_HAS_COMMON_RESULT
355 auto result = std::move(serialize_task).get();
356 if (result.is_ok()) {
357 std::cout << "Coroutine serialization: " << result.value().size() << " bytes" << std::endl;
358
359 // Now use the serialized data with Asio
360 asio::io_context io_ctx;
361 asio::post(io_ctx, [data = result.value()]() {
362 // Simulate network send
363 print_info("Would send " + std::to_string(data.size()) + " bytes over network");
364 });
365 io_ctx.run();
366 }
367#else
368 auto bytes = std::move(serialize_task).get();
369 std::cout << "Coroutine serialization: " << bytes.size() << " bytes" << std::endl;
370
371 // Now use the serialized data with Asio
372 asio::io_context io_ctx;
373 asio::post(io_ctx, [bytes = std::move(bytes)]() {
374 print_info("Would send " + std::to_string(bytes.size()) + " bytes over network");
375 });
376 io_ctx.run();
377#endif
378
379 print_success("Hybrid approach demonstration complete");
380}
381
382#endif // CONTAINER_HAS_COROUTINES && HAS_ASIO
383
/**
 * Entry point: prints a banner, reports which Asio flavor was detected,
 * then runs each demonstration in turn.
 * @return 0 on success, 1 when neither standalone Asio nor Boost.Asio
 *         is available at compile time.
 */
int main() {
    const std::string banner(60, '*');
    std::cout << banner << std::endl;
    std::cout << " Container System - Asio Integration Examples" << std::endl;
    std::cout << banner << std::endl;

#if defined(HAS_STANDALONE_ASIO)
    std::cout << "Using: Standalone Asio" << std::endl;
#elif defined(HAS_BOOST_ASIO)
    std::cout << "Using: Boost.Asio" << std::endl;
#else
    // No Asio detected: explain how to install one and bail out.
    std::cout << "Asio not available!" << std::endl;
    std::cout << "Install standalone Asio or Boost.Asio to run this example." << std::endl;
    std::cout << std::endl;
    std::cout << "Install via vcpkg:" << std::endl;
    std::cout << " vcpkg install asio" << std::endl;
    std::cout << " # or" << std::endl;
    std::cout << " vcpkg install boost-asio" << std::endl;
    return 1;
#endif

#if defined(HAS_STANDALONE_ASIO) || defined(HAS_BOOST_ASIO)
    demonstrate_basic_asio_integration();
    demonstrate_scheduled_processing();
    demonstrate_concurrent_processing();
    demonstrate_message_queue();

#if CONTAINER_HAS_COROUTINES
    demonstrate_coroutine_asio_hybrid();
#else
    std::cout << "\nNote: C++20 coroutine demos skipped (compiler support required)" << std::endl;
#endif

    print_section("All Asio Integration Examples Complete");
    std::cout << "These patterns can be adapted for real network applications" << std::endl;
    std::cout << "using TCP/UDP sockets with Asio." << std::endl;
#endif

    return 0;
}
void print_section(const std::string &title)
Helper to print section headers.
void print_info(const std::string &message)
Helper to print info message.
void print_success(const std::string &message)
Helper to print success message.
Main header for C++20 coroutine async support.
Async wrapper for value_container operations.