Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
multi_process_monitoring_integration.cpp
Go to the documentation of this file.
1/*****************************************************************************
2BSD 3-Clause License
3
4Copyright (c) 2025, 🍀☀🌕🌥 🌊
5All rights reserved.
6*****************************************************************************/
7
21#include <kcenon/common/interfaces/monitoring_interface.h>
26
27#include <chrono>
28#include <thread>
29#include <iostream>
30#include <vector>
31#include <random>
32
33using namespace kcenon::thread;
35
36// Implementation of IMonitor for multi-process monitoring
37class sample_monitoring : public kcenon::common::interfaces::IMonitor {
38public:
39 using VoidResult = kcenon::common::VoidResult;
40
41 VoidResult record_metric(const std::string& name, double value) override {
42 std::cout << formatter::format("{}: {}\n", name, value);
43 snapshot_.add_metric(name, value);
44 return kcenon::common::ok();
45 }
46
48 const std::string& name,
49 double value,
50 const std::unordered_map<std::string, std::string>& tags) override {
51 std::cout << formatter::format("{}: {}", name, value);
52 if (!tags.empty()) {
53 std::cout << " [";
54 bool first = true;
55 for (const auto& [k, v] : tags) {
56 if (!first) std::cout << ", ";
57 std::cout << k << "=" << v;
58 first = false;
59 }
60 std::cout << "]";
61 }
62 std::cout << "\n";
63
64 kcenon::common::interfaces::metric_value mv(name, value);
65 mv.tags = tags;
66 snapshot_.metrics.push_back(mv);
67 return kcenon::common::ok();
68 }
69
70 kcenon::common::Result<kcenon::common::interfaces::metrics_snapshot> get_metrics() override {
71 return kcenon::common::ok(snapshot_);
72 }
73
74 kcenon::common::Result<kcenon::common::interfaces::health_check_result> check_health() override {
75 kcenon::common::interfaces::health_check_result result;
76 result.status = kcenon::common::interfaces::health_status::healthy;
77 result.message = "Sample monitoring active";
78 return kcenon::common::ok(result);
79 }
80
81 VoidResult reset() override {
82 snapshot_ = {};
83 return kcenon::common::ok();
84 }
85
86private:
87 kcenon::common::interfaces::metrics_snapshot snapshot_;
88};
89
90int main() {
91 std::cout << "=== Multi-Process Monitoring Integration Example ===\n\n";
92
93 // Create monitoring instance
94 auto monitoring = std::make_shared<sample_monitoring>();
95
96 // Create thread context with monitoring
97 thread_context context(nullptr, monitoring);
98
99 // Create multiple thread pools with unique names
100 auto primary_pool = std::make_shared<thread_pool>("primary_pool", context);
101 auto secondary_pool = std::make_shared<thread_pool>("secondary_pool", context);
102
103 // Display pool instance IDs
104 std::cout << formatter::format("Primary pool instance ID: {}\n", primary_pool->get_pool_instance_id());
105 std::cout << formatter::format("Secondary pool instance ID: {}\n\n", secondary_pool->get_pool_instance_id());
106
107 // Add workers then start pools
108 {
109 std::vector<std::unique_ptr<thread_worker>> workers;
110 for (int i = 0; i < 3; ++i) workers.push_back(std::make_unique<thread_worker>());
111 auto r = primary_pool->enqueue_batch(std::move(workers));
112 if (r.is_err()) {
113 std::cerr << "Failed to add workers to primary_pool: " << r.error().message << "\n";
114 return 1;
115 }
116 }
117 {
118 std::vector<std::unique_ptr<thread_worker>> workers;
119 for (int i = 0; i < 2; ++i) workers.push_back(std::make_unique<thread_worker>());
120 auto r = secondary_pool->enqueue_batch(std::move(workers));
121 if (r.is_err()) {
122 std::cerr << "Failed to add workers to secondary_pool: " << r.error().message << "\n";
123 return 1;
124 }
125 }
126
127 auto start_primary = primary_pool->start();
128 if (start_primary.is_err()) {
129 std::cerr << "Failed to start primary_pool: " << start_primary.error().message << "\n";
130 return 1;
131 }
132 auto start_secondary = secondary_pool->start();
133 if (start_secondary.is_err()) {
134 std::cerr << "Failed to start secondary_pool: " << start_secondary.error().message << "\n";
135 return 1;
136 }
137
138 // Report initial metrics
139 primary_pool->report_metrics();
140 secondary_pool->report_metrics();
141
142 std::cout << "\n--- Submitting jobs ---\n";
143
144 // Submit jobs to primary pool
145 for (int i = 0; i < 10; ++i) {
146 auto job = std::make_unique<callback_job>(
147 [i]() {
148 std::this_thread::sleep_for(std::chrono::milliseconds(50 + i * 10));
149 std::cout << formatter::format("Primary job {} completed\n", i);
150 return kcenon::common::ok();
151 },
152 formatter::format("primary_job_{}", i)
153 );
154 auto r = primary_pool->enqueue(std::move(job));
155 if (r.is_err()) {
156 std::cerr << "enqueue to primary_pool failed: " << r.error().message << "\n";
157 }
158 }
159
160 // Submit jobs to secondary pool
161 for (int i = 0; i < 5; ++i) {
162 auto job = std::make_unique<callback_job>(
163 [i]() {
164 std::this_thread::sleep_for(std::chrono::milliseconds(100));
165 std::cout << formatter::format("Secondary job {} completed\n", i);
166 return kcenon::common::ok();
167 },
168 formatter::format("secondary_job_{}", i)
169 );
170 auto r = secondary_pool->enqueue(std::move(job));
171 if (r.is_err()) {
172 std::cerr << "enqueue to secondary_pool failed: " << r.error().message << "\n";
173 }
174 }
175
176 // Periodically report metrics while jobs are processing
177 for (int i = 0; i < 3; ++i) {
178 std::this_thread::sleep_for(std::chrono::milliseconds(200));
179 std::cout << "\n--- Metrics Update ---\n";
180 primary_pool->report_metrics();
181 secondary_pool->report_metrics();
182 }
183
184 // Stop pools
185 std::cout << "\n--- Stopping pools ---\n";
186 auto stop_primary = primary_pool->stop();
187 if (stop_primary.is_err()) {
188 std::cerr << "Error stopping primary_pool: " << stop_primary.error().message << "\n";
189 }
190 auto stop_secondary = secondary_pool->stop();
191 if (stop_secondary.is_err()) {
192 std::cerr << "Error stopping secondary_pool: " << stop_secondary.error().message << "\n";
193 }
194
195 // Final metrics
196 std::cout << "\n--- Final Metrics ---\n";
197 primary_pool->report_metrics();
198 secondary_pool->report_metrics();
199
200 std::cout << "\n=== Example completed ===\n";
201
202 return 0;
203}
Specialized job class that encapsulates user-defined callbacks.
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
A template class representing either a value or an error.
Context object that provides access to optional services.
Provides convenience methods for string formatting using C++20 <format>.
Definition formatter.h:122
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132
kcenon::common::interfaces::metrics_snapshot snapshot_
VoidResult record_metric(const std::string &name, double value) override
kcenon::common::VoidResult VoidResult
VoidResult record_metric(const std::string &name, double value, const std::unordered_map< std::string, std::string > &tags) override
kcenon::common::Result< kcenon::common::interfaces::metrics_snapshot > get_metrics() override
kcenon::common::Result< kcenon::common::interfaces::health_check_result > check_health() override
Core thread pool implementation with work stealing and auto-scaling.
Generic formatter for enum types using user-provided converter functors.
Core threading foundation of the thread system library.
Definition thread_impl.h:17
Context object providing access to optional thread system services.
Specialized worker thread that processes jobs from a job_queue.