Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
enhanced_cancellation_token.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8
9namespace kcenon::thread
10{
11 // ========================================================================
12 // Internal state structure
13 // ========================================================================
14
16 {
17 std::atomic<bool> is_cancelled{false};
18 std::optional<cancellation_reason> reason;
19
20 std::unordered_map<callback_handle, callback_type> simple_callbacks;
21 std::unordered_map<callback_handle, callback_with_reason_type>
23 std::atomic<callback_handle> next_handle{1};
24
25 std::chrono::steady_clock::time_point deadline_point{
26 std::chrono::steady_clock::time_point::max()};
27 std::atomic<bool> has_deadline{false};
28
29 mutable std::mutex mutex;
30 mutable std::condition_variable cv;
31
32 std::atomic<bool> timer_active{false};
33 std::atomic<bool> timer_should_stop{false};
34 };
35
36 // ========================================================================
37 // enhanced_cancellation_token implementation
38 // ========================================================================
39
41 : state_(std::make_shared<state>())
42 {
43 }
44
46 std::shared_ptr<state> state)
47 : state_(std::move(state))
48 {
49 }
50
52 {
53 // If this is the last reference and timer is active, signal it to stop
54 if (state_ && state_.use_count() == 1)
55 {
56 state_->timer_should_stop.store(true, std::memory_order_release);
57 state_->cv.notify_all();
58 }
59 }
60
65
67 std::chrono::milliseconds timeout) -> enhanced_cancellation_token
68 {
69 auto deadline_point = std::chrono::steady_clock::now() + timeout;
70 return create_with_deadline(deadline_point);
71 }
72
74 std::chrono::steady_clock::time_point deadline_point)
76 {
77 auto token = create();
78 token.state_->deadline_point = deadline_point;
79 token.state_->has_deadline.store(true, std::memory_order_release);
80
81 // Start timeout timer in background
82 std::weak_ptr<state> state_weak = token.state_;
83 start_timeout_timer(state_weak, deadline_point);
84
85 return token;
86 }
87
89 std::initializer_list<enhanced_cancellation_token> tokens)
91 {
92 auto new_token = create();
93 std::weak_ptr<state> new_state_weak = new_token.state_;
94
95 for (const auto& parent : tokens)
96 {
97 auto parent_copy = parent;
98 parent_copy.register_callback(
99 [new_state_weak](const cancellation_reason& /*parent_reason*/)
100 {
101 if (auto s = new_state_weak.lock())
102 {
103 std::vector<callback_type> simple_to_invoke;
104 std::vector<callback_with_reason_type> reason_to_invoke;
105 cancellation_reason new_reason;
106
107 {
108 std::lock_guard<std::mutex> lock(s->mutex);
109 bool was_cancelled =
110 s->is_cancelled.exchange(true, std::memory_order_release);
111 if (!was_cancelled)
112 {
113 new_reason.reason_type =
115 new_reason.message = "Parent token was cancelled";
116 new_reason.cancel_time = std::chrono::steady_clock::now();
117 s->reason = new_reason;
118
119 for (auto& [handle, cb] : s->simple_callbacks)
120 {
121 simple_to_invoke.push_back(std::move(cb));
122 }
123 s->simple_callbacks.clear();
124
125 for (auto& [handle, cb] : s->reason_callbacks)
126 {
127 reason_to_invoke.push_back(std::move(cb));
128 }
129 s->reason_callbacks.clear();
130 }
131 }
132
133 s->cv.notify_all();
134
135 for (const auto& cb : simple_to_invoke)
136 {
137 cb();
138 }
139 for (const auto& cb : reason_to_invoke)
140 {
141 cb(new_reason);
142 }
143 }
144 });
145 }
146
147 return new_token;
148 }
149
151 const enhanced_cancellation_token& parent,
152 std::chrono::milliseconds timeout) -> enhanced_cancellation_token
153 {
154 auto deadline_point = std::chrono::steady_clock::now() + timeout;
155
156 auto token = create();
157 token.state_->deadline_point = deadline_point;
158 token.state_->has_deadline.store(true, std::memory_order_release);
159
160 // Link to parent
161 std::weak_ptr<state> token_state_weak = token.state_;
162 auto parent_copy = parent;
163 parent_copy.register_callback(
164 [token_state_weak](const cancellation_reason& /*parent_reason*/)
165 {
166 if (auto s = token_state_weak.lock())
167 {
168 std::vector<callback_type> simple_to_invoke;
169 std::vector<callback_with_reason_type> reason_to_invoke;
170 cancellation_reason new_reason;
171
172 {
173 std::lock_guard<std::mutex> lock(s->mutex);
174 bool was_cancelled =
175 s->is_cancelled.exchange(true, std::memory_order_release);
176 if (!was_cancelled)
177 {
178 new_reason.reason_type =
180 new_reason.message = "Parent token was cancelled";
181 new_reason.cancel_time = std::chrono::steady_clock::now();
182 s->reason = new_reason;
183
184 for (auto& [handle, cb] : s->simple_callbacks)
185 {
186 simple_to_invoke.push_back(std::move(cb));
187 }
188 s->simple_callbacks.clear();
189
190 for (auto& [handle, cb] : s->reason_callbacks)
191 {
192 reason_to_invoke.push_back(std::move(cb));
193 }
194 s->reason_callbacks.clear();
195
196 s->timer_should_stop.store(true, std::memory_order_release);
197 }
198 }
199
200 s->cv.notify_all();
201
202 for (const auto& cb : simple_to_invoke)
203 {
204 cb();
205 }
206 for (const auto& cb : reason_to_invoke)
207 {
208 cb(new_reason);
209 }
210 }
211 });
212
213 // Start timeout timer
214 start_timeout_timer(token_state_weak, deadline_point);
215
216 return token;
217 }
218
220 {
221 do_cancel(cancellation_reason::type::user_requested, "", std::nullopt);
222 }
223
224 auto enhanced_cancellation_token::cancel(const std::string& message) -> void
225 {
226 do_cancel(cancellation_reason::type::user_requested, message, std::nullopt);
227 }
228
229 auto enhanced_cancellation_token::cancel(std::exception_ptr ex) -> void
230 {
231 do_cancel(cancellation_reason::type::error, "", ex);
232 }
233
235 const std::string& message,
236 std::optional<std::exception_ptr> ex)
237 -> void
238 {
239 std::vector<callback_type> simple_to_invoke;
240 std::vector<callback_with_reason_type> reason_to_invoke;
241 cancellation_reason new_reason;
242
243 {
244 std::lock_guard<std::mutex> lock(state_->mutex);
245 bool was_cancelled =
246 state_->is_cancelled.exchange(true, std::memory_order_release);
247 if (!was_cancelled)
248 {
249 new_reason.reason_type = reason_type;
250 new_reason.message = message;
251 new_reason.cancel_time = std::chrono::steady_clock::now();
252 new_reason.exception = ex;
253 state_->reason = new_reason;
254
255 for (auto& [handle, cb] : state_->simple_callbacks)
256 {
257 simple_to_invoke.push_back(std::move(cb));
258 }
259 state_->simple_callbacks.clear();
260
261 for (auto& [handle, cb] : state_->reason_callbacks)
262 {
263 reason_to_invoke.push_back(std::move(cb));
264 }
265 state_->reason_callbacks.clear();
266
267 state_->timer_should_stop.store(true, std::memory_order_release);
268 }
269 }
270
271 state_->cv.notify_all();
272
273 for (const auto& cb : simple_to_invoke)
274 {
275 cb();
276 }
277 for (const auto& cb : reason_to_invoke)
278 {
279 cb(new_reason);
280 }
281 }
282
284 {
285 return state_->is_cancelled.load(std::memory_order_acquire);
286 }
287
289 {
290 return is_cancelled();
291 }
292
294 -> std::optional<cancellation_reason>
295 {
296 std::lock_guard<std::mutex> lock(state_->mutex);
297 return state_->reason;
298 }
299
300 auto enhanced_cancellation_token::check_cancelled() const -> common::VoidResult
301 {
302 if (is_cancelled())
303 {
304 auto reason = get_reason();
305 std::string msg = "Operation cancelled";
306 if (reason && !reason->message.empty())
307 {
308 msg += ": " + reason->message;
309 }
311 }
312 return common::ok();
313 }
314
316 {
317 return state_->has_deadline.load(std::memory_order_acquire);
318 }
319
321 -> std::chrono::milliseconds
322 {
323 if (!has_timeout())
324 {
325 return std::chrono::milliseconds::max();
326 }
327
328 auto now = std::chrono::steady_clock::now();
329 auto deadline_point = state_->deadline_point;
330
331 if (now >= deadline_point)
332 {
333 return std::chrono::milliseconds::zero();
334 }
335
336 return std::chrono::duration_cast<std::chrono::milliseconds>(deadline_point -
337 now);
338 }
339
341 -> std::chrono::steady_clock::time_point
342 {
343 return state_->deadline_point;
344 }
345
347 std::chrono::milliseconds additional) -> void
348 {
349 std::lock_guard<std::mutex> lock(state_->mutex);
350 if (state_->has_deadline.load(std::memory_order_acquire))
351 {
352 state_->deadline_point += additional;
353 }
354 }
355
358 {
359 std::unique_lock<std::mutex> lock(state_->mutex);
360
361 if (state_->is_cancelled.load(std::memory_order_acquire))
362 {
363 lock.unlock();
364 callback();
365 return 0;
366 }
367
368 callback_handle handle =
369 state_->next_handle.fetch_add(1, std::memory_order_relaxed);
370 state_->simple_callbacks[handle] = std::move(callback);
371 return handle;
372 }
373
376 {
377 std::unique_lock<std::mutex> lock(state_->mutex);
378
379 if (state_->is_cancelled.load(std::memory_order_acquire))
380 {
381 auto reason = state_->reason;
382 lock.unlock();
383 if (reason)
384 {
385 callback(*reason);
386 }
387 else
388 {
390 "Operation cancelled",
391 std::chrono::steady_clock::now(),
392 std::nullopt});
393 }
394 return 0;
395 }
396
397 callback_handle handle =
398 state_->next_handle.fetch_add(1, std::memory_order_relaxed);
399 state_->reason_callbacks[handle] = std::move(callback);
400 return handle;
401 }
402
404 -> void
405 {
406 if (handle == 0)
407 {
408 return;
409 }
410
411 std::lock_guard<std::mutex> lock(state_->mutex);
412 state_->simple_callbacks.erase(handle);
413 state_->reason_callbacks.erase(handle);
414 }
415
417 {
418 std::unique_lock<std::mutex> lock(state_->mutex);
419 state_->cv.wait(
420 lock, [this]
421 { return state_->is_cancelled.load(std::memory_order_acquire); });
422 }
423
424 auto enhanced_cancellation_token::wait_for(std::chrono::milliseconds timeout) const
425 -> bool
426 {
427 std::unique_lock<std::mutex> lock(state_->mutex);
428 return state_->cv.wait_for(
429 lock, timeout,
430 [this]
431 { return state_->is_cancelled.load(std::memory_order_acquire); });
432 }
433
435 std::chrono::steady_clock::time_point deadline_point) const -> bool
436 {
437 std::unique_lock<std::mutex> lock(state_->mutex);
438 return state_->cv.wait_until(
439 lock, deadline_point,
440 [this]
441 { return state_->is_cancelled.load(std::memory_order_acquire); });
442 }
443
445 std::weak_ptr<state> state_weak,
446 std::chrono::steady_clock::time_point deadline_point) -> void
447 {
448 std::thread timer_thread(
449 [state_weak, deadline_point]() mutable
450 {
451 auto s = state_weak.lock();
452 // Release the weak_ptr immediately after locking, regardless of
453 // lock success. This ensures ~weak_ptr() runs while the strong
454 // reference (s) is still alive, preventing a data race on the
455 // shared control block between this thread's operator delete
456 // (via __on_zero_shared_weak) and the token destructor's
457 // __release_shared read (ThreadSanitizer: data race).
458 state_weak.reset();
459 if (!s)
460 {
461 return;
462 }
463
464 s->timer_active.store(true, std::memory_order_release);
465
466 std::unique_lock<std::mutex> lock(s->mutex);
467
468 // Wait until deadline or cancellation
469 auto result = s->cv.wait_until(
470 lock, deadline_point,
471 [&s]
472 {
473 return s->is_cancelled.load(std::memory_order_acquire) ||
474 s->timer_should_stop.load(std::memory_order_acquire);
475 });
476
477 // If not cancelled and deadline reached, cancel with timeout reason
478 if (!result && !s->is_cancelled.load(std::memory_order_acquire))
479 {
480 std::vector<callback_type> simple_to_invoke;
481 std::vector<callback_with_reason_type> reason_to_invoke;
482 cancellation_reason new_reason;
483
484 bool was_cancelled =
485 s->is_cancelled.exchange(true, std::memory_order_release);
486 if (!was_cancelled)
487 {
489 new_reason.message = "Timeout expired";
490 new_reason.cancel_time = std::chrono::steady_clock::now();
491 s->reason = new_reason;
492
493 for (auto& [handle, cb] : s->simple_callbacks)
494 {
495 simple_to_invoke.push_back(std::move(cb));
496 }
497 s->simple_callbacks.clear();
498
499 for (auto& [handle, cb] : s->reason_callbacks)
500 {
501 reason_to_invoke.push_back(std::move(cb));
502 }
503 s->reason_callbacks.clear();
504 }
505
506 lock.unlock();
507 s->cv.notify_all();
508
509 for (const auto& cb : simple_to_invoke)
510 {
511 cb();
512 }
513 for (const auto& cb : reason_to_invoke)
514 {
515 cb(new_reason);
516 }
517 }
518 else
519 {
520 lock.unlock();
521 }
522
523 s->timer_active.store(false, std::memory_order_release);
524 });
525
526 timer_thread.detach();
527 }
528
529 // ========================================================================
530 // cancellation_callback_guard implementation
531 // ========================================================================
532
535 std::function<void()> callback)
536 : token_(&token), handle_(token.register_callback(std::move(callback)))
537 {
538 }
539
547
549 cancellation_callback_guard&& other) noexcept
550 : token_(other.token_), handle_(other.handle_)
551 {
552 other.token_ = nullptr;
553 other.handle_ = 0;
554 }
555
558 {
559 if (this != &other)
560 {
561 if (token_ && handle_ != 0)
562 {
563 token_->unregister_callback(handle_);
564 }
565 token_ = other.token_;
566 handle_ = other.handle_;
567 other.token_ = nullptr;
568 other.handle_ = 0;
569 }
570 return *this;
571 }
572
573 // ========================================================================
574 // cancellation_scope implementation
575 // ========================================================================
576
578 : token_(std::move(token))
579 {
580 }
581
583 {
584 return token_.is_cancelled();
585 }
586
587 auto cancellation_scope::check_cancelled() const -> common::VoidResult
588 {
589 return token_.check_cancelled();
590 }
591
593 {
594 return token_;
595 }
596
597 // ========================================================================
598 // cancellation_context implementation
599 // ========================================================================
600
601 namespace
602 {
603 thread_local std::vector<enhanced_cancellation_token> context_stack;
604 }
605
607 {
608 if (context_stack.empty())
609 {
611 }
612 return context_stack.back();
613 }
614
616 {
617 context_stack.push_back(std::move(token));
618 }
619
621 {
622 if (!context_stack.empty())
623 {
624 context_stack.pop_back();
625 }
626 }
627
629 : pushed_(true)
630 {
631 cancellation_context::push(std::move(token));
632 }
633
635 {
636 if (pushed_)
637 {
639 }
640 }
641
642} // namespace kcenon::thread
RAII guard for automatic callback unregistration.
~cancellation_callback_guard()
Destructor unregisters the callback.
enhanced_cancellation_token::callback_handle handle_
cancellation_callback_guard(enhanced_cancellation_token &token, std::function< void()> callback)
Constructs a guard and registers the callback.
auto operator=(const cancellation_callback_guard &) -> cancellation_callback_guard &=delete
guard(enhanced_cancellation_token token)
Constructs a guard and pushes the token.
static auto push(enhanced_cancellation_token token) -> void
Pushes a token to the thread-local stack.
static auto current() -> enhanced_cancellation_token
Gets the current thread's cancellation token.
static auto pop() -> void
Pops a token from the thread-local stack.
auto is_cancelled() const -> bool
Checks if the token is cancelled.
auto token() const -> const enhanced_cancellation_token &
Gets the underlying token.
auto check_cancelled() const -> common::VoidResult
Checks if the token is cancelled and returns an error result.
cancellation_scope(enhanced_cancellation_token token)
Constructs a scope with the given token.
Advanced cancellation token with timeout, deadline, and reason support.
std::function< void()> callback_type
Simple callback function type.
static auto create() -> enhanced_cancellation_token
Creates a new cancellation token.
static auto create_linked_with_timeout(const enhanced_cancellation_token &parent, std::chrono::milliseconds timeout) -> enhanced_cancellation_token
Creates a linked token with additional timeout.
enhanced_cancellation_token()
Default constructor creates a new token.
auto has_timeout() const -> bool
Checks if the token has a timeout or deadline.
auto do_cancel(cancellation_reason::type reason_type, const std::string &message, std::optional< std::exception_ptr > ex) -> void
auto wait_for(std::chrono::milliseconds timeout) const -> bool
Waits for cancellation with a timeout.
auto remaining_time() const -> std::chrono::milliseconds
Gets the remaining time before timeout/deadline.
std::function< void(const cancellation_reason &)> callback_with_reason_type
Callback function type with reason parameter.
static auto create_linked(std::initializer_list< enhanced_cancellation_token > tokens) -> enhanced_cancellation_token
Creates a token linked to parent tokens.
auto is_cancelled() const -> bool
Checks if the token has been cancelled.
auto wait_until(std::chrono::steady_clock::time_point deadline) const -> bool
Waits for cancellation until a deadline.
auto unregister_callback(callback_handle handle) -> void
Unregisters a previously registered callback.
static auto create_with_timeout(std::chrono::milliseconds timeout) -> enhanced_cancellation_token
Creates a token that auto-cancels after the specified timeout.
static auto create_with_deadline(std::chrono::steady_clock::time_point deadline) -> enhanced_cancellation_token
Creates a token that auto-cancels at the specified deadline.
auto extend_timeout(std::chrono::milliseconds additional) -> void
Extends the timeout by the specified duration.
auto deadline() const -> std::chrono::steady_clock::time_point
Gets the deadline time point.
std::size_t callback_handle
Callback handle type for registration management.
static auto start_timeout_timer(std::weak_ptr< state > state_weak, std::chrono::steady_clock::time_point deadline) -> void
auto wait() const -> void
Waits until the token is cancelled.
auto is_cancellation_requested() const -> bool
Checks if cancellation has been requested.
auto get_reason() const -> std::optional< cancellation_reason >
Gets the cancellation reason.
auto register_callback(callback_type callback) -> callback_handle
Registers a callback to be invoked on cancellation.
auto check_cancelled() const -> common::VoidResult
Checks if the token has been cancelled and returns an error result.
A template class representing either a value or an error.
Enhanced cancellation token with timeout, deadline, and reason support.
@ callback
Call user callback for custom decision.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
common::VoidResult make_error_result(error_code code, const std::string &message="")
Create a common::VoidResult error from a thread::error_code.
STL namespace.
Holds information about why a cancellation occurred.
std::chrono::steady_clock::time_point cancel_time
Time point when the cancellation occurred.
type reason_type
The type of cancellation that occurred.
std::optional< std::exception_ptr > exception
Optional exception that triggered the cancellation.
std::string message
Human-readable message describing the cancellation.
type
The type of cancellation that occurred.
@ parent_cancelled
Parent token was cancelled.
@ user_requested
Explicit cancel() call by user.
@ timeout
Timeout duration expired.
@ error
Cancellation triggered by an error.
std::unordered_map< callback_handle, callback_with_reason_type > reason_callbacks
std::unordered_map< callback_handle, callback_type > simple_callbacks