Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
typed_thread_pool_sample_2.cpp File Reference

Typed thread pool with a custom priority enum (test_priority) More...

#include <iostream>
#include <memory>
#include <chrono>
#include <thread>
#include "logger/core/logger.h"
#include <kcenon/thread/utils/formatter.h>
#include "test_type.h"
#include <kcenon/thread/impl/typed_pool/typed_thread_pool.h>
#include <format>
Include dependency graph for typed_thread_pool_sample_2.cpp:

Go to the source code of this file.

Functions

auto initialize_logger () -> std::optional< std::string >
 
auto create_default (const uint16_t &top_priority_workers, const uint16_t &middle_priority_workers, const uint16_t &bottom_priority_workers) -> std::tuple< std::shared_ptr< typed_thread_pool_t< test_priority > >, std::optional< std::string > >
 
auto store_job (std::shared_ptr< typed_thread_pool_t< test_priority > > thread_pool) -> std::optional< std::string >
 
auto main () -> int
 

Variables

bool use_backup_ = false
 
uint32_t max_lines_ = 0
 
uint16_t wait_interval_ = 100
 
uint32_t test_line_count_ = 1000000
 
log_module::log_types file_target_ = log_module::log_types::None
 
log_module::log_types console_target_ = log_module::log_types::Information
 
log_module::log_types callback_target_ = log_module::log_types::None
 
uint16_t top_priority_workers_ = 3
 
uint16_t middle_priority_workers_ = 2
 
uint16_t bottom_priority_workers_ = 1
 

Detailed Description

Typed thread pool with a custom priority enum (test_priority)

Definition in file typed_thread_pool_sample_2.cpp.

Function Documentation

◆ create_default()

auto create_default ( const uint16_t & top_priority_workers,
const uint16_t & middle_priority_workers,
const uint16_t & bottom_priority_workers ) -> std::tuple<std::shared_ptr<typed_thread_pool_t<test_priority>>, std::optional<std::string>>

Definition at line 65 of file typed_thread_pool_sample_2.cpp.

70{
71 std::shared_ptr<typed_thread_pool_t<test_priority>> pool;
72
73 try
74 {
75 pool = std::make_shared<typed_thread_pool_t<test_priority>>();
76 }
77 catch (const std::bad_alloc& e)
78 {
79 return { nullptr, std::string(e.what()) };
80 }
81
82 std::optional<std::string> error_message = std::nullopt;
83
84 std::vector<std::unique_ptr<typed_thread_worker_t<test_priority>>> workers;
85 workers.reserve(top_priority_workers + middle_priority_workers + bottom_priority_workers);
86
87 for (uint16_t i = 0; i < top_priority_workers; ++i)
88 {
89 workers.push_back(std::make_unique<typed_thread_worker_t<test_priority>>(
90 std::vector<test_priority>{ test_priority::Top }, "top priority worker"));
91 }
92 for (uint16_t i = 0; i < middle_priority_workers; ++i)
93 {
94 workers.push_back(std::make_unique<typed_thread_worker_t<test_priority>>(
95 std::vector<test_priority>{ test_priority::Middle }, "middle priority worker"));
96 }
97 for (uint16_t i = 0; i < bottom_priority_workers; ++i)
98 {
99 workers.push_back(std::make_unique<typed_thread_worker_t<test_priority>>(
100 std::vector<test_priority>{ test_priority::Bottom }, "bottom priority worker"));
101 }
102
103 auto enqueue_result = pool->enqueue_batch(std::move(workers));
104 if (enqueue_result.is_err())
105 {
106 return { nullptr, formatter::format("cannot enqueue to workers: {}",
107 enqueue_result.error().message) };
108 }
109
110 return { pool, std::nullopt };
111}
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132
@ Bottom
Bottom priority.
@ Top
Top priority.
@ Middle
Middle priority.

References Bottom, kcenon::thread::utils::formatter::format(), Middle, and Top.

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ initialize_logger()

auto initialize_logger ( ) -> std::optional<std::string>

Definition at line 44 of file typed_thread_pool_sample_2.cpp.

45{
46 log_module::set_title("typed_thread_pool_sample_2");
47 log_module::set_use_backup(use_backup_);
48 log_module::set_max_lines(max_lines_);
49 log_module::file_target(file_target_);
50 log_module::console_target(console_target_);
51 log_module::callback_target(callback_target_);
52 // Note: This demonstrates the logger callback feature - std::cout is intentionally used here
53 log_module::message_callback(
54 [](const log_module::log_types& type, const std::string& datetime,
55 const std::string& message)
56 { std::cout << formatter::format("[{}][{}] {}\n", datetime, type, message); });
57 if (wait_interval_ > 0)
58 {
59 log_module::set_wake_interval(std::chrono::milliseconds(wait_interval_));
60 }
61
62 return log_module::start();
63}
log_module::log_types file_target_
log_module::log_types callback_target_
uint32_t max_lines_
log_module::log_types console_target_
uint16_t wait_interval_

References callback_target_, console_target_, file_target_, kcenon::thread::utils::formatter::format(), max_lines_, use_backup_, and wait_interval_.

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ main()

auto main ( ) -> int

Definition at line 145 of file typed_thread_pool_sample_2.cpp.

146{
147 auto error_message = initialize_logger();
148 if (error_message.has_value())
149 {
150 std::cerr << formatter::format("error starting logger: {}\n",
151 error_message.value_or("unknown error"));
152 return 0;
153 }
154
155 std::shared_ptr<typed_thread_pool_t<test_priority>> thread_pool = nullptr;
156 std::tie(thread_pool, error_message)
158 if (error_message.has_value())
159 {
160 log_module::write_error("error creating thread pool: {}",
161 error_message.value_or("unknown error"));
162
163 return 0;
164 }
165
166 log_module::write_information("created {}", thread_pool->to_string());
167
168 error_message = store_job(thread_pool);
169 if (error_message.has_value())
170 {
171 log_module::write_error("error storing job: {}", error_message.value_or("unknown error"));
172
173 thread_pool.reset();
174
175 return 0;
176 }
177
178 auto start_result = thread_pool->start();
179 if (start_result.is_err())
180 {
181 log_module::write_error("error starting thread pool: {}",
182 start_result.error().message);
183
184 thread_pool.reset();
185
186 return 0;
187 }
188
189 log_module::write_information("started {}", thread_pool->to_string());
190
191 {
192 auto stop_result = thread_pool->stop();
193 if (stop_result.is_err()) {
194 log_module::write_error("error stopping thread pool: {}", stop_result.error().message);
195 }
196 }
197
198 log_module::write_information("stopped {}", thread_pool->to_string());
199
200 thread_pool.reset();
201
202 log_module::stop();
203
204 return 0;
205}
A thread pool for concurrent execution of jobs using multiple worker threads.
auto to_string(void) const -> std::string
Provides a string representation of this thread_pool.
auto stop(const bool &immediately_stop=false) -> common::VoidResult
Stops the thread pool and all worker threads.
auto start(void) -> common::VoidResult
Starts the thread pool and all associated workers.
auto store_job(std::shared_ptr< typed_thread_pool_t< test_priority > > thread_pool) -> std::optional< std::string >
uint16_t top_priority_workers_
uint16_t bottom_priority_workers_
auto initialize_logger() -> std::optional< std::string >
auto create_default(const uint16_t &top_priority_workers, const uint16_t &middle_priority_workers, const uint16_t &bottom_priority_workers) -> std::tuple< std::shared_ptr< typed_thread_pool_t< test_priority > >, std::optional< std::string > >
uint16_t middle_priority_workers_

References bottom_priority_workers_, create_default(), kcenon::thread::utils::formatter::format(), initialize_logger(), middle_priority_workers_, kcenon::thread::thread_pool::start(), kcenon::thread::thread_pool::stop(), store_job(), kcenon::thread::thread_pool::to_string(), and top_priority_workers_.

Here is the call graph for this function:

◆ store_job()

auto store_job ( std::shared_ptr< typed_thread_pool_t< test_priority > > thread_pool) -> std::optional<std::string>

Definition at line 113 of file typed_thread_pool_sample_2.cpp.

115{
116 int target = 0;
117
118 std::vector<std::unique_ptr<typed_job_t<test_priority>>> jobs;
119 jobs.reserve(test_line_count_);
120
121 for (auto index = 0; index < test_line_count_; ++index)
122 {
123 target = index % 3;
124 jobs.push_back(std::make_unique<callback_typed_job_t<test_priority>>(
125 [target](void) -> kcenon::common::VoidResult
126 {
127 log_module::write_debug("Hello, World!: {} priority", target);
128 return kcenon::common::ok();
129 },
130 static_cast<test_priority>(target)));
131 }
132
133 auto enqueue_result = thread_pool->enqueue_batch(std::move(jobs));
134 if (enqueue_result.is_err())
135 {
136 return formatter::format("error enqueuing jobs: {}",
137 enqueue_result.error().message);
138 }
139
140 log_module::write_sequence("enqueued jobs: {}", test_line_count_);
141
142 return std::nullopt;
143}
Callback-based typed job template.
auto enqueue_batch(std::vector< std::unique_ptr< job > > &&jobs) -> common::VoidResult
Enqueues a batch of jobs into the shared job_queue.
test_priority
Enumeration of test priority levels.
Definition test_type.h:23
uint32_t test_line_count_

References kcenon::thread::thread_pool::enqueue_batch(), kcenon::thread::utils::formatter::format(), and test_line_count_.

Referenced by main().

Here is the call graph for this function:
Here is the caller graph for this function:

Variable Documentation

◆ bottom_priority_workers_

uint16_t bottom_priority_workers_ = 1
Examples
typed_thread_pool_sample_2.cpp.

Definition at line 42 of file typed_thread_pool_sample_2.cpp.

Referenced by main().

◆ callback_target_

log_module::log_types callback_target_ = log_module::log_types::None

Definition at line 38 of file typed_thread_pool_sample_2.cpp.

Referenced by initialize_logger().

◆ console_target_

log_module::log_types console_target_ = log_module::log_types::Information

Definition at line 37 of file typed_thread_pool_sample_2.cpp.

Referenced by initialize_logger().

◆ file_target_

log_module::log_types file_target_ = log_module::log_types::None

Definition at line 36 of file typed_thread_pool_sample_2.cpp.

Referenced by initialize_logger().

◆ max_lines_

uint32_t max_lines_ = 0

Definition at line 33 of file typed_thread_pool_sample_2.cpp.

Referenced by initialize_logger().

◆ middle_priority_workers_

uint16_t middle_priority_workers_ = 2
Examples
typed_thread_pool_sample_2.cpp.

Definition at line 41 of file typed_thread_pool_sample_2.cpp.

Referenced by main().

◆ test_line_count_

uint32_t test_line_count_ = 1000000

Definition at line 35 of file typed_thread_pool_sample_2.cpp.

Referenced by store_job().

◆ top_priority_workers_

uint16_t top_priority_workers_ = 3
Examples
typed_thread_pool_sample_2.cpp.

Definition at line 40 of file typed_thread_pool_sample_2.cpp.

Referenced by main().

◆ use_backup_

bool use_backup_ = false

Definition at line 32 of file typed_thread_pool_sample_2.cpp.

Referenced by initialize_logger().

◆ wait_interval_

uint16_t wait_interval_ = 100

Definition at line 34 of file typed_thread_pool_sample_2.cpp.

Referenced by initialize_logger().