Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
resource_manager.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
11#pragma once
12
13#include <atomic>
14#include <chrono>
15#include <functional>
16#include <memory>
17#include <mutex>
18#include <string>
19#include <unordered_map>
20
22
23namespace kcenon::monitoring {
24
25// ============================================================================
26// Enums
27// ============================================================================
28
33 reject,
34 delay
35};
36
/**
 * @brief Kind of resource a quota applies to.
 *
 * Enumerators keep their implicit values (memory == 0, cpu == 1).
 */
enum class resource_type {
    memory,  ///< Memory resource.
    cpu      ///< CPU resource.
};
44
45// ============================================================================
46// Metrics Types
47// ============================================================================
48
53 std::atomic<size_t> current_usage{0};
54 std::atomic<size_t> total_allocations{0};
55 std::atomic<size_t> peak_usage{0};
56 std::atomic<size_t> rejected_operations{0};
57 std::atomic<size_t> delayed_operations{0};
58
59 resource_metrics() = default;
66
68 if (this != &other) {
69 current_usage.store(other.current_usage.load());
70 total_allocations.store(other.total_allocations.load());
71 peak_usage.store(other.peak_usage.load());
72 rejected_operations.store(other.rejected_operations.load());
73 delayed_operations.store(other.delayed_operations.load());
74 }
75 return *this;
76 }
77};
78
79// ============================================================================
80// Configuration Types
81// ============================================================================
82
99
105 size_t max_value = 0;
106 size_t warning_threshold = 0;
109
110 resource_quota() = default;
112 : type(t), max_value(max), warning_threshold(max * 70 / 100),
113 critical_threshold(max * 90 / 100), strategy(s) {}
114
119 bool validate() const {
120 if (max_value == 0) {
121 return false;
122 }
124 return false;
125 }
127 return false;
128 }
130 return false;
131 }
132 return true;
133 }
134};
135
140 double max_cpu_usage = 0.8;
141 double warning_threshold = 0.7;
143 std::chrono::milliseconds check_interval = std::chrono::milliseconds(100);
144
149 bool validate() const {
151 return false;
152 }
154 return false;
155 }
157 return false;
158 }
159 return true;
160 }
161};
162
163// ============================================================================
164// Rate Limiter Interface
165// ============================================================================
166
171public:
172 virtual ~rate_limiter() = default;
173
179 virtual bool try_acquire(size_t count = 1) = 0;
180
187 template<typename Func>
188 auto execute(Func&& func) -> decltype(func()) {
189 using result_type = decltype(func());
190 using value_type = typename result_type::value_type;
191
192 if (!try_acquire(1)) {
193 return common::make_error<value_type>(static_cast<int>(monitoring_error_code::resource_exhausted),
194 "Rate limit exceeded for '" + get_name() + "'");
195 }
196 return func();
197 }
198
202 virtual const std::string& get_name() const = 0;
203};
204
205// ============================================================================
206// Token Bucket Rate Limiter
207// ============================================================================
208
216public:
217 using clock = std::chrono::steady_clock;
218
219 token_bucket_limiter(const std::string& name, double rate, size_t capacity,
221 : name_(name)
222 , rate_(rate)
223 , capacity_(capacity)
224 , tokens_(static_cast<double>(capacity))
225 , last_refill_(clock::now()) {}
226
227 bool try_acquire(size_t count = 1) override {
228 std::lock_guard<std::mutex> lock(mutex_);
229 refill();
230
231 if (tokens_ >= static_cast<double>(count)) {
232 tokens_ -= static_cast<double>(count);
233 return true;
234 }
235 return false;
236 }
237
238 const std::string& get_name() const override {
239 return name_;
240 }
241
242private:
243 void refill() {
244 auto now = clock::now();
245 auto elapsed = std::chrono::duration<double>(now - last_refill_).count();
246 tokens_ = std::min(static_cast<double>(capacity_), tokens_ + elapsed * rate_);
247 last_refill_ = now;
248 }
249
250 std::string name_;
251 double rate_;
252 size_t capacity_;
253 double tokens_;
254 clock::time_point last_refill_;
255 mutable std::mutex mutex_;
256};
257
258// ============================================================================
259// Leaky Bucket Rate Limiter
260// ============================================================================
261
269public:
270 using clock = std::chrono::steady_clock;
271
272 leaky_bucket_limiter(const std::string& name, double rate, size_t capacity)
273 : name_(name)
274 , rate_(rate)
275 , capacity_(capacity)
276 , water_(0.0)
277 , last_leak_(clock::now()) {}
278
279 bool try_acquire(size_t count = 1) override {
280 std::lock_guard<std::mutex> lock(mutex_);
281 leak();
282
283 if (water_ + static_cast<double>(count) <= static_cast<double>(capacity_)) {
284 water_ += static_cast<double>(count);
285 return true;
286 }
287 return false;
288 }
289
290 const std::string& get_name() const override {
291 return name_;
292 }
293
294private:
295 void leak() {
296 auto now = clock::now();
297 auto elapsed = std::chrono::duration<double>(now - last_leak_).count();
298 water_ = std::max(0.0, water_ - elapsed * rate_);
299 last_leak_ = now;
300 }
301
302 std::string name_;
303 double rate_;
304 size_t capacity_;
305 double water_;
306 clock::time_point last_leak_;
307 mutable std::mutex mutex_;
308};
309
310// ============================================================================
311// Memory Quota Manager
312// ============================================================================
313
318public:
319 memory_quota_manager(const std::string& name, const resource_quota& quota)
320 : name_(name), quota_(quota) {}
321
322 memory_quota_manager(const std::string& name, size_t max_bytes,
324 : name_(name), quota_(resource_type::memory, max_bytes, strategy) {}
325
331 common::Result<bool> allocate(size_t bytes) {
332 std::lock_guard<std::mutex> lock(mutex_);
333
334 if (metrics_.current_usage.load() + bytes > quota_.max_value) {
336 return common::make_error<bool>(static_cast<int>(monitoring_error_code::resource_exhausted),
337 "Memory quota exceeded for '" + name_ + "'");
338 }
339
340 metrics_.current_usage += bytes;
342
343 // Update peak usage
344 size_t current = metrics_.current_usage.load();
345 size_t peak = metrics_.peak_usage.load();
346 while (current > peak && !metrics_.peak_usage.compare_exchange_weak(peak, current)) {
347 // Retry until successful
348 }
349
350 return common::ok(true);
351 }
352
357 void deallocate(size_t bytes) {
358 std::lock_guard<std::mutex> lock(mutex_);
359 size_t current = metrics_.current_usage.load();
360 metrics_.current_usage.store(current >= bytes ? current - bytes : 0);
361 }
362
366 size_t current_usage() const {
367 return metrics_.current_usage.load();
368 }
369
375 }
376
383
388 return metrics_;
389 }
390
394 const std::string& get_name() const {
395 return name_;
396 }
397
398private:
399 std::string name_;
402 mutable std::mutex mutex_;
403};
404
405// ============================================================================
406// CPU Throttler
407// ============================================================================
408
413public:
414 cpu_throttler(const std::string& name, const cpu_throttle_config& config)
415 : name_(name), config_(config) {}
416
423 template<typename Func>
424 auto execute(Func&& func) -> decltype(func()) {
425 // For simplicity, we just execute the function and track metrics
426 // Real CPU monitoring would require platform-specific code
428
429 return std::forward<Func>(func)();
430 }
431
436 return metrics_;
437 }
438
442 const std::string& get_name() const {
443 return name_;
444 }
445
446private:
447 std::string name_;
450};
451
452// ============================================================================
453// Resource Manager
454// ============================================================================
455
460public:
461 explicit resource_manager(const std::string& name) : name_(name) {}
462
469 common::Result<bool> add_rate_limiter(const std::string& name, const rate_limit_config& config) {
470 std::lock_guard<std::mutex> lock(mutex_);
471
472 if (rate_limiters_.find(name) != rate_limiters_.end()) {
473 return common::make_error<bool>(static_cast<int>(monitoring_error_code::already_exists),
474 "Rate limiter '" + name + "' already exists");
475 }
476
477 rate_limiters_[name] = std::make_unique<token_bucket_limiter>(
478 name, config.rate_per_second, config.burst_capacity, config.strategy);
479 return common::ok(true);
480 }
481
487 rate_limiter* get_rate_limiter(const std::string& name) {
488 std::lock_guard<std::mutex> lock(mutex_);
489 auto it = rate_limiters_.find(name);
490 return it != rate_limiters_.end() ? it->second.get() : nullptr;
491 }
492
499 common::Result<bool> add_memory_quota(const std::string& name, const resource_quota& quota) {
500 std::lock_guard<std::mutex> lock(mutex_);
501
502 if (memory_quotas_.find(name) != memory_quotas_.end()) {
503 return common::make_error<bool>(static_cast<int>(monitoring_error_code::already_exists),
504 "Memory quota '" + name + "' already exists");
505 }
506
507 memory_quotas_[name] = std::make_unique<memory_quota_manager>(name, quota);
508 return common::ok(true);
509 }
510
516 memory_quota_manager* get_memory_quota(const std::string& name) {
517 std::lock_guard<std::mutex> lock(mutex_);
518 auto it = memory_quotas_.find(name);
519 return it != memory_quotas_.end() ? it->second.get() : nullptr;
520 }
521
528 common::Result<bool> add_cpu_throttler(const std::string& name, const cpu_throttle_config& config) {
529 std::lock_guard<std::mutex> lock(mutex_);
530
531 if (cpu_throttlers_.find(name) != cpu_throttlers_.end()) {
532 return common::make_error<bool>(static_cast<int>(monitoring_error_code::already_exists),
533 "CPU throttler '" + name + "' already exists");
534 }
535
536 cpu_throttlers_[name] = std::make_unique<cpu_throttler>(name, config);
537 return common::ok(true);
538 }
539
545 cpu_throttler* get_cpu_throttler(const std::string& name) {
546 std::lock_guard<std::mutex> lock(mutex_);
547 auto it = cpu_throttlers_.find(name);
548 return it != cpu_throttlers_.end() ? it->second.get() : nullptr;
549 }
550
555 common::Result<bool> is_healthy() const {
556 std::lock_guard<std::mutex> lock(mutex_);
557
558 for (const auto& [name, manager] : memory_quotas_) {
559 if (manager->is_over_critical_threshold()) {
560 return common::ok(false);
561 }
562 }
563
564 return common::ok(true);
565 }
566
571 std::unordered_map<std::string, resource_metrics> get_all_metrics() const {
572 std::lock_guard<std::mutex> lock(mutex_);
573 std::unordered_map<std::string, resource_metrics> all_metrics;
574
575 for (const auto& [name, limiter] : rate_limiters_) {
576 all_metrics["rate_" + name] = resource_metrics{};
577 }
578
579 for (const auto& [name, manager] : memory_quotas_) {
580 all_metrics["memory_" + name] = manager->get_metrics();
581 }
582
583 for (const auto& [name, throttler] : cpu_throttlers_) {
584 all_metrics["cpu_" + name] = throttler->get_metrics();
585 }
586
587 return all_metrics;
588 }
589
590private:
591 std::string name_;
592 std::unordered_map<std::string, std::unique_ptr<rate_limiter>> rate_limiters_;
593 std::unordered_map<std::string, std::unique_ptr<memory_quota_manager>> memory_quotas_;
594 std::unordered_map<std::string, std::unique_ptr<cpu_throttler>> cpu_throttlers_;
595 mutable std::mutex mutex_;
596};
597
598// ============================================================================
599// Factory Functions
600// ============================================================================
601
605inline std::unique_ptr<token_bucket_limiter> create_token_bucket_limiter(
606 const std::string& name, double rate, size_t capacity,
608 return std::make_unique<token_bucket_limiter>(name, rate, capacity, strategy);
609}
610
614inline std::unique_ptr<leaky_bucket_limiter> create_leaky_bucket_limiter(
615 const std::string& name, double rate, size_t capacity) {
616 return std::make_unique<leaky_bucket_limiter>(name, rate, capacity);
617}
618
622inline std::unique_ptr<memory_quota_manager> create_memory_quota_manager(
623 const std::string& name, size_t max_bytes,
625 return std::make_unique<memory_quota_manager>(name, max_bytes, strategy);
626}
627
631inline std::unique_ptr<resource_manager> create_resource_manager(const std::string& name) {
632 return std::make_unique<resource_manager>(name);
633}
634
635} // namespace kcenon::monitoring
Throttles operations based on CPU usage.
const std::string & get_name() const
Get the name of this throttler.
cpu_throttler(const std::string &name, const cpu_throttle_config &config)
resource_metrics get_metrics() const
Get current metrics.
auto execute(Func &&func) -> decltype(func())
Execute a function with CPU throttling.
Leaky bucket rate limiter implementation.
bool try_acquire(size_t count=1) override
Try to acquire tokens.
leaky_bucket_limiter(const std::string &name, double rate, size_t capacity)
const std::string & get_name() const override
Get the name of this rate limiter.
Manages memory quota with tracking and throttling.
resource_metrics get_metrics() const
Get current metrics.
memory_quota_manager(const std::string &name, const resource_quota &quota)
size_t current_usage() const
Get current memory usage.
memory_quota_manager(const std::string &name, size_t max_bytes, throttling_strategy strategy=throttling_strategy::reject)
common::Result< bool > allocate(size_t bytes)
Allocate memory from the quota.
bool is_over_warning_threshold() const
Check if usage is over warning threshold.
bool is_over_critical_threshold() const
Check if usage is over critical threshold.
const std::string & get_name() const
Get the name of this manager.
void deallocate(size_t bytes)
Deallocate memory back to the quota.
Base interface for rate limiters.
auto execute(Func &&func) -> decltype(func())
Execute a function with rate limiting.
virtual const std::string & get_name() const =0
Get the name of this rate limiter.
virtual bool try_acquire(size_t count=1)=0
Try to acquire tokens.
Coordinates multiple resource management components.
common::Result< bool > add_memory_quota(const std::string &name, const resource_quota &quota)
Add a memory quota manager.
cpu_throttler * get_cpu_throttler(const std::string &name)
Get a CPU throttler by name.
std::unordered_map< std::string, std::unique_ptr< memory_quota_manager > > memory_quotas_
rate_limiter * get_rate_limiter(const std::string &name)
Get a rate limiter by name.
common::Result< bool > is_healthy() const
Check if all resources are healthy.
resource_manager(const std::string &name)
memory_quota_manager * get_memory_quota(const std::string &name)
Get a memory quota manager by name.
std::unordered_map< std::string, std::unique_ptr< rate_limiter > > rate_limiters_
common::Result< bool > add_cpu_throttler(const std::string &name, const cpu_throttle_config &config)
Add a CPU throttler.
common::Result< bool > add_rate_limiter(const std::string &name, const rate_limit_config &config)
Add a rate limiter.
std::unordered_map< std::string, std::unique_ptr< cpu_throttler > > cpu_throttlers_
std::unordered_map< std::string, resource_metrics > get_all_metrics() const
Get metrics for all managed resources.
Token bucket rate limiter implementation.
const std::string & get_name() const override
Get the name of this rate limiter.
token_bucket_limiter(const std::string &name, double rate, size_t capacity, throttling_strategy=throttling_strategy::reject)
bool try_acquire(size_t count=1) override
Try to acquire tokens.
throttling_strategy
Strategy for handling resource exhaustion.
@ reject
Reject requests immediately when limit exceeded.
@ delay
Delay requests until resources are available.
std::unique_ptr< memory_quota_manager > create_memory_quota_manager(const std::string &name, size_t max_bytes, throttling_strategy strategy=throttling_strategy::reject)
Create a memory quota manager.
std::unique_ptr< token_bucket_limiter > create_token_bucket_limiter(const std::string &name, double rate, size_t capacity, throttling_strategy strategy=throttling_strategy::reject)
Create a token bucket rate limiter.
std::unique_ptr< resource_manager > create_resource_manager(const std::string &name)
Create a resource manager.
resource_type
Type of resource being managed.
@ memory
Memory resource (quota measured in bytes)
@ cpu
CPU resource
std::unique_ptr< leaky_bucket_limiter > create_leaky_bucket_limiter(const std::string &name, double rate, size_t capacity)
Create a leaky bucket rate limiter.
Result pattern type definitions for monitoring system.
Configuration for CPU throttling.
bool validate() const
Validate configuration.
double max_cpu_usage
Maximum CPU usage (0.0 to 1.0)
double warning_threshold
Warning threshold (0.0 to 1.0)
Configuration for rate limiting.
size_t burst_capacity
Maximum burst capacity.
double rate_per_second
Rate of token refill per second.
bool validate() const
Validate configuration.
Metrics for resource usage tracking.
resource_metrics(const resource_metrics &other)
std::atomic< size_t > rejected_operations
resource_metrics & operator=(const resource_metrics &other)
Configuration for resource quotas.
size_t critical_threshold
Critical level threshold.
size_t max_value
Maximum allowed resource usage.
size_t warning_threshold
Warning level threshold.
bool validate() const
Validate configuration.
resource_quota(resource_type t, size_t max, throttling_strategy s=throttling_strategy::reject)