Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
test_lock_free_collector.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
10#include <gtest/gtest.h>
13#include <thread>
14#include <vector>
15
16using namespace kcenon::monitoring;
17
18class LockFreeCollectorTest : public ::testing::Test {
19protected:
20 void SetUp() override {
21 collector = std::make_shared<central_collector>();
22 }
23
24 std::shared_ptr<central_collector> collector;
25};
26
27TEST_F(LockFreeCollectorTest, BasicSampleRecording) {
28 thread_local_buffer buffer(256, collector);
29
30 // Record some samples
31 metric_sample sample1{"operation1", std::chrono::nanoseconds(1000), true};
32 metric_sample sample2{"operation1", std::chrono::nanoseconds(2000), true};
33 metric_sample sample3{"operation1", std::chrono::nanoseconds(1500), false};
34
35 EXPECT_TRUE(buffer.record(sample1));
36 EXPECT_TRUE(buffer.record(sample2));
37 EXPECT_TRUE(buffer.record(sample3));
38
39 EXPECT_EQ(buffer.size(), 3);
40
41 // Flush to collector
42 size_t flushed = buffer.flush();
43 EXPECT_EQ(flushed, 3);
44 EXPECT_EQ(buffer.size(), 0);
45
46 // Check collector stats
47 auto stats = collector->get_stats();
48 EXPECT_EQ(stats.total_samples, 3);
49 EXPECT_EQ(stats.batches_received, 1);
50 EXPECT_EQ(stats.operation_count, 1);
51
52 // Get profile
53 auto profile_result = collector->get_profile("operation1");
54 ASSERT_TRUE(profile_result.is_ok());
55
56 auto profile = profile_result.value();
57 EXPECT_EQ(profile.total_calls, 3);
58 EXPECT_EQ(profile.error_count, 1);
59 EXPECT_EQ(profile.min_duration_ns, 1000);
60 EXPECT_EQ(profile.max_duration_ns, 2000);
61}
62
63TEST_F(LockFreeCollectorTest, BufferAutoFlush) {
64 thread_local_buffer buffer(10, collector); // Small buffer
65
66 // Fill buffer
67 for (int i = 0; i < 10; ++i) {
68 metric_sample sample{"op", std::chrono::nanoseconds(100), true};
69 EXPECT_TRUE(buffer.record(sample));
70 }
71
72 EXPECT_TRUE(buffer.is_full());
73
74 // Next record should fail
75 metric_sample sample{"op", std::chrono::nanoseconds(100), true};
76 EXPECT_FALSE(buffer.record(sample));
77
78 // But auto-flush should work
79 EXPECT_TRUE(buffer.record_auto_flush(sample));
80 EXPECT_EQ(buffer.size(), 1); // Buffer now has 1 sample after flush+record
81
82 // Check collector received the batch
83 auto stats = collector->get_stats();
84 EXPECT_EQ(stats.total_samples, 10); // First 10 samples flushed
85}
86
87TEST_F(LockFreeCollectorTest, MultiThreadedCollection) {
88 constexpr int NUM_THREADS = 4;
89 constexpr int SAMPLES_PER_THREAD = 1000;
90
91 std::vector<std::thread> threads;
92
93 for (int t = 0; t < NUM_THREADS; ++t) {
94 threads.emplace_back([this, t]() {
95 thread_local_buffer buffer(256, collector);
96 std::string op_name = "thread_op_" + std::to_string(t);
97
98 for (int i = 0; i < SAMPLES_PER_THREAD; ++i) {
99 metric_sample sample{
100 op_name,
101 std::chrono::nanoseconds(100 + i),
102 (i % 10) != 0 // 10% error rate
103 };
104 buffer.record_auto_flush(sample);
105 }
106
107 // Final flush
108 buffer.flush();
109 });
110 }
111
112 // Wait for all threads
113 for (auto& t : threads) {
114 t.join();
115 }
116
117 // Verify all samples were collected
118 auto stats = collector->get_stats();
119 EXPECT_EQ(stats.total_samples, NUM_THREADS * SAMPLES_PER_THREAD);
120 EXPECT_EQ(stats.operation_count, NUM_THREADS);
121
122 // Verify each thread's profile
123 for (int t = 0; t < NUM_THREADS; ++t) {
124 std::string op_name = "thread_op_" + std::to_string(t);
125 auto profile_result = collector->get_profile(op_name);
126 ASSERT_TRUE(profile_result.is_ok());
127
128 auto profile = profile_result.value();
129 EXPECT_EQ(profile.total_calls, SAMPLES_PER_THREAD);
130 EXPECT_EQ(profile.error_count, SAMPLES_PER_THREAD / 10);
131 }
132}
133
135 auto limited_collector = std::make_shared<central_collector>(10); // Only 10 profiles max
136
137 thread_local_buffer buffer(256, limited_collector);
138
139 // Create 15 operations (should evict 5)
140 for (int i = 0; i < 15; ++i) {
141 std::string op_name = "op_" + std::to_string(i);
142 metric_sample sample{op_name, std::chrono::nanoseconds(1000), true};
143 buffer.record_auto_flush(sample);
144 }
145
146 buffer.flush();
147
148 // Should have at most 10 operations left (LRU evicted the rest)
149 auto stats = limited_collector->get_stats();
150 EXPECT_LE(stats.operation_count, 10);
151 EXPECT_GE(stats.lru_evictions, 5);
152}
153
155 thread_local_buffer buffer(256, collector);
156
157 // Create multiple operations
158 for (int i = 0; i < 5; ++i) {
159 std::string op_name = "operation_" + std::to_string(i);
160 metric_sample sample{op_name, std::chrono::nanoseconds(1000 * (i + 1)), true};
161 buffer.record(sample);
162 }
163
164 buffer.flush();
165
166 // Get all profiles
167 auto all_profiles = collector->get_all_profiles();
168 EXPECT_EQ(all_profiles.size(), 5);
169
170 // Verify each operation exists
171 for (int i = 0; i < 5; ++i) {
172 std::string op_name = "operation_" + std::to_string(i);
173 EXPECT_NE(all_profiles.find(op_name), all_profiles.end());
174 EXPECT_EQ(all_profiles[op_name].total_calls, 1);
175 EXPECT_EQ(all_profiles[op_name].avg_duration_ns, 1000 * (i + 1));
176 }
177}
178
180 thread_local_buffer buffer(256, collector);
181
182 // Add some samples
183 for (int i = 0; i < 10; ++i) {
184 metric_sample sample{"op", std::chrono::nanoseconds(1000), true};
185 buffer.record(sample);
186 }
187 buffer.flush();
188
189 // Verify data exists
190 auto stats_before = collector->get_stats();
191 EXPECT_GT(stats_before.total_samples, 0);
192
193 // Clear
194 collector->clear();
195
196 // Verify cleared
197 auto stats_after = collector->get_stats();
198 EXPECT_EQ(stats_after.total_samples, 0);
199 EXPECT_EQ(stats_after.operation_count, 0);
200 EXPECT_EQ(stats_after.batches_received, 0);
201}
202
203TEST_F(LockFreeCollectorTest, ProfileNotFound) {
204 auto result = collector->get_profile("nonexistent");
205 EXPECT_TRUE(result.is_err());
206 EXPECT_EQ(result.error().code, static_cast<int>(monitoring_error_code::metric_not_found));
207}
Central collector for aggregating metrics from thread-local buffers.
std::shared_ptr< central_collector > collector
Thread-local buffer for lock-free metric collection.
size_t flush()
Flush buffered samples to central collector.
size_t size() const
Get current number of buffered samples.
bool record_auto_flush(const metric_sample &sample)
Record a metric sample with automatic flush on overflow.
bool is_full() const
Check if buffer is full.
bool record(const metric_sample &sample)
Record a metric sample (lock-free).
Sample data structure for metric recording.
TEST_F(LockFreeCollectorTest, BasicSampleRecording)
Thread-local buffer for lock-free metric collection.