Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
thread_base.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2024, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
7
16namespace kcenon::thread
17{
27 thread_base::thread_base(const std::string& thread_title)
28 : wake_interval_(std::nullopt)
29 , lifecycle_()
30 , worker_thread_(nullptr)
31 , thread_title_(thread_title)
32 {
33 }
34
47
62 const std::optional<std::chrono::milliseconds>& wake_interval) -> void
63 {
64 // Use dedicated mutex for wake_interval to prevent data races
65 std::scoped_lock<std::mutex> lock(wake_interval_mutex_);
66 wake_interval_ = wake_interval;
67 }
68
84 -> std::optional<std::chrono::milliseconds>
85 {
86 // Thread-safe read of wake_interval
87 std::scoped_lock<std::mutex> lock(wake_interval_mutex_);
88 return wake_interval_;
89 }
90
// Starts the worker thread.
//
// Sequence:
//   1. If lifecycle_.has_active_source() is true, return an error_info carrying
//      error_code::thread_already_running.
//   2. Call stop() (its result is discarded) and lifecycle_.initialize_for_start().
//   3. Create the worker (std::jthread when USE_STD_JTHREAD is defined, otherwise
//      std::thread) running a lambda that calls before_start(), loops calling
//      do_work(), and finally calls after_stop().
//
// Returns common::ok() once the thread object has been created, or an error_info
// carrying error_code::resource_allocation_failed if std::bad_alloc is thrown
// during creation.
//
// NOTE(review): several logger call openers appear to be missing from this
// listing (the embedded listing numbers jump 186->189, 200->204, 209->213 and
// 229->232). Presumably each is a thread_logger::instance().log(<level>, ...)
// call whose remaining arguments are the three lines that follow - confirm
// against the real file.
110 auto thread_base::start(void) -> common::VoidResult
111 {
112 // Check if thread is already running using lifecycle_controller
113 if (lifecycle_.has_active_source())
114 {
115 return common::error_info{static_cast<int>(error_code::thread_already_running), "thread is already running", "thread_system"};
116 }
117
118 // Ensure clean state by stopping any existing thread first
// NOTE(review): the VoidResult returned by stop() is ignored here; stop() returns
// an error when worker_thread_ is null, which is the normal case on a first start.
119 stop();
120
121 // Initialize lifecycle controller for the new thread
122 lifecycle_.initialize_for_start();
123
124 try
125 {
126 // Create the worker thread using platform-appropriate thread type
127#ifdef USE_STD_JTHREAD
128 worker_thread_ = std::make_unique<std::jthread>(
129#else
130 worker_thread_ = std::make_unique<std::thread>(
131#endif
132 [this](void) // Capture 'this' to access member functions and variables
133 {
134 // Phase 1: Call derived class initialization hook
135 auto work_result = before_start();
136 if (work_result.is_err())
137 {
// A before_start() failure is only reported on std::cerr; execution still falls
// through into the work loop below.
138 std::cerr << "error before start: " << work_result.error().message
139 << std::endl;
140 }
141
142 // Phase 2: Main work loop - continues until stop requested and no more work
143 while (!lifecycle_.is_stop_requested() || should_continue_work())
144 {
145 // Update thread state to indicate it's waiting for work
146 lifecycle_.set_state(thread_conditions::Waiting);
147
148 // Get current wake interval with thread-safe access
149 auto interval = get_wake_interval();
150
151 // Use lifecycle_controller for condition variable operations
152 auto lock = lifecycle_.acquire_lock();
153
154 // Wait strategy depends on whether wake interval is configured
// NOTE(review): both predicates test only should_continue_work(); presumably
// lifecycle_.wait / wait_for also return when a stop has been requested -
// confirm in lifecycle_controller.
155 if (interval.has_value())
156 {
157 // Timed wait: wake up after interval OR when condition is met
158 lifecycle_.wait_for(lock, interval.value(),
159 [this]() { return should_continue_work(); });
160 }
161 else
162 {
163 // Indefinite wait: only wake up when condition is met
164 lifecycle_.wait(lock,
165 [this]() { return should_continue_work(); });
166 }
167
168 // Check if we should exit the loop
169 if (lifecycle_.is_stop_requested() && !should_continue_work())
170 {
171 // Update state to indicate graceful shutdown in progress
172 lifecycle_.set_state(thread_conditions::Stopping);
173 break;
174 }
175
176 // Execute the actual work with exception protection
// NOTE(review): `lock` from acquire_lock() is still in scope for the rest of this
// iteration; whether it is still held while do_work() runs depends on
// lifecycle_controller - verify.
177 try
178 {
179 // Update state to indicate active work is being performed
180 lifecycle_.set_state(thread_conditions::Working);
181
182 // Call derived class work implementation
183 work_result = do_work();
184 if (work_result.is_err())
185 {
186 // Use structured logger instead of raw std::cerr
189 thread_title_,
190 "Work execution failed",
191 work_result.error().message);
192 }
193
194 // Reset consecutive failures counter on successful completion
// NOTE(review): this store also runs when do_work() returned an error result; only
// a thrown std::exception skips it, so the counter tracks exceptions, not error
// results.
195 consecutive_failures_.store(0, std::memory_order_relaxed);
196 }
197 catch (const std::exception& e)
198 {
199 // Track consecutive failures to prevent infinite error loops
200 int failures = consecutive_failures_.fetch_add(1, std::memory_order_relaxed) + 1;
201
204 thread_title_,
205 "Unhandled exception in worker thread (failure " + std::to_string(failures) + ")",
206 e.what());
207
208 // Stop thread if too many consecutive failures occur
209 if (failures >= max_consecutive_failures)
210 {
213 thread_title_,
214 "Too many consecutive failures, stopping thread",
215 "");
// This break leaves the loop without setting thread_conditions::Stopping, unlike
// the graceful exit above.
216 break; // Exit the main loop
217 }
218
219 // Exponential backoff: 100ms, 200ms, 400ms, ..., max 10 seconds
220 // This prevents CPU spinning and log flooding while giving time for transient issues to resolve
// The shift amount is clamped to 10 and the product is capped at 10000 ms.
221 auto backoff_ms = std::min(100 * (1 << std::min(failures - 1, 10)), 10000);
222 std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
223 }
224 }
225
226 // Phase 3: Call derived class cleanup hook after main loop exits
227 work_result = after_stop();
228 if (work_result.is_err())
229 {
232 thread_title_,
233 "Error during cleanup",
234 work_result.error().message);
235 }
236 }); // End of lambda function passed to thread constructor
237 }
// NOTE(review): only std::bad_alloc is handled. The std::thread / std::jthread
// constructors are specified to throw std::system_error when the thread cannot
// be started; that would propagate out of start() with lifecycle_ already
// initialized for start - consider handling it with the same cleanup.
238 catch (const std::bad_alloc& e)
239 {
240 // Exception-safe cleanup: reset all resources if thread creation fails
241 lifecycle_.reset_stop_source();
242 worker_thread_.reset();
243
244 return common::error_info{static_cast<int>(error_code::resource_allocation_failed), e.what(), "thread_system"};
245 }
246
247 // Thread creation successful
248 return common::ok();
249 }
250
271 auto thread_base::stop(void) -> common::VoidResult
272 {
273 // Early exit if no thread to stop (idempotent behavior)
274 if (worker_thread_ == nullptr)
275 {
276 return common::error_info{static_cast<int>(error_code::thread_not_running), "thread is not running", "thread_system"};
277 }
278
279 // Only attempt to stop if thread is actually joinable
280 if (worker_thread_->joinable())
281 {
282 // Self-stop detection: prevent deadlock if thread tries to stop itself
283 // Calling join() from the same thread would cause deadlock
284 if (worker_thread_->get_id() == std::this_thread::get_id())
285 {
286 return common::error_info{static_cast<int>(error_code::invalid_argument),
287 "cannot stop thread from within itself - would cause deadlock", "thread_system"};
288 }
289
290 // Step 1: Signal the thread to stop via lifecycle_controller
291 lifecycle_.request_stop();
292
293 // Step 1.5: Call derived class hook for cancellation propagation
294 // This allows derived classes (e.g., thread_worker) to cancel running jobs
295 on_stop_requested();
296
297 // Step 2: Wake up the thread if it's waiting on condition variable
298 lifecycle_.notify_all();
299
300 // Step 3: Wait for the thread to complete its shutdown sequence
301 worker_thread_->join(); // Blocks until thread exits
302 }
303
304 // Step 4: Clean up thread resources
305 lifecycle_.reset_stop_source();
306 worker_thread_.reset(); // Release thread object
307
308 // Step 5: Update thread state to indicate complete shutdown
309 lifecycle_.set_stopped();
310
311 return common::ok();
312 }
313
324 auto thread_base::is_running(void) const -> bool
325 {
326 return lifecycle_.is_running();
327 }
328
340 auto thread_base::to_string(void) const -> std::string
341 {
342 return utility_module::formatter::format("{} is {}", thread_title_, lifecycle_.get_state());
343 }
344
355 auto thread_base::get_thread_id() const -> std::thread::id
356 {
357 if (worker_thread_ && worker_thread_->joinable())
358 {
359 return worker_thread_->get_id();
360 }
361 return std::thread::id{};
362 }
363} // namespace kcenon::thread
virtual ~thread_base(void)
Virtual destructor. Ensures proper cleanup of derived classes.
virtual auto to_string(void) const -> std::string
Returns a string representation of this thread_base object.
std::mutex wake_interval_mutex_
Mutex for synchronizing access to the wake_interval_ member.
auto stop(void) -> common::VoidResult
Requests the worker thread to stop and waits for it to finish.
thread_base(const thread_base &)=delete
auto get_thread_id() const -> std::thread::id
Gets the std::thread::id of the worker thread, or a default-constructed id when no joinable worker thread exists.
std::unique_ptr< std::thread > worker_thread_
A std::thread for managing the worker thread's lifecycle (legacy mode).
auto is_running() const -> bool
Checks whether the worker thread is currently running.
std::optional< std::chrono::milliseconds > wake_interval_
Interval at which the thread is optionally awakened.
auto set_wake_interval(const std::optional< std::chrono::milliseconds > &wake_interval) -> void
Sets the interval at which the worker thread should wake up (if any).
auto get_wake_interval() const -> std::optional< std::chrono::milliseconds >
Gets the current wake interval setting.
auto start(void) -> common::VoidResult
Starts the worker thread.
static thread_logger & instance()
Get singleton instance.
void log(log_level level, std::string_view thread_name, std::string_view message, std::string_view context="")
Log a message with context.
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:129
Core threading foundation of the thread system library.
Definition thread_impl.h:17
@ Waiting
Thread waiting for work or tasks.
@ Stopping
Thread in the process of stopping.
@ Working
Thread currently processing a task.
STL namespace.
Foundational worker thread class with lifecycle management.
Internal logging interface for the thread system.