18 , policy_(
std::move(policy))
19 , last_sample_time_(
std::chrono::steady_clock::now())
34 bool expected =
false;
35 if (!running_.compare_exchange_strong(expected,
true))
42 monitor_thread_ = std::make_unique<std::thread>([
this]() {
50 if (!running_.compare_exchange_strong(expected,
false))
58 std::lock_guard<std::mutex> lock(mutex_);
63 if (monitor_thread_ && monitor_thread_->joinable())
65 monitor_thread_->join();
67 monitor_thread_.reset();
72 return running_.load(std::memory_order_acquire);
78 auto sample = collect_metrics();
82 std::scoped_lock<std::mutex> lock(history_mutex_);
83 metrics_history_.push_back(sample);
84 if (metrics_history_.size() > 60)
86 metrics_history_.pop_front();
91 std::vector<scaling_metrics_sample> samples;
93 std::scoped_lock<std::mutex> lock(history_mutex_);
94 std::size_t count = std::min(metrics_history_.size(), policy_.samples_for_decision);
95 samples.reserve(count);
96 auto it = metrics_history_.end();
97 std::advance(it, -
static_cast<std::ptrdiff_t
>(count));
98 for (; it != metrics_history_.end(); ++it)
100 samples.push_back(*it);
105 return make_decision(samples);
111 target_workers = std::clamp(target_workers, policy_.min_workers, policy_.max_workers);
113 std::size_t current_workers = pool_.get_active_worker_count();
115 if (target_workers > current_workers)
117 return add_workers(target_workers - current_workers);
119 else if (target_workers < current_workers)
121 return remove_workers(current_workers - target_workers);
129 std::size_t current = pool_.get_active_worker_count();
130 std::size_t increment = policy_.use_multiplicative_scaling
131 ?
static_cast<std::size_t
>(current * (policy_.scale_up_factor - 1.0))
132 : policy_.scale_up_increment;
139 std::size_t target = std::min(current + increment, policy_.max_workers);
140 return scale_to(target);
145 std::size_t current = pool_.get_active_worker_count();
146 std::size_t target = current > policy_.scale_down_increment
147 ? current - policy_.scale_down_increment
148 : policy_.min_workers;
150 target = std::max(target, policy_.min_workers);
151 return scale_to(target);
156 std::scoped_lock<std::mutex> lock(mutex_);
157 policy_ = std::move(policy);
171 -> std::vector<scaling_metrics_sample>
173 std::scoped_lock<std::mutex> lock(history_mutex_);
175 std::vector<scaling_metrics_sample>
result;
176 std::size_t actual_count = std::min(count, metrics_history_.size());
177 result.reserve(actual_count);
179 auto it = metrics_history_.end();
180 std::advance(it, -
static_cast<std::ptrdiff_t
>(actual_count));
181 for (; it != metrics_history_.end(); ++it)
197 std::scoped_lock<std::mutex> lock(stats_mutex_);
199 stats_.
min_workers = pool_.get_active_worker_count();
200 stats_.peak_workers = stats_.min_workers;
205 while (running_.load(std::memory_order_acquire))
209 std::unique_lock<std::mutex> lock(mutex_);
210 cv_.wait_for(lock, policy_.sample_interval, [
this]() {
211 return !running_.load(std::memory_order_acquire);
215 if (!running_.load(std::memory_order_acquire))
221 if (!pool_.is_running())
227 auto sample = collect_metrics();
231 std::scoped_lock<std::mutex> lock(history_mutex_);
232 metrics_history_.push_back(sample);
235 while (metrics_history_.size() > 60)
237 metrics_history_.pop_front();
248 std::vector<scaling_metrics_sample> samples;
250 std::scoped_lock<std::mutex> lock(history_mutex_);
251 std::size_t count = std::min(metrics_history_.size(), policy_.samples_for_decision);
252 if (count < policy_.samples_for_decision)
258 samples.reserve(count);
259 auto it = metrics_history_.end();
260 std::advance(it, -
static_cast<std::ptrdiff_t
>(count));
261 for (; it != metrics_history_.end(); ++it)
263 samples.push_back(*it);
268 auto decision = make_decision(samples);
269 if (decision.should_scale())
271 execute_scaling(decision);
276 std::scoped_lock<std::mutex> lock(stats_mutex_);
277 ++stats_.decisions_evaluated;
279 std::size_t current = pool_.get_active_worker_count();
280 stats_.peak_workers = std::max(stats_.peak_workers, current);
281 if (stats_.min_workers == 0 || current < stats_.min_workers)
283 stats_.min_workers = current;
291 auto now = std::chrono::steady_clock::now();
314 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
320 static_cast<double>(duration);
329 sample.
p95_latency_ms = enhanced_snapshot.wait_time_p99_us / 1000.0;
349 double avg_utilization = 0.0;
350 double avg_queue_depth_per_worker = 0.0;
351 double avg_latency = 0.0;
352 std::size_t avg_queue_depth = 0;
354 for (
const auto& sample : samples)
356 avg_utilization += sample.utilization;
357 avg_queue_depth_per_worker += sample.queue_depth_per_worker;
358 avg_latency += sample.p95_latency_ms;
359 avg_queue_depth += sample.queue_depth;
362 auto sample_count =
static_cast<double>(samples.size());
363 avg_utilization /= sample_count;
364 avg_queue_depth_per_worker /= sample_count;
365 avg_latency /= sample_count;
366 avg_queue_depth /= samples.size();
368 std::size_t current_workers = pool_.get_active_worker_count();
373 if (avg_utilization > policy_.scale_up.utilization_threshold)
375 std::size_t target = std::min(
376 current_workers + policy_.scale_up_increment,
377 policy_.max_workers);
382 .target_workers = target,
384 "Utilization {:.1f}% exceeds threshold {:.1f}%",
385 avg_utilization * 100, policy_.scale_up.utilization_threshold * 100)
389 if (avg_queue_depth_per_worker > policy_.scale_up.queue_depth_threshold)
391 std::size_t target = std::min(
392 current_workers + policy_.scale_up_increment,
393 policy_.max_workers);
398 .target_workers = target,
400 "Queue depth per worker {:.1f} exceeds threshold {:.1f}",
401 avg_queue_depth_per_worker, policy_.scale_up.queue_depth_threshold)
405 if (avg_latency > policy_.scale_up.latency_threshold_ms && avg_latency > 0)
407 std::size_t target = std::min(
408 current_workers + policy_.scale_up_increment,
409 policy_.max_workers);
414 .target_workers = target,
416 "P95 latency {:.1f}ms exceeds threshold {:.1f}ms",
417 avg_latency, policy_.scale_up.latency_threshold_ms)
421 if (avg_queue_depth > policy_.scale_up.pending_jobs_threshold)
423 std::size_t target = std::min(
424 current_workers + policy_.scale_up_increment,
425 policy_.max_workers);
430 .target_workers = target,
432 "Queue depth {} exceeds threshold {}",
433 avg_queue_depth, policy_.scale_up.pending_jobs_threshold)
439 if (can_scale_down() && current_workers > policy_.min_workers)
441 bool utilization_ok = avg_utilization < policy_.scale_down.utilization_threshold;
442 bool queue_depth_ok = avg_queue_depth_per_worker < policy_.scale_down.queue_depth_threshold;
444 if (utilization_ok && queue_depth_ok)
446 std::size_t target = std::max(
447 current_workers - policy_.scale_down_increment,
448 policy_.min_workers);
453 .target_workers = target,
455 "Utilization {:.1f}% below threshold {:.1f}%, queue depth {:.1f} below {:.1f}",
456 avg_utilization * 100, policy_.scale_down.utilization_threshold * 100,
457 avg_queue_depth_per_worker, policy_.scale_down.queue_depth_threshold)
467 std::size_t current_workers = pool_.get_active_worker_count();
468 auto now = std::chrono::steady_clock::now();
472 auto result = add_workers(decision.target_workers - current_workers);
475 last_scale_up_time_ = now;
477 std::scoped_lock<std::mutex> lock(stats_mutex_);
478 ++stats_.scale_up_count;
479 stats_.last_scale_up = now;
481 if (policy_.scaling_callback)
483 policy_.scaling_callback(
487 decision.target_workers);
493 auto result = remove_workers(current_workers - decision.target_workers);
496 last_scale_down_time_ = now;
498 std::scoped_lock<std::mutex> lock(stats_mutex_);
499 ++stats_.scale_down_count;
500 stats_.last_scale_down = now;
502 if (policy_.scaling_callback)
504 policy_.scaling_callback(
508 decision.target_workers);
521 auto now = std::chrono::steady_clock::now();
522 auto since_last = std::chrono::duration_cast<std::chrono::seconds>(
535 auto now = std::chrono::steady_clock::now();
536 auto since_last = std::chrono::duration_cast<std::chrono::seconds>(
550 const auto& context = pool_.get_context();
552 for (std::size_t i = 0; i < count; ++i)
554 auto worker = std::make_unique<thread_worker>(
true, context);
555 auto result = pool_.enqueue(std::move(worker));
574 auto result = pool_.remove_workers_internal(count, policy_.min_workers);
Automatic scaling of thread pool workers based on load metrics.
Manages automatic scaling of thread pool workers based on load metrics.
auto collect_metrics() const -> scaling_metrics_sample
Collects current metrics from the pool.
auto can_scale_down() const -> bool
Checks if scale-down cooldown has elapsed.
auto scale_to(std::size_t target_workers) -> common::VoidResult
Manually scales to a specific worker count.
std::uint64_t last_jobs_submitted_
std::atomic< bool > running_
std::chrono::steady_clock::time_point last_sample_time_
auto is_active() const -> bool
Checks if the autoscaler is currently active.
auto make_decision(const std::vector< scaling_metrics_sample > &samples) const -> scaling_decision
Makes a scaling decision based on recent samples.
auto get_stats() const -> autoscaling_stats
Gets autoscaling statistics.
auto scale_up() -> common::VoidResult
Manually scales up by the configured increment.
autoscaling_policy policy_
auto get_metrics_history(std::size_t count=60) const -> std::vector< scaling_metrics_sample >
Gets historical metrics samples.
auto execute_scaling(const scaling_decision &decision) -> void
Executes a scaling decision.
auto evaluate_now() -> scaling_decision
Manually triggers a scaling evaluation.
std::chrono::steady_clock::time_point last_scale_down_time_
auto start() -> void
Starts the autoscaling monitor thread.
auto stop() -> void
Stops the autoscaling monitor thread.
~autoscaler()
Destructor. Stops the monitor thread if running.
auto scale_down() -> common::VoidResult
Manually scales down by the configured increment.
std::uint64_t last_jobs_completed_
auto reset_stats() -> void
Resets autoscaling statistics.
auto get_current_metrics() const -> scaling_metrics_sample
Collects current metrics from the thread pool.
auto can_scale_up() const -> bool
Checks if scale-up cooldown has elapsed.
auto get_policy() const -> const autoscaling_policy &
Gets the current autoscaling policy.
auto add_workers(std::size_t count) -> common::VoidResult
Adds workers to the pool.
autoscaler(thread_pool &pool, autoscaling_policy policy={})
Constructs an autoscaler for the given thread pool.
auto remove_workers(std::size_t count) -> common::VoidResult
Removes workers from the pool.
auto set_policy(autoscaling_policy policy) -> void
Updates the autoscaling policy.
std::chrono::steady_clock::time_point last_scale_up_time_
auto monitor_loop() -> void
Main monitoring loop running in the background thread.
Snapshot snapshot() const
Get a snapshot of all metrics.
A template class representing either a value or an error.
bool is_ok() const noexcept
Checks if the result is successful.
A thread pool for concurrent execution of jobs using multiple worker threads.
bool is_enhanced_metrics_enabled() const
Check if enhanced metrics are enabled.
auto get_pending_task_count() const -> std::size_t
Get the number of pending tasks in the queue.
metrics::EnhancedSnapshot enhanced_metrics_snapshot() const
Get enhanced metrics snapshot.
std::size_t get_idle_worker_count() const
Get the number of idle workers.
auto get_active_worker_count() const -> std::size_t
Get the current number of active (running) workers.
const metrics::ThreadPoolMetrics & metrics() const noexcept
Access aggregated runtime metrics (read-only reference).
Core thread pool implementation with work stealing and auto-scaling.
Core threading foundation of the thread system library.
@ up
Scale up (add workers)
@ down
Scale down (remove workers)
@ latency
Latency threshold exceeded.
@ worker_utilization
Worker utilization threshold exceeded.
@ queue_depth
Queue depth threshold exceeded.
Configuration for autoscaling behavior.
std::chrono::seconds scale_down_cooldown
Minimum time between scale-down events.
@ automatic
Fully automatic scaling.
std::chrono::seconds scale_up_cooldown
Minimum time between scale-up events.
std::size_t min_workers
Minimum number of workers (never scale below this)
std::size_t max_workers
Maximum number of workers (never scale above this)
Statistics for autoscaling operations.
std::size_t peak_workers
Peak worker count observed.
std::size_t min_workers
Minimum worker count observed.
Metrics sample for autoscaling decisions.
std::uint64_t jobs_submitted
Jobs submitted since last sample.
std::size_t worker_count
Current number of workers in the pool.
std::chrono::steady_clock::time_point timestamp
Timestamp when this sample was collected.
double queue_depth_per_worker
Ratio of queued jobs to active workers (queue depth divided by worker count).
std::uint64_t jobs_completed
Jobs completed since last sample.
std::size_t queue_depth
Number of jobs waiting in the queue.
double p95_latency_ms
P95 latency in milliseconds.
double throughput_per_second
Throughput in jobs per second.
std::size_t active_workers
Number of workers currently processing jobs.
double utilization
Worker utilization ratio (0.0 – 1.0).
Specialized worker thread that processes jobs from a job_queue.