33void simulate_client(
int client_id,
const std::string& server_host, uint16_t server_port) {
34 std::cout <<
"Client " << client_id <<
" starting..." << std::endl;
37 auto logger = std::make_unique<kcenon::logger::logger>(
true, 1024);
38 logger->add_writer(
"console", std::make_unique<console_writer>());
39 logger->add_writer(
"network", std::make_unique<network_writer>(
40 server_host, server_port, network_writer::protocol_type::tcp
46 std::random_device rd;
47 std::mt19937 gen(rd());
48 std::uniform_int_distribution<> level_dist(0, 5);
49 std::uniform_int_distribution<> sleep_dist(10, 100);
51 std::vector<std::string> messages = {
52 "User login successful",
53 "Database query executed",
54 "API request processed",
55 "Cache miss occurred",
56 "Background job completed",
57 "Error: Connection timeout",
58 "Warning: High memory usage",
59 "Critical: Disk space low"
62 for (
int i = 0; i < 100; ++i) {
63 auto level =
static_cast<log_level
>(level_dist(gen));
64 auto& msg = messages[i % messages.size()];
66 logger->log(level,
"Client " + std::to_string(client_id) +
": " + msg);
68 std::this_thread::sleep_for(std::chrono::milliseconds(sleep_dist(gen)));
74 std::cout <<
"Client " << client_id <<
" finished" << std::endl;
78 std::cout <<
"=== Distributed Logging Demo ===" << std::endl;
80 const uint16_t server_port = 9999;
81 const std::string server_host =
"localhost";
84 auto analyzer = std::make_unique<log_analyzer>(
85 std::chrono::seconds(10),
90 analyzer->add_pattern(
"errors",
"error|fail|exception");
91 analyzer->add_pattern(
"warnings",
"warning|warn");
92 analyzer->add_pattern(
"database",
"database|query|sql");
93 analyzer->add_pattern(
"api",
"api|request|endpoint");
96 analyzer->add_alert_rule({
98 [](
const log_analyzer::time_window_stats& stats) {
99 auto error_count = stats.level_counts.count(log_level::error) ?
100 stats.level_counts.at(log_level::error) : 0;
101 return error_count > 10;
103 [](
const std::string& rule_name,
const log_analyzer::time_window_stats& stats) {
104 std::cout <<
"\nšØ ALERT: " << rule_name
105 <<
" - Error count: " << stats.level_counts.at(log_level::error)
106 <<
" in current window" << std::endl;
111 auto aggregator = std::make_unique<log_aggregator>();
114 auto server = std::make_unique<log_server>(server_port,
true);
117 server->add_handler([&analyzer, &aggregator](
const log_server::network_log_entry& entry) {
119 log_level level = log_level::info;
120 if (entry.parsed_fields.count(
"level")) {
121 const auto& level_str = entry.parsed_fields.at(
"level");
122 if (level_str ==
"TRACE") level = log_level::trace;
123 else if (level_str ==
"DEBUG") level = log_level::debug;
124 else if (level_str ==
"INFO") level = log_level::info;
125 else if (level_str ==
"WARNING") level = log_level::warning;
126 else if (level_str ==
"ERROR") level = log_level::error;
127 else if (level_str ==
"CRITICAL") level = log_level::critical;
131 std::string message = entry.parsed_fields.count(
"message") ?
132 entry.parsed_fields.at(
"message") : entry.raw_data;
135 analyzer->analyze(level, message,
"", 0,
"", entry.received_time);
138 aggregator->add_log(entry.source_address, level, message, entry.raw_data.size());
141 if (!server->start()) {
142 std::cerr <<
"Failed to start log server" << std::endl;
146 std::cout <<
"\n1. Testing Network Logging:" << std::endl;
147 std::cout <<
"Starting log server on port " << server_port << std::endl;
150 std::this_thread::sleep_for(std::chrono::seconds(1));
153 std::vector<std::thread> clients;
154 for (
int i = 1; i <= 3; ++i) {
159 std::thread monitor_thread([&server, &analyzer]() {
160 for (
int i = 0; i < 30; ++i) {
161 std::this_thread::sleep_for(std::chrono::seconds(2));
163 auto server_stats = server->get_stats();
164 std::cout <<
"\n--- Server Stats ---" << std::endl;
165 std::cout <<
"Total logs received: " << server_stats.total_logs_received << std::endl;
166 std::cout <<
"Active connections: " << server_stats.active_connections << std::endl;
169 auto current_stats = analyzer->get_current_stats();
170 std::cout <<
"Current window messages/sec: "
171 << current_stats.messages_per_second << std::endl;
176 for (
auto& client : clients) {
180 std::cout <<
"\n2. Generating Analysis Report:" << std::endl;
183 std::this_thread::sleep_for(std::chrono::seconds(2));
186 std::string report = analyzer->generate_report(std::chrono::seconds(60));
187 std::cout <<
"\n" << report << std::endl;
190 std::cout <<
"\n3. Source Statistics:" << std::endl;
191 auto all_sources = aggregator->get_all_stats();
192 for (
const auto& [source_id, stats] : all_sources) {
193 std::cout <<
"\nSource: " << source_id << std::endl;
194 std::cout <<
" Total messages: " << stats.total_messages << std::endl;
195 std::cout <<
" Average rate: " << stats.average_message_rate <<
" msg/sec" << std::endl;
196 std::cout <<
" Level distribution:" << std::endl;
197 for (
const auto& [level, count] : stats.level_counts) {
198 std::cout <<
" " <<
static_cast<int>(level) <<
": " << count << std::endl;
203 monitor_thread.join();
208 std::cout <<
"\n=== Demo Complete ===" << std::endl;