Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
composition_example.cpp

Demonstrates three usage patterns: minimal thread pool without any services, full composition with ILogger and IMonitor registered in the global service_container, and dynamic service registration at runtime.

See also
service_container, thread_context, thread_pool, ILogger, IMonitor
// BSD 3-Clause License
// Copyright (c) 2025, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.
#include <iostream>
#include <chrono>
#include <thread>
#include <kcenon/common/interfaces/logger_interface.h>
#include <kcenon/common/interfaces/global_logger_registry.h>
#include <kcenon/common/interfaces/monitoring_interface.h>
// The using-directive below also brings kcenon::thread::log_level (from
// thread_logger.h, included via thread_context.h) into scope, so the
// common_system log level is given its own explicit alias to avoid ambiguity.
using namespace kcenon::thread;
using common_log_level = kcenon::common::interfaces::log_level;
/**
 * @brief Simple console logger implementation using common_system ILogger.
 *
 * Every log overload writes a single line to std::cout, prefixed with the
 * textual level in square brackets. The configured minimum level is stored
 * and reported by get_level(), but is_enabled() always returns true and none
 * of the log overloads consult it, so no message is filtered in this demo.
 */
class console_logger : public kcenon::common::interfaces::ILogger {
public:
    using VoidResult = kcenon::common::VoidResult;
    using log_entry = kcenon::common::interfaces::log_entry;
    using source_location = kcenon::common::source_location;

    /**
     * @brief Print "[LEVEL] message" to std::cout.
     * @param level   Severity of the message.
     * @param message Text to print.
     * @return A successful VoidResult.
     */
    VoidResult log(common_log_level level, const std::string& message) override {
        std::cout << "[" << level_to_string(level) << "] " << message << std::endl;
        return VoidResult::ok({});
    }

    /**
     * @brief Print "[LEVEL] file:line (function) - message" to std::cout.
     * @param level   Severity of the message.
     * @param message Text to print.
     * @param loc     Call-site location; defaults to the caller's location.
     * @return A successful VoidResult.
     */
    VoidResult log(common_log_level level,
                   std::string_view message,
                   const source_location& loc = source_location::current()) override {
        std::cout << "[" << level_to_string(level) << "] "
                  << loc.file_name() << ":" << loc.line() << " (" << loc.function_name() << ") - "
                  << message << std::endl;
        return VoidResult::ok({});
    }

    /**
     * @brief Print a structured entry as "[LEVEL] file:line (function) - message".
     * @param entry Log entry carrying level, file, line, function and message.
     * @return A successful VoidResult.
     */
    VoidResult log(const log_entry& entry) override {
        std::cout << "[" << level_to_string(entry.level) << "] "
                  << entry.file << ":" << entry.line << " (" << entry.function << ") - "
                  << entry.message << std::endl;
        return VoidResult::ok({});
    }

    /// @brief Report every level as enabled, regardless of the stored minimum.
    bool is_enabled(common_log_level /*level*/) const override {
        return true; // Enable all levels for demo
    }

    /**
     * @brief Store the minimum level reported by get_level().
     * @param level New minimum level.
     * @return A successful VoidResult.
     */
    VoidResult set_level(common_log_level level) override {
        min_level_ = level;
        return VoidResult::ok({});
    }

    /// @brief Return the level last passed to set_level() (trace by default).
    common_log_level get_level() const override {
        return min_level_;
    }

    /// @brief Flush std::cout.
    VoidResult flush() override {
        std::cout.flush();
        return VoidResult::ok({});
    }

private:
    /// @brief Convert a level to an owning string via the common_system to_string().
    std::string level_to_string(common_log_level level) const {
        return std::string(kcenon::common::interfaces::to_string(level));
    }

    common_log_level min_level_ = common_log_level::trace;
};
/**
 * @brief Simple monitoring implementation using common::interfaces::IMonitor.
 *
 * Each recorded metric is echoed to std::cout with a "[MONITORING]" prefix
 * and also stored in an in-memory metrics_snapshot, which get_metrics()
 * returns and reset() clears.
 */
class console_monitoring : public kcenon::common::interfaces::IMonitor {
public:
    using VoidResult = kcenon::common::VoidResult;

    /**
     * @brief Print and store an untagged metric.
     * @param name  Metric name.
     * @param value Metric value.
     * @return A successful VoidResult.
     */
    VoidResult record_metric(const std::string& name, double value) override {
        std::cout << "[MONITORING] " << name << ": " << value << std::endl;
        snapshot_.add_metric(name, value);
        return kcenon::common::ok();
    }

    /**
     * @brief Print and store a metric together with its tags.
     *
     * Tags are printed as " {k=v, k=v}" after the value; the braces are
     * omitted entirely when the tag map is empty. Iteration order is that of
     * the unordered_map, so the printed tag order is unspecified.
     *
     * @param name  Metric name.
     * @param value Metric value.
     * @param tags  Key/value tags attached to the stored metric.
     * @return A successful VoidResult.
     */
    VoidResult record_metric(
        const std::string& name,
        double value,
        const std::unordered_map<std::string, std::string>& tags) override {
        std::cout << "[MONITORING] " << name << ": " << value;
        if (!tags.empty()) {
            std::cout << " {";
            bool first = true;
            for (const auto& [k, v] : tags) {
                if (!first) std::cout << ", ";
                std::cout << k << "=" << v;
                first = false;
            }
            std::cout << "}";
        }
        std::cout << std::endl;

        kcenon::common::interfaces::metric_value mv(name, value);
        mv.tags = tags;
        snapshot_.metrics.push_back(mv);
        return kcenon::common::ok();
    }

    /// @brief Return a copy of the metrics recorded so far.
    kcenon::common::Result<kcenon::common::interfaces::metrics_snapshot> get_metrics() override {
        return kcenon::common::ok(snapshot_);
    }

    /// @brief Always report a healthy status with a fixed message.
    kcenon::common::Result<kcenon::common::interfaces::health_check_result> check_health() override {
        kcenon::common::interfaces::health_check_result result;
        result.status = kcenon::common::interfaces::health_status::healthy;
        result.message = "Console monitoring active";
        return kcenon::common::ok(result);
    }

    /// @brief Discard all stored metrics by value-initialising the snapshot.
    VoidResult reset() override {
        snapshot_ = {};
        return kcenon::common::ok();
    }

private:
    kcenon::common::interfaces::metrics_snapshot snapshot_;
};
std::cout << "\n=== Composition-Based Thread System Demo ===\n" << std::endl;
// 1. Setup service container with implementations
auto& container = service_container::global();
// Register logger service
container.register_singleton<kcenon::common::interfaces::ILogger>(
std::make_shared<console_logger>());
// Register monitoring service
container.register_singleton<kcenon::common::interfaces::IMonitor>(
std::make_shared<console_monitoring>());
// 2. Create thread pool with context from global container
thread_context context; // Will resolve services from container
auto pool = std::make_shared<thread_pool>("CompositionPool", context);
// 3. Add workers - they inherit context from pool
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 4; ++i) {
workers.push_back(std::make_unique<thread_worker>());
}
{
auto r = pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "enqueue_batch failed: " << r.error().message << std::endl;
return;
}
}
// 4. Start pool - will log through context
{
auto r = pool->start();
if (r.is_err()) {
std::cerr << "start failed: " << r.error().message << std::endl;
return;
}
}
// 5. Submit jobs that will be logged
for (int i = 0; i < 10; ++i) {
auto r = pool->enqueue(std::make_unique<callback_job>(
[i, &context]() -> kcenon::common::VoidResult {
context.log(log_level_v2::info,
"Processing job " + std::to_string(i));
// Simulate work
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return kcenon::common::ok();
}
));
if (r.is_err()) {
std::cerr << "enqueue failed: " << r.error().message << std::endl;
}
}
// 6. Wait for completion
std::this_thread::sleep_for(std::chrono::seconds(2));
// 7. Stop pool
{
auto r = pool->stop();
if (r.is_err()) {
std::cerr << "stop failed: " << r.error().message << std::endl;
}
}
std::cout << "\n=== Basic Thread Pool Demo Complete ===\n" << std::endl;
}
/*
* @brief Demonstrate typed thread pool with composition
* Temporarily disabled - requires typed_pool implementation
*/
/*
void demonstrate_typed_pool_composition() {
std::cout << "\n=== Typed Thread Pool with Composition Demo ===\n" << std::endl;
// Use builder pattern for context
auto context = thread_context_builder()
.from_global_container()
.build();
// Create typed thread pool with priority support
auto pool = std::make_shared<typed_thread_pool_t<job_types>>("TypedPool", context);
// Add specialized workers
for (auto priority : {job_types::RealTime, job_types::Batch, job_types::Background}) {
auto worker = std::make_unique<typed_thread_worker_t<job_types>>();
// For typed workers, set the type they handle
// Note: The typed worker template includes the type in the template parameter
auto r = pool->enqueue(std::move(worker));
if (r.is_err()) {
std::cerr << "enqueue worker failed: " << r.error().message << std::endl;
}
}
{
auto r = pool->start();
if (r.is_err()) {
std::cerr << "typed pool start failed: " << r.error().message << std::endl;
return;
}
}
// Submit jobs with different priorities
for (int i = 0; i < 5; ++i) {
// Real-time job
auto r1 = pool->enqueue(std::make_unique<callback_typed_job_t<job_types>>(
[i, &context]() -> kcenon::common::VoidResult {
context.log(log_level::info,
"RealTime job " + std::to_string(i) + " executing");
return kcenon::common::ok();
},
job_types::RealTime
));
if (r1.is_err()) {
std::cerr << "enqueue realtime job failed: " << r1.error().message << std::endl;
}
// Background job
auto r2 = pool->enqueue(std::make_unique<callback_typed_job_t<job_types>>(
[i, &context]() -> kcenon::common::VoidResult {
context.log(log_level::debug,
"Background job " + std::to_string(i) + " executing");
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return kcenon::common::ok();
},
job_types::Background
));
if (r2.is_err()) {
std::cerr << "enqueue background job failed: " << r2.error().message << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
{
auto r = pool->stop();
if (r.is_err()) {
std::cerr << "typed pool stop failed: " << r.error().message << std::endl;
}
}
std::cout << "\n=== Typed Thread Pool Demo Complete ===\n" << std::endl;
}
*/
std::cout << "\n=== Minimal Thread Pool (No Services) Demo ===\n" << std::endl;
// Clear any existing services
service_container::global().clear();
// Create pool without context - no logging or monitoring
auto pool = std::make_shared<thread_pool>("MinimalPool");
// Add workers
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 2; ++i) {
workers.push_back(std::make_unique<thread_worker>());
}
{
auto r = pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "enqueue_batch failed: " << r.error().message << std::endl;
return;
}
}
{
auto r = pool->start();
if (r.is_err()) {
std::cerr << "start failed: " << r.error().message << std::endl;
return;
}
}
// Submit jobs - no logging will occur
std::atomic<int> counter{0};
for (int i = 0; i < 5; ++i) {
auto r = pool->enqueue(std::make_unique<callback_job>(
[&counter]() -> kcenon::common::VoidResult {
counter.fetch_add(1);
return kcenon::common::ok();
}
));
if (r.is_err()) {
std::cerr << "enqueue failed: " << r.error().message << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
{
auto r = pool->stop();
if (r.is_err()) {
std::cerr << "stop failed: " << r.error().message << std::endl;
}
}
std::cout << "Completed " << counter.load() << " jobs without any logging/monitoring" << std::endl;
std::cout << "\n=== Minimal Demo Complete ===\n" << std::endl;
}
int main() {
try {
// Show different usage patterns
// demonstrate_typed_pool_composition(); // Temporarily disabled - requires typed_pool implementation
// Clean up
service_container::global().clear();
std::cout << "\nAll demos completed successfully!" << std::endl;
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
Specialized job class that encapsulates user-defined callbacks.
Simple console logger implementation using common_system ILogger.
common_log_level min_level_
kcenon::common::source_location source_location
kcenon::common::VoidResult VoidResult
VoidResult flush() override
VoidResult log(common_log_level level, const std::string &message) override
bool is_enabled(common_log_level) const override
common_log_level get_level() const override
kcenon::common::interfaces::log_entry log_entry
VoidResult set_level(common_log_level level) override
std::string level_to_string(common_log_level level) const
Simple monitoring implementation using common::interfaces::IMonitor.
kcenon::common::interfaces::metrics_snapshot snapshot_
kcenon::common::Result< kcenon::common::interfaces::metrics_snapshot > get_metrics() override
kcenon::common::VoidResult VoidResult
VoidResult reset() override
kcenon::common::Result< kcenon::common::interfaces::health_check_result > check_health() override
VoidResult record_metric(const std::string &name, double value) override
A template class representing either a value or an error.
Context object that provides access to optional services.
void demonstrate_composition()
Demonstrate composition-based design.
void demonstrate_minimal_usage()
Demonstrate using thread pool without any services.
kcenon::common::interfaces::log_level common_log_level
Core thread pool implementation with work stealing and auto-scaling.
Logging severity levels for the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Service container for dependency injection within the thread system.
Context object providing access to optional thread system services.