Container System 0.1.0
High-performance C++20 type-safe container framework with SIMD-accelerated serialization
Loading...
Searching...
No Matches
async_container.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
29#pragma once
30
31#include "task.h"
32#include "generator.h"
33#include "core/container.h"
35
36#include <algorithm>
37#include <atomic>
38#include <fstream>
39#include <functional>
40#include <memory>
41#include <optional>
42#include <span>
43#include <string>
44#include <thread>
45#include <vector>
46
48{
54 using progress_callback = std::function<void(size_t bytes_processed, size_t total_bytes)>;
55
56 namespace detail
57 {
65 template<typename T>
67 {
68 std::function<T()> work_;
69 std::optional<T> result_;
70 std::exception_ptr exception_;
71 std::atomic<bool> ready_{false};
72
73 explicit async_state(std::function<T()> work)
74 : work_(std::move(work)) {}
75
76 // Non-copyable, non-movable
77 async_state(const async_state&) = delete;
81 };
82
89 template<typename T>
91 {
92 std::shared_ptr<async_state<T>> state_;
93
94 explicit async_awaitable(std::function<T()> work)
95 : state_(std::make_shared<async_state<T>>(std::move(work))) {}
96
97 async_awaitable(async_awaitable&&) noexcept = default;
98 async_awaitable& operator=(async_awaitable&&) noexcept = default;
99
101 async_awaitable& operator=(const async_awaitable&) = delete;
102
103 [[nodiscard]] bool await_ready() const noexcept
104 {
105 return false;
106 }
107
108 void await_suspend(std::coroutine_handle<> handle)
109 {
110 // Copy shared_ptr for thread to ensure state lifetime
111 auto state = state_;
112 std::thread([state, handle]() mutable {
113 try {
114 state->result_.emplace(state->work_());
115 } catch (...) {
116 state->exception_ = std::current_exception();
117 }
118 // Release barrier ensures all writes above are visible
119 // to the thread that reads with acquire semantics
120 state->ready_.store(true, std::memory_order_release);
121 handle.resume();
122 }).detach();
123 }
124
126 {
127 // Acquire barrier synchronizes with the release store above,
128 // ensuring all writes in the worker thread are visible here
129 while (!state_->ready_.load(std::memory_order_acquire)) {
130 // Spin until ready - in practice, handle.resume()
131 // ensures we only get here after ready_ is set
132 }
133 if (state_->exception_) {
134 std::rethrow_exception(state_->exception_);
135 }
136 return std::move(*state_->result_);
137 }
138 };
139
140 template<typename F>
142 {
143 using result_type = std::invoke_result_t<F>;
144 return async_awaitable<result_type>(std::forward<F>(func));
145 }
146
147 } // namespace detail
148
165 {
166 public:
171 explicit async_container(std::shared_ptr<value_container> container)
172 : container_(std::move(container))
173 {
174 }
175
180 : container_(std::make_shared<value_container>())
181 {
182 }
183
187 async_container(async_container&& other) noexcept = default;
188
192 async_container& operator=(async_container&& other) noexcept = default;
193
197 ~async_container() = default;
198
199 // Non-copyable
202
207 [[nodiscard]] std::shared_ptr<value_container> get_container() const noexcept
208 {
209 return container_;
210 }
211
216 void set_container(std::shared_ptr<value_container> container) noexcept
217 {
218 container_ = std::move(container);
219 }
220
221 // =======================================================================
222 // Async Serialization APIs
223 // =======================================================================
224
244#if CONTAINER_HAS_COMMON_RESULT
246#else
247 [[nodiscard]] task<std::vector<uint8_t>> serialize_async() const;
248#endif
249
255#if CONTAINER_HAS_COMMON_RESULT
257#else
258 [[nodiscard]] task<std::string> serialize_string_async() const;
259#endif
260
261 // =======================================================================
262 // Async Deserialization APIs
263 // =======================================================================
264
282#if CONTAINER_HAS_COMMON_RESULT
284 deserialize_async(std::span<const uint8_t> data);
285#else
286 [[nodiscard]] static task<std::shared_ptr<value_container>>
287 deserialize_async(std::span<const uint8_t> data);
288#endif
289
296#if CONTAINER_HAS_COMMON_RESULT
298 deserialize_string_async(std::string_view data);
299#else
300 [[nodiscard]] static task<std::shared_ptr<value_container>>
301 deserialize_string_async(std::string_view data);
302#endif
303
304 // =======================================================================
305 // Convenience Methods (forwarding to underlying container)
306 // =======================================================================
307
315 template<typename T>
316 async_container& set(std::string_view key, T&& value)
317 {
318 container_->set(key, std::forward<T>(value));
319 return *this;
320 }
321
328 template<typename T>
329 [[nodiscard]] std::optional<T> get(std::string_view key) const
330 {
331 auto val = container_->get(key);
332 if (!val) {
333 return std::nullopt;
334 }
335 if (auto* ptr = std::get_if<T>(&val->data)) {
336 return *ptr;
337 }
338 return std::nullopt;
339 }
340
346 [[nodiscard]] bool contains(std::string_view key) const noexcept
347 {
348 return container_->contains(key);
349 }
350
351 // =======================================================================
352 // Async File I/O APIs (Issue #267 - Phase 3)
353 // =======================================================================
354
373#if CONTAINER_HAS_COMMON_RESULT
375 std::string_view path,
376 progress_callback callback = nullptr);
377#else
378 [[nodiscard]] task<bool> load_async(
379 std::string_view path,
380 progress_callback callback = nullptr);
381#endif
382
401#if CONTAINER_HAS_COMMON_RESULT
403 std::string_view path,
404 progress_callback callback = nullptr);
405#else
406 [[nodiscard]] task<bool> save_async(
407 std::string_view path,
408 progress_callback callback = nullptr);
409#endif
410
411 // =======================================================================
412 // Streaming APIs (Issue #268 - Phase 4)
413 // =======================================================================
414
433 size_t chunk_size = 64 * 1024) const;
434
458#if CONTAINER_HAS_COMMON_RESULT
461 std::span<const uint8_t> data,
462 bool is_final = true);
463#else
464 [[nodiscard]] static task<std::shared_ptr<value_container>>
466 std::span<const uint8_t> data,
467 bool is_final = true);
468#endif
469
470 private:
471 std::shared_ptr<value_container> container_;
472 };
473
474 // ==========================================================================
475 // Async File I/O Utility Functions (Issue #267 - Phase 3)
476 // ==========================================================================
477
495#if CONTAINER_HAS_COMMON_RESULT
497 std::string_view path,
498 progress_callback callback = nullptr);
499#else
501 std::string_view path,
502 progress_callback callback = nullptr);
503#endif
504
523#if CONTAINER_HAS_COMMON_RESULT
525 std::string_view path,
526 std::span<const uint8_t> data,
527 progress_callback callback = nullptr);
528#else
529 [[nodiscard]] task<bool> write_file_async(
530 std::string_view path,
531 std::span<const uint8_t> data,
532 progress_callback callback = nullptr);
533#endif
534
535 // ==========================================================================
536 // Implementation of async methods (separate to avoid compiler issues)
537 // ==========================================================================
538
539#if CONTAINER_HAS_COMMON_RESULT
542 {
543 auto container = container_;
544 auto result = co_await detail::make_async_awaitable([container]() {
545 return container->serialize(value_container::serialization_format::binary);
546 });
547 co_return result;
548 }
549
550 inline task<kcenon::common::Result<std::string>>
552 {
553 auto container = container_;
554 auto result = co_await detail::make_async_awaitable([container]() {
555 return container->serialize_string(value_container::serialization_format::binary);
556 });
557 co_return result;
558 }
559
560 inline task<kcenon::common::Result<std::shared_ptr<value_container>>>
561 async_container::deserialize_async(std::span<const uint8_t> data)
562 {
563 std::vector<uint8_t> data_copy(data.begin(), data.end());
564 auto result = co_await detail::make_async_awaitable(
565 [data_copy = std::move(data_copy)]() {
566 auto container = std::make_shared<value_container>();
567 auto deser_result = container->deserialize_result(data_copy, false);
568 if (!deser_result.is_ok()) {
569 return kcenon::common::Result<std::shared_ptr<value_container>>(
570 deser_result.error());
571 }
572 return kcenon::common::ok(container);
573 });
574 co_return result;
575 }
576
577 inline task<kcenon::common::Result<std::shared_ptr<value_container>>>
578 async_container::deserialize_string_async(std::string_view data)
579 {
580 std::string data_copy(data);
581 auto result = co_await detail::make_async_awaitable(
582 [data_copy = std::move(data_copy)]() {
583 auto container = std::make_shared<value_container>();
584 auto deser_result = container->deserialize_result(data_copy, false);
585 if (!deser_result.is_ok()) {
586 return kcenon::common::Result<std::shared_ptr<value_container>>(
587 deser_result.error());
588 }
589 return kcenon::common::ok(container);
590 });
591 co_return result;
592 }
593
594 inline task<kcenon::common::VoidResult>
595 async_container::load_async(std::string_view path, progress_callback callback)
596 {
597 using namespace kcenon::container;
598 auto container = container_;
599 std::string path_str(path);
600 auto result = co_await detail::make_async_awaitable(
601 [container, path_str, callback]() -> kcenon::common::VoidResult {
602 std::ifstream file(path_str, std::ios::binary | std::ios::ate);
603 if (!file) {
604 return kcenon::common::VoidResult(
605 kcenon::common::error_info{
606 kcenon::container::error_codes::file_not_found,
607 kcenon::container::error_codes::make_message(
608 kcenon::container::error_codes::file_not_found, path_str),
609 "container_system"});
610 }
611
612 auto size = file.tellg();
613 if (size < 0) {
614 return kcenon::common::VoidResult(
615 kcenon::common::error_info{
616 kcenon::container::error_codes::file_read_error,
617 kcenon::container::error_codes::make_message(
618 kcenon::container::error_codes::file_read_error, path_str),
619 "container_system"});
620 }
621 file.seekg(0, std::ios::beg);
622
623 std::vector<uint8_t> buffer(static_cast<size_t>(size));
624 const size_t chunk_size = 64 * 1024;
625 size_t bytes_read = 0;
626
627 while (bytes_read < static_cast<size_t>(size)) {
628 size_t to_read = std::min(chunk_size,
629 static_cast<size_t>(size) - bytes_read);
630 file.read(reinterpret_cast<char*>(buffer.data() + bytes_read),
631 static_cast<std::streamsize>(to_read));
632 if (!file) {
633 return kcenon::common::VoidResult(
634 kcenon::common::error_info{
635 kcenon::container::error_codes::file_read_error,
636 kcenon::container::error_codes::make_message(
637 kcenon::container::error_codes::file_read_error, path_str),
638 "container_system"});
639 }
640 bytes_read += to_read;
641 if (callback) {
642 callback(bytes_read, static_cast<size_t>(size));
643 }
644 }
645
646 auto deser_result = container->deserialize_result(buffer, false);
647 if (!deser_result.is_ok()) {
648 return deser_result;
649 }
650 return kcenon::common::ok();
651 });
652 co_return result;
653 }
654
655 inline task<kcenon::common::VoidResult>
656 async_container::save_async(std::string_view path, progress_callback callback)
657 {
658 auto container = container_;
659 std::string path_str(path);
660 auto result = co_await detail::make_async_awaitable(
661 [container, path_str, callback]() -> kcenon::common::VoidResult {
662 auto data_result = container->serialize(value_container::serialization_format::binary);
663 if (!data_result.is_ok()) {
664 return kcenon::common::VoidResult(data_result.error());
665 }
666 const auto& data = data_result.value();
667
668 std::ofstream file(path_str, std::ios::binary);
669 if (!file) {
670 return kcenon::common::VoidResult(
671 kcenon::common::error_info{
672 kcenon::container::error_codes::file_write_error,
673 kcenon::container::error_codes::make_message(
674 kcenon::container::error_codes::file_write_error, path_str),
675 "container_system"});
676 }
677
678 const size_t chunk_size = 64 * 1024;
679 size_t bytes_written = 0;
680 const size_t total_size = data.size();
681
682 while (bytes_written < total_size) {
683 size_t to_write = std::min(chunk_size, total_size - bytes_written);
684 file.write(reinterpret_cast<const char*>(data.data() + bytes_written),
685 static_cast<std::streamsize>(to_write));
686 if (!file) {
687 return kcenon::common::VoidResult(
688 kcenon::common::error_info{
689 kcenon::container::error_codes::file_write_error,
690 kcenon::container::error_codes::make_message(
691 kcenon::container::error_codes::file_write_error, path_str),
692 "container_system"});
693 }
694 bytes_written += to_write;
695 if (callback) {
696 callback(bytes_written, total_size);
697 }
698 }
699
700 return kcenon::common::ok();
701 });
702 co_return result;
703 }
704
705 inline task<kcenon::common::Result<std::vector<uint8_t>>>
706 read_file_async(std::string_view path, progress_callback callback)
707 {
708 std::string path_str(path);
709 auto result = co_await detail::make_async_awaitable(
710 [path_str, callback]() -> kcenon::common::Result<std::vector<uint8_t>> {
711 std::ifstream file(path_str, std::ios::binary | std::ios::ate);
712 if (!file) {
713 return kcenon::common::Result<std::vector<uint8_t>>(
714 kcenon::common::error_info{
715 kcenon::container::error_codes::file_not_found,
716 kcenon::container::error_codes::make_message(
717 kcenon::container::error_codes::file_not_found, path_str),
718 "container_system"});
719 }
720
721 auto size = file.tellg();
722 if (size < 0) {
723 return kcenon::common::Result<std::vector<uint8_t>>(
724 kcenon::common::error_info{
725 kcenon::container::error_codes::file_read_error,
726 kcenon::container::error_codes::make_message(
727 kcenon::container::error_codes::file_read_error, path_str),
728 "container_system"});
729 }
730 file.seekg(0, std::ios::beg);
731
732 std::vector<uint8_t> buffer(static_cast<size_t>(size));
733 const size_t chunk_size = 64 * 1024;
734 size_t bytes_read = 0;
735
736 while (bytes_read < static_cast<size_t>(size)) {
737 size_t to_read = std::min(chunk_size,
738 static_cast<size_t>(size) - bytes_read);
739 file.read(reinterpret_cast<char*>(buffer.data() + bytes_read),
740 static_cast<std::streamsize>(to_read));
741 if (!file) {
742 return kcenon::common::Result<std::vector<uint8_t>>(
743 kcenon::common::error_info{
744 kcenon::container::error_codes::file_read_error,
745 kcenon::container::error_codes::make_message(
746 kcenon::container::error_codes::file_read_error, path_str),
747 "container_system"});
748 }
749 bytes_read += to_read;
750 if (callback) {
751 callback(bytes_read, static_cast<size_t>(size));
752 }
753 }
754
755 return kcenon::common::ok(std::move(buffer));
756 });
757 co_return result;
758 }
759
760 inline task<kcenon::common::VoidResult>
761 write_file_async(std::string_view path, std::span<const uint8_t> data,
762 progress_callback callback)
763 {
764 std::string path_str(path);
765 std::vector<uint8_t> data_copy(data.begin(), data.end());
766 auto result = co_await detail::make_async_awaitable(
767 [path_str, data_copy = std::move(data_copy), callback]()
768 -> kcenon::common::VoidResult {
769 std::ofstream file(path_str, std::ios::binary);
770 if (!file) {
771 return kcenon::common::VoidResult(
772 kcenon::common::error_info{
773 kcenon::container::error_codes::file_write_error,
774 kcenon::container::error_codes::make_message(
775 kcenon::container::error_codes::file_write_error, path_str),
776 "container_system"});
777 }
778
779 const size_t chunk_size = 64 * 1024;
780 size_t bytes_written = 0;
781 const size_t total_size = data_copy.size();
782
783 while (bytes_written < total_size) {
784 size_t to_write = std::min(chunk_size, total_size - bytes_written);
785 file.write(reinterpret_cast<const char*>(data_copy.data() + bytes_written),
786 static_cast<std::streamsize>(to_write));
787 if (!file) {
788 return kcenon::common::VoidResult(
789 kcenon::common::error_info{
790 kcenon::container::error_codes::file_write_error,
791 kcenon::container::error_codes::make_message(
792 kcenon::container::error_codes::file_write_error, path_str),
793 "container_system"});
794 }
795 bytes_written += to_write;
796 if (callback) {
797 callback(bytes_written, total_size);
798 }
799 }
800
801 return kcenon::common::ok();
802 });
803 co_return result;
804 }
805
806 // ==========================================================================
807 // Streaming Implementation (Issue #268 - Phase 4)
808 // ==========================================================================
809
810 inline generator<std::vector<uint8_t>>
811 async_container::serialize_chunked(size_t chunk_size) const
812 {
813 auto data_result = container_->serialize(value_container::serialization_format::binary);
814 if (!data_result.is_ok()) {
815 co_return;
816 }
817
818 const auto& full_data = data_result.value();
819 const size_t total_size = full_data.size();
820 size_t offset = 0;
821
822 while (offset < total_size) {
823 size_t current_chunk_size = std::min(chunk_size, total_size - offset);
824 std::vector<uint8_t> chunk(
825 full_data.begin() + static_cast<std::ptrdiff_t>(offset),
826 full_data.begin() + static_cast<std::ptrdiff_t>(offset + current_chunk_size));
827 offset += current_chunk_size;
828 co_yield chunk;
829 }
830 }
831
832 inline task<kcenon::common::Result<std::shared_ptr<value_container>>>
834 std::span<const uint8_t> data,
835 bool is_final)
836 {
837 // For streaming deserialization, we accumulate data until we have a complete
838 // container. In this simplified implementation, we require is_final to be true
839 // to actually deserialize. For true streaming, a stateful deserializer would
840 // be needed.
841 if (!is_final) {
842 // Return an error indicating more data is needed
843 co_return kcenon::common::Result<std::shared_ptr<value_container>>(
844 kcenon::common::error_info{
845 kcenon::container::error_codes::deserialization_failed,
846 kcenon::container::error_codes::make_message(
847 kcenon::container::error_codes::deserialization_failed,
848 "streaming requires complete data (is_final=true)"),
849 "container_system"});
850 }
851
852 std::vector<uint8_t> data_copy(data.begin(), data.end());
853 auto result = co_await detail::make_async_awaitable(
854 [data_copy = std::move(data_copy)]() {
855 auto container = std::make_shared<value_container>();
856 auto deser_result = container->deserialize_result(data_copy, false);
857 if (!deser_result.is_ok()) {
858 return kcenon::common::Result<std::shared_ptr<value_container>>(
859 deser_result.error());
860 }
861 return kcenon::common::ok(container);
862 });
863 co_return result;
864 }
865
866#else
867
868 inline task<std::vector<uint8_t>>
870 {
871 auto container = container_;
872 auto result = co_await detail::make_async_awaitable([container]() {
873 auto serialize_result = container->serialize(serialization_format::binary);
874 return serialize_result.is_ok() ? serialize_result.value() : std::vector<uint8_t>{};
875 });
876 co_return result;
877 }
878
879 inline task<std::string>
881 {
882 auto container = container_;
883 auto result = co_await detail::make_async_awaitable([container]() {
884 auto serialize_result = container->serialize_string(serialization_format::binary);
885 return serialize_result.is_ok() ? serialize_result.value() : std::string{};
886 });
887 co_return result;
888 }
889
891 async_container::deserialize_async(std::span<const uint8_t> data)
892 {
893 std::vector<uint8_t> data_copy(data.begin(), data.end());
894 auto result = co_await detail::make_async_awaitable(
895 [data_copy = std::move(data_copy)]() {
896 auto container = std::make_shared<value_container>(data_copy, false);
897 return container;
898 });
899 co_return result;
900 }
901
904 {
905 std::string data_copy(data);
906 auto result = co_await detail::make_async_awaitable(
907 [data_copy = std::move(data_copy)]() {
908 auto container = std::make_shared<value_container>(data_copy, false);
909 return container;
910 });
911 co_return result;
912 }
913
914 inline task<bool>
915 async_container::load_async(std::string_view path, progress_callback callback)
916 {
917 auto container = container_;
918 std::string path_str(path);
919 auto result = co_await detail::make_async_awaitable(
920 [container, path_str, callback]() {
921 std::ifstream file(path_str, std::ios::binary | std::ios::ate);
922 if (!file) {
923 return false;
924 }
925
926 auto size = file.tellg();
927 if (size < 0) {
928 return false;
929 }
930 file.seekg(0, std::ios::beg);
931
932 std::vector<uint8_t> buffer(static_cast<size_t>(size));
933 const size_t chunk_size = 64 * 1024; // 64KB chunks
934 size_t bytes_read = 0;
935
936 while (bytes_read < static_cast<size_t>(size)) {
937 size_t to_read = std::min(chunk_size,
938 static_cast<size_t>(size) - bytes_read);
939 file.read(reinterpret_cast<char*>(buffer.data() + bytes_read),
940 static_cast<std::streamsize>(to_read));
941 if (!file) {
942 return false;
943 }
944 bytes_read += to_read;
945 if (callback) {
946 callback(bytes_read, static_cast<size_t>(size));
947 }
948 }
949
950 container->deserialize(buffer, false);
951 return true;
952 });
953 co_return result;
954 }
955
956 inline task<bool>
957 async_container::save_async(std::string_view path, progress_callback callback)
958 {
959 auto container = container_;
960 std::string path_str(path);
961 auto result = co_await detail::make_async_awaitable(
962 [container, path_str, callback]() {
963 auto serialize_result = container->serialize(serialization_format::binary);
964 if (!serialize_result.is_ok()) {
965 return false;
966 }
967 auto data = serialize_result.value();
968 if (data.empty()) {
969 return false;
970 }
971
972 std::ofstream file(path_str, std::ios::binary);
973 if (!file) {
974 return false;
975 }
976
977 const size_t chunk_size = 64 * 1024; // 64KB chunks
978 size_t bytes_written = 0;
979 const size_t total_size = data.size();
980
981 while (bytes_written < total_size) {
982 size_t to_write = std::min(chunk_size, total_size - bytes_written);
983 file.write(reinterpret_cast<const char*>(data.data() + bytes_written),
984 static_cast<std::streamsize>(to_write));
985 if (!file) {
986 return false;
987 }
988 bytes_written += to_write;
989 if (callback) {
990 callback(bytes_written, total_size);
991 }
992 }
993
994 return true;
995 });
996 co_return result;
997 }
998
1000 read_file_async(std::string_view path, progress_callback callback)
1001 {
1002 std::string path_str(path);
1003 auto result = co_await detail::make_async_awaitable(
1004 [path_str, callback]() {
1005 std::ifstream file(path_str, std::ios::binary | std::ios::ate);
1006 if (!file) {
1007 return std::vector<uint8_t>{};
1008 }
1009
1010 auto size = file.tellg();
1011 if (size < 0) {
1012 return std::vector<uint8_t>{};
1013 }
1014 file.seekg(0, std::ios::beg);
1015
1016 std::vector<uint8_t> buffer(static_cast<size_t>(size));
1017 const size_t chunk_size = 64 * 1024; // 64KB chunks
1018 size_t bytes_read = 0;
1019
1020 while (bytes_read < static_cast<size_t>(size)) {
1021 size_t to_read = std::min(chunk_size,
1022 static_cast<size_t>(size) - bytes_read);
1023 file.read(reinterpret_cast<char*>(buffer.data() + bytes_read),
1024 static_cast<std::streamsize>(to_read));
1025 if (!file) {
1026 return std::vector<uint8_t>{};
1027 }
1028 bytes_read += to_read;
1029 if (callback) {
1030 callback(bytes_read, static_cast<size_t>(size));
1031 }
1032 }
1033
1034 return buffer;
1035 });
1036 co_return result;
1037 }
1038
1039 inline task<bool>
1040 write_file_async(std::string_view path, std::span<const uint8_t> data,
1041 progress_callback callback)
1042 {
1043 std::string path_str(path);
1044 std::vector<uint8_t> data_copy(data.begin(), data.end());
1045 auto result = co_await detail::make_async_awaitable(
1046 [path_str, data_copy = std::move(data_copy), callback]() {
1047 std::ofstream file(path_str, std::ios::binary);
1048 if (!file) {
1049 return false;
1050 }
1051
1052 const size_t chunk_size = 64 * 1024; // 64KB chunks
1053 size_t bytes_written = 0;
1054 const size_t total_size = data_copy.size();
1055
1056 while (bytes_written < total_size) {
1057 size_t to_write = std::min(chunk_size, total_size - bytes_written);
1058 file.write(reinterpret_cast<const char*>(data_copy.data() + bytes_written),
1059 static_cast<std::streamsize>(to_write));
1060 if (!file) {
1061 return false;
1062 }
1063 bytes_written += to_write;
1064 if (callback) {
1065 callback(bytes_written, total_size);
1066 }
1067 }
1068
1069 return true;
1070 });
1071 co_return result;
1072 }
1073
1074 // ==========================================================================
1075 // Streaming Implementation (Issue #268 - Phase 4) - Non-Result version
1076 // ==========================================================================
1077
1078 inline generator<std::vector<uint8_t>>
1079 async_container::serialize_chunked(size_t chunk_size) const
1080 {
1081 auto serialize_result = container_->serialize(serialization_format::binary);
1082 if (!serialize_result.is_ok()) {
1083 co_return;
1084 }
1085 auto full_data = serialize_result.value();
1086 if (full_data.empty()) {
1087 co_return;
1088 }
1089
1090 const size_t total_size = full_data.size();
1091 size_t offset = 0;
1092
1093 while (offset < total_size) {
1094 size_t current_chunk_size = std::min(chunk_size, total_size - offset);
1095 std::vector<uint8_t> chunk(
1096 full_data.begin() + static_cast<std::ptrdiff_t>(offset),
1097 full_data.begin() + static_cast<std::ptrdiff_t>(offset + current_chunk_size));
1098 offset += current_chunk_size;
1099 co_yield chunk;
1100 }
1101 }
1102
1105 std::span<const uint8_t> data,
1106 bool is_final)
1107 {
1108 // For streaming deserialization, we accumulate data until we have a complete
1109 // container. In this simplified implementation, we require is_final to be true
1110 // to actually deserialize.
1111 if (!is_final) {
1112 co_return nullptr;
1113 }
1114
1115 std::vector<uint8_t> data_copy(data.begin(), data.end());
1116 auto result = co_await detail::make_async_awaitable(
1117 [data_copy = std::move(data_copy)]() {
1118 auto container = std::make_shared<value_container>(data_copy, false);
1119 return container;
1120 });
1121 co_return result;
1122 }
1123
1124#endif
1125
1126} // namespace kcenon::container::async
Async wrapper for value_container operations.
static task< std::shared_ptr< value_container > > deserialize_streaming(std::span< const uint8_t > data, bool is_final=true)
Deserialize container progressively from chunks.
std::shared_ptr< value_container > get_container() const noexcept
Get the underlying container.
async_container(async_container &&other) noexcept=default
Move constructor.
std::optional< T > get(std::string_view key) const
Get a value from the container.
async_container & operator=(const async_container &)=delete
bool contains(std::string_view key) const noexcept
Check if container contains a key.
async_container()
Construct async_container with new empty container.
static task< std::shared_ptr< value_container > > deserialize_async(std::span< const uint8_t > data)
Deserialize from byte array asynchronously.
async_container(std::shared_ptr< value_container > container)
Construct async_container with existing container.
void set_container(std::shared_ptr< value_container > container) noexcept
Set the underlying container.
task< bool > load_async(std::string_view path, progress_callback callback=nullptr)
Load container from file asynchronously.
task< std::vector< uint8_t > > serialize_async() const
Serialize container to byte array asynchronously.
static task< std::shared_ptr< value_container > > deserialize_string_async(std::string_view data)
Deserialize from string asynchronously.
async_container & set(std::string_view key, T &&value)
Set a value in the container.
async_container & operator=(async_container &&other) noexcept=default
Move assignment operator.
std::shared_ptr< value_container > container_
task< std::string > serialize_string_async() const
Serialize container to string asynchronously.
generator< std::vector< uint8_t > > serialize_chunked(size_t chunk_size=64 *1024) const
Serialize container in chunks using generator.
async_container(const async_container &)=delete
task< bool > save_async(std::string_view path, progress_callback callback=nullptr)
Save container to file asynchronously.
Forward declaration of generator.
Definition generator.h:151
Forward declaration of task.
Definition task.h:209
Enhanced type-safe value with perfect legacy compatibility.
Definition value.h:129
C++20 coroutine generator for lazy sequence generation.
auto make_async_awaitable(F &&func) -> async_awaitable< std::invoke_result_t< F > >
std::function< void(size_t bytes_processed, size_t total_bytes)> progress_callback
Progress callback type for async file operations.
task< std::vector< uint8_t > > read_file_async(std::string_view path, progress_callback callback=nullptr)
Read file contents asynchronously.
task< bool > write_file_async(std::string_view path, std::span< const uint8_t > data, progress_callback callback=nullptr)
Write data to file asynchronously.
Awaitable that runs work in a separate thread.
async_awaitable(async_awaitable &&) noexcept=default
std::shared_ptr< async_state< T > > state_
void await_suspend(std::coroutine_handle<> handle)
Shared state for async operations.
async_state(const async_state &)=delete
async_state & operator=(const async_state &)=delete
async_state & operator=(async_state &&)=delete
C++20 coroutine task type for async operations.