Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
backpressure_job_queue.h
// BSD 3-Clause License
// Copyright (c) 2024, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.

#pragma once

// end of backpressure

#include "job_queue.h"
#include "backpressure_config.h"
#include "token_bucket.h"

#include <memory>
#include <mutex>

namespace kcenon::thread
{
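    class backpressure_job_queue : public job_queue
    {
    public:
        backpressure_job_queue(
            std::size_t max_size,
            backpressure_config config = backpressure_config{});

        ~backpressure_job_queue() override;

        // Non-copyable, non-movable (inherits shared_ptr semantics from job_queue)
        backpressure_job_queue(const backpressure_job_queue&) = delete;
        backpressure_job_queue& operator=(const backpressure_job_queue&) = delete;
        backpressure_job_queue(backpressure_job_queue&&) = delete;
        backpressure_job_queue& operator=(backpressure_job_queue&&) = delete;

        // =========================================================================
        // Overridden Queue Operations
        // =========================================================================
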
        [[nodiscard]] auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult override;

        [[nodiscard]] auto enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs)
            -> common::VoidResult override;

        // =========================================================================
        // Backpressure-Specific Methods
        // =========================================================================

        [[nodiscard]] auto get_pressure_level() const -> pressure_level;

        [[nodiscard]] auto get_pressure_ratio() const -> double;

        auto set_backpressure_config(backpressure_config config) -> void;

        [[nodiscard]] auto get_backpressure_config() const -> const backpressure_config&;

        // =========================================================================
        // Rate Limiting
        // =========================================================================

        [[nodiscard]] auto is_rate_limited() const -> bool;

        [[nodiscard]] auto get_available_tokens() const -> std::size_t;

        // =========================================================================
        // Statistics
        // =========================================================================

        [[nodiscard]] auto get_backpressure_stats() const -> backpressure_stats_snapshot;

        auto reset_stats() -> void;

        // =========================================================================
        // String Representation
        // =========================================================================

        [[nodiscard]] auto to_string() const -> std::string override;

    private:
        [[nodiscard]] auto apply_backpressure(std::unique_ptr<job>&& value) -> common::VoidResult;

        [[nodiscard]] auto apply_rate_limiting() -> bool;

        auto update_pressure_state() -> void;

        [[nodiscard]] auto handle_block_policy(std::unique_ptr<job>&& value) -> common::VoidResult;

        [[nodiscard]] auto handle_drop_oldest_policy(std::unique_ptr<job>&& value) -> common::VoidResult;

        [[nodiscard]] auto handle_callback_policy(std::unique_ptr<job>&& value) -> common::VoidResult;

        [[nodiscard]] auto handle_adaptive_policy(std::unique_ptr<job>&& value) -> common::VoidResult;

        [[nodiscard]] auto direct_enqueue(std::unique_ptr<job>&& value) -> common::VoidResult;

        backpressure_config config_;

        mutable std::mutex config_mutex_;

        std::unique_ptr<token_bucket> rate_limiter_;

        backpressure_stats stats_;

        std::atomic<double> current_pressure_ratio_{0.0};

        std::atomic<pressure_level> current_pressure_;

        std::condition_variable space_available_;
    };

} // namespace kcenon::thread
Configuration for backpressure and queue overflow policies.
A job queue with comprehensive backpressure mechanisms.
backpressure_stats stats_
Backpressure statistics.
auto get_backpressure_stats() const -> backpressure_stats_snapshot
Returns backpressure statistics snapshot.
backpressure_job_queue(std::size_t max_size, backpressure_config config = backpressure_config{})
Constructs a backpressure-aware job queue.
auto handle_drop_oldest_policy(std::unique_ptr<job>&& value) -> common::VoidResult
Handles drop_oldest policy.
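The header does not show how the drop_oldest policy is implemented, so the following is only a minimal sketch of the general idea: when a bounded queue is full, evict the front item to make room for the new one. The class name `drop_oldest_queue` and its members are hypothetical and are not part of the library; the sketch is also single-threaded, whereas the real queue is described as thread-safe.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>

// Hypothetical stand-in for a bounded FIFO; NOT the library's job_queue.
template <typename T>
class drop_oldest_queue {
public:
    explicit drop_oldest_queue(std::size_t max_size) : max_size_(max_size) {
        assert(max_size_ > 0);
    }

    // Enqueues `value`. If the queue is already full, the oldest item is
    // evicted first and returned so the caller can count or log the drop.
    std::optional<T> push(T value) {
        std::optional<T> dropped;
        if (items_.size() >= max_size_) {
            dropped = std::move(items_.front());
            items_.pop_front();
        }
        items_.push_back(std::move(value));
        return dropped;
    }

    std::size_t size() const { return items_.size(); }

    // Precondition: the queue is not empty.
    const T& front() const { return items_.front(); }

private:
    std::size_t max_size_;
    std::deque<T> items_;
};
```

Returning the evicted item rather than discarding it silently is a design choice of this sketch; it makes it easy to feed a drop counter such as the one a statistics snapshot might report.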
auto set_backpressure_config(backpressure_config config) -> void
Sets the backpressure configuration.
auto to_string() const -> std::string override
Returns string representation including backpressure state.
auto get_backpressure_config() const -> const backpressure_config&
Returns the current backpressure configuration.
auto handle_adaptive_policy(std::unique_ptr<job>&& value) -> common::VoidResult
Handles adaptive policy.
std::unique_ptr<token_bucket> rate_limiter_
Token bucket rate limiter (nullptr if disabled).
std::condition_variable space_available_
Condition variable for blocking policy wait.
auto is_rate_limited() const -> bool
Checks if rate limiting is causing delays.
backpressure_job_queue& operator=(const backpressure_job_queue&) = delete
auto get_available_tokens() const -> std::size_t
Returns available rate limit tokens.
auto enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs) -> common::VoidResult override
Enqueues a batch of jobs with backpressure handling.
backpressure_job_queue(backpressure_job_queue&&) = delete
backpressure_job_queue(const backpressure_job_queue&) = delete
auto apply_backpressure(std::unique_ptr<job>&& value) -> common::VoidResult
Applies backpressure logic for a single job.
backpressure_config config_
Backpressure configuration.
auto update_pressure_state() -> void
Updates pressure level and triggers callbacks if changed.
auto direct_enqueue(std::unique_ptr<job>&& value) -> common::VoidResult
Directly enqueues without backpressure (internal use).
~backpressure_job_queue() override
Virtual destructor.
auto enqueue(std::unique_ptr<job>&& value) -> common::VoidResult override
Enqueues a job with backpressure handling.
auto apply_rate_limiting() -> bool
Applies rate limiting check.
auto handle_block_policy(std::unique_ptr<job>&& value) -> common::VoidResult
Handles blocking policy with timeout.
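A blocking policy with a timeout is commonly built from a mutex and a condition variable, which matches the `space_available_` member listed here. The sketch below shows that general pattern only; `blocking_bounded_queue`, `push_for`, and `try_pop` are hypothetical names, and the library's actual wait logic, timeout source, and return type (`common::VoidResult`) are not reproduced.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical bounded queue illustrating "block until space or timeout".
template <typename T>
class blocking_bounded_queue {
public:
    explicit blocking_bounded_queue(std::size_t max_size) : max_size_(max_size) {
        assert(max_size_ > 0);
    }

    // Waits up to `timeout` for free space. Returns false on timeout,
    // in which case `value` is not enqueued.
    bool push_for(T value, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool has_space = space_available_.wait_for(
            lock, timeout, [this] { return items_.size() < max_size_; });
        if (!has_space) {
            return false;
        }
        items_.push_back(std::move(value));
        return true;
    }

    // Removes the front item if there is one and wakes one blocked producer.
    std::optional<T> try_pop() {
        std::optional<T> out;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (items_.empty()) {
                return std::nullopt;
            }
            out = std::move(items_.front());
            items_.pop_front();
        }
        space_available_.notify_one();  // notify after releasing the lock
        return out;
    }

private:
    std::size_t max_size_;
    std::mutex mutex_;
    std::condition_variable space_available_;
    std::deque<T> items_;
};
```

Using the predicate overload of `wait_for` guards against spurious wakeups: the wait returns true only if space really is available when the lock is held.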
std::atomic<double> current_pressure_ratio_
Current pressure ratio (atomic for lock-free reads).
auto reset_stats() -> void
Resets backpressure statistics.
backpressure_job_queue& operator=(backpressure_job_queue&&) = delete
std::atomic<pressure_level> current_pressure_
Current pressure level (atomic for lock-free reads).
auto handle_callback_policy(std::unique_ptr<job>&& value) -> common::VoidResult
Handles callback policy.
std::mutex config_mutex_
Mutex for configuration access.
auto get_pressure_ratio() const -> double
Returns the current pressure as a ratio.
auto get_pressure_level() const -> pressure_level
Returns the current pressure level.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Lock-free token bucket rate limiter for controlling throughput.
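The contents of token_bucket.h are not shown on this page, so the following is only a sketch of how a lock-free token bucket can be built on a single atomic counter and compare-and-swap loops. `sketch_token_bucket` and its member names are hypothetical. As a simplification, tokens are added through an explicit `refill` call; a production limiter would typically derive the refill amount from elapsed time.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical lock-free token bucket; NOT the library's token_bucket.
class sketch_token_bucket {
public:
    explicit sketch_token_bucket(std::size_t capacity)
        : capacity_(capacity), tokens_(capacity) {
        assert(capacity_ > 0);
    }

    // Tries to take `n` tokens with a compare-and-swap loop; never blocks.
    bool try_acquire(std::size_t n = 1) {
        std::size_t current = tokens_.load(std::memory_order_relaxed);
        while (current >= n) {
            if (tokens_.compare_exchange_weak(current, current - n,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed)) {
                return true;
            }
            // On failure `current` holds the fresh value; the loop re-checks it.
        }
        return false;
    }

    // Adds `n` tokens, saturating at the bucket capacity.
    void refill(std::size_t n) {
        std::size_t current = tokens_.load(std::memory_order_relaxed);
        std::size_t desired = 0;
        do {
            const std::size_t room = capacity_ - current;
            desired = (n >= room) ? capacity_ : current + n;
        } while (!tokens_.compare_exchange_weak(current, desired,
                                                std::memory_order_acq_rel,
                                                std::memory_order_relaxed));
    }

    std::size_t available() const {
        return tokens_.load(std::memory_order_acquire);
    }

private:
    const std::size_t capacity_;
    std::atomic<std::size_t> tokens_;
};
```

In this sketch a failed `try_acquire` would correspond to the "rate limited" condition, and `available()` plays the role that `get_available_tokens()` appears to play in the queue's interface.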
pressure_level
Current pressure level for graduated response.
none
Below low_watermark, queue is healthy.
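To illustrate how a pressure ratio might map to a graduated level, here is a small sketch. Only the `none` level and the name `low_watermark` appear on this page; the other levels (`low`, `high`, `critical`), the `high` watermark, and the default thresholds of 0.5 and 0.8 are assumptions made for the example and may not match backpressure_config.h.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical levels; only `none` is confirmed by the documentation above.
enum class sketch_pressure_level { none, low, high, critical };

// Hypothetical thresholds, expressed as fractions of the queue capacity.
struct sketch_watermarks {
    double low = 0.5;   // assumed default
    double high = 0.8;  // assumed default
};

// Fill ratio of the queue: 0.0 when empty, 1.0 when full.
inline double pressure_ratio(std::size_t size, std::size_t max_size) {
    assert(max_size > 0);
    return static_cast<double>(size) / static_cast<double>(max_size);
}

// Maps a ratio to a level: below `low` is healthy, a full queue is critical.
inline sketch_pressure_level classify(double ratio, const sketch_watermarks& w) {
    if (ratio < w.low) return sketch_pressure_level::none;
    if (ratio < w.high) return sketch_pressure_level::low;
    if (ratio < 1.0) return sketch_pressure_level::high;
    return sketch_pressure_level::critical;
}
```

With these assumed thresholds, a queue holding 6 of 10 items has a ratio of 0.6 and is classified as `low`, while a completely full queue is `critical`.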
Thread-safe FIFO job queue with optional bounded size.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Configuration for backpressure mechanisms.
Snapshot of backpressure statistics (copyable).
Thread-safe statistics for backpressure operations.