21 std::vector<uint8_t> raw_data,
24 : raw_data_(std::
move(raw_data))
25 , on_decoded_(std::
move(on_decoded))
26 , on_error_(std::
move(on_error)) {
32 std::chrono::duration_cast<std::chrono::nanoseconds>(
33 std::chrono::steady_clock::now().time_since_epoch()
40 if (raw_data_.size() < 6) {
42 on_error_(context_.session_id,
"PDU data too short");
45 "PDU data too short for header");
49 auto decode_result = decode_pdu();
50 if (!decode_result.is_ok()) {
52 auto err = decode_result.error();
53 on_error_(context_.session_id, err.message);
58 auto pdu = decode_result.value();
76 auto dimse_job = std::make_unique<dimse_process_job>(std::move(
pdu));
79 dimse_job->get_context() = context_;
82 return coordinator.submit_to_stage(
97 return "pdu_decode_job[session=" +
99 ", bytes=" + std::to_string(
raw_data_.size()) +
"]";
103 -> const std::vector<uint8_t>& {
112 if (raw_data_.size() < 6) {
114 "Insufficient data for PDU header");
121 uint32_t length = (
static_cast<uint32_t
>(raw_data_[2]) << 24) |
122 (
static_cast<uint32_t
>(raw_data_[3]) << 16) |
123 (
static_cast<uint32_t
>(raw_data_[4]) << 8) |
124 static_cast<uint32_t
>(raw_data_[5]);
127 if (raw_data_.size() < 6 + length) {
129 "Incomplete PDU data");
133 result.
data.assign(raw_data_.begin() + 6,
134 raw_data_.begin() + 6 + length);
140 if (result.
data.size() >= 6) {
142 uint8_t control_header = result.
data[5];
147 return ok(std::move(result));
std::function< void(const decoded_pdu &pdu)> decode_callback
Callback type for decoded PDU.
auto get_name() const -> std::string override
Get the job name.
auto get_raw_data() const noexcept -> const std::vector< uint8_t > &
Get the raw PDU data.
std::vector< uint8_t > raw_data_
auto decode_pdu() -> Result< decoded_pdu >
Internal decode method.
auto execute(pipeline_coordinator &coordinator) -> VoidResult override
Execute the decode job.
pdu_decode_job(uint64_t session_id, std::vector< uint8_t > raw_data, decode_callback on_decoded=nullptr, error_callback on_error=nullptr)
Construct a decode job.
std::function< void(uint64_t session_id, const std::string &error)> error_callback
Callback type for decode errors.
auto get_context() const noexcept -> const job_context &override
Get the job context.
Coordinates the 6-stage DICOM I/O pipeline.
DIMSE processing job for Stage 3 of the pipeline.
constexpr int insufficient_data
constexpr int incomplete_pdu
@ move
C-MOVE move request/response.
@ other
Unknown or other category.
@ association
Association management (A-ASSOCIATE, A-RELEASE, A-ABORT)
@ dimse_process
Stage 3: Process DIMSE messages and route requests.
@ pdu_decode
Stage 2: Decode PDU bytes into structured data.
pdu_type
PDU (Protocol Data Unit) types as defined in DICOM PS3.8.
@ associate_rj
A-ASSOCIATE-RJ (Association Reject)
@ associate_ac
A-ASSOCIATE-AC (Association Accept)
@ p_data_tf
P-DATA-TF (Data Transfer)
@ release_rq
A-RELEASE-RQ (Release Request)
@ release_rp
A-RELEASE-RP (Release Response)
@ associate_rq
A-ASSOCIATE-RQ (Association Request)
std::variant< associate_rq, associate_ac, associate_rj, p_data_tf_pdu, release_rq_pdu, release_rp_pdu, abort_pdu > pdu
Variant type that can hold any PDU.
kcenon::pacs::VoidResult VoidResult
VoidResult type alias for operations without return value.
VoidResult pacs_void_error(int code, const std::string &message, const std::string &details="")
Create a PACS void error result.
Result< T > pacs_error(int code, const std::string &message, const std::string &details="")
Create a PACS error result with module context.
PDU decoding job for Stage 2 of the pipeline.
Result of PDU decoding containing the PDU type and data.
uint64_t session_id
Session this PDU belongs to.
std::vector< uint8_t > data
Raw PDU data for further processing.
kcenon::pacs::network::pdu_type type
The type of PDU that was decoded.
bool is_last_fragment
Whether this is the last fragment (for P-DATA-TF)
uint8_t presentation_context_id
Presentation context ID (for P-DATA-TF)
Context information attached to pipeline jobs for tracking.
pipeline_stage stage
Current pipeline stage.
job_category category
Job category for metrics.
uint64_t enqueue_time_ns
Timestamp when job entered the pipeline (nanoseconds since epoch)
uint64_t session_id
Session/association identifier.