Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
composition_example.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
17#include <iostream>
18#include <chrono>
19#include <thread>
20
23#include <kcenon/common/interfaces/logger_interface.h>
24#include <kcenon/common/interfaces/global_logger_registry.h>
25#include <kcenon/common/interfaces/monitoring_interface.h>
29
30// Use explicit namespace to avoid ambiguity with kcenon::thread::log_level
31// from thread_logger.h (included via thread_context.h)
32using namespace kcenon::thread;
33using common_log_level = kcenon::common::interfaces::log_level;
34
40class console_logger : public kcenon::common::interfaces::ILogger {
41public:
42 using VoidResult = kcenon::common::VoidResult;
43 using log_entry = kcenon::common::interfaces::log_entry;
44 using source_location = kcenon::common::source_location;
45
46 VoidResult log(common_log_level level, const std::string& message) override {
47 std::cout << "[" << level_to_string(level) << "] " << message << std::endl;
48 return VoidResult::ok({});
49 }
50
52 std::string_view message,
53 const source_location& loc = source_location::current()) override {
54 std::cout << "[" << level_to_string(level) << "] "
55 << loc.file_name() << ":" << loc.line() << " (" << loc.function_name() << ") - "
56 << message << std::endl;
57 return VoidResult::ok({});
58 }
59
60 VoidResult log(const log_entry& entry) override {
61 std::cout << "[" << level_to_string(entry.level) << "] "
62 << entry.file << ":" << entry.line << " (" << entry.function << ") - "
63 << entry.message << std::endl;
64 return VoidResult::ok({});
65 }
66
67 bool is_enabled(common_log_level /*level*/) const override {
68 return true; // Enable all levels for demo
69 }
70
72 min_level_ = level;
73 return VoidResult::ok({});
74 }
75
76 common_log_level get_level() const override {
77 return min_level_;
78 }
79
80 VoidResult flush() override {
81 std::cout.flush();
82 return VoidResult::ok({});
83 }
84
85private:
86 std::string level_to_string(common_log_level level) const {
87 return std::string(kcenon::common::interfaces::to_string(level));
88 }
89
90 common_log_level min_level_ = common_log_level::trace;
91};
92
98class console_monitoring : public kcenon::common::interfaces::IMonitor {
99public:
100 using VoidResult = kcenon::common::VoidResult;
101
102 VoidResult record_metric(const std::string& name, double value) override {
103 std::cout << "[MONITORING] " << name << ": " << value << std::endl;
104 snapshot_.add_metric(name, value);
105 return kcenon::common::ok();
106 }
107
109 const std::string& name,
110 double value,
111 const std::unordered_map<std::string, std::string>& tags) override {
112 std::cout << "[MONITORING] " << name << ": " << value;
113 if (!tags.empty()) {
114 std::cout << " {";
115 bool first = true;
116 for (const auto& [k, v] : tags) {
117 if (!first) std::cout << ", ";
118 std::cout << k << "=" << v;
119 first = false;
120 }
121 std::cout << "}";
122 }
123 std::cout << std::endl;
124
125 kcenon::common::interfaces::metric_value mv(name, value);
126 mv.tags = tags;
127 snapshot_.metrics.push_back(mv);
128 return kcenon::common::ok();
129 }
130
131 kcenon::common::Result<kcenon::common::interfaces::metrics_snapshot> get_metrics() override {
132 return kcenon::common::ok(snapshot_);
133 }
134
135 kcenon::common::Result<kcenon::common::interfaces::health_check_result> check_health() override {
136 kcenon::common::interfaces::health_check_result result;
137 result.status = kcenon::common::interfaces::health_status::healthy;
138 result.message = "Console monitoring active";
139 return kcenon::common::ok(result);
140 }
141
142 VoidResult reset() override {
143 snapshot_ = {};
144 return kcenon::common::ok();
145 }
146
147private:
148 kcenon::common::interfaces::metrics_snapshot snapshot_;
149};
150
155 std::cout << "\n=== Composition-Based Thread System Demo ===\n" << std::endl;
156
157 // 1. Setup service container with implementations
158 auto& container = service_container::global();
159
160 // Register logger service
161 container.register_singleton<kcenon::common::interfaces::ILogger>(
162 std::make_shared<console_logger>());
163
164 // Register monitoring service
165 container.register_singleton<kcenon::common::interfaces::IMonitor>(
166 std::make_shared<console_monitoring>());
167
168 // 2. Create thread pool with context from global container
169 thread_context context; // Will resolve services from container
170 auto pool = std::make_shared<thread_pool>("CompositionPool", context);
171
172 // 3. Add workers - they inherit context from pool
173 std::vector<std::unique_ptr<thread_worker>> workers;
174 for (int i = 0; i < 4; ++i) {
175 workers.push_back(std::make_unique<thread_worker>());
176 }
177 {
178 auto r = pool->enqueue_batch(std::move(workers));
179 if (r.is_err()) {
180 std::cerr << "enqueue_batch failed: " << r.error().message << std::endl;
181 return;
182 }
183 }
184
185 // 4. Start pool - will log through context
186 {
187 auto r = pool->start();
188 if (r.is_err()) {
189 std::cerr << "start failed: " << r.error().message << std::endl;
190 return;
191 }
192 }
193
194 // 5. Submit jobs that will be logged
195 for (int i = 0; i < 10; ++i) {
196 auto r = pool->enqueue(std::make_unique<callback_job>(
197 [i, &context]() -> kcenon::common::VoidResult {
198 context.log(log_level_v2::info,
199 "Processing job " + std::to_string(i));
200
201 // Simulate work
202 std::this_thread::sleep_for(std::chrono::milliseconds(100));
203
204 return kcenon::common::ok();
205 }
206 ));
207 if (r.is_err()) {
208 std::cerr << "enqueue failed: " << r.error().message << std::endl;
209 }
210 }
211
212 // 6. Wait for completion
213 std::this_thread::sleep_for(std::chrono::seconds(2));
214
215 // 7. Stop pool
216 {
217 auto r = pool->stop();
218 if (r.is_err()) {
219 std::cerr << "stop failed: " << r.error().message << std::endl;
220 }
221 }
222
223 std::cout << "\n=== Basic Thread Pool Demo Complete ===\n" << std::endl;
224}
225
226/*
227 * @brief Demonstrate typed thread pool with composition
228 * Temporarily disabled - requires typed_pool implementation
229 */
230/*
231void demonstrate_typed_pool_composition() {
232 std::cout << "\n=== Typed Thread Pool with Composition Demo ===\n" << std::endl;
233
234 // Use builder pattern for context
235 auto context = thread_context_builder()
236 .from_global_container()
237 .build();
238
239 // Create typed thread pool with priority support
240 auto pool = std::make_shared<typed_thread_pool_t<job_types>>("TypedPool", context);
241
242 // Add specialized workers
243 for (auto priority : {job_types::RealTime, job_types::Batch, job_types::Background}) {
244 auto worker = std::make_unique<typed_thread_worker_t<job_types>>();
245 // For typed workers, set the type they handle
246 // Note: The typed worker template includes the type in the template parameter
247 auto r = pool->enqueue(std::move(worker));
248 if (r.is_err()) {
249 std::cerr << "enqueue worker failed: " << r.error().message << std::endl;
250 }
251 }
252
253 {
254 auto r = pool->start();
255 if (r.is_err()) {
256 std::cerr << "typed pool start failed: " << r.error().message << std::endl;
257 return;
258 }
259 }
260
261 // Submit jobs with different priorities
262 for (int i = 0; i < 5; ++i) {
263 // Real-time job
264 auto r1 = pool->enqueue(std::make_unique<callback_typed_job_t<job_types>>(
265 [i, &context]() -> kcenon::common::VoidResult {
266 context.log(log_level::info,
267 "RealTime job " + std::to_string(i) + " executing");
268 return kcenon::common::ok();
269 },
270 job_types::RealTime
271 ));
272 if (r1.is_err()) {
273 std::cerr << "enqueue realtime job failed: " << r1.error().message << std::endl;
274 }
275
276 // Background job
277 auto r2 = pool->enqueue(std::make_unique<callback_typed_job_t<job_types>>(
278 [i, &context]() -> kcenon::common::VoidResult {
279 context.log(log_level::debug,
280 "Background job " + std::to_string(i) + " executing");
281 std::this_thread::sleep_for(std::chrono::milliseconds(50));
282 return kcenon::common::ok();
283 },
284 job_types::Background
285 ));
286 if (r2.is_err()) {
287 std::cerr << "enqueue background job failed: " << r2.error().message << std::endl;
288 }
289 }
290
291 std::this_thread::sleep_for(std::chrono::seconds(1));
292 {
293 auto r = pool->stop();
294 if (r.is_err()) {
295 std::cerr << "typed pool stop failed: " << r.error().message << std::endl;
296 }
297 }
298
299 std::cout << "\n=== Typed Thread Pool Demo Complete ===\n" << std::endl;
300}
301*/
302
307 std::cout << "\n=== Minimal Thread Pool (No Services) Demo ===\n" << std::endl;
308
309 // Clear any existing services
311
312 // Create pool without context - no logging or monitoring
313 auto pool = std::make_shared<thread_pool>("MinimalPool");
314
315 // Add workers
316 std::vector<std::unique_ptr<thread_worker>> workers;
317 for (int i = 0; i < 2; ++i) {
318 workers.push_back(std::make_unique<thread_worker>());
319 }
320 {
321 auto r = pool->enqueue_batch(std::move(workers));
322 if (r.is_err()) {
323 std::cerr << "enqueue_batch failed: " << r.error().message << std::endl;
324 return;
325 }
326 }
327
328 {
329 auto r = pool->start();
330 if (r.is_err()) {
331 std::cerr << "start failed: " << r.error().message << std::endl;
332 return;
333 }
334 }
335
336 // Submit jobs - no logging will occur
337 std::atomic<int> counter{0};
338 for (int i = 0; i < 5; ++i) {
339 auto r = pool->enqueue(std::make_unique<callback_job>(
340 [&counter]() -> kcenon::common::VoidResult {
341 counter.fetch_add(1);
342 return kcenon::common::ok();
343 }
344 ));
345 if (r.is_err()) {
346 std::cerr << "enqueue failed: " << r.error().message << std::endl;
347 }
348 }
349
350 std::this_thread::sleep_for(std::chrono::milliseconds(500));
351 {
352 auto r = pool->stop();
353 if (r.is_err()) {
354 std::cerr << "stop failed: " << r.error().message << std::endl;
355 }
356 }
357
358 std::cout << "Completed " << counter.load() << " jobs without any logging/monitoring" << std::endl;
359 std::cout << "\n=== Minimal Demo Complete ===\n" << std::endl;
360}
361
362int main() {
363 try {
364 // Show different usage patterns
367 // demonstrate_typed_pool_composition(); // Temporarily disabled - requires typed_pool implementation
368
369 // Clean up
371
372 std::cout << "\nAll demos completed successfully!" << std::endl;
373
374 } catch (const std::exception& e) {
375 std::cerr << "Error: " << e.what() << std::endl;
376 return 1;
377 }
378
379 return 0;
380}
Specialized job class that encapsulates user-defined callbacks.
Simple console logger implementation using common_system ILogger.
common_log_level min_level_
kcenon::common::source_location source_location
VoidResult log(const log_entry &entry) override
kcenon::common::VoidResult VoidResult
VoidResult flush() override
VoidResult log(common_log_level level, std::string_view message, const source_location &loc=source_location::current()) override
VoidResult log(common_log_level level, const std::string &message) override
bool is_enabled(common_log_level) const override
common_log_level get_level() const override
kcenon::common::interfaces::log_entry log_entry
VoidResult set_level(common_log_level level) override
std::string level_to_string(common_log_level level) const
Simple monitoring implementation using common::interfaces::IMonitor.
kcenon::common::interfaces::metrics_snapshot snapshot_
kcenon::common::Result< kcenon::common::interfaces::metrics_snapshot > get_metrics() override
kcenon::common::VoidResult VoidResult
VoidResult reset() override
VoidResult record_metric(const std::string &name, double value, const std::unordered_map< std::string, std::string > &tags) override
kcenon::common::Result< kcenon::common::interfaces::health_check_result > check_health() override
VoidResult record_metric(const std::string &name, double value) override
A template class representing either a value or an error.
void clear()
Clear all registered services.
static service_container & global()
Get the global service container instance.
Context object that provides access to optional services.
void demonstrate_composition()
Demonstrate composition-based design.
void demonstrate_minimal_usage()
Demonstrate using thread pool without any services.
int main()
kcenon::common::interfaces::log_level common_log_level
Core thread pool implementation with work stealing and auto-scaling.
Logging severity levels for the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Service container for dependency injection within the thread system.
Context object providing access to optional thread system services.