Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
typed_thread_pool_sample_2.cpp

Shows how to use typed_thread_pool_t with a user-defined enum type (test_priority) instead of the built-in job_types. Workers handle Top, Middle, or Bottom priority lanes.

See also
typed_thread_pool_t, typed_thread_worker_t, callback_typed_job_t, test_priority
// BSD 3-Clause License
// Copyright (c) 2024, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.
#include <iostream>
#include <memory>
#include <chrono>
#include <thread>
#include "logger/core/logger.h"
#include "test_type.h"
#include <format>
using namespace kcenon::thread;
// ---------------------------------------------------------------------------
// Sample configuration: plain file-scope variables read by the functions below.
// ---------------------------------------------------------------------------

// Logger options handed to log_module by initialize_logger().
bool use_backup_{ false };
uint32_t max_lines_{ 0 };

// Logger wake interval in milliseconds; initialize_logger() applies it only
// when the value is greater than zero.
uint16_t wait_interval_{ 100 };

// Number of jobs that are created and enqueued in a single batch.
uint32_t test_line_count_{ 1000000 };

// Minimum log level per output channel (file / console / callback).
log_module::log_types file_target_{ log_module::log_types::None };
log_module::log_types console_target_{ log_module::log_types::Information };
log_module::log_types callback_target_{ log_module::log_types::None };

// Worker count for the Top priority lane.
// NOTE(review): the symbol index of this page also names
// middle_priority_workers_ and bottom_priority_workers_, but neither is
// defined anywhere in this listing - confirm against the original source.
uint16_t top_priority_workers_{ 3 };
auto initialize_logger() -> std::optional<std::string>
{
log_module::set_title("typed_thread_pool_sample_2");
log_module::set_use_backup(use_backup_);
log_module::set_max_lines(max_lines_);
log_module::file_target(file_target_);
log_module::console_target(console_target_);
log_module::callback_target(callback_target_);
// Note: This demonstrates the logger callback feature - std::cout is intentionally used here
log_module::message_callback(
[](const log_module::log_types& type, const std::string& datetime,
const std::string& message)
{ std::cout << formatter::format("[{}][{}] {}\n", datetime, type, message); });
if (wait_interval_ > 0)
{
log_module::set_wake_interval(std::chrono::milliseconds(wait_interval_));
}
return log_module::start();
}
/**
 * @brief Creates a typed thread pool and registers one group of workers per
 *        priority lane.
 *
 * Every worker is constructed with a single-element priority list, so each
 * worker is registered for exactly one of Top, Middle or Bottom.
 *
 * @param top_priority_workers    Number of workers registered for test_priority::Top.
 * @param middle_priority_workers Number of workers registered for test_priority::Middle.
 * @param bottom_priority_workers Number of workers registered for test_priority::Bottom.
 * @return A tuple of (pool, error). On success the pool is non-null and the
 *         error is std::nullopt; on failure the pool is nullptr and the error
 *         holds a description.
 */
auto create_default(const uint16_t& top_priority_workers,
                    const uint16_t& middle_priority_workers,
                    const uint16_t& bottom_priority_workers)
    -> std::tuple<std::shared_ptr<typed_thread_pool_t<test_priority>>,
                  std::optional<std::string>>
{
    std::shared_ptr<typed_thread_pool_t<test_priority>> pool;
    try
    {
        pool = std::make_shared<typed_thread_pool_t<test_priority>>();
    }
    catch (const std::bad_alloc& e)
    {
        return { nullptr, std::string(e.what()) };
    }

    std::vector<std::unique_ptr<typed_thread_worker_t<test_priority>>> workers;
    workers.reserve(top_priority_workers + middle_priority_workers + bottom_priority_workers);

    // Appends `count` workers that all serve the single priority `lane`.
    const auto add_workers
        = [&workers](const uint16_t count, const test_priority lane, const char* title)
    {
        for (uint16_t i = 0; i < count; ++i)
        {
            workers.push_back(std::make_unique<typed_thread_worker_t<test_priority>>(
                std::vector<test_priority>{ lane }, title));
        }
    };

    // Registration order is kept as Top, Middle, Bottom.
    add_workers(top_priority_workers, test_priority::Top, "top priority worker");
    add_workers(middle_priority_workers, test_priority::Middle, "middle priority worker");
    add_workers(bottom_priority_workers, test_priority::Bottom, "bottom priority worker");

    auto enqueue_result = pool->enqueue_batch(std::move(workers));
    if (enqueue_result.is_err())
    {
        return { nullptr, formatter::format("cannot enqueue to workers: {}",
                                            enqueue_result.error().message) };
    }

    return { pool, std::nullopt };
}
-> std::optional<std::string>
{
int target = 0;
std::vector<std::unique_ptr<typed_job_t<test_priority>>> jobs;
jobs.reserve(test_line_count_);
for (auto index = 0; index < test_line_count_; ++index)
{
target = index % 3;
jobs.push_back(std::make_unique<callback_typed_job_t<test_priority>>(
[target](void) -> kcenon::common::VoidResult
{
log_module::write_debug("Hello, World!: {} priority", target);
return kcenon::common::ok();
},
static_cast<test_priority>(target)));
}
auto enqueue_result = thread_pool->enqueue_batch(std::move(jobs));
if (enqueue_result.is_err())
{
return formatter::format("error enqueuing jobs: {}",
enqueue_result.error().message);
}
log_module::write_sequence("enqueued jobs: {}", test_line_count_);
return std::nullopt;
}
// Entry point of the sample: starts the logger, builds the typed pool,
// enqueues the test jobs, starts and stops the pool, then stops the logger.
// Every exit path, including the error paths, returns 0.
auto main() -> int
{
// The logger is not running if this fails, so the error goes to std::cerr.
auto error_message = initialize_logger();
if (error_message.has_value())
{
std::cerr << formatter::format("error starting logger: {}\n",
error_message.value_or("unknown error"));
return 0;
}
std::shared_ptr<typed_thread_pool_t<test_priority>> thread_pool = nullptr;
std::tie(thread_pool, error_message)
// NOTE(review): the statement above has no right-hand side and no terminating
// ';' in this listing. Presumably it assigns the result of create_default(...)
// called with the per-lane worker counts - confirm against the original source.
if (error_message.has_value())
{
log_module::write_error("error creating thread pool: {}",
error_message.value_or("unknown error"));
return 0;
}
log_module::write_information("created {}", thread_pool->to_string());
// Jobs are enqueued before the pool is started.
error_message = store_job(thread_pool);
if (error_message.has_value())
{
log_module::write_error("error storing job: {}", error_message.value_or("unknown error"));
thread_pool.reset();
return 0;
}
auto start_result = thread_pool->start();
if (start_result.is_err())
{
log_module::write_error("error starting thread pool: {}",
start_result.error().message);
thread_pool.reset();
return 0;
}
log_module::write_information("started {}", thread_pool->to_string());
// A failed stop is only logged; shutdown continues regardless.
{
auto stop_result = thread_pool->stop();
if (stop_result.is_err()) {
log_module::write_error("error stopping thread pool: {}", stop_result.error().message);
}
}
log_module::write_information("stopped {}", thread_pool->to_string());
// Release the pool before stopping the logger.
thread_pool.reset();
log_module::stop();
return 0;
}
Callback-based typed job template.
A thread pool for concurrent execution of jobs using multiple worker threads.
auto to_string(void) const -> std::string
Provides a string representation of this thread_pool.
auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the shared job_queue.
auto stop(const bool &immediately_stop=false) -> common::VoidResult
Stops the thread pool and all worker threads.
auto start(void) -> common::VoidResult
Starts the thread pool and all associated workers.
Provides convenience methods for string formatting using C++20 <format>.
Definition formatter.h:122
Type-based thread pool with priority scheduling and job type routing.
Generic formatter for enum types using user-provided converter functors.
log_module::log_types file_target_
log_module::log_types callback_target_
uint32_t max_lines_
bool use_backup_
auto initialize_logger() -> std::optional< std::string >
uint32_t test_line_count_
log_module::log_types console_target_
uint16_t wait_interval_
Core threading foundation of the thread system library.
Definition thread_impl.h:17
test_priority
Enumeration of test priority levels.
Definition test_type.h:23
@ Bottom
Bottom priority.
@ Top
Top priority.
@ Middle
Middle priority.
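auto store_job(std::shared_ptr< typed_thread_pool_t< test_priority > > thread_pool) -> std::optional< std::string >
auto create_default(const uint16_t &top_priority_workers, const uint16_t &middle_priority_workers, const uint16_t &bottom_priority_workers) -> std::tuple< std::shared_ptr< typed_thread_pool_t< test_priority > >, std::optional< std::string > >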
uint16_t top_priority_workers_
uint16_t bottom_priority_workers_
uint16_t middle_priority_workers_