#include <iostream>
#include <thread>
#include <chrono>
#include <random>
#include <atomic>
using namespace thread_module;
/**
 * @brief Simulates one logging client that emits 100 sample messages.
 *
 * Builds a logger with a console writer and a TCP network writer pointed at
 * the given server, then logs a rotating set of sample messages at randomly
 * drawn levels, sleeping 10 to 100 ms after each message.
 *
 * @param client_id   Number used to tag the console output and every message.
 * @param server_host Host handed to the network writer.
 * @param server_port Port handed to the network writer.
 */
void simulate_client(
    int client_id,
    const std::string& server_host, uint16_t server_port) {
    std::cout << "Client " << client_id << " starting..." << std::endl;

    // NOTE(review): (true, 1024) presumably selects async mode and a buffer
    // size -- confirm against the logger constructor.
    auto client_logger = std::make_unique<kcenon::logger::logger>(true, 1024);
    client_logger->add_writer("console", std::make_unique<console_writer>());
    client_logger->add_writer(
        "network",
        std::make_unique<network_writer>(
            server_host, server_port, network_writer::protocol_type::tcp));

    // Per-client generator: one distribution for the level, one for the pause.
    std::random_device entropy;
    std::mt19937 rng(entropy());
    std::uniform_int_distribution<> pick_level(0, 5);
    std::uniform_int_distribution<> pick_pause_ms(10, 100);

    const std::vector<std::string> samples = {
        "User login successful",
        "Database query executed",
        "API request processed",
        "Cache miss occurred",
        "Background job completed",
        "Error: Connection timeout",
        "Warning: High memory usage",
        "Critical: Disk space low"
    };

    // The prefix is identical for every message, so build it once.
    const std::string tag = "Client " + std::to_string(client_id) + ": ";

    for (int round = 0; round < 100; ++round) {
        // The level is drawn before the pause on every round.
        auto severity = static_cast<log_level>(pick_level(rng));
        const std::string& text = samples[round % samples.size()];
        client_logger->log(severity, tag + text);

        const std::chrono::milliseconds pause(pick_pause_ms(rng));
        std::this_thread::sleep_for(pause);
    }

    std::cout << "Client " << client_id << " finished" << std::endl;
}
std::cout << "=== Distributed Logging Demo ===" << std::endl;
const uint16_t server_port = 9999;
const std::string server_host = "localhost";
auto analyzer = std::make_unique<log_analyzer>(
std::chrono::seconds(10),
6
);
analyzer->add_pattern("errors", "error|fail|exception");
analyzer->add_pattern("warnings", "warning|warn");
analyzer->add_pattern("database", "database|query|sql");
analyzer->add_pattern("api", "api|request|endpoint");
analyzer->add_alert_rule({
"high_error_rate",
[](const log_analyzer::time_window_stats& stats) {
auto error_count = stats.level_counts.count(log_level::error) ?
stats.level_counts.at(log_level::error) : 0;
return error_count > 10;
},
[](const std::string& rule_name, const log_analyzer::time_window_stats& stats) {
std::cout << "\nšØ ALERT: " << rule_name
<< " - Error count: " << stats.level_counts.at(log_level::error)
<< " in current window" << std::endl;
}
});
auto aggregator = std::make_unique<log_aggregator>();
auto server = std::make_unique<log_server>(server_port, true);
server->add_handler([&analyzer, &aggregator](const log_server::network_log_entry& entry) {
if (entry.parsed_fields.count("level")) {
const auto& level_str = entry.parsed_fields.at("level");
if (level_str == "TRACE") level = log_level::trace;
else if (level_str == "DEBUG") level = log_level::debug;
else if (level_str == "INFO") level = log_level::info;
else if (level_str == "WARNING") level = log_level::warning;
else if (level_str == "ERROR") level = log_level::error;
else if (level_str == "CRITICAL") level = log_level::critical;
}
std::string message = entry.parsed_fields.count("message") ?
entry.parsed_fields.at("message") : entry.raw_data;
analyzer->analyze(level, message, "", 0, "", entry.received_time);
aggregator->add_log(entry.source_address, level, message, entry.raw_data.size());
});
if (!server->start()) {
std::cerr << "Failed to start log server" << std::endl;
return 1;
}
std::cout << "\n1. Testing Network Logging:" << std::endl;
std::cout << "Starting log server on port " << server_port << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::vector<std::thread> clients;
for (int i = 1; i <= 3; ++i) {
}
std::thread monitor_thread([&server, &analyzer]() {
for (int i = 0; i < 30; ++i) {
std::this_thread::sleep_for(std::chrono::seconds(2));
auto server_stats = server->get_stats();
std::cout << "\n--- Server Stats ---" << std::endl;
std::cout << "Total logs received: " << server_stats.total_logs_received << std::endl;
std::cout << "Active connections: " << server_stats.active_connections << std::endl;
auto current_stats = analyzer->get_current_stats();
std::cout << "Current window messages/sec: "
<< current_stats.messages_per_second << std::endl;
}
});
for (auto& client : clients) {
client.join();
}
std::cout << "\n2. Generating Analysis Report:" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::string report = analyzer->generate_report(std::chrono::seconds(60));
std::cout << "\n" << report << std::endl;
std::cout << "\n3. Source Statistics:" << std::endl;
auto all_sources = aggregator->get_all_stats();
for (const auto& [source_id, stats] : all_sources) {
std::cout << "\nSource: " << source_id << std::endl;
std::cout << " Total messages: " << stats.total_messages << std::endl;
std::cout << " Average rate: " << stats.average_message_rate << " msg/sec" << std::endl;
std::cout << " Level distribution:" << std::endl;
for (const auto& [level, count] : stats.level_counts) {
std::cout << " " << static_cast<int>(level) << ": " << count << std::endl;
}
}
monitor_thread.join();
server->stop();
std::cout << "\n=== Demo Complete ===" << std::endl;
return 0;
}
// Console writer for logging to stdout/stderr.
// void simulate_client(int client_id, const std::string &server_host, uint16_t server_port)
// Log analysis and metrics functionality.
// Log server for distributed logging.
// High-performance, thread-safe logging system with asynchronous capabilities.
// common::interfaces::log_level log_level
// Network writer for sending logs over TCP/UDP.