43#include <shared_mutex>
50#include <sys/eventfd.h>
51#include <sys/inotify.h>
54#elif defined(__APPLE__) || defined(__FreeBSD__)
60#ifndef WIN32_LEAN_AND_MEAN
61#define WIN32_LEAN_AND_MEAN
71namespace watcher_error_codes {
162#if defined(__linux__)
166#elif defined(__APPLE__) || defined(__FreeBSD__)
170 , dir_handle_(INVALID_HANDLE_VALUE)
175 if (result.is_ok()) {
206 "Config watcher is already running",
212 if (init_result.is_err()) {
307 std::vector<config_snapshot>
history(
size_t count = 0)
const {
310 if (count == 0 || count >
history_.size()) {
314 std::vector<config_snapshot> result;
315 result.reserve(count);
317 for (
size_t i = 0; i < count && it !=
history_.rend(); ++i, ++it) {
318 result.push_back(*it);
331 for (
const auto& snapshot :
history_) {
332 if (snapshot.version == target_version) {
333 std::unique_lock<std::shared_mutex> config_lock(
config_mutex_);
339 config_lock.unlock();
348 "Target version not found in history: " + std::to_string(target_version),
369 if (count == 0 || count >
events_.size()) {
373 std::vector<config_change_event> result;
374 result.reserve(count);
376 for (
size_t i = 0; i < count && it !=
events_.rend(); ++i, ++it) {
377 result.push_back(*it);
387#if defined(__linux__)
388 return init_inotify();
389#elif defined(__APPLE__) || defined(__FreeBSD__)
390 return init_kqueue();
392 return init_win32_watcher();
396 "File watching not supported on this platform",
406#if defined(__linux__)
408#elif defined(__APPLE__) || defined(__FreeBSD__)
410 int kq = kqueue_fd_.exchange(-1);
415 if (dir_handle_ != INVALID_HANDLE_VALUE) {
416 CancelIo(dir_handle_);
425#if defined(__linux__)
427#elif defined(__APPLE__) || defined(__FreeBSD__)
430 cleanup_win32_watcher();
434#if defined(__linux__)
439 shutdown_fd_ = eventfd(0, EFD_NONBLOCK);
440 if (shutdown_fd_ < 0) {
443 "Failed to create eventfd: " + std::string(strerror(errno)),
448 int ifd = inotify_init1(IN_NONBLOCK);
454 "Failed to initialize inotify: " + std::string(strerror(errno)),
461 std::string dir_path = path.parent_path().string();
462 if (dir_path.empty()) {
466 int wfd = inotify_add_watch(ifd, dir_path.c_str(),
467 IN_MODIFY | IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE);
474 "Failed to add inotify watch: " + std::string(strerror(errno)),
479 inotify_fd_.store(ifd);
480 watch_fd_.store(wfd);
484 void signal_eventfd() {
485 if (shutdown_fd_ >= 0) {
487 [[maybe_unused]]
auto unused = write(shutdown_fd_, &val,
sizeof(val));
491 void cleanup_inotify() {
492 int wfd = watch_fd_.exchange(-1);
493 int ifd = inotify_fd_.exchange(-1);
494 if (wfd >= 0 && ifd >= 0) {
495 inotify_rm_watch(ifd, wfd);
500 if (shutdown_fd_ >= 0) {
506 void watch_loop_linux() {
508 std::string filename = path.filename().string();
510 constexpr size_t EVENT_BUF_LEN = 4096;
511 alignas(
struct inotify_event) char buffer[EVENT_BUF_LEN];
514 int ifd = inotify_fd_.load();
517 struct pollfd pfds[2] = {
519 {shutdown_fd_, POLLIN, 0}
521 int ret = poll(pfds, 2, 500);
524 if (errno == EINTR)
continue;
528 if (ret == 0)
continue;
531 if (pfds[1].revents & POLLIN)
break;
533 if (!(pfds[0].revents & POLLIN))
continue;
535 ssize_t len = read(ifd, buffer, EVENT_BUF_LEN);
537 if (errno == EAGAIN || errno == EINTR)
continue;
541 bool should_reload =
false;
542 for (
char* ptr = buffer; ptr < buffer + len; ) {
543 auto*
event =
reinterpret_cast<struct inotify_event*
>(ptr);
545 if (event->len > 0) {
546 std::string event_name(event->name);
547 if (event_name == filename) {
548 if (event->mask & (IN_MODIFY | IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE)) {
549 should_reload =
true;
554 ptr +=
sizeof(
struct inotify_event) + event->len;
559 std::this_thread::sleep_for(std::chrono::milliseconds(100));
566#if defined(__APPLE__) || defined(__FreeBSD__)
571 kqueue_fd_ = kqueue();
572 if (kqueue_fd_ < 0) {
575 "Failed to create kqueue: " + std::string(strerror(errno)),
580 file_fd_ = open(config_path_.c_str(), O_RDONLY);
583 std::filesystem::path path(config_path_);
584 std::string dir_path = path.parent_path().string();
585 if (dir_path.empty()) {
589 file_fd_ = open(dir_path.c_str(), O_RDONLY);
595 "Failed to open file/directory for watching: " + std::string(strerror(errno)),
599 watching_directory_ =
true;
602 struct kevent change;
603 EV_SET(&change, file_fd_, EVFILT_VNODE,
604 EV_ADD | EV_ENABLE | EV_CLEAR,
605 NOTE_WRITE | NOTE_EXTEND | NOTE_RENAME | NOTE_DELETE | NOTE_ATTRIB,
608 if (kevent(kqueue_fd_, &change, 1,
nullptr, 0,
nullptr) < 0) {
615 "Failed to register kevent: " + std::string(strerror(errno)),
623 void cleanup_kqueue() {
625 int kq = kqueue_fd_.exchange(-1);
632 void watch_loop_kqueue() {
634 struct timespec
timeout = {0, 500000000};
636 while (running_.load()) {
637 int kq = kqueue_fd_.load();
640 int n = kevent(kq,
nullptr, 0, &event, 1, &timeout);
643 if (errno == EINTR)
continue;
647 if (n == 0)
continue;
649 if (event.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_RENAME | NOTE_ATTRIB)) {
651 std::this_thread::sleep_for(std::chrono::milliseconds(100));
654 if (event.fflags & (NOTE_DELETE | NOTE_RENAME)) {
656 std::lock_guard<std::mutex> lock(file_fd_mutex_);
660 file_fd_ = open(config_path_.c_str(), O_RDONLY);
662 int kq_local = kqueue_fd_.load();
664 struct kevent change;
665 EV_SET(&change, file_fd_, EVFILT_VNODE,
666 EV_ADD | EV_ENABLE | EV_CLEAR,
667 NOTE_WRITE | NOTE_EXTEND | NOTE_RENAME | NOTE_DELETE | NOTE_ATTRIB,
669 kevent(kq_local, &change, 1,
nullptr, 0,
nullptr);
679 std::lock_guard<std::mutex> lock(file_fd_mutex_);
686 bool watching_directory_ =
false;
694 std::filesystem::path path(config_path_);
695 std::wstring dir_path = path.parent_path().wstring();
696 if (dir_path.empty()) {
700 dir_handle_ = CreateFileW(
703 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
706 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
710 if (dir_handle_ == INVALID_HANDLE_VALUE) {
713 "Failed to open directory for watching",
721 void cleanup_win32_watcher() {
722 if (dir_handle_ != INVALID_HANDLE_VALUE) {
723 CancelIo(dir_handle_);
724 CloseHandle(dir_handle_);
725 dir_handle_ = INVALID_HANDLE_VALUE;
729 void watch_loop_win32() {
730 std::filesystem::path path(config_path_);
731 std::wstring filename = path.filename().wstring();
733 constexpr DWORD BUFFER_SIZE = 4096;
734 alignas(DWORD)
char buffer[BUFFER_SIZE];
736 OVERLAPPED overlapped = {};
737 overlapped.hEvent = CreateEvent(
nullptr, TRUE, FALSE,
nullptr);
739 while (running_.load()) {
740 DWORD bytes_returned = 0;
741 BOOL
success = ReadDirectoryChangesW(
746 FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_SIZE,
756 DWORD wait_result = WaitForSingleObject(overlapped.hEvent, 500);
758 if (wait_result == WAIT_TIMEOUT) {
759 CancelIo(dir_handle_);
763 if (wait_result != WAIT_OBJECT_0) {
767 if (!GetOverlappedResult(dir_handle_, &overlapped, &bytes_returned, FALSE)) {
771 ResetEvent(overlapped.hEvent);
773 bool should_reload =
false;
774 auto* notification =
reinterpret_cast<FILE_NOTIFY_INFORMATION*
>(buffer);
777 std::wstring changed_name(notification->FileName,
778 notification->FileNameLength /
sizeof(WCHAR));
780 if (changed_name == filename) {
781 should_reload =
true;
784 if (notification->NextEntryOffset == 0)
break;
785 notification =
reinterpret_cast<FILE_NOTIFY_INFORMATION*
>(
786 reinterpret_cast<char*
>(notification) + notification->NextEntryOffset);
790 std::this_thread::sleep_for(std::chrono::milliseconds(100));
795 CloseHandle(overlapped.hEvent);
803#if defined(__linux__)
805#elif defined(__APPLE__) || defined(__FreeBSD__)
819 event.timestamp = std::chrono::system_clock::now();
820 event.version = version_.load() + 1;
822 if (result.is_err()) {
823 event.success =
false;
824 event.error_message = result.error().message;
826 notify_error(result.error().message);
834 if (validation_result.is_err()) {
835 event.success =
false;
836 event.error_message = validation_result.error().message;
838 notify_error(
"Validation failed: " + validation_result.error().message);
841 "Configuration validation failed: " + validation_result.error().message,
847 std::unique_lock<std::shared_mutex> lock(config_mutex_);
848 event.changed_fields = get_changed_fields(current_config_, new_config);
851 std::vector<std::string> non_reloadable_changes;
852 for (
const auto& field :
event.changed_fields) {
854 non_reloadable_changes.push_back(field);
860 current_config_ = new_config;
861 version_.fetch_add(1);
862 event.version = version_.load();
863 event.success =
true;
866 add_to_history(new_config);
874 notify_change(old_config, new_config);
886 std::vector<std::string> changes;
890 changes.push_back(
"thread.pool_size");
893 changes.push_back(
"thread.queue_type");
896 changes.push_back(
"thread.max_queue_size");
901 changes.push_back(
"logger.level");
904 changes.push_back(
"logger.async");
907 changes.push_back(
"logger.buffer_size");
910 changes.push_back(
"logger.file_path");
913 changes.push_back(
"logger.writers");
918 changes.push_back(
"monitoring.enabled");
921 changes.push_back(
"monitoring.metrics_interval");
924 changes.push_back(
"monitoring.tracing.enabled");
927 changes.push_back(
"monitoring.tracing.sampling_rate");
932 changes.push_back(
"database.backend");
935 changes.push_back(
"database.connection_string");
940 changes.push_back(
"network.tls.enabled");
943 changes.push_back(
"network.compression");
946 changes.push_back(
"network.buffer_size");
956 std::lock_guard<std::mutex> lock(history_mutex_);
959 snapshot.
version = version_.load();
960 snapshot.
timestamp = std::chrono::system_clock::now();
963 history_.push_back(snapshot);
966 while (history_.size() > max_history_) {
967 history_.pop_front();
975 std::lock_guard<std::mutex> lock(events_mutex_);
976 events_.push_back(
event);
979 while (events_.size() > 100) {
988 std::lock_guard<std::mutex> lock(callbacks_mutex_);
989 for (
const auto& callback : change_callbacks_) {
991 callback(old_cfg, new_cfg);
1002 std::lock_guard<std::mutex> lock(callbacks_mutex_);
1003 for (
const auto& callback : error_callbacks_) {
1039#if defined(__linux__)
1040 std::atomic<int> inotify_fd_;
1041 std::atomic<int> watch_fd_;
1043#elif defined(__APPLE__) || defined(__FreeBSD__)
1044 std::atomic<int> kqueue_fd_;
1046 std::mutex file_fd_mutex_;
1047#elif defined(_WIN32)
Result type for error handling with member function support.
static Result< T > ok(U &&value)
Create a successful result with value (static factory)
static unified_config defaults()
Get default configuration.
static VoidResult validate(const unified_config &config)
Validate a configuration.
static Result< unified_config > load(const std::string &path)
Load configuration from a YAML file.
Monitors configuration files for changes and supports hot-reload.
std::vector< config_snapshot > history(size_t count=0) const
Get configuration history snapshots.
std::shared_mutex config_mutex_
VoidResult rollback(uint64_t target_version)
Rollback to a previous configuration version.
std::atomic< bool > running_
VoidResult init_platform_watcher()
Initialize platform-specific file watching.
std::thread watch_thread_
unified_config current_config_
std::vector< config_change_event > recent_events(size_t count=10) const
Get recent change events.
std::deque< config_change_event > events_
config_watcher & operator=(config_watcher &&)=delete
std::atomic< uint64_t > version_
bool is_running() const
Check if the watcher is currently running.
VoidResult reload()
Manually trigger a configuration reload.
static std::vector< std::string > get_changed_fields(const unified_config &old_cfg, const unified_config &new_cfg)
Compare two configurations and return changed field paths.
void signal_watcher_shutdown()
Signal the watch thread to wake up and exit.
VoidResult start()
Start watching the configuration file for changes.
config_watcher(config_watcher &&)=delete
void on_change(change_callback callback)
Register a callback for configuration changes.
config_watcher(const std::string &config_path, size_t max_history=10)
Construct a config_watcher for the specified file.
void stop()
Stop watching the configuration file.
std::vector< error_callback > error_callbacks_
void notify_change(const unified_config &old_cfg, const unified_config &new_cfg)
Notify all registered change callbacks.
const unified_config & current() const
Get the current configuration.
void on_error(error_callback callback)
Register a callback for reload errors.
std::vector< change_callback > change_callbacks_
uint64_t version() const
Get the current configuration version.
const std::string & config_path() const
Get the path to the configuration file being watched.
void add_event(const config_change_event &event)
Add a change event to the event log.
void watch_loop()
Main watch loop - dispatches to platform-specific implementation.
void add_to_history(const unified_config &config)
Add a configuration to history.
std::function< void(const std::string &error_message)> error_callback
Callback type for reload errors.
std::mutex callbacks_mutex_
config_watcher(const config_watcher &)=delete
void notify_error(const std::string &message)
Notify all registered error callbacks.
VoidResult do_reload()
Perform the actual configuration reload.
std::deque< config_snapshot > history_
config_watcher & operator=(const config_watcher &)=delete
void cleanup_platform_watcher()
Cleanup platform-specific resources (call after thread join).
~config_watcher()
Destructor. Automatically stops watching if running.
std::mutex history_mutex_
std::function< void(const unified_config &old_config, const unified_config &new_config)> change_callback
Callback type for configuration changes.
YAML-based configuration loader for the unified system.
constexpr int platform_not_supported
constexpr int validation_failed
constexpr int rollback_failed
constexpr int watch_failed
constexpr int reload_failed
constexpr int already_running
constexpr int not_started
bool is_hot_reloadable(const std::string &field_path)
Check if a configuration field supports hot-reload.
Result< T > make_error(int code, const std::string &message, const std::string &module="")
Create an error result with code and message.
Result< std::monostate > VoidResult
Specialized Result for void operations.
Umbrella header for Result<T> type and related utilities.
Information about a configuration change event.
std::chrono::system_clock::time_point timestamp
Timestamp of the change.
std::string error_message
Error message if change failed.
bool success
Whether the change was successful.
std::vector< std::string > changed_fields
List of changed field paths.
uint64_t version
Configuration version (incrementing counter)
Represents a configuration snapshot for version history.
std::chrono::system_clock::time_point timestamp
Timestamp when this configuration was active.
uint64_t version
Configuration version number.
unified_config config
The configuration data.
std::string connection_string
Connection string or URI.
std::string backend
Database backend: "postgresql", "mysql", "sqlite", "mongodb", "redis".
bool async
Enable async logging.
std::vector< std::string > writers
List of writers: "console", "file", "rotating_file", "network", "json".
std::string file_path
Log file path (for file writers)
size_t buffer_size
Async buffer size in bytes.
std::string level
Log level: "trace", "debug", "info", "warn", "error", "critical", "off".
std::chrono::milliseconds metrics_interval
Metrics collection interval.
bool enabled
Enable monitoring.
tracing_config tracing
Tracing configuration.
size_t buffer_size
Send/receive buffer size.
tls_config tls
TLS configuration.
std::string compression
Compression type: "none", "lz4", "gzip", "deflate", "zstd".
std::string queue_type
Queue type: "mutex", "lockfree", "bounded".
size_t pool_size
Number of worker threads (default: hardware concurrency)
size_t max_queue_size
Maximum queue size (for bounded queue)
bool enabled
Enable tracing.
double sampling_rate
Sampling rate (0.0 to 1.0)
Root configuration structure for the unified system.
network_config network
Network system configuration.
logger_config logger
Logger system configuration.
thread_config thread
Thread system configuration.
monitoring_config monitoring
Monitoring system configuration.
database_config database
Database system configuration.
Generic event structure for the event bus.
Unified configuration schema for the entire system.