Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
integration_example.cpp

Shows four patterns: thread pool with logger only, thread pool with monitoring only, complete integration with both services, and dynamic service registration at runtime. Uses mock implementations of ILogger and IMonitor.

See also
service_container, thread_context, ILogger, IMonitor
// BSD 3-Clause License
// Copyright (c) 2025, 🍀☀🌕🌥 🌊
// See the LICENSE file in the project root for full license information.
#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
// Thread system headers
// External logger headers (would be from installed package)
// #include <logger_system/logger.h>
// #include <logger_system/writers/console_writer.h>
// External monitoring headers (would be from installed package)
// #include <monitoring_system/monitoring.h>
// For this example, we'll create mock implementations
#include "mock_logger.h"
using namespace kcenon::thread;
using ILogger = kcenon::common::interfaces::ILogger;
using IMonitor = kcenon::common::interfaces::IMonitor;
// ---------------------------------------------------------------------------
// Example 1: Thread pool with an external logger only.
// NOTE(review): the enclosing function header (presumably
// `void thread_pool_with_logger_example() {`) was lost during documentation
// extraction; this span is the function body, ending at the closing brace
// below. The `return;` statements imply a `void` function.
// ---------------------------------------------------------------------------
std::cout << "\n=== Thread Pool with External Logger ===\n" << std::endl;
// 1. Create and start the external (mock) logger.
auto logger = std::make_shared<mock_logger>();
logger->start();
// 2. Register the logger in the global service container so that any
//    thread_context constructed afterwards can resolve it.
service_container::global().register_singleton<ILogger>(logger);
// 3. Create thread pool - it will automatically use the logger
thread_context context; // Resolves from global container
auto pool = std::make_shared<thread_pool>("LoggedPool", context);
// 4. Add four workers (built first, then handed to the pool in one batch).
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 4; ++i) {
workers.push_back(std::make_unique<thread_worker>());
}
{
// Result-style error handling: report the message and bail out.
auto r = pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "enqueue_batch failed: " << r.error().message << std::endl;
return;
}
}
// 5. Start pool
{
auto r = pool->start();
if (r.is_err()) {
std::cerr << "start failed: " << r.error().message << std::endl;
return;
}
}
// 6. Submit ten jobs; each logs through the context it captured by reference.
//    Capturing `&context` is safe here only because this function blocks
//    (sleep below) until after the pool is stopped.
for (int i = 0; i < 10; ++i) {
auto r = pool->enqueue(std::make_unique<callback_job>(
[i, &context]() -> kcenon::common::VoidResult {
// Job logs through context
context.log(log_level_v2::info,
"Executing job " + std::to_string(i));
// Simulate work
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return kcenon::common::ok();
}
));
if (r.is_err()) {
std::cerr << "enqueue failed: " << r.error().message << std::endl;
}
}
// 7. Wait and stop
// NOTE(review): a fixed 1 s sleep races with job completion (10 jobs x 50 ms
// across 4 workers ≈ 150 ms, so it fits today); prefer an explicit
// drain/wait API if thread_pool offers one — TODO confirm.
std::this_thread::sleep_for(std::chrono::seconds(1));
{
auto r = pool->stop();
if (r.is_err()) {
std::cerr << "stop failed: " << r.error().message << std::endl;
}
}
logger->stop();
// 8. Clear service container
//    (prevents later examples from inheriting this example's singletons)
service_container::global().clear();
}
// ---------------------------------------------------------------------------
// Example 2: Thread pool with external monitoring only.
// NOTE(review): the enclosing function header (presumably
// `void thread_pool_with_monitoring_example() {`) was lost during
// documentation extraction; this span is the function body, ending at the
// closing brace below.
// ---------------------------------------------------------------------------
std::cout << "\n=== Thread Pool with External Monitoring ===\n" << std::endl;
// 1. Create and start the external (mock) monitoring service.
auto monitor = std::make_shared<mock_monitoring>();
monitor->start();
// 2. Register monitoring in the global service container.
service_container::global().register_singleton<IMonitor>(monitor);
// 3. Create thread pool with monitoring (context resolves IMonitor from
//    the global container, mirroring Example 1's logger resolution).
thread_context context;
auto pool = std::make_shared<thread_pool>("MonitoredPool", context);
// 4. Add workers and start
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 4; ++i) {
workers.push_back(std::make_unique<thread_worker>());
}
{
auto r = pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "enqueue_batch failed: " << r.error().message << std::endl;
return;
}
}
{
auto r = pool->start();
if (r.is_err()) {
std::cerr << "start failed: " << r.error().message << std::endl;
return;
}
}
// 5. Submit three batches of 20 jobs each and sample metrics after each.
std::cout << "Submitting jobs and monitoring performance..." << std::endl;
for (int batch = 0; batch < 3; ++batch) {
// Submit batch of jobs
for (int i = 0; i < 20; ++i) {
// NOTE(review): `&context` is captured but never used in this lambda —
// the capture list could be empty.
auto r = pool->enqueue(std::make_unique<callback_job>(
[&context]() -> kcenon::common::VoidResult {
// Simulate varying workload
// NOTE(review): rand() runs here on worker threads; it is not
// guaranteed thread-safe and is never seeded — acceptable for a
// demo, but consider <random> with per-thread engines.
std::this_thread::sleep_for(
std::chrono::milliseconds(10 + rand() % 40));
return kcenon::common::ok();
}
));
if (r.is_err()) {
std::cerr << "enqueue failed: " << r.error().message << std::endl;
}
}
// Wait and check metrics
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// get_metrics() returns a Result wrapping a snapshot of recorded metrics.
auto snapshot_result = monitor->get_metrics();
if (snapshot_result.is_ok()) {
const auto& snapshot = snapshot_result.value();
std::cout << "Batch " << batch + 1 << " metrics:" << std::endl;
std::cout << " Total metrics recorded: " << snapshot.metrics.size() << std::endl;
}
}
// 6. Stop and get final stats
{
auto r = pool->stop();
if (r.is_err()) {
std::cerr << "stop failed: " << r.error().message << std::endl;
}
}
monitor->stop();
auto stats = monitor->get_stats();
std::cout << "\nFinal monitoring stats:" << std::endl;
std::cout << " Total collections: " << stats.total_collections << std::endl;
// Clear the container so later examples start from a clean slate.
service_container::global().clear();
}
// ---------------------------------------------------------------------------
// Example 3: Complete integration — logger AND monitoring registered, so the
// pool's context can both log and record metrics.
// NOTE(review): the enclosing function header (presumably
// `void complete_integration_example() {`) was lost during documentation
// extraction; this span is the function body, ending at the closing brace
// below.
// ---------------------------------------------------------------------------
std::cout << "\n=== Complete Integration Example ===\n" << std::endl;
// 1. Setup external services
auto logger = std::make_shared<mock_logger>();
auto monitor = std::make_shared<mock_monitoring>();
logger->start();
monitor->start();
// 2. Register both services
service_container::global().register_singleton<ILogger>(logger);
service_container::global().register_singleton<IMonitor>(monitor);
// 3. Create fully integrated thread pool
thread_context context;
auto pool = std::make_shared<thread_pool>("IntegratedPool", context);
// Log that we're starting
context.log(log_level_v2::info, "Starting integrated thread pool example");
// 4. Configure pool with four workers.
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 4; ++i) {
workers.push_back(std::make_unique<thread_worker>(true)); // Enable timing
}
{
auto r = pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "enqueue_batch failed: " << r.error().message << std::endl;
return;
}
}
{
auto r = pool->start();
if (r.is_err()) {
std::cerr << "start failed: " << r.error().message << std::endl;
return;
}
}
// 5. Run a 50-job workload; each job logs start/slow-path events through
//    the shared context (captured by reference — safe because this function
//    blocks until after pool->stop()).
std::cout << "Running workload with logging and monitoring..." << std::endl;
auto workload_start = std::chrono::steady_clock::now();
for (int i = 0; i < 50; ++i) {
auto r = pool->enqueue(std::make_unique<callback_job>(
[i, &context]() -> kcenon::common::VoidResult {
// Log job start
context.log(log_level_v2::debug,
"Job " + std::to_string(i) + " started");
// Simulate work: deterministic per-job duration of 20-49 ms.
auto work_time = 20 + (i % 30);
std::this_thread::sleep_for(std::chrono::milliseconds(work_time));
// Simulate occasional warnings
if (i % 10 == 0) {
context.log(log_level_v2::warn,
"Job " + std::to_string(i) + " took longer than expected");
}
return kcenon::common::ok();
}
));
if (r.is_err()) {
std::cerr << "enqueue failed: " << r.error().message << std::endl;
}
}
// 6. Monitor progress: sample the metric count five times, 300 ms apart.
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
auto snapshot_result = monitor->get_metrics();
if (snapshot_result.is_ok()) {
context.log(log_level_v2::info,
"Progress: " + std::to_string(snapshot_result.value().metrics.size()) +
" metrics recorded");
}
}
// 7. Wait for completion
// NOTE(review): this relies on pool->stop() draining outstanding jobs —
// TODO confirm stop() semantics (drain vs. abandon) against thread_pool docs.
{
auto r = pool->stop();
if (r.is_err()) {
std::cerr << "stop failed: " << r.error().message << std::endl;
}
}
auto workload_end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
workload_end - workload_start);
context.log(log_level_v2::info,
"Workload completed in " + std::to_string(duration.count()) + " ms");
// 8. Final metrics
auto final_result = monitor->get_metrics();
if (final_result.is_ok()) {
const auto& final_snapshot = final_result.value();
std::cout << "\nFinal metrics:" << std::endl;
std::cout << " Total metrics collected: " << final_snapshot.metrics.size() << std::endl;
}
// 9. Cleanup: stop services, then clear the global container.
logger->stop();
monitor->stop();
service_container::global().clear();
}
// ---------------------------------------------------------------------------
// Example 4: Dynamic service registration — start a pool with NO services,
// then register a logger at runtime and pick it up via a freshly constructed
// thread_context.
// NOTE(review): the enclosing function header (presumably
// `void dynamic_service_example() {`) was lost during documentation
// extraction; this span is the function body, ending at the closing brace
// below.
// ---------------------------------------------------------------------------
std::cout << "\n=== Dynamic Service Registration Example ===\n" << std::endl;
// Create thread pool without any services (no thread_context argument).
auto pool = std::make_shared<thread_pool>("DynamicPool");
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 2; ++i) {
workers.push_back(std::make_unique<thread_worker>());
}
{
auto r = pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "enqueue_batch failed: " << r.error().message << std::endl;
return;
}
}
{
auto r = pool->start();
if (r.is_err()) {
std::cerr << "start failed: " << r.error().message << std::endl;
return;
}
}
// Submit jobs without logging
std::cout << "Running without services..." << std::endl;
for (int i = 0; i < 5; ++i) {
auto r = pool->enqueue(std::make_unique<callback_job>(
[]() -> kcenon::common::VoidResult {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return kcenon::common::ok();
}
));
if (r.is_err()) {
std::cerr << "enqueue failed: " << r.error().message << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Now add logger dynamically
std::cout << "\nAdding logger service dynamically..." << std::endl;
auto logger = std::make_shared<mock_logger>();
logger->start();
service_container::global().register_singleton<ILogger>(logger);
// Create new context that will pick up the logger
// (contexts resolve services at construction time, so the pre-existing
// pool is unchanged; only jobs using new_context gain logging).
thread_context new_context;
// Submit more jobs with logging; `&new_context` is captured by reference,
// safe because this function blocks (sleep below) until after pool->stop().
for (int i = 5; i < 10; ++i) {
auto r2 = pool->enqueue(std::make_unique<callback_job>(
[i, &new_context]() -> kcenon::common::VoidResult {
new_context.log(log_level_v2::info,
"Job " + std::to_string(i) + " with dynamic logger");
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return kcenon::common::ok();
}
));
if (r2.is_err()) {
std::cerr << "enqueue failed: " << r2.error().message << std::endl;
}
}
// NOTE(review): fixed 600 ms sleep races with job completion (5 jobs x 50 ms
// over 2 workers ≈ 150 ms today) — prefer an explicit drain if available.
std::this_thread::sleep_for(std::chrono::milliseconds(600));
{
auto r = pool->stop();
if (r.is_err()) {
std::cerr << "stop failed: " << r.error().message << std::endl;
}
}
logger->stop();
// Leave the global container clean for any code that runs after us.
service_container::global().clear();
}
/**
 * @brief Entry point: runs all four integration examples in sequence.
 *
 * Fix: the original main() printed the opening and closing banners but never
 * invoked any of the example functions, so the "completed successfully"
 * message was emitted without anything having run. The four examples
 * (documented on this page: logger-only, monitoring-only, complete
 * integration, dynamic registration) are now called in order.
 *
 * @return 0 on success, 1 if any example throws a std::exception.
 */
int main() {
    // Block-scope declarations of the example functions defined earlier in
    // this translation unit (their definitions precede main in the file).
    void thread_pool_with_logger_example();
    void thread_pool_with_monitoring_example();
    void complete_integration_example();
    void dynamic_service_example();
    try {
        std::cout << "=== Thread System Integration Examples ===" << std::endl;
        std::cout << "Demonstrating integration with external logger and monitoring systems\n";
        // Example 1: Thread pool with external logger only.
        thread_pool_with_logger_example();
        // Example 2: Thread pool with external monitoring only.
        thread_pool_with_monitoring_example();
        // Example 3: Complete integration with both logger and monitoring.
        complete_integration_example();
        // Example 4: Dynamic service registration at runtime.
        dynamic_service_example();
        std::cout << "\n=== All integration examples completed successfully! ===" << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}
Specialized job class that encapsulates user-defined callbacks.
Context object that provides access to optional services.
Core thread pool implementation with work stealing and auto-scaling.
kcenon::common::interfaces::ILogger ILogger
void dynamic_service_example()
Example 4: Dynamic service registration.
void thread_pool_with_monitoring_example()
Example 2: Thread pool with external monitoring only.
kcenon::common::interfaces::IMonitor IMonitor
void thread_pool_with_logger_example()
Example 1: Thread pool with external logger only.
void complete_integration_example()
Example 3: Complete integration with both logger and monitoring.
Logging severity levels for the thread system.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Service container for dependency injection within the thread system.
Context object providing access to optional thread system services.