16#if KCENON_HAS_COMMON_EXECUTOR
17 namespace common_ns = common;
20 template <
typename job_type>
22 const std::string& thread_title,
24 : thread_title_(thread_title)
31 template <
typename job_type>
34 if (start_pool_.load())
40 template <
typename job_type>
43 return this->shared_from_this();
46 template <
typename job_type>
49 bool expected =
false;
50 if (!start_pool_.compare_exchange_strong(expected,
true))
58 start_pool_.store(
false);
63 for (
auto& worker : workers_)
74 template <
typename job_type>
80 template <
typename job_type>
84 if (!start_pool_.load(std::memory_order_acquire))
89 return job_queue_->enqueue(std::move(work));
92 template <
typename job_type>
97 if (!start_pool_.load(std::memory_order_acquire))
102 return job_queue_->enqueue(std::move(
job));
105 template <
typename job_type>
110 if (!start_pool_.load(std::memory_order_acquire))
115 return job_queue_->enqueue_batch(std::move(jobs));
118 template <
typename job_type>
128 worker->set_aging_job_queue(job_queue_);
129 worker->set_context(context_);
134 bool is_running = start_pool_.load(std::memory_order_acquire);
136 workers_.push_back(std::move(worker));
141 auto start_result = workers_.back()->start();
142 if (start_result.is_err())
153 template <
typename job_type>
157 for (
auto& worker : workers)
159 auto result = enqueue(std::move(worker));
169 template <
typename job_type>
174 bool expected =
true;
175 if (!start_pool_.compare_exchange_strong(expected,
false,
176 std::memory_order_acq_rel,
177 std::memory_order_acquire))
197 for (
auto& worker : workers_)
209 template <
typename job_type>
212 std::ostringstream oss;
213 oss <<
"typed_thread_pool{"
214 <<
"title: " << thread_title_
215 <<
", started: " << start_pool_.load()
216 <<
", workers: " << workers_.size()
221 template <
typename job_type>
228 for (
auto& worker : workers_)
232 worker->set_aging_job_queue(job_queue_);
237 template <
typename job_type>
247 template <
typename job_type>
251 if (priority_aging_enabled_)
256 job_queue_->set_aging_config(std::move(config));
262 config.enabled =
true;
265 job_queue_->set_aging_config(config);
268 if (start_pool_.load())
270 job_queue_->start_aging();
274 priority_aging_enabled_ =
true;
277 template <
typename job_type>
280 if (!priority_aging_enabled_)
287 job_queue_->stop_aging();
290 priority_aging_enabled_ =
false;
293 template <
typename job_type>
296 return priority_aging_enabled_ && job_queue_ &&
297 job_queue_->is_aging_running();
300 template <
typename job_type>
305 return job_queue_->get_aging_stats();
310 template <
typename job_type>
314 if (!start_pool_.load(std::memory_order_acquire))
316 return common::error_info{
318 "Thread pool not started",
325 return common::error_info{
332 return job_queue_->enqueue(std::move(
job));
335#if KCENON_HAS_COMMON_EXECUTOR
340 template <
typename job_type>
343 auto promise = std::make_shared<std::promise<void>>();
344 auto future = promise->get_future();
346 auto job_ptr = std::make_unique<callback_typed_job_t<job_type>>(
347 [task = std::move(task), promise]()
mutable -> common::VoidResult {
350 promise->set_value();
352 promise->set_exception(std::current_exception());
359 auto enqueue_result = enqueue(std::move(job_ptr));
360 if (enqueue_result.is_err()) {
362 throw std::runtime_error(
"Failed to enqueue task: " +
363 enqueue_result.error().message);
365 promise->set_exception(std::current_exception());
372 template <
typename job_type>
373 std::future<void> typed_thread_pool_t<job_type>::submit_delayed(
374 std::function<
void()> task,
375 std::chrono::milliseconds
delay)
377 auto promise = std::make_shared<std::promise<void>>();
378 auto future = promise->get_future();
380 auto delayed_task = [task = std::move(task),
delay, promise]()
mutable -> common::VoidResult {
381 std::this_thread::sleep_for(
delay);
384 promise->set_value();
386 promise->set_exception(std::current_exception());
391 auto job_ptr = std::make_unique<callback_typed_job_t<job_type>>(
392 std::move(delayed_task),
396 auto enqueue_result = enqueue(std::move(job_ptr));
397 if (enqueue_result.is_err()) {
399 throw std::runtime_error(
"Failed to enqueue delayed task: " +
400 enqueue_result.error().message);
402 promise->set_exception(std::current_exception());
409 template <
typename job_type>
411 std::unique_ptr<common_ns::interfaces::IJob>&& common_job)
414 return common_ns::error_info{
421 auto promise = std::make_shared<std::promise<void>>();
422 auto future = promise->get_future();
425 auto shared_job = std::shared_ptr<common_ns::interfaces::IJob>(std::move(common_job));
426 auto job_ptr = std::make_unique<callback_typed_job_t<job_type>>(
427 [shared_job, promise]() -> common::VoidResult {
428 auto result = shared_job->execute();
429 if (result.is_ok()) {
430 promise->set_value();
433 throw std::runtime_error(
"Job execution failed: " + result.error().message);
435 promise->set_exception(std::current_exception());
443 auto enqueue_result = enqueue(std::move(job_ptr));
444 if (enqueue_result.is_err()) {
445 return common_ns::error_info{
446 enqueue_result.error().code,
447 enqueue_result.error().message,
448 enqueue_result.error().module
452 return common_ns::Result<std::future<void>>(std::move(future));
455 template <
typename job_type>
456 common_ns::Result<std::future<void>> typed_thread_pool_t<job_type>::execute_delayed(
457 std::unique_ptr<common_ns::interfaces::IJob>&& common_job,
458 std::chrono::milliseconds
delay)
461 return common_ns::error_info{
468 auto promise = std::make_shared<std::promise<void>>();
469 auto future = promise->get_future();
472 auto shared_job = std::shared_ptr<common_ns::interfaces::IJob>(std::move(common_job));
473 auto job_ptr = std::make_unique<callback_typed_job_t<job_type>>(
474 [shared_job,
delay, promise]() -> common::VoidResult {
475 std::this_thread::sleep_for(
delay);
476 auto result = shared_job->execute();
477 if (result.is_ok()) {
478 promise->set_value();
481 throw std::runtime_error(
"Job execution failed: " + result.error().message);
483 promise->set_exception(std::current_exception());
491 auto enqueue_result = enqueue(std::move(job_ptr));
492 if (enqueue_result.is_err()) {
493 return common_ns::error_info{
494 enqueue_result.error().code,
495 enqueue_result.error().message,
496 enqueue_result.error().module
500 return common_ns::Result<std::future<void>>(std::move(future));
503 template <
typename job_type>
504 size_t typed_thread_pool_t<job_type>::worker_count()
const
506 return workers_.size();
509 template <
typename job_type>
510 size_t typed_thread_pool_t<job_type>::pending_tasks()
const
512 return job_queue_ ? job_queue_->size() : 0;
515 template <
typename job_type>
516 bool typed_thread_pool_t<job_type>::is_running()
const
518 return start_pool_.load(std::memory_order_acquire);
521 template <
typename job_type>
524 stop(!wait_for_completion);
529 template class typed_thread_pool_t<job_types>;
Typed job with priority aging support to prevent starvation.
Priority queue with aging to prevent low-priority job starvation.
Callback-based typed job for priority-aware lambda execution.
A typed job queue with priority aging support, based on policy_queue.
A typed job with priority aging support.
A thread-safe job queue for managing and dispatching work items.
Represents a unit of work (task) to be executed, typically by a job queue.
A template class representing either a value or an error.
Context object that provides access to optional services.
Typed thread pool template.
typed_thread_pool_t(const std::string &thread_title="typed_thread_pool", const thread_context &context=thread_context())
Constructs a new typed_thread_pool_t instance.
auto get_job_queue(void) -> std::shared_ptr< aging_typed_job_queue_t< job_type > >
Retrieves the underlying priority job queue managed by this thread pool.
auto is_priority_aging_enabled() const -> bool
Checks if priority aging is enabled.
auto get_context(void) const -> const thread_context &
Gets the thread context for this pool.
auto get_ptr(void) -> std::shared_ptr< typed_thread_pool_t< job_type > >
Returns a shared pointer to the current typed_thread_pool_t.
auto execute(std::unique_ptr< job > &&work) -> common::VoidResult
auto stop(bool clear_queue=false) -> common::VoidResult
Stops the thread pool and optionally waits for currently running jobs to finish.
auto get_aging_stats() const -> aging_stats
Gets priority aging statistics.
auto shutdown() -> common::VoidResult
auto enqueue_batch(std::vector< std::unique_ptr< typed_job_t< job_type > > > &&jobs) -> common::VoidResult
Enqueues a batch of priority jobs into the thread pool's job queue.
auto enqueue(std::unique_ptr< typed_job_t< job_type > > &&job) -> common::VoidResult
Enqueues a priority job into the thread pool's job queue.
virtual ~typed_thread_pool_t(void)
Destroys the typed_thread_pool_t object.
auto set_job_queue(std::shared_ptr< aging_typed_job_queue_t< job_type > > job_queue) -> void
Sets the job queue for this thread pool and its workers.
auto enable_priority_aging(priority_aging_config config={}) -> void
Enables priority aging for this thread pool.
auto to_string(void) const -> std::string
Generates a string representation of the thread pool's internal state.
auto start(void) -> common::VoidResult
Starts the thread pool by creating worker threads and initializing internal structures.
auto disable_priority_aging() -> void
Disables priority aging.
Typed thread worker template.
@ delay
Delay processing (attempt later)
Type-based thread pool with priority scheduling and job type routing.
Worker thread for typed_thread_pool processing priority job queues.
std::shared_ptr< job_interface > job_ptr
Shared pointer type for job objects.
Core threading foundation of the thread system library.
Statistics about priority aging behavior.
Configuration for priority aging behavior.