17#include <condition_variable>
22#include <unordered_map>
36const std::vector<std::string> default_transfer_syntaxes = {
37 "1.2.840.10008.1.2.1",
49 std::shared_ptr<storage::node_repository>
repo;
66 std::unordered_map<std::string, node_statistics>
statistics;
71 std::unique_ptr<network::association>
assoc;
95 const std::string& error_msg =
"") {
101 auto old_status = it->second.status;
102 it->second.status = status;
104 it->second.last_verified = std::chrono::system_clock::now();
106 it->second.last_error = std::chrono::system_clock::now();
107 it->second.last_error_message = error_msg;
111 if (old_status != status) {
119 [[maybe_unused]]
auto result =
repo->update_status(node_id, status, error_msg);
124 using namespace network;
127 association_config assoc_config;
129 assoc_config.called_ae_title = node.
ae_title;
130 assoc_config.proposed_contexts.push_back({
132 std::string(verification_sop_class_uid),
133 default_transfer_syntaxes
137 auto connect_result = association::connect(
144 if (connect_result.is_err()) {
147 "Failed to connect to node: " + node.
node_id,
148 connect_result.error().message);
151 auto& assoc = connect_result.value();
154 auto context_id = assoc.accepted_context_id(verification_sop_class_uid);
159 "Verification SOP Class not accepted by " + node.
node_id);
163 auto echo_rq = dimse::make_c_echo_rq(1);
166 auto send_result = assoc.send_dimse(*context_id, echo_rq);
167 if (send_result.is_err()) {
171 "Failed to send C-ECHO-RQ to " + node.
node_id);
175 auto recv_result = assoc.receive_dimse(
176 std::chrono::duration_cast<association::duration>(node.
dimse_timeout)
179 if (recv_result.is_err()) {
183 "Failed to receive C-ECHO-RSP from " + node.
node_id);
186 const auto& [recv_context_id, response] = recv_result.value();
189 if (response.command() != dimse::command_field::c_echo_rsp) {
193 "Unexpected response from " + node.
node_id);
197 if (response.status() != dimse::status_success) {
198 [[maybe_unused]]
auto release_result = assoc.release();
201 "C-ECHO failed with status: " +
202 std::to_string(
static_cast<uint16_t
>(response.status())));
206 [[maybe_unused]]
auto release_result = assoc.release();
208 return kcenon::pacs::ok();
214 std::vector<std::string> node_ids;
218 node_ids.push_back(
id);
222 for (
const auto&
id : node_ids) {
225 std::optional<remote_node> node;
238 if (result.is_ok()) {
243 statistics[id].successful_operations++;
244 statistics[id].last_activity = std::chrono::system_clock::now();
250 statistics[id].failed_operations++;
258 return !health_check_running.load();
266#ifdef PACS_WITH_DATABASE_SYSTEM
267 auto nodes_result =
repo->find_all();
268 if (nodes_result.is_err())
return;
271 for (
auto& node : nodes_result.value()) {
275 auto nodes =
repo->find_all();
277 for (
auto& node : nodes) {
289 std::shared_ptr<storage::node_repository> repo,
291 std::shared_ptr<di::ILogger> logger)
292 : impl_(std::make_unique<
impl>()) {
324 kcenon::pacs::error_codes::invalid_argument,
325 "Node ID cannot be empty");
330 kcenon::pacs::error_codes::invalid_argument,
331 "AE Title cannot be empty");
334 if (node.
host.empty()) {
336 kcenon::pacs::error_codes::invalid_argument,
337 "Host cannot be empty");
345 kcenon::pacs::error_codes::already_exists,
346 "Node with ID already exists: " + node.
node_id);
352#ifdef PACS_WITH_DATABASE_SYSTEM
354 if (result.is_err()) {
357 "Failed to persist node: " + result.error().message);
361 if (result.is_err()) {
364 "Failed to persist node: " + result.error().message);
375 impl_->
logger->info_fmt(
"Added remote node: {} ({}:{})",
378 return kcenon::pacs::ok();
387 kcenon::pacs::error_codes::not_found,
388 "Node not found: " + node.
node_id);
394#ifdef PACS_WITH_DATABASE_SYSTEM
396 if (result.is_err()) {
399 "Failed to update node: " + result.error().message);
403 if (result.is_err()) {
406 "Failed to update node: " + result.error().message);
419 return kcenon::pacs::ok();
423 std::string id_str(node_id);
430 kcenon::pacs::error_codes::not_found,
431 "Node not found: " + id_str);
437 auto result =
impl_->
repo->remove(std::string(node_id));
438 if (result.is_err()) {
461 impl_->
logger->info_fmt(
"Removed remote node: {}", id_str);
463 return kcenon::pacs::ok();
476 std::vector<remote_node> result;
480 result.push_back(node);
486 std::vector<remote_node> result;
489 if (node.status == status) {
490 result.push_back(node);
501 std::optional<remote_node> node;
512 kcenon::pacs::error_codes::not_found,
513 "Node not found: " + std::string(node_id));
520 if (result.is_ok()) {
523 [[maybe_unused]]
auto update_result =
impl_->
repo->update_last_verified(node_id);
527 result.error().message);
534 std::string_view node_id) {
536 std::string id_str(node_id);
538 return std::async(std::launch::async, [
this, id_str]() {
544 std::vector<std::string> node_ids;
548 node_ids.push_back(
id);
552 for (
const auto&
id : node_ids) {
553 std::thread([
this,
id]() {
564 std::string_view node_id,
565 std::span<const std::string> sop_classes) {
567 std::string id_str(node_id);
568 std::optional<remote_node> node;
579 return kcenon::pacs::pacs_error<std::unique_ptr<network::association>>(
580 kcenon::pacs::error_codes::not_found,
581 "Node not found: " + id_str);
589 auto pooled = std::move(it->second.front());
590 it->second.pop_front();
593 auto age = std::chrono::steady_clock::now() - pooled.acquired_at;
594 if (age < impl_->
config.pool_connection_ttl &&
595 pooled.assoc && pooled.assoc->is_established()) {
596 impl_->
logger->debug_fmt(
"Reusing pooled association for {}", id_str);
597 return kcenon::pacs::ok(std::move(pooled.assoc));
607 uint8_t context_id = 1;
608 for (
const auto& sop_class : sop_classes) {
612 default_transfer_syntaxes
621 std::chrono::duration_cast<network::association::duration>(node->connection_timeout)
624 if (connect_result.is_err()) {
626 connect_result.error().message);
627 return connect_result.error();
637 return kcenon::pacs::ok(std::make_unique<network::association>(std::move(connect_result.value())));
641 std::string_view node_id,
642 std::unique_ptr<network::association> assoc) {
644 std::string id_str(node_id);
650 if (it !=
impl_->
statistics.end() && it->second.active_connections > 0) {
651 it->second.active_connections--;
655 if (!assoc || !assoc->is_established()) {
665 pooled.
assoc = std::move(assoc);
666 pooled.
acquired_at = std::chrono::steady_clock::now();
667 pool.push_back(std::move(pooled));
668 impl_->
logger->debug_fmt(
"Returned association to pool for {}", id_str);
674 [[maybe_unused]]
auto release_result = assoc->release();
691 impl_->
logger->info(
"Started health check scheduler");
706 impl_->
logger->info(
"Stopped health check scheduler");
721 return it->second.status;
746 if (node_id.empty()) {
DICOM Association management per PS3.8.
void start_health_check()
Start the automatic health check scheduler.
auto is_health_check_running() const noexcept -> bool
Check if health check is running.
auto get_statistics(std::string_view node_id) const -> node_statistics
Get statistics for a node.
auto update_node(const remote_node &node) -> kcenon::pacs::VoidResult
Update an existing remote node.
auto get_node(std::string_view node_id) const -> std::optional< remote_node >
Get a node by ID.
auto get_status(std::string_view node_id) const -> node_status
Get the current status of a node.
std::unique_ptr< impl > impl_
~remote_node_manager()
Destructor - stops health check if running.
void verify_all_nodes_async()
Verify all nodes asynchronously.
void stop_health_check()
Stop the automatic health check scheduler.
remote_node_manager(std::shared_ptr< storage::node_repository > repo, node_manager_config config={}, std::shared_ptr< di::ILogger > logger=nullptr)
Construct a remote node manager.
auto verify_node(std::string_view node_id) -> kcenon::pacs::VoidResult
Verify a node's connectivity synchronously.
auto list_nodes_by_status(node_status status) const -> std::vector< remote_node >
List nodes filtered by status.
auto remove_node(std::string_view node_id) -> kcenon::pacs::VoidResult
Remove a remote node.
auto list_nodes() const -> std::vector< remote_node >
List all registered nodes.
auto acquire_association(std::string_view node_id, std::span< const std::string > sop_classes) -> kcenon::pacs::Result< std::unique_ptr< network::association > >
Acquire an association from the pool.
void reset_statistics(std::string_view node_id="")
Reset statistics for a node.
auto config() const noexcept -> const node_manager_config &
Get the current configuration.
void set_config(node_manager_config new_config)
Update the configuration.
void set_status_callback(node_status_callback callback)
Set the status change callback.
auto add_node(const remote_node &node) -> kcenon::pacs::VoidResult
Add a new remote node.
void release_association(std::string_view node_id, std::unique_ptr< network::association > assoc)
Release an association back to the pool.
auto verify_node_async(std::string_view node_id) -> std::future< kcenon::pacs::VoidResult >
Verify a node's connectivity asynchronously.
static Result< association > connect(const std::string &host, uint16_t port, const association_config &config, duration timeout=default_timeout)
Initiate an SCU association to a remote SCP.
DIMSE message encoding and decoding.
node_status
Status of a remote PACS node.
@ verifying
Verification in progress.
@ offline
Node is not responding.
@ online
Node is responding to C-ECHO.
@ unknown
Status not yet determined.
@ error
Node returned an error.
std::function< void(std::string_view node_id, node_status status)> node_status_callback
Callback function type for node status changes.
std::shared_ptr< ILogger > null_logger()
Get a shared null logger instance.
constexpr int no_acceptable_context
constexpr int receive_failed
constexpr int send_failed
constexpr int connection_failed
constexpr int dimse_error
constexpr std::string_view verification_sop_class_uid
Verification SOP Class UID (1.2.840.10008.1.1)
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Repository for remote PACS node persistence using base_repository pattern.
Remote PACS node manager for client operations.
Configuration for the remote node manager.
size_t max_pool_connections_per_node
Maximum pooled connections per node.
std::string local_ae_title
Our AE Title for outgoing associations.
std::chrono::seconds health_check_interval
Interval between automatic health checks.
bool auto_start_health_check
Start health check automatically on construction.
Statistics for a remote node.
std::unique_ptr< network::association > assoc
std::chrono::steady_clock::time_point acquired_at
void update_node_status(const std::string &node_id, node_status status, const std::string &error_msg="")
kcenon::pacs::VoidResult perform_echo(const remote_node &node)
std::thread health_check_thread
node_status_callback status_callback
void notify_status_change(std::string_view node_id, node_status status)
void load_nodes_from_repo()
std::mutex callback_mutex
std::unordered_map< std::string, node_statistics > statistics
std::atomic< bool > health_check_running
std::mutex health_check_mutex
std::unordered_map< std::string, remote_node > node_cache
std::condition_variable health_check_cv
node_manager_config config
std::unordered_map< std::string, std::deque< pooled_association > > connection_pool
std::shared_ptr< storage::node_repository > repo
std::shared_ptr< di::ILogger > logger
std::string ae_title
DICOM Application Entity Title.
uint16_t port
DICOM port (default: 104)
std::string node_id
Unique identifier for this node.
std::chrono::seconds dimse_timeout
DIMSE operation timeout.
std::string host
IP address or hostname.
std::chrono::seconds connection_timeout
TCP connection timeout.
Configuration for SCU association request.
std::string called_ae_title
Remote AE Title (16 chars max)
std::string calling_ae_title
Our AE Title (16 chars max)
std::vector< proposed_presentation_context > proposed_contexts