Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
backpressure_config.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
12#pragma once
13
28#include "job.h"
29
30#include <atomic>
31#include <chrono>
32#include <cstddef>
33#include <cstdint>
34#include <functional>
35#include <memory>
36#include <string>
37
38namespace kcenon::thread
39{
50 {
51 block,
54 callback,
56 };
57
69 {
70 accept,
71 reject,
73 delay
74 };
75
85 enum class pressure_level
86 {
87 none,
88 low,
89 high,
91 };
92
98 [[nodiscard]] inline auto pressure_level_to_string(pressure_level level) -> std::string
99 {
100 switch (level)
101 {
102 case pressure_level::none: return "none";
103 case pressure_level::low: return "low";
104 case pressure_level::high: return "high";
105 case pressure_level::critical: return "critical";
106 default: return "unknown";
107 }
108 }
109
115 [[nodiscard]] inline auto backpressure_policy_to_string(backpressure_policy policy) -> std::string
116 {
117 switch (policy)
118 {
119 case backpressure_policy::block: return "block";
120 case backpressure_policy::drop_oldest: return "drop_oldest";
121 case backpressure_policy::drop_newest: return "drop_newest";
122 case backpressure_policy::callback: return "callback";
123 case backpressure_policy::adaptive: return "adaptive";
124 default: return "unknown";
125 }
126 }
127
167 {
168 // =========================================================================
169 // Policy Selection
170 // =========================================================================
171
183
184 // =========================================================================
185 // Watermarks
186 // =========================================================================
187
194 double high_watermark = 0.8;
195
203 double low_watermark = 0.5;
204
205 // =========================================================================
206 // Blocking Behavior
207 // =========================================================================
208
215 std::chrono::milliseconds block_timeout{5000};
216
217 // =========================================================================
218 // Rate Limiting (Token Bucket)
219 // =========================================================================
220
228
235 std::size_t rate_limit_tokens_per_second = 10000;
236
243 std::size_t rate_limit_burst_size = 1000;
244
245 // =========================================================================
246 // Callbacks
247 // =========================================================================
248
258 std::function<void(std::size_t queue_depth, double pressure_ratio)> pressure_callback;
259
269 std::function<backpressure_decision(std::unique_ptr<job>&)> decision_callback;
270
271 // =========================================================================
272 // Adaptive Mode Settings
273 // =========================================================================
274
281 std::chrono::milliseconds adaptive_sample_interval{100};
282
290
300 [[nodiscard]] auto is_valid() const -> bool
301 {
302 // Watermark validation
304 {
305 return false;
306 }
308 {
309 return false;
310 }
312 {
313 return false;
314 }
315
316 // Callback policy requires decision_callback
318 {
319 return false;
320 }
321
322 // Rate limiting validation
324 {
326 {
327 return false;
328 }
329 if (rate_limit_burst_size == 0)
330 {
331 return false;
332 }
333 }
334
335 return true;
336 }
337 };
338
349 {
350 std::uint64_t jobs_accepted{0};
351 std::uint64_t jobs_rejected{0};
352 std::uint64_t jobs_dropped{0};
353 std::uint64_t rate_limit_waits{0};
354 std::uint64_t pressure_events{0};
355 std::uint64_t total_block_time_ns{0};
356
361 [[nodiscard]] auto acceptance_rate() const -> double
362 {
363 auto total = jobs_accepted + jobs_rejected;
364 if (total == 0)
365 {
366 return 1.0;
367 }
368 return static_cast<double>(jobs_accepted) / static_cast<double>(total);
369 }
370
375 [[nodiscard]] auto avg_block_time_ms() const -> double
376 {
377 if (rate_limit_waits == 0)
378 {
379 return 0.0;
380 }
381 return static_cast<double>(total_block_time_ns) /
382 static_cast<double>(rate_limit_waits) / 1e6;
383 }
384 };
385
396 {
400 std::atomic<std::uint64_t> jobs_accepted{0};
401
405 std::atomic<std::uint64_t> jobs_rejected{0};
406
410 std::atomic<std::uint64_t> jobs_dropped{0};
411
415 std::atomic<std::uint64_t> rate_limit_waits{0};
416
420 std::atomic<std::uint64_t> pressure_events{0};
421
425 std::atomic<std::uint64_t> total_block_time_ns{0};
426
430 auto reset() -> void
431 {
432 jobs_accepted.store(0, std::memory_order_relaxed);
433 jobs_rejected.store(0, std::memory_order_relaxed);
434 jobs_dropped.store(0, std::memory_order_relaxed);
435 rate_limit_waits.store(0, std::memory_order_relaxed);
436 pressure_events.store(0, std::memory_order_relaxed);
437 total_block_time_ns.store(0, std::memory_order_relaxed);
438 }
439
444 [[nodiscard]] auto snapshot() const -> backpressure_stats_snapshot
445 {
447 snap.jobs_accepted = jobs_accepted.load(std::memory_order_relaxed);
448 snap.jobs_rejected = jobs_rejected.load(std::memory_order_relaxed);
449 snap.jobs_dropped = jobs_dropped.load(std::memory_order_relaxed);
450 snap.rate_limit_waits = rate_limit_waits.load(std::memory_order_relaxed);
451 snap.pressure_events = pressure_events.load(std::memory_order_relaxed);
452 snap.total_block_time_ns = total_block_time_ns.load(std::memory_order_relaxed);
453 return snap;
454 }
455 };
456
457} // namespace kcenon::thread
backpressure_decision
Decision returned by callback policy handler.
pressure_level
Current pressure level for graduated response.
backpressure_policy
Policy for handling queue overflow conditions.
@ drop_and_accept
Drop the oldest job, then accept new one.
@ accept
Accept the job into the queue.
@ reject
Reject with error (queue_full)
@ delay
Delay processing (attempt later)
@ none
Below low_watermark, queue is healthy.
@ low
Between low and high watermark.
@ critical
At or above max_size, queue is full.
@ high
Above high_watermark, approaching capacity.
@ drop_newest
Reject the new job when full.
@ block
Block until space is available (with timeout)
@ adaptive
Automatically adjust based on load conditions.
@ callback
Call user callback for custom decision.
@ drop_oldest
Drop the oldest job when full to make room.
Base job class for schedulable work units in the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
auto pressure_level_to_string(pressure_level level) -> std::string
Converts pressure_level to human-readable string.
auto backpressure_policy_to_string(backpressure_policy policy) -> std::string
Converts backpressure_policy to human-readable string.
@ queue_depth
Queue depth threshold exceeded.
Configuration for backpressure mechanisms.
backpressure_policy policy
The backpressure policy to use.
auto is_valid() const -> bool
Validates the configuration.
std::size_t rate_limit_tokens_per_second
Token refill rate (tokens added per second).
std::chrono::milliseconds adaptive_sample_interval
Sampling interval for adaptive mode.
double low_watermark
Low watermark threshold (percentage of max_size).
double high_watermark
High watermark threshold (percentage of max_size).
std::function< backpressure_decision(std::unique_ptr< job > &)> decision_callback
Custom decision callback for callback policy.
std::function< void(std::size_t queue_depth, double pressure_ratio)> pressure_callback
Callback for pressure events.
std::chrono::milliseconds block_timeout
Maximum time to block when using block policy.
std::size_t rate_limit_burst_size
Maximum tokens that can accumulate (burst capacity).
double adaptive_target_latency_ms
Target latency for adaptive mode (milliseconds).
bool enable_rate_limiting
Enable token bucket rate limiting.
Snapshot of backpressure statistics (copyable).
auto avg_block_time_ms() const -> double
Returns average block time per blocked operation.
auto acceptance_rate() const -> double
Returns acceptance rate (accepted / total attempts).
Thread-safe statistics for backpressure operations.
std::atomic< std::uint64_t > jobs_accepted
Total jobs accepted into the queue.
auto snapshot() const -> backpressure_stats_snapshot
Creates a copyable snapshot of current statistics.
std::atomic< std::uint64_t > jobs_dropped
Total jobs dropped (oldest dropped for new).
std::atomic< std::uint64_t > rate_limit_waits
Number of times rate limiting caused a wait.
auto reset() -> void
Resets all statistics to zero.
std::atomic< std::uint64_t > pressure_events
Number of times high watermark was crossed.
std::atomic< std::uint64_t > jobs_rejected
Total jobs rejected due to backpressure.
std::atomic< std::uint64_t > total_block_time_ns
Total time spent blocking in nanoseconds.