PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
pipeline_adapter.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
15
16#include <chrono>
17
19
20// =============================================================================
21// Construction & Destruction
22// =============================================================================
23
27
29 : coordinator_(std::make_unique<pipeline_coordinator>(config)) {
30
31 // Set up internal callbacks
32 coordinator_->set_job_completion_callback(
33 [this](const job_context& ctx, bool success) {
35 }
36 );
37
38 coordinator_->set_backpressure_callback(
39 [this](pipeline_stage stage, size_t depth) {
40 on_backpressure(stage, depth);
41 }
42 );
43}
44
46 if (is_running()) {
47 static_cast<void>(stop());
48 }
49}
50
51// =============================================================================
52// Lifecycle Management
53// =============================================================================
54
56 return coordinator_->start();
57}
58
60 return coordinator_->stop();
61}
62
63auto pipeline_adapter::is_running() const noexcept -> bool {
64 return coordinator_->is_running();
65}
66
67// =============================================================================
68// Session Management
69// =============================================================================
70
71void pipeline_adapter::register_session(uint64_t session_id,
72 session_context context) {
73 std::lock_guard<std::mutex> lock(sessions_mutex_);
74 context.session_id = session_id;
75 context.created_at = std::chrono::steady_clock::now();
76 context.last_activity = context.created_at;
77 sessions_[session_id] = std::move(context);
78
79 // Update metrics
80 coordinator_->get_metrics().increment_active_associations();
81
82 // Notify session event
84 {
85 std::lock_guard<std::mutex> cb_lock(callbacks_mutex_);
86 callback = session_event_callback_;
87 }
88 if (callback) {
89 callback(session_id, "session_registered");
90 }
91}
92
93void pipeline_adapter::unregister_session(uint64_t session_id) {
94 std::lock_guard<std::mutex> lock(sessions_mutex_);
95 if (sessions_.erase(session_id) > 0) {
96 coordinator_->get_metrics().decrement_active_associations();
97
98 // Notify session event
100 {
101 std::lock_guard<std::mutex> cb_lock(callbacks_mutex_);
102 callback = session_event_callback_;
103 }
104 if (callback) {
105 callback(session_id, "session_unregistered");
106 }
107 }
108}
109
110auto pipeline_adapter::get_session(uint64_t session_id)
111 -> std::optional<session_context> {
112 std::lock_guard<std::mutex> lock(sessions_mutex_);
113 auto it = sessions_.find(session_id);
114 if (it != sessions_.end()) {
115 return it->second;
116 }
117 return std::nullopt;
118}
119
120auto pipeline_adapter::get_active_session_count() const noexcept -> size_t {
121 std::lock_guard<std::mutex> lock(sessions_mutex_);
122 return sessions_.size();
123}
124
125// =============================================================================
126// Data Handling
127// =============================================================================
128
129auto pipeline_adapter::on_data_received(uint64_t session_id,
130 std::vector<uint8_t> data) -> VoidResult {
131 if (!is_running()) {
132 return pacs_void_error(error_codes::not_initialized,
133 "Pipeline adapter is not running");
134 }
135
136 // Update session activity
137 {
138 std::lock_guard<std::mutex> lock(sessions_mutex_);
139 auto it = sessions_.find(session_id);
140 if (it != sessions_.end()) {
141 it->second.last_activity = std::chrono::steady_clock::now();
142 }
143 }
144
145 // Get send callback for use in jobs
146 send_callback send_fn;
147 {
148 std::lock_guard<std::mutex> lock(callbacks_mutex_);
149 send_fn = send_callback_;
150 }
151
152 // Get association callback
153 association_callback assoc_cb;
154 {
155 std::lock_guard<std::mutex> lock(callbacks_mutex_);
156 assoc_cb = association_callback_;
157 }
158
159 // Create receive job
160 auto job = std::make_unique<receive_network_io_job>(
161 session_id,
162 std::move(data),
163 nullptr, // on_data callback not needed
164 nullptr // on_error callback not needed
165 );
166
167 return coordinator_->submit_to_stage(
169 std::move(job)
170 );
171}
172
173void pipeline_adapter::on_connection_closed(uint64_t session_id) {
174 unregister_session(session_id);
175}
176
177// =============================================================================
178// Service Handler Registration
179// =============================================================================
180
182 std::lock_guard<std::mutex> lock(handlers_mutex_);
183 c_store_handler_ = std::move(handler);
184}
185
187 std::lock_guard<std::mutex> lock(handlers_mutex_);
188 c_find_handler_ = std::move(handler);
189}
190
192 std::lock_guard<std::mutex> lock(handlers_mutex_);
193 c_get_handler_ = std::move(handler);
194}
195
197 std::lock_guard<std::mutex> lock(handlers_mutex_);
198 c_move_handler_ = std::move(handler);
199}
200
202 std::lock_guard<std::mutex> lock(handlers_mutex_);
203 c_echo_handler_ = std::move(handler);
204}
205
206// =============================================================================
207// Callback Registration
208// =============================================================================
209
211 std::lock_guard<std::mutex> lock(callbacks_mutex_);
212 send_callback_ = std::move(callback);
213}
214
216 std::lock_guard<std::mutex> lock(callbacks_mutex_);
217 association_callback_ = std::move(callback);
218}
219
221 session_event_callback callback) {
222 std::lock_guard<std::mutex> lock(callbacks_mutex_);
223 session_event_callback_ = std::move(callback);
224}
225
226// =============================================================================
227// Metrics & Statistics
228// =============================================================================
229
231 return coordinator_->get_metrics();
232}
233
234auto pipeline_adapter::get_metrics() const noexcept -> const pipeline_metrics& {
235 return coordinator_->get_metrics();
236}
237
241
243 -> const pipeline_coordinator& {
244 return *coordinator_;
245}
246
247// =============================================================================
248// Private Methods
249// =============================================================================
250
252 -> service_handler {
253 std::lock_guard<std::mutex> lock(handlers_mutex_);
254
255 switch (type) {
257 return c_store_handler_;
258
260 return c_find_handler_;
261
263 return c_get_handler_;
264
266 return c_move_handler_;
267
269 return c_echo_handler_;
270
271 default:
272 return nullptr;
273 }
274}
275
277 // Record end-to-end metrics if this is the final stage
279 auto now = std::chrono::steady_clock::now();
280 auto enqueue_time = std::chrono::steady_clock::time_point(
281 std::chrono::nanoseconds(ctx.enqueue_time_ns)
282 );
283 auto latency_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
284 now - enqueue_time
285 ).count();
286
287 coordinator_->get_metrics().record_operation_completion(
288 ctx.category,
289 static_cast<uint64_t>(latency_ns),
290 success
291 );
292 }
293}
294
295void pipeline_adapter::on_backpressure(pipeline_stage stage, size_t queue_depth) {
296 // Log backpressure event
297 // In a full implementation, this could trigger adaptive throttling
298 session_event_callback callback;
299 {
300 std::lock_guard<std::mutex> lock(callbacks_mutex_);
301 callback = session_event_callback_;
302 }
303
304 if (callback) {
305 std::string event = "backpressure_" + std::string(get_stage_name(stage)) +
306 "_depth_" + std::to_string(queue_depth);
307 callback(0, event); // session_id 0 for system events
308 }
309}
310
311} // namespace kcenon::pacs::network::pipeline
Adapter for integrating pipeline with DICOM server components.
void set_session_event_callback(session_event_callback callback)
Set the session event callback.
void set_send_callback(send_callback callback)
Set the network send callback.
auto get_active_session_count() const noexcept -> size_t
Get number of active sessions.
auto get_metrics() noexcept -> pipeline_metrics &
Get the pipeline metrics.
auto is_running() const noexcept -> bool
Check if the adapter is running.
auto start() -> VoidResult
Start the pipeline adapter.
service_handler c_store_handler_
Service handlers.
void register_c_get_handler(service_handler handler)
Register C-GET handler.
void register_session(uint64_t session_id, session_context context)
Register a new session.
std::unordered_map< uint64_t, session_context > sessions_
Session registry.
auto stop() -> VoidResult
Stop the pipeline adapter.
std::unique_ptr< pipeline_coordinator > coordinator_
Pipeline coordinator.
auto get_session(uint64_t session_id) -> std::optional< session_context >
Get session context.
void register_c_find_handler(service_handler handler)
Register C-FIND handler.
void unregister_session(uint64_t session_id)
Unregister a session.
std::function< VoidResult(uint64_t session_id, const std::vector< uint8_t > &data)> send_callback
Type for network send callback.
auto get_coordinator() noexcept -> pipeline_coordinator &
Get the underlying coordinator.
storage_query_exec_job::service_handler service_handler
Service handler type.
void register_c_store_handler(service_handler handler)
Register C-STORE handler.
void on_connection_closed(uint64_t session_id)
Handle connection closed event.
std::function< void(uint64_t session_id, const std::string &event)> session_event_callback
Type for session event callback.
void on_job_completed(const job_context &ctx, bool success)
Handle job completion.
void set_association_callback(association_callback callback)
Set the association event callback.
pipeline_adapter()
Construct adapter with default configuration.
auto get_handler_for_command(dimse_command_type type) -> service_handler
Get service handler for command type.
auto on_data_received(uint64_t session_id, std::vector< uint8_t > data) -> VoidResult
Handle incoming data from network.
void register_c_echo_handler(service_handler handler)
Register C-ECHO handler.
~pipeline_adapter()
Destructor - ensures graceful shutdown.
std::function< void(uint64_t session_id, kcenon::pacs::network::pdu_type type, const std::vector< uint8_t > &data)> association_callback
Type for association event callback.
void register_c_move_handler(service_handler handler)
Register C-MOVE handler.
void on_backpressure(pipeline_stage stage, size_t queue_depth)
Handle backpressure.
Coordinates the 6-stage DICOM I/O pipeline.
Centralized metrics collection for the entire pipeline.
DIMSE processing job for Stage 3 of the pipeline.
constexpr auto get_stage_name(pipeline_stage stage) noexcept -> std::string_view
Get the human-readable name of a pipeline stage.
dimse_command_type
DICOM DIMSE command types.
pipeline_stage
Identifies the 6 stages of the DICOM I/O pipeline.
@ network_send
Stage 6: Send PDU bytes to network.
@ network_receive
Stage 1: Receive raw PDU bytes from network.
kcenon::pacs::VoidResult VoidResult
VoidResult type alias for operations without return value.
Definition association.h:59
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Definition result.h:249
Adapter for integrating pipeline with existing DICOM components.
Context information attached to pipeline jobs for tracking.
pipeline_stage stage
Current pipeline stage.
job_category category
Job category for metrics.
uint64_t enqueue_time_ns
Timestamp when job entered the pipeline (nanoseconds since epoch)
Configuration options for the pipeline coordinator.
Context for a DICOM association session.
std::chrono::steady_clock::time_point created_at
Creation timestamp.
std::chrono::steady_clock::time_point last_activity
Last activity timestamp.