Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
multi_process_monitoring_integration.cpp

Shows how to run multiple named thread pools in one process, report metrics through the IMonitor interface, and use pool instance IDs for multi-process monitoring scenarios.

See also
thread_pool, thread_context, IMonitor
/*****************************************************************************
BSD 3-Clause License
Copyright (c) 2025, 🍀☀🌕🌥 🌊
All rights reserved.
*****************************************************************************/
#include <kcenon/common/interfaces/monitoring_interface.h>
#include <chrono>
#include <thread>
#include <iostream>
#include <vector>
#include <random>
using namespace kcenon::thread;
/**
 * @brief Sample IMonitor implementation for the multi-process monitoring example.
 *
 * Each recorded metric is echoed to std::cout and stored in an in-memory
 * metrics_snapshot, which get_metrics() hands back to the caller.
 * No locking is performed inside this class.
 */
class sample_monitoring : public kcenon::common::interfaces::IMonitor {
public:
    using VoidResult = kcenon::common::VoidResult;

    /**
     * @brief Records an untagged metric.
     * @param name  Metric name, printed and stored as given.
     * @param value Metric value.
     * @return kcenon::common::ok() unconditionally.
     */
    VoidResult record_metric(const std::string& name, double value) override {
        std::cout << formatter::format("{}: {}\n", name, value);
        snapshot_.add_metric(name, value);
        return kcenon::common::ok();
    }

    /**
     * @brief Records a metric together with a set of key/value tags.
     *
     * Prints "<name>: <value>", followed by " [k=v, ...]" when tags are
     * present, then appends the tagged metric to the snapshot.
     *
     * NOTE(review): the declarator line of this method was absent from the
     * reviewed text; it is restored here as the tagged overload of
     * record_metric — confirm the name against the IMonitor interface.
     *
     * @param name  Metric name.
     * @param value Metric value.
     * @param tags  Tags attached to the metric; printed in the map's iteration order.
     * @return kcenon::common::ok() unconditionally.
     */
    VoidResult record_metric(
        const std::string& name,
        double value,
        const std::unordered_map<std::string, std::string>& tags) override {
        std::cout << formatter::format("{}: {}", name, value);
        if (!tags.empty()) {
            std::cout << " [";
            bool first = true;
            for (const auto& [k, v] : tags) {
                if (!first) std::cout << ", ";
                std::cout << k << "=" << v;
                first = false;
            }
            std::cout << "]";
        }
        std::cout << "\n";

        kcenon::common::interfaces::metric_value mv(name, value);
        mv.tags = tags;
        snapshot_.metrics.push_back(mv);
        return kcenon::common::ok();
    }

    /**
     * @brief Returns a copy of everything recorded since construction or the last reset().
     */
    kcenon::common::Result<kcenon::common::interfaces::metrics_snapshot> get_metrics() override {
        return kcenon::common::ok(snapshot_);
    }

    /**
     * @brief Reports the monitor as healthy with a fixed status message.
     */
    kcenon::common::Result<kcenon::common::interfaces::health_check_result> check_health() override {
        kcenon::common::interfaces::health_check_result result;
        result.status = kcenon::common::interfaces::health_status::healthy;
        result.message = "Sample monitoring active";
        return kcenon::common::ok(result);
    }

    /**
     * @brief Discards all stored metrics by replacing the snapshot with an empty one.
     * @return kcenon::common::ok() unconditionally.
     */
    VoidResult reset() override {
        snapshot_ = {};
        return kcenon::common::ok();
    }

private:
    /// Accumulates every metric passed to the record_metric overloads.
    kcenon::common::interfaces::metrics_snapshot snapshot_;
};
int main() {
std::cout << "=== Multi-Process Monitoring Integration Example ===\n\n";
// Create monitoring instance
auto monitoring = std::make_shared<sample_monitoring>();
// Create thread context with monitoring
thread_context context(nullptr, monitoring);
// Create multiple thread pools with unique names
auto primary_pool = std::make_shared<thread_pool>("primary_pool", context);
auto secondary_pool = std::make_shared<thread_pool>("secondary_pool", context);
// Display pool instance IDs
std::cout << formatter::format("Primary pool instance ID: {}\n", primary_pool->get_pool_instance_id());
std::cout << formatter::format("Secondary pool instance ID: {}\n\n", secondary_pool->get_pool_instance_id());
// Add workers then start pools
{
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 3; ++i) workers.push_back(std::make_unique<thread_worker>());
auto r = primary_pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "Failed to add workers to primary_pool: " << r.error().message << "\n";
return 1;
}
}
{
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 2; ++i) workers.push_back(std::make_unique<thread_worker>());
auto r = secondary_pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "Failed to add workers to secondary_pool: " << r.error().message << "\n";
return 1;
}
}
auto start_primary = primary_pool->start();
if (start_primary.is_err()) {
std::cerr << "Failed to start primary_pool: " << start_primary.error().message << "\n";
return 1;
}
auto start_secondary = secondary_pool->start();
if (start_secondary.is_err()) {
std::cerr << "Failed to start secondary_pool: " << start_secondary.error().message << "\n";
return 1;
}
// Report initial metrics
primary_pool->report_metrics();
secondary_pool->report_metrics();
std::cout << "\n--- Submitting jobs ---\n";
// Submit jobs to primary pool
for (int i = 0; i < 10; ++i) {
auto job = std::make_unique<callback_job>(
[i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(50 + i * 10));
std::cout << formatter::format("Primary job {} completed\n", i);
return kcenon::common::ok();
},
formatter::format("primary_job_{}", i)
);
auto r = primary_pool->enqueue(std::move(job));
if (r.is_err()) {
std::cerr << "enqueue to primary_pool failed: " << r.error().message << "\n";
}
}
// Submit jobs to secondary pool
for (int i = 0; i < 5; ++i) {
auto job = std::make_unique<callback_job>(
[i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << formatter::format("Secondary job {} completed\n", i);
return kcenon::common::ok();
},
formatter::format("secondary_job_{}", i)
);
auto r = secondary_pool->enqueue(std::move(job));
if (r.is_err()) {
std::cerr << "enqueue to secondary_pool failed: " << r.error().message << "\n";
}
}
// Periodically report metrics while jobs are processing
for (int i = 0; i < 3; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << "\n--- Metrics Update ---\n";
primary_pool->report_metrics();
secondary_pool->report_metrics();
}
// Stop pools
std::cout << "\n--- Stopping pools ---\n";
auto stop_primary = primary_pool->stop();
if (stop_primary.is_err()) {
std::cerr << "Error stopping primary_pool: " << stop_primary.error().message << "\n";
}
auto stop_secondary = secondary_pool->stop();
if (stop_secondary.is_err()) {
std::cerr << "Error stopping secondary_pool: " << stop_secondary.error().message << "\n";
}
// Final metrics
std::cout << "\n--- Final Metrics ---\n";
primary_pool->report_metrics();
secondary_pool->report_metrics();
std::cout << "\n=== Example completed ===\n";
return 0;
}
Specialized job class that encapsulates user-defined callbacks.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
Context object that provides access to optional services.
Provides convenience methods for string formatting using C++20 <format>.
Definition formatter.h:122
kcenon::common::interfaces::metrics_snapshot snapshot_
VoidResult record_metric(const std::string &name, double value) override
kcenon::common::VoidResult VoidResult
kcenon::common::Result< kcenon::common::interfaces::metrics_snapshot > get_metrics() override
kcenon::common::Result< kcenon::common::interfaces::health_check_result > check_health() override
Core thread pool implementation with work stealing and auto-scaling.
Generic formatter for enum types using user-provided converter functors.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Context object providing access to optional thread system services.
Specialized worker thread that processes jobs from a job_queue.