13 : queue_size_(queue_size)
23 if (
running_.exchange(
true, std::memory_order_acq_rel)) {
45 if (!
running_.exchange(
false, std::memory_order_acq_rel)) {
63 if (!task || !
running_.load(std::memory_order_acquire)) {
73 std::uint64_t dropped =
dropped_count_.fetch_add(1, std::memory_order_relaxed) + 1;
76 if (dropped % 100 == 1) {
78 "[WARNING] Async worker queue full: %llu tasks dropped total\n",
79 static_cast<unsigned long long>(dropped));
84 queue_.push(std::move(task));
93 if (!
running_.load(std::memory_order_acquire)) {
109 std::this_thread::sleep_for(std::chrono::microseconds(100));
114 return running_.load(std::memory_order_acquire);
130#if LOGGER_HAS_JTHREAD
132 while (!stop_token.stop_requested()) {
140 bool has_work =
queue_cv_.wait(lock, stop_token, [
this]() {
145 if (stop_token.stop_requested()) {
150 if (!has_work ||
queue_.empty()) {
155 task = std::move(
queue_.front());
171 while (!
stop.stop_requested()) {
179 return stop.stop_requested() || !
queue_.empty();
183 if (
stop.stop_requested()) {
193 task = std::move(
queue_.front());
210 std::queue<task_type> remaining;
214 std::swap(remaining,
queue_);
218 while (!remaining.empty()) {
219 auto task = std::move(remaining.front());
Standalone async worker implementation with jthread compatibility (kcenon).
compat_jthread worker_thread_
Background worker thread.
const std::size_t queue_size_
Maximum queue capacity.
void worker_loop(simple_stop_source &stop)
Worker thread main loop (fallback version)
std::size_t pending_count() const
Get the current queue size.
void drain_queue()
Process all remaining tasks in the queue.
void start()
Start the worker thread.
~async_worker()
Destructor - ensures graceful shutdown.
void flush()
Flush all pending tasks.
std::atomic< bool > running_
Worker state flag.
void stop()
Stop the worker thread gracefully.
std::queue< task_type > queue_
Task queue.
std::atomic< std::uint64_t > dropped_count_
Overflow counter.
std::size_t capacity() const noexcept
Get the queue capacity.
std::mutex queue_mutex_
Protects queue_ access.
std::uint64_t dropped_count() const noexcept
Get the number of dropped tasks due to queue overflow.
std::condition_variable queue_cv_
Signals new tasks or stop (fallback)
bool is_running() const noexcept
Check if the worker is currently running.
bool enqueue(task_type task)
Enqueue a task for async processing.
async_worker(std::size_t queue_size=8192)
Constructor with configurable queue size.
Wrapper for std::jthread or std::thread with manual stop mechanism.
void request_stop()
Request the thread to stop.
void join()
Wait for thread to complete.
Simple stop source for environments without std::stop_token.
std::function< void()> task_type
Task type for async worker.