Thread System 0.3.1
High-performance C++20 thread pool with work stealing and DAG scheduling
Loading...
Searching...
No Matches
multi_process_monitoring_integration.cpp File Reference

Multi-process monitoring integration with multiple thread pools. More...

#include <kcenon/thread/core/thread_pool.h>
#include <kcenon/common/interfaces/monitoring_interface.h>
#include <kcenon/thread/interfaces/thread_context.h>
#include <kcenon/thread/core/callback_job.h>
#include <kcenon/thread/core/thread_worker.h>
#include <kcenon/thread/utils/formatter.h>
#include <chrono>
#include <thread>
#include <iostream>
#include <vector>
#include <random>
Include dependency graph for multi_process_monitoring_integration.cpp:

Go to the source code of this file.

Classes

class  sample_monitoring
 

Functions

int main ()
 

Detailed Description

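Multi-process monitoring integration with multiple thread pools. The example in main() creates two named thread pools ("primary_pool" and "secondary_pool") that share one sample_monitoring instance through a single thread_context, adds workers to each pool, submits jobs to both, and reports metrics after start-up, while the jobs are processing, and again after both pools have been stopped.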

Definition in file multi_process_monitoring_integration.cpp.

Function Documentation

◆ main()

int main ( )

Definition at line 90 of file multi_process_monitoring_integration.cpp.

90 {
91 std::cout << "=== Multi-Process Monitoring Integration Example ===\n\n";
92
93 // Create monitoring instance
94 auto monitoring = std::make_shared<sample_monitoring>();
95
96 // Create thread context with monitoring
97 thread_context context(nullptr, monitoring);
98
99 // Create multiple thread pools with unique names
100 auto primary_pool = std::make_shared<thread_pool>("primary_pool", context);
101 auto secondary_pool = std::make_shared<thread_pool>("secondary_pool", context);
102
103 // Display pool instance IDs
104 std::cout << formatter::format("Primary pool instance ID: {}\n", primary_pool->get_pool_instance_id());
105 std::cout << formatter::format("Secondary pool instance ID: {}\n\n", secondary_pool->get_pool_instance_id());
106
107 // Add workers then start pools
108 {
109 std::vector<std::unique_ptr<thread_worker>> workers;
110 for (int i = 0; i < 3; ++i) workers.push_back(std::make_unique<thread_worker>());
111 auto r = primary_pool->enqueue_batch(std::move(workers));
112 if (r.is_err()) {
113 std::cerr << "Failed to add workers to primary_pool: " << r.error().message << "\n";
114 return 1;
115 }
116 }
117 {
118 std::vector<std::unique_ptr<thread_worker>> workers;
119 for (int i = 0; i < 2; ++i) workers.push_back(std::make_unique<thread_worker>());
120 auto r = secondary_pool->enqueue_batch(std::move(workers));
121 if (r.is_err()) {
122 std::cerr << "Failed to add workers to secondary_pool: " << r.error().message << "\n";
123 return 1;
124 }
125 }
126
127 auto start_primary = primary_pool->start();
128 if (start_primary.is_err()) {
129 std::cerr << "Failed to start primary_pool: " << start_primary.error().message << "\n";
130 return 1;
131 }
132 auto start_secondary = secondary_pool->start();
133 if (start_secondary.is_err()) {
134 std::cerr << "Failed to start secondary_pool: " << start_secondary.error().message << "\n";
135 return 1;
136 }
137
138 // Report initial metrics
139 primary_pool->report_metrics();
140 secondary_pool->report_metrics();
141
142 std::cout << "\n--- Submitting jobs ---\n";
143
144 // Submit jobs to primary pool
145 for (int i = 0; i < 10; ++i) {
146 auto job = std::make_unique<callback_job>(
147 [i]() {
148 std::this_thread::sleep_for(std::chrono::milliseconds(50 + i * 10));
149 std::cout << formatter::format("Primary job {} completed\n", i);
150 return kcenon::common::ok();
151 },
152 formatter::format("primary_job_{}", i)
153 );
154 auto r = primary_pool->enqueue(std::move(job));
155 if (r.is_err()) {
156 std::cerr << "enqueue to primary_pool failed: " << r.error().message << "\n";
157 }
158 }
159
160 // Submit jobs to secondary pool
161 for (int i = 0; i < 5; ++i) {
162 auto job = std::make_unique<callback_job>(
163 [i]() {
164 std::this_thread::sleep_for(std::chrono::milliseconds(100));
165 std::cout << formatter::format("Secondary job {} completed\n", i);
166 return kcenon::common::ok();
167 },
168 formatter::format("secondary_job_{}", i)
169 );
170 auto r = secondary_pool->enqueue(std::move(job));
171 if (r.is_err()) {
172 std::cerr << "enqueue to secondary_pool failed: " << r.error().message << "\n";
173 }
174 }
175
176 // Periodically report metrics while jobs are processing
177 for (int i = 0; i < 3; ++i) {
178 std::this_thread::sleep_for(std::chrono::milliseconds(200));
179 std::cout << "\n--- Metrics Update ---\n";
180 primary_pool->report_metrics();
181 secondary_pool->report_metrics();
182 }
183
184 // Stop pools
185 std::cout << "\n--- Stopping pools ---\n";
186 auto stop_primary = primary_pool->stop();
187 if (stop_primary.is_err()) {
188 std::cerr << "Error stopping primary_pool: " << stop_primary.error().message << "\n";
189 }
190 auto stop_secondary = secondary_pool->stop();
191 if (stop_secondary.is_err()) {
192 std::cerr << "Error stopping secondary_pool: " << stop_secondary.error().message << "\n";
193 }
194
195 // Final metrics
196 std::cout << "\n--- Final Metrics ---\n";
197 primary_pool->report_metrics();
198 secondary_pool->report_metrics();
199
200 std::cout << "\n=== Example completed ===\n";
201
202 return 0;
203 }
Represents a unit of work (task) to be executed, typically by a job queue.
Definition job.h:136
Context object that provides access to optional services.
kcenon::thread::utils::formatter::format
static auto format(const char *formats, const FormatArgs &... args) -> std::string
Formats a narrow-character string with the given arguments.
Definition formatter.h:132

References kcenon::thread::utils::formatter::format().

Here is the call graph for this function: