PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
parallel_query_executor.h
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
23#pragma once
24
25#include "query_result_stream.h"
26
29
30#include <kcenon/common/patterns/result.h>
31
32#include <atomic>
33#include <chrono>
34#include <cstddef>
35#include <functional>
36#include <memory>
37#include <mutex>
38#include <optional>
39#include <string>
40#include <vector>
41
42namespace kcenon::pacs::storage {
43class index_database;
44} // namespace kcenon::pacs::storage
45
46namespace kcenon::pacs::services {
47
49template <typename T>
51
53using VoidResult = kcenon::common::VoidResult;
54
55#ifdef PACS_WITH_DATABASE_SYSTEM
56
62struct query_request {
64 query_level level = query_level::study;
65
67 core::dicom_dataset query_keys;
68
70 std::string calling_ae;
71
73 std::string query_id;
74
76 int priority = 0;
77};
78
82struct query_execution_result {
84 std::string query_id;
85
87 bool success = false;
88
90 std::string error_message;
91
93 std::unique_ptr<query_result_stream> stream;
94
96 std::chrono::milliseconds execution_time{0};
97
99 bool cancelled = false;
100
102 bool timed_out = false;
103};
104
108struct parallel_executor_config {
110 size_t max_concurrent = 4;
111
113 std::chrono::milliseconds default_timeout{0};
114
116 size_t page_size = 100;
117
119 bool enable_priority = true;
120};
121
172class parallel_query_executor {
173public:
174 // =========================================================================
175 // Construction
176 // =========================================================================
177
184 explicit parallel_query_executor(storage::index_database* db,
185 const parallel_executor_config& config = {});
186
192 ~parallel_query_executor();
193
194 // Non-copyable
195 parallel_query_executor(const parallel_query_executor&) = delete;
196 auto operator=(const parallel_query_executor&) -> parallel_query_executor& = delete;
197
198 // Movable
199 parallel_query_executor(parallel_query_executor&&) noexcept;
200 auto operator=(parallel_query_executor&&) noexcept -> parallel_query_executor&;
201
202 // =========================================================================
203 // Configuration
204 // =========================================================================
205
211 void set_max_concurrent(size_t max) noexcept;
212
218 [[nodiscard]] auto max_concurrent() const noexcept -> size_t;
219
225 void set_default_timeout(std::chrono::milliseconds timeout) noexcept;
226
232 [[nodiscard]] auto default_timeout() const noexcept -> std::chrono::milliseconds;
233
234 // =========================================================================
235 // Batch Execution
236 // =========================================================================
237
249 [[nodiscard]] auto execute_all(std::vector<query_request> queries)
250 -> std::vector<query_execution_result>;
251
252 // =========================================================================
253 // Single Query with Timeout
254 // =========================================================================
255
266 [[nodiscard]] auto execute_with_timeout(const query_request& query,
267 std::chrono::milliseconds timeout)
268 -> Result<std::unique_ptr<query_result_stream>>;
269
278 [[nodiscard]] auto execute(const query_request& query)
279 -> Result<std::unique_ptr<query_result_stream>>;
280
281 // =========================================================================
282 // Cancellation
283 // =========================================================================
284
291 void cancel_all() noexcept;
292
298 [[nodiscard]] auto is_cancelled() const noexcept -> bool;
299
305 void reset_cancellation() noexcept;
306
307 // =========================================================================
308 // Statistics
309 // =========================================================================
310
316 [[nodiscard]] auto queries_executed() const noexcept -> size_t;
317
323 [[nodiscard]] auto queries_succeeded() const noexcept -> size_t;
324
330 [[nodiscard]] auto queries_failed() const noexcept -> size_t;
331
337 [[nodiscard]] auto queries_timed_out() const noexcept -> size_t;
338
344 [[nodiscard]] auto queries_in_progress() const noexcept -> size_t;
345
349 void reset_statistics() noexcept;
350
351private:
352 // =========================================================================
353 // Private Implementation
354 // =========================================================================
355
363 [[nodiscard]] auto execute_query_internal(const query_request& query,
364 std::chrono::milliseconds timeout)
365 -> query_execution_result;
366
373 [[nodiscard]] auto create_stream(const query_request& query)
374 -> Result<std::unique_ptr<query_result_stream>>;
375
376 // =========================================================================
377 // Member Variables
378 // =========================================================================
379
381 storage::index_database* db_;
382
384 parallel_executor_config config_;
385
387 std::atomic<bool> cancelled_{false};
388
390 std::atomic<size_t> queries_executed_{0};
391 std::atomic<size_t> queries_succeeded_{0};
392 std::atomic<size_t> queries_failed_{0};
393 std::atomic<size_t> queries_timed_out_{0};
394 std::atomic<size_t> queries_in_progress_{0};
395
397 mutable std::mutex mutex_;
398};
399
400#endif // PACS_WITH_DATABASE_SYSTEM
401
402} // namespace kcenon::pacs::services
DICOM Dataset - ordered collection of Data Elements.
query_level
DICOM Query/Retrieve level enumeration.
Definition query_scp.h:63
Streaming query results with pagination support.
DICOM Query SCP service (C-FIND handler)