Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
storage_backends.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
11#pragma once
12
13#include <string>
14#include <vector>
15#include <memory>
16#include <unordered_map>
17#include <chrono>
18#include <mutex>
19#include <deque>
20#include <algorithm>
21
24
25namespace kcenon::monitoring {
26
42
47 none,
48 gzip,
49 lz4,
50 zstd
51};
52
58 std::string path;
59 std::string data_directory;
61 size_t max_size_mb{100};
62 bool auto_flush{true};
63 std::chrono::milliseconds flush_interval{std::chrono::milliseconds(5000)};
64
65 // Extended configuration for Phase 2
66 size_t max_capacity{1000};
67 size_t batch_size{100};
68 std::string table_name;
69 std::string host;
70 uint16_t port{0};
71 std::string database_name;
72 std::string username;
73 std::string password;
74
79 common::Result<bool> validate() const {
80 // Memory buffer doesn't require path
82 if (path.empty() && host.empty()) {
83 return common::Result<bool>::err(error_info(monitoring_error_code::invalid_configuration, "Path or host required for non-memory storage").to_common_error());
84 }
85 }
86
87 if (max_capacity == 0) {
88 return common::Result<bool>::err(error_info(monitoring_error_code::invalid_capacity, "Capacity must be greater than 0").to_common_error());
89 }
90
91 if (batch_size == 0) {
92 return common::Result<bool>::err(error_info(monitoring_error_code::invalid_configuration, "Batch size must be greater than 0").to_common_error());
93 }
94
96 return common::Result<bool>::err(error_info(monitoring_error_code::invalid_configuration, "Batch size cannot exceed capacity").to_common_error());
97 }
98
99 return common::ok(true);
100 }
101};
102
107public:
108 virtual ~snapshot_storage_backend() = default;
109
110 virtual common::Result<bool> store(const metrics_snapshot& snapshot) = 0;
111 virtual common::Result<metrics_snapshot> retrieve(size_t index) = 0;
112 virtual common::Result<std::vector<metrics_snapshot>> retrieve_range(size_t start, size_t count) = 0;
113 virtual size_t size() const = 0;
114 virtual size_t capacity() const = 0;
115 virtual common::Result<bool> flush() = 0;
116 virtual common::Result<bool> clear() = 0;
117 virtual std::unordered_map<std::string, size_t> get_stats() const = 0;
118};
119
124public:
126
127 explicit file_storage_backend(const storage_config& config)
128 : config_(config) {}
129
130 common::Result<bool> store(const metrics_snapshot& snapshot) override {
131 std::lock_guard<std::mutex> lock(mutex_);
132
133 // Remove oldest if at capacity
134 if (snapshots_.size() >= config_.max_capacity) {
135 snapshots_.pop_front();
136 }
137
138 snapshots_.push_back(snapshot);
139 return common::ok(true);
140 }
141
142 common::Result<metrics_snapshot> retrieve(size_t index) override {
143 std::lock_guard<std::mutex> lock(mutex_);
144
145 if (index >= snapshots_.size()) {
146 return common::Result<metrics_snapshot>::err(error_info(monitoring_error_code::not_found, "Snapshot index out of range").to_common_error());
147 }
148
149 return common::ok(snapshots_[index]);
150 }
151
152 common::Result<std::vector<metrics_snapshot>> retrieve_range(size_t start, size_t count) override {
153 std::lock_guard<std::mutex> lock(mutex_);
154
155 std::vector<metrics_snapshot> result;
156 size_t end = std::min(start + count, snapshots_.size());
157
158 for (size_t i = start; i < end; ++i) {
159 result.push_back(snapshots_[i]);
160 }
161
162 return common::ok(std::move(result));
163 }
164
165 size_t size() const override {
166 std::lock_guard<std::mutex> lock(mutex_);
167 return snapshots_.size();
168 }
169
170 size_t capacity() const override {
171 return config_.max_capacity;
172 }
173
174 common::Result<bool> flush() override {
175 // Stub implementation - actual file I/O would go here
176 return common::ok(true);
177 }
178
179 common::Result<bool> clear() override {
180 std::lock_guard<std::mutex> lock(mutex_);
181 snapshots_.clear();
182 return common::ok(true);
183 }
184
185 std::unordered_map<std::string, size_t> get_stats() const override {
186 std::lock_guard<std::mutex> lock(mutex_);
187 return {
188 {"total_snapshots", snapshots_.size()},
189 {"capacity", config_.max_capacity}
190 };
191 }
192
193private:
195 std::deque<metrics_snapshot> snapshots_;
196 mutable std::mutex mutex_;
197};
198
203public:
205
207 : config_(config), connected_(true) {}
208
209 common::Result<bool> store(const metrics_snapshot& snapshot) override {
210 std::lock_guard<std::mutex> lock(mutex_);
211
212 if (snapshots_.size() >= config_.max_capacity) {
213 snapshots_.pop_front();
214 }
215
216 snapshots_.push_back(snapshot);
217 return common::ok(true);
218 }
219
220 common::Result<metrics_snapshot> retrieve(size_t index) override {
221 std::lock_guard<std::mutex> lock(mutex_);
222
223 if (index >= snapshots_.size()) {
224 return common::Result<metrics_snapshot>::err(error_info(monitoring_error_code::not_found, "Snapshot index out of range").to_common_error());
225 }
226
227 return common::ok(snapshots_[index]);
228 }
229
230 common::Result<std::vector<metrics_snapshot>> retrieve_range(size_t start, size_t count) override {
231 std::lock_guard<std::mutex> lock(mutex_);
232
233 std::vector<metrics_snapshot> result;
234 size_t end = std::min(start + count, snapshots_.size());
235
236 for (size_t i = start; i < end; ++i) {
237 result.push_back(snapshots_[i]);
238 }
239
240 return common::ok(std::move(result));
241 }
242
243 size_t size() const override {
244 std::lock_guard<std::mutex> lock(mutex_);
245 return snapshots_.size();
246 }
247
248 size_t capacity() const override {
249 return config_.max_capacity;
250 }
251
252 common::Result<bool> flush() override {
253 return common::ok(true);
254 }
255
256 common::Result<bool> clear() override {
257 std::lock_guard<std::mutex> lock(mutex_);
258 snapshots_.clear();
259 return common::ok(true);
260 }
261
262 std::unordered_map<std::string, size_t> get_stats() const override {
263 std::lock_guard<std::mutex> lock(mutex_);
264 return {
265 {"stored_count", snapshots_.size()},
266 {"capacity", config_.max_capacity},
267 {"connected", connected_ ? 1UL : 0UL}
268 };
269 }
270
271private:
273 std::deque<metrics_snapshot> snapshots_;
274 mutable std::mutex mutex_;
275 bool connected_{false};
276};
277
282public:
284
285 explicit cloud_storage_backend(const storage_config& config)
286 : config_(config) {}
287
288 common::Result<bool> store(const metrics_snapshot& snapshot) override {
289 std::lock_guard<std::mutex> lock(mutex_);
290
291 if (snapshots_.size() >= config_.max_capacity) {
292 snapshots_.pop_front();
293 }
294
295 snapshots_.push_back(snapshot);
296 return common::ok(true);
297 }
298
299 common::Result<metrics_snapshot> retrieve(size_t index) override {
300 std::lock_guard<std::mutex> lock(mutex_);
301
302 if (index >= snapshots_.size()) {
303 return common::Result<metrics_snapshot>::err(error_info(monitoring_error_code::not_found, "Snapshot index out of range").to_common_error());
304 }
305
306 return common::ok(snapshots_[index]);
307 }
308
309 common::Result<std::vector<metrics_snapshot>> retrieve_range(size_t start, size_t count) override {
310 std::lock_guard<std::mutex> lock(mutex_);
311
312 std::vector<metrics_snapshot> result;
313 size_t end = std::min(start + count, snapshots_.size());
314
315 for (size_t i = start; i < end; ++i) {
316 result.push_back(snapshots_[i]);
317 }
318
319 return common::ok(std::move(result));
320 }
321
322 size_t size() const override {
323 std::lock_guard<std::mutex> lock(mutex_);
324 return snapshots_.size();
325 }
326
327 size_t capacity() const override {
328 return config_.max_capacity;
329 }
330
331 common::Result<bool> flush() override {
332 return common::ok(true);
333 }
334
335 common::Result<bool> clear() override {
336 std::lock_guard<std::mutex> lock(mutex_);
337 snapshots_.clear();
338 return common::ok(true);
339 }
340
341 std::unordered_map<std::string, size_t> get_stats() const override {
342 std::lock_guard<std::mutex> lock(mutex_);
343 return {
344 {"stored_count", snapshots_.size()},
345 {"capacity", config_.max_capacity}
346 };
347 }
348
349private:
351 std::deque<metrics_snapshot> snapshots_;
352 mutable std::mutex mutex_;
353};
354
359public:
361
363 : config_(config) {}
364
365 common::Result<bool> store(const metrics_snapshot& snapshot) override {
366 std::lock_guard<std::mutex> lock(mutex_);
367
368 if (snapshots_.size() >= config_.max_capacity) {
369 snapshots_.pop_front();
370 }
371
372 snapshots_.push_back(snapshot);
373 return common::ok(true);
374 }
375
376 common::Result<metrics_snapshot> retrieve(size_t index) override {
377 std::lock_guard<std::mutex> lock(mutex_);
378
379 if (index >= snapshots_.size()) {
380 return common::Result<metrics_snapshot>::err(error_info(monitoring_error_code::not_found, "Snapshot index out of range").to_common_error());
381 }
382
383 return common::ok(snapshots_[index]);
384 }
385
386 common::Result<std::vector<metrics_snapshot>> retrieve_range(size_t start, size_t count) override {
387 std::lock_guard<std::mutex> lock(mutex_);
388
389 std::vector<metrics_snapshot> result;
390 size_t end = std::min(start + count, snapshots_.size());
391
392 for (size_t i = start; i < end; ++i) {
393 result.push_back(snapshots_[i]);
394 }
395
396 return common::ok(std::move(result));
397 }
398
399 size_t size() const override {
400 std::lock_guard<std::mutex> lock(mutex_);
401 return snapshots_.size();
402 }
403
404 size_t capacity() const override {
405 return config_.max_capacity;
406 }
407
408 common::Result<bool> flush() override {
409 return common::ok(true);
410 }
411
412 common::Result<bool> clear() override {
413 std::lock_guard<std::mutex> lock(mutex_);
414 snapshots_.clear();
415 return common::ok(true);
416 }
417
418 std::unordered_map<std::string, size_t> get_stats() const override {
419 std::lock_guard<std::mutex> lock(mutex_);
420 return {
421 {"stored_count", snapshots_.size()},
422 {"capacity", config_.max_capacity}
423 };
424 }
425
426private:
428 std::deque<metrics_snapshot> snapshots_;
429 mutable std::mutex mutex_;
430};
431
436public:
442 static std::unique_ptr<snapshot_storage_backend> create_backend(const storage_config& config) {
443 switch (config.type) {
445 return std::make_unique<memory_storage_backend>(config);
446
450 return std::make_unique<file_storage_backend>(config);
451
455 return std::make_unique<database_storage_backend>(config);
456
460 return std::make_unique<cloud_storage_backend>(config);
461
462 default:
463 return nullptr;
464 }
465 }
466
485};
486
487// Helper functions
488
496inline std::unique_ptr<snapshot_storage_backend> create_file_storage(
497 const std::string& path,
499 size_t capacity) {
500
501 storage_config config;
502 config.type = type;
503 config.path = path;
504 config.max_capacity = capacity;
505
507}
508
516inline std::unique_ptr<snapshot_storage_backend> create_database_storage(
518 const std::string& path,
519 const std::string& table) {
520
521 storage_config config;
522 config.type = type;
523 config.path = path;
524 config.table_name = table;
525 config.max_capacity = 10000;
526
528}
529
536inline std::unique_ptr<snapshot_storage_backend> create_cloud_storage(
538 const std::string& bucket) {
539
540 storage_config config;
541 config.type = type;
542 config.path = bucket;
543 config.max_capacity = 100000;
544
546}
547
548// Legacy key-value storage interface (for backward compatibility)
549
554public:
555 virtual ~kv_storage_backend() = default;
556 virtual bool store(const std::string& key, const std::string& value) = 0;
557 virtual std::string retrieve(const std::string& key) = 0;
558 virtual bool remove(const std::string& key) = 0;
559 virtual common::Result<bool> flush() { return common::ok(true); }
560};
561
566public:
568
569 explicit kv_memory_storage_backend(const storage_config& /* config */) {}
570
571 bool store(const std::string& key, const std::string& value) override {
572 data_[key] = value;
573 return true;
574 }
575
576 std::string retrieve(const std::string& key) override {
577 auto it = data_.find(key);
578 return it != data_.end() ? it->second : "";
579 }
580
581 bool remove(const std::string& key) override {
582 return data_.erase(key) > 0;
583 }
584
585private:
586 std::unordered_map<std::string, std::string> data_;
587};
588
589} // namespace kcenon::monitoring
Cloud storage backend (stub implementation)
cloud_storage_backend(const storage_config &config)
common::Result< bool > store(const metrics_snapshot &snapshot) override
common::Result< std::vector< metrics_snapshot > > retrieve_range(size_t start, size_t count) override
common::Result< metrics_snapshot > retrieve(size_t index) override
common::Result< bool > clear() override
std::unordered_map< std::string, size_t > get_stats() const override
std::deque< metrics_snapshot > snapshots_
common::Result< bool > flush() override
Database storage backend (stub implementation)
common::Result< bool > flush() override
database_storage_backend(const storage_config &config)
common::Result< bool > store(const metrics_snapshot &snapshot) override
std::deque< metrics_snapshot > snapshots_
std::unordered_map< std::string, size_t > get_stats() const override
common::Result< std::vector< metrics_snapshot > > retrieve_range(size_t start, size_t count) override
common::Result< bool > clear() override
common::Result< metrics_snapshot > retrieve(size_t index) override
File storage backend for metrics snapshots.
common::Result< metrics_snapshot > retrieve(size_t index) override
common::Result< bool > clear() override
file_storage_backend(const storage_config &config)
std::deque< metrics_snapshot > snapshots_
common::Result< std::vector< metrics_snapshot > > retrieve_range(size_t start, size_t count) override
common::Result< bool > flush() override
std::unordered_map< std::string, size_t > get_stats() const override
common::Result< bool > store(const metrics_snapshot &snapshot) override
In-memory key-value storage backend (legacy interface)
std::string retrieve(const std::string &key) override
std::unordered_map< std::string, std::string > data_
bool store(const std::string &key, const std::string &value) override
bool remove(const std::string &key) override
Basic key-value storage interface - stub.
virtual std::string retrieve(const std::string &key)=0
virtual bool remove(const std::string &key)=0
virtual bool store(const std::string &key, const std::string &value)=0
virtual common::Result< bool > flush()
In-memory snapshot storage backend.
common::Result< bool > store(const metrics_snapshot &snapshot) override
common::Result< bool > flush() override
common::Result< std::vector< metrics_snapshot > > retrieve_range(size_t start, size_t count) override
std::unordered_map< std::string, size_t > get_stats() const override
common::Result< bool > clear() override
memory_storage_backend(const storage_config &config)
std::deque< metrics_snapshot > snapshots_
common::Result< metrics_snapshot > retrieve(size_t index) override
Base interface for snapshot storage backends.
virtual std::unordered_map< std::string, size_t > get_stats() const =0
virtual common::Result< bool > flush()=0
virtual common::Result< bool > store(const metrics_snapshot &snapshot)=0
virtual common::Result< metrics_snapshot > retrieve(size_t index)=0
virtual common::Result< std::vector< metrics_snapshot > > retrieve_range(size_t start, size_t count)=0
virtual common::Result< bool > clear()=0
Factory for creating storage backends.
static std::unique_ptr< snapshot_storage_backend > create_backend(const storage_config &config)
Create a storage backend based on configuration.
static std::vector< storage_backend_type > get_supported_backends()
Get list of supported backend types.
Core monitoring system interface definitions.
@ none
Not in a cgroup or not Linux.
std::unique_ptr< snapshot_storage_backend > create_database_storage(storage_backend_type type, const std::string &path, const std::string &table)
Create a database storage backend.
std::unique_ptr< snapshot_storage_backend > create_cloud_storage(storage_backend_type type, const std::string &bucket)
Create a cloud storage backend.
compression_algorithm
Compression algorithms.
std::unique_ptr< snapshot_storage_backend > create_file_storage(const std::string &path, storage_backend_type type, size_t capacity)
Create a file storage backend.
storage_backend_type
Storage backend types.
Result pattern type definitions for monitoring system.
Extended error information with context.
Complete snapshot of metrics at a point in time.
std::chrono::milliseconds flush_interval
common::Result< bool > validate() const
Validate configuration.