26#if defined(PACS_WITH_AWS_SDK) && !defined(PACS_USE_MOCK_S3)
27#include <aws/core/Aws.h>
28#include <aws/core/auth/AWSCredentials.h>
29#include <aws/s3/S3Client.h>
30#include <aws/s3/model/DeleteObjectRequest.h>
31#include <aws/s3/model/GetObjectRequest.h>
32#include <aws/s3/model/HeadObjectRequest.h>
33#include <aws/s3/model/ListObjectsV2Request.h>
34#include <aws/s3/model/PutObjectRequest.h>
36#include <aws/s3/model/AbortMultipartUploadRequest.h>
37#include <aws/s3/model/CompleteMultipartUploadRequest.h>
38#include <aws/s3/model/CreateMultipartUploadRequest.h>
39#include <aws/s3/model/UploadPartRequest.h>
46using kcenon::common::make_error;
47using kcenon::common::ok;
// Integer error codes handed to make_error<...>() in this file.
// NOTE(review): the sequence skips -5 -- confirm whether that value is reserved.
// Name suggests a dataset lacking a Study/Series/SOP Instance UID; its use
// site is only partly visible in this listing -- TODO confirm.
52constexpr int kMissingRequiredUid = -1;
// Paired with "Object not found: <key>".
53constexpr int kObjectNotFound = -2;
// Paired (at least) with "Upload cancelled by user".
54constexpr int kUploadError = -3;
// Paired with "Failed to download from S3".
55constexpr int kDownloadError = -4;
// Paired with "S3 client not connected".
56constexpr int kConnectionError = -6;
// Paired with "Integrity check failed for N entries".
57constexpr int kIntegrityError = -7;
// Paired with "Failed to serialize DICOM dataset".
58constexpr int kSerializationError = -8;
// Virtual S3 operations meant to be overridden by concrete clients. Several
// declaration lines are absent from this listing, so each fragment is
// described only as far as it is visible.
// put_object: takes an object key and the bytes associated with it.
76 [[nodiscard]]
virtual auto put_object(
const std::string &key,
77 const std::vector<std::uint8_t> &data)
// get_object: addressed by object key.
80 [[nodiscard]]
virtual auto get_object(
const std::string &key)
// head_object: const operation addressed by object key.
86 [[nodiscard]]
virtual auto head_object(
const std::string &key)
const
// Tail of a pure-virtual declaration (= 0) returning a list of strings.
93 -> std::vector<std::
string> = 0;
// Parameter fragment of a further operation: object key and payload bytes.
98 const std::
string &key, const std::vector<std::uint8_t> &data,
// Constructor initializer: the object starts with connected_ set to true and
// the constructor body is empty.
115 : connected_(true) {}
// In-memory put_object: records the payload in the objects_ map.
118 const std::vector<std::uint8_t> &data)
119 -> VoidResult
override {
// Failure path: kConnectionError tagged "s3_storage". The guarding condition
// is not visible in this listing -- presumably a connected_ check.
121 return make_error<std::monostate>(
122 kConnectionError,
"S3 client not connected",
"s3_storage");
// Copies the bytes into the map; an existing entry for `key` is overwritten.
124 objects_[key] = data;
// In-memory get_object: two failure paths are visible below.
// 1) kConnectionError; the guard is not visible -- presumably a connected_
//    check.
131 return make_error<std::vector<std::uint8_t>>(
132 kConnectionError,
"S3 client not connected",
"s3_storage");
// 2) kObjectNotFound when `key` has no entry in objects_; the message embeds
//    the key.
134 auto it = objects_.find(key);
135 if (it == objects_.end()) {
136 return make_error<std::vector<std::uint8_t>>(
137 kObjectNotFound,
"Object not found: " + key,
"s3_storage");
// Tail of a VoidResult-returning override -- presumably the in-memory
// delete_object; TODO confirm. Only the not-connected failure is visible; the
// step that removes the entry is absent from this listing.
143 -> VoidResult
override {
145 return make_error<std::monostate>(
146 kConnectionError,
"S3 client not connected",
"s3_storage");
// True when objects_ holds an entry for `key`. unordered_map::contains is a
// C++20 member, so this file requires at least that standard.
157 return objects_.contains(key);
// Returns the stored byte count for `key` when it is present in objects_.
// The value returned for a missing key is not visible in this listing.
161 -> std::size_t
override {
162 auto it = objects_.find(key);
163 if (it != objects_.end()) {
164 return it->second.size();
// Builds a list of strings while iterating objects_. The loop body is not
// visible -- presumably it appends each key; TODO confirm. objects_ is an
// unordered_map, so the iteration order is unspecified.
170 -> std::vector<std::
string>
override {
171 std::vector<std::string> keys;
// Reserve once so appending one element per stored object does not reallocate.
172 keys.reserve(objects_.size());
173 for (
const auto &[key, data] : objects_) {
// In-memory multipart upload: the visible loop transfers nothing; it advances
// a byte counter in part_size steps and reports progress after each step.
// What happens after the loop is not visible in this listing.
184 const std::vector<std::uint8_t> &data,
185 std::size_t part_size,
187 -> VoidResult
override {
188 std::size_t total_bytes = data.size();
189 std::size_t bytes_uploaded = 0;
// NOTE(review): with part_size == 0 the chunk below is 0, bytes_uploaded never
// advances, and the loop cannot end unless the callback cancels. Consider
// rejecting a zero part size up front.
191 while (bytes_uploaded < total_bytes) {
// Parenthesizing (std::min) prevents expansion of a function-like min macro.
192 std::size_t chunk = (std::min)(part_size, total_bytes - bytes_uploaded);
193 bytes_uploaded += chunk;
// A callback that returns false cancels the upload with kUploadError.
195 if (callback && !callback(bytes_uploaded, total_bytes)) {
196 return make_error<std::monostate>(
197 kUploadError,
"Upload cancelled by user",
"s3_storage");
// In-memory object store: maps an object key to its raw bytes.
205 std::unordered_map<std::string, std::vector<std::uint8_t>>
objects_;
213#if defined(PACS_WITH_AWS_SDK) && !defined(PACS_USE_MOCK_S3)
// One-time AWS SDK initialization.
220 static void ensure_initialized() {
// call_once runs the lambda a single time, even with concurrent callers.
221 std::call_once(init_flag_, [] {
222 Aws::InitAPI(options_);
// ShutdownAPI is registered to run at normal process exit.
// NOTE(review): an SDK object with static storage duration that is destroyed
// after this handler runs would outlive the SDK -- confirm none exists.
223 std::atexit([] { Aws::ShutdownAPI(options_); });
// Inline statics: the once-flag, and the options object passed to both
// InitAPI and ShutdownAPI.
228 static inline std::once_flag init_flag_;
229 static inline Aws::SDKOptions options_;
// S3 client implemented on top of the AWS SDK for C++.
240class aws_s3_client :
public s3_storage::s3_client_interface {
// Builds the SDK client from `config`; the bucket name is stored for use in
// the requests issued by the member functions.
242 explicit aws_s3_client(
const cloud_storage_config &config)
243 : bucket_(config.bucket_name) {
// Runs the guard's one-time SDK setup before any SDK type is constructed.
244 aws_sdk_guard::ensure_initialized();
// Region, timeouts and connection limit are taken directly from the config.
246 Aws::Client::ClientConfiguration client_config;
247 client_config.region = config.region;
248 client_config.connectTimeoutMs = config.connect_timeout_ms;
249 client_config.requestTimeoutMs = config.request_timeout_ms;
250 client_config.maxConnections =
static_cast<unsigned>(config.max_connections);
// Custom endpoint: override the SDK endpoint and switch to path-style
// addressing.
// NOTE(review): lines around the use_path_style_ assignment are absent from
// this listing; it appears to belong to the endpoint_url branch -- confirm.
252 if (config.endpoint_url.has_value()) {
253 client_config.endpointOverride = config.endpoint_url.value();
255 use_path_style_ =
true;
// Static credentials taken from the config.
258 Aws::Auth::AWSCredentials credentials(config.access_key_id,
259 config.secret_access_key);
// Virtual-hosted addressing is turned off whenever path-style was selected.
261 Aws::S3::S3ClientConfiguration s3_config(client_config);
262 s3_config.useVirtualAddressing = !use_path_style_;
// The remaining constructor arguments are cut off in this listing.
263 client_ = std::make_unique<Aws::S3::S3Client>(credentials,
nullptr,
// Uploads `data` to bucket_ with one PutObject request.
267 [[nodiscard]]
auto put_object(
const std::string &key,
268 const std::vector<std::uint8_t> &data)
269 -> VoidResult
override {
270 Aws::S3::Model::PutObjectRequest request;
271 request.SetBucket(bucket_);
// NOTE(review): no SetKey(key) call is visible in this listing -- confirm the
// request key is set before the request is sent.
273 request.SetContentType(
"application/dicom");
// The payload is copied into an SDK string stream that becomes the request
// body, so the bytes exist twice in memory for the duration of the call.
275 auto stream = Aws::MakeShared<Aws::StringStream>(
"PutObjectStream");
276 stream->write(
reinterpret_cast<const char *
>(data.data()),
277 static_cast<std::streamsize
>(data.size()));
278 request.SetBody(stream);
// On failure the SDK's error message is appended to the returned error text.
280 auto outcome = client_->PutObject(request);
281 if (!outcome.IsSuccess()) {
282 return make_error<std::monostate>(
284 "S3 PutObject failed: " +
285 std::string(outcome.GetError().GetMessage()),
// Downloads an object from bucket_ and reads its body into a byte vector.
292 [[nodiscard]]
auto get_object(
const std::string &key)
293 -> Result<std::vector<std::uint8_t>>
override {
294 Aws::S3::Model::GetObjectRequest request;
295 request.SetBucket(bucket_);
// NOTE(review): no SetKey(key) call is visible in this listing -- confirm.
298 auto outcome = client_->GetObject(request);
299 if (!outcome.IsSuccess()) {
300 const auto &
error = outcome.GetError();
// NO_SUCH_KEY is reported as kObjectNotFound; the other visible failure path
// carries the SDK error message instead.
301 if (
error.GetErrorType() ==
302 Aws::S3::S3Errors::NO_SUCH_KEY) {
303 return make_error<std::vector<std::uint8_t>>(
304 kObjectNotFound,
"Object not found: " + key,
"s3_storage");
306 return make_error<std::vector<std::uint8_t>>(
308 "S3 GetObject failed: " + std::string(
error.GetMessage()),
// Drain the response stream into the result. The extra parentheses around the
// first iterator keep this from parsing as a function declaration.
312 auto &body = outcome.GetResult().GetBody();
313 std::vector<std::uint8_t> result(
314 (std::istreambuf_iterator<char>(body)),
315 std::istreambuf_iterator<char>());
// Issues a DeleteObject request against bucket_.
319 [[nodiscard]]
auto delete_object(
const std::string &key)
320 -> VoidResult
override {
321 Aws::S3::Model::DeleteObjectRequest request;
322 request.SetBucket(bucket_);
// NOTE(review): no SetKey(key) call is visible in this listing -- confirm.
325 auto outcome = client_->DeleteObject(request);
// On failure the SDK's error message is appended to the returned error text.
326 if (!outcome.IsSuccess()) {
327 return make_error<std::monostate>(
329 "S3 DeleteObject failed: " +
330 std::string(outcome.GetError().GetMessage()),
// Issues a HeadObject request and returns whether it succeeded.
337 [[nodiscard]]
auto head_object(
const std::string &key)
const
339 Aws::S3::Model::HeadObjectRequest request;
340 request.SetBucket(bucket_);
// NOTE(review): no SetKey(key) call is visible in this listing -- confirm.
343 auto outcome = client_->HeadObject(request);
// NOTE(review): every failure yields false, so a transport or permission
// error cannot be told apart from a missing object by the caller.
344 return outcome.IsSuccess();
// Returns the content length reported by a HeadObject request. The value
// returned when the request fails is not visible in this listing.
347 [[nodiscard]]
auto get_object_size(
const std::string &key)
const
348 -> std::size_t
override {
349 Aws::S3::Model::HeadObjectRequest request;
350 request.SetBucket(bucket_);
// NOTE(review): no SetKey(key) call is visible in this listing -- confirm.
353 auto outcome = client_->HeadObject(request);
354 if (outcome.IsSuccess()) {
// GetContentLength() is converted to the unsigned size type used by callers.
355 return static_cast<std::size_t
>(
356 outcome.GetResult().GetContentLength());
// Collects object keys from bucket_ using ListObjectsV2.
361 [[nodiscard]]
auto list_objects() const
362 -> std::vector<std::
string>
override {
363 std::vector<std::string> keys;
365 Aws::S3::Model::ListObjectsV2Request request;
366 request.SetBucket(bucket_);
// has_more is refreshed from the truncation flag of each response; the loop
// header that consumes it is not visible in this listing.
368 bool has_more =
true;
370 auto outcome = client_->ListObjectsV2(request);
// NOTE(review): the failure handling is not visible. The return type has no
// error channel, so a failed page presumably produces a partial or empty
// list -- confirm that callers can tolerate that.
371 if (!outcome.IsSuccess()) {
375 const auto &result = outcome.GetResult();
376 for (
const auto &
object : result.GetContents()) {
377 keys.push_back(
object.GetKey());
// Carry the continuation token into the next request.
380 has_more = result.GetIsTruncated();
382 request.SetContinuationToken(result.GetNextContinuationToken());
// Reports whether the SDK client object exists. No request is sent, so this
// does not prove that the endpoint is reachable or the credentials are valid.
389 [[nodiscard]]
auto is_connected() const ->
bool override {
390 return client_ !=
nullptr;
// Uploads `data` to bucket_ as an S3 multipart upload: create the upload,
// send the payload in part_size pieces, then complete it. A failed part or a
// cancelling callback aborts the upload.
// NOTE(review): S3 documents a minimum size for every part except the last
// and a cap on the number of parts; part_size is not validated here.
393 [[nodiscard]]
auto multipart_upload(
const std::string &key,
394 const std::vector<std::uint8_t> &data,
395 std::size_t part_size,
396 progress_callback callback)
397 -> VoidResult
override {
// Step 1: create the multipart upload and obtain its upload id.
399 Aws::S3::Model::CreateMultipartUploadRequest create_request;
400 create_request.SetBucket(bucket_);
401 create_request.SetKey(key);
402 create_request.SetContentType(
"application/dicom");
404 auto create_outcome = client_->CreateMultipartUpload(create_request);
405 if (!create_outcome.IsSuccess()) {
406 return make_error<std::monostate>(
408 "Failed to initiate multipart upload: " +
409 std::string(create_outcome.GetError().GetMessage()),
413 auto upload_id = create_outcome.GetResult().GetUploadId();
// Collects the part number and ETag of every uploaded part for completion.
414 Aws::S3::Model::CompletedMultipartUpload completed_upload;
416 std::size_t total_bytes = data.size();
417 std::size_t bytes_uploaded = 0;
// Step 2: upload the parts sequentially.
// NOTE(review): with part_size == 0 the chunk size is 0 and bytes_uploaded
// never advances. With an empty `data` the loop is skipped and completion is
// requested with no parts. Confirm both cases are excluded by callers.
// The declaration and increment of part_number are not visible here.
420 while (bytes_uploaded < total_bytes) {
422 (std::min)(part_size, total_bytes - bytes_uploaded);
// Each part's bytes are copied into a fresh SDK stream used as the body.
424 auto stream = Aws::MakeShared<Aws::StringStream>(
"UploadPartStream");
426 reinterpret_cast<const char *
>(data.data() + bytes_uploaded),
427 static_cast<std::streamsize
>(chunk));
429 Aws::S3::Model::UploadPartRequest part_request;
430 part_request.SetBucket(bucket_);
431 part_request.SetKey(key);
432 part_request.SetUploadId(upload_id);
433 part_request.SetPartNumber(part_number);
434 part_request.SetBody(stream);
435 part_request.SetContentLength(
static_cast<long long>(chunk));
437 auto part_outcome = client_->UploadPart(part_request);
438 if (!part_outcome.IsSuccess()) {
// A failed part aborts the whole upload.
// NOTE(review): the abort outcome is discarded, so a failed abort goes
// unnoticed and the parts already uploaded may stay stored.
440 Aws::S3::Model::AbortMultipartUploadRequest abort_request;
441 abort_request.SetBucket(bucket_);
442 abort_request.SetKey(key);
443 abort_request.SetUploadId(upload_id);
444 client_->AbortMultipartUpload(abort_request);
446 return make_error<std::monostate>(
448 "Failed to upload part " + std::to_string(part_number) +
": " +
449 std::string(part_outcome.GetError().GetMessage()),
// Remember the part's ETag; it is sent again when completing the upload.
453 Aws::S3::Model::CompletedPart completed_part;
454 completed_part.SetPartNumber(part_number);
455 completed_part.SetETag(part_outcome.GetResult().GetETag());
456 completed_upload.AddParts(std::move(completed_part));
458 bytes_uploaded += chunk;
// Report progress; a callback returning false cancels and aborts the upload.
461 if (callback && !callback(bytes_uploaded, total_bytes)) {
463 Aws::S3::Model::AbortMultipartUploadRequest abort_request;
464 abort_request.SetBucket(bucket_);
465 abort_request.SetKey(key);
466 abort_request.SetUploadId(upload_id);
467 client_->AbortMultipartUpload(abort_request);
469 return make_error<std::monostate>(
470 kUploadError,
"Upload cancelled by user",
"s3_storage");
// Step 3: complete the upload with the collected part list.
475 Aws::S3::Model::CompleteMultipartUploadRequest complete_request;
476 complete_request.SetBucket(bucket_);
477 complete_request.SetKey(key);
478 complete_request.SetUploadId(upload_id);
479 complete_request.SetMultipartUpload(completed_upload);
481 auto complete_outcome = client_->CompleteMultipartUpload(complete_request);
// NOTE(review): unlike the part-failure path, no abort is issued before this
// error return -- confirm whether the upload should be aborted here as well.
482 if (!complete_outcome.IsSuccess()) {
483 return make_error<std::monostate>(
485 "Failed to complete multipart upload: " +
486 std::string(complete_outcome.GetError().GetMessage()),
// Owned SDK client used by the request code of this class.
495 std::unique_ptr<Aws::S3::S3Client> client_;
// Defaults to false; true selects path-style addressing.
496 bool use_path_style_{
false};
// Member initializer selected when building with the AWS SDK and without
// PACS_USE_MOCK_S3: client_ becomes an aws_s3_client built from `config`.
// The alternative branch is not visible in this listing.
507#
if defined(PACS_WITH_AWS_SDK) && !defined(PACS_USE_MOCK_S3)
508 client_(std::make_unique<aws_s3_client>(config))
// Delegates to store_with_progress without a progress callback.
522 return store_with_progress(dataset,
nullptr);
// All three UIDs are required; they are combined into the object key below.
532 if (study_uid.empty() || series_uid.empty() || sop_uid.empty()) {
533 return make_error<std::monostate>(
535 "Missing required UID (Study, Series, or SOP Instance UID)",
540 auto object_key = build_object_key(study_uid, series_uid, sop_uid);
// Serialize to bytes; the condition guarding the failure return is not
// visible in this listing.
546 auto data = dicom_file.to_bytes();
548 return make_error<std::monostate>(
549 kSerializationError,
"Failed to serialize DICOM dataset",
"s3_storage");
// Initial progress report (0 of N bytes); a false return cancels the
// operation before the upload branch below is reached.
553 if (callback && !callback(0, data.size())) {
554 return make_error<std::monostate>(kUploadError,
"Upload cancelled by user",
// Payloads larger than config_.multipart_threshold take the multipart path;
// the other visible path is a single put_object call.
559 VoidResult upload_result = ok();
560 if (data.size() > config_.multipart_threshold) {
562 client_->multipart_upload(object_key, data, config_.part_size, callback);
564 upload_result = client_->put_object(object_key, data);
// Final progress report; its return value is ignored.
// NOTE(review): this call comes before the error check below and its guard
// is not visible. If the guard does not test upload_result, completion is
// reported even when the upload failed -- confirm.
568 callback(data.size(), data.size());
572 if (upload_result.is_err()) {
573 return upload_result;
// Record the instance in the local index under an exclusive lock; an
// existing entry for the same SOP Instance UID is replaced.
578 std::unique_lock lock(mutex_);
580 info.key = object_key;
581 info.sop_instance_uid = sop_uid;
582 info.study_instance_uid = study_uid;
583 info.series_instance_uid = series_uid;
584 info.size_bytes = data.size();
585 index_[sop_uid] = std::move(info);
// Delegates to retrieve_with_progress without a progress callback.
593 return retrieve_with_progress(sop_instance_uid,
nullptr);
// Resolve the object key from the index under a shared lock; the key is
// copied into a local that is declared ahead of the lock.
599 std::string object_key;
602 std::shared_lock lock(mutex_);
// A temporary std::string is built from the string_view for the lookup.
603 auto it = index_.find(std::string{sop_instance_uid});
604 if (it == index_.end()) {
605 return make_error<core::dicom_dataset>(
607 "Instance not found: " + std::string{sop_instance_uid},
"s3_storage");
609 object_key = it->second.key;
613 auto download_result = client_->get_object(object_key);
// NOTE(review): the client's own error (e.g. not-found versus a transport
// failure) is replaced by this fixed message, so the cause is lost.
614 if (download_result.is_err()) {
615 return make_error<core::dicom_dataset>(
616 kDownloadError,
"Failed to download from S3",
"s3_storage");
619 const auto &data = download_result.value();
// Single completion report after the whole object has been downloaded; its
// guard is not visible in this listing.
623 callback(data.size(), data.size());
// Parse failures carry the parser's message.
628 if (parse_result.is_err()) {
629 return make_error<core::dicom_dataset>(
631 "Failed to parse DICOM data: " + parse_result.error().message,
635 return parse_result.value().dataset();
// Looks the instance up in the index under an exclusive lock, then asks the
// client to delete the object. What happens for an unknown UID and where the
// index entry is erased are not visible in this listing.
639 std::string object_key;
642 std::unique_lock lock(mutex_);
643 auto it = index_.find(std::string{sop_instance_uid});
644 if (it == index_.end()) {
648 object_key = it->second.key;
653 auto delete_result = client_->delete_object(object_key);
// Consults only the local index, under a shared lock; S3 is not contacted.
660 std::shared_lock lock(mutex_);
661 return index_.contains(std::string{sop_instance_uid});
666 std::vector<core::dicom_dataset> results;
// Copy the indexed object keys under a shared lock; the downloads below work
// from that copy.
668 std::vector<std::string> keys_to_retrieve;
670 std::shared_lock lock(mutex_);
671 keys_to_retrieve.reserve(index_.size());
672 for (
const auto &[
uid, info] : index_) {
673 keys_to_retrieve.push_back(info.key);
// NOTE(review): every query downloads each indexed object in turn, so the
// cost grows with the total amount of stored data.
677 for (
const auto &key : keys_to_retrieve) {
678 auto download_result = client_->get_object(key);
// The handling inside the two is_err() branches is not visible in this
// listing.
679 if (download_result.is_err()) {
684 if (parse_result.is_err()) {
// Datasets accepted by matches_query are copied into the result list.
688 const auto &dataset = parse_result.value().dataset();
689 if (matches_query(dataset, query)) {
690 results.push_back(dataset);
// Sets are used so that each distinct UID is counted once.
700 std::set<std::string> studies;
701 std::set<std::string> series;
// NOTE(review): no insertion into `patients` is visible in this listing --
// confirm that it is actually used.
702 std::set<std::string> patients;
// Statistics are computed from the local index under a shared lock.
705 std::shared_lock lock(
mutex_);
706 stats.total_instances =
index_.size();
709 stats.total_bytes += info.size_bytes;
// Empty UIDs are left out of the distinct counts.
711 if (!info.study_instance_uid.empty()) {
712 studies.insert(info.study_instance_uid);
714 if (!info.series_instance_uid.empty()) {
715 series.insert(info.series_instance_uid);
720 stats.studies_count = studies.size();
721 stats.series_count = series.size();
// Copy (uid, key) pairs out of the index under a shared lock; the checks
// below iterate over that copy.
728 std::vector<std::pair<std::string, std::string>> entries;
730 std::shared_lock lock(mutex_);
731 entries.reserve(index_.size());
732 for (
const auto &[
uid, info] : index_) {
733 entries.emplace_back(
uid, info.key);
737 std::vector<std::string> invalid_entries;
// An entry is recorded as invalid when head_object returns false for its key.
739 for (
const auto &[
uid, key] : entries) {
740 if (!client_->head_object(key)) {
741 invalid_entries.push_back(
uid +
" (object missing)");
// NOTE(review): the per-entry descriptions are collected, but only their
// count is placed in the error message.
745 if (!invalid_entries.empty()) {
746 std::string message =
"Integrity check failed for " +
747 std::to_string(invalid_entries.size()) +
" entries";
748 return make_error<std::monostate>(kIntegrityError, message,
"s3_storage");
// Returns the indexed object key for the SOP Instance UID, read under a
// shared lock. The value returned for an unknown UID is not visible in this
// listing.
760 std::shared_lock lock(mutex_);
761 auto it = index_.find(std::string{sop_instance_uid});
762 if (it != index_.end()) {
763 return it->second.key;
// NOTE(review): the exclusive lock is taken ahead of the listing and download
// calls below. If its scope covers them (the scope boundaries are not visible
// in this listing), all readers are blocked for the whole rebuild -- confirm.
773 std::unique_lock lock(mutex_);
777 auto keys = client_->list_objects();
// Every listed object is downloaded so that its UIDs can be read; the
// handling inside the two is_err() branches is not visible.
779 for (
const auto &key : keys) {
781 auto download_result = client_->get_object(key);
782 if (download_result.is_err()) {
787 if (parse_result.is_err()) {
791 const auto &dataset = parse_result.value().dataset();
// Only datasets that yield a non-empty SOP Instance UID are indexed.
796 if (!sop_uid.empty()) {
799 info.sop_instance_uid = sop_uid;
800 info.study_instance_uid = study_uid;
801 info.series_instance_uid = series_uid;
// NOTE(review): this is an additional client call per object even though the
// object has just been downloaded -- confirm whether the size of the
// downloaded data could be used instead.
802 info.size_bytes = client_->get_object_size(key);
803 index_[sop_uid] = std::move(info);
// The visible part of the key is "<study>/<series>/<sop>.dcm", with each UID
// passed through sanitize_uid first.
819 std::string_view series_uid,
820 std::string_view sop_uid)
const
822 std::ostringstream oss;
823 oss << sanitize_uid(study_uid) <<
"/" << sanitize_uid(series_uid) <<
"/"
824 << sanitize_uid(sop_uid) <<
".dcm";
// Reserve the input length so building the result does not reallocate.
830 result.reserve(
uid.length());
// Alphanumeric characters and '.' satisfy this test; what is done with them
// and with all other characters is not visible in this listing. The cast to
// unsigned char avoids undefined behaviour in std::isalnum for negative char
// values.
835 if (std::isalnum(
static_cast<unsigned char>(c)) || c ==
'.') {
// Forwards to the client's multipart_upload using the configured part size.
846 const std::vector<std::uint8_t> &data,
848 return client_->multipart_upload(key, data, config_.part_size, callback);
// Each query element is compared against the dataset value with the same tag.
859 for (
const auto &[tag, element] : query) {
// An element without a string value is treated as an empty query value.
860 auto query_value = element.as_string().unwrap_or(
"");
// The handling of an empty query value is not visible in this listing.
861 if (query_value.empty()) {
865 auto dataset_value = dataset.get_string(tag);
// Wildcard path: entered when the query contains '*' or '?'.
// NOTE(review): none of the visible branches interprets '?' or a '*' in the
// middle of the pattern -- confirm how such patterns are meant to behave.
868 if (query_value.find(
'*') != std::string::npos ||
869 query_value.find(
'?') != std::string::npos) {
871 if (query_value.front() ==
'*' && query_value.back() ==
'*') {
// "*text*": substring test. For the one-character pattern "*" the expression
// length() - 2 wraps around; substr(1, ...) then yields an empty string
// (pos == size is allowed), and find("") always succeeds.
873 auto inner = query_value.substr(1, query_value.length() - 2);
874 if (dataset_value.find(inner) == std::string::npos) {
877 }
else if (query_value.front() ==
'*') {
// "*text": suffix test; the length check guards the substr position below.
879 auto suffix = query_value.substr(1);
880 if (dataset_value.length() < suffix.length() ||
881 dataset_value.substr(dataset_value.length() - suffix.length()) !=
885 }
else if (query_value.back() ==
'*') {
// "text*": prefix test; substr clamps when the dataset value is shorter.
887 auto prefix = query_value.substr(0, query_value.length() - 1);
888 if (dataset_value.substr(0, prefix.length()) != prefix) {
// Exact comparison against the query value.
894 if (dataset_value != query_value) {
if(!color.empty()) style.color
static auto from_bytes(std::span< const uint8_t > data) -> kcenon::pacs::Result< dicom_file >
Parse a DICOM file from raw bytes.
static auto create(dicom_dataset dataset, const encoding::transfer_syntax &ts) -> dicom_file
Create a new DICOM file from a dataset.
static const transfer_syntax explicit_vr_little_endian
Explicit VR Little Endian (1.2.840.10008.1.2.1)
Mock S3 client for testing without AWS SDK dependency.
auto put_object(const std::string &key, const std::vector< std::uint8_t > &data) -> VoidResult override
auto head_object(const std::string &key) const -> bool override
auto list_objects() const -> std::vector< std::string > override
auto is_connected() const -> bool override
std::unordered_map< std::string, std::vector< std::uint8_t > > objects_
auto delete_object(const std::string &key) -> VoidResult override
auto multipart_upload(const std::string &key, const std::vector< std::uint8_t > &data, std::size_t part_size, progress_callback callback) -> VoidResult override
auto get_object(const std::string &key) -> Result< std::vector< std::uint8_t > > override
auto get_object_size(const std::string &key) const -> std::size_t override
mock_s3_client(const cloud_storage_config &)
Abstract interface for S3 client operations.
virtual auto delete_object(const std::string &key) -> VoidResult=0
virtual auto get_object_size(const std::string &key) const -> std::size_t=0
virtual auto head_object(const std::string &key) const -> bool=0
virtual auto put_object(const std::string &key, const std::vector< std::uint8_t > &data) -> VoidResult=0
virtual auto get_object(const std::string &key) -> Result< std::vector< std::uint8_t > >=0
virtual auto multipart_upload(const std::string &key, const std::vector< std::uint8_t > &data, std::size_t part_size, progress_callback callback) -> VoidResult=0
virtual ~s3_client_interface()=default
virtual auto is_connected() const -> bool=0
virtual auto list_objects() const -> std::vector< std::string >=0
auto build_object_key(std::string_view study_uid, std::string_view series_uid, std::string_view sop_uid) const -> std::string
Build S3 object key for a dataset.
auto rebuild_index() -> VoidResult
Rebuild the local index from S3.
auto get_object_key(std::string_view sop_instance_uid) const -> std::string
Get the S3 object key for a SOP Instance UID.
auto store(const core::dicom_dataset &dataset) -> VoidResult override
Store a DICOM dataset to S3.
static auto sanitize_uid(std::string_view uid) -> std::string
Sanitize UID for use in S3 object key.
cloud_storage_config config_
Storage configuration.
auto retrieve(std::string_view sop_instance_uid) -> Result< core::dicom_dataset > override
Retrieve a DICOM dataset by SOP Instance UID.
auto remove(std::string_view sop_instance_uid) -> VoidResult override
Remove a DICOM object from S3.
auto get_statistics() const -> storage_statistics override
Get storage statistics.
std::unique_ptr< s3_client_interface > client_
S3 client (mock for testing, AWS SDK for production)
auto verify_integrity() -> VoidResult override
Verify storage integrity.
std::shared_mutex mutex_
Mutex for thread-safe access.
s3_storage(const cloud_storage_config &config)
Construct S3 storage with configuration.
auto is_connected() const -> bool
Check S3 connectivity.
auto exists(std::string_view sop_instance_uid) const -> bool override
Check if a DICOM instance exists in S3.
auto retrieve_with_progress(std::string_view sop_instance_uid, progress_callback callback) -> Result< core::dicom_dataset >
Retrieve with progress tracking.
auto store_with_progress(const core::dicom_dataset &dataset, progress_callback callback) -> VoidResult
Store with progress tracking.
auto find(const core::dicom_dataset &query) -> Result< std::vector< core::dicom_dataset > > override
Find DICOM datasets matching query criteria.
auto upload_multipart(const std::string &key, const std::vector< std::uint8_t > &data, progress_callback callback) -> VoidResult
Execute multipart upload for large files.
std::unordered_map< std::string, s3_object_info > index_
Mapping from SOP Instance UID to S3 object info.
~s3_storage() override
Destructor.
static auto matches_query(const core::dicom_dataset &dataset, const core::dicom_dataset &query) -> bool
Check if dataset matches query criteria.
auto bucket_name() const -> const std::string &
Get the bucket name.
DICOM Part 10 file handling for reading/writing DICOM files.
Compile-time constants for commonly used DICOM tags.
@ error
Node returned an error.
std::function< bool(std::size_t bytes_transferred, std::size_t total_bytes)> progress_callback
Callback type for upload/download progress tracking.
S3-compatible DICOM storage backend for cloud storage support.
Configuration for S3-compatible cloud storage.
std::string bucket_name
S3 bucket name for storing DICOM files.
Information about an S3 object.
Storage statistics structure.