PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
pipeline_metrics.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
18#pragma once
19
21
22#include <array>
23#include <atomic>
24#include <chrono>
25#include <cstdint>
26
28
/// Total jobs processed by this stage.
37 std::atomic<uint64_t> jobs_processed{0};
38
/// Total jobs currently in queue for this stage.
40 std::atomic<uint64_t> jobs_queued{0};
41
/// Jobs that failed in this stage.
43 std::atomic<uint64_t> jobs_failed{0};
44
/// Cumulative processing time in nanoseconds.
46 std::atomic<uint64_t> total_processing_time_ns{0};
47
/// Maximum processing time observed (nanoseconds).
49 std::atomic<uint64_t> max_processing_time_ns{0};
50
/// Number of active workers in this stage.
52 std::atomic<uint32_t> active_workers{0};
53
/// Number of idle workers in this stage.
55 std::atomic<uint32_t> idle_workers{0};
56
62 void record_job_completion(uint64_t processing_time_ns, bool success) noexcept {
63 jobs_processed.fetch_add(1, std::memory_order_relaxed);
64 total_processing_time_ns.fetch_add(processing_time_ns, std::memory_order_relaxed);
65
66 if (!success) {
67 jobs_failed.fetch_add(1, std::memory_order_relaxed);
68 }
69
70 // Update max processing time (compare-exchange loop)
71 uint64_t current_max = max_processing_time_ns.load(std::memory_order_relaxed);
72 while (processing_time_ns > current_max &&
73 !max_processing_time_ns.compare_exchange_weak(
74 current_max, processing_time_ns,
75 std::memory_order_relaxed, std::memory_order_relaxed)) {
76 // Loop until successful or processing_time_ns <= current_max
77 }
78 }
79
84 [[nodiscard]] auto get_avg_processing_time_ns() const noexcept -> uint64_t {
85 uint64_t processed = jobs_processed.load(std::memory_order_relaxed);
86 if (processed == 0) {
87 return 0;
88 }
89 return total_processing_time_ns.load(std::memory_order_relaxed) / processed;
90 }
91
95 void reset() noexcept {
96 jobs_processed.store(0, std::memory_order_relaxed);
97 jobs_queued.store(0, std::memory_order_relaxed);
98 jobs_failed.store(0, std::memory_order_relaxed);
99 total_processing_time_ns.store(0, std::memory_order_relaxed);
100 max_processing_time_ns.store(0, std::memory_order_relaxed);
101 }
102};
103
/// Total operations of this category.
110 std::atomic<uint64_t> total_operations{0};
111
/// Successful operations.
113 std::atomic<uint64_t> successful_operations{0};
114
/// Failed operations.
116 std::atomic<uint64_t> failed_operations{0};
117
/// Total end-to-end latency in nanoseconds.
119 std::atomic<uint64_t> total_latency_ns{0};
120
/// Maximum end-to-end latency observed.
122 std::atomic<uint64_t> max_latency_ns{0};
123
/// Minimum end-to-end latency observed (initialized to max, i.e. UINT64_MAX
/// until the first operation is recorded).
125 std::atomic<uint64_t> min_latency_ns{UINT64_MAX};
126
132 void record_operation(uint64_t latency_ns, bool success) noexcept {
133 total_operations.fetch_add(1, std::memory_order_relaxed);
134 total_latency_ns.fetch_add(latency_ns, std::memory_order_relaxed);
135
136 if (success) {
137 successful_operations.fetch_add(1, std::memory_order_relaxed);
138 } else {
139 failed_operations.fetch_add(1, std::memory_order_relaxed);
140 }
141
142 // Update max latency
143 uint64_t current_max = max_latency_ns.load(std::memory_order_relaxed);
144 while (latency_ns > current_max &&
145 !max_latency_ns.compare_exchange_weak(
146 current_max, latency_ns,
147 std::memory_order_relaxed, std::memory_order_relaxed)) {
148 }
149
150 // Update min latency
151 uint64_t current_min = min_latency_ns.load(std::memory_order_relaxed);
152 while (latency_ns < current_min &&
153 !min_latency_ns.compare_exchange_weak(
154 current_min, latency_ns,
155 std::memory_order_relaxed, std::memory_order_relaxed)) {
156 }
157 }
158
162 [[nodiscard]] auto get_avg_latency_ns() const noexcept -> uint64_t {
163 uint64_t total = total_operations.load(std::memory_order_relaxed);
164 if (total == 0) {
165 return 0;
166 }
167 return total_latency_ns.load(std::memory_order_relaxed) / total;
168 }
169
173 void reset() noexcept {
174 total_operations.store(0, std::memory_order_relaxed);
175 successful_operations.store(0, std::memory_order_relaxed);
176 failed_operations.store(0, std::memory_order_relaxed);
177 total_latency_ns.store(0, std::memory_order_relaxed);
178 max_latency_ns.store(0, std::memory_order_relaxed);
179 min_latency_ns.store(UINT64_MAX, std::memory_order_relaxed);
180 }
181};
182
205public:
/// Number of pipeline stages, taken from the pipeline_stage::stage_count
/// enumerator.
207 static constexpr size_t stage_count =
208 static_cast<size_t>(pipeline_stage::stage_count);
209
/// Number of job categories.
/// NOTE(review): unlike stage_count this is a literal rather than a value
/// derived from job_category — confirm it matches the number of enumerators.
211 static constexpr size_t category_count = 8;
212
/// Default constructor.
216 pipeline_metrics() = default;
217
218 // Non-copyable, non-movable (contains atomics)
223
224 // =========================================================================
225 // Stage Metrics
226 // =========================================================================
227
233 [[nodiscard]] auto get_stage_metrics(pipeline_stage stage) noexcept
234 -> stage_metrics& {
235 return stage_metrics_[static_cast<size_t>(stage)];
236 }
237
241 [[nodiscard]] auto get_stage_metrics(pipeline_stage stage) const noexcept
242 -> const stage_metrics& {
243 return stage_metrics_[static_cast<size_t>(stage)];
244 }
245
252 void record_stage_completion(pipeline_stage stage, uint64_t processing_time_ns,
253 bool success) noexcept {
254 stage_metrics_[static_cast<size_t>(stage)]
255 .record_job_completion(processing_time_ns, success);
256 }
257
262 stage_metrics_[static_cast<size_t>(stage)].jobs_queued.fetch_add(
263 1, std::memory_order_relaxed);
264 }
265
270 stage_metrics_[static_cast<size_t>(stage)].jobs_queued.fetch_sub(
271 1, std::memory_order_relaxed);
272 }
273
274 // =========================================================================
275 // Category Metrics
276 // =========================================================================
277
283 [[nodiscard]] auto get_category_metrics(job_category category) noexcept
284 -> category_metrics& {
285 return category_metrics_[static_cast<size_t>(category)];
286 }
287
291 [[nodiscard]] auto get_category_metrics(job_category category) const noexcept
292 -> const category_metrics& {
293 return category_metrics_[static_cast<size_t>(category)];
294 }
295
302 void record_operation_completion(job_category category, uint64_t latency_ns,
303 bool success) noexcept {
304 category_metrics_[static_cast<size_t>(category)]
305 .record_operation(latency_ns, success);
306 total_operations_.fetch_add(1, std::memory_order_relaxed);
307 }
308
309 // =========================================================================
310 // Global Metrics
311 // =========================================================================
312
316 [[nodiscard]] auto get_total_operations() const noexcept -> uint64_t {
317 return total_operations_.load(std::memory_order_relaxed);
318 }
319
323 [[nodiscard]] auto get_active_associations() const noexcept -> uint32_t {
324 return active_associations_.load(std::memory_order_relaxed);
325 }
326
331 uint32_t current = active_associations_.fetch_add(1, std::memory_order_relaxed);
332 // Update peak if necessary
333 uint32_t peak = peak_associations_.load(std::memory_order_relaxed);
334 while (current + 1 > peak &&
335 !peak_associations_.compare_exchange_weak(
336 peak, current + 1,
337 std::memory_order_relaxed, std::memory_order_relaxed)) {
338 }
339 }
340
345 active_associations_.fetch_sub(1, std::memory_order_relaxed);
346 }
347
351 [[nodiscard]] auto get_peak_associations() const noexcept -> uint32_t {
352 return peak_associations_.load(std::memory_order_relaxed);
353 }
354
355 // =========================================================================
356 // Utility Methods
357 // =========================================================================
358
362 void reset() noexcept {
363 for (auto& stage : stage_metrics_) {
364 stage.reset();
365 }
366 for (auto& category : category_metrics_) {
367 category.reset();
368 }
369 total_operations_.store(0, std::memory_order_relaxed);
370 peak_associations_.store(0, std::memory_order_relaxed);
371 }
372
376 void mark_start_time() noexcept {
377 start_time_ = std::chrono::steady_clock::now();
378 }
379
385 [[nodiscard]] auto get_throughput_per_second(job_category category) const noexcept
386 -> double {
387 auto elapsed = std::chrono::steady_clock::now() - start_time_;
388 auto seconds = std::chrono::duration<double>(elapsed).count();
389 if (seconds <= 0.0) {
390 return 0.0;
391 }
392 return static_cast<double>(
393 category_metrics_[static_cast<size_t>(category)]
394 .total_operations.load(std::memory_order_relaxed)) / seconds;
395 }
396
397private:
/// One metrics entry per pipeline stage, indexed by the stage's integer value.
398 std::array<stage_metrics, stage_count> stage_metrics_;
/// One metrics entry per job category, indexed by the category's integer value.
399 std::array<category_metrics, category_count> category_metrics_;
/// Pipeline-wide operation counter, incremented by record_operation_completion().
400 std::atomic<uint64_t> total_operations_{0};
/// Current number of active associations.
401 std::atomic<uint32_t> active_associations_{0};
/// Peak concurrent associations.
402 std::atomic<uint32_t> peak_associations_{0};
/// Reference point for get_throughput_per_second(); initialized at
/// construction and overwritten by mark_start_time(). Not atomic.
403 std::chrono::steady_clock::time_point start_time_{std::chrono::steady_clock::now()};
404};
405
406} // namespace kcenon::pacs::network::pipeline
Centralized metrics collection for the entire pipeline.
void increment_stage_queue(pipeline_stage stage) noexcept
Increment queued job count for a stage.
auto get_active_associations() const noexcept -> uint32_t
Get current number of active associations.
void decrement_stage_queue(pipeline_stage stage) noexcept
Decrement queued job count for a stage.
pipeline_metrics & operator=(const pipeline_metrics &)=delete
std::chrono::steady_clock::time_point start_time_
void mark_start_time() noexcept
Mark the start time for throughput calculation.
auto get_throughput_per_second(job_category category) const noexcept -> double
Calculate throughput for a category (operations per second)
void increment_active_associations() noexcept
Increment active association count.
pipeline_metrics()=default
Default constructor.
auto get_total_operations() const noexcept -> uint64_t
Get total operations processed.
auto get_stage_metrics(pipeline_stage stage) noexcept -> stage_metrics &
Get metrics for a specific stage.
void record_stage_completion(pipeline_stage stage, uint64_t processing_time_ns, bool success) noexcept
Record a stage job completion.
std::array< stage_metrics, stage_count > stage_metrics_
auto get_category_metrics(job_category category) const noexcept -> const category_metrics &
Get metrics for a specific category (const)
auto get_category_metrics(job_category category) noexcept -> category_metrics &
Get metrics for a specific category.
pipeline_metrics(const pipeline_metrics &)=delete
static constexpr size_t category_count
Number of job categories.
auto get_stage_metrics(pipeline_stage stage) const noexcept -> const stage_metrics &
Get metrics for a specific stage (const)
static constexpr size_t stage_count
Number of pipeline stages.
void decrement_active_associations() noexcept
Decrement active association count.
auto get_peak_associations() const noexcept -> uint32_t
Get peak concurrent associations.
std::array< category_metrics, category_count > category_metrics_
void record_operation_completion(job_category category, uint64_t latency_ns, bool success) noexcept
Record an operation completion.
pipeline_metrics & operator=(pipeline_metrics &&)=delete
job_category
Categories for pipeline jobs used in metrics and monitoring.
pipeline_stage
Identifies the 6 stages of the DICOM I/O pipeline.
Job type definitions for the 6-stage DICOM I/O pipeline.
Metrics for a job category (e.g., C-STORE, C-FIND)
std::atomic< uint64_t > min_latency_ns
Minimum end-to-end latency observed (initialized to max)
std::atomic< uint64_t > max_latency_ns
Maximum end-to-end latency observed.
auto get_avg_latency_ns() const noexcept -> uint64_t
Get average latency in nanoseconds.
std::atomic< uint64_t > total_operations
Total operations of this category.
void record_operation(uint64_t latency_ns, bool success) noexcept
Record an operation completion.
std::atomic< uint64_t > successful_operations
Successful operations.
std::atomic< uint64_t > failed_operations
Failed operations.
std::atomic< uint64_t > total_latency_ns
Total end-to-end latency in nanoseconds.
Metrics for a single pipeline stage.
std::atomic< uint64_t > jobs_processed
Total jobs processed by this stage.
void reset() noexcept
Reset all metrics to zero.
std::atomic< uint64_t > jobs_queued
Total jobs currently in queue for this stage.
std::atomic< uint32_t > active_workers
Number of active workers in this stage.
std::atomic< uint64_t > total_processing_time_ns
Cumulative processing time in nanoseconds.
void record_job_completion(uint64_t processing_time_ns, bool success) noexcept
Record a job completion.
std::atomic< uint64_t > max_processing_time_ns
Maximum processing time observed (nanoseconds)
std::atomic< uint32_t > idle_workers
Number of idle workers in this stage.
std::atomic< uint64_t > jobs_failed
Jobs that failed in this stage.
auto get_avg_processing_time_ns() const noexcept -> uint64_t
Get average processing time in nanoseconds.