Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
thread_pool_impl.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
21
22#include <future>
23#include <vector>
24#include <thread>
25#include <memory>
26#include <atomic>
27
28namespace kcenon::thread {
29
30// ============================================================================
31// Unified Submit API Implementations
32// ============================================================================
33
34template<typename F, typename R>
35auto thread_pool::submit(F&& callable, const submit_options& opts)
36 -> std::future<R>
37{
38 auto job_ptr = std::make_unique<future_job<R>>(
39 std::forward<F>(callable),
40 opts.name.empty() ? "async_job" : opts.name
41 );
42
43 auto future = job_ptr->get_future();
44
45 auto result = enqueue(std::move(job_ptr));
46 if (result.is_err()) {
47 std::promise<R> error_promise;
48 error_promise.set_exception(
49 std::make_exception_ptr(
50 std::runtime_error(result.error().message)
51 )
52 );
53 return error_promise.get_future();
54 }
55
56 return future;
57}
58
59template<typename F, typename R>
60auto thread_pool::submit(std::vector<F>&& callables, const submit_options& opts)
61 -> std::vector<std::future<R>>
62{
63 return detail::batch_apply(std::move(callables), [this, &opts](auto&& callable) {
64 submit_options single_opts;
65 single_opts.name = opts.name;
66 return submit<F, R>(std::move(callable), single_opts);
67 });
68}
69
70template<typename F, typename R>
71auto thread_pool::submit_wait_all(std::vector<F>&& callables, const submit_options& opts)
72 -> std::vector<R>
73{
74 auto futures = submit<F, R>(std::move(callables), opts);
75 return detail::collect_all(futures);
76}
77
78template<typename F, typename R>
79auto thread_pool::submit_wait_any(std::vector<F>&& callables, const submit_options& opts)
80 -> common::Result<R>
81{
82 if (callables.empty()) {
83 return make_error_result<R>(error_code::invalid_argument, "Empty callables vector");
84 }
85
86 auto futures = submit<F, R>(std::move(callables), opts);
87 auto completed = std::make_shared<std::atomic<bool>>(false);
88 auto result_promise = std::make_shared<std::promise<R>>();
89 auto result_future = result_promise->get_future();
90
91 for (std::size_t i = 0; i < futures.size(); ++i) {
92 std::thread([completed, result_promise, fut = std::move(futures[i])]() mutable {
93 try {
94 R result = fut.get();
95 bool expected = false;
96 if (completed->compare_exchange_strong(expected, true)) {
97 result_promise->set_value(std::move(result));
98 }
99 } catch (...) {
100 bool expected = false;
101 if (completed->compare_exchange_strong(expected, true)) {
102 result_promise->set_exception(std::current_exception());
103 }
104 }
105 }).detach();
106 }
107
108 try {
109 return common::Result<R>::ok(result_future.get());
110 } catch (const std::exception& e) {
112 } catch (...) {
113 return make_error_result<R>(error_code::unknown_error, "Unknown exception in submit_wait_any");
114 }
115}
116
117template<typename T>
118auto thread_pool::find_policy(const std::string& name) -> T*
119{
120 std::scoped_lock<std::mutex> lock(policies_mutex_);
121
122 for (auto& policy : policies_) {
123 if (policy && policy->get_name() == name) {
124 return dynamic_cast<T*>(policy.get());
125 }
126 }
127
128 return nullptr;
129}
130
131} // namespace kcenon::thread
Helper templates for batch operations to eliminate duplicated loop patterns.
A template class representing either a value or an error.
auto submit_wait_any(std::vector< F > &&callables, const submit_options &opts={}) -> common::Result< R >
Submit a batch and return first completed result.
auto submit_wait_all(std::vector< F > &&callables, const submit_options &opts={}) -> std::vector< R >
Submit a batch and wait for all results.
auto find_policy(const std::string &name) -> T *
Find a policy by name.
auto submit(F &&callable, const submit_options &opts={}) -> std::future< R >
Job wrapper that provides std::future for async result returns.
Error codes and utilities for the thread system.
auto collect_all(std::vector< std::future< T > > &futures) -> std::vector< T >
Collect all results from a vector of futures.
auto batch_apply(Container &&items, Operation &&op)
Apply an operation to each item in a collection, returning results.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
@ completed
Successfully completed.
Options for submitting jobs to the thread pool.
std::string name
Optional name for the job (useful for debugging/tracing).