90 {
91 std::cout << "=== Multi-Process Monitoring Integration Example ===\n\n";
92
93
94 auto monitoring = std::make_shared<sample_monitoring>();
95
96
98
99
100 auto primary_pool = std::make_shared<thread_pool>("primary_pool", context);
101 auto secondary_pool = std::make_shared<thread_pool>("secondary_pool", context);
102
103
104 std::cout <<
formatter::format(
"Primary pool instance ID: {}\n", primary_pool->get_pool_instance_id());
105 std::cout <<
formatter::format(
"Secondary pool instance ID: {}\n\n", secondary_pool->get_pool_instance_id());
106
107
108 {
109 std::vector<std::unique_ptr<thread_worker>> workers;
110 for (int i = 0; i < 3; ++i) workers.push_back(std::make_unique<thread_worker>());
111 auto r = primary_pool->enqueue_batch(std::move(workers));
112 if (r.is_err()) {
113 std::cerr << "Failed to add workers to primary_pool: " << r.error().message << "\n";
114 return 1;
115 }
116 }
117 {
118 std::vector<std::unique_ptr<thread_worker>> workers;
119 for (int i = 0; i < 2; ++i) workers.push_back(std::make_unique<thread_worker>());
120 auto r = secondary_pool->enqueue_batch(std::move(workers));
121 if (r.is_err()) {
122 std::cerr << "Failed to add workers to secondary_pool: " << r.error().message << "\n";
123 return 1;
124 }
125 }
126
127 auto start_primary = primary_pool->start();
128 if (start_primary.is_err()) {
129 std::cerr << "Failed to start primary_pool: " << start_primary.error().message << "\n";
130 return 1;
131 }
132 auto start_secondary = secondary_pool->start();
133 if (start_secondary.is_err()) {
134 std::cerr << "Failed to start secondary_pool: " << start_secondary.error().message << "\n";
135 return 1;
136 }
137
138
139 primary_pool->report_metrics();
140 secondary_pool->report_metrics();
141
142 std::cout << "\n--- Submitting jobs ---\n";
143
144
145 for (int i = 0; i < 10; ++i) {
146 auto job = std::make_unique<callback_job>(
147 [i]() {
148 std::this_thread::sleep_for(std::chrono::milliseconds(50 + i * 10));
150 return kcenon::common::ok();
151 },
153 );
154 auto r = primary_pool->enqueue(std::move(
job));
155 if (r.is_err()) {
156 std::cerr << "enqueue to primary_pool failed: " << r.error().message << "\n";
157 }
158 }
159
160
161 for (int i = 0; i < 5; ++i) {
162 auto job = std::make_unique<callback_job>(
163 [i]() {
164 std::this_thread::sleep_for(std::chrono::milliseconds(100));
166 return kcenon::common::ok();
167 },
169 );
170 auto r = secondary_pool->enqueue(std::move(
job));
171 if (r.is_err()) {
172 std::cerr << "enqueue to secondary_pool failed: " << r.error().message << "\n";
173 }
174 }
175
176
177 for (int i = 0; i < 3; ++i) {
178 std::this_thread::sleep_for(std::chrono::milliseconds(200));
179 std::cout << "\n--- Metrics Update ---\n";
180 primary_pool->report_metrics();
181 secondary_pool->report_metrics();
182 }
183
184
185 std::cout << "\n--- Stopping pools ---\n";
186 auto stop_primary = primary_pool->stop();
187 if (stop_primary.is_err()) {
188 std::cerr << "Error stopping primary_pool: " << stop_primary.error().message << "\n";
189 }
190 auto stop_secondary = secondary_pool->stop();
191 if (stop_secondary.is_err()) {
192 std::cerr << "Error stopping secondary_pool: " << stop_secondary.error().message << "\n";
193 }
194
195
196 std::cout << "\n--- Final Metrics ---\n";
197 primary_pool->report_metrics();
198 secondary_pool->report_metrics();
199
200 std::cout << "\n=== Example completed ===\n";
201
202 return 0;
203}
Represents a unit of work (task) to be executed, typically by a job queue.
Context object that provides access to optional services.