Logger System 0.1.3
High-performance C++20 thread-safe logging system with asynchronous capabilities
kcenon::logger::async::async_worker Class Reference

Standalone async worker with jthread compatibility. More...

#include <async_worker.h>


Public Member Functions

 async_worker (std::size_t queue_size=8192)
 Constructor with configurable queue size.
 
 ~async_worker ()
 Destructor - ensures graceful shutdown.
 
 async_worker (const async_worker &)=delete
 
async_worker & operator= (const async_worker &)=delete
 
 async_worker (async_worker &&)=delete
 
async_worker & operator= (async_worker &&)=delete
 
void start ()
 Start the worker thread.
 
void stop ()
 Stop the worker thread gracefully.
 
bool enqueue (task_type task)
 Enqueue a task for async processing.
 
void flush ()
 Flush all pending tasks.
 
bool is_running () const noexcept
 Check if the worker is currently running.
 
std::size_t pending_count () const
 Get the current queue size.
 
std::size_t capacity () const noexcept
 Get the queue capacity.
 
std::uint64_t dropped_count () const noexcept
 Get the number of dropped tasks due to queue overflow.
 

Private Member Functions

void worker_loop (simple_stop_source &stop)
 Worker thread main loop (fallback version)
 
void drain_queue ()
 Process all remaining tasks in the queue.
 

Private Attributes

const std::size_t queue_size_
 Maximum queue capacity.
 
std::queue< task_type > queue_
 Task queue.
 
std::mutex queue_mutex_
 Protects queue_ access.
 
std::condition_variable queue_cv_
 Signals new tasks or stop (fallback)
 
compat_jthread worker_thread_
 Background worker thread.
 
std::atomic< bool > running_ {false}
 Worker state flag.
 
std::atomic< std::uint64_t > dropped_count_ {0}
 Overflow counter.
 

Detailed Description

Standalone async worker with jthread compatibility.

This worker provides a background thread for processing logging tasks asynchronously. When std::jthread is available, it uses cooperative cancellation via std::stop_token for graceful shutdown; in environments without jthread support, it falls back to a manual stop mechanism.

Thread Safety:

  • enqueue() is thread-safe for multiple producers
  • The worker thread is the single consumer
  • flush() blocks until all pending tasks are processed

Performance Characteristics:

  • Short mutex-protected enqueue path; a full queue drops the task instead of blocking the caller
  • Batch processing reduces context switching overhead
  • Configurable queue size for different workloads

Usage:

async_worker worker(8192);
worker.start();
worker.enqueue([&logger, message]() {
    logger.write(message);
});
worker.flush(); // Wait for all pending tasks
worker.stop(); // Graceful shutdown
Since
1.3.0

Definition at line 82 of file async_worker.h.

Constructor & Destructor Documentation

◆ async_worker() [1/3]

kcenon::logger::async::async_worker::async_worker ( std::size_t queue_size = 8192)
explicit

Constructor with configurable queue size.

Parameters
queue_size    Maximum number of pending tasks (default: 8192)

The queue size should be chosen based on expected logging throughput:

  • Higher values reduce the chance of dropped messages under burst load
  • Lower values reduce memory footprint
Note
Memory usage is approximately queue_size * sizeof(task_type)
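
The estimate in the note can be made concrete with a small sketch. It assumes task_type is std::function<void()>, as the task_type entry on this page indicates; the helper name approx_queue_bytes is hypothetical and not part of the library. The figure is a lower bound only: it ignores container overhead and any heap allocation that std::function performs for large captures.

```cpp
#include <cstddef>
#include <functional>

// Assumption: task_type is std::function<void()>.
using task_type = std::function<void()>;

// Hypothetical helper: approximate bytes occupied by the task objects
// of a completely full queue (excludes container and capture overhead).
constexpr std::size_t approx_queue_bytes(std::size_t queue_size) {
    return queue_size * sizeof(task_type);
}
```

On a typical 64-bit standard library, sizeof(std::function<void()>) is a few dozen bytes, so the default of 8192 entries stays in the range of a few hundred kilobytes; the exact size is implementation-defined.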

Definition at line 12 of file async_worker.cpp.

13 : queue_size_(queue_size)
14{
15}

◆ ~async_worker()

kcenon::logger::async::async_worker::~async_worker ( )

Destructor - ensures graceful shutdown.

Automatically calls stop() if the worker is still running. The destructor will block until all pending tasks are processed and the worker thread has terminated.

Definition at line 17 of file async_worker.cpp.

17 {
18 stop();
19}

References stop().


◆ async_worker() [2/3]

kcenon::logger::async::async_worker::async_worker ( const async_worker & )
delete

◆ async_worker() [3/3]

kcenon::logger::async::async_worker::async_worker ( async_worker && )
delete

Member Function Documentation

◆ capacity()

std::size_t kcenon::logger::async::async_worker::capacity ( ) const
nodiscard noexcept

Get the queue capacity.

Returns
Maximum number of tasks the queue can hold

Definition at line 122 of file async_worker.cpp.

122 {
123 return queue_size_;
124}

References queue_size_.

◆ drain_queue()

void kcenon::logger::async::async_worker::drain_queue ( )
private

Process all remaining tasks in the queue.

Called during shutdown to ensure no tasks are lost.

Definition at line 209 of file async_worker.cpp.

209 {
210 std::queue<task_type> remaining;
211
212 {
213 std::lock_guard<std::mutex> lock(queue_mutex_);
214 std::swap(remaining, queue_);
215 }
216
217 // Process remaining tasks
218 while (!remaining.empty()) {
219 auto task = std::move(remaining.front());
220 remaining.pop();
221
222 if (task) {
223 try {
224 task();
225 } catch (...) {
226 // Swallow exceptions during drain
227 }
228 }
229 }
230}

References queue_, and queue_mutex_.

Referenced by flush(), and stop().
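
The swap-then-drain pattern in the listing above can be sketched as a free function. This is an illustration under assumptions, not the library's code: the name drain and the returned completion count are hypothetical. The point it shows is that the mutex is held only for the swap, so a task that runs during the drain may itself lock the mutex (for example, to push a follow-up task) without deadlocking.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

using task_type = std::function<void()>;

// Hypothetical free-function version of swap-then-drain.
// Returns the number of tasks that ran to completion.
inline std::size_t drain(std::queue<task_type>& shared, std::mutex& m) {
    std::queue<task_type> remaining;
    {
        // Hold the lock only long enough to take ownership of pending tasks.
        std::lock_guard<std::mutex> lock(m);
        std::swap(remaining, shared);
    }

    std::size_t completed = 0;
    while (!remaining.empty()) {
        task_type task = std::move(remaining.front());
        remaining.pop();
        if (!task) {
            continue;  // skip empty std::function objects
        }
        try {
            task();    // runs without the mutex held
            ++completed;
        } catch (...) {
            // Swallowed, mirroring the listing above.
        }
    }
    return completed;
}
```

Tasks pushed onto the shared queue while the drain is in progress are not picked up by this call; they stay in the shared queue.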


◆ dropped_count()

std::uint64_t kcenon::logger::async::async_worker::dropped_count ( ) const
nodiscard noexcept

Get the number of dropped tasks due to queue overflow.

Returns
Total number of tasks rejected because the queue was full

Definition at line 126 of file async_worker.cpp.

126 {
127 return dropped_count_.load(std::memory_order_relaxed);
128}

References dropped_count_.

◆ enqueue()

bool kcenon::logger::async::async_worker::enqueue ( task_type task)

Enqueue a task for async processing.

Parameters
task    The task to execute (callable with no arguments)
Returns
true if the task was enqueued; false if the queue is full, the task is empty, or the worker is not running

If the queue is full, the task is dropped and false is returned. The caller should handle this case (e.g., log synchronously or retry).

Thread-safe: Multiple threads can call enqueue() concurrently.

if (!worker.enqueue(task)) {
    // Queue full - handle overflow
    task(); // Execute synchronously as fallback
}
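
The drop-on-overflow behaviour and the synchronous fallback can be tried in isolation with the following sketch. It is a simplified stand-in, not the library's implementation: bounded_task_queue, try_push, dropped, and submit_or_run are hypothetical names, and the worker thread and the periodic stderr warning are omitted.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

using task_type = std::function<void()>;

// Hypothetical stand-in for the bounded part of async_worker.
class bounded_task_queue {
public:
    explicit bounded_task_queue(std::size_t capacity) : capacity_(capacity) {}

    // Returns false (and counts a drop) when the queue is already full.
    bool try_push(task_type task) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.size() >= capacity_) {
            dropped_.fetch_add(1, std::memory_order_relaxed);
            return false;
        }
        queue_.push(std::move(task));
        return true;
    }

    std::uint64_t dropped() const noexcept {
        return dropped_.load(std::memory_order_relaxed);
    }

private:
    const std::size_t capacity_;
    std::queue<task_type> queue_;
    std::mutex mutex_;
    std::atomic<std::uint64_t> dropped_{0};
};

// Caller-side policy: run the task inline if it could not be queued.
// Returns true when the task was queued, false when it ran synchronously.
inline bool submit_or_run(bounded_task_queue& q, const task_type& task) {
    if (q.try_push(task)) {   // try_push receives a copy, so task stays valid
        return true;
    }
    task();
    return false;
}
```

Running the task inline preserves the message at the cost of blocking the producer; callers that cannot tolerate that latency may prefer to drop or retry instead.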

Definition at line 62 of file async_worker.cpp.

62 {
63 if (!task || !running_.load(std::memory_order_acquire)) {
64 return false;
65 }
66
67 {
68 std::lock_guard<std::mutex> lock(queue_mutex_);
69
70 // Check queue capacity
71 if (queue_.size() >= queue_size_) {
72 // Track dropped task
73 std::uint64_t dropped = dropped_count_.fetch_add(1, std::memory_order_relaxed) + 1;
74
75 // Log warning periodically (every 100 drops) to avoid log spam
76 if (dropped % 100 == 1) {
77 std::fprintf(stderr,
78 "[WARNING] Async worker queue full: %llu tasks dropped total\n",
79 static_cast<unsigned long long>(dropped));
80 }
81 return false;
82 }
83
84 queue_.push(std::move(task));
85 }
86
87 // Notify worker thread
88 queue_cv_.notify_one();
89 return true;
90}

References dropped_count_, queue_, queue_cv_, queue_mutex_, queue_size_, and running_.

◆ flush()

void kcenon::logger::async::async_worker::flush ( )

Flush all pending tasks.

Blocks until all tasks currently in the queue have been processed. New tasks enqueued during flush() may or may not be included.

Thread-safe: Can be called from any thread.

Note
This method may block for an extended period if the queue contains many tasks or tasks are slow to execute.
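
A polling wait of the kind described here can be sketched as follows. The function name wait_until_empty and its poll parameter are hypothetical; the 100 microsecond default is taken from the flush() listing on this page. Note that, in this sketch, an empty queue only shows that every task has been dequeued; a task removed just before the check may still be executing.

```cpp
#include <chrono>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical helper: block until the queue guarded by m is observed empty.
template <typename T>
void wait_until_empty(const std::queue<T>& q, std::mutex& m,
                      std::chrono::microseconds poll = std::chrono::microseconds(100)) {
    for (;;) {
        {
            // Hold the lock only for the emptiness check.
            std::lock_guard<std::mutex> lock(m);
            if (q.empty()) {
                return;
            }
        }
        // Sleep without the lock so the consumer can make progress.
        std::this_thread::sleep_for(poll);
    }
}
```

Polling keeps the waiter independent of the condition variable used by the consumer, at the cost of up to one poll interval of extra latency per check.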

Definition at line 92 of file async_worker.cpp.

92 {
93 if (!running_.load(std::memory_order_acquire)) {
94 // Not running - just drain the queue directly
95 drain_queue();
96 return;
97 }
98
99 // Wait until queue is empty
100 // Use a polling approach with short waits to avoid deadlock
101 while (true) {
102 {
103 std::lock_guard<std::mutex> lock(queue_mutex_);
104 if (queue_.empty()) {
105 break;
106 }
107 }
108 // Brief yield to allow worker to process
109 std::this_thread::sleep_for(std::chrono::microseconds(100));
110 }
111}

References drain_queue(), queue_, queue_mutex_, and running_.


◆ is_running()

bool kcenon::logger::async::async_worker::is_running ( ) const
nodiscard noexcept

Check if the worker is currently running.

Returns
true if the worker thread is active

Definition at line 113 of file async_worker.cpp.

113 {
114 return running_.load(std::memory_order_acquire);
115}

References running_.

◆ operator=() [1/2]

async_worker & kcenon::logger::async::async_worker::operator= ( async_worker && )
delete

◆ operator=() [2/2]

async_worker & kcenon::logger::async::async_worker::operator= ( const async_worker & )
delete

◆ pending_count()

std::size_t kcenon::logger::async::async_worker::pending_count ( ) const
nodiscard

Get the current queue size.

Returns
Number of pending tasks in the queue
Note
This is an approximate value due to concurrent access.

Definition at line 117 of file async_worker.cpp.

117 {
118 std::lock_guard<std::mutex> lock(queue_mutex_);
119 return queue_.size();
120}

References queue_, and queue_mutex_.

◆ start()

void kcenon::logger::async::async_worker::start ( )

Start the worker thread.

Creates a background thread that processes tasks from the queue. The thread will run until stop() is called or the worker is destroyed.

Thread-safe: Can be called from any thread. Idempotent: Multiple calls are safe (subsequent calls are no-ops).
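
The idempotency guarantee rests on an atomic exchange: only the caller that actually flips the flag proceeds. A minimal sketch of that guard, with the thread creation left out, might look like this. The class run_flag and its member names are hypothetical.

```cpp
#include <atomic>

// Hypothetical state holder showing only the exchange-based guard.
class run_flag {
public:
    // True only for the caller that flips the flag from false to true.
    bool try_start() noexcept {
        return !running_.exchange(true, std::memory_order_acq_rel);
    }

    // True only for the caller that flips the flag from true to false.
    bool try_stop() noexcept {
        return running_.exchange(false, std::memory_order_acq_rel);
    }

    bool is_running() const noexcept {
        return running_.load(std::memory_order_acquire);
    }

private:
    std::atomic<bool> running_{false};
};
```

Because exchange() is a single atomic read-modify-write, two threads racing on try_start() cannot both observe false, so at most one of them would go on to create the worker thread.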

Definition at line 21 of file async_worker.cpp.

21 {
22 // Check if already running (idempotent)
23 if (running_.exchange(true, std::memory_order_acq_rel)) {
24 return;
25 }
26
27#if LOGGER_HAS_JTHREAD
28 // Create worker thread with std::jthread
29 // The stop_token is automatically provided by jthread
30 worker_thread_ = compat_jthread([this](std::stop_token stop_token) {
31 worker_loop(stop_token);
32 });
33#else
34 // Create worker thread with manual stop source
35 // Note: We use the stop_source created by compat_jthread to ensure
36 // request_stop() correctly signals the worker loop
37 worker_thread_ = compat_jthread([this](simple_stop_source& stop) {
38 worker_loop(stop);
39 });
40#endif
41}

References running_, stop(), worker_loop(), and worker_thread_.


◆ stop()

void kcenon::logger::async::async_worker::stop ( )

Stop the worker thread gracefully.

Signals the worker thread to stop and waits for it to complete processing remaining tasks. This method:

  1. Requests stop via the worker thread's stop mechanism (request_stop())
  2. Wakes up the worker thread
  3. Waits for the thread to join
  4. Processes any remaining tasks in the queue

Thread-safe: Can be called from any thread. Idempotent: Multiple calls are safe (subsequent calls are no-ops).

Note
This method blocks until the worker thread terminates.
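
The four-step shutdown sequence can be sketched with a plain std::thread and a mutex-guarded stop flag. This is a simplified illustration under assumptions, not the library's implementation: mini_worker and its members are hypothetical, there is no capacity limit, and tasks posted after stop() are never executed.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

using task_type = std::function<void()>;

// Hypothetical, simplified worker showing the shutdown order only.
class mini_worker {
public:
    mini_worker() : thread_([this] { loop(); }) {}
    ~mini_worker() { stop(); }

    void post(task_type task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(task));
        }
        cv_.notify_one();
    }

    void stop() {
        if (stopped_.exchange(true)) return;   // idempotent guard
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_requested_ = true;            // 1. request stop
        }
        cv_.notify_all();                      // 2. wake the worker
        thread_.join();                        // 3. wait for it to exit
        drain();                               // 4. run leftover tasks here
    }

private:
    void loop() {
        for (;;) {
            task_type task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stop_requested_ || !queue_.empty(); });
                if (stop_requested_) return;
                task = std::move(queue_.front());
                queue_.pop();
            }
            try { task(); } catch (...) {}     // keep the thread alive
        }
    }

    void drain() {
        std::queue<task_type> rest;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            std::swap(rest, queue_);
        }
        while (!rest.empty()) {
            try { rest.front()(); } catch (...) {}
            rest.pop();
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<task_type> queue_;
    bool stop_requested_ = false;              // guarded by mutex_
    std::atomic<bool> stopped_{false};
    std::thread thread_;                       // last member: starts after the rest exist
};
```

Setting the stop flag while holding the mutex ensures the worker cannot check the predicate, miss the flag, and then sleep through the notification. The final drain runs on the thread that called stop(), which is why that call can block for as long as the leftover tasks take.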

Definition at line 43 of file async_worker.cpp.

43 {
44 // Check if not running (idempotent)
45 if (!running_.exchange(false, std::memory_order_acq_rel)) {
46 return;
47 }
48
49 // Request stop
50 worker_thread_.request_stop();
51
52 // Wake up the worker thread
53 queue_cv_.notify_all();
54
55 // Wait for thread to finish
56 worker_thread_.join();
57
58 // Process any remaining tasks
59 drain_queue();
60}

References drain_queue(), kcenon::logger::async::compat_jthread::join(), queue_cv_, kcenon::logger::async::compat_jthread::request_stop(), running_, and worker_thread_.

Referenced by start(), worker_loop(), and ~async_worker().


◆ worker_loop()

void kcenon::logger::async::async_worker::worker_loop ( simple_stop_source & stop)
private

Worker thread main loop (fallback version)

Parameters
stop    Stop source for manual cancellation

Processes tasks from the queue until stop is requested. Uses condition variable for efficient waiting.

Definition at line 170 of file async_worker.cpp.

170 {
171 while (!stop.stop_requested()) {
172 task_type task;
173
174 {
175 std::unique_lock<std::mutex> lock(queue_mutex_);
176
177 // Wait for task or stop signal
178 queue_cv_.wait(lock, [this, &stop]() {
179 return stop.stop_requested() || !queue_.empty();
180 });
181
182 // Check if we should stop
183 if (stop.stop_requested()) {
184 break;
185 }
186
187 // Check if we have work
188 if (queue_.empty()) {
189 continue;
190 }
191
192 // Get task from queue
193 task = std::move(queue_.front());
194 queue_.pop();
195 }
196
197 // Execute task outside the lock
198 if (task) {
199 try {
200 task();
201 } catch (...) {
202 // Swallow exceptions to prevent thread termination
203 }
204 }
205 }
206}
std::function< void()> task_type
Task type for async worker.

References queue_, queue_cv_, queue_mutex_, and stop().

Referenced by start().


Member Data Documentation

◆ dropped_count_

std::atomic<std::uint64_t> kcenon::logger::async::async_worker::dropped_count_ {0}
private

Overflow counter.

Definition at line 238 of file async_worker.h.

238{0};

Referenced by dropped_count(), and enqueue().

◆ queue_

std::queue<task_type> kcenon::logger::async::async_worker::queue_
private

Task queue.

Definition at line 227 of file async_worker.h.

Referenced by drain_queue(), enqueue(), flush(), pending_count(), and worker_loop().

◆ queue_cv_

std::condition_variable kcenon::logger::async::async_worker::queue_cv_
private

Signals new tasks or stop (fallback)

Definition at line 233 of file async_worker.h.

Referenced by enqueue(), stop(), and worker_loop().

◆ queue_mutex_

std::mutex kcenon::logger::async::async_worker::queue_mutex_
mutable private

Protects queue_ access.

Definition at line 228 of file async_worker.h.

Referenced by drain_queue(), enqueue(), flush(), pending_count(), and worker_loop().

◆ queue_size_

const std::size_t kcenon::logger::async::async_worker::queue_size_
private

Maximum queue capacity.

Definition at line 226 of file async_worker.h.

Referenced by capacity(), and enqueue().

◆ running_

std::atomic<bool> kcenon::logger::async::async_worker::running_ {false}
private

Worker state flag.

Definition at line 237 of file async_worker.h.

237{false};

Referenced by enqueue(), flush(), is_running(), start(), and stop().

◆ worker_thread_

compat_jthread kcenon::logger::async::async_worker::worker_thread_
private

Background worker thread.

Definition at line 236 of file async_worker.h.

Referenced by start(), and stop().


The documentation for this class was generated from the following files: