Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
typed_thread_pool.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
10#include <sstream>
11
12namespace kcenon::thread
13{
14 // Support both old (namespace common) and new (namespace kcenon::common) versions
15 // When inside namespace kcenon::thread, 'common' resolves to kcenon::common
16#if KCENON_HAS_COMMON_EXECUTOR
17 namespace common_ns = common;
18#endif
19
20 template <typename job_type>
22 const std::string& thread_title,
23 const thread_context& context)
24 : thread_title_(thread_title)
25 , start_pool_(false)
26 , job_queue_(std::make_shared<aging_typed_job_queue_t<job_type>>())
27 , context_(context)
28 {
29 }
30
31 template <typename job_type>
33 {
34 if (start_pool_.load())
35 {
36 stop(false); // Wait for jobs to complete
37 }
38 }
39
40 template <typename job_type>
41 auto typed_thread_pool_t<job_type>::get_ptr() -> std::shared_ptr<typed_thread_pool_t<job_type>>
42 {
43 return this->shared_from_this();
44 }
45
46 template <typename job_type>
47 auto typed_thread_pool_t<job_type>::start() -> common::VoidResult
48 {
49 bool expected = false;
50 if (!start_pool_.compare_exchange_strong(expected, true))
51 {
52 return common::error_info{static_cast<int>(error_code::thread_already_running), "Thread pool already started", "thread_system"};
53 }
54
55 // Check if there are workers to start
56 if (workers_.empty())
57 {
58 start_pool_.store(false); // Reset state since we didn't actually start
59 return common::error_info{static_cast<int>(error_code::invalid_argument), "no workers to start", "thread_system"};
60 }
61
62 // Start all workers
63 for (auto& worker : workers_)
64 {
65 if (worker)
66 {
67 worker->start();
68 }
69 }
70
71 return common::ok();
72 }
73
74 template <typename job_type>
75 auto typed_thread_pool_t<job_type>::get_job_queue() -> std::shared_ptr<aging_typed_job_queue_t<job_type>>
76 {
77 return job_queue_;
78 }
79
80 template <typename job_type>
81 auto typed_thread_pool_t<job_type>::execute(std::unique_ptr<job>&& work) -> common::VoidResult
82 {
83 // Use acquire to ensure we see the latest pool state
84 if (!start_pool_.load(std::memory_order_acquire))
85 {
86 return common::error_info{static_cast<int>(error_code::thread_not_running), "Thread pool not started", "thread_system"};
87 }
88
89 return job_queue_->enqueue(std::move(work));
90 }
91
92 template <typename job_type>
94 -> common::VoidResult
95 {
96 // Use acquire to ensure we see the latest pool state
97 if (!start_pool_.load(std::memory_order_acquire))
98 {
99 return common::error_info{static_cast<int>(error_code::thread_not_running), "Thread pool not started", "thread_system"};
100 }
101
102 return job_queue_->enqueue(std::move(job));
103 }
104
105 template <typename job_type>
107 std::vector<std::unique_ptr<typed_job_t<job_type>>>&& jobs) -> common::VoidResult
108 {
109 // Use acquire to ensure we see the latest pool state
110 if (!start_pool_.load(std::memory_order_acquire))
111 {
112 return common::error_info{static_cast<int>(error_code::thread_not_running), "Thread pool not started", "thread_system"};
113 }
114
115 return job_queue_->enqueue_batch(std::move(jobs));
116 }
117
118 template <typename job_type>
120 std::unique_ptr<typed_thread_worker_t<job_type>>&& worker) -> common::VoidResult
121 {
122 if (!worker)
123 {
124 return common::error_info{static_cast<int>(error_code::invalid_argument), "Null worker", "thread_system"};
125 }
126
127 // Set the aging job queue for the worker
128 worker->set_aging_job_queue(job_queue_);
129 worker->set_context(context_);
130
131 // Add worker first, then start if pool is running
132 // This ensures stop() will see and stop this worker if called concurrently
133 // Use acquire to synchronize with start_pool_ release in start()
134 bool is_running = start_pool_.load(std::memory_order_acquire);
135
136 workers_.push_back(std::move(worker));
137
138 // Start the worker if the pool is already started
139 if (is_running)
140 {
141 auto start_result = workers_.back()->start();
142 if (start_result.is_err())
143 {
144 // Remove the worker we just added since it failed to start
145 workers_.pop_back();
146 return start_result;
147 }
148 }
149
150 return common::ok();
151 }
152
// Registers a batch of workers by calling the single-worker enqueue() on each.
// Stops at the first failure and returns that error; workers enqueued before
// the failure stay registered. Returns ok when every worker was accepted.
// NOTE(review): the line naming this member function (listing line 154) is
// missing from this rendering — presumably the worker overload of
// enqueue_batch; confirm against the header.
153 template <typename job_type>
155 std::vector<std::unique_ptr<typed_thread_worker_t<job_type>>>&& workers) -> common::VoidResult
156 {
157 for (auto& worker : workers)
158 {
// Ownership of each worker moves into the pool one at a time.
159 auto result = enqueue(std::move(worker));
160 if (result.is_err())
161 {
162 return result;
163 }
164 }
165
166 return common::ok();
167 }
168
169 template <typename job_type>
170 auto typed_thread_pool_t<job_type>::stop(bool clear_queue) -> common::VoidResult
171 {
172 // Use compare_exchange_strong to atomically check and set state
173 // This prevents race conditions during concurrent stop() calls
174 bool expected = true;
175 if (!start_pool_.compare_exchange_strong(expected, false,
176 std::memory_order_acq_rel,
177 std::memory_order_acquire))
178 {
179 return common::ok(); // Already stopped or being stopped by another thread
180 }
181
182 // Always stop the queue to prevent new jobs from being enqueued
183 // This ensures consistent behavior with thread_pool and prevents
184 // race conditions where jobs might be added after stop() is called
185 if (job_queue_)
186 {
187 job_queue_->stop();
188
189 // Optionally clear pending jobs for immediate shutdown
190 if (clear_queue)
191 {
192 job_queue_->clear();
193 }
194 }
195
196 // Stop all workers
197 for (auto& worker : workers_)
198 {
199 if (worker)
200 {
201 worker->stop();
202 // thread_base doesn't have join(), it manages threads internally
203 }
204 }
205
206 return common::ok();
207 }
208
209 template <typename job_type>
211 {
212 std::ostringstream oss;
213 oss << "typed_thread_pool{"
214 << "title: " << thread_title_
215 << ", started: " << start_pool_.load()
216 << ", workers: " << workers_.size()
217 << "}";
218 return oss.str();
219 }
220
221 template <typename job_type>
223 std::shared_ptr<aging_typed_job_queue_t<job_type>> job_queue) -> void
224 {
225 job_queue_ = std::move(job_queue);
226
227 // Update all workers with the new queue
228 for (auto& worker : workers_)
229 {
230 if (worker)
231 {
232 worker->set_aging_job_queue(job_queue_);
233 }
234 }
235 }
236
237 template <typename job_type>
239 {
240 return context_;
241 }
242
243 // ============================================================================
244 // Priority Aging Integration
245 // ============================================================================
246
247 template <typename job_type>
249 -> void
250 {
251 if (priority_aging_enabled_)
252 {
253 // Already enabled, just update config
254 if (job_queue_)
255 {
256 job_queue_->set_aging_config(std::move(config));
257 }
258 return;
259 }
260
261 // Update the job queue config and start aging
262 config.enabled = true;
263 if (job_queue_)
264 {
265 job_queue_->set_aging_config(config);
266
267 // Start aging if pool is running
268 if (start_pool_.load())
269 {
270 job_queue_->start_aging();
271 }
272 }
273
274 priority_aging_enabled_ = true;
275 }
276
277 template <typename job_type>
279 {
280 if (!priority_aging_enabled_)
281 {
282 return;
283 }
284
285 if (job_queue_)
286 {
287 job_queue_->stop_aging();
288 }
289
290 priority_aging_enabled_ = false;
291 }
292
293 template <typename job_type>
295 {
296 return priority_aging_enabled_ && job_queue_ &&
297 job_queue_->is_aging_running();
298 }
299
300 template <typename job_type>
302 {
303 if (job_queue_)
304 {
305 return job_queue_->get_aging_stats();
306 }
307 return aging_stats{};
308 }
309
// Enqueues an aging-aware typed job on the pool's queue.
// Returns thread_not_running if the pool has not been started,
// invalid_argument for a null job, otherwise the result of the queue's
// enqueue().
// NOTE(review): the line naming this member function (listing line 311) is
// missing from this rendering — confirm the name against the header.
// NOTE(review): job_queue_ is dereferenced without a null check here, while
// other members of this class guard it — confirm a null queue cannot reach
// this point.
310 template <typename job_type>
312 std::unique_ptr<aging_typed_job_t<job_type>>&& job) -> common::VoidResult
313 {
// Acquire load, matching the other enqueue/execute entry points.
314 if (!start_pool_.load(std::memory_order_acquire))
315 {
316 return common::error_info{
317 static_cast<int>(error_code::thread_not_running),
318 "Thread pool not started",
319 "thread_system"
320 };
321 }
322
// Reject null jobs before handing them to the queue.
323 if (!job)
324 {
325 return common::error_info{
326 static_cast<int>(error_code::invalid_argument),
327 "Null job",
328 "thread_system"
329 };
330 }
331
332 return job_queue_->enqueue(std::move(job));
333 }
334
335#if KCENON_HAS_COMMON_EXECUTOR
336 // ============================================================================
337 // IExecutor interface implementation
338 // ============================================================================
339
340 template <typename job_type>
341 std::future<void> typed_thread_pool_t<job_type>::submit(std::function<void()> task)
342 {
343 auto promise = std::make_shared<std::promise<void>>();
344 auto future = promise->get_future();
345
346 auto job_ptr = std::make_unique<callback_typed_job_t<job_type>>(
347 [task = std::move(task), promise]() mutable -> common::VoidResult {
348 try {
349 task();
350 promise->set_value();
351 } catch (...) {
352 promise->set_exception(std::current_exception());
353 }
354 return common::ok();
355 },
356 job_type{} // Default priority
357 );
358
359 auto enqueue_result = enqueue(std::move(job_ptr));
360 if (enqueue_result.is_err()) {
361 try {
362 throw std::runtime_error("Failed to enqueue task: " +
363 enqueue_result.error().message);
364 } catch (...) {
365 promise->set_exception(std::current_exception());
366 }
367 }
368
369 return future;
370 }
371
372 template <typename job_type>
373 std::future<void> typed_thread_pool_t<job_type>::submit_delayed(
374 std::function<void()> task,
375 std::chrono::milliseconds delay)
376 {
377 auto promise = std::make_shared<std::promise<void>>();
378 auto future = promise->get_future();
379
380 auto delayed_task = [task = std::move(task), delay, promise]() mutable -> common::VoidResult {
381 std::this_thread::sleep_for(delay);
382 try {
383 task();
384 promise->set_value();
385 } catch (...) {
386 promise->set_exception(std::current_exception());
387 }
388 return common::ok();
389 };
390
391 auto job_ptr = std::make_unique<callback_typed_job_t<job_type>>(
392 std::move(delayed_task),
393 job_type{} // Default priority
394 );
395
396 auto enqueue_result = enqueue(std::move(job_ptr));
397 if (enqueue_result.is_err()) {
398 try {
399 throw std::runtime_error("Failed to enqueue delayed task: " +
400 enqueue_result.error().message);
401 } catch (...) {
402 promise->set_exception(std::current_exception());
403 }
404 }
405
406 return future;
407 }
408
409 template <typename job_type>
410 common_ns::Result<std::future<void>> typed_thread_pool_t<job_type>::execute(
411 std::unique_ptr<common_ns::interfaces::IJob>&& common_job)
412 {
413 if (!common_job) {
414 return common_ns::error_info{
415 static_cast<int>(error_code::job_invalid),
416 "Null job provided",
417 "typed_thread_pool"
418 };
419 }
420
421 auto promise = std::make_shared<std::promise<void>>();
422 auto future = promise->get_future();
423
424 // Use shared_ptr for copyable lambda
425 auto shared_job = std::shared_ptr<common_ns::interfaces::IJob>(std::move(common_job));
426 auto job_ptr = std::make_unique<callback_typed_job_t<job_type>>(
427 [shared_job, promise]() -> common::VoidResult {
428 auto result = shared_job->execute();
429 if (result.is_ok()) {
430 promise->set_value();
431 } else {
432 try {
433 throw std::runtime_error("Job execution failed: " + result.error().message);
434 } catch (...) {
435 promise->set_exception(std::current_exception());
436 }
437 }
438 return common::ok();
439 },
440 job_type{} // Default priority
441 );
442
443 auto enqueue_result = enqueue(std::move(job_ptr));
444 if (enqueue_result.is_err()) {
445 return common_ns::error_info{
446 enqueue_result.error().code,
447 enqueue_result.error().message,
448 enqueue_result.error().module
449 };
450 }
451
452 return common_ns::Result<std::future<void>>(std::move(future));
453 }
454
455 template <typename job_type>
456 common_ns::Result<std::future<void>> typed_thread_pool_t<job_type>::execute_delayed(
457 std::unique_ptr<common_ns::interfaces::IJob>&& common_job,
458 std::chrono::milliseconds delay)
459 {
460 if (!common_job) {
461 return common_ns::error_info{
462 static_cast<int>(error_code::job_invalid),
463 "Null job provided",
464 "typed_thread_pool"
465 };
466 }
467
468 auto promise = std::make_shared<std::promise<void>>();
469 auto future = promise->get_future();
470
471 // Use shared_ptr for copyable lambda
472 auto shared_job = std::shared_ptr<common_ns::interfaces::IJob>(std::move(common_job));
473 auto job_ptr = std::make_unique<callback_typed_job_t<job_type>>(
474 [shared_job, delay, promise]() -> common::VoidResult {
475 std::this_thread::sleep_for(delay);
476 auto result = shared_job->execute();
477 if (result.is_ok()) {
478 promise->set_value();
479 } else {
480 try {
481 throw std::runtime_error("Job execution failed: " + result.error().message);
482 } catch (...) {
483 promise->set_exception(std::current_exception());
484 }
485 }
486 return common::ok();
487 },
488 job_type{} // Default priority
489 );
490
491 auto enqueue_result = enqueue(std::move(job_ptr));
492 if (enqueue_result.is_err()) {
493 return common_ns::error_info{
494 enqueue_result.error().code,
495 enqueue_result.error().message,
496 enqueue_result.error().module
497 };
498 }
499
500 return common_ns::Result<std::future<void>>(std::move(future));
501 }
502
503 template <typename job_type>
504 size_t typed_thread_pool_t<job_type>::worker_count() const
505 {
506 return workers_.size();
507 }
508
509 template <typename job_type>
510 size_t typed_thread_pool_t<job_type>::pending_tasks() const
511 {
512 return job_queue_ ? job_queue_->size() : 0;
513 }
514
515 template <typename job_type>
516 bool typed_thread_pool_t<job_type>::is_running() const
517 {
518 return start_pool_.load(std::memory_order_acquire);
519 }
520
521 template <typename job_type>
522 void typed_thread_pool_t<job_type>::shutdown(bool wait_for_completion)
523 {
524 stop(!wait_for_completion); // immediately_stop = !wait_for_completion
525 }
526#endif // KCENON_HAS_COMMON_EXECUTOR
527
528 // Explicit template instantiation for job_types
529 template class typed_thread_pool_t<job_types>;
530
531} // namespace kcenon::thread
Typed job with priority aging support to prevent starvation.
Priority queue with aging to prevent low-priority job starvation.
Callback-based typed job for priority-aware lambda execution.
A typed job queue with priority aging support, based on policy_queue.
A typed job with priority aging support.
A thread-safe job queue for managing and dispatching work items.
Definition job_queue.h:65
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
Context object that provides access to optional services.
Typed job template.
Definition typed_job.h:31
typed_thread_pool_t(const std::string &thread_title="typed_thread_pool", const thread_context &context=thread_context())
Constructs a new typed_thread_pool_t instance.
auto get_job_queue(void) -> std::shared_ptr< aging_typed_job_queue_t< job_type > >
Retrieves the underlying priority job queue managed by this thread pool.
auto is_priority_aging_enabled() const -> bool
Checks if priority aging is enabled.
auto get_context(void) const -> const thread_context &
Gets the thread context for this pool.
auto get_ptr(void) -> std::shared_ptr< typed_thread_pool_t< job_type > >
Returns a shared pointer to the current typed_thread_pool_t.
auto execute(std::unique_ptr< job > &&work) -> common::VoidResult
auto stop(bool clear_queue=false) -> common::VoidResult
Stops the thread pool: stops the job queue, optionally clears pending jobs (clear_queue), and stops all workers.
auto get_aging_stats() const -> aging_stats
Gets priority aging statistics.
auto shutdown() -> common::VoidResult
auto enqueue_batch(std::vector< std::unique_ptr< typed_job_t< job_type > > > &&jobs) -> common::VoidResult
Enqueues a batch of priority jobs into the thread pool's job queue.
auto enqueue(std::unique_ptr< typed_job_t< job_type > > &&job) -> common::VoidResult
Enqueues a priority job into the thread pool's job queue.
virtual ~typed_thread_pool_t(void)
Destroys the typed_thread_pool_t object.
auto set_job_queue(std::shared_ptr< aging_typed_job_queue_t< job_type > > job_queue) -> void
Sets the job queue for this thread pool and its workers.
auto enable_priority_aging(priority_aging_config config={}) -> void
Enables priority aging for this thread pool.
auto to_string(void) const -> std::string
Generates a string representation of the thread pool's internal state.
auto start(void) -> common::VoidResult
Starts the thread pool by starting every worker already registered with it; fails if the pool is already running or has no workers.
auto disable_priority_aging() -> void
Disables priority aging.
@ delay
Delay processing (attempt later)
Type-based thread pool with priority scheduling and job type routing.
Worker thread for typed_thread_pool processing priority job queues.
std::shared_ptr< job_interface > job_ptr
Shared pointer type for job objects.
Definition job_types.h:84
Core threading foundation of the thread system library.
Definition thread_impl.h:17
STL namespace.
Statistics about priority aging behavior.
Configuration for priority aging behavior.