14#if LOGGER_HAS_IEXECUTOR
// Constructs an executor without starting it.
//
// queue_size: stored in queue_size_; enqueue_task() rejects a task once
//             queue_.size() has reached this value.
// name:       taken by value and moved into name_.
//
// The constructor body is empty, so no thread is created here; the worker is
// launched by start().
18standalone_executor::standalone_executor(std::size_t queue_size, std::string name)
19 : name_(std::move(name))
20 , queue_size_(queue_size) {}
// Destructor.
// NOTE(review): the destructor body is not visible in this excerpt. Because
// start() stores a std::thread in worker_thread_, that thread has to be joined
// before the member is destroyed; presumably this calls shutdown() — confirm
// against the full file.
22standalone_executor::~standalone_executor() {
// Starts the single worker thread.
//
// running_.exchange(true) sets the flag atomically and yields the previous
// value, so the guarded branch below is entered only when the executor was
// already running.
// NOTE(review): the body of that branch is not visible in this excerpt;
// presumably it returns early so a second thread is never created — confirm.
26void standalone_executor::start() {
27 if (running_.exchange(
true)) {
// Clear any stop request left over from an earlier shutdown() before the new
// worker begins checking the flag.
31 stop_requested_.store(
false);
// The lambda captures `this`; the executor must therefore stay alive until the
// thread has been joined (shutdown() performs the join).
32 worker_thread_ = std::thread([
this]() { worker_loop(); });
// Submits a job for execution with no delay.
// NOTE(review): the line carrying the return type and function name is not
// visible in this excerpt (presumably the IExecutor `execute` override), and
// neither are the expressions that wrap the error arguments or return the
// future — confirm against the full file.
//
// job: ownership is taken through the rvalue reference.
36 std::unique_ptr<common::interfaces::IJob>&& job) {
// Refuse work while the executor is stopped. This check is not synchronized
// with shutdown(), so it is a best-effort guard only.
38 if (!running_.load()) {
// Error arguments for the "not running" case: code -1, message, and the
// string "standalone_executor".
40 -1,
"Executor is not running",
"standalone_executor"
// The promise is shared so that the queued task can complete it later while
// the future obtained here is handed back to the caller.
44 auto promise = std::make_shared<std::promise<void>>();
45 auto future = promise->get_future();
// A default-constructed time_point (the clock's epoch) is used as the due
// time, so the worker's `execute_after > now` test is expected to be false and
// the job runs as soon as it is dequeued.
50 std::chrono::steady_clock::time_point{}
// enqueue_task() has already stored a "Queue is full - task dropped" exception
// in the promise when it reports failure; the caller additionally receives the
// error arguments below (code -2).
53 if (!enqueue_task(std::move(task))) {
55 -2,
"Queue is full",
"standalone_executor"
// Submits a job that should not run before `delay` has elapsed.
// NOTE(review): the line carrying the return type and function name is not
// visible in this excerpt (presumably the IExecutor `execute_delayed`
// override), and neither are the expressions that wrap the error arguments or
// return the future — confirm against the full file.
//
// job:   ownership is taken through the rvalue reference.
// delay: added to steady_clock::now() at submission time to form the task's
//        earliest execution time.
63 std::unique_ptr<common::interfaces::IJob>&& job,
64 std::chrono::milliseconds delay) {
// Refuse work while the executor is stopped. This check is not synchronized
// with shutdown(), so it is a best-effort guard only.
66 if (!running_.load()) {
// Error arguments for the "not running" case: code -1, message, and the
// string "standalone_executor".
68 -1,
"Executor is not running",
"standalone_executor"
// The promise is shared so that the queued task can complete it later while
// the future obtained here is handed back to the caller.
72 auto promise = std::make_shared<std::promise<void>>();
73 auto future = promise->get_future();
// Due time measured on the monotonic clock, so wall-clock adjustments do not
// affect it. The worker sleeps until this point after dequeuing the task.
78 std::chrono::steady_clock::now() + delay
// enqueue_task() has already stored a "Queue is full - task dropped" exception
// in the promise when it reports failure; the caller additionally receives the
// error arguments below (code -2).
81 if (!enqueue_task(std::move(task))) {
83 -2,
"Queue is full",
"standalone_executor"
// Reports the number of worker threads: 1 while running_ is set, otherwise 0.
// start() creates exactly one thread, so there is never more than one worker.
90size_t standalone_executor::worker_count()
const {
91 return running_.load() ? 1 : 0;
// Returns the current value of the running_ flag, which start() sets and
// shutdown() clears.
94bool standalone_executor::is_running()
const {
95 return running_.load();
// Returns the number of tasks currently held in queue_.
// The size is read under queue_mutex_, so it is consistent at the moment of
// the call, but it may already be out of date when the caller uses it.
98size_t standalone_executor::pending_tasks()
const {
99 std::lock_guard<std::mutex> lock(queue_mutex_);
100 return queue_.size();
// Stops the worker thread and waits for it to exit.
//
// wait_for_completion: selects the extra step guarded by the final `if`.
// NOTE(review): the statement inside that `if` is not visible in this excerpt;
// presumably it calls drain_queue() so that tasks still queued are executed —
// confirm against the full file.
103void standalone_executor::shutdown(
bool wait_for_completion) {
// exchange(false) yields the previous flag value, so this branch is entered
// when the executor was not running.
// NOTE(review): the branch body is not visible here; presumably an early
// return that makes repeated shutdown() calls harmless — confirm.
104 if (!running_.exchange(
false)) {
// Ask the worker to stop and wake it in case it is blocked waiting for work.
108 stop_requested_.store(
true);
109 queue_cv_.notify_all();
// The join waits for the task the worker is currently handling. The worker
// sleeps until a delayed task's due time without checking stop_requested_, so
// this call can block for up to that delay.
111 if (worker_thread_.joinable()) {
112 worker_thread_.join();
115 if (wait_for_completion) {
// Returns the number of tasks rejected so far because the queue was full.
// The counter is incremented in enqueue_task() each time a task is dropped.
120std::uint64_t standalone_executor::dropped_count() const noexcept {
121 return dropped_count_.load();
// Adds a task to the bounded queue, or drops it when the queue is full.
//
// task: moved into queue_ on success.
// NOTE(review): the return statements are not visible in this excerpt. The
// submit functions test the result with `!enqueue_task(...)`, so presumably
// this returns false when the task is dropped and true when it is queued —
// confirm against the full file.
124bool standalone_executor::enqueue_task(pending_task&& task) {
125 std::lock_guard<std::mutex> lock(queue_mutex_);
// Capacity check. With queue_size_ == 0 this condition is always true, so
// every task would be dropped.
127 if (queue_.size() >= queue_size_) {
128 dropped_count_.fetch_add(1);
// Complete the promise with an error so that anyone holding the matching
// future observes the drop.
130 task.completion_promise->set_exception(
131 std::make_exception_ptr(
132 std::runtime_error(
"Queue is full - task dropped")));
// Queue the task and wake the worker, which waits on queue_cv_ for a
// non-empty queue.
136 queue_.push(std::move(task));
137 queue_cv_.notify_one();
// Body of the worker thread: takes tasks from queue_ one at a time and runs
// them until a stop is requested.
//
// NOTE(review): several lines are not visible in this excerpt — the
// declaration of `task`, the removal of the front element from queue_, the
// bodies of the two `if` checks after the wait, and the try/catch that
// surrounds the job execution. The comments below describe only what the
// visible lines establish.
//
// The loop condition is re-checked after every task, so once stop_requested_
// is set the loop ends even if tasks remain in the queue.
141void standalone_executor::worker_loop() {
142 while (!stop_requested_.load()) {
146 std::unique_lock<std::mutex> lock(queue_mutex_);
// Block until there is work or a stop request; the predicate also guards
// against spurious wake-ups.
148 queue_cv_.wait(lock, [
this]() {
149 return stop_requested_.load() || !queue_.empty();
// Stop requested and nothing left to take (presumably leaves the loop).
152 if (stop_requested_.load() && queue_.empty()) {
// Defensive re-check for an empty queue (presumably restarts the loop).
156 if (queue_.empty()) {
160 task = std::move(queue_.front());
// Delayed task: wait for its due time on this thread.
// NOTE(review): while sleeping here the only worker runs no other task, and
// stop_requested_ is not observed, so a long delay postpones both the tasks
// queued behind it and shutdown()'s join.
165 auto now = std::chrono::steady_clock::now();
166 if (task.execute_after > now) {
167 std::this_thread::sleep_until(task.execute_after);
// Run the job and report the outcome through the task's promise: success sets
// the value, a failed result becomes a std::runtime_error carrying the
// result's error message.
172 auto result = task.job->execute();
173 if (result.is_ok()) {
174 task.completion_promise->set_value();
176 task.completion_promise->set_exception(
177 std::make_exception_ptr(
178 std::runtime_error(result.error().message)));
// Forwards the in-flight exception to the promise (presumably inside a catch
// handler whose opening line is not visible here).
181 task.completion_promise->set_exception(std::current_exception());
// Executes every task still in queue_ on the calling thread.
//
// NOTE(review): the removal of the front element and the try/catch around the
// job execution are not visible in this excerpt. The visible lines never
// consult task.execute_after, so delayed tasks appear to run immediately
// here — confirm that this is intended.
186void standalone_executor::drain_queue() {
187 std::queue<pending_task> remaining;
// Take the whole queue in one step under the lock, so the jobs themselves run
// without holding queue_mutex_.
190 std::lock_guard<std::mutex> lock(queue_mutex_);
191 std::swap(remaining, queue_);
194 while (!remaining.empty()) {
195 auto task = std::move(remaining.front());
// Same completion protocol as the worker loop: success sets the value, a
// failed result becomes a std::runtime_error carrying the result's error
// message.
199 auto result = task.job->execute();
200 if (result.is_ok()) {
201 task.completion_promise->set_value();
203 task.completion_promise->set_exception(
204 std::make_exception_ptr(
205 std::runtime_error(result.error().message)));
// Forwards the in-flight exception to the promise (presumably inside a catch
// handler whose opening line is not visible here).
208 task.completion_promise->set_exception(std::current_exception());
// Factory: creates a standalone_executor and returns it as an IExecutor.
//
// queue_size: forwarded to the executor's constructor as the queue capacity.
// name:       forwarded to the executor's constructor, which takes it by
//             value, so the string is copied.
//
// NOTE(review): the remainder of the body, including the return statement, is
// not visible in this excerpt; whether the executor is started before being
// returned cannot be determined from here — confirm against the full file.
214std::shared_ptr<common::interfaces::IExecutor> standalone_executor_factory::create(
215 std::size_t queue_size,
216 const std::string& name) {
218 auto executor = std::make_shared<standalone_executor>(queue_size, name);
Standalone IExecutor implementation using std::jthread.