Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
Loading...
Searching...
No Matches
async_worker.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
5#include "async_worker.h"
6
7#include <cstdio>
8#include <utility>
9
11
// Constructor: stores the maximum number of queued tasks in queue_size_.
// It does nothing else here; the worker thread is created by the start() body,
// which assigns worker_thread_.
12async_worker::async_worker(std::size_t queue_size)
13 : queue_size_(queue_size)
14{
15}
16
20
// Body of start(); its signature line (listing line 21) is absent from this extract.
// Marks the worker as running and launches the background thread.
// exchange(true) returns the previous value, so a call made while running_ is
// already true returns early without creating a second thread.
22 // Check if already running (idempotent)
23 if (running_.exchange(true, std::memory_order_acq_rel)) {
24 return;
25 }
26
27#if LOGGER_HAS_JTHREAD
28 // Create worker thread with std::jthread
29 // The stop_token is automatically provided by jthread
// The lambda captures `this`, so this object must outlive the thread.
30 worker_thread_ = compat_jthread([this](std::stop_token stop_token) {
31 worker_loop(stop_token);
32 });
33#else
34 // Create worker thread with manual stop source
35 // Note: We use the stop_source created by compat_jthread to ensure
36 // request_stop() correctly signals the worker loop
// NOTE(review): listing lines 37-38 (the fallback thread construction) are
// missing from this extract; only the closing `});` below survives.
39 });
40#endif
41}
42
// Body of stop(); its signature line (listing line 43) is absent from this extract.
// exchange(false) returns the previous value, so a call made while running_ is
// already false returns early and the shutdown steps run at most once per start().
44 // Check if not running (idempotent)
45 if (!running_.exchange(false, std::memory_order_acq_rel)) {
46 return;
47 }
48
49 // Request stop
// NOTE(review): the statement at listing line 50 is missing from this extract;
// presumably worker_thread_.request_stop() -- confirm against the real source.
51
52 // Wake up the worker thread
53 queue_cv_.notify_all();
54
55 // Wait for thread to finish
// NOTE(review): the statement at listing line 56 is missing from this extract;
// presumably worker_thread_.join() -- confirm against the real source.
57
58 // Process any remaining tasks
// NOTE(review): the statement at listing line 59 is missing from this extract;
// presumably drain_queue() -- confirm against the real source.
60}
61
// Body of enqueue(); its signature line (listing line 62) is absent from this extract.
// Returns true when the task was pushed onto queue_. Returns false when the task
// is empty, the worker is not running, or the queue already holds queue_size_ tasks.
// NOTE(review): running_ is read before queue_mutex_ is taken, so running_ can
// become false between this check and the push below. Whether such a task is
// ever executed depends on the stop() statements not visible here -- verify.
63 if (!task || !running_.load(std::memory_order_acquire)) {
64 return false;
65 }
66
67 {
68 std::lock_guard<std::mutex> lock(queue_mutex_);
69
70 // Check queue capacity
71 if (queue_.size() >= queue_size_) {
72 // Track dropped task
// fetch_add returns the old value; +1 gives the running total including this drop.
73 std::uint64_t dropped = dropped_count_.fetch_add(1, std::memory_order_relaxed) + 1;
74
75 // Log warning periodically (every 100 drops) to avoid log spam
// `% 100 == 1` fires on the 1st, 101st, 201st, ... drop.
// NOTE(review): this fprintf runs while queue_mutex_ is still held, so the
// stderr write blocks every other user of the queue for its duration.
76 if (dropped % 100 == 1) {
77 std::fprintf(stderr,
78 "[WARNING] Async worker queue full: %llu tasks dropped total\n",
79 static_cast<unsigned long long>(dropped));
80 }
81 return false;
82 }
83
84 queue_.push(std::move(task));
85 }
86
// The lock scope above has ended, so the notification is sent without holding queue_mutex_.
87 // Notify worker thread
88 queue_cv_.notify_one();
89 return true;
90}
91
// Body of flush(); its signature line (listing line 92) is absent from this extract.
// When the worker is running, blocks the caller until queue_ is observed empty.
93 if (!running_.load(std::memory_order_acquire)) {
94 // Not running - just drain the queue directly
// NOTE(review): the statement at listing line 95 is missing from this extract;
// presumably drain_queue() -- confirm against the real source.
96 return;
97 }
98
99 // Wait until queue is empty
100 // Use a polling approach with short waits to avoid deadlock
// NOTE(review): the worker loops pop a task before executing it, so an empty
// queue_ does not mean the last task has finished; this can return while that
// task is still executing.
101 while (true) {
102 {
// The lock is scoped to this inner block so it is released before sleeping.
103 std::lock_guard<std::mutex> lock(queue_mutex_);
104 if (queue_.empty()) {
105 break;
106 }
107 }
108 // Brief yield to allow worker to process
109 std::this_thread::sleep_for(std::chrono::microseconds(100));
110 }
111}
112
// Returns the current value of the running_ flag (acquire load).
// The flag is set by the start() body and cleared by the stop() body.
113bool async_worker::is_running() const noexcept {
114 return running_.load(std::memory_order_acquire);
115}
116
// Returns the number of tasks currently held in queue_, read under queue_mutex_.
// A task the worker has already popped and is executing is not counted.
117std::size_t async_worker::pending_count() const {
118 std::lock_guard<std::mutex> lock(queue_mutex_);
119 return queue_.size();
120}
121
// Returns queue_size_, the limit that the enqueue() body compares queue_.size() against.
122std::size_t async_worker::capacity() const noexcept {
123 return queue_size_;
124}
125
// Returns the value of dropped_count_ (relaxed load).
// The counter is incremented by the enqueue() body each time it rejects a task
// because the queue is full.
126std::uint64_t async_worker::dropped_count() const noexcept {
127 return dropped_count_.load(std::memory_order_relaxed);
128}
129
130#if LOGGER_HAS_JTHREAD
// Worker thread main loop for builds where LOGGER_HAS_JTHREAD is set.
// Repeatedly takes one task from the front of queue_ and runs it, until a stop
// is requested on stop_token. Tasks still in queue_ at that point are left there.
131void async_worker::worker_loop(std::stop_token stop_token) {
132 while (!stop_token.stop_requested()) {
133 task_type task;
134
135 {
136 std::unique_lock<std::mutex> lock(queue_mutex_);
137
138 // Wait for task or stop signal
139 // condition_variable_any works with stop_token
// The stop_token overload returns the predicate's result, so has_work is
// false only when the wait ended because of a stop request.
140 bool has_work = queue_cv_.wait(lock, stop_token, [this]() {
141 return !queue_.empty();
142 });
143
144 // Check if we should stop
// A stop request wins even if a task is available.
145 if (stop_token.stop_requested()) {
146 break;
147 }
148
149 // Check if we have work (not spurious wakeup)
150 if (!has_work || queue_.empty()) {
151 continue;
152 }
153
154 // Get task from queue
155 task = std::move(queue_.front());
156 queue_.pop();
157 }
158
159 // Execute task outside the lock
// Running outside the lock lets enqueue() and pending_count() proceed meanwhile.
160 if (task) {
161 try {
162 task();
163 } catch (...) {
164 // Swallow exceptions to prevent thread termination
// The exception is discarded without being logged or counted.
165 }
166 }
167 }
168}
169#else
// Body of the fallback worker_loop (the #else branch of LOGGER_HAS_JTHREAD); its
// signature line (listing line 170) is absent from this extract.
// Has the same structure as the jthread variant, but polls `stop` instead of a
// std::stop_token. Tasks still in queue_ when the stop is observed are left there.
171 while (!stop.stop_requested()) {
172 task_type task;
173
174 {
175 std::unique_lock<std::mutex> lock(queue_mutex_);
176
177 // Wait for task or stop signal
// The stop condition is part of the predicate, so a stop is only noticed after
// queue_cv_ is notified; the visible stop() body calls notify_all() for this.
178 queue_cv_.wait(lock, [this, &stop]() {
179 return stop.stop_requested() || !queue_.empty();
180 });
181
182 // Check if we should stop
// A stop request wins even if a task is available.
183 if (stop.stop_requested()) {
184 break;
185 }
186
187 // Check if we have work
// Defensive: the predicate above already guarantees a non-empty queue here.
188 if (queue_.empty()) {
189 continue;
190 }
191
192 // Get task from queue
193 task = std::move(queue_.front());
194 queue_.pop();
195 }
196
197 // Execute task outside the lock
198 if (task) {
199 try {
200 task();
201 } catch (...) {
202 // Swallow exceptions to prevent thread termination
// The exception is discarded without being logged or counted.
203 }
204 }
205 }
206}
207#endif
208
// Body of drain_queue(); its signature line (listing line 209) is absent from this extract.
// Runs every task currently in queue_ on the calling thread, in FIFO order.
210 std::queue<task_type> remaining;
211
212 {
// Take the whole queue in one swap so queue_mutex_ is held only for the swap,
// not while the tasks run. queue_ is left empty.
213 std::lock_guard<std::mutex> lock(queue_mutex_);
214 std::swap(remaining, queue_);
215 }
216
217 // Process remaining tasks
218 while (!remaining.empty()) {
219 auto task = std::move(remaining.front());
220 remaining.pop();
221
222 if (task) {
223 try {
224 task();
225 } catch (...) {
226 // Swallow exceptions during drain
// One failing task does not stop the rest; the exception is discarded.
227 }
228 }
229 }
230}
231
232} // namespace kcenon::logger::async
Standalone async worker implementation with jthread compatibility kcenon.
compat_jthread worker_thread_
Background worker thread.
const std::size_t queue_size_
Maximum queue capacity.
void worker_loop(simple_stop_source &stop)
Worker thread main loop (fallback version)
std::size_t pending_count() const
Get the current queue size.
void drain_queue()
Process all remaining tasks in the queue.
void start()
Start the worker thread.
~async_worker()
Destructor - ensures graceful shutdown.
void flush()
Flush all pending tasks.
std::atomic< bool > running_
Worker state flag.
void stop()
Stop the worker thread gracefully.
std::queue< task_type > queue_
Task queue.
std::atomic< std::uint64_t > dropped_count_
Overflow counter.
std::size_t capacity() const noexcept
Get the queue capacity.
std::mutex queue_mutex_
Protects queue_ access.
std::uint64_t dropped_count() const noexcept
Get the number of dropped tasks due to queue overflow.
std::condition_variable queue_cv_
Signals new tasks or stop (fallback)
bool is_running() const noexcept
Check if the worker is currently running.
bool enqueue(task_type task)
Enqueue a task for async processing.
async_worker(std::size_t queue_size=8192)
Constructor with configurable queue size.
Wrapper for std::jthread or std::thread with manual stop mechanism.
void request_stop()
Request the thread to stop.
void join()
Wait for thread to complete.
Simple stop source for environments without std::stop_token.
std::function< void()> task_type
Task type for async worker.