Database System 0.1.0
Advanced C++20 Database System with Multi-Backend Support
Loading...
Searching...
No Matches
fallback_thread_backend.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
6
7#include <algorithm>
8
9namespace
10{
11 inline common::VoidResult make_error(const std::string& msg)
12 {
13 return common::VoidResult(common::error_info{ -1, msg, "" });
14 }
15}
16
17namespace database
18{
19namespace integrated
20{
21namespace adapters
22{
23namespace backends
24{
25
27 : config_(config)
28 , initialized_(false)
29 , shutdown_requested_(false)
30 , active_tasks_(0)
31{
32}
33
41
43{
44 if (initialized_)
45 {
46 return common::ok();
47 }
48
49 try
50 {
51 shutdown_requested_ = false;
52 active_tasks_ = 0;
53
54 // Determine worker count
55 std::size_t worker_count = config_.thread_count;
56 if (worker_count == 0)
57 {
58 worker_count = std::max(1u, std::thread::hardware_concurrency());
59 }
60
61 // Create worker threads
62 workers_.reserve(worker_count);
63 for (std::size_t i = 0; i < worker_count; ++i)
64 {
66 }
67
68 initialized_ = true;
69 return common::ok();
70 }
71 catch (const std::exception& e)
72 {
73 return make_error(std::string("Thread pool initialization failed: ") + e.what());
74 }
75}
76
78{
79 if (!initialized_)
80 {
81 return common::ok();
82 }
83
84 try
85 {
86 // Signal shutdown
88
89 // Wake up all workers
90 {
91 std::lock_guard<std::mutex> lock(queue_mutex_);
92 queue_cv_.notify_all();
93 }
94
95 // Wait for all workers to finish
96 for (auto& worker : workers_)
97 {
98 if (worker.joinable())
99 {
100 worker.join();
101 }
102 }
103
104 workers_.clear();
105
106 // Clear remaining tasks
107 {
108 std::lock_guard<std::mutex> lock(queue_mutex_);
109 std::queue<std::function<void()>> empty;
110 task_queue_.swap(empty);
111 }
112
113 initialized_ = false;
114 return common::ok();
115 }
116 catch (const std::exception& e)
117 {
118 return make_error(std::string("Thread pool shutdown failed: ") + e.what());
119 }
120}
121
123{
124 return initialized_;
125}
126
128{
129 if (!initialized_)
130 {
131 return make_error("Thread pool not initialized");
132 }
133
134 if (!task)
135 {
136 return make_error("Invalid task");
137 }
138
140 {
141 return make_error("Thread pool is shutting down");
142 }
143
144 {
145 std::lock_guard<std::mutex> lock(queue_mutex_);
146
147 // Check queue size limit
149 {
150 return make_error("Task queue full");
151 }
152
153 task_queue_.push(std::move(task));
154 }
155
156 queue_cv_.notify_one();
157 return common::ok();
158}
159
161{
162 if (!initialized_)
163 {
164 return;
165 }
166
167 std::unique_lock<std::mutex> lock(completion_mutex_);
168 completion_cv_.wait(lock, [this]() {
169 std::lock_guard<std::mutex> queue_lock(queue_mutex_);
170 return task_queue_.empty() && active_tasks_ == 0;
171 });
172}
173
175{
176 if (!initialized_)
177 {
178 return true;
179 }
180
181 std::unique_lock<std::mutex> lock(completion_mutex_);
182 return completion_cv_.wait_for(lock, timeout, [this]() {
183 std::lock_guard<std::mutex> queue_lock(queue_mutex_);
184 return task_queue_.empty() && active_tasks_ == 0;
185 });
186}
187
189{
190 return workers_.size();
191}
192
194{
195 std::lock_guard<std::mutex> lock(queue_mutex_);
196 return task_queue_.size();
197}
198
200{
201 std::lock_guard<std::mutex> lock(queue_mutex_);
202 return task_queue_.empty() && active_tasks_ == 0;
203}
204
206{
207 while (true)
208 {
209 std::function<void()> task;
210
211 {
212 std::unique_lock<std::mutex> lock(queue_mutex_);
213
214 // Wait for task or shutdown
215 queue_cv_.wait(lock, [this]() {
216 return shutdown_requested_ || !task_queue_.empty();
217 });
218
219 // Check shutdown
220 if (shutdown_requested_ && task_queue_.empty())
221 {
222 break;
223 }
224
225 // Get next task
226 if (!task_queue_.empty())
227 {
228 task = std::move(task_queue_.front());
229 task_queue_.pop();
230 }
231 }
232
233 // Execute task
234 if (task)
235 {
237
238 try
239 {
240 task();
241 }
242 catch (...)
243 {
244 // Swallow exceptions to prevent worker thread from terminating
245 }
246
248
249 // Notify completion waiters
250 {
251 std::lock_guard<std::mutex> lock(completion_mutex_);
252 completion_cv_.notify_all();
253 }
254 }
255 }
256}
257
258} // namespace backends
259} // namespace adapters
260} // namespace integrated
261} // namespace database
common::VoidResult execute(std::function< void()> task) override
Execute a task (fire-and-forget)
std::size_t queue_size() const override
Get current queue size.
std::size_t worker_count() const override
Get number of worker threads.
bool is_initialized() const override
Check if backend is initialized.
common::VoidResult initialize() override
Initialize the thread backend.
common::VoidResult shutdown() override
Shutdown the thread backend gracefully.
bool wait_for_completion_timeout(std::chrono::milliseconds timeout) override
Wait for completion with timeout.
void wait_for_completion() override
Wait for all pending tasks to complete.
static void worker(int thread_id, const std::string &connection_string)
Worker function: creates its own connection, inserts data, reads it back, and disconnects.
Fallback thread backend using std::thread.
VoidResult ok()
Result< std::monostate > VoidResult
Thread pool configuration for async operations.
std::size_t max_queue_size
Maximum queued tasks (0 = unlimited)
std::size_t thread_count
Number of worker threads (0 = auto-detect from hardware)