50 kcenon::common::VoidResult
do_work()
override
52 std::cout << formatter::format(
"[{}] Starting job with {} iterations\n",
60 std::cout << formatter::format(
"[{}] Job cancelled at iteration {}/{}\n",
63 return kcenon::common::error_info{
64 error_code::operation_canceled,
65 formatter::format(
"Cancelled at iteration {}", i),
66 "job_cancellation_example"};
70 std::this_thread::sleep_for(std::chrono::milliseconds(100));
75 std::cout << formatter::format(
"[{}] Progress: {}/{}\n",
80 std::cout << formatter::format(
"[{}] Job completed successfully\n",
get_name());
81 return kcenon::common::ok();
109 std::cout << formatter::format(
"[{}] Starting non-cancellable job\n",
get_name());
114 std::this_thread::sleep_for(std::chrono::milliseconds(100));
118 std::cout << formatter::format(
"[{}] Progress: {}/{} (ignoring cancellation)\n",
123 std::cout << formatter::format(
"[{}] Job completed (never checked cancellation)\n",
125 return kcenon::common::ok();
139 std::cout <<
"\n========================================\n";
140 std::cout <<
"Demo 1: Basic Job Cancellation\n";
141 std::cout <<
"========================================\n\n";
144 auto pool = std::make_shared<thread_pool>(
"cancellation_demo_pool");
146 auto worker = std::make_unique<thread_worker>();
147 pool->enqueue(std::move(worker));
151 auto long_job = std::make_unique<cancellable_long_job>(
"long_task", 100);
152 pool->enqueue(std::move(long_job));
155 std::cout <<
"Letting job run for 2 seconds...\n";
156 std::this_thread::sleep_for(std::chrono::seconds(2));
159 std::cout <<
"\n>>> Calling pool->stop() <<<\n\n";
160 auto stop_start = std::chrono::steady_clock::now();
162 auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
163 std::chrono::steady_clock::now() - stop_start);
165 std::cout <<
formatter::format(
"\nPool stopped in {}ms (job cooperated with cancellation)\n",
166 stop_duration.count());
176 std::cout <<
"\n========================================\n";
177 std::cout <<
"Demo 2: Non-Cooperative Job (Anti-Pattern)\n";
178 std::cout <<
"========================================\n\n";
180 auto pool = std::make_shared<thread_pool>(
"non_coop_pool");
182 auto worker = std::make_unique<thread_worker>();
183 pool->enqueue(std::move(worker));
187 auto stubborn_job = std::make_unique<non_cancellable_job>(
"stubborn_task", 50);
188 pool->enqueue(std::move(stubborn_job));
191 std::cout <<
"Letting job run for 1 second...\n";
192 std::this_thread::sleep_for(std::chrono::seconds(1));
195 std::cout <<
"\n>>> Calling pool->stop() <<<\n";
196 std::cout <<
"⚠️ Job is NOT checking cancellation token!\n";
197 std::cout <<
"Worker must wait for job to complete...\n\n";
199 auto stop_start = std::chrono::steady_clock::now();
201 auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
202 std::chrono::steady_clock::now() - stop_start);
204 std::cout <<
formatter::format(
"\nPool stopped in {}ms (job did NOT cooperate)\n",
205 stop_duration.count());
206 std::cout <<
"Notice how much longer this took!\n";
216 std::cout <<
"\n========================================\n";
217 std::cout <<
"Demo 3: Pool-Level Multi-Worker Cancellation\n";
218 std::cout <<
"========================================\n\n";
220 auto pool = std::make_shared<thread_pool>(
"multi_worker_pool");
223 for (
int i = 0; i < 3; ++i)
225 auto worker = std::make_unique<thread_worker>();
226 pool->enqueue(std::move(worker));
231 for (
int i = 0; i < 3; ++i)
233 auto job = std::make_unique<cancellable_long_job>(
235 pool->enqueue(std::move(
job));
239 std::cout <<
"All workers running jobs...\n";
240 std::this_thread::sleep_for(std::chrono::seconds(2));
243 std::cout <<
"\n>>> Calling pool->stop() - cancelling ALL workers <<<\n\n";
244 auto stop_start = std::chrono::steady_clock::now();
246 auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
247 std::chrono::steady_clock::now() - stop_start);
249 std::cout <<
formatter::format(
"\nAll workers stopped in {}ms\n", stop_duration.count());
250 std::cout <<
"All jobs received cancellation signal simultaneously!\n";
258 std::cout <<
"\n========================================\n";
259 std::cout <<
"Demo 4: Immediate vs. Graceful Shutdown\n";
260 std::cout <<
"========================================\n\n";
264 std::cout <<
"--- Graceful Shutdown (immediately_stop = false) ---\n";
265 auto pool = std::make_shared<thread_pool>(
"graceful_pool");
266 auto worker = std::make_unique<thread_worker>();
267 pool->enqueue(std::move(worker));
271 for (
int i = 0; i < 5; ++i)
273 auto job = std::make_unique<cancellable_long_job>(
275 pool->enqueue(std::move(
job));
278 std::this_thread::sleep_for(std::chrono::milliseconds(500));
280 std::cout <<
"Stopping gracefully (pending jobs remain in queue)...\n";
282 std::cout <<
"Done\n\n";
287 std::cout <<
"--- Immediate Shutdown (immediately_stop = true) ---\n";
288 auto pool = std::make_shared<thread_pool>(
"immediate_pool");
289 auto worker = std::make_unique<thread_worker>();
290 pool->enqueue(std::move(worker));
294 for (
int i = 0; i < 5; ++i)
296 auto job = std::make_unique<cancellable_long_job>(
298 pool->enqueue(std::move(
job));
301 std::this_thread::sleep_for(std::chrono::milliseconds(500));
303 std::cout <<
"Stopping immediately (clearing pending jobs)...\n";
305 std::cout <<
"Done (pending jobs were cleared)\n\n";
311 std::cout <<
"╔═══════════════════════════════════════════════════════╗\n";
312 std::cout <<
"║ Thread System - Job Cancellation System Demo ║\n";
313 std::cout <<
"╚═══════════════════════════════════════════════════════╝\n";
318 std::this_thread::sleep_for(std::chrono::seconds(1));
321 std::this_thread::sleep_for(std::chrono::seconds(1));
324 std::this_thread::sleep_for(std::chrono::seconds(1));
328 std::cout <<
"\n========================================\n";
329 std::cout <<
"All demonstrations completed!\n";
330 std::cout <<
"========================================\n\n";
332 std::cout <<
"Key Takeaways:\n";
333 std::cout <<
"1. ✅ Jobs MUST check cancellation_token periodically\n";
334 std::cout <<
"2. ✅ Worker stop() propagates cancellation to running job\n";
335 std::cout <<
"3. ✅ Pool stop() cancels all workers simultaneously\n";
336 std::cout <<
"4. ⚠️ Non-cooperative jobs delay shutdown\n";
337 std::cout <<
"5. ✅ Immediate stop clears pending jobs from queue\n\n";
339 catch (
const std::exception& e)
341 std::cerr <<
"Error: " << e.what() << std::endl;
A job that performs a long-running task with periodic cancellation checks.
cancellable_long_job(const std::string &name, int iterations=100)
kcenon::common::VoidResult do_work() override
The core task execution method to be overridden by derived classes.
bool is_cancelled() const
Checks if the token has been canceled.
Represents a unit of work (task) to be executed, typically by a job queue.
auto get_name(void) const -> std::string
Retrieves the name of this job.
cancellation_token cancellation_token_
The cancellation token associated with this job.
A job that DOES NOT check for cancellation (anti-pattern).
non_cancellable_job(const std::string &name, int iterations=50)
kcenon::common::VoidResult do_work() override
The core task execution method to be overridden by derived classes.
Core thread pool implementation with work stealing and auto-scaling.
Base job class for schedulable work units in the thread system.
void demo_basic_cancellation()
Demonstrates basic job cancellation: the pool is stopped while its single worker is running a long job.
void demo_immediate_vs_graceful()
Demonstrates immediate vs. graceful shutdown.
void demo_pool_level_cancellation()
Demonstrates pool-level cancellation with multiple workers.
void demo_non_cooperative_job()
Demonstrates what happens with non-cooperating jobs.
Core threading foundation of the thread system library.
Specialized worker thread that processes jobs from a job_queue.