PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
dicom_server.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
15
16#include <kcenon/common/patterns/event_bus.h>
17
18#include <algorithm>
19#include <sstream>
20
21// KCENON_HAS_COMMON_SYSTEM is defined by CMake when common_system is available
22#ifndef KCENON_HAS_COMMON_SYSTEM
23#define KCENON_HAS_COMMON_SYSTEM 0
24#endif
25
26#if KCENON_HAS_COMMON_SYSTEM
27using kcenon::common::error_info;
28#endif
29
30namespace kcenon::pacs::network {
31
32// Registry for in-memory testing
33static std::mutex registry_mutex_;
34static std::map<uint16_t, dicom_server*> server_registry_;
35
36// =============================================================================
37// Construction / Destruction
38// =============================================================================
39
42 config,
43 std::make_shared<integration::thread_pool_adapter>(
44 integration::thread_pool_config{})) {
45}
46
48 const server_config& config,
49 std::shared_ptr<integration::thread_pool_interface> thread_pool)
50 : thread_pool_(std::move(thread_pool))
51 , config_(config) {
52 stats_.start_time = clock::now();
54
55 // Ensure thread pool is started
56 if (thread_pool_ && !thread_pool_->is_running()) {
57 static_cast<void>(thread_pool_->start());
58 }
59}
60
62 if (running_) {
63 stop();
64 }
65}
66
67// =============================================================================
68// Service Registration
69// =============================================================================
70
72 if (!service) {
73 return;
74 }
75
76 std::lock_guard<std::mutex> lock(services_mutex_);
77
78 // Register SOP Class mappings
79 for (const auto& sop_class : service->supported_sop_classes()) {
80 sop_class_to_service_[sop_class] = service.get();
81 }
82
83 services_.push_back(std::move(service));
84}
85
86std::vector<std::string> dicom_server::supported_sop_classes() const {
87 std::lock_guard<std::mutex> lock(services_mutex_);
88
89 std::vector<std::string> sop_classes;
90 sop_classes.reserve(sop_class_to_service_.size());
91
92 for (const auto& [uid, _] : sop_class_to_service_) {
93 sop_classes.push_back(uid);
94 }
95
96 return sop_classes;
97}
98
99// =============================================================================
100// Lifecycle Management
101// =============================================================================
102
104 if (running_.exchange(true)) {
105 return error_info("Server already running");
106 }
107
108 // Validate configuration
109 if (config_.ae_title.empty()) {
110 running_ = false;
111 return error_info("AE Title cannot be empty");
112 }
113
114 if (config_.ae_title.length() > AE_TITLE_LENGTH) {
115 running_ = false;
116 return error_info("AE Title exceeds 16 characters");
117 }
118
119 if (config_.port == 0) {
120 running_ = false;
121 return error_info("Invalid port number");
122 }
123
124 // Check that at least one service is registered
125 {
126 std::lock_guard<std::mutex> lock(services_mutex_);
127 if (services_.empty()) {
128 running_ = false;
129 return error_info("No services registered");
130 }
131 }
132
133 // Reset statistics
134 {
135 std::lock_guard<std::mutex> lock(stats_mutex_);
136 stats_.start_time = clock::now();
143 stats_.bytes_sent = 0;
144 }
145
146 // Create and start accept worker
147 accept_worker_ = std::make_unique<detail::accept_worker>(
149 // Connection callback (currently unused - placeholder for future TCP integration)
150 [](uint64_t /*session_id*/) {
151 // Future: handle new TCP connection
152 },
153 // Maintenance callback for periodic idle timeout checks
154 [this]() {
155 check_idle_timeouts();
156 }
157 );
158 accept_worker_->set_wake_interval(std::chrono::milliseconds(100));
159
160 auto start_result = accept_worker_->start();
161 if (start_result.is_err()) {
162 running_ = false;
163 return error_info("Failed to start accept worker: " +
164 start_result.error().message);
165 }
166
167 // Register in global registry
168 {
169 std::lock_guard<std::mutex> lock(registry_mutex_);
171 }
172
173 return std::monostate{};
174}
175
177 if (!running_.exchange(false)) {
178 return; // Already stopped
179 }
180
181 // Unregister from global registry
182 {
183 std::lock_guard<std::mutex> lock(registry_mutex_);
185 }
186
187 // Stop accept worker (handles graceful shutdown via on_stop_requested hook)
188 if (accept_worker_) {
189 accept_worker_->stop();
190 accept_worker_.reset();
191 }
192
193 // =========================================================================
194 // Phase 1: Request graceful cancellation via cancellation tokens
195 // =========================================================================
196 // This triggers cooperative cancellation in all message loops.
197 // The message_loop will detect cancellation and attempt graceful release.
198 {
199 std::lock_guard<std::mutex> lock(associations_mutex_);
200 for (auto& [id, info] : associations_) {
201 info->cancel_token.cancel();
202 }
203 }
204
205 // =========================================================================
206 // Phase 2: Wait for graceful shutdown with timeout
207 // =========================================================================
208 // Give associations time to complete their graceful release.
209 // The message_loop will call release() and remove itself from the map.
210 auto deadline = clock::now() + timeout;
211 {
212 std::unique_lock<std::mutex> lock(associations_mutex_);
213 while (!associations_.empty() && clock::now() < deadline) {
214 // Release lock while waiting to allow message_loop to remove associations
215 lock.unlock();
216 std::this_thread::sleep_for(std::chrono::milliseconds{100});
217 lock.lock();
218 }
219 }
220
221 // =========================================================================
222 // Phase 3: Force abort remaining associations
223 // =========================================================================
224 // If any associations are still active after timeout, force abort them.
225 // This ensures we don't hang indefinitely on unresponsive connections.
226 {
227 std::lock_guard<std::mutex> lock(associations_mutex_);
228 for (auto& [id, info] : associations_) {
229 if (info->assoc.is_established()) {
230 info->assoc.abort(
231 static_cast<uint8_t>(abort_source::service_provider),
232 static_cast<uint8_t>(abort_reason::not_specified));
233 }
234 // Mark as no longer processing
235 // Note: Already holding associations_mutex_ so no atomic needed
236 info->processing = false;
237 }
238 associations_.clear();
239 }
240
241 // Notify shutdown waiters
242 {
243 std::lock_guard<std::mutex> lock(shutdown_mutex_);
244 shutdown_cv_.notify_all();
245 }
246}
247
249 std::unique_lock<std::mutex> lock(shutdown_mutex_);
250 shutdown_cv_.wait(lock, [this]() { return !running_; });
251}
252
253// =============================================================================
254// Status Queries
255// =============================================================================
256
257bool dicom_server::is_running() const noexcept {
258 return running_;
259}
260
261size_t dicom_server::active_associations() const noexcept {
262 std::lock_guard<std::mutex> lock(associations_mutex_);
263 return associations_.size();
264}
265
267 std::lock_guard<std::mutex> lock(stats_mutex_);
268 server_statistics result = stats_;
270 return result;
271}
272
273const server_config& dicom_server::config() const noexcept {
274 return config_;
275}
276
277// =============================================================================
278// Callbacks
279// =============================================================================
280
282 std::lock_guard<std::mutex> lock(callback_mutex_);
283 on_established_cb_ = std::move(callback);
284}
285
287 std::lock_guard<std::mutex> lock(callback_mutex_);
288 on_released_cb_ = std::move(callback);
289}
290
292 std::lock_guard<std::mutex> lock(callback_mutex_);
293 on_error_cb_ = std::move(callback);
294}
295
296// =============================================================================
297// Private Methods - Association Handling
298// =============================================================================
299
300void dicom_server::handle_association(uint64_t session_id, association assoc) {
301 // Check max associations limit
302 {
303 std::lock_guard<std::mutex> lock(associations_mutex_);
304 if (config_.max_associations > 0 &&
306 // Reject due to resource limit
307 assoc.abort(
308 static_cast<uint8_t>(abort_source::service_provider),
309 static_cast<uint8_t>(abort_reason::not_specified));
310
311 std::lock_guard<std::mutex> stats_lock(stats_mutex_);
313 return;
314 }
315 }
316
317 // Create association info
318 association_info info;
319 info.id = session_id;
320 info.assoc = std::move(assoc);
321 info.connected_at = clock::now();
322 info.last_activity = info.connected_at;
323
324 // Invoke established callback
325 {
326 std::lock_guard<std::mutex> lock(callback_mutex_);
327 if (on_established_cb_) {
328 on_established_cb_(info.assoc);
329 }
330 }
331
332 // Publish association established event
333 kcenon::common::get_event_bus().publish(
335 std::string(info.assoc.calling_ae()),
336 std::string(info.assoc.called_ae()),
337 info.remote_address,
339 info.assoc.max_pdu_size()
340 }
341 );
342
343 // Add to active associations
344 add_association(std::move(info));
345
346 // Update statistics
347 {
348 std::lock_guard<std::mutex> lock(stats_mutex_);
350 stats_.last_activity = clock::now();
351 }
352}
353
355 // Check both running flag and cancellation token for cooperative shutdown
356 while (running_ && !info.cancel_token.is_cancelled() && info.assoc.is_established()) {
357 // Receive DIMSE message with timeout
358 // Use shorter timeout to check cancellation more frequently
359 auto receive_timeout = (std::min)(
360 std::chrono::duration_cast<association::duration>(config_.idle_timeout),
361 association::duration{1000} // Check cancellation at least every second
362 );
363
364 auto result = info.assoc.receive_dimse(receive_timeout);
365
366 // Check for cancellation after potentially blocking receive
367 if (info.cancel_token.is_cancelled()) {
368 break;
369 }
370
371 if (result.is_err()) {
372 // Check if it was a timeout vs. actual error
373 if (info.assoc.is_established()) {
374 // Still established, might be timeout - check idle
375 auto now = clock::now();
376 auto idle_duration = std::chrono::duration_cast<std::chrono::seconds>(
377 now - info.last_activity);
378
379 if (config_.idle_timeout.count() > 0 &&
380 idle_duration >= config_.idle_timeout) {
381 // Idle timeout - abort association
382 info.assoc.abort(
383 static_cast<uint8_t>(abort_source::service_provider),
384 static_cast<uint8_t>(abort_reason::not_specified));
385 }
386 }
387 break;
388 }
389
390 // Update activity timestamp
391 info.last_activity = clock::now();
392 touch_association(info.id);
393
394 // Dispatch message to service
395 auto& [context_id, msg] = result.value();
396 auto dispatch_result = dispatch_to_service(info.assoc, context_id, msg);
397
398 if (dispatch_result.is_err()) {
399 report_error(dispatch_result.error().message);
400 }
401
402 // Update statistics
403 {
404 std::lock_guard<std::mutex> lock(stats_mutex_);
406 stats_.last_activity = clock::now();
407 }
408 }
409
410 // If cancelled, attempt graceful release before forced abort
411 if (info.cancel_token.is_cancelled() && info.assoc.is_established()) {
412 // Try graceful release first - ignore result as we'll abort anyway if it fails
413 // The Phase 3 shutdown will handle any remaining established associations
414 (void)info.assoc.release();
415 }
416
417 // Calculate association duration
418 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
419 clock::now() - info.connected_at);
420
421 // Determine if aborted
422 bool is_aborted = info.assoc.state() == association_state::aborted;
423
424 // Association ended - invoke released callback
425 {
426 std::lock_guard<std::mutex> lock(callback_mutex_);
427 if (on_released_cb_) {
428 on_released_cb_(info.assoc);
429 }
430 }
431
432 // Publish association event (released or aborted)
433 if (is_aborted) {
434 kcenon::common::get_event_bus().publish(
436 std::string(info.assoc.calling_ae()),
437 std::string(info.assoc.called_ae()),
438 "Association aborted"
439 }
440 );
441 } else {
442 kcenon::common::get_event_bus().publish(
444 std::string(info.assoc.calling_ae()),
445 std::string(info.assoc.called_ae()),
446 duration,
447 0 // operations_count not tracked yet
448 }
449 );
450 }
451
452 // Remove from active associations
453 remove_association(info.id);
454}
455
457 association& assoc,
458 uint8_t context_id,
459 const dimse::dimse_message& msg) {
460
461 // Get the SOP Class from the message
462 std::string sop_class_uid = msg.affected_sop_class_uid();
463
464 if (sop_class_uid.empty()) {
465 return error_info("Cannot determine SOP Class UID from message");
466 }
467
468 // Find the appropriate service
469 auto* service = find_service(sop_class_uid);
470 if (!service) {
471 return error_info("No service registered for SOP Class: " + sop_class_uid);
472 }
473
474 // Dispatch to service
475 return service->handle_message(assoc, context_id, msg);
476}
477
478bool dicom_server::validate_calling_ae(const std::string& calling_ae) const {
479 if (config_.ae_whitelist.empty()) {
480 return true; // No whitelist = accept all
481 }
482
484 return true;
485 }
486
487 return std::find(config_.ae_whitelist.begin(),
488 config_.ae_whitelist.end(),
489 calling_ae) != config_.ae_whitelist.end();
490}
491
492bool dicom_server::validate_called_ae(const std::string& called_ae) const {
493 return called_ae == config_.ae_title;
494}
495
497 const std::string& sop_class_uid) const {
498
499 std::lock_guard<std::mutex> lock(services_mutex_);
500 auto it = sop_class_to_service_.find(sop_class_uid);
501 if (it != sop_class_to_service_.end()) {
502 return it->second;
503 }
504 return nullptr;
505}
506
508 std::lock_guard<std::mutex> lock(associations_mutex_);
509 auto id = info.id;
510 associations_.emplace(id, std::make_unique<association_info>(std::move(info)));
511}
512
514 std::lock_guard<std::mutex> lock(associations_mutex_);
515 auto it = associations_.find(id);
516 if (it != associations_.end()) {
517 // With thread_pool_adapter, no thread join is needed.
518 // Just mark as not processing and remove from map.
519 // Note: Already holding associations_mutex_ so no atomic needed
520 it->second->processing = false;
521 associations_.erase(it);
522 }
523}
524
526 std::lock_guard<std::mutex> lock(associations_mutex_);
527 auto it = associations_.find(id);
528 if (it != associations_.end()) {
529 it->second->last_activity = clock::now();
530 }
531}
532
534 if (config_.idle_timeout.count() == 0) {
535 return; // No timeout configured
536 }
537
538 auto now = clock::now();
539 std::vector<uint64_t> timed_out;
540
541 {
542 std::lock_guard<std::mutex> lock(associations_mutex_);
543 for (auto& [id, info] : associations_) {
544 auto idle_duration = std::chrono::duration_cast<std::chrono::seconds>(
545 now - info->last_activity);
546
547 if (idle_duration >= config_.idle_timeout) {
548 timed_out.push_back(id);
549 }
550 }
551
552 // Abort timed-out associations
553 for (auto id : timed_out) {
554 auto it = associations_.find(id);
555 if (it != associations_.end()) {
556 it->second->assoc.abort(
557 static_cast<uint8_t>(abort_source::service_provider),
558 static_cast<uint8_t>(abort_reason::not_specified));
559 }
560 }
561 }
562}
563
565 return association_id_counter_.fetch_add(1, std::memory_order_relaxed);
566}
567
568void dicom_server::report_error(const std::string& error) {
569 std::lock_guard<std::mutex> lock(callback_mutex_);
570 if (on_error_cb_) {
571 on_error_cb_(error);
572 }
573}
574
576 std::lock_guard<std::mutex> lock(registry_mutex_);
577 auto it = server_registry_.find(port);
578 return it != server_registry_.end() ? it->second : nullptr;
579}
580
582 // Validate called AE title
584 return error_info("Called AE Title not recognized");
585 }
586
587 // Validate calling AE title
589 return error_info("Calling AE Title not recognized");
590 }
591
592 // Create SCP configuration for negotiation
593 scp_config negotiation_config;
594 negotiation_config.ae_title = config_.ae_title;
595 negotiation_config.max_pdu_length = config_.max_pdu_size;
599 // Accept compressed and uncompressed transfer syntaxes
600 negotiation_config.supported_transfer_syntaxes = {
601 "1.2.840.10008.1.2.4.201", // HTJ2K Lossless Only
602 "1.2.840.10008.1.2.4.202", // HTJ2K RPCL Lossless Only
603 "1.2.840.10008.1.2.4.203", // HTJ2K (Lossless or Lossy)
604 "1.2.840.10008.1.2.4.90", // JPEG 2000 Lossless
605 "1.2.840.10008.1.2.4.91", // JPEG 2000 Lossy
606 "1.2.840.10008.1.2.4.80", // JPEG-LS Lossless
607 "1.2.840.10008.1.2.4.81", // JPEG-LS Near-Lossless
608 "1.2.840.10008.1.2.4.70", // JPEG Lossless
609 "1.2.840.10008.1.2.4.50", // JPEG Baseline
610 "1.2.840.10008.1.2.5", // RLE Lossless
611 "1.2.840.10008.1.2.1", // Explicit VR LE
612 "1.2.840.10008.1.2" // Implicit VR LE
613 };
614
615 // Accept association
616 auto assoc = association::accept(rq, negotiation_config);
617
618 if (!assoc.is_established()) {
619 return error_info("Association rejected: no acceptable presentation contexts");
620 }
621
622 // Build AC PDU before moving association
623 auto ac = assoc.build_associate_ac();
624
625 // Register association
626 uint64_t id = next_association_id();
627 handle_association(id, std::move(assoc));
628
629 // Link peers and start message processing via thread pool
630 {
631 std::lock_guard<std::mutex> lock(associations_mutex_);
632 auto it = associations_.find(id);
633 if (it != associations_.end()) {
634 // Link peers
635 it->second->assoc.set_peer(client_peer);
636 client_peer->set_peer(&it->second->assoc);
637
638 // Submit message loop to thread_pool_adapter instead of creating dedicated thread.
639 // We need to capture the raw pointer to info because unique_ptr is in the map.
640 // The map might rehash but the unique_ptr value (the pointer to association_info) is stable.
641 auto* info_ptr = it->second.get();
642
643 // Mark as processing before submitting to pool
644 // Note: Already holding associations_mutex_ so no atomic needed
645 info_ptr->processing = true;
646
647 // Submit with high priority - association message handling is important
648 thread_pool_->submit_fire_and_forget(
649 [this, info_ptr]() {
650 message_loop(*info_ptr);
651 }
652 );
653 }
654 }
655
656 return ac;
657}
658
659} // namespace kcenon::pacs::network
static association accept(const associate_rq &rq, const scp_config &config)
Accept an incoming SCP association.
void set_peer(association *peer)
Set peer association for in-memory testing.
std::chrono::milliseconds duration
void abort(uint8_t source=0, uint8_t reason=0)
Abort the association immediately.
Result< std::monostate > dispatch_to_service(association &assoc, uint8_t context_id, const dimse::dimse_message &msg)
Dispatch message to appropriate service.
bool validate_called_ae(const std::string &called_ae) const
Validate called AE title.
uint64_t next_association_id()
Generate unique association ID.
std::unordered_map< std::string, services::scp_service * > sop_class_to_service_
Map from SOP Class UID to service.
std::mutex stats_mutex_
Statistics mutex.
void stop(duration timeout=std::chrono::seconds{30})
Stop the server gracefully.
void report_error(const std::string &error)
Report error through callback.
std::vector< std::string > supported_sop_classes() const
Get list of supported SOP Class UIDs.
void register_service(services::scp_service_ptr service)
Register an SCP service.
std::unique_ptr< detail::accept_worker > accept_worker_
Accept worker (replaces std::thread accept_thread_)
std::mutex services_mutex_
Service registry mutex.
services::scp_service * find_service(const std::string &sop_class_uid) const
Get service for SOP Class.
association_callback on_released_cb_
Association released callback.
bool validate_calling_ae(const std::string &calling_ae) const
Validate calling AE title.
Result< associate_ac > simulate_association_request(const associate_rq &rq, association *client_peer)
Simulate an incoming association request (for in-memory testing).
std::shared_ptr< integration::thread_pool_interface > thread_pool_
Thread pool for asynchronous task execution.
void on_error(error_callback callback)
Set callback for error events.
server_statistics get_statistics() const
Get server statistics.
void message_loop(association_info &info)
Process incoming DIMSE messages.
void remove_association(uint64_t id)
Remove association from pool.
void touch_association(uint64_t id)
Update association activity timestamp.
void wait_for_shutdown()
Wait for server shutdown.
std::function< void(const std::string &)> error_callback
Callback type for error events.
std::mutex associations_mutex_
Association mutex.
bool is_running() const noexcept
Check if server is running.
std::atomic< uint64_t > association_id_counter_
Association ID counter.
void on_association_released(association_callback callback)
Set callback for association released events.
std::vector< services::scp_service_ptr > services_
Registered SCP services.
std::mutex shutdown_mutex_
Shutdown mutex.
void handle_association(uint64_t session_id, association assoc)
Handle a single association (runs in worker thread)
dicom_server(const server_config &config)
Construct server with configuration.
static dicom_server * get_server_on_port(uint16_t port)
Get server instance listening on port (for in-memory testing).
std::unordered_map< uint64_t, std::unique_ptr< association_info > > associations_
Active associations.
size_t active_associations() const noexcept
Get number of active associations.
std::mutex callback_mutex_
Callback mutex.
error_callback on_error_cb_
Error callback.
std::atomic< bool > running_
Running flag.
~dicom_server()
Destructor (stops server if running)
association_callback on_established_cb_
Association established callback.
std::function< void(const association &)> association_callback
Callback type for association events.
void on_association_established(association_callback callback)
Set callback for association established events.
void check_idle_timeouts()
Check for idle timeout.
std::chrono::milliseconds duration
const server_config & config() const noexcept
Get server configuration.
server_config config_
Server configuration.
Result< std::monostate > start()
Start the server.
void add_association(association_info info)
Add association to pool.
std::condition_variable shutdown_cv_
Shutdown condition variable.
server_statistics stats_
Server statistics.
auto affected_sop_class_uid() const -> std::string
Get the Affected SOP Class UID.
Multi-threaded DICOM server for handling multiple associations.
DICOM event definitions for event-based communication.
static std::map< uint16_t, dicom_server * > server_registry_
@ not_specified
Reason not specified.
constexpr size_t AE_TITLE_LENGTH
AE Title length (fixed 16 characters, space-padded)
Definition pdu_types.h:273
static std::mutex registry_mutex_
@ service_provider
DICOM UL service-provider (ACSE)
@ aborted
Association aborted (error condition)
std::shared_ptr< scp_service > scp_service_ptr
Shared pointer type for SCP services.
kcenon::common::error_info error_info
Error information type.
Definition result.h:40
Event published when a DICOM association is aborted.
Definition events.h:80
Event published when a DICOM association is successfully established.
Definition events.h:35
Event published when a DICOM association is gracefully released.
Definition events.h:59
A-ASSOCIATE-RQ PDU data.
Definition pdu_types.h:205
std::string called_ae_title
Called AE Title (16 chars max)
Definition pdu_types.h:206
std::string calling_ae_title
Calling AE Title (16 chars max)
Definition pdu_types.h:207
Internal association info for tracking.
Configuration for SCP to accept associations.
std::string ae_title
Our AE Title.
std::vector< std::string > supported_abstract_syntaxes
std::vector< std::string > supported_transfer_syntaxes
size_t max_associations
Maximum concurrent associations (0 = unlimited)
std::chrono::seconds idle_timeout
Idle timeout for associations (0 = no timeout)
uint32_t max_pdu_size
Maximum PDU size for data transfer.
std::vector< std::string > ae_whitelist
AE Title whitelist (empty = accept all)
std::string implementation_version_name
Implementation Version Name.
uint16_t port
Port to listen on (default: 11112, standard alternate DICOM port)
std::string ae_title
Application Entity Title for this server (16 chars max)
std::string implementation_class_uid
Implementation Class UID.
bool accept_unknown_calling_ae
Accept unknown calling AE titles (when whitelist is non-empty)
Statistics for server monitoring.
std::chrono::steady_clock::time_point last_activity
Time of last activity.
uint64_t total_associations
Total associations since server start.
uint64_t messages_processed
Total DIMSE messages processed.
size_t active_associations
Currently active associations.
std::chrono::steady_clock::time_point start_time
Server start time.
uint64_t bytes_received
Total bytes received.
uint64_t rejected_associations
Total associations rejected due to limit.
Concrete implementation of thread_pool_interface using kcenon::thread.
std::string_view uid