Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
job_cancellation_example.cpp
Go to the documentation of this file.
1/*****************************************************************************
2BSD 3-Clause License
3
4Copyright (c) 2024, 🍀☀🌕🌥 🌊
5All rights reserved.
6*****************************************************************************/
7
24
25#include <iostream>
26#include <chrono>
27#include <thread>
28#include <atomic>
29
30using namespace kcenon::thread;
32
43{
44public:
45 cancellable_long_job(const std::string& name, int iterations = 100)
46 : job(name), iterations_(iterations)
47 {
48 }
49
50 kcenon::common::VoidResult do_work() override
51 {
52 std::cout << formatter::format("[{}] Starting job with {} iterations\n",
54
55 for (int i = 0; i < iterations_; ++i)
56 {
57 // ✅ BEST PRACTICE: Check cancellation periodically
59 {
60 std::cout << formatter::format("[{}] Job cancelled at iteration {}/{}\n",
61 get_name(), i, iterations_);
62
63 return kcenon::common::error_info{
64 error_code::operation_canceled,
65 formatter::format("Cancelled at iteration {}", i),
66 "job_cancellation_example"};
67 }
68
69 // Simulate work (100ms per iteration)
70 std::this_thread::sleep_for(std::chrono::milliseconds(100));
71
72 // Log progress every 10 iterations
73 if (i % 10 == 0)
74 {
75 std::cout << formatter::format("[{}] Progress: {}/{}\n",
76 get_name(), i, iterations_);
77 }
78 }
79
80 std::cout << formatter::format("[{}] Job completed successfully\n", get_name());
81 return kcenon::common::ok();
82 }
83
84private:
86};
87
100{
101public:
102 non_cancellable_job(const std::string& name, int iterations = 50)
103 : job(name), iterations_(iterations)
104 {
105 }
106
107 kcenon::common::VoidResult do_work() override
108 {
109 std::cout << formatter::format("[{}] Starting non-cancellable job\n", get_name());
110
111 for (int i = 0; i < iterations_; ++i)
112 {
113 // ❌ BAD PRACTICE: Never checking cancellation
114 std::this_thread::sleep_for(std::chrono::milliseconds(100));
115
116 if (i % 10 == 0)
117 {
118 std::cout << formatter::format("[{}] Progress: {}/{} (ignoring cancellation)\n",
119 get_name(), i, iterations_);
120 }
121 }
122
123 std::cout << formatter::format("[{}] Job completed (never checked cancellation)\n",
124 get_name());
125 return kcenon::common::ok();
126 }
127
128private:
130};
131
138{
139 std::cout << "\n========================================\n";
140 std::cout << "Demo 1: Basic Job Cancellation\n";
141 std::cout << "========================================\n\n";
142
143 // Create pool with single worker
144 auto pool = std::make_shared<thread_pool>("cancellation_demo_pool");
145
146 auto worker = std::make_unique<thread_worker>();
147 pool->enqueue(std::move(worker));
148 pool->start();
149
150 // Submit a long-running cancellable job (10 seconds total)
151 auto long_job = std::make_unique<cancellable_long_job>("long_task", 100);
152 pool->enqueue(std::move(long_job));
153
154 // Let job run for 2 seconds
155 std::cout << "Letting job run for 2 seconds...\n";
156 std::this_thread::sleep_for(std::chrono::seconds(2));
157
158 // Stop the pool (triggers cancellation)
159 std::cout << "\n>>> Calling pool->stop() <<<\n\n";
160 auto stop_start = std::chrono::steady_clock::now();
161 pool->stop();
162 auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
163 std::chrono::steady_clock::now() - stop_start);
164
165 std::cout << formatter::format("\nPool stopped in {}ms (job cooperated with cancellation)\n",
166 stop_duration.count());
167}
168
175{
176 std::cout << "\n========================================\n";
177 std::cout << "Demo 2: Non-Cooperative Job (Anti-Pattern)\n";
178 std::cout << "========================================\n\n";
179
180 auto pool = std::make_shared<thread_pool>("non_coop_pool");
181
182 auto worker = std::make_unique<thread_worker>();
183 pool->enqueue(std::move(worker));
184 pool->start();
185
186 // Submit a job that doesn't check cancellation (5 seconds total)
187 auto stubborn_job = std::make_unique<non_cancellable_job>("stubborn_task", 50);
188 pool->enqueue(std::move(stubborn_job));
189
190 // Let job run for 1 second
191 std::cout << "Letting job run for 1 second...\n";
192 std::this_thread::sleep_for(std::chrono::seconds(1));
193
194 // Stop the pool
195 std::cout << "\n>>> Calling pool->stop() <<<\n";
196 std::cout << "⚠️ Job is NOT checking cancellation token!\n";
197 std::cout << "Worker must wait for job to complete...\n\n";
198
199 auto stop_start = std::chrono::steady_clock::now();
200 pool->stop();
201 auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
202 std::chrono::steady_clock::now() - stop_start);
203
204 std::cout << formatter::format("\nPool stopped in {}ms (job did NOT cooperate)\n",
205 stop_duration.count());
206 std::cout << "Notice how much longer this took!\n";
207}
208
215{
216 std::cout << "\n========================================\n";
217 std::cout << "Demo 3: Pool-Level Multi-Worker Cancellation\n";
218 std::cout << "========================================\n\n";
219
220 auto pool = std::make_shared<thread_pool>("multi_worker_pool");
221
222 // Add 3 workers
223 for (int i = 0; i < 3; ++i)
224 {
225 auto worker = std::make_unique<thread_worker>();
226 pool->enqueue(std::move(worker));
227 }
228 pool->start();
229
230 // Submit multiple jobs to different workers
231 for (int i = 0; i < 3; ++i)
232 {
233 auto job = std::make_unique<cancellable_long_job>(
234 formatter::format("worker_{}_task", i), 100);
235 pool->enqueue(std::move(job));
236 }
237
238 // Let jobs run for 2 seconds
239 std::cout << "All workers running jobs...\n";
240 std::this_thread::sleep_for(std::chrono::seconds(2));
241
242 // Stop the entire pool
243 std::cout << "\n>>> Calling pool->stop() - cancelling ALL workers <<<\n\n";
244 auto stop_start = std::chrono::steady_clock::now();
245 pool->stop();
246 auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
247 std::chrono::steady_clock::now() - stop_start);
248
249 std::cout << formatter::format("\nAll workers stopped in {}ms\n", stop_duration.count());
250 std::cout << "All jobs received cancellation signal simultaneously!\n";
251}
252
257{
258 std::cout << "\n========================================\n";
259 std::cout << "Demo 4: Immediate vs. Graceful Shutdown\n";
260 std::cout << "========================================\n\n";
261
262 // Test 1: Graceful shutdown
263 {
264 std::cout << "--- Graceful Shutdown (immediately_stop = false) ---\n";
265 auto pool = std::make_shared<thread_pool>("graceful_pool");
266 auto worker = std::make_unique<thread_worker>();
267 pool->enqueue(std::move(worker));
268 pool->start();
269
270 // Enqueue multiple jobs
271 for (int i = 0; i < 5; ++i)
272 {
273 auto job = std::make_unique<cancellable_long_job>(
274 formatter::format("graceful_job_{}", i), 20);
275 pool->enqueue(std::move(job));
276 }
277
278 std::this_thread::sleep_for(std::chrono::milliseconds(500));
279
280 std::cout << "Stopping gracefully (pending jobs remain in queue)...\n";
281 pool->stop(false); // Graceful shutdown
282 std::cout << "Done\n\n";
283 }
284
285 // Test 2: Immediate shutdown
286 {
287 std::cout << "--- Immediate Shutdown (immediately_stop = true) ---\n";
288 auto pool = std::make_shared<thread_pool>("immediate_pool");
289 auto worker = std::make_unique<thread_worker>();
290 pool->enqueue(std::move(worker));
291 pool->start();
292
293 // Enqueue multiple jobs
294 for (int i = 0; i < 5; ++i)
295 {
296 auto job = std::make_unique<cancellable_long_job>(
297 formatter::format("immediate_job_{}", i), 20);
298 pool->enqueue(std::move(job));
299 }
300
301 std::this_thread::sleep_for(std::chrono::milliseconds(500));
302
303 std::cout << "Stopping immediately (clearing pending jobs)...\n";
304 pool->stop(true); // Immediate shutdown
305 std::cout << "Done (pending jobs were cleared)\n\n";
306 }
307}
308
309int main()
310{
311 std::cout << "╔═══════════════════════════════════════════════════════╗\n";
312 std::cout << "║ Thread System - Job Cancellation System Demo ║\n";
313 std::cout << "╚═══════════════════════════════════════════════════════╝\n";
314
315 try
316 {
318 std::this_thread::sleep_for(std::chrono::seconds(1));
319
321 std::this_thread::sleep_for(std::chrono::seconds(1));
322
324 std::this_thread::sleep_for(std::chrono::seconds(1));
325
327
328 std::cout << "\n========================================\n";
329 std::cout << "All demonstrations completed!\n";
330 std::cout << "========================================\n\n";
331
332 std::cout << "Key Takeaways:\n";
333 std::cout << "1. ✅ Jobs MUST check cancellation_token periodically\n";
334 std::cout << "2. ✅ Worker stop() propagates cancellation to running job\n";
335 std::cout << "3. ✅ Pool stop() cancels all workers simultaneously\n";
336 std::cout << "4. ⚠️ Non-cooperative jobs delay shutdown\n";
337 std::cout << "5. ✅ Immediate stop clears pending jobs from queue\n\n";
338 }
339 catch (const std::exception& e)
340 {
341 std::cerr << "Error: " << e.what() << std::endl;
342 return 1;
343 }
344
345 return 0;
346}
A job that performs a long-running task with periodic cancellation checks.
cancellable_long_job(const std::string &name, int iterations=100)
kcenon::common::VoidResult do_work() override
The core task execution method to be overridden by derived classes.
bool is_cancelled() const
Checks if the token has been canceled.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
auto get_name(void) const -> std::string
Retrieves the name of this job.
Definition job.cpp:112
cancellation_token cancellation_token_
The cancellation token associated with this job.
Definition job.h:504
Provides convenience methods for string formatting using C++20 <format>.
Definition formatter.h:122
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132
A job that DOES NOT check for cancellation (anti-pattern).
non_cancellable_job(const std::string &name, int iterations=50)
kcenon::common::VoidResult do_work() override
The core task execution method to be overridden by derived classes.
Core thread pool implementation with work stealing and auto-scaling.
Generic formatter for enum types using user-provided converter functors.
Base job class for schedulable work units in the thread system.
void demo_basic_cancellation()
Demonstrates basic job cancellation via worker stop.
void demo_immediate_vs_graceful()
Demonstrates immediate vs. graceful shutdown.
void demo_pool_level_cancellation()
Demonstrates pool-level cancellation with multiple workers.
void demo_non_cooperative_job()
Demonstrates what happens with non-cooperating jobs.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Specialized worker thread that processes jobs from a job_queue.