PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
parallel_query_executor.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13
14#ifdef PACS_WITH_DATABASE_SYSTEM
15
17
18#include <kcenon/common/patterns/result.h>
19
20#include <algorithm>
21#include <future>
22#include <thread>
23#include <utility>
24
25namespace kcenon::pacs::services {
26
27namespace {
28
30template<typename T>
31Result<T> make_error(const std::string& message, const std::string& module = "parallel_query_executor") {
32 return Result<T>::err(kcenon::common::error_info(0, message, module));
33}
34
35} // namespace
36
37// ============================================================================
38// Construction / Destruction
39// ============================================================================
40
/// @brief Construct an executor bound to an index database.
/// @param db     Non-owning pointer to the database used to create result streams
///               (a null pointer makes create_stream() fail cleanly).
/// @param config Initial executor configuration (copied).
parallel_query_executor::parallel_query_executor(storage::index_database* db,
                                                 const parallel_executor_config& config)
    : db_(db), config_(config) {}
44
/// @brief Cancel outstanding work and block until all in-flight queries finish.
parallel_query_executor::~parallel_query_executor() {
    cancel_all();
    // Spin-wait (10ms polls) until every in-flight query has completed. The
    // worker lambdas capture `this`, so destroying the object while queries
    // are still running would be a use-after-free.
    while (queries_in_progress_.load() > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
52
/// @brief Move-construct by stealing the database pointer and snapshotting the
///        source's atomic counters one at a time.
/// NOTE(review): the source's counters are not reset here, and any in-flight
/// worker lambdas still capture a pointer to `other` — moving appears safe only
/// when `other` has no queries in progress; confirm callers guarantee this.
parallel_query_executor::parallel_query_executor(parallel_query_executor&& other) noexcept
    : db_(other.db_),
      config_(other.config_),
      cancelled_(other.cancelled_.load()),
      queries_executed_(other.queries_executed_.load()),
      queries_succeeded_(other.queries_succeeded_.load()),
      queries_failed_(other.queries_failed_.load()),
      queries_timed_out_(other.queries_timed_out_.load()),
      queries_in_progress_(other.queries_in_progress_.load()) {
    // Leave `other` unable to start new work: create_stream() rejects a null db.
    other.db_ = nullptr;
}
64
/// @brief Move-assign: first cancels and drains this executor's own work, then
///        copies the source's state and steals its database pointer.
auto parallel_query_executor::operator=(parallel_query_executor&& other) noexcept
    -> parallel_query_executor& {
    if (this != &other) {
        // Stop our current batch and wait (same spin-wait as the destructor)
        // so our own worker lambdas cannot touch counters we are overwriting.
        cancel_all();
        while (queries_in_progress_.load() > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }

        db_ = other.db_;
        config_ = other.config_;
        cancelled_.store(other.cancelled_.load());
        queries_executed_.store(other.queries_executed_.load());
        queries_succeeded_.store(other.queries_succeeded_.load());
        queries_failed_.store(other.queries_failed_.load());
        queries_timed_out_.store(other.queries_timed_out_.load());
        queries_in_progress_.store(other.queries_in_progress_.load());

        // NOTE(review): as with the move constructor, `other` must have no
        // queries in progress — its workers still reference `other`'s members.
        other.db_ = nullptr;
    }
    return *this;
}
86
87// ============================================================================
88// Configuration
89// ============================================================================
90
91void parallel_query_executor::set_max_concurrent(size_t max) noexcept {
92 std::lock_guard<std::mutex> lock(mutex_);
93 config_.max_concurrent = max > 0 ? max : 1;
94}
95
/// @brief Current maximum number of concurrently executing queries.
auto parallel_query_executor::max_concurrent() const noexcept -> size_t {
    std::lock_guard<std::mutex> lock(mutex_);
    return config_.max_concurrent;
}
100
/// @brief Set the timeout applied when a query does not specify its own.
/// @param timeout New default; a non-positive value disables timeouts
///                (see execute_query_internal's `timeout.count() > 0` check).
void parallel_query_executor::set_default_timeout(std::chrono::milliseconds timeout) noexcept {
    std::lock_guard<std::mutex> lock(mutex_);
    config_.default_timeout = timeout;
}
105
/// @brief Current default per-query timeout.
auto parallel_query_executor::default_timeout() const noexcept -> std::chrono::milliseconds {
    std::lock_guard<std::mutex> lock(mutex_);
    return config_.default_timeout;
}
110
111// ============================================================================
112// Batch Execution
113// ============================================================================
114
115auto parallel_query_executor::execute_all(std::vector<query_request> queries)
116 -> std::vector<query_execution_result> {
117 // Directly implement here instead of delegating to avoid ambiguity
118 if (queries.empty()) {
119 return {};
120 }
121
122 // Reset cancellation for new batch
123 reset_cancellation();
124
125 // Get configuration snapshot
126 parallel_executor_config config;
127 std::chrono::milliseconds timeout;
128 {
129 std::lock_guard<std::mutex> lock(mutex_);
130 config = config_;
131 timeout = config_.default_timeout;
132 }
133
134 // Sort by priority if enabled
135 if (config.enable_priority) {
136 std::stable_sort(queries.begin(), queries.end(),
137 [](const query_request& a, const query_request& b) {
138 return a.priority < b.priority;
139 });
140 }
141
142 // Store original indices for result ordering
143 std::vector<size_t> original_indices(queries.size());
144 for (size_t i = 0; i < queries.size(); ++i) {
145 original_indices[i] = i;
146 }
147
148 // If priority sorting was done, we need to track original positions
149 // Create index mapping after sort
150 std::vector<std::pair<size_t, query_request>> indexed_queries;
151 indexed_queries.reserve(queries.size());
152 for (size_t i = 0; i < queries.size(); ++i) {
153 indexed_queries.emplace_back(i, std::move(queries[i]));
154 }
155
156 // Execute queries in batches
157 std::vector<query_execution_result> results(indexed_queries.size());
158 std::vector<std::future<query_execution_result>> futures;
159 futures.reserve(config.max_concurrent);
160
161 size_t submitted = 0;
162
163 while (submitted < indexed_queries.size() || !futures.empty()) {
164 // Check cancellation
165 if (is_cancelled()) {
166 // Mark remaining queries as cancelled
167 for (size_t i = submitted; i < indexed_queries.size(); ++i) {
168 auto& [idx, query] = indexed_queries[i];
169 query_execution_result result;
170 result.query_id = query.query_id;
171 result.success = false;
172 result.cancelled = true;
173 result.error_message = "Query cancelled";
174 results[idx] = std::move(result);
175 }
176 break;
177 }
178
179 // Submit new queries up to max_concurrent
180 while (submitted < indexed_queries.size() && futures.size() < config.max_concurrent) {
181 auto& [idx, query] = indexed_queries[submitted];
182 (void)idx; // Used later for result mapping
183
184 // Capture by value for thread safety
185 auto captured_query = query;
186 auto captured_timeout = timeout;
187
188 // Use std::async for simpler and more portable parallel execution
189 futures.push_back(std::async(std::launch::async,
190 [this, captured_query = std::move(captured_query), captured_timeout]() {
191 return execute_query_internal(captured_query, captured_timeout);
192 }));
193
194 ++submitted;
195 }
196
197 // Wait for at least one future to complete
198 if (!futures.empty()) {
199 // Find completed futures
200 for (auto it = futures.begin(); it != futures.end();) {
201 if (it->wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) {
202 auto result = it->get();
203
204 // Find the corresponding original index
205 size_t result_idx = 0;
206 for (size_t i = 0; i < indexed_queries.size(); ++i) {
207 if (indexed_queries[i].second.query_id == result.query_id) {
208 result_idx = indexed_queries[i].first;
209 break;
210 }
211 }
212
213 results[result_idx] = std::move(result);
214 it = futures.erase(it);
215 } else {
216 ++it;
217 }
218 }
219
220 // Small sleep to avoid busy waiting
221 if (futures.size() >= config.max_concurrent) {
222 std::this_thread::sleep_for(std::chrono::milliseconds(1));
223 }
224 }
225 }
226
227 // Wait for remaining futures
228 for (auto& future : futures) {
229 auto result = future.get();
230
231 // Find the corresponding original index
232 for (size_t i = 0; i < indexed_queries.size(); ++i) {
233 if (indexed_queries[i].second.query_id == result.query_id) {
234 results[indexed_queries[i].first] = std::move(result);
235 break;
236 }
237 }
238 }
239
240 return results;
241}
242
243// ============================================================================
244// Single Query Execution
245// ============================================================================
246
247auto parallel_query_executor::execute_with_timeout(const query_request& query,
248 std::chrono::milliseconds timeout)
249 -> Result<std::unique_ptr<query_result_stream>> {
250 auto result = execute_query_internal(query, timeout);
251
252 if (result.success) {
253 return Result<std::unique_ptr<query_result_stream>>::ok(std::move(result.stream));
254 }
255
256 if (result.timed_out) {
257 return make_error<std::unique_ptr<query_result_stream>>(
258 "Query timed out after " + std::to_string(timeout.count()) + "ms");
259 }
260
261 if (result.cancelled) {
262 return make_error<std::unique_ptr<query_result_stream>>("Query was cancelled");
263 }
264
265 return make_error<std::unique_ptr<query_result_stream>>(result.error_message);
266}
267
268auto parallel_query_executor::execute(const query_request& query)
269 -> Result<std::unique_ptr<query_result_stream>> {
270 std::chrono::milliseconds timeout;
271 {
272 std::lock_guard<std::mutex> lock(mutex_);
273 timeout = config_.default_timeout;
274 }
275 return execute_with_timeout(query, timeout);
276}
277
278// ============================================================================
279// Cancellation
280// ============================================================================
281
282void parallel_query_executor::cancel_all() noexcept {
283 cancelled_.store(true);
284}
285
286auto parallel_query_executor::is_cancelled() const noexcept -> bool {
287 return cancelled_.load();
288}
289
290void parallel_query_executor::reset_cancellation() noexcept {
291 cancelled_.store(false);
292}
293
294// ============================================================================
295// Statistics
296// ============================================================================
297
/// @brief Total number of queries started (cumulative since last reset).
auto parallel_query_executor::queries_executed() const noexcept -> size_t {
    return queries_executed_.load();
}
301
/// @brief Number of queries that completed successfully (cumulative).
auto parallel_query_executor::queries_succeeded() const noexcept -> size_t {
    return queries_succeeded_.load();
}
305
/// @brief Number of failed queries (cumulative; includes cancellations and
///        timeouts, which also increment this counter in execute_query_internal).
auto parallel_query_executor::queries_failed() const noexcept -> size_t {
    return queries_failed_.load();
}
309
/// @brief Number of queries that hit their timeout (cumulative).
auto parallel_query_executor::queries_timed_out() const noexcept -> size_t {
    return queries_timed_out_.load();
}
313
/// @brief Live gauge of queries currently executing (not a cumulative count).
auto parallel_query_executor::queries_in_progress() const noexcept -> size_t {
    return queries_in_progress_.load();
}
317
/// @brief Reset the cumulative counters to zero.
/// queries_in_progress_ is deliberately left untouched — it is a live gauge
/// maintained by execute_query_internal(), and zeroing it mid-flight would
/// break the destructor's wait loop (presumably intentional; confirm).
void parallel_query_executor::reset_statistics() noexcept {
    queries_executed_.store(0);
    queries_succeeded_.store(0);
    queries_failed_.store(0);
    queries_timed_out_.store(0);
}
324
325// ============================================================================
326// Private Implementation
327// ============================================================================
328
329auto parallel_query_executor::execute_query_internal(const query_request& query,
330 std::chrono::milliseconds timeout)
331 -> query_execution_result {
332 query_execution_result result;
333 result.query_id = query.query_id;
334
335 auto start_time = std::chrono::steady_clock::now();
336
337 // Track in-progress count
338 ++queries_in_progress_;
339 ++queries_executed_;
340
341 // Check cancellation before starting
342 if (is_cancelled()) {
343 result.success = false;
344 result.cancelled = true;
345 result.error_message = "Query cancelled before execution";
346 --queries_in_progress_;
347 ++queries_failed_;
348 return result;
349 }
350
351 // Execute with timeout if specified
352 if (timeout.count() > 0) {
353 // Use async with timeout
354 auto future = std::async(std::launch::async, [this, &query]() {
355 return create_stream(query);
356 });
357
358 auto status = future.wait_for(timeout);
359
360 if (status == std::future_status::timeout) {
361 result.success = false;
362 result.timed_out = true;
363 result.error_message = "Query timed out";
364 result.execution_time = timeout;
365 --queries_in_progress_;
366 ++queries_failed_;
367 ++queries_timed_out_;
368 return result;
369 }
370
371 auto stream_result = future.get();
372 auto end_time = std::chrono::steady_clock::now();
373 result.execution_time = std::chrono::duration_cast<std::chrono::milliseconds>(
374 end_time - start_time);
375
376 if (stream_result.is_ok()) {
377 result.success = true;
378 result.stream = std::move(stream_result.value());
379 --queries_in_progress_;
380 ++queries_succeeded_;
381 } else {
382 result.success = false;
383 result.error_message = stream_result.error().message;
384 --queries_in_progress_;
385 ++queries_failed_;
386 }
387 } else {
388 // No timeout - direct execution
389 auto stream_result = create_stream(query);
390 auto end_time = std::chrono::steady_clock::now();
391 result.execution_time = std::chrono::duration_cast<std::chrono::milliseconds>(
392 end_time - start_time);
393
394 if (stream_result.is_ok()) {
395 result.success = true;
396 result.stream = std::move(stream_result.value());
397 --queries_in_progress_;
398 ++queries_succeeded_;
399 } else {
400 result.success = false;
401 result.error_message = stream_result.error().message;
402 --queries_in_progress_;
403 ++queries_failed_;
404 }
405 }
406
407 return result;
408}
409
410auto parallel_query_executor::create_stream(const query_request& query)
411 -> Result<std::unique_ptr<query_result_stream>> {
412 if (db_ == nullptr) {
413 return make_error<std::unique_ptr<query_result_stream>>("Database not initialized");
414 }
415
416 stream_config config;
417 {
418 std::lock_guard<std::mutex> lock(mutex_);
419 config.page_size = config_.page_size;
420 }
421
422 return query_result_stream::create(db_, query.level, query.query_keys, config);
423}
424
425} // namespace kcenon::pacs::services
426
427#endif // PACS_WITH_DATABASE_SYSTEM
PACS index database for metadata storage and retrieval.
constexpr dicom_tag status
Status.
const atna_coded_value query
Query (110112)
kcenon::common::Result< T > Result
Result type alias for operations returning a value.
constexpr int timeout
Lock timeout exceeded.
Parallel query executor for concurrent query processing.