Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
typed_thread_pool_sample.cpp File Reference

Typed thread pool with priority-based job routing using built-in job_types. More...

#include <iostream>
#include <memory>
#include <chrono>
#include <thread>
#include "logger/core/logger.h"
#include <kcenon/thread/utils/formatter.h>
#include <kcenon/thread/impl/typed_pool/typed_thread_pool.h>
#include <format>
Include dependency graph for typed_thread_pool_sample.cpp:

Go to the source code of this file.

Functions

auto initialize_logger () -> std::optional< std::string >
 
auto create_default (const uint16_t &high_priority_workers, const uint16_t &normal_priority_workers, const uint16_t &low_priority_workers) -> std::tuple< std::shared_ptr< typed_thread_pool >, std::optional< std::string > >
 
auto store_job (std::shared_ptr< typed_thread_pool > thread_pool) -> std::optional< std::string >
 
auto main () -> int
 

Variables

bool use_backup_ = false
 
uint32_t max_lines_ = 0
 
uint16_t wait_interval_ = 100
 
uint32_t test_line_count_ = 1000000
 
log_module::log_types file_target_ = log_module::log_types::None
 
log_module::log_types console_target_ = log_module::log_types::Information
 
log_module::log_types callback_target_ = log_module::log_types::None
 
uint16_t high_priority_workers_ = 3
 
uint16_t normal_priority_workers_ = 2
 
uint16_t low_priority_workers_ = 1
 

Detailed Description

Typed thread pool with priority-based job routing using built-in job_types.

Definition in file typed_thread_pool_sample.cpp.

Function Documentation

◆ create_default()

auto create_default ( const uint16_t & high_priority_workers,
const uint16_t & normal_priority_workers,
const uint16_t & low_priority_workers ) -> std::tuple<std::shared_ptr<typed_thread_pool>, std::optional<std::string>>

Definition at line 64 of file typed_thread_pool_sample.cpp.

68{
69 std::shared_ptr<typed_thread_pool> pool;
70
71 try
72 {
73 pool = std::make_shared<typed_thread_pool>();
74 }
75 catch (const std::bad_alloc& e)
76 {
77 return { nullptr, std::string(e.what()) };
78 }
79
80 std::optional<std::string> error_message = std::nullopt;
81
82 std::vector<std::unique_ptr<typed_thread_worker>> workers;
83 workers.reserve(high_priority_workers + normal_priority_workers + low_priority_workers);
84 for (uint16_t i = 0; i < high_priority_workers; ++i)
85 {
86 workers.push_back(std::make_unique<typed_thread_worker>(
87 std::vector<job_types>{ job_types::RealTime }, "high priority worker"));
88 }
89
90 for (uint16_t i = 0; i < normal_priority_workers; ++i)
91 {
92 workers.push_back(std::make_unique<typed_thread_worker>(
93 std::vector<job_types>{ job_types::Batch }, "normal priority worker"));
94 }
95
96 for (uint16_t i = 0; i < low_priority_workers; ++i)
97 {
98 workers.push_back(std::make_unique<typed_thread_worker>(
99 std::vector<job_types>{ job_types::Background }, "low priority worker"));
100 }
101
102 auto enqueue_result = pool->enqueue_batch(std::move(workers));
103 if (enqueue_result.is_err())
104 {
105 return { nullptr, formatter::format("cannot enqueue to workers: {}",
106 enqueue_result.error().message) };
107 }
108
109 return { pool, std::nullopt };
110}
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132

References kcenon::thread::utils::formatter::format().

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ initialize_logger()

auto initialize_logger ( ) -> std::optional<std::string>

Definition at line 43 of file typed_thread_pool_sample.cpp.

44{
45 log_module::set_title("typed_thread_pool_sample");
46 log_module::set_use_backup(use_backup_);
47 log_module::set_max_lines(max_lines_);
48 log_module::file_target(file_target_);
49 log_module::console_target(console_target_);
50 log_module::callback_target(callback_target_);
51 // Note: This demonstrates the logger callback feature - std::cout is intentionally used here
52 log_module::message_callback(
53 [](const log_module::log_types& type, const std::string& datetime,
54 const std::string& message)
55 { std::cout << formatter::format("[{}][{}] {}\n", datetime, type, message); });
56 if (wait_interval_ > 0)
57 {
58 log_module::set_wake_interval(std::chrono::milliseconds(wait_interval_));
59 }
60
61 return log_module::start();
62}
log_module::log_types file_target_
log_module::log_types callback_target_
uint32_t max_lines_
log_module::log_types console_target_
uint16_t wait_interval_

References callback_target_, console_target_, file_target_, kcenon::thread::utils::formatter::format(), max_lines_, use_backup_, and wait_interval_.

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ main()

auto main ( ) -> int

Definition at line 143 of file typed_thread_pool_sample.cpp.

144{
145 auto error_message = initialize_logger();
146 if (error_message.has_value())
147 {
148 std::cerr << formatter::format("error starting logger: {}\n",
149 error_message.value_or("unknown error"));
150 return 0;
151 }
152
153 std::shared_ptr<typed_thread_pool> thread_pool = nullptr;
154 std::tie(thread_pool, error_message)
156 if (error_message.has_value())
157 {
158 log_module::write_error("error creating thread pool: {}",
159 error_message.value_or("unknown error"));
160
161 return 0;
162 }
163
164 log_module::write_information("created {}", thread_pool->to_string());
165
166 error_message = store_job(thread_pool);
167 if (error_message.has_value())
168 {
169 log_module::write_error("error storing job: {}", error_message.value_or("unknown error"));
170
171 thread_pool.reset();
172
173 return 0;
174 }
175
176 auto start_result = thread_pool->start();
177 if (start_result.is_err())
178 {
179 log_module::write_error("error starting thread pool: {}",
180 start_result.error().message);
181
182 thread_pool.reset();
183
184 return 0;
185 }
186
187 log_module::write_information("started {}", thread_pool->to_string());
188
189 {
190 auto stop_result = thread_pool->stop();
191 if (stop_result.is_err()) {
192 log_module::write_error("error stopping thread pool: {}", stop_result.error().message);
193 }
194 }
195
196 log_module::write_information("stopped {}", thread_pool->to_string());
197
198 thread_pool.reset();
199
200 log_module::stop();
201
202 return 0;
203}
A thread pool for concurrent execution of jobs using multiple worker threads.
auto to_string(void) const -> std::string
Provides a string representation of this thread_pool.
auto stop(const bool &immediately_stop=false) -> common::VoidResult
Stops the thread pool and all worker threads.
auto start(void) -> common::VoidResult
Starts the thread pool and all associated workers.
uint16_t high_priority_workers_
uint16_t low_priority_workers_
auto initialize_logger() -> std::optional< std::string >
auto create_default(const uint16_t &high_priority_workers, const uint16_t &normal_priority_workers, const uint16_t &low_priority_workers) -> std::tuple< std::shared_ptr< typed_thread_pool >, std::optional< std::string > >
auto store_job(std::shared_ptr< typed_thread_pool > thread_pool) -> std::optional< std::string >
uint16_t normal_priority_workers_

References create_default(), kcenon::thread::utils::formatter::format(), high_priority_workers_, initialize_logger(), low_priority_workers_, normal_priority_workers_, kcenon::thread::thread_pool::start(), kcenon::thread::thread_pool::stop(), store_job(), and kcenon::thread::thread_pool::to_string().

Here is the call graph for this function:

◆ store_job()

auto store_job ( std::shared_ptr< typed_thread_pool > thread_pool) -> std::optional<std::string>

Definition at line 112 of file typed_thread_pool_sample.cpp.

113{
114 int target = 0;
115
116 std::vector<std::unique_ptr<typed_job>> jobs;
117 jobs.reserve(test_line_count_);
118
119 for (auto index = 0; index < test_line_count_; ++index)
120 {
121 target = index % 3;
122 jobs.push_back(std::make_unique<callback_typed_job>(
123 [target](void) -> kcenon::common::VoidResult
124 {
125 log_module::write_debug("Hello, World!: {} priority", target);
126 return kcenon::common::ok();
127 },
128 static_cast<job_types>(target)));
129 }
130
131 auto enqueue_result = thread_pool->enqueue_batch(std::move(jobs));
132 if (enqueue_result.is_err())
133 {
134 return formatter::format("error enqueuing jobs: {}",
135 enqueue_result.error().message);
136 }
137
138 log_module::write_sequence("enqueued jobs: {}", test_line_count_);
139
140 return std::nullopt;
141}
auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the shared job_queue.
job_types
Defines different types of jobs for a typed thread pool.
Definition job_types.h:33
uint32_t test_line_count_

References kcenon::thread::thread_pool::enqueue_batch(), kcenon::thread::utils::formatter::format(), and test_line_count_.

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

Variable Documentation

◆ callback_target_

log_module::log_types callback_target_ = log_module::log_types::None

Definition at line 37 of file typed_thread_pool_sample.cpp.

Referenced by initialize_logger().

◆ console_target_

log_module::log_types console_target_ = log_module::log_types::Information

Definition at line 36 of file typed_thread_pool_sample.cpp.

Referenced by initialize_logger().

◆ file_target_

log_module::log_types file_target_ = log_module::log_types::None

Definition at line 35 of file typed_thread_pool_sample.cpp.

Referenced by initialize_logger().

◆ high_priority_workers_

uint16_t high_priority_workers_ = 3
Examples
typed_thread_pool_sample.cpp.

Definition at line 39 of file typed_thread_pool_sample.cpp.

Referenced by main().

◆ low_priority_workers_

uint16_t low_priority_workers_ = 1
Examples
typed_thread_pool_sample.cpp.

Definition at line 41 of file typed_thread_pool_sample.cpp.

Referenced by main().

◆ max_lines_

uint32_t max_lines_ = 0

Definition at line 32 of file typed_thread_pool_sample.cpp.

Referenced by initialize_logger().

◆ normal_priority_workers_

uint16_t normal_priority_workers_ = 2
Examples
typed_thread_pool_sample.cpp.

Definition at line 40 of file typed_thread_pool_sample.cpp.

Referenced by main().

◆ test_line_count_

uint32_t test_line_count_ = 1000000

Definition at line 34 of file typed_thread_pool_sample.cpp.

Referenced by store_job().

◆ use_backup_

bool use_backup_ = false

Definition at line 31 of file typed_thread_pool_sample.cpp.

Referenced by initialize_logger().

◆ wait_interval_

uint16_t wait_interval_ = 100

Definition at line 33 of file typed_thread_pool_sample.cpp.

Referenced by initialize_logger().