Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
autoscaler.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
9
10#include <algorithm>
11#include <numeric>
12
13namespace kcenon::thread
14{
15
// autoscaler::autoscaler(thread_pool& pool, autoscaling_policy policy)
// Constructor — the signature line and two body statements (doxygen lines
// 16 and 23-24) were lost in this extraction. Per the member index the
// signature is: autoscaler(thread_pool &pool, autoscaling_policy policy={}).
// The elided statements presumably seed stats_ (min/peak worker counts)
// from pool_.get_active_worker_count() — TODO confirm against the repo.
17 : pool_(pool)
18 , policy_(std::move(policy))
19 , last_sample_time_(std::chrono::steady_clock::now())
20{
21 // Initialize stats with current worker count
22 std::scoped_lock<std::mutex> lock(stats_mutex_);
25}
26
31
32auto autoscaler::start() -> void
33{
34 bool expected = false;
35 if (!running_.compare_exchange_strong(expected, true))
36 {
37 // Already running
38 return;
39 }
40
41 // Start monitor thread
42 monitor_thread_ = std::make_unique<std::thread>([this]() {
43 monitor_loop();
44 });
45}
46
47auto autoscaler::stop() -> void
48{
49 bool expected = true;
50 if (!running_.compare_exchange_strong(expected, false))
51 {
52 // Already stopped
53 return;
54 }
55
56 // Wake up monitor thread
57 {
58 std::lock_guard<std::mutex> lock(mutex_);
59 cv_.notify_one();
60 }
61
62 // Wait for thread to complete
63 if (monitor_thread_ && monitor_thread_->joinable())
64 {
65 monitor_thread_->join();
66 }
67 monitor_thread_.reset();
68}
69
70auto autoscaler::is_active() const -> bool
71{
72 return running_.load(std::memory_order_acquire);
73}
74
76{
77 // Collect current metrics
78 auto sample = collect_metrics();
79
80 // Add to history
81 {
82 std::scoped_lock<std::mutex> lock(history_mutex_);
83 metrics_history_.push_back(sample);
84 if (metrics_history_.size() > 60)
85 {
86 metrics_history_.pop_front();
87 }
88 }
89
90 // Get recent samples for decision
91 std::vector<scaling_metrics_sample> samples;
92 {
93 std::scoped_lock<std::mutex> lock(history_mutex_);
94 std::size_t count = std::min(metrics_history_.size(), policy_.samples_for_decision);
95 samples.reserve(count);
96 auto it = metrics_history_.end();
97 std::advance(it, -static_cast<std::ptrdiff_t>(count));
98 for (; it != metrics_history_.end(); ++it)
99 {
100 samples.push_back(*it);
101 }
102 }
103
104 // Make decision
105 return make_decision(samples);
106}
107
108auto autoscaler::scale_to(std::size_t target_workers) -> common::VoidResult
109{
110 // Clamp to policy bounds
111 target_workers = std::clamp(target_workers, policy_.min_workers, policy_.max_workers);
112
113 std::size_t current_workers = pool_.get_active_worker_count();
114
115 if (target_workers > current_workers)
116 {
117 return add_workers(target_workers - current_workers);
118 }
119 else if (target_workers < current_workers)
120 {
121 return remove_workers(current_workers - target_workers);
122 }
123
124 return common::ok();
125}
126
127auto autoscaler::scale_up() -> common::VoidResult
128{
129 std::size_t current = pool_.get_active_worker_count();
130 std::size_t increment = policy_.use_multiplicative_scaling
131 ? static_cast<std::size_t>(current * (policy_.scale_up_factor - 1.0))
132 : policy_.scale_up_increment;
133
134 if (increment == 0)
135 {
136 increment = 1;
137 }
138
139 std::size_t target = std::min(current + increment, policy_.max_workers);
140 return scale_to(target);
141}
142
143auto autoscaler::scale_down() -> common::VoidResult
144{
145 std::size_t current = pool_.get_active_worker_count();
146 std::size_t target = current > policy_.scale_down_increment
147 ? current - policy_.scale_down_increment
148 : policy_.min_workers;
149
150 target = std::max(target, policy_.min_workers);
151 return scale_to(target);
152}
153
155{
156 std::scoped_lock<std::mutex> lock(mutex_);
157 policy_ = std::move(policy);
158}
159
161{
162 return policy_;
163}
164
169
170auto autoscaler::get_metrics_history(std::size_t count) const
171 -> std::vector<scaling_metrics_sample>
172{
173 std::scoped_lock<std::mutex> lock(history_mutex_);
174
175 std::vector<scaling_metrics_sample> result;
176 std::size_t actual_count = std::min(count, metrics_history_.size());
177 result.reserve(actual_count);
178
179 auto it = metrics_history_.end();
180 std::advance(it, -static_cast<std::ptrdiff_t>(actual_count));
181 for (; it != metrics_history_.end(); ++it)
182 {
183 result.push_back(*it);
184 }
185
186 return result;
187}
188
190{
191 std::scoped_lock<std::mutex> lock(stats_mutex_);
192 return stats_;
193}
194
196{
197 std::scoped_lock<std::mutex> lock(stats_mutex_);
198 stats_ = autoscaling_stats{};
199 stats_.min_workers = pool_.get_active_worker_count();
200 stats_.peak_workers = stats_.min_workers;
201}
202
204{
205 while (running_.load(std::memory_order_acquire))
206 {
207 // Wait for sample interval
208 {
209 std::unique_lock<std::mutex> lock(mutex_);
210 cv_.wait_for(lock, policy_.sample_interval, [this]() {
211 return !running_.load(std::memory_order_acquire);
212 });
213 }
214
215 if (!running_.load(std::memory_order_acquire))
216 {
217 break;
218 }
219
220 // Skip if pool is not running
221 if (!pool_.is_running())
222 {
223 continue;
224 }
225
226 // Collect metrics
227 auto sample = collect_metrics();
228
229 // Add to history
230 {
231 std::scoped_lock<std::mutex> lock(history_mutex_);
232 metrics_history_.push_back(sample);
233
234 // Keep max 60 samples (1 minute at 1s interval)
235 while (metrics_history_.size() > 60)
236 {
237 metrics_history_.pop_front();
238 }
239 }
240
241 // Only auto-scale in automatic mode
242 if (policy_.scaling_mode != autoscaling_policy::mode::automatic)
243 {
244 continue;
245 }
246
247 // Collect samples for decision
248 std::vector<scaling_metrics_sample> samples;
249 {
250 std::scoped_lock<std::mutex> lock(history_mutex_);
251 std::size_t count = std::min(metrics_history_.size(), policy_.samples_for_decision);
252 if (count < policy_.samples_for_decision)
253 {
254 // Not enough samples yet
255 continue;
256 }
257
258 samples.reserve(count);
259 auto it = metrics_history_.end();
260 std::advance(it, -static_cast<std::ptrdiff_t>(count));
261 for (; it != metrics_history_.end(); ++it)
262 {
263 samples.push_back(*it);
264 }
265 }
266
267 // Make and execute decision
268 auto decision = make_decision(samples);
269 if (decision.should_scale())
270 {
271 execute_scaling(decision);
272 }
273
274 // Update stats
275 {
276 std::scoped_lock<std::mutex> lock(stats_mutex_);
277 ++stats_.decisions_evaluated;
278
279 std::size_t current = pool_.get_active_worker_count();
280 stats_.peak_workers = std::max(stats_.peak_workers, current);
281 if (stats_.min_workers == 0 || current < stats_.min_workers)
282 {
283 stats_.min_workers = current;
284 }
285 }
286 }
287}
288
// autoscaler::collect_metrics() — per the member index the signature is:
// auto collect_metrics() const -> scaling_metrics_sample.
// Several lines were elided in this extraction: the signature (289), the
// declaration of `sample` (293), three field assignments (295-297,
// presumably worker_count / active_workers / queue_depth from the pool's
// counters — TODO confirm), and the guard before the enhanced-metrics block
// (325, presumably `if (pool_.is_enhanced_metrics_enabled())` given the
// referenced symbol list — TODO confirm).
290{
291 auto now = std::chrono::steady_clock::now();
292
294 sample.timestamp = now;
298
299 // Calculate utilization
300 if (sample.worker_count > 0)
301 {
302 sample.utilization = static_cast<double>(sample.active_workers) /
303 static_cast<double>(sample.worker_count);
304 sample.queue_depth_per_worker = static_cast<double>(sample.queue_depth) /
305 static_cast<double>(sample.worker_count);
306 }
307
308 // Get metrics from pool
309 auto metrics_snapshot = pool_.metrics().snapshot();
310 sample.jobs_completed = metrics_snapshot.tasks_executed;
311 sample.jobs_submitted = metrics_snapshot.tasks_submitted;
312
313 // Calculate throughput if we have a previous sample
// jobs_completed is cumulative; the delta over the elapsed wall time gives
// jobs/second. The >= check skips the case where the counter was reset.
314 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
315 now - last_sample_time_).count();
316 if (duration > 0 && sample.jobs_completed >= last_jobs_completed_)
317 {
318 std::uint64_t jobs_delta = sample.jobs_completed - last_jobs_completed_;
319 sample.throughput_per_second = static_cast<double>(jobs_delta) * 1000.0 /
320 static_cast<double>(duration);
321 }
322
323 // Get P95 latency from enhanced metrics if available
324 // Note: Using P99 wait time as closest approximation to P95
326 {
327 auto enhanced_snapshot = pool_.enhanced_metrics_snapshot();
328 // Convert from microseconds to milliseconds
329 sample.p95_latency_ms = enhanced_snapshot.wait_time_p99_us / 1000.0;
330 }
331
332 // Update cached values for next sample
// NOTE(review): the const_cast writes to members inside a const member
// function — undefined behavior if *this is actually const. Declaring
// last_jobs_completed_/last_jobs_submitted_/last_sample_time_ `mutable`
// would express the intent safely; also these writes are unsynchronized —
// confirm collect_metrics() is only ever called from one thread at a time.
333 const_cast<autoscaler*>(this)->last_jobs_completed_ = sample.jobs_completed;
334 const_cast<autoscaler*>(this)->last_jobs_submitted_ = sample.jobs_submitted;
335 const_cast<autoscaler*>(this)->last_sample_time_ = now;
336
337 return sample;
338}
339
// Makes a scaling decision from averaged recent samples. Per the member
// index the full signature is:
// auto make_decision(const std::vector<scaling_metrics_sample>&) const
//     -> scaling_decision
// (the trailing-return-type line, 341, was elided in this extraction).
// Several designated-initializer lines inside the returned decisions were
// also elided (381/383, 397/399, 415, 429/431, 452/454) — presumably the
// `.reason = scaling_reason::...` member and the start of a
// `formatter::format(` call producing the human-readable rationale, as seen
// intact in the latency branch below — TODO confirm against the repo.
// Semantics: scale UP if ANY trigger fires (utilization, queue depth per
// worker, latency, absolute queue depth); scale DOWN only if ALL scale-down
// conditions hold. An empty decision means "no change".
340auto autoscaler::make_decision(const std::vector<scaling_metrics_sample>& samples) const
342{
343 if (samples.empty())
344 {
345 return scaling_decision{};
346 }
347
348 // Calculate average metrics from samples
349 double avg_utilization = 0.0;
350 double avg_queue_depth_per_worker = 0.0;
351 double avg_latency = 0.0;
352 std::size_t avg_queue_depth = 0;
353
354 for (const auto& sample : samples)
355 {
356 avg_utilization += sample.utilization;
357 avg_queue_depth_per_worker += sample.queue_depth_per_worker;
358 avg_latency += sample.p95_latency_ms;
359 avg_queue_depth += sample.queue_depth;
360 }
361
362 auto sample_count = static_cast<double>(samples.size());
363 avg_utilization /= sample_count;
364 avg_queue_depth_per_worker /= sample_count;
365 avg_latency /= sample_count;
// NOTE: integer division — the averaged queue depth truncates toward zero.
366 avg_queue_depth /= samples.size();
367
368 std::size_t current_workers = pool_.get_active_worker_count();
369
370 // Check scale-up triggers (ANY trigger)
371 if (can_scale_up())
372 {
373 if (avg_utilization > policy_.scale_up.utilization_threshold)
374 {
375 std::size_t target = std::min(
376 current_workers + policy_.scale_up_increment,
377 policy_.max_workers);
378
379 return scaling_decision{
380 .direction = scaling_direction::up,
382 .target_workers = target,
384 "Utilization {:.1f}% exceeds threshold {:.1f}%",
385 avg_utilization * 100, policy_.scale_up.utilization_threshold * 100)
386 };
387 }
388
389 if (avg_queue_depth_per_worker > policy_.scale_up.queue_depth_threshold)
390 {
391 std::size_t target = std::min(
392 current_workers + policy_.scale_up_increment,
393 policy_.max_workers);
394
395 return scaling_decision{
396 .direction = scaling_direction::up,
398 .target_workers = target,
400 "Queue depth per worker {:.1f} exceeds threshold {:.1f}",
401 avg_queue_depth_per_worker, policy_.scale_up.queue_depth_threshold)
402 };
403 }
404
// avg_latency > 0 guards against scaling on a zero (unavailable) latency.
405 if (avg_latency > policy_.scale_up.latency_threshold_ms && avg_latency > 0)
406 {
407 std::size_t target = std::min(
408 current_workers + policy_.scale_up_increment,
409 policy_.max_workers);
410
411 return scaling_decision{
412 .direction = scaling_direction::up,
413 .reason = scaling_reason::latency,
414 .target_workers = target,
416 "P95 latency {:.1f}ms exceeds threshold {:.1f}ms",
417 avg_latency, policy_.scale_up.latency_threshold_ms)
418 };
419 }
420
421 if (avg_queue_depth > policy_.scale_up.pending_jobs_threshold)
422 {
423 std::size_t target = std::min(
424 current_workers + policy_.scale_up_increment,
425 policy_.max_workers);
426
427 return scaling_decision{
428 .direction = scaling_direction::up,
430 .target_workers = target,
432 "Queue depth {} exceeds threshold {}",
433 avg_queue_depth, policy_.scale_up.pending_jobs_threshold)
434 };
435 }
436 }
437
438 // Check scale-down triggers (ALL triggers)
439 if (can_scale_down() && current_workers > policy_.min_workers)
440 {
441 bool utilization_ok = avg_utilization < policy_.scale_down.utilization_threshold;
442 bool queue_depth_ok = avg_queue_depth_per_worker < policy_.scale_down.queue_depth_threshold;
443
444 if (utilization_ok && queue_depth_ok)
445 {
446 std::size_t target = std::max(
447 current_workers - policy_.scale_down_increment,
448 policy_.min_workers);
449
450 return scaling_decision{
451 .direction = scaling_direction::down,
453 .target_workers = target,
455 "Utilization {:.1f}% below threshold {:.1f}%, queue depth {:.1f} below {:.1f}",
456 avg_utilization * 100, policy_.scale_down.utilization_threshold * 100,
457 avg_queue_depth_per_worker, policy_.scale_down.queue_depth_threshold)
458 };
459 }
460 }
461
// Default-constructed decision: no scaling action.
462 return scaling_decision{};
463}
464
// autoscaler::execute_scaling — per the member index the signature is:
// auto execute_scaling(const scaling_decision &decision) -> void
// (the signature line, 465, was elided in this extraction, as were the
// first-argument lines of the two callback invocations, 484 and 505 —
// presumably scaling_direction::up and scaling_direction::down respectively,
// mirroring decision.direction — TODO confirm against the repo).
// Applies the decision: adds/removes workers, records cooldown timestamps
// and stats, and fires the user callback on success.
466{
467 std::size_t current_workers = pool_.get_active_worker_count();
468 auto now = std::chrono::steady_clock::now();
469
470 if (decision.direction == scaling_direction::up)
471 {
// Assumes decision.target_workers > current_workers here; make_decision()
// only emits `up` with target above the current count.
472 auto result = add_workers(decision.target_workers - current_workers);
473 if (result.is_ok())
474 {
475 last_scale_up_time_ = now;
476
// NOTE(review): stats_mutex_ stays held for the rest of this scope,
// including the user-supplied callback below — a callback that calls back
// into get_stats()/reset_stats() would deadlock. Confirm callbacks are
// documented as non-reentrant, or invoke outside the lock.
477 std::scoped_lock<std::mutex> lock(stats_mutex_);
478 ++stats_.scale_up_count;
479 stats_.last_scale_up = now;
480
481 if (policy_.scaling_callback)
482 {
483 policy_.scaling_callback(
485 decision.reason,
486 current_workers,
487 decision.target_workers);
488 }
489 }
490 }
491 else if (decision.direction == scaling_direction::down)
492 {
493 auto result = remove_workers(current_workers - decision.target_workers);
494 if (result.is_ok())
495 {
496 last_scale_down_time_ = now;
497
// Same lock-held-during-callback pattern as the scale-up branch above.
498 std::scoped_lock<std::mutex> lock(stats_mutex_);
499 ++stats_.scale_down_count;
500 stats_.last_scale_down = now;
501
502 if (policy_.scaling_callback)
503 {
504 policy_.scaling_callback(
506 decision.reason,
507 current_workers,
508 decision.target_workers);
509 }
510 }
511 }
512}
513
// Returns true when scaling up is currently permitted: some policy guard
// passes (the condition on elided line 516 — presumably a policy flag such
// as "scaling enabled / scale-up allowed"; TODO confirm against the repo)
// AND the scale-up cooldown has elapsed since the last scale-up event.
514auto autoscaler::can_scale_up() const -> bool
515{
517 {
518 return false;
519 }
520
521 auto now = std::chrono::steady_clock::now();
522 auto since_last = std::chrono::duration_cast<std::chrono::seconds>(
523 now - last_scale_up_time_);
524
525 return since_last >= policy_.scale_up_cooldown;
526}
527
// Returns true when scaling down is currently permitted: some policy guard
// passes (the condition on elided line 530 — presumably a policy flag;
// TODO confirm) AND the scale-down cooldown has elapsed. Elided line 537
// is presumably `now - last_scale_down_time_);`, completing the
// duration_cast by analogy with can_scale_up() — TODO confirm.
528auto autoscaler::can_scale_down() const -> bool
529{
531 {
532 return false;
533 }
534
535 auto now = std::chrono::steady_clock::now();
536 auto since_last = std::chrono::duration_cast<std::chrono::seconds>(
538
539 return since_last >= policy_.scale_down_cooldown;
540}
541
542auto autoscaler::add_workers(std::size_t count) -> common::VoidResult
543{
544 if (count == 0)
545 {
546 return common::ok();
547 }
548
549 // Get current context from pool
550 const auto& context = pool_.get_context();
551
552 for (std::size_t i = 0; i < count; ++i)
553 {
554 auto worker = std::make_unique<thread_worker>(true, context);
555 auto result = pool_.enqueue(std::move(worker));
556 if (result.is_err())
557 {
558 return result;
559 }
560 }
561
562 return common::ok();
563}
564
565auto autoscaler::remove_workers(std::size_t count) -> common::VoidResult
566{
567 if (count == 0)
568 {
569 return common::ok();
570 }
571
572 // Request pool to remove workers using internal method
573 // This will gracefully stop idle workers
574 auto result = pool_.remove_workers_internal(count, policy_.min_workers);
575 return result;
576}
577
578} // namespace kcenon::thread
Automatic scaling of thread pool workers based on load metrics.
Manages automatic scaling of thread pool workers based on load metrics.
Definition autoscaler.h:95
auto collect_metrics() const -> scaling_metrics_sample
Collects current metrics from the pool.
auto can_scale_down() const -> bool
Checks if scale-down cooldown has elapsed.
auto scale_to(std::size_t target_workers) -> common::VoidResult
Manually scales to a specific worker count.
std::uint64_t last_jobs_submitted_
Definition autoscaler.h:291
std::atomic< bool > running_
Definition autoscaler.h:274
std::chrono::steady_clock::time_point last_sample_time_
Definition autoscaler.h:292
auto is_active() const -> bool
Checks if the autoscaler is currently active.
auto make_decision(const std::vector< scaling_metrics_sample > &samples) const -> scaling_decision
Makes a scaling decision based on recent samples.
auto get_stats() const -> autoscaling_stats
Gets autoscaling statistics.
auto scale_up() -> common::VoidResult
Manually scales up by the configured increment.
autoscaling_policy policy_
Definition autoscaler.h:272
auto get_metrics_history(std::size_t count=60) const -> std::vector< scaling_metrics_sample >
Gets historical metrics samples.
auto execute_scaling(const scaling_decision &decision) -> void
Executes a scaling decision.
auto evaluate_now() -> scaling_decision
Manually triggers a scaling evaluation.
std::chrono::steady_clock::time_point last_scale_down_time_
Definition autoscaler.h:284
autoscaling_stats stats_
Definition autoscaler.h:286
auto start() -> void
Starts the autoscaling monitor thread.
auto stop() -> void
Stops the autoscaling monitor thread.
~autoscaler()
Destructor. Stops the monitor thread if running.
auto scale_down() -> common::VoidResult
Manually scales down by the configured increment.
std::uint64_t last_jobs_completed_
Definition autoscaler.h:290
auto reset_stats() -> void
Resets autoscaling statistics.
auto get_current_metrics() const -> scaling_metrics_sample
Collects current metrics from the thread pool.
auto can_scale_up() const -> bool
Checks if scale-up cooldown has elapsed.
auto get_policy() const -> const autoscaling_policy &
Gets the current autoscaling policy.
auto add_workers(std::size_t count) -> common::VoidResult
Adds workers to the pool.
autoscaler(thread_pool &pool, autoscaling_policy policy={})
Constructs an autoscaler for the given thread pool.
auto remove_workers(std::size_t count) -> common::VoidResult
Removes workers from the pool.
auto set_policy(autoscaling_policy policy) -> void
Updates the autoscaling policy.
std::chrono::steady_clock::time_point last_scale_up_time_
Definition autoscaler.h:283
auto monitor_loop() -> void
Main monitoring loop running in the background thread.
Snapshot snapshot() const
Get a snapshot of all metrics.
A template class representing either a value or an error.
bool is_ok() const noexcept
Checks if the result is successful.
A thread pool for concurrent execution of jobs using multiple worker threads.
bool is_enhanced_metrics_enabled() const
Check if enhanced metrics is enabled.
auto get_pending_task_count() const -> std::size_t
Get the number of pending tasks in the queue.
metrics::EnhancedSnapshot enhanced_metrics_snapshot() const
Get enhanced metrics snapshot.
std::size_t get_idle_worker_count() const
Get the number of idle workers.
auto get_active_worker_count() const -> std::size_t
Get the current number of active (running) workers.
const metrics::ThreadPoolMetrics & metrics() const noexcept
Access aggregated runtime metrics (read-only reference).
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:129
Core thread pool implementation with work stealing and auto-scaling.
Generic formatter for enum types using user-provided converter functors.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
@ up
Scale up (add workers)
@ down
Scale down (remove workers)
@ latency
Latency threshold exceeded.
@ worker_utilization
Worker utilization threshold exceeded.
@ queue_depth
Queue depth threshold exceeded.
STL namespace.
Configuration for autoscaling behavior.
std::chrono::seconds scale_down_cooldown
Minimum time between scale-down events.
@ automatic
Fully automatic scaling.
std::chrono::seconds scale_up_cooldown
Minimum time between scale-up events.
std::size_t min_workers
Minimum number of workers (never scale below this)
std::size_t max_workers
Maximum number of workers (never scale above this)
Statistics for autoscaling operations.
std::size_t peak_workers
Peak worker count observed.
std::size_t min_workers
Minimum worker count observed.
Scaling decision result.
Metrics sample for autoscaling decisions.
std::uint64_t jobs_submitted
Jobs submitted since last sample.
std::size_t worker_count
Current number of workers in the pool.
std::chrono::steady_clock::time_point timestamp
Timestamp when this sample was collected.
double queue_depth_per_worker
Jobs per worker ratio.
std::uint64_t jobs_completed
Jobs completed since last sample.
std::size_t queue_depth
Number of jobs waiting in the queue.
double p95_latency_ms
P95 latency in milliseconds.
double throughput_per_second
Throughput in jobs per second.
std::size_t active_workers
Number of workers currently processing jobs.
double utilization
Worker utilization ratio (0.0 - 1.0)
Specialized worker thread that processes jobs from a job_queue.