Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
thread_pool_builder.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7namespace kcenon::thread
8{
10 : name_(name)
11 , worker_count_(0)
12 , context_()
13 , enable_diagnostics_(false)
14 , enable_enhanced_metrics_(false)
15 {
16 }
17
19 {
20 worker_count_ = count;
21 return *this;
22 }
23
25 {
26 context_ = context;
27 return *this;
28 }
29
30 thread_pool_builder& thread_pool_builder::with_queue(std::shared_ptr<job_queue> queue)
31 {
32 custom_queue_ = std::move(queue);
33 return *this;
34 }
35
37 std::unique_ptr<pool_queue_adapter_interface> adapter)
38 {
39 queue_adapter_ = std::move(adapter);
40 return *this;
41 }
42
44 const circuit_breaker_config& config)
45 {
48 return *this;
49 }
50
52 std::shared_ptr<circuit_breaker> cb)
53 {
54 shared_circuit_breaker_ = std::move(cb);
56 return *this;
57 }
58
60 const autoscaling_policy& config)
61 {
62 autoscaling_config_ = config;
63 return *this;
64 }
65
67 {
68 worker_policy config;
69 config.enable_work_stealing = true;
70 work_stealing_config_ = config;
71 return *this;
72 }
73
79
85
91
92 thread_pool_builder& thread_pool_builder::with_policy(std::unique_ptr<pool_policy> policy)
93 {
94 policies_.push_back(std::move(policy));
95 return *this;
96 }
97
98 std::shared_ptr<thread_pool> thread_pool_builder::build()
99 {
100 std::shared_ptr<thread_pool> pool;
101
102 if (queue_adapter_)
103 {
104 pool = std::make_shared<thread_pool>(name_, std::move(queue_adapter_), context_);
105 }
106 else if (custom_queue_)
107 {
108 pool = std::make_shared<thread_pool>(name_, custom_queue_, context_);
109 }
110 else
111 {
112 pool = std::make_shared<thread_pool>(name_, context_);
113 }
114
115 std::size_t worker_count = worker_count_;
116 if (worker_count == 0)
117 {
118 worker_count = std::thread::hardware_concurrency();
119 if (worker_count == 0)
120 {
121 worker_count = 4;
122 }
123 }
124
125 for (std::size_t i = 0; i < worker_count; ++i)
126 {
127 auto worker = std::make_unique<thread_worker>(true, context_);
128 worker->set_job_queue(pool->get_job_queue());
129 pool->enqueue(std::move(worker));
130 }
131
132 if (circuit_breaker_config_.has_value())
133 {
134 auto cb_policy = std::make_unique<circuit_breaker_policy>(
136 pool->add_policy(std::move(cb_policy));
137 }
139 {
140 auto cb_policy = std::make_unique<circuit_breaker_policy>(
142 pool->add_policy(std::move(cb_policy));
143 }
144
145 if (autoscaling_config_.has_value())
146 {
147 auto as_policy = std::make_unique<autoscaling_pool_policy>(
148 *pool, autoscaling_config_.value());
149 pool->add_policy(std::move(as_policy));
150 }
151
152 if (work_stealing_config_.has_value())
153 {
154 auto ws_policy = std::make_unique<work_stealing_pool_policy>(
155 work_stealing_config_.value());
156 pool->add_policy(std::move(ws_policy));
157 }
158
159 for (auto& policy : policies_)
160 {
161 pool->add_policy(std::move(policy));
162 }
163
165 {
166 pool->set_enhanced_metrics_enabled(true);
167 }
168
170 {
171 (void)pool->diagnostics();
172 }
173
174 reset();
175
176 return pool;
177 }
178
179 std::shared_ptr<thread_pool> thread_pool_builder::build_and_start()
180 {
181 auto pool = build();
182 pool->start();
183 return pool;
184 }
185
187 {
188 name_ = "thread_pool";
189 worker_count_ = 0;
191 custom_queue_.reset();
192 queue_adapter_.reset();
193 policies_.clear();
194 enable_diagnostics_ = false;
198 autoscaling_config_.reset();
199 work_stealing_config_.reset();
200 }
201
202} // namespace kcenon::thread
Context object that provides access to optional services.
Fluent builder for creating and configuring thread pools.
std::vector< std::unique_ptr< pool_policy > > policies_
std::shared_ptr< job_queue > custom_queue_
thread_pool_builder & with_work_stealing()
Enables work-stealing with default configuration.
std::optional< autoscaling_policy > autoscaling_config_
std::unique_ptr< pool_queue_adapter_interface > queue_adapter_
thread_pool_builder & with_policy(std::unique_ptr< pool_policy > policy)
Adds a custom policy to the pool.
thread_pool_builder & with_workers(std::size_t count)
Sets the number of worker threads.
thread_pool_builder & with_context(const thread_context &context)
Sets the thread context for logging and monitoring.
std::shared_ptr< thread_pool > build()
Builds and returns the configured thread pool.
std::optional< worker_policy > work_stealing_config_
std::shared_ptr< thread_pool > build_and_start()
Builds the pool and starts it immediately.
std::shared_ptr< circuit_breaker > shared_circuit_breaker_
thread_pool_builder & with_queue_adapter(std::unique_ptr< pool_queue_adapter_interface > adapter)
Sets a policy-based queue adapter.
thread_pool_builder & with_diagnostics()
Enables diagnostics for the pool.
thread_pool_builder & with_circuit_breaker(const circuit_breaker_config &config={})
Adds circuit breaker protection.
thread_pool_builder & with_queue(std::shared_ptr< job_queue > queue)
Sets a custom job queue.
std::optional< circuit_breaker_config > circuit_breaker_config_
thread_pool_builder & with_enhanced_metrics()
Enables enhanced metrics collection.
thread_pool_builder(const std::string &name="thread_pool")
Constructs a builder with the given pool name.
thread_pool_builder & with_autoscaling(const autoscaling_policy &config={})
Enables autoscaling with the specified policy.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Configuration for autoscaling behavior.
Worker behavior policy configuration.
Fluent builder for creating and configuring thread pools.