PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
pipeline_coordinator.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
18#pragma once
19
24
25#include <array>
26#include <atomic>
27#include <chrono>
28#include <cstddef>
29#include <functional>
30#include <memory>
31#include <mutex>
32#include <string>
33
35
// Number of workers for the network I/O stages (1 & 6): low latency,
// non-blocking operations. Defaults to 4.
46 size_t net_io_workers = 4;
47
// Number of workers for the protocol stages (2 & 3): PDU decoding and DIMSE
// processing. Defaults to 2.
50 size_t protocol_workers = 2;
51
// NOTE(review): the declaration of `execution_workers` (execution stage 4,
// blocking I/O allowed) is missing from this listing at this position;
// it is still read by get_workers_for_stage().
55
// Number of workers for the encoding stage (5): response PDU encoding.
// Defaults to 2.
58 size_t encode_workers = 2;
59
// Maximum queue depth per stage (backpressure threshold). Defaults to 10000.
61 size_t max_queue_depth = 10000;
62
// Graceful shutdown timeout. Defaults to 500 ms.
64 std::chrono::milliseconds shutdown_timeout{500};
65
// Metrics switch, on by default. NOTE(review): what exactly is disabled when
// this is false is not visible in this header.
67 bool enable_metrics = true;
68
// Name prefix for thread pools (for logging). Defaults to "pipeline".
70 std::string name_prefix = "pipeline";
71
// Get the number of workers for a specific stage.
//
// Returns one of the worker-count members of this configuration, selected by
// `stage`; any stage not matched by a case falls through to `default` and
// yields 1.
//
// NOTE(review): the `case` labels of this switch are missing from this
// listing (original lines 80-81, 84-85, 88 and 91). The stage named on each
// return below is inferred from the member descriptions — confirm against
// the real header.
77 [[nodiscard]] auto get_workers_for_stage(pipeline_stage stage) const noexcept
78 -> size_t {
79 switch (stage) {
// presumably network_receive / network_send (stages 1 & 6)
82 return net_io_workers;
83
// presumably pdu_decode / dimse_process (stages 2 & 3)
86 return protocol_workers;
87
// presumably storage_query_exec (stage 4)
89 return execution_workers;
90
// presumably response_encode (stage 5)
92 return encode_workers;
93
// Fallback for any other enumerator value: a single worker.
94 default:
95 return 1;
96 }
97 }
98};
99
100// Forward declarations for job types
101class pipeline_job_base;
102
142public:
// Callback type for job completion notification. Receives the job's context
// and a flag telling whether the job succeeded.
144 using job_completion_callback = std::function<void(const job_context&, bool success)>;
145
// Callback type for backpressure notification. Receives the affected stage
// and that stage's queue depth.
147 using backpressure_callback = std::function<void(pipeline_stage, size_t queue_depth)>;
148
149 // =========================================================================
150 // Construction
151 // =========================================================================
152
157
// Construct a coordinator from the given configuration. `explicit` prevents
// a pipeline_config from being implicitly converted into a coordinator.
162 explicit pipeline_coordinator(const pipeline_config& config);
163
168
169 // Non-copyable, non-movable
174
175 // =========================================================================
176 // Lifecycle Management
177 // =========================================================================
178
// Start the pipeline. NOTE(review): no description of start() survives in
// this listing; presumably the counterpart of stop() — confirm against the
// implementation. The result is [[nodiscard]] and must be checked.
186 [[nodiscard]] auto start() -> VoidResult;
187
// Stop the pipeline gracefully. The result is [[nodiscard]] and must be
// checked.
196 [[nodiscard]] auto stop() -> VoidResult;
197
// Check if the pipeline is running. Never throws.
202 [[nodiscard]] auto is_running() const noexcept -> bool;
203
204 // =========================================================================
205 // Job Submission
206 // =========================================================================
207
// Submit a job to a specific stage. The job is taken by std::unique_ptr
// value, so the caller gives up ownership at the call.
218 [[nodiscard]] auto submit_to_stage(pipeline_stage stage,
219 std::unique_ptr<pipeline_job_base> job)
220 -> VoidResult;
221
// Overload that additionally accepts a job_context for the job.
// NOTE(review): how `ctx` relates to the job's own get_context() is not
// visible in this header — confirm against the implementation.
230 [[nodiscard]] auto submit_to_stage(pipeline_stage stage,
231 std::unique_ptr<pipeline_job_base> job,
232 const job_context& ctx) -> VoidResult;
233
// Submit a raw function to a stage (for simple tasks).
241 [[nodiscard]] auto submit_task(pipeline_stage stage,
242 std::function<void()> task) -> VoidResult;
243
244 // =========================================================================
245 // Stage Management
246 // =========================================================================
247
// Get the thread pool for a specific stage, as a shared_ptr to the
// thread_pool_interface.
254 [[nodiscard]] auto get_stage_pool(pipeline_stage stage)
255 -> std::shared_ptr<kcenon::pacs::integration::thread_pool_interface>;
256
// Get queue depth for a specific stage. Never throws.
263 [[nodiscard]] auto get_queue_depth(pipeline_stage stage) const noexcept
264 -> size_t;
265
// Check if backpressure is active for a stage. Never throws.
272 [[nodiscard]] auto is_backpressure_active(pipeline_stage stage) const noexcept
273 -> bool;
274
275 // =========================================================================
276 // Configuration & Callbacks
277 // =========================================================================
278
// Get the current configuration, returned by const reference (no copy).
283 [[nodiscard]] auto get_config() const noexcept -> const pipeline_config&;
284
293
302
303 // =========================================================================
304 // Metrics
305 // =========================================================================
306
// Get the metrics collector (mutable access).
311 [[nodiscard]] auto get_metrics() noexcept -> pipeline_metrics&;
312
// Get the metrics collector (read-only access on a const coordinator).
316 [[nodiscard]] auto get_metrics() const noexcept -> const pipeline_metrics&;
317
// Reset the metrics. NOTE(review): no description survives in this listing;
// which counters are cleared is not visible here.
321 void reset_metrics() noexcept;
322
323 // =========================================================================
324 // Statistics
325 // =========================================================================
326
// Get total number of workers across all stages.
331 [[nodiscard]] auto get_total_worker_count() const noexcept -> size_t;
332
// Get total pending jobs across all stages.
337 [[nodiscard]] auto get_total_pending_jobs() const noexcept -> size_t;
338
// Generate a unique job ID. Non-const: each call advances internal state.
343 [[nodiscard]] auto generate_job_id() noexcept -> uint64_t;
344
345private:
346 // Stage thread pool array
// Number of pipeline stages, taken from the pipeline_stage::stage_count
// enumerator.
347 static constexpr size_t stage_count =
348 static_cast<size_t>(pipeline_stage::stage_count);
349
// Shared handle to a stage's thread pool.
350 using pool_ptr = std::shared_ptr<kcenon::pacs::integration::thread_pool_interface>;
// NOTE(review): the member declared on original line 351 is missing from
// this listing.
352
353 // Configuration
// NOTE(review): the member declared on original line 354 is missing from
// this listing.
355
356 // State
// Running flag, initially false.
357 std::atomic<bool> running_{false};
// `mutable` so the mutex can be locked from const member functions; what it
// guards is not visible in this header.
358 mutable std::mutex mutex_;
359
360 // Metrics
// NOTE(review): the member declared on original line 361 is missing from
// this listing.
362
363 // Callbacks
// NOTE(review): the members declared on original lines 364-365 are missing
// from this listing.
366
367 // Job ID generator
// Counter for job IDs; the first value is 1.
// NOTE(review): uint64_t needs <cstdint>, which is not in the visible
// include list — presumably pulled in transitively; consider including it
// directly.
368 std::atomic<uint64_t> next_job_id_{1};
369
370 // Internal methods
// Create the thread pool for one stage.
371 [[nodiscard]] auto create_stage_pool(pipeline_stage stage) -> pool_ptr;
// Report a finished job and its outcome. NOTE(review): presumably forwards
// to the registered job_completion_callback — confirm in the implementation.
372 void notify_job_completion(const job_context& ctx, bool success);
374};
375
384public:
// Virtual destructor, so derived jobs can be destroyed through a base
// pointer.
388 virtual ~pipeline_job_base() = default;
389
// Execute the job. Pure virtual; receives the coordinator by reference and
// reports the outcome as a VoidResult that must be checked.
400 [[nodiscard]] virtual auto execute(pipeline_coordinator& coordinator)
401 -> VoidResult = 0;
402
// Get the job context (read-only). Pure virtual.
407 [[nodiscard]] virtual auto get_context() const noexcept
408 -> const job_context& = 0;
409
// Get the job context (mutable). Pure virtual.
414 [[nodiscard]] virtual auto get_context() noexcept -> job_context& = 0;
415
// Get the job name for logging. Pure virtual.
420 [[nodiscard]] virtual auto get_name() const -> std::string = 0;
421
422protected:
// Protected default constructor: only derived classes can construct the base.
423 pipeline_job_base() = default;
424
425 // Non-copyable but movable
// NOTE(review): the matching copy/move constructors (original lines 426 and
// 428) are missing from this listing; only the assignment operators remain.
427 pipeline_job_base& operator=(const pipeline_job_base&) = delete;
429 pipeline_job_base& operator=(pipeline_job_base&&) = default;
430};
431
432} // namespace kcenon::pacs::network::pipeline
Coordinates the 6-stage DICOM I/O pipeline.
std::function< void(const job_context &, bool success)> job_completion_callback
Callback type for job completion notification.
auto get_stage_pool(pipeline_stage stage) -> std::shared_ptr< kcenon::pacs::integration::thread_pool_interface >
Get the thread pool for a specific stage.
std::function< void(pipeline_stage, size_t queue_depth)> backpressure_callback
Callback type for backpressure notification.
auto get_config() const noexcept -> const pipeline_config &
Get the current configuration.
void notify_job_completion(const job_context &ctx, bool success)
~pipeline_coordinator()
Destructor - ensures graceful shutdown.
pipeline_coordinator()
Construct coordinator with default configuration.
auto is_running() const noexcept -> bool
Check if the pipeline is running.
pipeline_coordinator(const pipeline_coordinator &)=delete
pipeline_coordinator & operator=(pipeline_coordinator &&)=delete
void set_backpressure_callback(backpressure_callback callback)
Set the backpressure callback.
auto get_total_worker_count() const noexcept -> size_t
Get total number of workers across all stages.
auto create_stage_pool(pipeline_stage stage) -> pool_ptr
pipeline_coordinator(pipeline_coordinator &&)=delete
void set_job_completion_callback(job_completion_callback callback)
Set the job completion callback.
auto generate_job_id() noexcept -> uint64_t
Generate a unique job ID.
pipeline_coordinator & operator=(const pipeline_coordinator &)=delete
std::shared_ptr< kcenon::pacs::integration::thread_pool_interface > pool_ptr
auto is_backpressure_active(pipeline_stage stage) const noexcept -> bool
Check if backpressure is active for a stage.
auto submit_task(pipeline_stage stage, std::function< void()> task) -> VoidResult
Submit a raw function to a stage (for simple tasks).
auto stop() -> VoidResult
Stop the pipeline gracefully.
auto submit_to_stage(pipeline_stage stage, std::unique_ptr< pipeline_job_base > job) -> VoidResult
Submit a job to a specific stage.
auto get_metrics() noexcept -> pipeline_metrics &
Get the metrics collector.
auto get_queue_depth(pipeline_stage stage) const noexcept -> size_t
Get queue depth for a specific stage.
auto get_total_pending_jobs() const noexcept -> size_t
Get total pending jobs across all stages.
virtual ~pipeline_job_base()=default
Virtual destructor.
virtual auto execute(pipeline_coordinator &coordinator) -> VoidResult=0
Execute the job.
virtual auto get_context() const noexcept -> const job_context &=0
Get the job context.
virtual auto get_name() const -> std::string=0
Get the job name for logging.
Centralized metrics collection for the entire pipeline.
pipeline_stage
Identifies the 6 stages of the DICOM I/O pipeline.
@ storage_query_exec
Stage 4: Execute storage/query operations (blocking allowed).
@ dimse_process
Stage 3: Process DIMSE messages and route requests.
@ response_encode
Stage 5: Encode response into PDU bytes.
@ network_send
Stage 6: Send PDU bytes to network.
@ network_receive
Stage 1: Receive raw PDU bytes from network.
@ pdu_decode
Stage 2: Decode PDU bytes into structured data.
kcenon::pacs::VoidResult VoidResult
VoidResult type alias for operations without return value.
Definition association.h:59
Job type definitions for the 6-stage DICOM I/O pipeline.
Metrics collection for the DICOM I/O pipeline.
Result<T> type aliases and helpers for PACS system.
Context information attached to pipeline jobs for tracking.
Configuration options for the pipeline coordinator.
std::string name_prefix
Name prefix for thread pools (for logging)
size_t encode_workers
Number of workers for the encoding stage (5). Handles response PDU encoding.
std::chrono::milliseconds shutdown_timeout
Graceful shutdown timeout.
size_t net_io_workers
Number of workers for the network I/O stages (1 & 6). Low-latency, non-blocking operations.
auto get_workers_for_stage(pipeline_stage stage) const noexcept -> size_t
Get the number of workers for a specific stage.
size_t execution_workers
Number of workers for the execution stage (4). Blocking I/O is allowed (database, file system).
size_t max_queue_depth
Maximum queue depth per stage (backpressure threshold)
size_t protocol_workers
Number of workers for the protocol stages (2 & 3). Handles PDU decoding and DIMSE processing.
Abstract interface for thread pool operations.