Shows how to run multiple named thread pools in one process, report metrics through the IMonitor interface, and use pool instance IDs for multi-process monitoring scenarios.
#include <kcenon/common/interfaces/monitoring_interface.h>
#include <chrono>
#include <thread>
#include <iostream>
#include <vector>
#include <random>
public:
std::cout << formatter::format("{}: {}\n", name, value);
return kcenon::common::ok();
}
const std::string& name,
double value,
const std::unordered_map<std::string, std::string>& tags) override {
std::cout << formatter::format("{}: {}", name, value);
if (!tags.empty()) {
std::cout << " [";
bool first = true;
for (const auto& [k, v] : tags) {
if (!first) std::cout << ", ";
std::cout << k << "=" << v;
first = false;
}
std::cout << "]";
}
std::cout << "\n";
kcenon::common::interfaces::metric_value mv(name, value);
mv.tags = tags;
return kcenon::common::ok();
}
kcenon::common::Result<kcenon::common::interfaces::metrics_snapshot>
get_metrics()
override {
}
kcenon::common::Result<kcenon::common::interfaces::health_check_result>
check_health()
override {
kcenon::common::interfaces::health_check_result
result;
result.status = kcenon::common::interfaces::health_status::healthy;
result.message =
"Sample monitoring active";
return kcenon::common::ok(
result);
}
return kcenon::common::ok();
}
private:
kcenon::common::interfaces::metrics_snapshot
snapshot_;
};
std::cout << "=== Multi-Process Monitoring Integration Example ===\n\n";
auto monitoring = std::make_shared<sample_monitoring>();
auto primary_pool = std::make_shared<thread_pool>("primary_pool", context);
auto secondary_pool = std::make_shared<thread_pool>("secondary_pool", context);
std::cout << formatter::format("Primary pool instance ID: {}\n", primary_pool->get_pool_instance_id());
std::cout << formatter::format("Secondary pool instance ID: {}\n\n", secondary_pool->get_pool_instance_id());
{
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 3; ++i) workers.push_back(std::make_unique<thread_worker>());
auto r = primary_pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "Failed to add workers to primary_pool: " << r.error().message << "\n";
return 1;
}
}
{
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 2; ++i) workers.push_back(std::make_unique<thread_worker>());
auto r = secondary_pool->enqueue_batch(std::move(workers));
if (r.is_err()) {
std::cerr << "Failed to add workers to secondary_pool: " << r.error().message << "\n";
return 1;
}
}
auto start_primary = primary_pool->start();
if (start_primary.is_err()) {
std::cerr << "Failed to start primary_pool: " << start_primary.error().message << "\n";
return 1;
}
auto start_secondary = secondary_pool->start();
if (start_secondary.is_err()) {
std::cerr << "Failed to start secondary_pool: " << start_secondary.error().message << "\n";
return 1;
}
primary_pool->report_metrics();
secondary_pool->report_metrics();
std::cout << "\n--- Submitting jobs ---\n";
for (int i = 0; i < 10; ++i) {
auto job = std::make_unique<callback_job>(
[i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(50 + i * 10));
std::cout << formatter::format("Primary job {} completed\n", i);
return kcenon::common::ok();
},
formatter::format("primary_job_{}", i)
);
auto r = primary_pool->enqueue(std::move(
job));
if (r.is_err()) {
std::cerr << "enqueue to primary_pool failed: " << r.error().message << "\n";
}
}
for (int i = 0; i < 5; ++i) {
auto job = std::make_unique<callback_job>(
[i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << formatter::format("Secondary job {} completed\n", i);
return kcenon::common::ok();
},
formatter::format("secondary_job_{}", i)
);
auto r = secondary_pool->enqueue(std::move(
job));
if (r.is_err()) {
std::cerr << "enqueue to secondary_pool failed: " << r.error().message << "\n";
}
}
for (int i = 0; i < 3; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << "\n--- Metrics Update ---\n";
primary_pool->report_metrics();
secondary_pool->report_metrics();
}
std::cout << "\n--- Stopping pools ---\n";
auto stop_primary = primary_pool->stop();
if (stop_primary.is_err()) {
std::cerr << "Error stopping primary_pool: " << stop_primary.error().message << "\n";
}
auto stop_secondary = secondary_pool->stop();
if (stop_secondary.is_err()) {
std::cerr << "Error stopping secondary_pool: " << stop_secondary.error().message << "\n";
}
std::cout << "\n--- Final Metrics ---\n";
primary_pool->report_metrics();
secondary_pool->report_metrics();
std::cout << "\n=== Example completed ===\n";
return 0;
}
Specialized job class that encapsulates user-defined callbacks.
Represents a unit of work (task) to be executed, typically by a job queue.
A template class representing either a value or an error.
Context object that provides access to optional services.
kcenon::common::interfaces::metrics_snapshot snapshot_
VoidResult reset() override
VoidResult record_metric(const std::string &name, double value) override
kcenon::common::VoidResult VoidResult
kcenon::common::Result< kcenon::common::interfaces::metrics_snapshot > get_metrics() override
kcenon::common::Result< kcenon::common::interfaces::health_check_result > check_health() override
Core thread pool implementation with work stealing and auto-scaling.
Core threading foundation of the thread system library.
Context object providing access to optional thread system services.
Specialized worker thread that processes jobs from a job_queue.