Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
job_cancellation_example.cpp

Showcases four scenarios: basic cancellation via worker stop, non-cooperative job anti-pattern, pool-level multi-worker cancellation, and immediate versus graceful shutdown comparison.

See also
thread_pool, job, cancellation_token
/*****************************************************************************
BSD 3-Clause License
Copyright (c) 2024, 🍀☀🌕🌥 🌊
All rights reserved.
*****************************************************************************/
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>
using namespace kcenon::thread;
/**
 * @brief A job that performs a long-running task with periodic cancellation checks.
 *
 * Each iteration first polls the job's cancellation token and, if cancellation
 * was requested, returns an `operation_canceled` error instead of continuing.
 * Otherwise it sleeps 100ms to simulate work and logs progress every tenth
 * iteration.
 */
class cancellable_long_job : public job
{
public:
    /**
     * @brief Constructs the job.
     * @param name       Human-readable job name, forwarded to the base `job`.
     * @param iterations Number of 100ms work steps to perform (default 100).
     */
    cancellable_long_job(const std::string& name, int iterations = 100)
        : job(name), iterations_(iterations)
    {
    }

    /**
     * @brief Runs the simulated workload, stopping early when cancelled.
     * @return `kcenon::common::ok()` when all iterations finish, or an
     *         `error_info` carrying `error_code::operation_canceled` when the
     *         cancellation token was observed as cancelled.
     */
    kcenon::common::VoidResult do_work() override
    {
        std::cout << formatter::format("[{}] Starting job with {} iterations\n",
                                       get_name(), iterations_);

        for (int i = 0; i < iterations_; ++i)
        {
            // ✅ BEST PRACTICE: Check cancellation periodically
            if (cancellation_token_.is_cancelled())
            {
                std::cout << formatter::format("[{}] Job cancelled at iteration {}/{}\n",
                                               get_name(), i, iterations_);
                return kcenon::common::error_info{
                    error_code::operation_canceled,
                    formatter::format("Cancelled at iteration {}", i),
                    "job_cancellation_example"};
            }

            // Simulate work (100ms per iteration)
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            // Log progress every 10 iterations
            if (i % 10 == 0)
            {
                std::cout << formatter::format("[{}] Progress: {}/{}\n",
                                               get_name(), i, iterations_);
            }
        }

        std::cout << formatter::format("[{}] Job completed successfully\n", get_name());
        return kcenon::common::ok();
    }

private:
    int iterations_;  ///< Total number of work steps to execute.
};
/**
 * @brief A job that DOES NOT check for cancellation (anti-pattern).
 *
 * The loop never consults the cancellation token, so once started it always
 * runs all of its iterations (100ms each) before returning.
 */
class non_cancellable_job : public job
{
public:
    /**
     * @brief Constructs the job.
     * @param name       Human-readable job name, forwarded to the base `job`.
     * @param iterations Number of 100ms work steps to perform (default 50).
     */
    non_cancellable_job(const std::string& name, int iterations = 50)
        : job(name), iterations_(iterations)
    {
    }

    /**
     * @brief Runs every iteration unconditionally.
     * @return Always `kcenon::common::ok()`.
     */
    kcenon::common::VoidResult do_work() override
    {
        std::cout << formatter::format("[{}] Starting non-cancellable job\n", get_name());

        for (int i = 0; i < iterations_; ++i)
        {
            // ❌ BAD PRACTICE: Never checking cancellation
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            if (i % 10 == 0)
            {
                std::cout << formatter::format("[{}] Progress: {}/{} (ignoring cancellation)\n",
                                               get_name(), i, iterations_);
            }
        }

        std::cout << formatter::format("[{}] Job completed (never checked cancellation)\n",
                                       get_name());
        return kcenon::common::ok();
    }

private:
    int iterations_;  ///< Total number of work steps to execute.
};
{
std::cout << "\n========================================\n";
std::cout << "Demo 1: Basic Job Cancellation\n";
std::cout << "========================================\n\n";
// Create pool with single worker
auto pool = std::make_shared<thread_pool>("cancellation_demo_pool");
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
pool->start();
// Submit a long-running cancellable job (10 seconds total)
auto long_job = std::make_unique<cancellable_long_job>("long_task", 100);
pool->enqueue(std::move(long_job));
// Let job run for 2 seconds
std::cout << "Letting job run for 2 seconds...\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
// Stop the pool (triggers cancellation)
std::cout << "\n>>> Calling pool->stop() <<<\n\n";
auto stop_start = std::chrono::steady_clock::now();
pool->stop();
auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - stop_start);
std::cout << formatter::format("\nPool stopped in {}ms (job cooperated with cancellation)\n",
stop_duration.count());
}
{
std::cout << "\n========================================\n";
std::cout << "Demo 2: Non-Cooperative Job (Anti-Pattern)\n";
std::cout << "========================================\n\n";
auto pool = std::make_shared<thread_pool>("non_coop_pool");
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
pool->start();
// Submit a job that doesn't check cancellation (5 seconds total)
auto stubborn_job = std::make_unique<non_cancellable_job>("stubborn_task", 50);
pool->enqueue(std::move(stubborn_job));
// Let job run for 1 second
std::cout << "Letting job run for 1 second...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
// Stop the pool
std::cout << "\n>>> Calling pool->stop() <<<\n";
std::cout << "⚠️ Job is NOT checking cancellation token!\n";
std::cout << "Worker must wait for job to complete...\n\n";
auto stop_start = std::chrono::steady_clock::now();
pool->stop();
auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - stop_start);
std::cout << formatter::format("\nPool stopped in {}ms (job did NOT cooperate)\n",
stop_duration.count());
std::cout << "Notice how much longer this took!\n";
}
{
std::cout << "\n========================================\n";
std::cout << "Demo 3: Pool-Level Multi-Worker Cancellation\n";
std::cout << "========================================\n\n";
auto pool = std::make_shared<thread_pool>("multi_worker_pool");
// Add 3 workers
for (int i = 0; i < 3; ++i)
{
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
}
pool->start();
// Submit multiple jobs to different workers
for (int i = 0; i < 3; ++i)
{
auto job = std::make_unique<cancellable_long_job>(
formatter::format("worker_{}_task", i), 100);
pool->enqueue(std::move(job));
}
// Let jobs run for 2 seconds
std::cout << "All workers running jobs...\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
// Stop the entire pool
std::cout << "\n>>> Calling pool->stop() - cancelling ALL workers <<<\n\n";
auto stop_start = std::chrono::steady_clock::now();
pool->stop();
auto stop_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - stop_start);
std::cout << formatter::format("\nAll workers stopped in {}ms\n", stop_duration.count());
std::cout << "All jobs received cancellation signal simultaneously!\n";
}
{
std::cout << "\n========================================\n";
std::cout << "Demo 4: Immediate vs. Graceful Shutdown\n";
std::cout << "========================================\n\n";
// Test 1: Graceful shutdown
{
std::cout << "--- Graceful Shutdown (immediately_stop = false) ---\n";
auto pool = std::make_shared<thread_pool>("graceful_pool");
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
pool->start();
// Enqueue multiple jobs
for (int i = 0; i < 5; ++i)
{
auto job = std::make_unique<cancellable_long_job>(
formatter::format("graceful_job_{}", i), 20);
pool->enqueue(std::move(job));
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Stopping gracefully (pending jobs remain in queue)...\n";
pool->stop(false); // Graceful shutdown
std::cout << "Done\n\n";
}
// Test 2: Immediate shutdown
{
std::cout << "--- Immediate Shutdown (immediately_stop = true) ---\n";
auto pool = std::make_shared<thread_pool>("immediate_pool");
auto worker = std::make_unique<thread_worker>();
pool->enqueue(std::move(worker));
pool->start();
// Enqueue multiple jobs
for (int i = 0; i < 5; ++i)
{
auto job = std::make_unique<cancellable_long_job>(
formatter::format("immediate_job_{}", i), 20);
pool->enqueue(std::move(job));
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Stopping immediately (clearing pending jobs)...\n";
pool->stop(true); // Immediate shutdown
std::cout << "Done (pending jobs were cleared)\n\n";
}
}
int main()
{
std::cout << "╔═══════════════════════════════════════════════════════╗\n";
std::cout << "║ Thread System - Job Cancellation System Demo ║\n";
std::cout << "╚═══════════════════════════════════════════════════════╝\n";
try
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "\n========================================\n";
std::cout << "All demonstrations completed!\n";
std::cout << "========================================\n\n";
std::cout << "Key Takeaways:\n";
std::cout << "1. ✅ Jobs MUST check cancellation_token periodically\n";
std::cout << "2. ✅ Worker stop() propagates cancellation to running job\n";
std::cout << "3. ✅ Pool stop() cancels all workers simultaneously\n";
std::cout << "4. ⚠️ Non-cooperative jobs delay shutdown\n";
std::cout << "5. ✅ Immediate stop clears pending jobs from queue\n\n";
}
catch (const std::exception& e)
{
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
A job that performs a long-running task with periodic cancellation checks.
cancellable_long_job(const std::string &name, int iterations=100)
kcenon::common::VoidResult do_work() override
The core task execution method to be overridden by derived classes.
bool is_cancelled() const
Checks if the token has been canceled.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
auto get_name(void) const -> std::string
Retrieves the name of this job.
Definition job.cpp:112
cancellation_token cancellation_token_
The cancellation token associated with this job.
Definition job.h:504
job(const std::string &name="job")
Constructs a new job with an optional human-readable name.
Definition job.cpp:53
Provides convenience methods for string formatting using C++20 <format>.
Definition formatter.h:122
A job that DOES NOT check for cancellation (anti-pattern).
non_cancellable_job(const std::string &name, int iterations=50)
kcenon::common::VoidResult do_work() override
The core task execution method to be overridden by derived classes.
Core thread pool implementation with work stealing and auto-scaling.
Generic formatter for enum types using user-provided converter functors.
Base job class for schedulable work units in the thread system.
void demo_basic_cancellation()
Demonstrates basic job cancellation via worker stop.
void demo_immediate_vs_graceful()
Demonstrates immediate vs. graceful shutdown.
void demo_pool_level_cancellation()
Demonstrates pool-level cancellation with multiple workers.
void demo_non_cooperative_job()
Demonstrates what happens with non-cooperating jobs.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Specialized worker thread that processes jobs from a job_queue.