Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
standalone_executor.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
13
14#if LOGGER_HAS_IEXECUTOR
15
17
18standalone_executor::standalone_executor(std::size_t queue_size, std::string name)
19 : name_(std::move(name))
20 , queue_size_(queue_size) {}
21
22standalone_executor::~standalone_executor() {
23 shutdown(true);
24}
25
26void standalone_executor::start() {
27 if (running_.exchange(true)) {
28 return; // Already running
29 }
30
31 stop_requested_.store(false);
32 worker_thread_ = std::thread([this]() { worker_loop(); });
33}
34
// Submits a job for execution with no delay.
//
// Flow, as visible in this listing:
//   1. If running_ is false, an error (code -1, "Executor is not running") is produced.
//   2. A shared std::promise<void> is created and its future is obtained.
//   3. The job, the promise and a default-constructed time_point are bundled
//      into a pending_task and handed to enqueue_task().
//   4. If enqueue_task() returns false, an error (code -2, "Queue is full") is produced.
//   5. Otherwise the future is returned wrapped in Result::ok.
//
// NOTE(review): the listing's own numbering jumps 38 -> 40 and 53 -> 55, so the
// statement that opens each error return is missing from this extract — restore
// it from the real source file before building.
// NOTE(review): job is not checked for null here, and worker_loop()/drain_queue()
// dereference task.job unconditionally — confirm callers never pass an empty pointer.
35common::Result<std::future<void>> standalone_executor::execute(
36 std::unique_ptr<common::interfaces::IJob>&& job) {
37
38 if (!running_.load()) {
40 -1, "Executor is not running", "standalone_executor"
41 });
42 }
43
// The promise is shared because the task (and thus the worker) keeps it alive
// while the caller holds only the future.
44 auto promise = std::make_shared<std::promise<void>>();
45 auto future = promise->get_future();
46
47 pending_task task{
48 std::move(job),
49 promise,
50 std::chrono::steady_clock::time_point{} // No delay
51 };
52
53 if (!enqueue_task(std::move(task))) {
55 -2, "Queue is full", "standalone_executor"
56 });
57 }
58
59 return common::Result<std::future<void>>::ok(std::move(future));
60}
61
// Submits a job that should not run before `delay` has elapsed.
//
// Identical to execute() except that the task's execute_after is set to
// steady_clock::now() + delay. Errors follow the same pattern: code -1 with
// "Executor is not running" when running_ is false, code -2 with
// "Queue is full" when enqueue_task() rejects the task.
//
// NOTE(review): the listing's own numbering jumps 66 -> 68 and 81 -> 83, so the
// statement that opens each error return is missing from this extract — restore
// it from the real source file before building.
// NOTE(review): worker_loop() takes tasks from the front of the queue and waits
// for that task's execute_after, so a delayed job holds back every job queued
// behind it — confirm this ordering is intended.
62common::Result<std::future<void>> standalone_executor::execute_delayed(
63 std::unique_ptr<common::interfaces::IJob>&& job,
64 std::chrono::milliseconds delay) {
65
66 if (!running_.load()) {
68 -1, "Executor is not running", "standalone_executor"
69 });
70 }
71
72 auto promise = std::make_shared<std::promise<void>>();
73 auto future = promise->get_future();
74
75 pending_task task{
76 std::move(job),
77 promise,
// Earliest time point at which the worker may run the job.
78 std::chrono::steady_clock::now() + delay
79 };
80
81 if (!enqueue_task(std::move(task))) {
83 -2, "Queue is full", "standalone_executor"
84 });
85 }
86
87 return common::Result<std::future<void>>::ok(std::move(future));
88}
89
90size_t standalone_executor::worker_count() const {
91 return running_.load() ? 1 : 0;
92}
93
94bool standalone_executor::is_running() const {
95 return running_.load();
96}
97
98size_t standalone_executor::pending_tasks() const {
99 std::lock_guard<std::mutex> lock(queue_mutex_);
100 return queue_.size();
101}
102
103void standalone_executor::shutdown(bool wait_for_completion) {
104 if (!running_.exchange(false)) {
105 return; // Already stopped
106 }
107
108 stop_requested_.store(true);
109 queue_cv_.notify_all();
110
111 if (worker_thread_.joinable()) {
112 worker_thread_.join();
113 }
114
115 if (wait_for_completion) {
116 drain_queue();
117 }
118}
119
120std::uint64_t standalone_executor::dropped_count() const noexcept {
121 return dropped_count_.load();
122}
123
124bool standalone_executor::enqueue_task(pending_task&& task) {
125 std::lock_guard<std::mutex> lock(queue_mutex_);
126
127 if (queue_.size() >= queue_size_) {
128 dropped_count_.fetch_add(1);
129 // Set exception on the promise to signal failure
130 task.completion_promise->set_exception(
131 std::make_exception_ptr(
132 std::runtime_error("Queue is full - task dropped")));
133 return false;
134 }
135
136 queue_.push(std::move(task));
137 queue_cv_.notify_one();
138 return true;
139}
140
141void standalone_executor::worker_loop() {
142 while (!stop_requested_.load()) {
143 pending_task task;
144
145 {
146 std::unique_lock<std::mutex> lock(queue_mutex_);
147
148 queue_cv_.wait(lock, [this]() {
149 return stop_requested_.load() || !queue_.empty();
150 });
151
152 if (stop_requested_.load() && queue_.empty()) {
153 break;
154 }
155
156 if (queue_.empty()) {
157 continue;
158 }
159
160 task = std::move(queue_.front());
161 queue_.pop();
162 }
163
164 // Handle delayed execution
165 auto now = std::chrono::steady_clock::now();
166 if (task.execute_after > now) {
167 std::this_thread::sleep_until(task.execute_after);
168 }
169
170 // Execute the job
171 try {
172 auto result = task.job->execute();
173 if (result.is_ok()) {
174 task.completion_promise->set_value();
175 } else {
176 task.completion_promise->set_exception(
177 std::make_exception_ptr(
178 std::runtime_error(result.error().message)));
179 }
180 } catch (...) {
181 task.completion_promise->set_exception(std::current_exception());
182 }
183 }
184}
185
186void standalone_executor::drain_queue() {
187 std::queue<pending_task> remaining;
188
189 {
190 std::lock_guard<std::mutex> lock(queue_mutex_);
191 std::swap(remaining, queue_);
192 }
193
194 while (!remaining.empty()) {
195 auto task = std::move(remaining.front());
196 remaining.pop();
197
198 try {
199 auto result = task.job->execute();
200 if (result.is_ok()) {
201 task.completion_promise->set_value();
202 } else {
203 task.completion_promise->set_exception(
204 std::make_exception_ptr(
205 std::runtime_error(result.error().message)));
206 }
207 } catch (...) {
208 task.completion_promise->set_exception(std::current_exception());
209 }
210 }
211}
212
213// Factory implementation
214std::shared_ptr<common::interfaces::IExecutor> standalone_executor_factory::create(
215 std::size_t queue_size,
216 const std::string& name) {
217
218 auto executor = std::make_shared<standalone_executor>(queue_size, name);
219 executor->start();
220 return executor;
221}
222
223} // namespace kcenon::logger::integration
224
225#endif // LOGGER_HAS_IEXECUTOR
Standalone IExecutor implementation backed by a single std::thread worker.