PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
pipeline_coordinator.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
14
15#include <chrono>
16
18
19// =============================================================================
20// Construction & Destruction
21// =============================================================================
22
26
28 : config_(config) {
30 metrics_.mark_start_time();
31 }
32}
33
35 if (running_.load(std::memory_order_acquire)) {
36 static_cast<void>(stop());
37 }
38}
39
40// =============================================================================
41// Lifecycle Management
42// =============================================================================
43
45 std::lock_guard<std::mutex> lock(mutex_);
46
47 if (running_.load(std::memory_order_acquire)) {
48 return pacs_void_error(error_codes::already_exists,
49 "Pipeline coordinator is already running");
50 }
51
52 // Create and start thread pools for each stage
53 for (size_t i = 0; i < stage_count; ++i) {
54 auto stage = static_cast<pipeline_stage>(i);
55 try {
56 stage_pools_[i] = create_stage_pool(stage);
57 if (!stage_pools_[i]->start()) {
58 // Cleanup already started pools
59 for (size_t j = 0; j < i; ++j) {
60 if (stage_pools_[j]) {
61 stage_pools_[j]->shutdown(false);
62 stage_pools_[j].reset();
63 }
64 }
65 return pacs_void_error(error_codes::internal_error,
66 "Failed to start thread pool for stage: " +
67 std::string(get_stage_name(stage)));
68 }
69
70 // Update metrics
71 if (config_.enable_metrics) {
72 auto& stage_metrics = metrics_.get_stage_metrics(stage);
74 static_cast<uint32_t>(config_.get_workers_for_stage(stage)),
75 std::memory_order_relaxed);
76 }
77 } catch (const std::exception& e) {
78 // Cleanup already started pools
79 for (size_t j = 0; j < i; ++j) {
80 if (stage_pools_[j]) {
81 stage_pools_[j]->shutdown(false);
82 stage_pools_[j].reset();
83 }
84 }
85 return pacs_void_error(error_codes::internal_error,
86 "Exception creating thread pool for stage " +
87 std::string(get_stage_name(stage)) + ": " + e.what());
88 }
89 }
90
91 running_.store(true, std::memory_order_release);
92 return ok();
93}
94
96 std::lock_guard<std::mutex> lock(mutex_);
97
98 if (!running_.load(std::memory_order_acquire)) {
99 return ok(); // Already stopped
100 }
101
102 running_.store(false, std::memory_order_release);
103
104 // Stop all stage pools with timeout consideration
105 bool all_stopped = true;
106 for (size_t i = 0; i < stage_count; ++i) {
107 if (stage_pools_[i]) {
108 // Use wait_for_completion=true to drain queues gracefully
109 stage_pools_[i]->shutdown(true);
110 stage_pools_[i].reset();
111 }
112 }
113
114 if (!all_stopped) {
115 return pacs_void_error(error_codes::internal_error,
116 "Some stage pools did not shutdown cleanly");
117 }
118
119 return ok();
120}
121
122auto pipeline_coordinator::is_running() const noexcept -> bool {
123 return running_.load(std::memory_order_acquire);
124}
125
126// =============================================================================
127// Job Submission
128// =============================================================================
129
131 std::unique_ptr<pipeline_job_base> job)
132 -> VoidResult {
133 if (!job) {
134 return pacs_void_error(error_codes::invalid_argument,
135 "Cannot submit null job");
136 }
137 // Extract context before moving job to avoid undefined behavior
138 // (evaluation order of function arguments is unspecified in C++)
139 auto ctx = job->get_context();
140 return submit_to_stage(stage, std::move(job), ctx);
141}
142
144 std::unique_ptr<pipeline_job_base> job,
145 const job_context& ctx) -> VoidResult {
146 if (!running_.load(std::memory_order_acquire)) {
147 return pacs_void_error(error_codes::not_initialized,
148 "Pipeline coordinator is not running");
149 }
150
151 size_t stage_idx = static_cast<size_t>(stage);
152 if (stage_idx >= stage_count || !stage_pools_[stage_idx]) {
153 return pacs_void_error(error_codes::invalid_argument,
154 "Invalid pipeline stage");
155 }
156
157 // Check backpressure
158 check_backpressure(stage);
159
160 // Update metrics
161 if (config_.enable_metrics) {
162 metrics_.increment_stage_queue(stage);
163 }
164
165 // Capture job for lambda
166 auto job_ptr = job.release();
167 auto start_time = std::chrono::steady_clock::now();
168
169 // Submit to stage pool
170 stage_pools_[stage_idx]->submit_fire_and_forget(
171 [this, job_ptr, stage, ctx_copy = ctx, start_time]() mutable {
172 std::unique_ptr<pipeline_job_base> owned_job(job_ptr);
173
174 // Execute the job
175 auto result = owned_job->execute(*this);
176
177 // Record completion
178 auto end_time = std::chrono::steady_clock::now();
179 auto duration_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
180 end_time - start_time).count();
181
182 bool success = result.is_ok();
183
184 if (config_.enable_metrics) {
185 metrics_.decrement_stage_queue(stage);
186 metrics_.record_stage_completion(stage,
187 static_cast<uint64_t>(duration_ns), success);
188 }
189
190 notify_job_completion(ctx_copy, success);
191 });
192
193 return ok();
194}
195
197 std::function<void()> task) -> VoidResult {
198 if (!running_.load(std::memory_order_acquire)) {
199 return pacs_void_error(error_codes::not_initialized,
200 "Pipeline coordinator is not running");
201 }
202
203 size_t stage_idx = static_cast<size_t>(stage);
204 if (stage_idx >= stage_count || !stage_pools_[stage_idx]) {
205 return pacs_void_error(error_codes::invalid_argument,
206 "Invalid pipeline stage");
207 }
208
209 if (config_.enable_metrics) {
210 metrics_.increment_stage_queue(stage);
211 }
212
213 stage_pools_[stage_idx]->submit_fire_and_forget(
214 [this, stage, task = std::move(task)]() {
215 task();
216 if (config_.enable_metrics) {
217 metrics_.decrement_stage_queue(stage);
218 }
219 });
220
221 return ok();
222}
223
224// =============================================================================
225// Stage Management
226// =============================================================================
227
229 -> std::shared_ptr<kcenon::pacs::integration::thread_pool_interface> {
230 size_t stage_idx = static_cast<size_t>(stage);
231 if (stage_idx >= stage_count) {
232 return nullptr;
233 }
234 return stage_pools_[stage_idx];
235}
236
238 -> size_t {
239 size_t stage_idx = static_cast<size_t>(stage);
240 if (stage_idx >= stage_count || !stage_pools_[stage_idx]) {
241 return 0;
242 }
243 return stage_pools_[stage_idx]->get_pending_task_count();
244}
245
247 -> bool {
248 return get_queue_depth(stage) >= config_.max_queue_depth;
249}
250
251// =============================================================================
252// Configuration & Callbacks
253// =============================================================================
254
255auto pipeline_coordinator::get_config() const noexcept -> const pipeline_config& {
256 return config_;
257}
258
260 std::lock_guard<std::mutex> lock(mutex_);
261 job_completion_callback_ = std::move(callback);
262}
263
265 std::lock_guard<std::mutex> lock(mutex_);
266 backpressure_callback_ = std::move(callback);
267}
268
269// =============================================================================
270// Metrics
271// =============================================================================
272
274 return metrics_;
275}
276
278 return metrics_;
279}
280
285
286// =============================================================================
287// Statistics
288// =============================================================================
289
290auto pipeline_coordinator::get_total_worker_count() const noexcept -> size_t {
291 size_t total = 0;
292 for (const auto& pool : stage_pools_) {
293 if (pool) {
294 total += pool->get_thread_count();
295 }
296 }
297 return total;
298}
299
300auto pipeline_coordinator::get_total_pending_jobs() const noexcept -> size_t {
301 size_t total = 0;
302 for (const auto& pool : stage_pools_) {
303 if (pool) {
304 total += pool->get_pending_task_count();
305 }
306 }
307 return total;
308}
309
310auto pipeline_coordinator::generate_job_id() noexcept -> uint64_t {
311 return next_job_id_.fetch_add(1, std::memory_order_relaxed);
312}
313
314// =============================================================================
315// Private Methods
316// =============================================================================
317
319 -> pool_ptr {
321 pool_config.min_threads = config_.get_workers_for_stage(stage);
322 pool_config.max_threads = config_.get_workers_for_stage(stage);
323 pool_config.pool_name = config_.name_prefix + "_" + std::string(get_stage_name(stage));
324
325 return std::make_shared<kcenon::pacs::integration::thread_pool_adapter>(pool_config);
326}
327
330 {
331 std::lock_guard<std::mutex> lock(mutex_);
332 callback = job_completion_callback_;
333 }
334
335 if (callback) {
336 try {
337 callback(ctx, success);
338 } catch (...) {
339 // Swallow exceptions from callback to prevent pipeline disruption
340 }
341 }
342}
343
345 size_t depth = get_queue_depth(stage);
346 if (depth >= config_.max_queue_depth) {
347 backpressure_callback callback;
348 {
349 std::lock_guard<std::mutex> lock(mutex_);
350 callback = backpressure_callback_;
351 }
352
353 if (callback) {
354 try {
355 callback(stage, depth);
356 } catch (...) {
357 // Swallow exceptions from callback
358 }
359 }
360 }
361}
362
363} // namespace kcenon::pacs::network::pipeline
Coordinates the 6-stage DICOM I/O pipeline.
std::function< void(const job_context &, bool success)> job_completion_callback
Callback type for job completion notification.
auto get_stage_pool(pipeline_stage stage) -> std::shared_ptr< kcenon::pacs::integration::thread_pool_interface >
Get the thread pool for a specific stage.
std::function< void(pipeline_stage, size_t queue_depth)> backpressure_callback
Callback type for backpressure notification.
auto get_config() const noexcept -> const pipeline_config &
Get the current configuration.
void notify_job_completion(const job_context &ctx, bool success)
~pipeline_coordinator()
Destructor - ensures graceful shutdown.
pipeline_coordinator()
Construct coordinator with default configuration.
auto is_running() const noexcept -> bool
Check if the pipeline is running.
void set_backpressure_callback(backpressure_callback callback)
Set the backpressure callback.
auto get_total_worker_count() const noexcept -> size_t
Get total number of workers across all stages.
auto create_stage_pool(pipeline_stage stage) -> pool_ptr
void set_job_completion_callback(job_completion_callback callback)
Set the job completion callback.
auto generate_job_id() noexcept -> uint64_t
Generate a unique job ID.
std::shared_ptr< kcenon::pacs::integration::thread_pool_interface > pool_ptr
auto is_backpressure_active(pipeline_stage stage) const noexcept -> bool
Check if backpressure is active for a stage.
auto submit_task(pipeline_stage stage, std::function< void()> task) -> VoidResult
Submit a raw function to a stage (for simple tasks)
auto stop() -> VoidResult
Stop the pipeline gracefully.
auto submit_to_stage(pipeline_stage stage, std::unique_ptr< pipeline_job_base > job) -> VoidResult
Submit a job to a specific stage.
auto get_metrics() noexcept -> pipeline_metrics &
Get the metrics collector.
auto get_queue_depth(pipeline_stage stage) const noexcept -> size_t
Get queue depth for a specific stage.
auto get_total_pending_jobs() const noexcept -> size_t
Get total pending jobs across all stages.
Centralized metrics collection for the entire pipeline.
void mark_start_time() noexcept
Mark the start time for throughput calculation.
constexpr auto get_stage_name(pipeline_stage stage) noexcept -> std::string_view
Get the human-readable name of a pipeline stage.
pipeline_stage
Identifies the 6 stages of the DICOM I/O pipeline.
kcenon::pacs::VoidResult VoidResult
VoidResult type alias for operations without return value.
Definition association.h:59
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Definition result.h:249
Main coordinator for the 6-stage DICOM I/O pipeline.
Configuration options for the thread pool.
std::string pool_name
Thread pool name for logging.
std::size_t min_threads
Minimum number of worker threads.
std::size_t max_threads
Maximum number of worker threads.
Context information attached to pipeline jobs for tracking.
Configuration options for the pipeline coordinator.
size_t max_queue_depth
Maximum queue depth per stage (backpressure threshold)
Metrics for a single pipeline stage.
std::atomic< uint32_t > active_workers
Number of active workers in this stage.
Concrete implementation of thread_pool_interface using kcenon::thread.