47 static_cast<void>(
stop());
56 return coordinator_->start();
60 return coordinator_->stop();
75 context.
created_at = std::chrono::steady_clock::now();
77 sessions_[session_id] = std::move(context);
80 coordinator_->get_metrics().increment_active_associations();
89 callback(session_id,
"session_registered");
96 coordinator_->get_metrics().decrement_active_associations();
105 callback(session_id,
"session_unregistered");
111 -> std::optional<session_context> {
112 std::lock_guard<std::mutex> lock(sessions_mutex_);
113 auto it = sessions_.find(session_id);
114 if (it != sessions_.end()) {
133 "Pipeline adapter is not running");
138 std::lock_guard<std::mutex> lock(sessions_mutex_);
139 auto it = sessions_.find(session_id);
140 if (it != sessions_.end()) {
141 it->second.last_activity = std::chrono::steady_clock::now();
148 std::lock_guard<std::mutex> lock(callbacks_mutex_);
149 send_fn = send_callback_;
155 std::lock_guard<std::mutex> lock(callbacks_mutex_);
156 assoc_cb = association_callback_;
160 auto job = std::make_unique<receive_network_io_job>(
167 return coordinator_->submit_to_stage(
253 std::lock_guard<std::mutex> lock(handlers_mutex_);
257 return c_store_handler_;
260 return c_find_handler_;
263 return c_get_handler_;
266 return c_move_handler_;
269 return c_echo_handler_;
279 auto now = std::chrono::steady_clock::now();
280 auto enqueue_time = std::chrono::steady_clock::time_point(
283 auto latency_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
287 coordinator_->get_metrics().record_operation_completion(
289 static_cast<uint64_t
>(latency_ns),
305 std::string
event =
"backpressure_" + std::string(
get_stage_name(stage)) +
306 "_depth_" + std::to_string(queue_depth);
Adapter for integrating pipeline with DICOM server components.
void set_session_event_callback(session_event_callback callback)
Set the session event callback.
void set_send_callback(send_callback callback)
Set the network send callback.
service_handler c_find_handler_
auto get_active_session_count() const noexcept -> size_t
Get number of active sessions.
auto get_metrics() noexcept -> pipeline_metrics &
Get the pipeline metrics.
auto is_running() const noexcept -> bool
Check if the adapter is running.
auto start() -> VoidResult
Start the pipeline adapter.
service_handler c_store_handler_
Service handlers.
void register_c_get_handler(service_handler handler)
Register C-GET handler.
void register_session(uint64_t session_id, session_context context)
Register a new session.
service_handler c_move_handler_
std::unordered_map< uint64_t, session_context > sessions_
Session registry.
association_callback association_callback_
auto stop() -> VoidResult
Stop the pipeline adapter.
std::unique_ptr< pipeline_coordinator > coordinator_
Pipeline coordinator.
auto get_session(uint64_t session_id) -> std::optional< session_context >
Get session context.
void register_c_find_handler(service_handler handler)
Register C-FIND handler.
void unregister_session(uint64_t session_id)
Unregister a session.
std::function< VoidResult(uint64_t session_id, const std::vector< uint8_t > &data)> send_callback
Type for network send callback.
auto get_coordinator() noexcept -> pipeline_coordinator &
Get the underlying coordinator.
storage_query_exec_job::service_handler service_handler
Service handler type.
void register_c_store_handler(service_handler handler)
Register C-STORE handler.
std::mutex handlers_mutex_
void on_connection_closed(uint64_t session_id)
Handle connection closed event.
std::function< void(uint64_t session_id, const std::string &event)> session_event_callback
Type for session event callback.
void on_job_completed(const job_context &ctx, bool success)
Handle job completion.
void set_association_callback(association_callback callback)
Set the association event callback.
pipeline_adapter()
Construct adapter with default configuration.
auto get_handler_for_command(dimse_command_type type) -> service_handler
Get service handler for command type.
std::mutex sessions_mutex_
session_event_callback session_event_callback_
auto on_data_received(uint64_t session_id, std::vector< uint8_t > data) -> VoidResult
Handle incoming data from network.
void register_c_echo_handler(service_handler handler)
Register C-ECHO handler.
send_callback send_callback_
Callbacks.
~pipeline_adapter()
Destructor - ensures graceful shutdown.
service_handler c_get_handler_
std::function< void(uint64_t session_id, kcenon::pacs::network::pdu_type type, const std::vector< uint8_t > &data)> association_callback
Type for association event callback.
void register_c_move_handler(service_handler handler)
Register C-MOVE handler.
std::mutex callbacks_mutex_
service_handler c_echo_handler_
void on_backpressure(pipeline_stage stage, size_t queue_depth)
Handle backpressure.
Coordinates the 6-stage DICOM I/O pipeline.
Centralized metrics collection for the entire pipeline.
DIMSE processing job for Stage 3 of the pipeline.
constexpr auto get_stage_name(pipeline_stage stage) noexcept -> std::string_view
Get the human-readable name of a pipeline stage.
dimse_command_type
DICOM DIMSE command types.
pipeline_stage
Identifies the 6 stages of the DICOM I/O pipeline.
@ network_send
Stage 6: Send PDU bytes to network.
@ network_receive
Stage 1: Receive raw PDU bytes from network.
kcenon::pacs::VoidResult VoidResult
VoidResult type alias for operations without return value.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Adapter for integrating pipeline with existing DICOM components.
Context information attached to pipeline jobs for tracking.
pipeline_stage stage
Current pipeline stage.
job_category category
Job category for metrics.
uint64_t enqueue_time_ns
Timestamp when job entered the pipeline (nanoseconds since epoch)
Configuration options for the pipeline coordinator.
Context for a DICOM association session.
std::chrono::steady_clock::time_point created_at
Creation timestamp.
uint64_t session_id
Session identifier.
std::chrono::steady_clock::time_point last_activity
Last activity timestamp.