Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
when_helpers.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#pragma once
6
15#include <atomic>
16#include <future>
17#include <memory>
18#include <thread>
19#include <tuple>
20#include <type_traits>
21#include <utility>
22#include <vector>
23
24namespace kcenon::thread {
25
26namespace detail {
27
/**
 * @brief Helper to get the value type from a future.
 *
 * `type` is whatever calling `get()` on an rvalue of @p Future yields
 * (e.g. `int` for `std::future<int>`, `void` for `std::future<void>`).
 *
 * @tparam Future A future-like type exposing a `get()` member function.
 */
template<typename Future>
struct future_value_type {
    using type = decltype(std::declval<Future>().get());
};

/// Convenience alias for `typename future_value_type<Future>::type`.
template<typename Future>
using future_value_type_t = typename future_value_type<Future>::type;
/**
 * @brief Index sequence helper for when_all implementation.
 *
 * Calls `get()` on every element of @p futures, in ascending index order,
 * and assigns each value to the element of @p results at the same index.
 * An exception thrown by any `get()` propagates to the caller and leaves
 * the remaining elements untouched.
 *
 * @tparam Is          Indices to visit (normally `0 .. N-1`).
 * @tparam Tuple       Tuple-like container of futures.
 * @tparam ResultTuple Tuple-like container receiving the values.
 * @param futures Futures to drain.
 * @param results Destination for the retrieved values.
 */
template<std::size_t... Is, typename Tuple, typename ResultTuple>
void get_all_impl(std::index_sequence<Is...>, Tuple& futures, ResultTuple& results) {
    // Moves the value of one future into the matching result slot.
    const auto collect_one = [&futures, &results](auto index_tag) {
        constexpr std::size_t slot = decltype(index_tag)::value;
        std::get<slot>(results) = std::get<slot>(futures).get();
    };

    // The comma fold keeps the slots strictly ordered left to right.
    (collect_one(std::integral_constant<std::size_t, Is>{}), ...);
}
46
47} // namespace detail
48
77template<typename... Futures>
78auto when_all(Futures&&... futures)
79 -> std::future<std::tuple<detail::future_value_type_t<std::decay_t<Futures>>...>>
80{
81 using result_tuple = std::tuple<detail::future_value_type_t<std::decay_t<Futures>>...>;
82
83 auto promise = std::make_shared<std::promise<result_tuple>>();
84 auto result_future = promise->get_future();
85
86 // Store futures in a tuple for processing
87 auto futures_tuple = std::make_shared<std::tuple<std::decay_t<Futures>...>>(
88 std::forward<Futures>(futures)...);
89
90 // Launch async operation to wait for all
91 std::thread([promise, futures_tuple]() mutable {
92 try {
93 result_tuple results;
95 std::make_index_sequence<sizeof...(Futures)>{},
96 *futures_tuple,
97 results);
98 promise->set_value(std::move(results));
99 } catch (...) {
100 promise->set_exception(std::current_exception());
101 }
102 }).detach();
103
104 return result_future;
105}
106
/**
 * @brief Overload of when_all for no arguments.
 *
 * @return A future that is already ready and holds an empty tuple.
 */
inline auto when_all_empty() -> std::future<std::tuple<>>
{
    std::promise<std::tuple<>> ready;
    auto nothing_to_wait_for = ready.get_future();
    ready.set_value(std::tuple<>{});
    return nothing_to_wait_for;
}
116
/**
 * @brief Wait for any future to complete and return its result.
 *
 * One detached thread is started per input future; each blocks on its
 * future's `get()`. The first thread to finish, whether with a value or with
 * an exception, decides the outcome of the returned future. Later finishers
 * are ignored.
 *
 * @tparam T Value type of the futures.
 * @param futures Futures to race; ownership is taken from the caller.
 * @return A future carrying the first outcome. For an empty input the future
 *         is already ready and holds `std::invalid_argument`.
 */
template<typename T>
auto when_any(std::vector<std::future<T>>&& futures)
    -> std::future<T>
{
    // Nothing to race: report the misuse through the returned future.
    if (futures.empty()) {
        std::promise<T> failed;
        failed.set_exception(
            std::make_exception_ptr(std::invalid_argument("Empty futures vector")));
        return failed.get_future();
    }

    // State shared by the caller's future and every waiter thread.
    auto outcome = std::make_shared<std::promise<T>>();
    auto outcome_future = outcome->get_future();
    auto winner_claimed = std::make_shared<std::atomic<bool>>(false);
    auto pending = std::make_shared<std::vector<std::future<T>>>(std::move(futures));

    const std::size_t total = pending->size();
    for (std::size_t slot = 0; slot != total; ++slot) {
        std::thread waiter([outcome, winner_claimed, pending, slot]() {
            // True for exactly one caller across all waiter threads.
            const auto claim = [&winner_claimed]() {
                bool not_yet = false;
                return winner_claimed->compare_exchange_strong(not_yet, true);
            };

            try {
                T value = (*pending)[slot].get();
                if (claim()) {
                    outcome->set_value(std::move(value));
                }
            } catch (...) {
                if (claim()) {
                    outcome->set_exception(std::current_exception());
                }
            }
        });
        waiter.detach();
    }

    return outcome_future;
}
179
/**
 * @brief Wait for any `void` future to complete.
 *
 * Non-template counterpart of `when_any` for `std::future<void>`. One
 * detached thread per input blocks on `get()`; the first to finish makes the
 * returned future ready, or stores its exception there. Later finishers are
 * ignored.
 *
 * @param futures Futures to race; ownership is taken from the caller.
 * @return A future that becomes ready with the first outcome. For an empty
 *         input it is already ready and holds `std::invalid_argument`.
 */
inline auto when_any(std::vector<std::future<void>>&& futures)
    -> std::future<void>
{
    // Nothing to race: report the misuse through the returned future.
    if (futures.empty()) {
        std::promise<void> failed;
        failed.set_exception(
            std::make_exception_ptr(std::invalid_argument("Empty futures vector")));
        return failed.get_future();
    }

    // State shared by the caller's future and every waiter thread.
    auto outcome = std::make_shared<std::promise<void>>();
    auto outcome_future = outcome->get_future();
    auto winner_claimed = std::make_shared<std::atomic<bool>>(false);
    auto pending = std::make_shared<std::vector<std::future<void>>>(std::move(futures));

    const std::size_t total = pending->size();
    for (std::size_t slot = 0; slot != total; ++slot) {
        std::thread waiter([outcome, winner_claimed, pending, slot]() {
            // True for exactly one caller across all waiter threads.
            const auto claim = [&winner_claimed]() {
                bool not_yet = false;
                return winner_claimed->compare_exchange_strong(not_yet, true);
            };

            try {
                (*pending)[slot].get();
                if (claim()) {
                    outcome->set_value();
                }
            } catch (...) {
                if (claim()) {
                    outcome->set_exception(std::current_exception());
                }
            }
        });
        waiter.detach();
    }

    return outcome_future;
}
219
/**
 * @brief Wait for any future to complete with index.
 *
 * Works like `when_any`, but the delivered value is paired with the position
 * (within @p futures) of the future that finished first. If the first
 * finisher failed, its exception is stored in the returned future and no
 * index is reported.
 *
 * @tparam T Value type of the futures.
 * @param futures Futures to race; ownership is taken from the caller.
 * @return A future holding `{index, value}` of the first finisher. For an
 *         empty input it is already ready and holds `std::invalid_argument`.
 */
template<typename T>
auto when_any_with_index(std::vector<std::future<T>>&& futures)
    -> std::future<std::pair<std::size_t, T>>
{
    using indexed_result = std::pair<std::size_t, T>;

    // Nothing to race: report the misuse through the returned future.
    if (futures.empty()) {
        std::promise<indexed_result> failed;
        failed.set_exception(
            std::make_exception_ptr(std::invalid_argument("Empty futures vector")));
        return failed.get_future();
    }

    // State shared by the caller's future and every waiter thread.
    auto outcome = std::make_shared<std::promise<indexed_result>>();
    auto outcome_future = outcome->get_future();
    auto winner_claimed = std::make_shared<std::atomic<bool>>(false);
    auto pending = std::make_shared<std::vector<std::future<T>>>(std::move(futures));

    const std::size_t total = pending->size();
    for (std::size_t slot = 0; slot != total; ++slot) {
        std::thread waiter([outcome, winner_claimed, pending, slot]() {
            // True for exactly one caller across all waiter threads.
            const auto claim = [&winner_claimed]() {
                bool not_yet = false;
                return winner_claimed->compare_exchange_strong(not_yet, true);
            };

            try {
                T value = (*pending)[slot].get();
                if (claim()) {
                    outcome->set_value(std::make_pair(slot, std::move(value)));
                }
            } catch (...) {
                if (claim()) {
                    outcome->set_exception(std::current_exception());
                }
            }
        });
        waiter.detach();
    }

    return outcome_future;
}
277
278} // namespace kcenon::thread
A template class representing either a value or an error.
typename future_value_type< Future >::type future_value_type_t
void get_all_impl(std::index_sequence< Is... >, Tuple &futures, ResultTuple &results)
Index sequence helper for when_all implementation.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
auto when_all_empty() -> std::future< std::tuple<> >
Overload of when_all for no arguments.
@ completed
Successfully completed.
auto when_any(std::vector< std::future< T > > &&futures) -> std::future< T >
Wait for any future to complete and return its result.
auto when_any_with_index(std::vector< std::future< T > > &&futures) -> std::future< std::pair< std::size_t, T > >
Wait for any future to complete with index.
auto when_all(Futures &&... futures) -> std::future< std::tuple< detail::future_value_type_t< std::decay_t< Futures > >... > >
Wait for all futures to complete and return their results as a tuple.
Helper to get the value type from a future.
decltype(std::declval< Future >().get()) type