101class pipeline_job_base;
202 [[nodiscard]]
auto is_running() const noexcept ->
bool;
255 -> std::shared_ptr<
kcenon::
pacs::integration::thread_pool_interface>;
420 [[nodiscard]] virtual auto
get_name() const -> std::
string = 0;
Coordinates the 6-stage DICOM I/O pipeline.
static constexpr size_t stage_count
std::function< void(const job_context &, bool success)> job_completion_callback
Callback type for job completion notification.
auto get_stage_pool(pipeline_stage stage) -> std::shared_ptr< kcenon::pacs::integration::thread_pool_interface >
Get the thread pool for a specific stage.
void check_backpressure(pipeline_stage stage)
std::function< void(pipeline_stage, size_t queue_depth)> backpressure_callback
Callback type for backpressure notification.
auto get_config() const noexcept -> const pipeline_config &
Get the current configuration.
void notify_job_completion(const job_context &ctx, bool success)
~pipeline_coordinator()
Destructor - ensures graceful shutdown.
job_completion_callback job_completion_callback_
pipeline_coordinator()
Construct coordinator with default configuration.
auto is_running() const noexcept -> bool
Check if the pipeline is running.
std::array< pool_ptr, stage_count > stage_pools_
pipeline_coordinator(const pipeline_coordinator &)=delete
pipeline_coordinator & operator=(pipeline_coordinator &&)=delete
pipeline_metrics metrics_
void set_backpressure_callback(backpressure_callback callback)
Set the backpressure callback.
auto get_total_worker_count() const noexcept -> size_t
Get total number of workers across all stages.
auto create_stage_pool(pipeline_stage stage) -> pool_ptr
std::atomic< uint64_t > next_job_id_
pipeline_coordinator(pipeline_coordinator &&)=delete
void set_job_completion_callback(job_completion_callback callback)
Set the job completion callback.
auto start() -> VoidResult
Start the pipeline.
auto generate_job_id() noexcept -> uint64_t
Generate a unique job ID.
backpressure_callback backpressure_callback_
pipeline_coordinator & operator=(const pipeline_coordinator &)=delete
std::shared_ptr< kcenon::pacs::integration::thread_pool_interface > pool_ptr
auto is_backpressure_active(pipeline_stage stage) const noexcept -> bool
Check if backpressure is active for a stage.
auto submit_task(pipeline_stage stage, std::function< void()> task) -> VoidResult
Submit a raw function to a stage (for simple tasks)
auto stop() -> VoidResult
Stop the pipeline gracefully.
auto submit_to_stage(pipeline_stage stage, std::unique_ptr< pipeline_job_base > job) -> VoidResult
Submit a job to a specific stage.
auto get_metrics() noexcept -> pipeline_metrics &
Get the metrics collector.
auto get_queue_depth(pipeline_stage stage) const noexcept -> size_t
Get queue depth for a specific stage.
auto get_total_pending_jobs() const noexcept -> size_t
Get total pending jobs across all stages.
std::atomic< bool > running_
void reset_metrics() noexcept
Reset all metrics.
Base class for all pipeline jobs.
virtual ~pipeline_job_base()=default
Virtual destructor.
virtual auto execute(pipeline_coordinator &coordinator) -> VoidResult=0
Execute the job.
virtual auto get_context() const noexcept -> const job_context &=0
Get the job context.
virtual auto get_name() const -> std::string=0
Get the job name for logging.
Centralized metrics collection for the entire pipeline.
pipeline_stage
Identifies the 6 stages of the DICOM I/O pipeline.
@ storage_query_exec
Stage 4: Execute storage/query operations (blocking allowed)
@ dimse_process
Stage 3: Process DIMSE messages and route requests.
@ response_encode
Stage 5: Encode response into PDU bytes.
@ network_send
Stage 6: Send PDU bytes to network.
@ network_receive
Stage 1: Receive raw PDU bytes from network.
@ pdu_decode
Stage 2: Decode PDU bytes into structured data.
kcenon::pacs::VoidResult VoidResult
VoidResult type alias for operations without return value.
Job type definitions for the 6-stage DICOM I/O pipeline.
Metrics collection for the DICOM I/O pipeline.
Result<T> type aliases and helpers for PACS system.
Context information attached to pipeline jobs for tracking.
Configuration options for the pipeline coordinator.
std::string name_prefix
Name prefix for thread pools (for logging)
size_t encode_workers
Number of workers for encoding stage (5) Response PDU encoding.
std::chrono::milliseconds shutdown_timeout
Graceful shutdown timeout.
bool enable_metrics
Enable metrics collection.
size_t net_io_workers
Number of workers for network I/O stages (1 & 6) Low latency, non-blocking operations.
auto get_workers_for_stage(pipeline_stage stage) const noexcept -> size_t
Get the number of workers for a specific stage.
size_t execution_workers
Number of workers for execution stage (4) Blocking I/O allowed (database, file system)
size_t max_queue_depth
Maximum queue depth per stage (backpressure threshold)
size_t protocol_workers
Number of workers for protocol stages (2 & 3) PDU decoding and DIMSE processing.
Abstract interface for thread pool operations.