Container System 0.1.0
High-performance C++20 type-safe container framework with SIMD-accelerated serialization
Loading...
Searching...
No Matches
messaging_integration.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021, 🍀☀🌕ðŸŒĨ 🌊
3// See the LICENSE file in the project root for full license information.
4
6#include <sstream>
7#include <iomanip>
8
10
11#ifdef HAS_PERFORMANCE_METRICS
13#endif
14
15#ifdef HAS_EXTERNAL_INTEGRATION
16std::vector<messaging_integration::container_callback_t> messaging_integration::creation_callbacks_;
17std::vector<messaging_integration::container_callback_t> messaging_integration::serialization_callbacks_;
19#endif
20
22 const std::string& message_type) {
23
24 auto container = std::make_shared<value_container>();
25 container->set_message_type(message_type);
26
27#ifdef HAS_PERFORMANCE_METRICS
28 metrics_.containers_created.fetch_add(1, std::memory_order_relaxed);
29#endif
30
31#ifdef HAS_EXTERNAL_INTEGRATION
32 {
33 std::lock_guard<std::mutex> lock(callback_mutex_);
34 for (const auto& callback : creation_callbacks_) {
35 if (callback) {
36 callback(container);
37 }
38 }
39 }
40#endif
41
42 return container;
43}
44
46 const std::shared_ptr<value_container>& container,
47 bool compress) {
48
49 if (!container) {
50 return {};
51 }
52
53#ifdef HAS_PERFORMANCE_METRICS
54 auto start = std::chrono::high_resolution_clock::now();
55#endif
56
57 auto serialize_result = container->serialize_string(value_container::serialization_format::binary);
58 std::string result = serialize_result.is_ok() ? serialize_result.value() : "";
59
60 // TODO: Add compression support when ENABLE_COMPRESSION is available
61 if (compress) {
62 // Placeholder for compression implementation
63 }
64
65#ifdef HAS_PERFORMANCE_METRICS
66 auto end = std::chrono::high_resolution_clock::now();
67 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
68 metrics_.serializations_performed.fetch_add(1, std::memory_order_relaxed);
69 metrics_.total_serialize_time_us.fetch_add(duration.count(), std::memory_order_relaxed);
70#endif
71
72#ifdef HAS_EXTERNAL_INTEGRATION
73 {
74 std::lock_guard<std::mutex> lock(callback_mutex_);
75 for (const auto& callback : serialization_callbacks_) {
76 if (callback) {
77 callback(container);
78 }
79 }
80 }
81#endif
82
83 return result;
84}
85
87 const std::string& data,
88 bool decompress) {
89
90 if (data.empty()) {
91 return nullptr;
92 }
93
94#ifdef HAS_PERFORMANCE_METRICS
95 auto start = std::chrono::high_resolution_clock::now();
96#endif
97
98 std::string processed_data = data;
99
100 // TODO: Add decompression support when ENABLE_COMPRESSION is available
101 if (decompress) {
102 // Placeholder for decompression implementation
103 }
104
105 auto container = std::make_shared<value_container>(processed_data, false);
106
107#ifdef HAS_PERFORMANCE_METRICS
108 auto end = std::chrono::high_resolution_clock::now();
109 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
110 metrics_.deserializations_performed.fetch_add(1, std::memory_order_relaxed);
111 metrics_.total_deserialize_time_us.fetch_add(duration.count(), std::memory_order_relaxed);
112#endif
113
114 return container;
115}
116
117#ifdef HAS_PERFORMANCE_METRICS
121
123 metrics_.containers_created.store(0, std::memory_order_relaxed);
124 metrics_.serializations_performed.store(0, std::memory_order_relaxed);
125 metrics_.deserializations_performed.store(0, std::memory_order_relaxed);
126 metrics_.total_serialize_time_us.store(0, std::memory_order_relaxed);
127 metrics_.total_deserialize_time_us.store(0, std::memory_order_relaxed);
128}
129
131 std::ostringstream oss;
132
133 auto containers = metrics_.containers_created.load(std::memory_order_relaxed);
134 auto serializations = metrics_.serializations_performed.load(std::memory_order_relaxed);
135 auto deserializations = metrics_.deserializations_performed.load(std::memory_order_relaxed);
136 auto serialize_time = metrics_.total_serialize_time_us.load(std::memory_order_relaxed);
137 auto deserialize_time = metrics_.total_deserialize_time_us.load(std::memory_order_relaxed);
138
139 oss << "Container System Metrics:\n";
140 oss << " Containers created: " << containers << "\n";
141 oss << " Serializations: " << serializations << "\n";
142 oss << " Deserializations: " << deserializations << "\n";
143
144 if (serializations > 0) {
145 oss << " Avg serialize time: " << std::fixed << std::setprecision(2)
146 << (static_cast<double>(serialize_time) / serializations) << " Ξs\n";
147 }
148
149 if (deserializations > 0) {
150 oss << " Avg deserialize time: " << std::fixed << std::setprecision(2)
151 << (static_cast<double>(deserialize_time) / deserializations) << " Ξs\n";
152 }
153
154 return oss.str();
155}
156#endif
157
158#ifdef HAS_EXTERNAL_INTEGRATION
160 std::lock_guard<std::mutex> lock(callback_mutex_);
161 creation_callbacks_.push_back(std::move(callback));
162}
163
165 std::lock_guard<std::mutex> lock(callback_mutex_);
166 serialization_callbacks_.push_back(std::move(callback));
167}
168
170 std::lock_guard<std::mutex> lock(callback_mutex_);
171 creation_callbacks_.clear();
173}
174#endif
175
176// messaging_container_builder implementation
177
179 : container_(std::make_shared<value_container>()) {
180}
181
183 const std::string& id, const std::string& sub_id) {
184 container_->set_source(id, sub_id);
185 return *this;
186}
187
189 const std::string& id, const std::string& sub_id) {
190 container_->set_target(id, sub_id);
191 return *this;
192}
193
195 container_->set_message_type(type);
196 return *this;
197}
198
204
210
211std::shared_ptr<value_container> messaging_container_builder::build() {
212 // Apply optimizations based on settings
213 if (size_optimized_) {
214 // TODO: Apply size optimizations
215 } else if (speed_optimized_) {
216 // TODO: Apply speed optimizations
217 }
218
219 auto result = container_;
220 container_ = std::make_shared<value_container>(); // Reset for potential reuse
221 return result;
222}
223
224#ifdef HAS_PERFORMANCE_METRICS
225// container_performance_monitor implementation
226
228 : operation_name_(operation_name)
229 , start_time_(std::chrono::high_resolution_clock::now()) {
230}
231
233 auto end_time = std::chrono::high_resolution_clock::now();
234 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time_);
235
236 // This could be extended to log to a performance monitoring system
237 // For now, it's just a placeholder for future monitoring integration
238}
239
243
247#endif // HAS_PERFORMANCE_METRICS
248
249} // namespace kcenon::container::integration
std::chrono::high_resolution_clock::time_point start_time_
messaging_container_builder & source(const std::string &id, const std::string &sub_id="")
messaging_container_builder & target(const std::string &id, const std::string &sub_id="")
messaging_container_builder & message_type(const std::string &type)
static void register_serialization_callback(container_callback_t callback)
static std::shared_ptr< value_container > create_optimized_container(const std::string &message_type="data_container")
Container creation with messaging optimization.
static std::shared_ptr< value_container > deserialize_from_messaging(const std::string &data, bool decompress=false)
Optimized deserialization for messaging.
static std::vector< container_callback_t > serialization_callbacks_
static void register_creation_callback(container_callback_t callback)
static std::vector< container_callback_t > creation_callbacks_
static std::string serialize_for_messaging(const std::shared_ptr< value_container > &container, bool compress=false)
High-performance serialization for messaging.
std::function< void(const std::shared_ptr< value_container > &)> container_callback_t
External system callback registration.