Showcases four scenarios: basic cancellation via worker stop, the non-cooperative job anti-pattern, pool-level multi-worker cancellation, and a comparison of immediate versus graceful shutdown.
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>
// A job whose do_work() runs a timed loop and can end early with an
// operation_canceled error.
// NOTE(review): the class header, constructor signature, member declarations,
// loop header and the tails of several format() calls are missing from this
// listing. Presumably this is cancellable_long_job (the demo code constructs a
// type of that name) — confirm against the full file.
{
public:
// Constructor body (signature and initializer list are not visible here).
{
}
// Performs the job's work.
// Returns kcenon::common::ok() when the loop runs to completion, or an
// error_info carrying error_code::operation_canceled when the early-exit
// branch below is taken.
kcenon::common::VoidResult
do_work()
override
{
// Announce the start of the job (format arguments are cut off in this listing).
std::cout << formatter::format("[{}] Starting job with {} iterations\n",
{
{
// Early-exit branch: log the iteration reached and return a cancellation error.
// NOTE(review): the guarding condition is not visible; presumably it checks the
// job's cancellation token — confirm.
std::cout << formatter::format("[{}] Job cancelled at iteration {}/{}\n",
return kcenon::common::error_info{
error_code::operation_canceled,
formatter::format("Cancelled at iteration {}", i),
"job_cancellation_example"};
}
// Stand-in for real work: each iteration blocks for 100 ms.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Emit a progress line whenever i is a multiple of 10.
if (i % 10 == 0)
{
std::cout << formatter::format("[{}] Progress: {}/{}\n",
}
}
// Loop ran to the end without taking the early-exit branch.
std::cout << formatter::format(
"[{}] Job completed successfully\n",
get_name());
return kcenon::common::ok();
}
private:
// (member declarations are not visible in this listing)
};
// A job whose do_work() runs its timed loop with no early-exit branch, so it
// always returns ok() after the loop finishes.
// NOTE(review): the class header, constructor signature, member declarations,
// loop header and the tails of several format() calls are missing from this
// listing. Presumably this is non_cancellable_job (the demo code constructs a
// type of that name) — confirm against the full file.
{
public:
// Constructor body (signature and initializer list are not visible here).
{
}
// Performs the job's work; the only return in the visible body is
// kcenon::common::ok().
kcenon::common::VoidResult
do_work()
override
{
// Announce the start of the job.
std::cout << formatter::format(
"[{}] Starting non-cancellable job\n",
get_name());
{
// Stand-in for real work: each iteration blocks for 100 ms. Unlike the
// cancellable variant, nothing in this loop body can end the job early.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Emit a progress line whenever i is a multiple of 10.
if (i % 10 == 0)
{
std::cout << formatter::format("[{}] Progress: {}/{} (ignoring cancellation)\n",
}
}
// Reached only after the loop has run to completion.
std::cout << formatter::format("[{}] Job completed (never checked cancellation)\n",
return kcenon::common::ok();
}
private:
// (member declarations are not visible in this listing)
};
// Demo 1: runs one long cancellable job on a single-worker pool, stops the
// pool after two seconds, and prints how long the stop() call took.
// NOTE(review): the function signature is missing from this listing;
// presumably this is demo_basic_cancellation() — confirm.
{
std::cout << "\n========================================\n";
std::cout << "Demo 1: Basic Job Cancellation\n";
std::cout << "========================================\n\n";
// Build a pool with one worker and start it.
auto pool = std::make_shared<thread_pool>("cancellation_demo_pool");
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
pool->start();
// Submit a job constructed with ("long_task", 100).
auto long_job = std::make_unique<cancellable_long_job>("long_task", 100);
pool->enqueue(std::move(long_job));
std::cout << "Letting job run for 2 seconds...\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "\n>>> Calling pool->stop() <<<\n\n";
// Time the stop() call with a monotonic clock.
auto stop_start = std::chrono::steady_clock::now();
pool->stop();
auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - stop_start);
std::cout << formatter::format("\nPool stopped in {}ms (job cooperated with cancellation)\n",
stop_duration.count());
}
// Demo 2: runs a non_cancellable_job on a single-worker pool, stops the pool
// after one second, and prints how long the stop() call took.
// NOTE(review): the function signature is missing from this listing;
// presumably this is demo_non_cooperative_job() — confirm.
{
std::cout << "\n========================================\n";
std::cout << "Demo 2: Non-Cooperative Job (Anti-Pattern)\n";
std::cout << "========================================\n\n";
// Build a pool with one worker and start it.
auto pool = std::make_shared<thread_pool>("non_coop_pool");
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
pool->start();
// Submit a job constructed with ("stubborn_task", 50).
auto stubborn_job = std::make_unique<non_cancellable_job>("stubborn_task", 50);
pool->enqueue(std::move(stubborn_job));
std::cout << "Letting job run for 1 second...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "\n>>> Calling pool->stop() <<<\n";
std::cout << "⚠️ Job is NOT checking cancellation token!\n";
std::cout << "Worker must wait for job to complete...\n\n";
// Time the stop() call with a monotonic clock; the messages above state that
// the worker has to wait for the job to finish.
auto stop_start = std::chrono::steady_clock::now();
pool->stop();
auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - stop_start);
std::cout << formatter::format("\nPool stopped in {}ms (job did NOT cooperate)\n",
stop_duration.count());
std::cout << "Notice how much longer this took!\n";
}
// Demo 3: starts a pool with three workers, submits three cancellable jobs,
// stops the pool after two seconds, and prints how long the stop() call took.
// NOTE(review): the function signature is missing from this listing;
// presumably this is demo_pool_level_cancellation() — confirm.
{
std::cout << "\n========================================\n";
std::cout << "Demo 3: Pool-Level Multi-Worker Cancellation\n";
std::cout << "========================================\n\n";
auto pool = std::make_shared<thread_pool>("multi_worker_pool");
// Register three workers before starting the pool.
for (int i = 0; i < 3; ++i)
{
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
}
pool->start();
// Submit three jobs named worker_0_task .. worker_2_task, each constructed
// with 100 as the second argument.
for (int i = 0; i < 3; ++i)
{
auto job = std::make_unique<cancellable_long_job>(
formatter::format("worker_{}_task", i), 100);
pool->enqueue(std::move(
job));
}
std::cout << "All workers running jobs...\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "\n>>> Calling pool->stop() - cancelling ALL workers <<<\n\n";
// Time a single stop() call on the whole pool with a monotonic clock.
auto stop_start = std::chrono::steady_clock::now();
pool->stop();
auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - stop_start);
std::cout << formatter::format("\nAll workers stopped in {}ms\n", stop_duration.count());
std::cout << "All jobs received cancellation signal simultaneously!\n";
}
// Demo 4: runs the same workload twice on fresh single-worker pools, ending
// the first run with stop(false) and the second with stop(true).
// NOTE(review): the function signature is missing from this listing;
// presumably this is demo_immediate_vs_graceful() — confirm.
{
std::cout << "\n========================================\n";
std::cout << "Demo 4: Immediate vs. Graceful Shutdown\n";
std::cout << "========================================\n\n";
// Scenario A: stop(false). Each scenario lives in its own scope so its pool
// handle is released before the next scenario begins.
{
std::cout << "--- Graceful Shutdown (immediately_stop = false) ---\n";
auto pool = std::make_shared<thread_pool>("graceful_pool");
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
pool->start();
// Queue five jobs (graceful_job_0 .. graceful_job_4) on the single worker,
// each constructed with 20 as the second argument.
for (int i = 0; i < 5; ++i)
{
auto job = std::make_unique<cancellable_long_job>(
formatter::format("graceful_job_{}", i), 20);
pool->enqueue(std::move(
job));
}
// Wait 500 ms before stopping.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Stopping gracefully (pending jobs remain in queue)...\n";
pool->stop(false);
std::cout << "Done\n\n";
}
// Scenario B: identical setup, but stop(true); per the messages printed here,
// this variant clears the pending jobs.
{
std::cout << "--- Immediate Shutdown (immediately_stop = true) ---\n";
auto pool = std::make_shared<thread_pool>("immediate_pool");
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
pool->start();
// Queue five jobs (immediate_job_0 .. immediate_job_4), as in scenario A.
for (int i = 0; i < 5; ++i)
{
auto job = std::make_unique<cancellable_long_job>(
formatter::format("immediate_job_{}", i), 20);
pool->enqueue(std::move(
job));
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Stopping immediately (clearing pending jobs)...\n";
pool->stop(true);
std::cout << "Done (pending jobs were cleared)\n\n";
}
}
// Program entry: prints a banner, pauses three times for one second each,
// prints the summary, and returns 0; returns 1 if a std::exception is caught.
// NOTE(review): the function signature is missing from this listing
// (presumably main()), and the calls to the four demo functions appear to have
// been dropped — only the pauses remain. Confirm against the full file.
{
std::cout << "╔═══════════════════════════════════════════════════════╗\n";
std::cout << "║ Thread System - Job Cancellation System Demo ║\n";
std::cout << "╚═══════════════════════════════════════════════════════╝\n";
try
{
// One-second pauses; presumably these separate consecutive demos — confirm.
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "\n========================================\n";
std::cout << "All demonstrations completed!\n";
std::cout << "========================================\n\n";
// Closing summary of the lessons the demos are meant to illustrate.
std::cout << "Key Takeaways:\n";
std::cout << "1. ✅ Jobs MUST check cancellation_token periodically\n";
std::cout << "2. ✅ Worker stop() propagates cancellation to running job\n";
std::cout << "3. ✅ Pool stop() cancels all workers simultaneously\n";
std::cout << "4. ⚠️ Non-cooperative jobs delay shutdown\n";
std::cout << "5. ✅ Immediate stop clears pending jobs from queue\n\n";
}
// Report any std::exception on stderr and signal failure via exit status 1.
catch (const std::exception& e)
{
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
A job that performs a long-running task with periodic cancellation checks.
cancellable_long_job(const std::string &name, int iterations=100)
kcenon::common::VoidResult do_work() override
The core task execution method to be overridden by derived classes.
bool is_cancelled() const
Checks if the token has been canceled.
Represents a unit of work (task) to be executed, typically by a job queue.
auto get_name(void) const -> std::string
Retrieves the name of this job.
cancellation_token cancellation_token_
The cancellation token associated with this job.
job(const std::string &name="job")
Constructs a new job with an optional human-readable name.
A job that DOES NOT check for cancellation (anti-pattern).
non_cancellable_job(const std::string &name, int iterations=50)
kcenon::common::VoidResult do_work() override
The core task execution method to be overridden by derived classes.
Core thread pool implementation with work stealing and auto-scaling.
Base job class for schedulable work units in the thread system.
void demo_basic_cancellation()
Demonstrates basic job cancellation via worker stop.
void demo_immediate_vs_graceful()
Demonstrates immediate vs. graceful shutdown.
void demo_pool_level_cancellation()
Demonstrates pool-level cancellation with multiple workers.
void demo_non_cooperative_job()
Demonstrates what happens with non-cooperating jobs.
Core threading foundation of the thread system library.
Specialized worker thread that processes jobs from a job_queue.