Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
log_sampler.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8#include <cstring>
9
11
12// =============================================================================
13// log_sampler implementation
14// =============================================================================
15
17 : config_(config),
18 rng_state_(static_cast<std::uint64_t>(
19 std::chrono::steady_clock::now().time_since_epoch().count()) | 1) {
20 // Initialize rate limiting window
21 auto now = std::chrono::steady_clock::now();
22 rate_limit_window_start_.store(now.time_since_epoch().count());
23 adaptive_window_start_.store(now.time_since_epoch().count());
25}
26
28 : config_(std::move(other.config_)),
29 total_count_(other.total_count_.load()),
30 sampled_count_(other.sampled_count_.load()),
31 dropped_count_(other.dropped_count_.load()),
32 bypassed_count_(other.bypassed_count_.load()),
33 rng_state_(other.rng_state_.load()),
34 rate_limit_count_(other.rate_limit_count_.load()),
35 rate_limit_window_start_(other.rate_limit_window_start_.load()),
36 effective_rate_(other.effective_rate_.load()),
37 adaptive_window_count_(other.adaptive_window_count_.load()),
38 adaptive_window_start_(other.adaptive_window_start_.load()),
39 is_throttling_(other.is_throttling_.load()) {
40}
41
43 if (this != &other) {
44 std::unique_lock<std::shared_mutex> lock(config_mutex_);
45 config_ = std::move(other.config_);
46 total_count_.store(other.total_count_.load());
47 sampled_count_.store(other.sampled_count_.load());
48 dropped_count_.store(other.dropped_count_.load());
49 bypassed_count_.store(other.bypassed_count_.load());
50 rng_state_.store(other.rng_state_.load());
51 rate_limit_count_.store(other.rate_limit_count_.load());
52 rate_limit_window_start_.store(other.rate_limit_window_start_.load());
53 effective_rate_.store(other.effective_rate_.load());
54 adaptive_window_count_.store(other.adaptive_window_count_.load());
55 adaptive_window_start_.store(other.adaptive_window_start_.load());
56 is_throttling_.store(other.is_throttling_.load());
57 }
58 return *this;
59}
60
62 // Increment total count
63 total_count_.fetch_add(1, std::memory_order_relaxed);
64
65 // Check if sampling is enabled
66 bool enabled = false;
68 double rate = 1.0;
69 {
70 std::shared_lock<std::shared_mutex> lock(config_mutex_);
71 enabled = config_.enabled;
72 strategy = config_.strategy;
73 rate = config_.rate;
74 }
75
76 if (!enabled) {
77 sampled_count_.fetch_add(1, std::memory_order_relaxed);
78 return true; // Pass through when disabled
79 }
80
81 // Check if level bypasses sampling
82 if (should_bypass_level(entry.level)) {
83 bypassed_count_.fetch_add(1, std::memory_order_relaxed);
84 return true;
85 }
86
87 // Check if any field bypasses sampling (Phase 3.4)
88 if (should_bypass_field(entry)) {
89 bypassed_count_.fetch_add(1, std::memory_order_relaxed);
90 return true;
91 }
92
93 // Try to get field-specific rate first (Phase 3.4)
94 double effective = get_field_rate(entry);
95 if (effective < 0) {
96 // No field-specific rate found, try category
97 effective = rate;
98 if (entry.category.has_value()) {
99 effective = get_category_rate(entry.category->to_string());
100 }
101 }
102
103 // Apply the configured strategy
104 bool sampled = false;
105 switch (strategy) {
107 sampled = random_sample(effective);
108 break;
110 sampled = rate_limit_sample();
111 break;
113 sampled = adaptive_sample();
114 break;
116 sampled = hash_sample(entry.message.to_string(), effective);
117 break;
118 }
119
120 if (sampled) {
121 sampled_count_.fetch_add(1, std::memory_order_relaxed);
122 } else {
123 dropped_count_.fetch_add(1, std::memory_order_relaxed);
124 }
125
126 return sampled;
127}
128
129bool log_sampler::should_sample(log_level level,
130 const std::string& message) {
131 return should_sample(level, message, std::nullopt);
132}
133
134bool log_sampler::should_sample(log_level level,
135 const std::string& message,
136 const std::optional<std::string>& category) {
137 // Increment total count
138 total_count_.fetch_add(1, std::memory_order_relaxed);
139
140 // Check if sampling is enabled
141 bool enabled = false;
143 double rate = 1.0;
144 {
145 std::shared_lock<std::shared_mutex> lock(config_mutex_);
146 enabled = config_.enabled;
147 strategy = config_.strategy;
148 rate = config_.rate;
149 }
150
151 if (!enabled) {
152 sampled_count_.fetch_add(1, std::memory_order_relaxed);
153 return true; // Pass through when disabled
154 }
155
156 // Check if level bypasses sampling
157 if (should_bypass_level(level)) {
158 bypassed_count_.fetch_add(1, std::memory_order_relaxed);
159 return true;
160 }
161
162 // Get category-specific rate if applicable
163 double effective = rate;
164 if (category.has_value()) {
165 effective = get_category_rate(category.value());
166 }
167
168 // Apply the configured strategy
169 bool sampled = false;
170 switch (strategy) {
172 sampled = random_sample(effective);
173 break;
175 sampled = rate_limit_sample();
176 break;
178 sampled = adaptive_sample();
179 break;
181 sampled = hash_sample(message, effective);
182 break;
183 }
184
185 if (sampled) {
186 sampled_count_.fetch_add(1, std::memory_order_relaxed);
187 } else {
188 dropped_count_.fetch_add(1, std::memory_order_relaxed);
189 }
190
191 return sampled;
192}
193
195 std::unique_lock<std::shared_mutex> lock(config_mutex_);
196 config_ = config;
197 effective_rate_.store(config.rate);
198}
199
201 std::shared_lock<std::shared_mutex> lock(config_mutex_);
202 return config_;
203}
204
206 sampling_stats stats;
207 stats.total_count = total_count_.load(std::memory_order_relaxed);
208 stats.sampled_count = sampled_count_.load(std::memory_order_relaxed);
209 stats.dropped_count = dropped_count_.load(std::memory_order_relaxed);
210 stats.bypassed_count = bypassed_count_.load(std::memory_order_relaxed);
211 stats.effective_rate = effective_rate_.load(std::memory_order_relaxed);
212 stats.is_throttling = is_throttling_.load(std::memory_order_relaxed);
213 return stats;
214}
215
217 total_count_.store(0, std::memory_order_relaxed);
218 sampled_count_.store(0, std::memory_order_relaxed);
219 dropped_count_.store(0, std::memory_order_relaxed);
220 bypassed_count_.store(0, std::memory_order_relaxed);
221}
222
224 std::shared_lock<std::shared_mutex> lock(config_mutex_);
225 return config_.enabled;
226}
227
228void log_sampler::set_enabled(bool enabled) {
229 std::unique_lock<std::shared_mutex> lock(config_mutex_);
230 config_.enabled = enabled;
231}
232
234 return effective_rate_.load(std::memory_order_relaxed);
235}
236
237bool log_sampler::should_bypass_level(log_level level) const {
238 std::shared_lock<std::shared_mutex> lock(config_mutex_);
239 const auto& levels = config_.always_log_levels;
240 return std::find(levels.begin(), levels.end(), level) != levels.end();
241}
242
244 if (!entry.fields.has_value()) {
245 return false;
246 }
247
248 std::shared_lock<std::shared_mutex> lock(config_mutex_);
249 const auto& bypass_fields = config_.always_log_fields;
250
251 for (const auto& field_name : bypass_fields) {
252 if (entry.fields->find(field_name) != entry.fields->end()) {
253 return true; // Found a bypass field
254 }
255 }
256 return false;
257}
258
259double log_sampler::get_field_rate(const log_entry& entry) const {
260 if (!entry.fields.has_value()) {
261 return -1.0; // No fields, use default rate
262 }
263
264 std::shared_lock<std::shared_mutex> lock(config_mutex_);
265 const auto& field_rates = config_.field_rates;
266
267 // Check each configured field
268 for (const auto& [field_name, value_rates] : field_rates) {
269 auto field_it = entry.fields->find(field_name);
270 if (field_it == entry.fields->end()) {
271 continue;
272 }
273
274 // Convert field value to string for lookup
275 std::string value_str;
276 if (std::holds_alternative<std::string>(field_it->second)) {
277 value_str = std::get<std::string>(field_it->second);
278 } else if (std::holds_alternative<int64_t>(field_it->second)) {
279 value_str = std::to_string(std::get<int64_t>(field_it->second));
280 } else if (std::holds_alternative<double>(field_it->second)) {
281 value_str = std::to_string(std::get<double>(field_it->second));
282 } else if (std::holds_alternative<bool>(field_it->second)) {
283 value_str = std::get<bool>(field_it->second) ? "true" : "false";
284 }
285
286 // Look up the rate for this value
287 auto rate_it = value_rates.find(value_str);
288 if (rate_it != value_rates.end()) {
289 return rate_it->second;
290 }
291 }
292
293 return -1.0; // No matching field/value rate found
294}
295
296double log_sampler::get_category_rate(const std::string& category) const {
297 std::shared_lock<std::shared_mutex> lock(config_mutex_);
298 auto it = config_.category_rates.find(category);
299 if (it != config_.category_rates.end()) {
300 return it->second;
301 }
302 return config_.rate;
303}
304
305bool log_sampler::random_sample(double rate) {
306 if (rate >= 1.0) {
307 return true;
308 }
309 if (rate <= 0.0) {
310 return false;
311 }
312
313 // Generate random number and compare with threshold
314 std::uint64_t random = xorshift64();
315 double normalized = static_cast<double>(random) /
316 static_cast<double>(std::numeric_limits<std::uint64_t>::max());
317 return normalized < rate;
318}
319
321 auto now = std::chrono::steady_clock::now();
322 auto now_count = now.time_since_epoch().count();
323
324 std::size_t limit_per_second = 0;
325 std::size_t window_ms = 0;
326 {
327 std::shared_lock<std::shared_mutex> lock(config_mutex_);
328 limit_per_second = config_.rate_limit_per_second;
329 window_ms = config_.rate_limit_window_ms;
330 }
331
332 // Calculate max messages per window
333 std::size_t max_per_window = (limit_per_second * window_ms) / 1000;
334 if (max_per_window == 0) {
335 max_per_window = 1;
336 }
337
338 // Check if we need to reset the window
339 auto window_start = std::chrono::steady_clock::time_point(
340 std::chrono::steady_clock::duration(rate_limit_window_start_.load()));
341 auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
342 now - window_start).count();
343
344 if (static_cast<std::size_t>(elapsed_ms) >= window_ms) {
345 // Reset window
346 std::lock_guard<std::mutex> lock(rate_limit_mutex_);
347 // Double-check after acquiring lock
348 window_start = std::chrono::steady_clock::time_point(
349 std::chrono::steady_clock::duration(rate_limit_window_start_.load()));
350 elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
351 now - window_start).count();
352
353 if (static_cast<std::size_t>(elapsed_ms) >= window_ms) {
354 rate_limit_window_start_.store(now_count);
355 rate_limit_count_.store(0);
356 }
357 }
358
359 // Check if within limit
360 std::uint64_t current = rate_limit_count_.fetch_add(1, std::memory_order_relaxed);
361 return current < max_per_window;
362}
363
365 // Update adaptive rate if needed
367
368 // Use effective rate for sampling
369 double rate = effective_rate_.load(std::memory_order_relaxed);
370 return random_sample(rate);
371}
372
374 auto now = std::chrono::steady_clock::now();
375 auto now_count = now.time_since_epoch().count();
376
377 std::size_t threshold = 0;
378 double min_rate = 0.01;
379 double base_rate = 1.0;
380 {
381 std::shared_lock<std::shared_mutex> lock(config_mutex_);
382 threshold = config_.adaptive_threshold;
383 min_rate = config_.adaptive_min_rate;
384 base_rate = config_.rate;
385 }
386
387 // Increment window count
388 adaptive_window_count_.fetch_add(1, std::memory_order_relaxed);
389
390 // Check if we need to recalculate rate (every second)
391 auto window_start = std::chrono::steady_clock::time_point(
392 std::chrono::steady_clock::duration(adaptive_window_start_.load()));
393 auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
394 now - window_start).count();
395
396 if (elapsed_ms >= 1000) {
397 // Calculate messages per second
398 std::uint64_t count = adaptive_window_count_.exchange(0);
399 adaptive_window_start_.store(now_count);
400
401 if (count > threshold) {
402 // Calculate new rate based on how much we're over threshold
403 double ratio = static_cast<double>(threshold) / static_cast<double>(count);
404 double new_rate = base_rate * ratio;
405 new_rate = std::max(new_rate, min_rate);
406 effective_rate_.store(new_rate);
407 is_throttling_.store(true);
408 } else {
409 // Reset to base rate
410 effective_rate_.store(base_rate);
411 is_throttling_.store(false);
412 }
413 }
414}
415
416bool log_sampler::hash_sample(const std::string& message, double rate) {
417 if (rate >= 1.0) {
418 return true;
419 }
420 if (rate <= 0.0) {
421 return false;
422 }
423
424 std::uint64_t seed = 0;
425 {
426 std::shared_lock<std::shared_mutex> lock(config_mutex_);
427 seed = config_.hash_seed;
428 }
429
430 // Compute hash of message
431 std::uint64_t hash = fnv1a_hash(message);
432 hash ^= seed;
433
434 // Normalize hash to [0, 1) and compare with rate
435 double normalized = static_cast<double>(hash) /
436 static_cast<double>(std::numeric_limits<std::uint64_t>::max());
437 return normalized < rate;
438}
439
440std::uint64_t log_sampler::xorshift64() {
441 std::uint64_t x = rng_state_.load(std::memory_order_relaxed);
442 x ^= x << 13;
443 x ^= x >> 7;
444 x ^= x << 17;
445 rng_state_.store(x, std::memory_order_relaxed);
446 return x;
447}
448
449std::uint64_t log_sampler::fnv1a_hash(const std::string& str) {
450 constexpr std::uint64_t kFnvOffsetBasis = 14695981039346656037ULL;
451 constexpr std::uint64_t kFnvPrime = 1099511628211ULL;
452
453 std::uint64_t hash = kFnvOffsetBasis;
454 for (char c : str) {
455 hash ^= static_cast<std::uint64_t>(static_cast<unsigned char>(c));
456 hash *= kFnvPrime;
457 }
458 return hash;
459}
460
461// =============================================================================
462// sampler_factory implementation
463// =============================================================================
464
465std::unique_ptr<log_sampler> sampler_factory::create_disabled() {
466 return std::make_unique<log_sampler>(sampling_config::disabled());
467}
468
469std::unique_ptr<log_sampler> sampler_factory::create_random(double rate) {
470 return std::make_unique<log_sampler>(sampling_config::random_sampling(rate));
471}
472
473std::unique_ptr<log_sampler> sampler_factory::create_rate_limited(std::size_t max_per_second) {
474 return std::make_unique<log_sampler>(sampling_config::rate_limited(max_per_second));
475}
476
477std::unique_ptr<log_sampler> sampler_factory::create_adaptive(
478 std::size_t threshold,
479 double min_rate) {
480 return std::make_unique<log_sampler>(sampling_config::adaptive(threshold, min_rate));
481}
482
483std::unique_ptr<log_sampler> sampler_factory::create_production(
484 double base_rate,
485 std::vector<log_level> critical_levels) {
486 sampling_config config;
487 config.enabled = true;
488 config.rate = base_rate;
490 config.adaptive_enabled = true;
491 config.adaptive_threshold = 50000;
492 config.adaptive_min_rate = 0.01;
493 config.always_log_levels = std::move(critical_levels);
494 return std::make_unique<log_sampler>(config);
495}
496
497} // namespace kcenon::logger::sampling
Thread-safe log sampler with multiple strategy support.
Definition log_sampler.h:71
bool random_sample(double rate)
Perform random sampling decision.
void update_adaptive_rate()
Update adaptive sampling rate based on current throughput.
bool adaptive_sample()
Perform adaptive sampling decision.
std::atomic< std::uint64_t > sampled_count_
std::atomic< std::uint64_t > rate_limit_count_
void set_enabled(bool enabled)
Enable or disable sampling.
sampling_config get_config() const
Get the current configuration.
sampling_stats get_stats() const
Get sampling statistics.
bool rate_limit_sample()
Perform rate limiting sampling decision.
bool is_enabled() const
Check if sampling is enabled.
std::atomic< std::uint64_t > bypassed_count_
std::atomic< std::chrono::steady_clock::time_point::rep > rate_limit_window_start_
double get_field_rate(const log_entry &entry) const
Get the sampling rate based on structured fields.
std::atomic< std::uint64_t > total_count_
std::atomic< std::uint64_t > adaptive_window_count_
bool should_bypass_level(log_level level) const
Check if a level should bypass sampling.
bool should_bypass_field(const log_entry &entry) const
Check if any field should bypass sampling.
std::atomic< std::chrono::steady_clock::time_point::rep > adaptive_window_start_
std::atomic< std::uint64_t > dropped_count_
log_sampler & operator=(const log_sampler &)=delete
Copy assignment operator (deleted)
void reset_stats()
Reset sampling statistics.
double get_effective_rate() const
Get the current effective sampling rate.
void set_config(const sampling_config &config)
Update the sampling configuration.
std::uint64_t xorshift64()
Fast xorshift64 PRNG for random sampling.
double get_category_rate(const std::string &category) const
Get the sampling rate for a category.
bool should_sample(const log_entry &entry)
Check if a log entry should be sampled (logged)
log_sampler(const sampling_config &config=sampling_config{})
Construct a sampler with the given configuration.
bool hash_sample(const std::string &message, double rate)
Perform hash-based sampling decision.
std::atomic< std::uint64_t > rng_state_
static std::uint64_t fnv1a_hash(const std::string &str)
FNV-1a hash for message hashing.
std::atomic< double > effective_rate_
static std::unique_ptr< log_sampler > create_random(double rate)
Create a random sampling sampler.
static std::unique_ptr< log_sampler > create_adaptive(std::size_t threshold=10000, double min_rate=0.01)
Create an adaptive sampler.
static std::unique_ptr< log_sampler > create_disabled()
Create a disabled sampler (pass-through)
static std::unique_ptr< log_sampler > create_rate_limited(std::size_t max_per_second)
Create a rate limiting sampler.
static std::unique_ptr< log_sampler > create_production(double base_rate=0.1, std::vector< log_level > critical_levels={ log_level::warn, log_level::error, log_level::fatal })
Create a production-ready sampler.
std::string to_string() const
Convert to std::string.
Log sampling implementation for high-volume scenarios kcenon.
sampling_strategy
Defines the sampling algorithm to use.
@ hash_based
Deterministic sampling based on message hash.
@ random
Simple random sampling based on probability.
@ rate_limiting
Rate-based sampling (N logs per time window)
@ adaptive
Adaptive sampling that adjusts based on volume.
Represents a single log entry with all associated metadata.
Definition log_entry.h:155
std::optional< log_fields > fields
Optional structured fields for key-value logging.
Definition log_entry.h:213
log_level level
Severity level of the log message.
Definition log_entry.h:162
std::optional< small_string_128 > category
Optional category for log filtering and routing.
Definition log_entry.h:197
small_string_256 message
The actual log message.
Definition log_entry.h:169
Configuration for log sampling behavior.
static sampling_config random_sampling(double sample_rate)
Create a random sampling configuration.
double rate
Base sampling rate (0.0 to 1.0)
static sampling_config rate_limited(std::size_t max_per_second)
Create a rate limiting configuration.
std::size_t adaptive_threshold
Threshold (messages/second) to trigger adaptive sampling.
std::vector< std::string > always_log_fields
std::unordered_map< std::string, std::unordered_map< std::string, double > > field_rates
std::unordered_map< std::string, double > category_rates
std::size_t rate_limit_per_second
Maximum logs per second for rate limiting strategy.
std::size_t rate_limit_window_ms
Time window for rate limiting (milliseconds)
std::uint64_t hash_seed
Seed for hash-based sampling.
bool adaptive_enabled
Enable adaptive sampling.
double adaptive_min_rate
Minimum sampling rate when adaptive sampling is active.
static sampling_config disabled()
Create a default sampling configuration (disabled)
static sampling_config adaptive(std::size_t threshold, double min_rate=0.01)
Create an adaptive sampling configuration.
sampling_strategy strategy
Sampling strategy to use.
std::vector< log_level > always_log_levels
Log levels that bypass sampling (always logged)
bool enabled
Enable or disable sampling.
Statistics about sampling behavior.
std::uint64_t sampled_count
Number of messages that passed sampling (were logged)
std::uint64_t bypassed_count
Number of messages that bypassed sampling (always_log levels or always_log fields)
std::uint64_t dropped_count
Number of messages dropped due to sampling.
double effective_rate
Current effective sampling rate.
bool is_throttling
Whether adaptive sampling is currently reducing the rate.
std::uint64_t total_count
Total number of log messages considered for sampling.