Container System 0.1.0
High-performance C++20 type-safe container framework with SIMD-accelerated serialization
Loading...
Searching...
No Matches
thread_safe_container.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
7#include <unordered_map>
8#include <mutex>
9#include <shared_mutex>
10#include <atomic>
11#include <algorithm>
12#include <concepts>
13#include <thread>
14#include <chrono>
15#include <condition_variable>
16#include "value.h"
17#include "core/concepts.h"
18
19namespace kcenon::container
20{
21 // Forward declarations
22 class lockfree_container_reader;
23 class auto_refresh_reader;
24
31 class thread_safe_container : public std::enable_shared_from_this<thread_safe_container>
32 {
33 public:
34 using value_map = std::unordered_map<std::string, value>;
35 using const_iterator = value_map::const_iterator;
36 using iterator = value_map::iterator;
37
42
46 thread_safe_container(std::initializer_list<std::pair<std::string, value>> init);
47
52
57
62
67
73 std::optional<value> get(std::string_view key) const;
74
81 template<concepts::ValueVariantType T>
82 std::optional<T> get_typed(std::string_view key) const {
83 std::shared_lock lock(mutex_);
84 auto it = values_.find(std::string(key));
85 if (it != values_.end()) {
86 return it->second.get<T>();
87 }
88 return std::nullopt;
89 }
90
96 void set(std::string_view key, value value);
97
104 template<concepts::ValueVariantType T>
105 void set_typed(std::string_view key, T&& val) {
106 set(key, value(key, std::forward<T>(val)));
107 }
108
114 bool remove(std::string_view key);
115
119 void clear();
120
124 size_t size() const;
125
129 bool empty() const;
130
134 bool contains(std::string_view key) const;
135
139 std::vector<std::string> keys() const;
140
145 void set_variant(const value& val);
146
152 std::optional<value> get_variant(const std::string& key) const;
153
159 void set_container(const std::string& key,
160 std::shared_ptr<thread_safe_container> container);
161
167 std::shared_ptr<thread_safe_container> get_container(const std::string& key) const;
168
174 template<concepts::KeyValueCallback Func>
175 void for_each(Func&& func) const {
176 std::shared_lock lock(mutex_);
177 for (const auto& [key, val] : values_) {
178 func(key, val);
179 }
180 }
181
187 template<concepts::MutableKeyValueCallback Func>
188 void for_each_mut(Func&& func) {
189 std::unique_lock lock(mutex_);
190 for (auto& [key, val] : values_) {
191 func(key, val);
192 }
193 }
194
200 template<concepts::ValueMapCallback<value_map> Func>
201 void bulk_update(Func&& updater) {
202 std::unique_lock lock(mutex_);
203 updater(values_);
204 bulk_write_count_.fetch_add(1, std::memory_order_relaxed);
205 }
206
213 template<concepts::ConstValueMapCallback<value_map> Func>
214 auto bulk_read(Func&& reader) const {
215 std::shared_lock lock(mutex_);
216 bulk_read_count_.fetch_add(1, std::memory_order_relaxed);
217 return reader(values_);
218 }
219
227 bool compare_exchange(std::string_view key,
228 const value& expected,
229 const value& desired);
230
234 struct Statistics {
239 size_t size;
240 };
241
243
247 std::string to_json() const;
248
252 std::vector<uint8_t> serialize() const;
253
257 static std::shared_ptr<thread_safe_container> deserialize(
258 const std::vector<uint8_t>& data);
259
263 value& operator[](const std::string& key);
264
273 std::shared_ptr<lockfree_container_reader> create_lockfree_reader();
274
293 std::shared_ptr<auto_refresh_reader> create_auto_refresh_reader(
294 std::chrono::milliseconds refresh_interval);
295
296 private:
297 mutable std::shared_mutex mutex_;
299
300 // Statistics
301 mutable std::atomic<size_t> read_count_{0};
302 mutable std::atomic<size_t> write_count_{0};
303 mutable std::atomic<size_t> bulk_read_count_{0};
304 mutable std::atomic<size_t> bulk_write_count_{0};
305 };
306
317 public:
318 using snapshot_ptr = std::shared_ptr<const thread_safe_container::value_map>;
319
320 explicit snapshot_reader(std::shared_ptr<thread_safe_container> container)
321 : container_(container) {
323 }
324
330 template<concepts::ValueVariantType T>
331 std::optional<T> get(std::string_view key) const {
332 std::shared_lock lock(snapshot_mutex_);
333 if (!snapshot_) return std::nullopt;
334
335 auto it = snapshot_->find(std::string(key));
336 if (it != snapshot_->end()) {
337 return it->second.get<T>();
338 }
339 return std::nullopt;
340 }
341
346 void update_snapshot();
347
348 private:
349 std::shared_ptr<thread_safe_container> container_;
351 mutable std::shared_mutex snapshot_mutex_;
352 };
353
384 public:
385 using snapshot_type = std::unordered_map<std::string, value>;
386 using snapshot_ptr = std::shared_ptr<const snapshot_type>;
387
392 explicit lockfree_container_reader(std::shared_ptr<thread_safe_container> container)
393 : container_(std::move(container))
394 , snapshot_(std::make_shared<const snapshot_type>())
395 {
396 refresh();
397 }
398
403 : container_(other.container_)
404 , snapshot_(std::atomic_load_explicit(&other.snapshot_, std::memory_order_acquire))
405 {}
406
411 : container_(std::move(other.container_))
412 , snapshot_(std::atomic_load_explicit(&other.snapshot_, std::memory_order_acquire))
413 {}
414
419 if (this != &other) {
420 container_ = other.container_;
421 auto new_snapshot = std::atomic_load_explicit(&other.snapshot_, std::memory_order_acquire);
422 std::atomic_store_explicit(&snapshot_, new_snapshot, std::memory_order_release);
423 }
424 return *this;
425 }
426
431 if (this != &other) {
432 container_ = std::move(other.container_);
433 auto new_snapshot = std::atomic_load_explicit(&other.snapshot_, std::memory_order_acquire);
434 std::atomic_store_explicit(&snapshot_, new_snapshot, std::memory_order_release);
435 }
436 return *this;
437 }
438
443
457 template<concepts::ValueVariantType T>
458 [[nodiscard]] std::optional<T> get(std::string_view key) const noexcept {
459 auto snap = std::atomic_load_explicit(&snapshot_, std::memory_order_acquire);
460 if (!snap) {
461 return std::nullopt;
462 }
463
464 auto it = snap->find(std::string(key));
465 if (it != snap->end()) {
466 return it->second.get<T>();
467 }
468 return std::nullopt;
469 }
470
479 [[nodiscard]] bool contains(std::string_view key) const noexcept {
480 auto snap = std::atomic_load_explicit(&snapshot_, std::memory_order_acquire);
481 if (!snap) {
482 return false;
483 }
484 return snap->find(std::string(key)) != snap->end();
485 }
486
494 [[nodiscard]] size_t size() const noexcept {
495 auto snap = std::atomic_load_explicit(&snapshot_, std::memory_order_acquire);
496 return snap ? snap->size() : 0;
497 }
498
506 [[nodiscard]] bool empty() const noexcept {
507 return size() == 0;
508 }
509
517 [[nodiscard]] std::vector<std::string> keys() const {
518 auto snap = std::atomic_load_explicit(&snapshot_, std::memory_order_acquire);
519 std::vector<std::string> result;
520 if (snap) {
521 result.reserve(snap->size());
522 for (const auto& [key, val] : *snap) {
523 result.push_back(key);
524 }
525 }
526 return result;
527 }
528
541 template<typename Func>
542 void for_each(Func&& func) const {
543 auto snap = std::atomic_load_explicit(&snapshot_, std::memory_order_acquire);
544 if (snap) {
545 for (const auto& [key, val] : *snap) {
546 func(key, val);
547 }
548 }
549 }
550
560 void refresh() {
561 auto new_snapshot = std::make_shared<snapshot_type>();
562 container_->for_each([&new_snapshot](const std::string& key, const value& val) {
563 new_snapshot->emplace(key, val);
564 });
565 std::atomic_store_explicit(&snapshot_,
566 snapshot_ptr(new_snapshot),
567 std::memory_order_release);
568 refresh_count_.fetch_add(1, std::memory_order_relaxed);
569 }
570
575 [[nodiscard]] size_t refresh_count() const noexcept {
576 return refresh_count_.load(std::memory_order_relaxed);
577 }
578
583 [[nodiscard]] std::shared_ptr<thread_safe_container> source() const noexcept {
584 return container_;
585 }
586
587 private:
588 std::shared_ptr<thread_safe_container> container_;
590 std::atomic<size_t> refresh_count_{0};
591 };
592
616 public:
622 explicit auto_refresh_reader(std::shared_ptr<thread_safe_container> container,
623 std::chrono::milliseconds refresh_interval)
624 : reader_(std::make_shared<lockfree_container_reader>(std::move(container)))
626 , running_(true)
627 {
629 }
630
635 stop();
636 }
637
638 // Non-copyable
641
642 // Non-movable (due to thread)
645
652 void stop() {
653 {
654 std::lock_guard<std::mutex> lock(mutex_);
655 if (!running_) {
656 return;
657 }
658 running_ = false;
659 }
660 cv_.notify_one();
661 if (refresh_thread_.joinable()) {
662 refresh_thread_.join();
663 }
664 }
665
670 [[nodiscard]] bool is_running() const noexcept {
671 std::lock_guard<std::mutex> lock(mutex_);
672 return running_;
673 }
674
679 [[nodiscard]] std::chrono::milliseconds refresh_interval() const noexcept {
680 return refresh_interval_;
681 }
682
692 template<concepts::ValueVariantType T>
693 [[nodiscard]] std::optional<T> get(std::string_view key) const noexcept {
694 return reader_->get<T>(key);
695 }
696
702 [[nodiscard]] bool contains(std::string_view key) const noexcept {
703 return reader_->contains(key);
704 }
705
710 [[nodiscard]] size_t size() const noexcept {
711 return reader_->size();
712 }
713
718 [[nodiscard]] bool empty() const noexcept {
719 return reader_->empty();
720 }
721
726 [[nodiscard]] std::vector<std::string> keys() const {
727 return reader_->keys();
728 }
729
735 template<typename Func>
736 void for_each(Func&& func) const {
737 reader_->for_each(std::forward<Func>(func));
738 }
739
745 void refresh() {
746 reader_->refresh();
747 }
748
753 [[nodiscard]] size_t refresh_count() const noexcept {
754 return reader_->refresh_count();
755 }
756
761 [[nodiscard]] std::shared_ptr<lockfree_container_reader> reader() const noexcept {
762 return reader_;
763 }
764
769 [[nodiscard]] std::shared_ptr<thread_safe_container> source() const noexcept {
770 return reader_->source();
771 }
772
773 private:
775 while (true) {
776 std::unique_lock<std::mutex> lock(mutex_);
777 if (cv_.wait_for(lock, refresh_interval_, [this] { return !running_; })) {
778 break; // Stopped
779 }
780 lock.unlock();
781 reader_->refresh();
782 }
783 }
784
785 std::shared_ptr<lockfree_container_reader> reader_;
786 std::chrono::milliseconds refresh_interval_;
787 std::thread refresh_thread_;
788 mutable std::mutex mutex_;
789 std::condition_variable cv_;
791 };
792
793 // Type alias for backward compatibility
795
796 // Inline definition of create_lockfree_reader (must be after lockfree_container_reader is defined)
797 inline std::shared_ptr<lockfree_container_reader> thread_safe_container::create_lockfree_reader() {
798 return std::make_shared<lockfree_container_reader>(shared_from_this());
799 }
800
801 // Inline definition of create_auto_refresh_reader (must be after auto_refresh_reader is defined)
802 inline std::shared_ptr<auto_refresh_reader> thread_safe_container::create_auto_refresh_reader(
803 std::chrono::milliseconds refresh_interval) {
804 return std::make_shared<auto_refresh_reader>(shared_from_this(), refresh_interval);
805 }
806
807} // namespace kcenon::container
Auto-refreshing lock-free reader.
auto_refresh_reader(std::shared_ptr< thread_safe_container > container, std::chrono::milliseconds refresh_interval)
Construct auto-refresh reader.
bool empty() const noexcept
Lock-free empty check.
std::shared_ptr< thread_safe_container > source() const noexcept
Get the source container.
void for_each(Func &&func) const
Iterate over snapshot.
std::vector< std::string > keys() const
Get all keys from snapshot.
std::optional< T > get(std::string_view key) const noexcept
Lock-free typed value read.
auto_refresh_reader & operator=(auto_refresh_reader &&)=delete
std::shared_ptr< lockfree_container_reader > reader() const noexcept
Get the underlying lock-free reader.
void stop()
Stop the auto-refresh thread.
auto_refresh_reader & operator=(const auto_refresh_reader &)=delete
auto_refresh_reader(const auto_refresh_reader &)=delete
auto_refresh_reader(auto_refresh_reader &&)=delete
std::chrono::milliseconds refresh_interval() const noexcept
Get the refresh interval.
void refresh()
Manual refresh (in addition to automatic)
size_t refresh_count() const noexcept
Get the number of refreshes performed.
size_t size() const noexcept
Lock-free snapshot size check.
~auto_refresh_reader()
Destructor - stops the refresh thread.
bool is_running() const noexcept
Check if auto-refresh is running.
std::shared_ptr< lockfree_container_reader > reader_
bool contains(std::string_view key) const noexcept
Lock-free value existence check.
True lock-free reader using RCU (Read-Copy-Update) pattern.
size_t refresh_count() const noexcept
Get the number of refreshes performed.
lockfree_container_reader(const lockfree_container_reader &other)
Copy constructor.
bool empty() const noexcept
Lock-free empty check.
void for_each(Func &&func) const
Iterate over snapshot (lock-free for snapshot access)
bool contains(std::string_view key) const noexcept
Lock-free value existence check.
std::shared_ptr< const snapshot_type > snapshot_ptr
std::shared_ptr< thread_safe_container > container_
lockfree_container_reader(std::shared_ptr< thread_safe_container > container)
Construct reader from container.
std::shared_ptr< thread_safe_container > source() const noexcept
Get the source container.
void refresh()
Refresh snapshot from source container.
std::optional< T > get(std::string_view key) const noexcept
Lock-free typed value read.
lockfree_container_reader & operator=(lockfree_container_reader &&other) noexcept
Move assignment.
size_t size() const noexcept
Lock-free snapshot size check.
std::vector< std::string > keys() const
Get all keys from snapshot (lock-free)
std::unordered_map< std::string, value > snapshot_type
lockfree_container_reader(lockfree_container_reader &&other) noexcept
Move constructor.
lockfree_container_reader & operator=(const lockfree_container_reader &other)
Copy assignment.
Snapshot-based reader for frequently accessed data.
snapshot_reader(std::shared_ptr< thread_safe_container > container)
void update_snapshot()
Update snapshot from container.
std::optional< T > get(std::string_view key) const
Get value from snapshot (lock-protected read of snapshot)
std::shared_ptr< const thread_safe_container::value_map > snapshot_ptr
std::shared_ptr< thread_safe_container > container_
Thread-safe container with lock optimization.
std::shared_ptr< lockfree_container_reader > create_lockfree_reader()
Create a lock-free reader for this container.
std::optional< value > get(std::string_view key) const
Get value by key (thread-safe read)
bool contains(std::string_view key) const
Check if key exists.
thread_safe_container & operator=(const thread_safe_container &other)
Copy assignment (thread-safe)
std::shared_ptr< thread_safe_container > get_container(const std::string &key) const
Get a nested container.
value & operator[](const std::string &key)
Array-style access (creates if not exists)
size_t size() const
Get the number of values.
std::unordered_map< std::string, value > value_map
thread_safe_container()=default
Default constructor.
bool empty() const
Check if container is empty.
auto bulk_read(Func &&reader) const
Bulk read operation.
void bulk_update(Func &&updater)
Bulk update operation with minimal lock contention.
std::shared_ptr< auto_refresh_reader > create_auto_refresh_reader(std::chrono::milliseconds refresh_interval)
Create an auto-refreshing lock-free reader.
std::optional< T > get_typed(std::string_view key) const
Get typed value by key.
std::vector< std::string > keys() const
Get all keys.
static std::shared_ptr< thread_safe_container > deserialize(const std::vector< uint8_t > &data)
Deserialize from binary.
std::optional< value > get_variant(const std::string &key) const
Get a value as variant value.
std::string to_json() const
Serialize container to JSON.
void set_variant(const value &val)
Set a value directly using variant value.
void for_each_mut(Func &&func)
Apply a function to all values (mutable)
bool remove(std::string_view key)
Remove value by key.
void set(std::string_view key, value value)
Set value for key (thread-safe write)
void set_container(const std::string &key, std::shared_ptr< thread_safe_container > container)
Set a nested container.
std::vector< uint8_t > serialize() const
Serialize container to binary.
void set_typed(std::string_view key, T &&val)
Set typed value for key.
void for_each(Func &&func) const
Apply a function to all values (read-only)
bool compare_exchange(std::string_view key, const value &expected, const value &desired)
Atomic compare and swap.
Enhanced type-safe value with perfect legacy compatibility.
Definition value.h:129