14#ifdef PACS_WITH_DATABASE_SYSTEM
18#include <kcenon/common/patterns/result.h>
31Result<T> make_error(
const std::string& message,
const std::string& module =
"parallel_query_executor") {
32 return Result<T>::err(kcenon::common::error_info(0, message, module));
41parallel_query_executor::parallel_query_executor(storage::index_database* db,
42 const parallel_executor_config& config)
43 : db_(db), config_(config) {}
// Destructor.
// The visible body polls queries_in_progress_ every 10 ms and does not move on
// until the counter reads zero, so destruction blocks while any query that
// incremented the counter on this object is still being tracked.
45parallel_query_executor::~parallel_query_executor() {
// Busy-wait (with a 10 ms sleep per iteration) for in-flight queries to drain.
48 while (queries_in_progress_.load() > 0) {
49 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Move constructor.
// Copies the configuration and takes a snapshot of every atomic flag/counter
// from `other` via load(). The mutex is not part of the visible initializer
// list. The first initializer and the constructor body are not visible in this
// excerpt.
// NOTE(review): `other.config_` is read here without taking other's mutex —
// confirm no concurrent setter can run on `other` during the move.
53parallel_query_executor::parallel_query_executor(parallel_query_executor&& other) noexcept
55 config_(
other.config_),
56 cancelled_(
other.cancelled_.load()),
57 queries_executed_(
other.queries_executed_.load()),
58 queries_succeeded_(
other.queries_succeeded_.load()),
59 queries_failed_(
other.queries_failed_.load()),
60 queries_timed_out_(
other.queries_timed_out_.load()),
// NOTE(review): tasks launched by `other` capture other's `this`, so they
// decrement other's counter, not this one. A non-zero value copied here would
// presumably never reach zero and would stall this object's destructor —
// confirm moves only happen while `other` is idle.
61 queries_in_progress_(
other.queries_in_progress_.load()) {
// Move assignment.
// Waits for this object's own in-flight queries to finish, then overwrites the
// configuration and every atomic flag/counter with the values currently held
// by `other`. Any self-assignment guard, other member transfers and the return
// statement are not visible in this excerpt.
65auto parallel_query_executor::operator=(parallel_query_executor&& other)
noexcept
66 -> parallel_query_executor& {
// Drain: poll every 10 ms until no query started on *this is still running.
69 while (queries_in_progress_.load() > 0) {
70 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// NOTE(review): config_ is assigned without holding mutex_ in the visible
// lines — confirm no concurrent getter/setter can run during assignment.
74 config_ =
other.config_;
75 cancelled_.store(
other.cancelled_.load());
76 queries_executed_.store(
other.queries_executed_.load());
77 queries_succeeded_.store(
other.queries_succeeded_.load());
78 queries_failed_.store(
other.queries_failed_.load());
79 queries_timed_out_.store(
other.queries_timed_out_.load());
// NOTE(review): tasks started by `other` decrement other's counter, so a
// non-zero value stored here would presumably never drain on *this — confirm
// `other` is idle when moved from.
80 queries_in_progress_.store(
other.queries_in_progress_.load());
91void parallel_query_executor::set_max_concurrent(
size_t max)
noexcept {
92 std::lock_guard<std::mutex> lock(mutex_);
93 config_.max_concurrent = max > 0 ? max : 1;
96auto parallel_query_executor::max_concurrent() const noexcept ->
size_t {
97 std::lock_guard<std::mutex> lock(mutex_);
98 return config_.max_concurrent;
101void parallel_query_executor::set_default_timeout(std::chrono::milliseconds timeout)
noexcept {
102 std::lock_guard<std::mutex> lock(mutex_);
103 config_.default_timeout =
timeout;
106auto parallel_query_executor::default_timeout() const noexcept -> std::chrono::milliseconds {
107 std::lock_guard<std::mutex> lock(mutex_);
108 return config_.default_timeout;
115auto parallel_query_executor::execute_all(std::vector<query_request> queries)
116 -> std::vector<query_execution_result> {
118 if (queries.empty()) {
123 reset_cancellation();
126 parallel_executor_config config;
127 std::chrono::milliseconds
timeout;
129 std::lock_guard<std::mutex> lock(mutex_);
131 timeout = config_.default_timeout;
135 if (config.enable_priority) {
136 std::stable_sort(queries.begin(), queries.end(),
137 [](
const query_request& a,
const query_request& b) {
138 return a.priority < b.priority;
143 std::vector<size_t> original_indices(queries.size());
144 for (
size_t i = 0; i < queries.size(); ++i) {
145 original_indices[i] = i;
150 std::vector<std::pair<size_t, query_request>> indexed_queries;
151 indexed_queries.reserve(queries.size());
152 for (
size_t i = 0; i < queries.size(); ++i) {
153 indexed_queries.emplace_back(i, std::move(queries[i]));
157 std::vector<query_execution_result> results(indexed_queries.size());
158 std::vector<std::future<query_execution_result>> futures;
159 futures.reserve(config.max_concurrent);
161 size_t submitted = 0;
163 while (submitted < indexed_queries.size() || !futures.empty()) {
165 if (is_cancelled()) {
167 for (
size_t i = submitted; i < indexed_queries.size(); ++i) {
168 auto& [idx,
query] = indexed_queries[i];
169 query_execution_result result;
170 result.query_id =
query.query_id;
171 result.success =
false;
172 result.cancelled =
true;
173 result.error_message =
"Query cancelled";
174 results[idx] = std::move(result);
180 while (submitted < indexed_queries.size() && futures.size() < config.max_concurrent) {
181 auto& [idx,
query] = indexed_queries[submitted];
185 auto captured_query =
query;
186 auto captured_timeout =
timeout;
189 futures.push_back(std::async(std::launch::async,
190 [
this, captured_query = std::move(captured_query), captured_timeout]() {
191 return execute_query_internal(captured_query, captured_timeout);
198 if (!futures.empty()) {
200 for (
auto it = futures.begin(); it != futures.end();) {
201 if (it->wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) {
202 auto result = it->get();
205 size_t result_idx = 0;
206 for (
size_t i = 0; i < indexed_queries.size(); ++i) {
207 if (indexed_queries[i].second.query_id == result.query_id) {
208 result_idx = indexed_queries[i].first;
213 results[result_idx] = std::move(result);
214 it = futures.erase(it);
221 if (futures.size() >= config.max_concurrent) {
222 std::this_thread::sleep_for(std::chrono::milliseconds(1));
228 for (
auto& future : futures) {
229 auto result = future.get();
232 for (
size_t i = 0; i < indexed_queries.size(); ++i) {
233 if (indexed_queries[i].second.query_id == result.query_id) {
234 results[indexed_queries[i].first] = std::move(result);
247auto parallel_query_executor::execute_with_timeout(
const query_request& query,
248 std::chrono::milliseconds timeout)
249 -> Result<std::unique_ptr<query_result_stream>> {
250 auto result = execute_query_internal(query, timeout);
252 if (result.success) {
253 return Result<std::unique_ptr<query_result_stream>>::ok(std::move(result.stream));
256 if (result.timed_out) {
257 return make_error<std::unique_ptr<query_result_stream>>(
258 "Query timed out after " + std::to_string(
timeout.count()) +
"ms");
261 if (result.cancelled) {
262 return make_error<std::unique_ptr<query_result_stream>>(
"Query was cancelled");
265 return make_error<std::unique_ptr<query_result_stream>>(result.error_message);
268auto parallel_query_executor::execute(
const query_request& query)
269 -> Result<std::unique_ptr<query_result_stream>> {
270 std::chrono::milliseconds
timeout;
272 std::lock_guard<std::mutex> lock(mutex_);
273 timeout = config_.default_timeout;
275 return execute_with_timeout(query, timeout);
282void parallel_query_executor::cancel_all() noexcept {
283 cancelled_.store(
true);
286auto parallel_query_executor::is_cancelled() const noexcept ->
bool {
287 return cancelled_.load();
290void parallel_query_executor::reset_cancellation() noexcept {
291 cancelled_.store(
false);
298auto parallel_query_executor::queries_executed() const noexcept ->
size_t {
299 return queries_executed_.load();
302auto parallel_query_executor::queries_succeeded() const noexcept ->
size_t {
303 return queries_succeeded_.load();
306auto parallel_query_executor::queries_failed() const noexcept ->
size_t {
307 return queries_failed_.load();
310auto parallel_query_executor::queries_timed_out() const noexcept ->
size_t {
311 return queries_timed_out_.load();
314auto parallel_query_executor::queries_in_progress() const noexcept ->
size_t {
315 return queries_in_progress_.load();
318void parallel_query_executor::reset_statistics() noexcept {
319 queries_executed_.store(0);
320 queries_succeeded_.store(0);
321 queries_failed_.store(0);
322 queries_timed_out_.store(0);
// Runs one query and reports the outcome as a query_execution_result.
//
// Parameters:
//   query   - request to run; its query_id is copied into the result.
//   timeout - upper bound used by the asynchronous path below.
// Returns: a result whose success / cancelled / timed_out flags, error_message,
// execution_time and stream describe what happened.
//
// Two execution paths are visible: one that runs create_stream() through
// std::async and waits up to `timeout`, and one that calls create_stream()
// directly. The condition selecting between them is not visible in this
// excerpt.
329auto parallel_query_executor::execute_query_internal(
const query_request& query,
330 std::chrono::milliseconds timeout)
331 -> query_execution_result {
332 query_execution_result result;
333 result.query_id =
query.query_id;
// Start of the wall-clock measurement reported in result.execution_time.
335 auto start_time = std::chrono::steady_clock::now();
// Mark this query as running; every exit path below decrements again.
338 ++queries_in_progress_;
// Cancellation requested before any work started: report it and stop tracking.
342 if (is_cancelled()) {
343 result.success =
false;
344 result.cancelled =
true;
345 result.error_message =
"Query cancelled before execution";
346 --queries_in_progress_;
// Asynchronous path: create the stream on another thread.
// NOTE(review): `query` is captured by reference, so the task relies on the
// caller's query object outliving it — confirm.
354 auto future = std::async(std::launch::async, [
this, &query]() {
355 return create_stream(query);
// Wait for completion, but no longer than `timeout`.
358 auto status = future.wait_for(timeout);
// Timed out: execution_time is reported as the timeout itself, not measured.
// NOTE(review): a future returned by std::async(std::launch::async, ...)
// blocks in its destructor until the task finishes, so leaving this scope
// presumably still waits for create_stream() — confirm whether the timeout
// really bounds the call's duration.
360 if (status == std::future_status::timeout) {
361 result.success =
false;
362 result.timed_out =
true;
363 result.error_message =
"Query timed out";
364 result.execution_time =
timeout;
365 --queries_in_progress_;
367 ++queries_timed_out_;
// Task finished in time: collect its Result and record the elapsed time.
371 auto stream_result = future.get();
372 auto end_time = std::chrono::steady_clock::now();
373 result.execution_time = std::chrono::duration_cast<std::chrono::milliseconds>(
374 end_time - start_time);
// Success: hand the stream to the caller and count it.
376 if (stream_result.is_ok()) {
377 result.success =
true;
378 result.stream = std::move(stream_result.value());
379 --queries_in_progress_;
380 ++queries_succeeded_;
// Failure: propagate the error message from create_stream().
382 result.success =
false;
383 result.error_message = stream_result.error().message;
384 --queries_in_progress_;
// Direct path: create the stream on the calling thread, with no timeout wait.
389 auto stream_result = create_stream(query);
390 auto end_time = std::chrono::steady_clock::now();
391 result.execution_time = std::chrono::duration_cast<std::chrono::milliseconds>(
392 end_time - start_time);
// Same success / failure handling as the asynchronous path.
394 if (stream_result.is_ok()) {
395 result.success =
true;
396 result.stream = std::move(stream_result.value());
397 --queries_in_progress_;
398 ++queries_succeeded_;
400 result.success =
false;
401 result.error_message = stream_result.error().message;
402 --queries_in_progress_;
410auto parallel_query_executor::create_stream(
const query_request& query)
411 -> Result<std::unique_ptr<query_result_stream>> {
412 if (db_ ==
nullptr) {
413 return make_error<std::unique_ptr<query_result_stream>>(
"Database not initialized");
416 stream_config config;
418 std::lock_guard<std::mutex> lock(mutex_);
419 config.page_size = config_.page_size;
422 return query_result_stream::create(db_,
query.level,
query.query_keys, config);
PACS index database for metadata storage and retrieval.
@ other
Unknown or other category.
const atna_coded_value query
Query (110112)
kcenon::common::Result< T > Result
Result type alias for operations returning a value.
constexpr int timeout
Lock timeout exceeded.
Parallel query executor for concurrent query processing.