Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
database::async::async_executor Class Reference

High-performance asynchronous executor using thread_system. More...

#include <async_operations.h>

Public Member Functions

 async_executor (size_t thread_count=std::thread::hardware_concurrency(), const thread_context_type &=thread_context_type())
 Constructs an async executor with specified thread count.
 
 ~async_executor ()
 
 async_executor (const async_executor &)=delete
 
async_executor & operator= (const async_executor &)=delete
 
 async_executor (async_executor &&)=delete
 
async_executor & operator= (async_executor &&)=delete
 
template<typename F , typename... Args>
requires concepts::SubmittableTask<F, Args...>
auto submit (F &&func, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
 Submits a task for asynchronous execution.
 
void shutdown ()
 Gracefully shuts down the executor.
 
void wait_for_completion ()
 Waits for all pending tasks to complete.
 
size_t pending_tasks () const
 Returns the number of pending tasks.
 
size_t thread_count () const
 Returns the number of worker threads.
 
constexpr bool is_using_thread_system () const
 Checks if using thread_system implementation.
 

Private Member Functions

void worker_thread ()
 

Private Attributes

std::vector< std::thread > workers_
 
std::queue< std::function< void()> > tasks_
 
std::mutex queue_mutex_
 
std::condition_variable condition_
 
std::atomic< bool > stop_
 
size_t thread_count_
 

Detailed Description

High-performance asynchronous executor using thread_system.

This executor leverages thread_system's advanced features when available:

  • Adaptive job queue (mutex ↔ lock-free automatic switching)
  • Sub-microsecond latency (77ns job scheduling)
  • 1.16M+ jobs/second throughput
  • Integrated monitoring and logging

When USE_THREAD_SYSTEM is not defined, falls back to std::thread implementation.

Thread Safety

All methods are thread-safe and can be called from multiple threads.

Performance

  • Throughput: 1.16M+ jobs/s (vs ~50K with std::thread)
  • Latency: 77ns scheduling (vs 2-5μs with std::thread)
  • Scalability: Linear scaling up to hardware concurrency

Examples
async_executor_demo.cpp and async_operations_demo.cpp.

Definition at line 328 of file async_operations.h.

Constructor & Destructor Documentation

◆ async_executor() [1/3]

database::async::async_executor::async_executor ( size_t thread_count = std::thread::hardware_concurrency(),
const thread_context_type & = thread_context_type() )
inline explicit

Constructs an async executor with specified thread count.

Parameters
thread_count  Number of worker threads (defaults to hardware concurrency)
context  Thread context for logging/monitoring (optional)
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 362 of file async_operations.h.

366     , stop_(false)
367 {
368     workers_.reserve(thread_count_);
369     for (size_t i = 0; i < thread_count_; ++i) {
370         workers_.emplace_back([this] { worker_thread(); });
371     }
372 }

References thread_count_, worker_thread(), and workers_.
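
The default argument relies on std::thread::hardware_concurrency(), which the C++ standard allows to return 0 when the value cannot be determined. In the std::thread path listed above, a count of 0 would start no workers, so queued tasks would never run (unless the part of the initializer list not shown here adjusts the value). A minimal sketch of a guard a caller could apply before constructing the executor; effective_thread_count is a hypothetical helper, not part of async_operations.h:

```cpp
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper (not part of async_operations.h): picks a usable
// worker count when the caller passes 0 or the platform reports 0.
inline std::size_t effective_thread_count(std::size_t requested,
                                          std::size_t fallback = 1) {
    assert(fallback > 0 && "fallback must be a usable thread count");
    if (requested != 0) {
        return requested;  // an explicit request is honoured as-is
    }
    // hardware_concurrency() is only a hint and may legitimately be 0.
    const std::size_t detected = std::thread::hardware_concurrency();
    return detected != 0 ? detected : fallback;
}
```

The result could then be passed as the first constructor argument, for example async_executor executor(effective_thread_count(0));.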

◆ ~async_executor()

database::async::async_executor::~async_executor ( )
inline
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 375 of file async_operations.h.

375 {
376     shutdown();
377 }

References shutdown().

◆ async_executor() [2/3]

database::async::async_executor::async_executor ( const async_executor & )
delete

◆ async_executor() [3/3]

database::async::async_executor::async_executor ( async_executor && )
delete

Member Function Documentation

◆ is_using_thread_system()

bool database::async::async_executor::is_using_thread_system ( ) const
inline constexpr

Checks if using thread_system implementation.

Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h, and async_executor_demo.cpp.

Definition at line 510 of file async_operations.h.

510 {
511     return using_thread_system;
512 }
constexpr bool using_thread_system
Compile-time flag indicating fallback mode.

References database::async::using_thread_system.

Referenced by demonstrate_basic_usage(), and demonstrate_high_throughput().
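
Because the function is constexpr and returns a compile-time flag, callers can branch on the backend without any runtime cost. The sketch below reproduces the pattern in isolation. It assumes the flag is derived from USE_THREAD_SYSTEM roughly as shown; the stand-alone using_thread_system definition and the backend_name helper are illustrative, not copied from the header:

```cpp
#include <string_view>

// Assumption: the real flag in async_operations.h is set from
// USE_THREAD_SYSTEM in a similar way. This copy only illustrates the idea.
#ifdef USE_THREAD_SYSTEM
inline constexpr bool using_thread_system = true;
#else
inline constexpr bool using_thread_system = false;
#endif

// Hypothetical helper: names the active backend at compile time.
constexpr std::string_view backend_name() {
    if constexpr (using_thread_system) {
        return "thread_system";
    } else {
        return "std::thread fallback";
    }
}

// The branch is resolved by the compiler, so this check costs nothing at runtime.
static_assert(!backend_name().empty());
```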

◆ operator=() [1/2]

async_executor & database::async::async_executor::operator= ( async_executor && )
delete

◆ operator=() [2/2]

async_executor & database::async::async_executor::operator= ( const async_executor & )
delete

◆ pending_tasks()

size_t database::async::async_executor::pending_tasks ( ) const
inline

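Returns the number of tasks waiting in the queue. Tasks that a worker has already dequeued and is executing are not counted. With thread_system, returns 0 when no pool is available.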

Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 488 of file async_operations.h.

488 {
489 #ifdef USE_THREAD_SYSTEM
490     if (pool_) {
491         return pool_->get_job_queue()->size();
492     }
493     return 0;
494 #else
495     std::unique_lock<std::mutex> lock(queue_mutex_);
496     return tasks_.size();
497 #endif
498 }

References queue_mutex_, and tasks_.

Referenced by demonstrate_shutdown().

◆ shutdown()

void database::async::async_executor::shutdown ( )
inline

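Gracefully shuts down the executor. In the std::thread fallback, this sets the stop flag, wakes every worker, and joins them; workers finish any tasks still in the queue before exiting, and later calls to submit() throw std::runtime_error. With thread_system, the call forwards to pool_->stop(false).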

Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 441 of file async_operations.h.

441 {
442 #ifdef USE_THREAD_SYSTEM
443     if (pool_) {
444         pool_->stop(false);
445     }
446 #else
447     {
448         std::unique_lock<std::mutex> lock(queue_mutex_);
449         stop_ = true;
450     }
451     condition_.notify_all();
452
453     for (auto& worker : workers_) {
454         if (worker.joinable()) {
455             worker.join();
456         }
457     }
458     workers_.clear();
459 #endif
460 }

References condition_, queue_mutex_, stop_, and workers_.

Referenced by demonstrate_shutdown(), TEST_F(), and ~async_executor().
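
In the std::thread path, shutdown() and worker_thread() cooperate so that a worker exits only once the stop flag is set and the queue is empty, which means work queued before shutdown still runs. The following stand-alone sketch mirrors that logic in a much smaller class. mini_pool, post, and run_and_drain are hypothetical names, and the sketch uses a plain bool guarded by the mutex where the real class uses std::atomic<bool>:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Simplified stand-in for the fallback path (hypothetical, for illustration).
class mini_pool {
public:
    explicit mini_pool(std::size_t n) {
        workers_.reserve(n);
        for (std::size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this] { run(); });
        }
    }
    ~mini_pool() { shutdown(); }
    mini_pool(const mini_pool&) = delete;
    mini_pool& operator=(const mini_pool&) = delete;

    void post(std::function<void()> fn) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(fn));
        }
        condition_.notify_one();
    }

    // Safe to call twice: the second call finds no workers left to join.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
        workers_.clear();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;  // leave only once drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so other workers can dequeue
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable condition_;
    bool stop_ = false;  // guarded by mutex_
};

// Queues `count` increments, shuts down, and reports how many actually ran.
inline int run_and_drain(int count) {
    assert(count >= 0);
    std::atomic<int> done{0};
    mini_pool pool(2);
    for (int i = 0; i < count; ++i) {
        pool.post([&done] { ++done; });
    }
    pool.shutdown();  // joins workers after they drain the queue
    return done.load();
}
```

Since the destructor calls shutdown() as well, the explicit call in run_and_drain is there only to make the ordering visible.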

◆ submit()

template<typename F , typename... Args>
requires concepts::SubmittableTask<F, Args...>
auto database::async::async_executor::submit ( F && func,
Args &&... args ) -> std::future<std::invoke_result_t<F, Args...>>
inline

Submits a task for asynchronous execution.

Template Parameters
F  Callable type, constrained by the SubmittableTask concept
Args  Argument types
Parameters
func  The callable to execute
args  Arguments to pass to the callable
Returns
std::future with the result of the callable
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h, and async_executor_demo.cpp.

Definition at line 395 of file async_operations.h.

396 {
397     using return_type = std::invoke_result_t<F, Args...>;
398
399 #ifdef USE_THREAD_SYSTEM
400     auto task = std::make_shared<std::packaged_task<return_type()>>(
401         std::bind(std::forward<F>(func), std::forward<Args>(args)...)
402     );
403
404     auto future = task->get_future();
405
406     auto job = std::make_unique<lambda_job>(
407         [task]() { (*task)(); },
408         "async_task"
409     );
410
411     auto result = pool_->enqueue(std::move(job));
412     if (result.is_err()) {
413         throw std::runtime_error("Failed to enqueue job: " +
414                                  result.error().message);
415     }
416
417     return future;
418 #else
419     auto task = std::make_shared<std::packaged_task<return_type()>>(
420         std::bind(std::forward<F>(func), std::forward<Args>(args)...)
421     );
422
423     auto future = task->get_future();
424
425     {
426         std::unique_lock<std::mutex> lock(queue_mutex_);
427         if (stop_) {
428             throw std::runtime_error("Cannot submit task to stopped executor");
429         }
430         tasks_.emplace([task]() { (*task)(); });
431     }
432
433     condition_.notify_one();
434     return future;
435 #endif
436 }

References condition_, queue_mutex_, stop_, and tasks_.

Referenced by compare_with_legacy(), demonstrate_basic_usage(), demonstrate_error_handling(), demonstrate_high_throughput(), and demonstrate_shutdown().
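
Both branches of the listing wrap the callable in a std::packaged_task held by a std::shared_ptr. The shared pointer is needed because std::packaged_task is move-only, while std::function requires a copyable target. The sketch below isolates that wrapping step without any queue or threads; make_job, sum_via_job, and error_reaches_future are hypothetical names introduced only for this illustration:

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>

// Hypothetical helper: wraps a callable the way the listing does and returns
// the type-erased job together with the future for its result.
template <typename F, typename... Args>
auto make_job(F&& func, Args&&... args) {
    using return_type = std::invoke_result_t<F, Args...>;
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(func), std::forward<Args>(args)...));
    std::future<return_type> future = task->get_future();
    // Copying the lambda copies the shared_ptr, not the packaged_task.
    std::function<void()> job = [task] { (*task)(); };
    return std::make_pair(std::move(job), std::move(future));
}

// Runs the job inline (a worker thread would normally do this) and reads
// the result through the future.
inline int sum_via_job(int a, int b) {
    auto [job, future] = make_job([](int x, int y) { return x + y; }, a, b);
    assert(future.valid());
    job();
    return future.get();
}

// An exception thrown by the callable is stored in the shared state and
// rethrown by future::get(), not by the job itself.
inline bool error_reaches_future() {
    auto [job, future] =
        make_job([]() -> int { throw std::runtime_error("boom"); });
    job();  // does not throw
    try {
        (void)future.get();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

One consequence for callers of submit(): a failing task does not bring down a worker thread, but the error is only observed if someone calls get() on the returned future.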

◆ thread_count()

size_t database::async::async_executor::thread_count ( ) const
inline

Returns the number of worker threads.

Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h, and async_executor_demo.cpp.

Definition at line 503 of file async_operations.h.

503 {
504     return thread_count_;
505 }

References thread_count_.

Referenced by demonstrate_basic_usage(), and TEST_F().

◆ wait_for_completion()

void database::async::async_executor::wait_for_completion ( )
inline

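Blocks until the task queue is empty, polling every 10 ms. Both branches test only the queue size; in the std::thread fallback, a task that a worker has already dequeued may still be running when this function returns. To wait for a specific result, use the std::future returned by submit().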

Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 465 of file async_operations.h.

465 {
466 #ifdef USE_THREAD_SYSTEM
467     if (pool_) {
468         while (pool_->get_job_queue()->size() > 0) {
469             std::this_thread::sleep_for(std::chrono::milliseconds(10));
470         }
471     }
472 #else
473     while (true) {
474         {
475             std::unique_lock<std::mutex> lock(queue_mutex_);
476             if (tasks_.empty()) {
477                 break;
478             }
479         }
480         std::this_thread::sleep_for(std::chrono::milliseconds(10));
481     }
482 #endif
483 }

References queue_mutex_, and tasks_.

Referenced by demonstrate_high_throughput().
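
When a caller needs the results rather than an empty queue, collecting the futures and calling get() on each one waits for the tasks themselves. The sketch below shows that pattern. To stay self-contained it uses std::async as a stand-in for async_executor::submit, which is an assumption made for illustration; sum_of_squares is a hypothetical function:

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <vector>

// Stand-in: std::async plays the role of async_executor::submit here so the
// sketch compiles without async_operations.h.
inline int sum_of_squares(int n) {
    assert(n >= 0);
    std::vector<std::future<int>> futures;
    futures.reserve(static_cast<std::size_t>(n));
    for (int i = 1; i <= n; ++i) {
        futures.push_back(
            std::async(std::launch::async, [i] { return i * i; }));
    }
    int total = 0;
    for (auto& f : futures) {
        total += f.get();  // blocks until that particular task has finished
    }
    return total;
}
```

With the real executor, the loop body would call executor.submit(...) instead of std::async, and the second loop would stay the same.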

◆ worker_thread()

void database::async::async_executor::worker_thread ( )
inline private
Examples
/home/runner/work/database_system/database_system/database/async/async_operations.h.

Definition at line 519 of file async_operations.h.

519 {
520     while (true) {
521         std::function<void()> task;
522
523         {
524             std::unique_lock<std::mutex> lock(queue_mutex_);
525             condition_.wait(lock, [this] {
526                 return stop_ || !tasks_.empty();
527             });
528
529             if (stop_ && tasks_.empty()) {
530                 return;
531             }
532
533             if (!tasks_.empty()) {
534                 task = std::move(tasks_.front());
535                 tasks_.pop();
536             }
537         }
538
539         if (task) {
540             task();
541         }
542     }
543 }

References condition_, queue_mutex_, stop_, and tasks_.

Referenced by async_executor().

Member Data Documentation

◆ condition_

std::condition_variable database::async::async_executor::condition_
private

◆ queue_mutex_

std::mutex database::async::async_executor::queue_mutex_
mutable private

◆ stop_

std::atomic<bool> database::async::async_executor::stop_
private

◆ tasks_

std::queue<std::function<void()> > database::async::async_executor::tasks_
private

◆ thread_count_

size_t database::async::async_executor::thread_count_
private

◆ workers_

std::vector<std::thread> database::async::async_executor::workers_
private

The documentation for this class was generated from the following file: