Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
database::integrated::adapters::backends::fallback_thread_backend Class Reference

Simple thread pool using std::thread. More...

#include <fallback_thread_backend.h>

Inheritance diagram for database::integrated::adapters::backends::fallback_thread_backend:
Inheritance graph
Collaboration diagram for database::integrated::adapters::backends::fallback_thread_backend:
Collaboration graph

Public Member Functions

 fallback_thread_backend (const db_thread_config &config)
 
 ~fallback_thread_backend () override
 
 fallback_thread_backend (const fallback_thread_backend &)=delete
 
fallback_thread_backendoperator= (const fallback_thread_backend &)=delete
 
 fallback_thread_backend (fallback_thread_backend &&)=delete
 
fallback_thread_backendoperator= (fallback_thread_backend &&)=delete
 
common::VoidResult initialize () override
 Initialize the thread backend.
 
common::VoidResult shutdown () override
 Shutdown the thread backend gracefully.
 
bool is_initialized () const override
 Check if backend is initialized.
 
common::VoidResult execute (std::function< void()> task) override
 Execute a task (fire-and-forget)
 
void wait_for_completion () override
 Wait for all pending tasks to complete.
 
bool wait_for_completion_timeout (std::chrono::milliseconds timeout) override
 Wait for completion with timeout.
 
std::size_t worker_count () const override
 Get number of worker threads.
 
std::size_t queue_size () const override
 Get current queue size.
 
bool is_idle () const override
 Check if thread pool is idle.
 
- Public Member Functions inherited from database::integrated::adapters::backends::thread_backend
virtual ~thread_backend ()=default
 

Private Member Functions

void worker_thread ()
 Worker thread function.
 

Private Attributes

const db_thread_configconfig_
 
bool initialized_
 
std::atomic< bool > shutdown_requested_
 
std::vector< std::thread > workers_
 
std::queue< std::function< void()> > task_queue_
 
std::mutex queue_mutex_
 
std::condition_variable queue_cv_
 
std::atomic< std::size_t > active_tasks_
 
std::mutex completion_mutex_
 
std::condition_variable completion_cv_
 

Detailed Description

Simple thread pool using std::thread.

This backend provides basic thread pooling when thread_system is unavailable. Uses std::thread and std::queue for task management.

Definition at line 46 of file fallback_thread_backend.h.

Constructor & Destructor Documentation

◆ fallback_thread_backend() [1/3]

database::integrated::adapters::backends::fallback_thread_backend::fallback_thread_backend ( const db_thread_config & config)
explicit

◆ ~fallback_thread_backend()

database::integrated::adapters::backends::fallback_thread_backend::~fallback_thread_backend ( )
override

Definition at line 34 of file fallback_thread_backend.cpp.

35{
36 if (initialized_)
37 {
38 shutdown();
39 }
40}
common::VoidResult shutdown() override
Shutdown the thread backend gracefully.

References initialized_, and shutdown().

Here is the call graph for this function:

◆ fallback_thread_backend() [2/3]

database::integrated::adapters::backends::fallback_thread_backend::fallback_thread_backend ( const fallback_thread_backend & )
delete

◆ fallback_thread_backend() [3/3]

database::integrated::adapters::backends::fallback_thread_backend::fallback_thread_backend ( fallback_thread_backend && )
delete

Member Function Documentation

◆ execute()

common::VoidResult database::integrated::adapters::backends::fallback_thread_backend::execute ( std::function< void()> task)
overridevirtual

Execute a task (fire-and-forget)

Parameters
taskTask to execute
Returns
VoidResult::ok() on successful submission

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 127 of file fallback_thread_backend.cpp.

128{
129 if (!initialized_)
130 {
131 return make_error("Thread pool not initialized");
132 }
133
134 if (!task)
135 {
136 return make_error("Invalid task");
137 }
138
140 {
141 return make_error("Thread pool is shutting down");
142 }
143
144 {
145 std::lock_guard<std::mutex> lock(queue_mutex_);
146
147 // Check queue size limit
149 {
150 return make_error("Task queue full");
151 }
152
153 task_queue_.push(std::move(task));
154 }
155
156 queue_cv_.notify_one();
157 return common::ok();
158}
VoidResult ok()
std::size_t max_queue_size
Maximum queued tasks (0 = unlimited)

References config_, initialized_, database::integrated::db_thread_config::max_queue_size, common::ok(), queue_cv_, queue_mutex_, shutdown_requested_, and task_queue_.

Here is the call graph for this function:

◆ initialize()

common::VoidResult database::integrated::adapters::backends::fallback_thread_backend::initialize ( )
overridevirtual

Initialize the thread backend.

Returns
VoidResult::ok() on success, error on failure

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 42 of file fallback_thread_backend.cpp.

43{
44 if (initialized_)
45 {
46 return common::ok();
47 }
48
49 try
50 {
51 shutdown_requested_ = false;
52 active_tasks_ = 0;
53
54 // Determine worker count
55 std::size_t worker_count = config_.thread_count;
56 if (worker_count == 0)
57 {
58 worker_count = std::max(1u, std::thread::hardware_concurrency());
59 }
60
61 // Create worker threads
62 workers_.reserve(worker_count);
63 for (std::size_t i = 0; i < worker_count; ++i)
64 {
66 }
67
68 initialized_ = true;
69 return common::ok();
70 }
71 catch (const std::exception& e)
72 {
73 return make_error(std::string("Thread pool initialization failed: ") + e.what());
74 }
75}
std::size_t worker_count() const override
Get number of worker threads.
std::size_t thread_count
Number of worker threads (0 = auto-detect from hardware)

References active_tasks_, config_, initialized_, common::ok(), shutdown_requested_, database::integrated::db_thread_config::thread_count, worker_count(), worker_thread(), and workers_.

Here is the call graph for this function:

◆ is_idle()

bool database::integrated::adapters::backends::fallback_thread_backend::is_idle ( ) const
overridevirtual

Check if thread pool is idle.

Returns
true if no pending or running tasks

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 199 of file fallback_thread_backend.cpp.

200{
201 std::lock_guard<std::mutex> lock(queue_mutex_);
202 return task_queue_.empty() && active_tasks_ == 0;
203}

References active_tasks_, queue_mutex_, and task_queue_.

◆ is_initialized()

bool database::integrated::adapters::backends::fallback_thread_backend::is_initialized ( ) const
overridevirtual

Check if backend is initialized.

Returns
true if initialized and ready

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 122 of file fallback_thread_backend.cpp.

123{
124 return initialized_;
125}

References initialized_.

◆ operator=() [1/2]

fallback_thread_backend & database::integrated::adapters::backends::fallback_thread_backend::operator= ( const fallback_thread_backend & )
delete

◆ operator=() [2/2]

fallback_thread_backend & database::integrated::adapters::backends::fallback_thread_backend::operator= ( fallback_thread_backend && )
delete

◆ queue_size()

std::size_t database::integrated::adapters::backends::fallback_thread_backend::queue_size ( ) const
overridevirtual

Get current queue size.

Returns
Number of pending tasks

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 193 of file fallback_thread_backend.cpp.

194{
195 std::lock_guard<std::mutex> lock(queue_mutex_);
196 return task_queue_.size();
197}

References queue_mutex_, and task_queue_.

◆ shutdown()

common::VoidResult database::integrated::adapters::backends::fallback_thread_backend::shutdown ( )
overridevirtual

Shutdown the thread backend gracefully.

Returns
VoidResult::ok() on success, error on failure

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 77 of file fallback_thread_backend.cpp.

78{
79 if (!initialized_)
80 {
81 return common::ok();
82 }
83
84 try
85 {
86 // Signal shutdown
88
89 // Wake up all workers
90 {
91 std::lock_guard<std::mutex> lock(queue_mutex_);
92 queue_cv_.notify_all();
93 }
94
95 // Wait for all workers to finish
96 for (auto& worker : workers_)
97 {
98 if (worker.joinable())
99 {
100 worker.join();
101 }
102 }
103
104 workers_.clear();
105
106 // Clear remaining tasks
107 {
108 std::lock_guard<std::mutex> lock(queue_mutex_);
109 std::queue<std::function<void()>> empty;
110 task_queue_.swap(empty);
111 }
112
113 initialized_ = false;
114 return common::ok();
115 }
116 catch (const std::exception& e)
117 {
118 return make_error(std::string("Thread pool shutdown failed: ") + e.what());
119 }
120}
static void worker(int thread_id, const std::string &connection_string)
Worker function: creates its own connection, inserts data, reads it back, and disconnects.

References initialized_, common::ok(), queue_cv_, queue_mutex_, shutdown_requested_, task_queue_, worker(), and workers_.

Referenced by ~fallback_thread_backend().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ wait_for_completion()

void database::integrated::adapters::backends::fallback_thread_backend::wait_for_completion ( )
overridevirtual

Wait for all pending tasks to complete.

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 160 of file fallback_thread_backend.cpp.

161{
162 if (!initialized_)
163 {
164 return;
165 }
166
167 std::unique_lock<std::mutex> lock(completion_mutex_);
168 completion_cv_.wait(lock, [this]() {
169 std::lock_guard<std::mutex> queue_lock(queue_mutex_);
170 return task_queue_.empty() && active_tasks_ == 0;
171 });
172}

References active_tasks_, completion_cv_, completion_mutex_, initialized_, queue_mutex_, and task_queue_.

◆ wait_for_completion_timeout()

bool database::integrated::adapters::backends::fallback_thread_backend::wait_for_completion_timeout ( std::chrono::milliseconds timeout)
overridevirtual

Wait for completion with timeout.

Parameters
timeoutMaximum wait time
Returns
true if all tasks completed, false if timeout

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 174 of file fallback_thread_backend.cpp.

175{
176 if (!initialized_)
177 {
178 return true;
179 }
180
181 std::unique_lock<std::mutex> lock(completion_mutex_);
182 return completion_cv_.wait_for(lock, timeout, [this]() {
183 std::lock_guard<std::mutex> queue_lock(queue_mutex_);
184 return task_queue_.empty() && active_tasks_ == 0;
185 });
186}

References active_tasks_, completion_cv_, completion_mutex_, initialized_, queue_mutex_, task_queue_, and database::timeout.

◆ worker_count()

std::size_t database::integrated::adapters::backends::fallback_thread_backend::worker_count ( ) const
overridevirtual

Get number of worker threads.

Returns
Worker count

Implements database::integrated::adapters::backends::thread_backend.

Definition at line 188 of file fallback_thread_backend.cpp.

189{
190 return workers_.size();
191}

References workers_.

Referenced by initialize().

Here is the caller graph for this function:

◆ worker_thread()

void database::integrated::adapters::backends::fallback_thread_backend::worker_thread ( )
private

Worker thread function.

Definition at line 205 of file fallback_thread_backend.cpp.

206{
207 while (true)
208 {
209 std::function<void()> task;
210
211 {
212 std::unique_lock<std::mutex> lock(queue_mutex_);
213
214 // Wait for task or shutdown
215 queue_cv_.wait(lock, [this]() {
216 return shutdown_requested_ || !task_queue_.empty();
217 });
218
219 // Check shutdown
220 if (shutdown_requested_ && task_queue_.empty())
221 {
222 break;
223 }
224
225 // Get next task
226 if (!task_queue_.empty())
227 {
228 task = std::move(task_queue_.front());
229 task_queue_.pop();
230 }
231 }
232
233 // Execute task
234 if (task)
235 {
237
238 try
239 {
240 task();
241 }
242 catch (...)
243 {
244 // Swallow exceptions to prevent worker thread from terminating
245 }
246
248
249 // Notify completion waiters
250 {
251 std::lock_guard<std::mutex> lock(completion_mutex_);
252 completion_cv_.notify_all();
253 }
254 }
255 }
256}

References active_tasks_, completion_cv_, completion_mutex_, queue_cv_, queue_mutex_, shutdown_requested_, and task_queue_.

Referenced by initialize().

Here is the caller graph for this function:

Member Data Documentation

◆ active_tasks_

std::atomic<std::size_t> database::integrated::adapters::backends::fallback_thread_backend::active_tasks_
private

◆ completion_cv_

std::condition_variable database::integrated::adapters::backends::fallback_thread_backend::completion_cv_
private

◆ completion_mutex_

std::mutex database::integrated::adapters::backends::fallback_thread_backend::completion_mutex_
mutableprivate

◆ config_

const db_thread_config& database::integrated::adapters::backends::fallback_thread_backend::config_
private

Definition at line 76 of file fallback_thread_backend.h.

Referenced by execute(), and initialize().

◆ initialized_

bool database::integrated::adapters::backends::fallback_thread_backend::initialized_
private

◆ queue_cv_

std::condition_variable database::integrated::adapters::backends::fallback_thread_backend::queue_cv_
private

Definition at line 84 of file fallback_thread_backend.h.

Referenced by execute(), shutdown(), and worker_thread().

◆ queue_mutex_

std::mutex database::integrated::adapters::backends::fallback_thread_backend::queue_mutex_
mutableprivate

◆ shutdown_requested_

std::atomic<bool> database::integrated::adapters::backends::fallback_thread_backend::shutdown_requested_
private

Definition at line 78 of file fallback_thread_backend.h.

Referenced by execute(), initialize(), shutdown(), and worker_thread().

◆ task_queue_

std::queue<std::function<void()> > database::integrated::adapters::backends::fallback_thread_backend::task_queue_
private

◆ workers_

std::vector<std::thread> database::integrated::adapters::backends::fallback_thread_backend::workers_
private

Definition at line 80 of file fallback_thread_backend.h.

Referenced by initialize(), shutdown(), and worker_count().


The documentation for this class was generated from the following files: