30 metrics_.mark_start_time();
35 if (
running_.load(std::memory_order_acquire)) {
36 static_cast<void>(
stop());
45 std::lock_guard<std::mutex> lock(mutex_);
47 if (running_.load(std::memory_order_acquire)) {
49 "Pipeline coordinator is already running");
56 stage_pools_[i] = create_stage_pool(stage);
57 if (!stage_pools_[i]->start()) {
59 for (
size_t j = 0; j < i; ++j) {
60 if (stage_pools_[j]) {
61 stage_pools_[j]->shutdown(
false);
62 stage_pools_[j].reset();
66 "Failed to start thread pool for stage: " +
71 if (config_.enable_metrics) {
74 static_cast<uint32_t
>(config_.get_workers_for_stage(stage)),
75 std::memory_order_relaxed);
77 }
catch (
const std::exception& e) {
79 for (
size_t j = 0; j < i; ++j) {
80 if (stage_pools_[j]) {
81 stage_pools_[j]->shutdown(
false);
82 stage_pools_[j].reset();
86 "Exception creating thread pool for stage " +
91 running_.store(
true, std::memory_order_release);
96 std::lock_guard<std::mutex> lock(mutex_);
98 if (!running_.load(std::memory_order_acquire)) {
102 running_.store(
false, std::memory_order_release);
105 bool all_stopped =
true;
107 if (stage_pools_[i]) {
109 stage_pools_[i]->shutdown(
true);
110 stage_pools_[i].reset();
116 "Some stage pools did not shutdown cleanly");
123 return running_.load(std::memory_order_acquire);
131 std::unique_ptr<pipeline_job_base> job)
135 "Cannot submit null job");
139 auto ctx = job->get_context();
140 return submit_to_stage(stage, std::move(job), ctx);
144 std::unique_ptr<pipeline_job_base> job,
146 if (!running_.load(std::memory_order_acquire)) {
148 "Pipeline coordinator is not running");
151 size_t stage_idx =
static_cast<size_t>(stage);
152 if (stage_idx >=
stage_count || !stage_pools_[stage_idx]) {
154 "Invalid pipeline stage");
158 check_backpressure(stage);
161 if (config_.enable_metrics) {
162 metrics_.increment_stage_queue(stage);
166 auto job_ptr = job.release();
167 auto start_time = std::chrono::steady_clock::now();
170 stage_pools_[stage_idx]->submit_fire_and_forget(
171 [
this, job_ptr, stage, ctx_copy = ctx, start_time]()
mutable {
172 std::unique_ptr<pipeline_job_base> owned_job(job_ptr);
175 auto result = owned_job->execute(*
this);
178 auto end_time = std::chrono::steady_clock::now();
179 auto duration_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
180 end_time - start_time).count();
184 if (config_.enable_metrics) {
185 metrics_.decrement_stage_queue(stage);
186 metrics_.record_stage_completion(stage,
187 static_cast<uint64_t
>(duration_ns),
success);
190 notify_job_completion(ctx_copy,
success);
198 if (!running_.load(std::memory_order_acquire)) {
200 "Pipeline coordinator is not running");
203 size_t stage_idx =
static_cast<size_t>(stage);
204 if (stage_idx >=
stage_count || !stage_pools_[stage_idx]) {
206 "Invalid pipeline stage");
209 if (config_.enable_metrics) {
210 metrics_.increment_stage_queue(stage);
213 stage_pools_[stage_idx]->submit_fire_and_forget(
214 [
this, stage, task = std::move(task)]() {
216 if (config_.enable_metrics) {
217 metrics_.decrement_stage_queue(stage);
229 -> std::shared_ptr<kcenon::pacs::integration::thread_pool_interface> {
230 size_t stage_idx =
static_cast<size_t>(stage);
234 return stage_pools_[stage_idx];
239 size_t stage_idx =
static_cast<size_t>(stage);
240 if (stage_idx >=
stage_count || !stage_pools_[stage_idx]) {
243 return stage_pools_[stage_idx]->get_pending_task_count();
248 return get_queue_depth(stage) >= config_.max_queue_depth;
260 std::lock_guard<std::mutex> lock(
mutex_);
265 std::lock_guard<std::mutex> lock(
mutex_);
294 total += pool->get_thread_count();
304 total += pool->get_pending_task_count();
311 return next_job_id_.fetch_add(1, std::memory_order_relaxed);
321 pool_config.
min_threads = config_.get_workers_for_stage(stage);
322 pool_config.
max_threads = config_.get_workers_for_stage(stage);
325 return std::make_shared<kcenon::pacs::integration::thread_pool_adapter>(pool_config);
331 std::lock_guard<std::mutex> lock(
mutex_);
349 std::lock_guard<std::mutex> lock(
mutex_);
355 callback(stage, depth);
Coordinates the 6-stage DICOM I/O pipeline.
std::function< void(const job_context &, bool success)> job_completion_callback
Callback type for job completion notification.
auto get_stage_pool(pipeline_stage stage) -> std::shared_ptr< kcenon::pacs::integration::thread_pool_interface >
Get the thread pool for a specific stage.
void check_backpressure(pipeline_stage stage)
std::function< void(pipeline_stage, size_t queue_depth)> backpressure_callback
Callback type for backpressure notification.
auto get_config() const noexcept -> const pipeline_config &
Get the current configuration.
void notify_job_completion(const job_context &ctx, bool success)
~pipeline_coordinator()
Destructor - ensures graceful shutdown.
job_completion_callback job_completion_callback_
pipeline_coordinator()
Construct coordinator with default configuration.
auto is_running() const noexcept -> bool
Check if the pipeline is running.
std::array< pool_ptr, stage_count > stage_pools_
pipeline_metrics metrics_
void set_backpressure_callback(backpressure_callback callback)
Set the backpressure callback.
auto get_total_worker_count() const noexcept -> size_t
Get total number of workers across all stages.
auto create_stage_pool(pipeline_stage stage) -> pool_ptr
std::atomic< uint64_t > next_job_id_
void set_job_completion_callback(job_completion_callback callback)
Set the job completion callback.
auto start() -> VoidResult
Start the pipeline.
auto generate_job_id() noexcept -> uint64_t
Generate a unique job ID.
backpressure_callback backpressure_callback_
std::shared_ptr< kcenon::pacs::integration::thread_pool_interface > pool_ptr
auto is_backpressure_active(pipeline_stage stage) const noexcept -> bool
Check if backpressure is active for a stage.
auto submit_task(pipeline_stage stage, std::function< void()> task) -> VoidResult
Submit a raw function to a stage (for simple tasks).
auto stop() -> VoidResult
Stop the pipeline gracefully.
auto submit_to_stage(pipeline_stage stage, std::unique_ptr< pipeline_job_base > job) -> VoidResult
Submit a job to a specific stage.
auto get_metrics() noexcept -> pipeline_metrics &
Get the metrics collector.
auto get_queue_depth(pipeline_stage stage) const noexcept -> size_t
Get queue depth for a specific stage.
auto get_total_pending_jobs() const noexcept -> size_t
Get total pending jobs across all stages.
std::atomic< bool > running_
void reset_metrics() noexcept
Reset all metrics.
Centralized metrics collection for the entire pipeline.
void mark_start_time() noexcept
Mark the start time for throughput calculation.
void reset() noexcept
Reset all metrics.
constexpr auto get_stage_name(pipeline_stage stage) noexcept -> std::string_view
Get the human-readable name of a pipeline stage.
pipeline_stage
Identifies the 6 stages of the DICOM I/O pipeline.
@ stage_count
Total number of stages.
kcenon::pacs::VoidResult VoidResult
VoidResult type alias for operations without return value.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Main coordinator for the 6-stage DICOM I/O pipeline.
Configuration options for the thread pool.
std::string pool_name
Thread pool name for logging.
std::size_t min_threads
Minimum number of worker threads.
std::size_t max_threads
Maximum number of worker threads.
Context information attached to pipeline jobs for tracking.
Configuration options for the pipeline coordinator.
bool enable_metrics
Enable metrics collection.
size_t max_queue_depth
Maximum queue depth per stage (backpressure threshold).
Metrics for a single pipeline stage.
std::atomic< uint32_t > active_workers
Number of active workers in this stage.
Concrete implementation of thread_pool_interface using kcenon::thread.