Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
kcenon::thread::autoscaler Class Reference

Manages automatic scaling of thread pool workers based on load metrics. More...

#include <autoscaler.h>


Public Member Functions

 autoscaler (thread_pool &pool, autoscaling_policy policy={})
 Constructs an autoscaler for the given thread pool.
 
 ~autoscaler ()
 Destructor. Stops the monitor thread if running.
 
 autoscaler (const autoscaler &)=delete
 
autoscaler & operator= (const autoscaler &)=delete
 
 autoscaler (autoscaler &&)=delete
 
autoscaler & operator= (autoscaler &&)=delete
 
auto start () -> void
 Starts the autoscaling monitor thread.
 
auto stop () -> void
 Stops the autoscaling monitor thread.
 
auto is_active () const -> bool
 Checks if the autoscaler is currently active.
 
auto evaluate_now () -> scaling_decision
 Manually triggers a scaling evaluation.
 
auto scale_to (std::size_t target_workers) -> common::VoidResult
 Manually scales to a specific worker count.
 
auto scale_up () -> common::VoidResult
 Manually scales up by the configured increment.
 
auto scale_down () -> common::VoidResult
 Manually scales down by the configured increment.
 
auto set_policy (autoscaling_policy policy) -> void
 Updates the autoscaling policy.
 
auto get_policy () const -> const autoscaling_policy &
 Gets the current autoscaling policy.
 
auto get_current_metrics () const -> scaling_metrics_sample
 Collects current metrics from the thread pool.
 
auto get_metrics_history (std::size_t count=60) const -> std::vector< scaling_metrics_sample >
 Gets historical metrics samples.
 
auto get_stats () const -> autoscaling_stats
 Gets autoscaling statistics.
 
auto reset_stats () -> void
 Resets autoscaling statistics.
 

Private Member Functions

auto monitor_loop () -> void
 Main monitoring loop running in the background thread.
 
auto collect_metrics () const -> scaling_metrics_sample
 Collects current metrics from the pool.
 
auto make_decision (const std::vector< scaling_metrics_sample > &samples) const -> scaling_decision
 Makes a scaling decision based on recent samples.
 
auto execute_scaling (const scaling_decision &decision) -> void
 Executes a scaling decision.
 
auto can_scale_up () const -> bool
 Checks if scale-up cooldown has elapsed.
 
auto can_scale_down () const -> bool
 Checks if scale-down cooldown has elapsed.
 
auto add_workers (std::size_t count) -> common::VoidResult
 Adds workers to the pool.
 
auto remove_workers (std::size_t count) -> common::VoidResult
 Removes workers from the pool.
 

Private Attributes

thread_pool & pool_
 
autoscaling_policy policy_
 
std::atomic< bool > running_ {false}
 
std::unique_ptr< std::thread > monitor_thread_
 
std::mutex mutex_
 
std::condition_variable cv_
 
std::deque< scaling_metrics_sample > metrics_history_
 
std::mutex history_mutex_
 
std::chrono::steady_clock::time_point last_scale_up_time_
 
std::chrono::steady_clock::time_point last_scale_down_time_
 
autoscaling_stats stats_
 
std::mutex stats_mutex_
 
std::uint64_t last_jobs_completed_ {0}
 
std::uint64_t last_jobs_submitted_ {0}
 
std::chrono::steady_clock::time_point last_sample_time_
 

Detailed Description

Manages automatic scaling of thread pool workers based on load metrics.

The autoscaler monitors thread pool metrics and automatically adjusts the number of workers to match workload demands. It uses a background monitor thread to periodically collect metrics and make scaling decisions.

Design Principles

  • Non-intrusive: Scaling decisions are made asynchronously
  • Configurable: All thresholds and behaviors are customizable
  • Graceful: Scale-down removes workers only when safe
  • Observable: Provides statistics and callbacks for monitoring

State Machine

┌──────────────────────────────────────────────────────────┐
│                     Autoscaler Loop                      │
│                                                          │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐           │
│  │  Collect  │──▶│ Aggregate │──▶│   Make    │           │
│  │  Metrics  │   │  Samples  │   │ Decision  │           │
│  └───────────┘   └───────────┘   └─────┬─────┘           │
│        ▲                               │                 │
│        │                               ▼                 │
│        │                      ┌────────────────┐         │
│        │                      │ Check Cooldown │         │
│        │                      └───────┬────────┘         │
│        │                              │                  │
│        │                              ▼                  │
│        │                      ┌────────────────┐         │
│        │                      │ Execute Scale  │         │
│        │                      └───────┬────────┘         │
│  ┌─────┴─────┐                        │                  │
│  │   Sleep   │◀───────────────────────┘                  │
│  │ Interval  │                                           │
│  └───────────┘                                           │
└──────────────────────────────────────────────────────────┘

Thread Safety

All public methods are thread-safe and can be called from any thread.

Usage Example

auto pool = std::make_shared<thread_pool>("MyPool");
auto scaler = std::make_shared<autoscaler>(*pool, autoscaling_policy{
    .min_workers = 2,
    .max_workers = 16,
});
scaler->start();
// ... pool automatically scales ...
scaler->stop();
See also
autoscaling_policy
scaling_metrics_sample

Definition at line 94 of file autoscaler.h.

Constructor & Destructor Documentation

◆ autoscaler() [1/3]

kcenon::thread::autoscaler::autoscaler ( thread_pool & pool,
autoscaling_policy policy = {} )
explicit

Constructs an autoscaler for the given thread pool.

Parameters
poolReference to the thread pool to manage.
policyAutoscaling policy configuration.

Definition at line 16 of file autoscaler.cpp.

17 : pool_(pool)
18 , policy_(std::move(policy))
19 , last_sample_time_(std::chrono::steady_clock::now())
20{
21 // Initialize stats with current worker count
22 std::scoped_lock<std::mutex> lock(stats_mutex_);
23 stats_.peak_workers = pool_.get_active_worker_count();
24 stats_.min_workers = stats_.peak_workers;
25}

References kcenon::thread::thread_pool::get_active_worker_count(), kcenon::thread::autoscaling_stats::min_workers, kcenon::thread::autoscaling_stats::peak_workers, pool_, stats_, and stats_mutex_.


◆ ~autoscaler()

kcenon::thread::autoscaler::~autoscaler ( )

Destructor. Stops the monitor thread if running.

Definition at line 27 of file autoscaler.cpp.

28{
29 stop();
30}

References stop().


◆ autoscaler() [2/3]

kcenon::thread::autoscaler::autoscaler ( const autoscaler & )
delete

◆ autoscaler() [3/3]

kcenon::thread::autoscaler::autoscaler ( autoscaler && )
delete

Member Function Documentation

◆ add_workers()

auto kcenon::thread::autoscaler::add_workers ( std::size_t count) -> common::VoidResult
private

Adds workers to the pool.

Parameters
countNumber of workers to add.
Returns
Error if operation fails.

Definition at line 542 of file autoscaler.cpp.

543{
544 if (count == 0)
545 {
546 return common::ok();
547 }
548
549 // Get current context from pool
550 const auto& context = pool_.get_context();
551
552 for (std::size_t i = 0; i < count; ++i)
553 {
554 auto worker = std::make_unique<thread_worker>(true, context);
555 auto result = pool_.enqueue(std::move(worker));
556 if (result.is_err())
557 {
558 return result;
559 }
560 }
561
562 return common::ok();
563}

◆ can_scale_down()

auto kcenon::thread::autoscaler::can_scale_down ( ) const -> bool
nodiscard private

Checks if scale-down cooldown has elapsed.

Returns
true if scale-down is allowed.

Definition at line 528 of file autoscaler.cpp.

529{
530 if (pool_.get_active_worker_count() <= policy_.min_workers)
531 {
532 return false;
533 }
534
535 auto now = std::chrono::steady_clock::now();
536 auto since_last = std::chrono::duration_cast<std::chrono::seconds>(
537 now - last_scale_down_time_);
538
539 return since_last >= policy_.scale_down_cooldown;
540}

References kcenon::thread::thread_pool::get_active_worker_count(), last_scale_down_time_, kcenon::thread::autoscaling_policy::min_workers, policy_, pool_, and kcenon::thread::autoscaling_policy::scale_down_cooldown.


◆ can_scale_up()

auto kcenon::thread::autoscaler::can_scale_up ( ) const -> bool
nodiscard private

Checks if scale-up cooldown has elapsed.

Returns
true if scale-up is allowed.
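The cooldown test can be sketched in isolation. The helper below is a stand-in (not part of the class); the real check also verifies the max_workers bound before consulting the cooldown.

```cpp
#include <chrono>

// Models the cooldown check: a scaling event in a given direction is allowed
// only once the configured cooldown has elapsed since the previous event of
// that direction.
bool cooldown_elapsed(std::chrono::steady_clock::time_point last_event,
                      std::chrono::steady_clock::time_point now,
                      std::chrono::seconds cooldown)
{
    auto since_last =
        std::chrono::duration_cast<std::chrono::seconds>(now - last_event);
    return since_last >= cooldown;
}
```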

Definition at line 514 of file autoscaler.cpp.

515{
516 if (pool_.get_active_worker_count() >= policy_.max_workers)
517 {
518 return false;
519 }
520
521 auto now = std::chrono::steady_clock::now();
522 auto since_last = std::chrono::duration_cast<std::chrono::seconds>(
523 now - last_scale_up_time_);
524
525 return since_last >= policy_.scale_up_cooldown;
526}

References kcenon::thread::thread_pool::get_active_worker_count(), last_scale_up_time_, kcenon::thread::autoscaling_policy::max_workers, policy_, pool_, and kcenon::thread::autoscaling_policy::scale_up_cooldown.


◆ collect_metrics()

auto kcenon::thread::autoscaler::collect_metrics ( ) const -> scaling_metrics_sample
nodiscard private

Collects current metrics from the pool.

Returns
Collected metrics sample.
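The throughput step of the collection can be modeled on its own. `throughput_per_second` below is a stand-in helper, not part of the class: it computes jobs completed since the previous sample, normalized to a per-second rate, guarding against a non-positive interval or a counter that moved backwards.

```cpp
#include <cstdint>

// Models the throughput calculation in collect_metrics(): delta of completed
// jobs over the elapsed milliseconds, scaled to jobs per second.
double throughput_per_second(std::uint64_t completed_now,
                             std::uint64_t completed_before,
                             std::int64_t elapsed_ms)
{
    if (elapsed_ms <= 0 || completed_now < completed_before)
    {
        return 0.0; // counter reset or zero interval: report no throughput
    }
    return static_cast<double>(completed_now - completed_before) * 1000.0
         / static_cast<double>(elapsed_ms);
}
```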

Definition at line 289 of file autoscaler.cpp.

290{
291 auto now = std::chrono::steady_clock::now();
292
293 scaling_metrics_sample sample;
294 sample.timestamp = now;
295 sample.worker_count = pool_.get_active_worker_count();
296 sample.active_workers = sample.worker_count - pool_.get_idle_worker_count();
297 sample.queue_depth = pool_.get_pending_task_count();
298
299 // Calculate utilization
300 if (sample.worker_count > 0)
301 {
302 sample.utilization = static_cast<double>(sample.active_workers) /
303 static_cast<double>(sample.worker_count);
304 sample.queue_depth_per_worker = static_cast<double>(sample.queue_depth) /
305 static_cast<double>(sample.worker_count);
306 }
307
308 // Get metrics from pool
309 auto metrics_snapshot = pool_.metrics().snapshot();
310 sample.jobs_completed = metrics_snapshot.tasks_executed;
311 sample.jobs_submitted = metrics_snapshot.tasks_submitted;
312
313 // Calculate throughput if we have a previous sample
314 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
315 now - last_sample_time_).count();
316 if (duration > 0 && sample.jobs_completed >= last_jobs_completed_)
317 {
318 std::uint64_t jobs_delta = sample.jobs_completed - last_jobs_completed_;
319 sample.throughput_per_second = static_cast<double>(jobs_delta) * 1000.0 /
320 static_cast<double>(duration);
321 }
322
323 // Get P95 latency from enhanced metrics if available
324 // Note: Using P99 wait time as closest approximation to P95
325 if (pool_.is_enhanced_metrics_enabled())
326 {
327 auto enhanced_snapshot = pool_.enhanced_metrics_snapshot();
328 // Convert from microseconds to milliseconds
329 sample.p95_latency_ms = enhanced_snapshot.wait_time_p99_us / 1000.0;
330 }
331
332 // Update cached values for next sample
333 const_cast<autoscaler*>(this)->last_jobs_completed_ = sample.jobs_completed;
334 const_cast<autoscaler*>(this)->last_jobs_submitted_ = sample.jobs_submitted;
335 const_cast<autoscaler*>(this)->last_sample_time_ = now;
336
337 return sample;
338}

References kcenon::thread::scaling_metrics_sample::active_workers, kcenon::thread::thread_pool::enhanced_metrics_snapshot(), kcenon::thread::thread_pool::get_active_worker_count(), kcenon::thread::thread_pool::get_idle_worker_count(), kcenon::thread::thread_pool::get_pending_task_count(), kcenon::thread::thread_pool::is_enhanced_metrics_enabled(), kcenon::thread::scaling_metrics_sample::jobs_completed, kcenon::thread::scaling_metrics_sample::jobs_submitted, last_jobs_completed_, last_jobs_submitted_, last_sample_time_, kcenon::thread::thread_pool::metrics(), kcenon::thread::scaling_metrics_sample::p95_latency_ms, pool_, kcenon::thread::scaling_metrics_sample::queue_depth, kcenon::thread::scaling_metrics_sample::queue_depth_per_worker, kcenon::thread::metrics::ThreadPoolMetrics::snapshot(), kcenon::thread::scaling_metrics_sample::throughput_per_second, kcenon::thread::scaling_metrics_sample::timestamp, kcenon::thread::scaling_metrics_sample::utilization, and kcenon::thread::scaling_metrics_sample::worker_count.

Referenced by get_current_metrics().


◆ evaluate_now()

auto kcenon::thread::autoscaler::evaluate_now ( ) -> scaling_decision
nodiscard

Manually triggers a scaling evaluation.

Returns
The scaling decision that would be made.

This does not actually execute the scaling; use scale_to() or scale_up()/scale_down() to actually modify worker count.

Definition at line 75 of file autoscaler.cpp.

76{
77 // Collect current metrics
78 auto sample = collect_metrics();
79
80 // Add to history
81 {
82 std::scoped_lock<std::mutex> lock(history_mutex_);
83 metrics_history_.push_back(sample);
84 if (metrics_history_.size() > 60)
85 {
86 metrics_history_.pop_front();
87 }
88 }
89
90 // Get recent samples for decision
91 std::vector<scaling_metrics_sample> samples;
92 {
93 std::scoped_lock<std::mutex> lock(history_mutex_);
94 std::size_t count = std::min(metrics_history_.size(), policy_.samples_for_decision);
95 samples.reserve(count);
96 auto it = metrics_history_.end();
97 std::advance(it, -static_cast<std::ptrdiff_t>(count));
98 for (; it != metrics_history_.end(); ++it)
99 {
100 samples.push_back(*it);
101 }
102 }
103
104 // Make decision
105 return make_decision(samples);
106}

◆ execute_scaling()

auto kcenon::thread::autoscaler::execute_scaling ( const scaling_decision & decision) -> void
private

Executes a scaling decision.

Parameters
decisionThe decision to execute.

Definition at line 465 of file autoscaler.cpp.

466{
467 std::size_t current_workers = pool_.get_active_worker_count();
468 auto now = std::chrono::steady_clock::now();
469
470 if (decision.direction == scaling_direction::up)
471 {
472 auto result = add_workers(decision.target_workers - current_workers);
473 if (result.is_ok())
474 {
475 last_scale_up_time_ = now;
476
477 std::scoped_lock<std::mutex> lock(stats_mutex_);
478 ++stats_.scale_up_count;
479 stats_.last_scale_up = now;
480
481 if (policy_.scaling_callback)
482 {
483 policy_.scaling_callback(
484 scaling_direction::up,
485 decision.reason,
486 current_workers,
487 decision.target_workers);
488 }
489 }
490 }
491 else if (decision.direction == scaling_direction::down)
492 {
493 auto result = remove_workers(current_workers - decision.target_workers);
494 if (result.is_ok())
495 {
496 last_scale_down_time_ = now;
497
498 std::scoped_lock<std::mutex> lock(stats_mutex_);
499 ++stats_.scale_down_count;
500 stats_.last_scale_down = now;
501
502 if (policy_.scaling_callback)
503 {
504 policy_.scaling_callback(
505 scaling_direction::down,
506 decision.reason,
507 current_workers,
508 decision.target_workers);
509 }
510 }
511 }
512}

References kcenon::thread::down, kcenon::thread::result< T >::is_ok(), and kcenon::thread::up.


◆ get_current_metrics()

auto kcenon::thread::autoscaler::get_current_metrics ( ) const -> scaling_metrics_sample
nodiscard

Collects current metrics from the thread pool.

Returns
Current metrics sample.

Definition at line 165 of file autoscaler.cpp.

166{
167 return collect_metrics();
168}

References collect_metrics().


◆ get_metrics_history()

auto kcenon::thread::autoscaler::get_metrics_history ( std::size_t count = 60) const -> std::vector<scaling_metrics_sample>
nodiscard

Gets historical metrics samples.

Parameters
countMaximum number of samples to return.
Returns
Vector of recent metrics samples.
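The "last N samples" copy can be sketched generically; `newest` below is a stand-in for the private history walk, relying on `std::deque`'s random-access iterators (newest sample at the back).

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <vector>

// Models get_metrics_history(): copy at most `count` of the newest samples
// from a history deque whose most recent entry is at the back.
template <typename T>
std::vector<T> newest(const std::deque<T>& history, std::size_t count)
{
    std::size_t n = std::min(count, history.size());
    return std::vector<T>(history.end() - static_cast<std::ptrdiff_t>(n),
                          history.end());
}
```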

Definition at line 170 of file autoscaler.cpp.

172{
173 std::scoped_lock<std::mutex> lock(history_mutex_);
174
175 std::vector<scaling_metrics_sample> result;
176 std::size_t actual_count = std::min(count, metrics_history_.size());
177 result.reserve(actual_count);
178
179 auto it = metrics_history_.end();
180 std::advance(it, -static_cast<std::ptrdiff_t>(actual_count));
181 for (; it != metrics_history_.end(); ++it)
182 {
183 result.push_back(*it);
184 }
185
186 return result;
187}

◆ get_policy()

auto kcenon::thread::autoscaler::get_policy ( ) const -> const autoscaling_policy&
nodiscard

Gets the current autoscaling policy.

Returns
Const reference to the policy.

Definition at line 160 of file autoscaler.cpp.

161{
162 return policy_;
163}

References policy_.

◆ get_stats()

auto kcenon::thread::autoscaler::get_stats ( ) const -> autoscaling_stats
nodiscard

Gets autoscaling statistics.

Returns
Statistics about scaling operations.

Definition at line 189 of file autoscaler.cpp.

190{
191 std::scoped_lock<std::mutex> lock(stats_mutex_);
192 return stats_;
193}

References stats_, and stats_mutex_.

◆ is_active()

auto kcenon::thread::autoscaler::is_active ( ) const -> bool
nodiscard

Checks if the autoscaler is currently active.

Returns
true if the monitor thread is running.

Definition at line 70 of file autoscaler.cpp.

71{
72 return running_.load(std::memory_order_acquire);
73}

References running_.

◆ make_decision()

auto kcenon::thread::autoscaler::make_decision ( const std::vector< scaling_metrics_sample > & samples) const -> scaling_decision
nodiscard private

Makes a scaling decision based on recent samples.

Parameters
samplesRecent metrics samples.
Returns
Scaling decision.
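A simplified, self-contained model of the rule implemented below: scale up when ANY scale-up trigger fires on the averaged samples, scale down only when ALL scale-down conditions hold. Types and thresholds here are stand-ins for the policy members, not the library's own.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical miniature of make_decision(): average two metrics over the
// recent samples, then apply ANY-trigger scale-up and ALL-condition
// scale-down semantics.
struct sample_model { double utilization; double queue_per_worker; };

enum class direction_model { none, up, down };

direction_model decide(const std::vector<sample_model>& samples,
                       double up_util, double up_queue,
                       double down_util, double down_queue)
{
    if (samples.empty()) return direction_model::none;
    double u = 0.0, q = 0.0;
    for (const auto& s : samples) { u += s.utilization; q += s.queue_per_worker; }
    u /= static_cast<double>(samples.size());
    q /= static_cast<double>(samples.size());
    if (u > up_util || q > up_queue) return direction_model::up;       // ANY trigger
    if (u < down_util && q < down_queue) return direction_model::down; // ALL conditions
    return direction_model::none;
}
```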

Definition at line 340 of file autoscaler.cpp.

342{
343 if (samples.empty())
344 {
345 return scaling_decision{};
346 }
347
348 // Calculate average metrics from samples
349 double avg_utilization = 0.0;
350 double avg_queue_depth_per_worker = 0.0;
351 double avg_latency = 0.0;
352 std::size_t avg_queue_depth = 0;
353
354 for (const auto& sample : samples)
355 {
356 avg_utilization += sample.utilization;
357 avg_queue_depth_per_worker += sample.queue_depth_per_worker;
358 avg_latency += sample.p95_latency_ms;
359 avg_queue_depth += sample.queue_depth;
360 }
361
362 auto sample_count = static_cast<double>(samples.size());
363 avg_utilization /= sample_count;
364 avg_queue_depth_per_worker /= sample_count;
365 avg_latency /= sample_count;
366 avg_queue_depth /= samples.size();
367
368 std::size_t current_workers = pool_.get_active_worker_count();
369
370 // Check scale-up triggers (ANY trigger)
371 if (can_scale_up())
372 {
373 if (avg_utilization > policy_.scale_up.utilization_threshold)
374 {
375 std::size_t target = std::min(
376 current_workers + policy_.scale_up_increment,
377 policy_.max_workers);
378
379 return scaling_decision{
380 .direction = scaling_direction::up,
381 .reason = scaling_reason::worker_utilization,
382 .target_workers = target,
384 "Utilization {:.1f}% exceeds threshold {:.1f}%",
385 avg_utilization * 100, policy_.scale_up.utilization_threshold * 100)
386 };
387 }
388
389 if (avg_queue_depth_per_worker > policy_.scale_up.queue_depth_threshold)
390 {
391 std::size_t target = std::min(
392 current_workers + policy_.scale_up_increment,
393 policy_.max_workers);
394
395 return scaling_decision{
396 .direction = scaling_direction::up,
397 .reason = scaling_reason::queue_depth,
398 .target_workers = target,
400 "Queue depth per worker {:.1f} exceeds threshold {:.1f}",
401 avg_queue_depth_per_worker, policy_.scale_up.queue_depth_threshold)
402 };
403 }
404
405 if (avg_latency > policy_.scale_up.latency_threshold_ms && avg_latency > 0)
406 {
407 std::size_t target = std::min(
408 current_workers + policy_.scale_up_increment,
409 policy_.max_workers);
410
411 return scaling_decision{
412 .direction = scaling_direction::up,
413 .reason = scaling_reason::latency,
414 .target_workers = target,
416 "P95 latency {:.1f}ms exceeds threshold {:.1f}ms",
417 avg_latency, policy_.scale_up.latency_threshold_ms)
418 };
419 }
420
421 if (avg_queue_depth > policy_.scale_up.pending_jobs_threshold)
422 {
423 std::size_t target = std::min(
424 current_workers + policy_.scale_up_increment,
425 policy_.max_workers);
426
427 return scaling_decision{
428 .direction = scaling_direction::up,
429 .reason = scaling_reason::queue_depth,
430 .target_workers = target,
432 "Queue depth {} exceeds threshold {}",
433 avg_queue_depth, policy_.scale_up.pending_jobs_threshold)
434 };
435 }
436 }
437
438 // Check scale-down triggers (ALL triggers)
439 if (can_scale_down() && current_workers > policy_.min_workers)
440 {
441 bool utilization_ok = avg_utilization < policy_.scale_down.utilization_threshold;
442 bool queue_depth_ok = avg_queue_depth_per_worker < policy_.scale_down.queue_depth_threshold;
443
444 if (utilization_ok && queue_depth_ok)
445 {
446 std::size_t target = std::max(
447 current_workers - policy_.scale_down_increment,
448 policy_.min_workers);
449
450 return scaling_decision{
451 .direction = scaling_direction::down,
452 .reason = scaling_reason::worker_utilization,
453 .target_workers = target,
455 "Utilization {:.1f}% below threshold {:.1f}%, queue depth {:.1f} below {:.1f}",
456 avg_utilization * 100, policy_.scale_down.utilization_threshold * 100,
457 avg_queue_depth_per_worker, policy_.scale_down.queue_depth_threshold)
458 };
459 }
460 }
461
462 return scaling_decision{};
463}

References kcenon::thread::down, utility_module::formatter::format(), kcenon::thread::latency, kcenon::thread::queue_depth, kcenon::thread::up, and kcenon::thread::worker_utilization.


◆ monitor_loop()

auto kcenon::thread::autoscaler::monitor_loop ( ) -> void
private

Main monitoring loop running in the background thread.

Definition at line 203 of file autoscaler.cpp.

204{
205 while (running_.load(std::memory_order_acquire))
206 {
207 // Wait for sample interval
208 {
209 std::unique_lock<std::mutex> lock(mutex_);
210 cv_.wait_for(lock, policy_.sample_interval, [this]() {
211 return !running_.load(std::memory_order_acquire);
212 });
213 }
214
215 if (!running_.load(std::memory_order_acquire))
216 {
217 break;
218 }
219
220 // Skip if pool is not running
221 if (!pool_.is_running())
222 {
223 continue;
224 }
225
226 // Collect metrics
227 auto sample = collect_metrics();
228
229 // Add to history
230 {
231 std::scoped_lock<std::mutex> lock(history_mutex_);
232 metrics_history_.push_back(sample);
233
234 // Keep max 60 samples (1 minute at 1s interval)
235 while (metrics_history_.size() > 60)
236 {
237 metrics_history_.pop_front();
238 }
239 }
240
241 // Only auto-scale in automatic mode
243 {
244 continue;
245 }
246
247 // Collect samples for decision
248 std::vector<scaling_metrics_sample> samples;
249 {
250 std::scoped_lock<std::mutex> lock(history_mutex_);
251 std::size_t count = std::min(metrics_history_.size(), policy_.samples_for_decision);
252 if (count < policy_.samples_for_decision)
253 {
254 // Not enough samples yet
255 continue;
256 }
257
258 samples.reserve(count);
259 auto it = metrics_history_.end();
260 std::advance(it, -static_cast<std::ptrdiff_t>(count));
261 for (; it != metrics_history_.end(); ++it)
262 {
263 samples.push_back(*it);
264 }
265 }
266
267 // Make and execute decision
268 auto decision = make_decision(samples);
269 if (decision.should_scale())
270 {
271 execute_scaling(decision);
272 }
273
274 // Update stats
275 {
276 std::scoped_lock<std::mutex> lock(stats_mutex_);
277 ++stats_.decisions_evaluated;
278
279 std::size_t current = pool_.get_active_worker_count();
280 stats_.peak_workers = std::max(stats_.peak_workers, current);
281 if (stats_.min_workers == 0 || current < stats_.min_workers)
282 {
283 stats_.min_workers = current;
284 }
285 }
286 }
287}

References kcenon::thread::autoscaling_policy::automatic.

◆ operator=() [1/2]

autoscaler & kcenon::thread::autoscaler::operator= ( autoscaler && )
delete

◆ operator=() [2/2]

autoscaler & kcenon::thread::autoscaler::operator= ( const autoscaler & )
delete

◆ remove_workers()

auto kcenon::thread::autoscaler::remove_workers ( std::size_t count) -> common::VoidResult
private

Removes workers from the pool.

Parameters
countNumber of workers to remove.
Returns
Error if operation fails.

Definition at line 565 of file autoscaler.cpp.

566{
567 if (count == 0)
568 {
569 return common::ok();
570 }
571
572 // Request pool to remove workers using internal method
573 // This will gracefully stop idle workers
574 auto result = pool_.remove_workers_internal(count, policy_.min_workers);
575 return result;
576}

◆ reset_stats()

auto kcenon::thread::autoscaler::reset_stats ( ) -> void

Resets autoscaling statistics.

Definition at line 195 of file autoscaler.cpp.

196{
197 std::scoped_lock<std::mutex> lock(stats_mutex_);
198 stats_ = autoscaling_stats{};
201}

References kcenon::thread::autoscaling_stats::min_workers.

◆ scale_down()

auto kcenon::thread::autoscaler::scale_down ( ) -> common::VoidResult

Manually scales down by the configured increment.

Returns
Error if scaling fails.

Definition at line 143 of file autoscaler.cpp.

144{
145 std::size_t current = pool_.get_active_worker_count();
146 std::size_t target = current > policy_.scale_down_increment
147 ? current - policy_.scale_down_increment
149
150 target = std::max(target, policy_.min_workers);
151 return scale_to(target);
152}

◆ scale_to()

auto kcenon::thread::autoscaler::scale_to ( std::size_t target_workers) -> common::VoidResult

Manually scales to a specific worker count.

Parameters
target_workersDesired number of workers.
Returns
Error if scaling fails.

The target is clamped to [min_workers, max_workers] from the policy.
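The clamping step can be sketched on its own; `clamp_target` is a stand-in helper (the real call sits inside scale_to()), and `std::clamp` assumes `min_workers <= max_workers`, which the policy is expected to guarantee.

```cpp
#include <algorithm>
#include <cstddef>

// Models scale_to()'s clamping: the requested worker count is forced into
// the policy's [min_workers, max_workers] range before any workers are
// added or removed.
std::size_t clamp_target(std::size_t requested,
                         std::size_t min_workers,
                         std::size_t max_workers)
{
    return std::clamp(requested, min_workers, max_workers);
}
```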

Definition at line 108 of file autoscaler.cpp.

109{
110 // Clamp to policy bounds
111 target_workers = std::clamp(target_workers, policy_.min_workers, policy_.max_workers);
112
113 std::size_t current_workers = pool_.get_active_worker_count();
114
115 if (target_workers > current_workers)
116 {
117 return add_workers(target_workers - current_workers);
118 }
119 else if (target_workers < current_workers)
120 {
121 return remove_workers(current_workers - target_workers);
122 }
123
124 return common::ok();
125}

◆ scale_up()

auto kcenon::thread::autoscaler::scale_up ( ) -> common::VoidResult

Manually scales up by the configured increment.

Returns
Error if scaling fails.
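The increment selection documented in the definition below can be modeled separately. `choose_increment` is a stand-in: multiplicative growth truncates `current * (factor - 1)`, so the floor of 1 keeps small pools growing.

```cpp
#include <cstddef>

// Models scale_up()'s increment choice: multiplicative factor or fixed
// additive step, with a minimum increment of one worker.
std::size_t choose_increment(std::size_t current, bool multiplicative,
                             double factor, std::size_t additive)
{
    std::size_t inc = multiplicative
        ? static_cast<std::size_t>(static_cast<double>(current) * (factor - 1.0))
        : additive;
    return inc == 0 ? 1 : inc;
}
```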

Definition at line 127 of file autoscaler.cpp.

128{
129 std::size_t current = pool_.get_active_worker_count();
130 std::size_t increment = policy_.use_multiplicative_scaling
131 ? static_cast<std::size_t>(current * (policy_.scale_up_factor - 1.0))
132 : policy_.scale_up_increment;
133
134 if (increment == 0)
135 {
136 increment = 1;
137 }
138
139 std::size_t target = std::min(current + increment, policy_.max_workers);
140 return scale_to(target);
141}

◆ set_policy()

auto kcenon::thread::autoscaler::set_policy ( autoscaling_policy policy) -> void

Updates the autoscaling policy.

Parameters
policyNew policy configuration.

Definition at line 154 of file autoscaler.cpp.

155{
156 std::scoped_lock<std::mutex> lock(mutex_);
157 policy_ = std::move(policy);
158}

◆ start()

auto kcenon::thread::autoscaler::start ( ) -> void

Starts the autoscaling monitor thread.

The monitor thread periodically collects metrics and makes scaling decisions based on the configured policy.
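Concurrent start() calls are harmless because the running flag is flipped with a compare-exchange, as the definition below shows; the pattern in isolation (stand-in helper, not the class method):

```cpp
#include <atomic>

// Models start()'s idempotence: only the caller that wins the
// compare-exchange from false to true proceeds to launch the monitor
// thread; every other concurrent caller returns immediately.
bool try_transition_to_running(std::atomic<bool>& running)
{
    bool expected = false;
    return running.compare_exchange_strong(expected, true);
}
```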

Definition at line 32 of file autoscaler.cpp.

33{
34 bool expected = false;
35 if (!running_.compare_exchange_strong(expected, true))
36 {
37 // Already running
38 return;
39 }
40
41 // Start monitor thread
42 monitor_thread_ = std::make_unique<std::thread>([this]() {
43 monitor_loop();
44 });
45}

◆ stop()

auto kcenon::thread::autoscaler::stop ( ) -> void

Stops the autoscaling monitor thread.

Waits for the monitor thread to complete before returning.

Definition at line 47 of file autoscaler.cpp.

48{
49 bool expected = true;
50 if (!running_.compare_exchange_strong(expected, false))
51 {
52 // Already stopped
53 return;
54 }
55
56 // Wake up monitor thread
57 {
58 std::lock_guard<std::mutex> lock(mutex_);
59 cv_.notify_one();
60 }
61
62 // Wait for thread to complete
63 if (monitor_thread_ && monitor_thread_->joinable())
64 {
65 monitor_thread_->join();
66 }
67 monitor_thread_.reset();
68}

Referenced by ~autoscaler().


Member Data Documentation

◆ cv_

std::condition_variable kcenon::thread::autoscaler::cv_
private

Definition at line 278 of file autoscaler.h.

◆ history_mutex_

std::mutex kcenon::thread::autoscaler::history_mutex_
mutable private

Definition at line 281 of file autoscaler.h.

◆ last_jobs_completed_

std::uint64_t kcenon::thread::autoscaler::last_jobs_completed_ {0}
private

Definition at line 290 of file autoscaler.h.


Referenced by collect_metrics().

◆ last_jobs_submitted_

std::uint64_t kcenon::thread::autoscaler::last_jobs_submitted_ {0}
private

Definition at line 291 of file autoscaler.h.


Referenced by collect_metrics().

◆ last_sample_time_

std::chrono::steady_clock::time_point kcenon::thread::autoscaler::last_sample_time_
private

Definition at line 292 of file autoscaler.h.

Referenced by collect_metrics().

◆ last_scale_down_time_

std::chrono::steady_clock::time_point kcenon::thread::autoscaler::last_scale_down_time_
private

Definition at line 284 of file autoscaler.h.

Referenced by can_scale_down().

◆ last_scale_up_time_

std::chrono::steady_clock::time_point kcenon::thread::autoscaler::last_scale_up_time_
private

Definition at line 283 of file autoscaler.h.

Referenced by can_scale_up().

◆ metrics_history_

std::deque<scaling_metrics_sample> kcenon::thread::autoscaler::metrics_history_
private

Definition at line 280 of file autoscaler.h.

◆ monitor_thread_

std::unique_ptr<std::thread> kcenon::thread::autoscaler::monitor_thread_
private

Definition at line 275 of file autoscaler.h.

◆ mutex_

std::mutex kcenon::thread::autoscaler::mutex_
mutable private

Definition at line 277 of file autoscaler.h.

◆ policy_

autoscaling_policy kcenon::thread::autoscaler::policy_
private

Definition at line 272 of file autoscaler.h.

Referenced by can_scale_down(), can_scale_up(), and get_policy().

◆ pool_

thread_pool& kcenon::thread::autoscaler::pool_
private

Definition at line 271 of file autoscaler.h.

Referenced by autoscaler(), can_scale_down(), can_scale_up(), and collect_metrics().

◆ running_

std::atomic<bool> kcenon::thread::autoscaler::running_ {false}
private

Definition at line 274 of file autoscaler.h.


Referenced by is_active().

◆ stats_

autoscaling_stats kcenon::thread::autoscaler::stats_
private

Definition at line 286 of file autoscaler.h.

Referenced by autoscaler(), and get_stats().

◆ stats_mutex_

std::mutex kcenon::thread::autoscaler::stats_mutex_
mutable private

Definition at line 287 of file autoscaler.h.

Referenced by autoscaler(), and get_stats().


The documentation for this class was generated from the following files: autoscaler.h and autoscaler.cpp.