Network System 0.1.1
High-performance modular networking library for scalable client-server applications
Loading...
Searching...
No Matches
compression_pipeline.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7
8#ifdef BUILD_LZ4_COMPRESSION
9#include <lz4.h>
10#include <lz4hc.h>
11#endif
12
13#ifdef BUILD_ZLIB_COMPRESSION
14#include <zlib.h>
15#endif
16
17#include <cstring>
18
20{
21
23 {
24 public:
25 explicit impl(compression_algorithm algo, size_t threshold)
26 : algorithm_(algo), compression_threshold_(threshold)
27 {
28 NETWORK_LOG_DEBUG("[compression_pipeline] Created with algorithm=" +
29 std::to_string(static_cast<int>(algo)) +
30 ", threshold=" + std::to_string(threshold));
31 }
32
// Compresses `input` with the configured algorithm (algorithm_).
//
// Returns an unmodified copy of `input` (wrapped in ok) when:
//   - input.size() is below compression_threshold_ (logged at trace level);
//   - the second early-return branch is taken; its `if` condition is on a line
//     missing from this listing (presumably algorithm_ == none -- TODO confirm);
//   - the selected algorithm was not compiled in (BUILD_LZ4_COMPRESSION /
//     BUILD_ZLIB_COMPRESSION not defined), after logging a warning.
// Otherwise it delegates to compress_lz4 / compress_gzip / compress_deflate.
// The `if` conditions selecting those branches are also missing from this
// listing; presumably they compare algorithm_ with lz4 / gzip / deflate.
//
// NOTE(review): the result carries no marker saying whether it was actually
// compressed (the helpers also fall back to a raw copy when compression does
// not shrink the data), while decompress() dispatches on algorithm_ alone.
// Raw bytes fed back through decompress() with LZ4 would have their first
// 4 bytes read as a size header. Confirm callers track this out of band.
33 auto compress(std::span<const uint8_t> input) -> Result<std::vector<uint8_t>>
34 {
35 // If below threshold, return uncompressed
36 if (input.size() < compression_threshold_)
37 {
38 NETWORK_LOG_TRACE("[compression_pipeline] Size below threshold, skipping compression");
39 std::vector<uint8_t> result(input.begin(), input.end());
40 return ok<std::vector<uint8_t>>(std::move(result));
41 }
42
44 {
45 std::vector<uint8_t> result(input.begin(), input.end());
46 return ok<std::vector<uint8_t>>(std::move(result));
47 }
48
49#ifdef BUILD_LZ4_COMPRESSION
51 {
52 return compress_lz4(input);
53 }
54#endif
55
56#ifdef BUILD_ZLIB_COMPRESSION
58 {
59 return compress_gzip(input);
60 }
62 {
63 return compress_deflate(input);
64 }
65#endif
66
67 // Fallback: return uncompressed if algorithm not available
68 NETWORK_LOG_WARN("[compression_pipeline] Compression algorithm not available, returning uncompressed");
69 std::vector<uint8_t> result(input.begin(), input.end());
70 return ok<std::vector<uint8_t>>(std::move(result));
71 }
72
// Decompresses `input` according to the configured algorithm (algorithm_).
//
// - Empty input is rejected with an error ("Input data is empty"); the head of
//   that error-return expression is missing from this listing.
// - The next branch returns `input` unchanged; its `if` condition is missing
//   from this listing (presumably algorithm_ == none -- TODO confirm).
// - Otherwise it delegates to decompress_lz4 / decompress_gzip /
//   decompress_deflate (branch conditions likewise not shown).
// - If the selected algorithm was not compiled in, it logs a warning and
//   returns `input` as-is.
//
// NOTE(review): no check is made that `input` really is compressed data.
// compress() can return raw bytes (below threshold, or when compression did
// not help), and such bytes would be misinterpreted by the LZ4/zlib decoders.
73 auto decompress(std::span<const uint8_t> input) -> Result<std::vector<uint8_t>>
74 {
75 if (input.empty())
76 {
79 "Input data is empty");
80 }
81
83 {
84 std::vector<uint8_t> result(input.begin(), input.end());
85 return ok<std::vector<uint8_t>>(std::move(result));
86 }
87
88#ifdef BUILD_LZ4_COMPRESSION
90 {
91 return decompress_lz4(input);
92 }
93#endif
94
95#ifdef BUILD_ZLIB_COMPRESSION
97 {
98 return decompress_gzip(input);
99 }
101 {
102 return decompress_deflate(input);
103 }
104#endif
105
106 // Fallback: assume uncompressed
107 NETWORK_LOG_WARN("[compression_pipeline] Decompression algorithm not available, returning as-is");
108 std::vector<uint8_t> result(input.begin(), input.end());
109 return ok<std::vector<uint8_t>>(std::move(result));
110 }
111
// Intended to update the minimum input size, in bytes, below which compress()
// returns the input uncompressed; logs the new value at debug level.
// The assignment itself is on a line missing from this listing (presumably
// compression_threshold_ = bytes -- TODO confirm).
112 auto set_compression_threshold(size_t bytes) -> void
113 {
115 NETWORK_LOG_DEBUG("[compression_pipeline] Set threshold=" + std::to_string(bytes));
116 }
117
118 auto get_compression_threshold() const -> size_t { return compression_threshold_; }
119
121
122 private:
123#ifdef BUILD_LZ4_COMPRESSION
// LZ4-compresses `input` into the framed layout
//   [4-byte original size, copied with memcpy in host byte order][LZ4 block]
// which is the layout decompress_lz4 reads back.
//
// Returns:
//   - an error if LZ4_compressBound() reports a non-positive bound (lz4 returns
//     0 for inputs that are too large); the head of that error return is
//     missing from this listing;
//   - a raw copy of `input` (as ok) if LZ4_compress_default fails, or if the
//     framed result (compressed_size + 4) is not smaller than `input`;
//   - otherwise the framed buffer, trimmed to compressed_size + 4 bytes.
//
// NOTE(review): static_cast<int>(input.size()) and static_cast<uint32_t>(...)
// narrow size_t. For inputs beyond INT_MAX the int cast does not preserve the
// value, so LZ4_compressBound may not see the real size. Confirm callers never
// pass payloads that large, or reject them up front.
// NOTE(review): because the size header is host-order, a peer with a
// different endianness would misread it.
124 auto compress_lz4(std::span<const uint8_t> input) -> Result<std::vector<uint8_t>>
125 {
126 // Calculate maximum compressed size
127 int max_compressed_size = LZ4_compressBound(static_cast<int>(input.size()));
128 if (max_compressed_size <= 0)
129 {
132 "Failed to calculate LZ4 compressed size bound");
133 }
134
135 // Allocate buffer for compressed data + 4-byte header (original size)
136 std::vector<uint8_t> compressed(max_compressed_size + 4);
137
138 // Store original size in first 4 bytes (host byte order via memcpy; little-endian only on little-endian hosts)
139 uint32_t original_size = static_cast<uint32_t>(input.size());
140 std::memcpy(compressed.data(), &original_size, 4);
141
142 // Compress data
143 int compressed_size = LZ4_compress_default(
144 reinterpret_cast<const char*>(input.data()),
145 reinterpret_cast<char*>(compressed.data() + 4),
146 static_cast<int>(input.size()),
147 max_compressed_size);
148
149 if (compressed_size <= 0)
150 {
151 NETWORK_LOG_WARN("[compression_pipeline] LZ4 compression failed, returning uncompressed");
152 std::vector<uint8_t> result(input.begin(), input.end());
153 return ok<std::vector<uint8_t>>(std::move(result));
154 }
155
156 // If compressed size is not smaller, return uncompressed
157 if (static_cast<size_t>(compressed_size + 4) >= input.size())
158 {
159 NETWORK_LOG_TRACE("[compression_pipeline] Compressed size not smaller, returning uncompressed");
160 std::vector<uint8_t> result(input.begin(), input.end());
161 return ok<std::vector<uint8_t>>(std::move(result));
162 }
163
164 // Resize to actual compressed size
165 compressed.resize(compressed_size + 4);
166
167 NETWORK_LOG_TRACE("[compression_pipeline] Compressed " +
168 std::to_string(input.size()) + " -> " +
169 std::to_string(compressed.size()) + " bytes (" +
170 std::to_string(100 - (compressed.size() * 100 / input.size())) + "% reduction)");
171
172 return ok<std::vector<uint8_t>>(std::move(compressed));
173 }
174
// Inverse of compress_lz4: reads the 4-byte original-size header (host byte
// order, via memcpy) and decodes the LZ4 block that follows it.
//
// Rejected with an error (the heads of the error returns are missing from
// this listing):
//   - input shorter than 4 bytes ("Compressed data too small");
//   - a declared size above 100 * 1024 * 1024 bytes (100 MiB). This bounds the
//     allocation driven by the header value;
//   - LZ4_decompress_safe failure (negative result, e.g. malformed input);
//   - a decoded length that differs from the declared size.
// On success it returns exactly original_size bytes.
175 auto decompress_lz4(std::span<const uint8_t> input) -> Result<std::vector<uint8_t>>
176 {
177 // Need at least 4 bytes for size header
178 if (input.size() < 4)
179 {
182 "Compressed data too small");
183 }
184
185 // Read original size from first 4 bytes (host byte order, as written by compress_lz4)
186 uint32_t original_size;
187 std::memcpy(&original_size, input.data(), 4);
188
189 // Sanity check on decompressed size (max 100MB)
190 if (original_size > 100 * 1024 * 1024)
191 {
194 "Decompressed size too large: " + std::to_string(original_size));
195 }
196
197 // Allocate buffer for decompressed data
198 std::vector<uint8_t> decompressed(original_size);
199
200 // Decompress
201 int decompressed_size = LZ4_decompress_safe(
202 reinterpret_cast<const char*>(input.data() + 4),
203 reinterpret_cast<char*>(decompressed.data()),
204 static_cast<int>(input.size() - 4),
205 static_cast<int>(original_size));
206
207 if (decompressed_size < 0)
208 {
211 "LZ4 decompression failed");
212 }
213
214 if (static_cast<size_t>(decompressed_size) != original_size)
215 {
218 "Decompressed size mismatch");
219 }
220
221 NETWORK_LOG_TRACE("[compression_pipeline] Decompressed " +
222 std::to_string(input.size()) + " -> " +
223 std::to_string(decompressed.size()) + " bytes");
224
225 return ok<std::vector<uint8_t>>(std::move(decompressed));
226 }
227#endif
228
229#ifdef BUILD_ZLIB_COMPRESSION
// gzip-compresses `input` with a single deflate(Z_FINISH) call.
//
// windowBits = 15 + 16 selects the gzip wrapper (gzip header + CRC-32 trailer).
// The output buffer is pre-sized with deflateBound(), which is what zlib
// requires for a one-call Z_FINISH to reach Z_STREAM_END.
// Returns:
//   - an error if deflateInit2 fails (the head of that error return is missing
//     from this listing);
//   - a raw copy of `input` (as ok) if deflate does not return Z_STREAM_END, or
//     if the gzip output is not smaller than `input`;
//   - otherwise the gzip stream, trimmed to stream.total_out bytes.
//
// NOTE(review): avail_in / avail_out are uInt (unsigned int, 32-bit on common
// platforms); inputs of 4 GiB or more would be truncated by the static_casts.
230 auto compress_gzip(std::span<const uint8_t> input) -> Result<std::vector<uint8_t>>
231 {
232 z_stream stream{};
233 stream.zalloc = Z_NULL;
234 stream.zfree = Z_NULL;
235 stream.opaque = Z_NULL;
236
237 // gzip initialization (windowBits = 15 + 16 for gzip)
238 if (deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
239 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
240 {
243 "Failed to initialize gzip compression");
244 }
245
246 stream.avail_in = static_cast<uInt>(input.size());
247 stream.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input.data()));
248
249 std::vector<uint8_t> compressed;
250 compressed.resize(deflateBound(&stream, static_cast<uLong>(input.size())));
251
252 stream.avail_out = static_cast<uInt>(compressed.size());
253 stream.next_out = compressed.data();
254
255 int ret = deflate(&stream, Z_FINISH);
256 deflateEnd(&stream);
257
258 if (ret != Z_STREAM_END)
259 {
260 NETWORK_LOG_WARN("[compression_pipeline] gzip compression failed, returning uncompressed");
261 std::vector<uint8_t> result(input.begin(), input.end());
262 return ok<std::vector<uint8_t>>(std::move(result));
263 }
264
265 compressed.resize(stream.total_out);
266
267 // If compressed size is not smaller, return uncompressed
268 if (compressed.size() >= input.size())
269 {
270 NETWORK_LOG_TRACE("[compression_pipeline] Compressed size not smaller, returning uncompressed");
271 std::vector<uint8_t> result(input.begin(), input.end());
272 return ok<std::vector<uint8_t>>(std::move(result));
273 }
274
275 NETWORK_LOG_TRACE("[compression_pipeline] gzip compressed " +
276 std::to_string(input.size()) + " -> " +
277 std::to_string(compressed.size()) + " bytes (" +
278 std::to_string(100 - (compressed.size() * 100 / input.size())) + "% reduction)");
279
280 return ok<std::vector<uint8_t>>(std::move(compressed));
281 }
282
// Inflates a gzip stream. windowBits = 15 + 16 makes inflate accept only the
// gzip wrapper, not the zlib one.
//
// The output buffer starts at 2 * input.size() bytes and is doubled each time
// inflate() returns Z_OK. The loop ends on Z_STREAM_END or any other code.
// It returns an error if inflateInit2 fails, or if the stream does not end
// with Z_STREAM_END (e.g. truncated or corrupt input); the heads of those error
// returns are missing from this listing. On success it returns the inflated
// bytes, trimmed to stream.total_out.
//
// NOTE(review): unlike decompress_lz4, which rejects sizes above 100 MiB, there
// is no cap on the inflated size here. If `input` can come from an untrusted
// peer, a small crafted stream can force very large allocations (decompression
// bomb). Consider bounding decompressed.size() before each resize.
283 auto decompress_gzip(std::span<const uint8_t> input) -> Result<std::vector<uint8_t>>
284 {
285 z_stream stream{};
286 stream.zalloc = Z_NULL;
287 stream.zfree = Z_NULL;
288 stream.opaque = Z_NULL;
289
290 // gzip initialization (windowBits = 15 + 16 for gzip)
291 if (inflateInit2(&stream, 15 + 16) != Z_OK)
292 {
295 "Failed to initialize gzip decompression");
296 }
297
298 stream.avail_in = static_cast<uInt>(input.size());
299 stream.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input.data()));
300
301 // Start with 2x input size, will grow if needed
302 std::vector<uint8_t> decompressed;
303 decompressed.resize(input.size() * 2);
304
305 stream.avail_out = static_cast<uInt>(decompressed.size());
306 stream.next_out = decompressed.data();
307
308 int ret;
// Every Z_OK is treated as "output buffer full": the buffer is doubled and
// inflate resumes at the old end of the buffer. next_out is re-derived after
// resize() because the vector may have reallocated. This relies on inflate()
// returning Z_OK only once the output is full or the input is exhausted; in
// the latter case the next call yields Z_BUF_ERROR and the function fails.
309 while ((ret = inflate(&stream, Z_NO_FLUSH)) == Z_OK)
310 {
311 // Need more output space
312 size_t current_size = decompressed.size();
313 decompressed.resize(current_size * 2);
314 stream.avail_out = static_cast<uInt>(decompressed.size() - current_size);
315 stream.next_out = decompressed.data() + current_size;
316 }
317
318 inflateEnd(&stream);
319
320 if (ret != Z_STREAM_END)
321 {
324 "gzip decompression failed");
325 }
326
327 decompressed.resize(stream.total_out);
328
329 NETWORK_LOG_TRACE("[compression_pipeline] gzip decompressed " +
330 std::to_string(input.size()) + " -> " +
331 std::to_string(decompressed.size()) + " bytes");
332
333 return ok<std::vector<uint8_t>>(std::move(decompressed));
334 }
335
// Compresses `input` into a zlib-wrapped deflate stream with a single
// deflate(Z_FINISH) call.
//
// windowBits = 15 (positive, without +16) makes zlib emit the RFC 1950 zlib
// wrapper (2-byte header + Adler-32 trailer), not raw RFC 1951 deflate (which
// would need windowBits = -15). Peers expecting raw deflate will reject this
// output. The output buffer is pre-sized with deflateBound() so one Z_FINISH
// call can complete.
// Returns:
//   - an error if deflateInit2 fails (the head of that error return is missing
//     from this listing);
//   - a raw copy of `input` (as ok) if deflate does not return Z_STREAM_END, or
//     if the output is not smaller than `input`;
//   - otherwise the compressed stream, trimmed to stream.total_out bytes.
//
// NOTE(review): the uInt casts truncate inputs of 4 GiB or more (uInt is
// unsigned int, 32-bit on common platforms).
336 auto compress_deflate(std::span<const uint8_t> input) -> Result<std::vector<uint8_t>>
337 {
338 z_stream stream{};
339 stream.zalloc = Z_NULL;
340 stream.zfree = Z_NULL;
341 stream.opaque = Z_NULL;
342
343 // zlib-wrapped deflate (windowBits = 15 emits an RFC 1950 header and Adler-32 trailer; raw RFC 1951 deflate would need -15)
344 if (deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
345 15, 8, Z_DEFAULT_STRATEGY) != Z_OK)
346 {
349 "Failed to initialize deflate compression");
350 }
351
352 stream.avail_in = static_cast<uInt>(input.size());
353 stream.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input.data()));
354
355 std::vector<uint8_t> compressed;
356 compressed.resize(deflateBound(&stream, static_cast<uLong>(input.size())));
357
358 stream.avail_out = static_cast<uInt>(compressed.size());
359 stream.next_out = compressed.data();
360
361 int ret = deflate(&stream, Z_FINISH);
362 deflateEnd(&stream);
363
364 if (ret != Z_STREAM_END)
365 {
366 NETWORK_LOG_WARN("[compression_pipeline] deflate compression failed, returning uncompressed");
367 std::vector<uint8_t> result(input.begin(), input.end());
368 return ok<std::vector<uint8_t>>(std::move(result));
369 }
370
371 compressed.resize(stream.total_out);
372
373 // If compressed size is not smaller, return uncompressed
374 if (compressed.size() >= input.size())
375 {
376 NETWORK_LOG_TRACE("[compression_pipeline] Compressed size not smaller, returning uncompressed");
377 std::vector<uint8_t> result(input.begin(), input.end());
378 return ok<std::vector<uint8_t>>(std::move(result));
379 }
380
381 NETWORK_LOG_TRACE("[compression_pipeline] deflate compressed " +
382 std::to_string(input.size()) + " -> " +
383 std::to_string(compressed.size()) + " bytes (" +
384 std::to_string(100 - (compressed.size() * 100 / input.size())) + "% reduction)");
385
386 return ok<std::vector<uint8_t>>(std::move(compressed));
387 }
388
// Inflates a zlib-wrapped deflate stream (the format compress_deflate emits).
//
// windowBits = 15 makes inflate expect the RFC 1950 zlib header; raw deflate
// data (no header) or gzip data would be rejected. The output buffer starts at
// 2 * input.size() bytes and is doubled each time inflate() returns Z_OK.
// It returns an error if inflateInit2 fails, or if the stream does not end
// with Z_STREAM_END; the heads of those error returns are missing from this
// listing. On success it returns the inflated bytes, trimmed to
// stream.total_out.
//
// NOTE(review): there is no upper bound on the inflated size, unlike the
// 100 MiB cap in decompress_lz4. With untrusted input this permits
// decompression bombs. Consider capping decompressed.size() in the grow loop.
389 auto decompress_deflate(std::span<const uint8_t> input) -> Result<std::vector<uint8_t>>
390 {
391 z_stream stream{};
392 stream.zalloc = Z_NULL;
393 stream.zfree = Z_NULL;
394 stream.opaque = Z_NULL;
395
396 // zlib-wrapped deflate (windowBits = 15 expects an RFC 1950 zlib header; raw deflate would need -15)
397 if (inflateInit2(&stream, 15) != Z_OK)
398 {
401 "Failed to initialize deflate decompression");
402 }
403
404 stream.avail_in = static_cast<uInt>(input.size());
405 stream.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input.data()));
406
407 // Start with 2x input size, will grow if needed
408 std::vector<uint8_t> decompressed;
409 decompressed.resize(input.size() * 2);
410
411 stream.avail_out = static_cast<uInt>(decompressed.size());
412 stream.next_out = decompressed.data();
413
414 int ret;
// Every Z_OK is treated as "output buffer full": double the buffer and resume
// at its old end (next_out re-derived because resize() may reallocate).
415 while ((ret = inflate(&stream, Z_NO_FLUSH)) == Z_OK)
416 {
417 // Need more output space
418 size_t current_size = decompressed.size();
419 decompressed.resize(current_size * 2);
420 stream.avail_out = static_cast<uInt>(decompressed.size() - current_size);
421 stream.next_out = decompressed.data() + current_size;
422 }
423
424 inflateEnd(&stream);
425
426 if (ret != Z_STREAM_END)
427 {
430 "deflate decompression failed");
431 }
432
433 decompressed.resize(stream.total_out);
434
435 NETWORK_LOG_TRACE("[compression_pipeline] deflate decompressed " +
436 std::to_string(input.size()) + " -> " +
437 std::to_string(decompressed.size()) + " bytes");
438
439 return ok<std::vector<uint8_t>>(std::move(decompressed));
440 }
441#endif
442
443 private:
446 };
447
449 size_t compression_threshold)
450 : pimpl_(std::make_unique<impl>(algo, compression_threshold))
451 {
452 }
453
455
456 auto compression_pipeline::compress(std::span<const uint8_t> input)
457 -> Result<std::vector<uint8_t>>
458 {
459 return pimpl_->compress(input);
460 }
461
// Convenience overload: views `input` as a std::span<const uint8_t> (no copy)
// and forwards to impl::compress(). The trailing return type is on a line
// missing from this listing (presumably -> Result<std::vector<uint8_t>>,
// matching the span overload).
462 auto compression_pipeline::compress(const std::vector<uint8_t>& input)
464 {
465 return pimpl_->compress(std::span<const uint8_t>(input));
466 }
467
// Public entry point for decompression; forwards `input` to impl::decompress()
// and returns its Result. The trailing return type is on a line missing from
// this listing (presumably -> Result<std::vector<uint8_t>>).
468 auto compression_pipeline::decompress(std::span<const uint8_t> input)
470 {
471 return pimpl_->decompress(input);
472 }
473
// Convenience overload: views `input` as a std::span<const uint8_t> (no copy)
// and forwards to impl::decompress(). The trailing return type is on a line
// missing from this listing (presumably -> Result<std::vector<uint8_t>>).
474 auto compression_pipeline::decompress(const std::vector<uint8_t>& input)
476 {
477 return pimpl_->decompress(std::span<const uint8_t>(input));
478 }
479
481 {
482 pimpl_->set_compression_threshold(bytes);
483 }
484
489
494
495 auto make_compress_function(std::shared_ptr<compression_pipeline> pipeline)
496 -> std::function<std::vector<uint8_t>(const std::vector<uint8_t>&)>
497 {
498 return [pipeline](const std::vector<uint8_t>& input) -> std::vector<uint8_t> {
499 auto result = pipeline->compress(input);
500 if (result.is_err())
501 {
502 NETWORK_LOG_ERROR("[compression] Compression failed: " + result.error().message);
503 return input; // Return uncompressed on error
504 }
505 return result.value();
506 };
507 }
508
509 auto make_decompress_function(std::shared_ptr<compression_pipeline> pipeline)
510 -> std::function<std::vector<uint8_t>(const std::vector<uint8_t>&)>
511 {
512 return [pipeline](const std::vector<uint8_t>& input) -> std::vector<uint8_t> {
513 auto result = pipeline->decompress(input);
514 if (result.is_err())
515 {
516 NETWORK_LOG_ERROR("[compression] Decompression failed: " + result.error().message);
517 return input; // Return as-is on error
518 }
519 return result.value();
520 };
521 }
522
523} // namespace kcenon::network::utils
auto decompress(std::span< const uint8_t > input) -> Result< std::vector< uint8_t > >
auto get_algorithm() const -> compression_algorithm
auto compress(std::span< const uint8_t > input) -> Result< std::vector< uint8_t > >
impl(compression_algorithm algo, size_t threshold)
Message compression and decompression pipeline.
auto compress(std::span< const uint8_t > input) -> Result< std::vector< uint8_t > >
Compresses input data.
auto set_compression_threshold(size_t bytes) -> void
Sets compression threshold.
auto get_compression_threshold() const -> size_t
Gets current compression threshold.
auto decompress(std::span< const uint8_t > input) -> Result< std::vector< uint8_t > >
Decompresses input data.
compression_pipeline(compression_algorithm algo=compression_algorithm::lz4, size_t compression_threshold=256)
Constructs a compression pipeline.
auto get_algorithm() const -> compression_algorithm
Gets current algorithm.
Logger system integration interface for network_system.
#define NETWORK_LOG_TRACE(msg)
#define NETWORK_LOG_WARN(msg)
#define NETWORK_LOG_ERROR(msg)
#define NETWORK_LOG_DEBUG(msg)
Utility components for network_system.
auto make_compress_function(std::shared_ptr< compression_pipeline > pipeline) -> std::function< std::vector< uint8_t >(const std::vector< uint8_t > &)>
Creates a compression function for pipeline integration.
compression_algorithm
Supported compression algorithms.
auto make_decompress_function(std::shared_ptr< compression_pipeline > pipeline) -> std::function< std::vector< uint8_t >(const std::vector< uint8_t > &)>
Creates a decompression function for pipeline integration.