Monitoring System 0.1.0
System resource monitoring with pluggable collectors and alerting
Loading...
Searching...
No Matches
kcenon::monitoring::quantile_estimator Class Reference

P² algorithm for streaming quantile estimation. More...

#include <stream_aggregator.h>

Collaboration diagram for kcenon::monitoring::quantile_estimator:
Collaboration graph

Public Member Functions

 quantile_estimator (double p)
 Constructor.
 
 quantile_estimator (quantile_estimator &&other) noexcept
 
quantile_estimator & operator= (quantile_estimator &&other) noexcept
 
 quantile_estimator (const quantile_estimator &)=delete
 
quantile_estimator & operator= (const quantile_estimator &)=delete
 
void add_observation (double x)
 Add an observation.
 
double get_quantile () const
 Get the estimated quantile.
 
size_t count () const
 Get observation count.
 
void reset ()
 Reset the estimator.
 

Private Member Functions

void init_markers ()
 
double parabolic (int i, int sign) const
 
double linear (int i, int sign) const
 

Private Attributes

std::shared_mutex mutex_
 
double p_
 
size_t count_ = 0
 
double q_ [5] = {0}
 
int n_ [5] = {0}
 
double n_prime_ [5] = {0}
 
double dn_ [5] = {0}
 

Detailed Description

P² algorithm for streaming quantile estimation.

Implements the P² algorithm for estimating quantiles without storing all observations. Uses piecewise-parabolic interpolation.

Definition at line 193 of file stream_aggregator.h.

Constructor & Destructor Documentation

◆ quantile_estimator() [1/3]

kcenon::monitoring::quantile_estimator::quantile_estimator ( double p)
inline explicit

Constructor.

Parameters
p: The quantile to estimate (0.0 to 1.0)

Definition at line 199 of file stream_aggregator.h.

References init_markers().

Here is the call graph for this function:

◆ quantile_estimator() [2/3]

kcenon::monitoring::quantile_estimator::quantile_estimator ( quantile_estimator && other)
inline noexcept

Definition at line 204 of file stream_aggregator.h.

205 : p_(other.p_)
206 , count_(other.count_) {
207 std::copy(std::begin(other.q_), std::end(other.q_), std::begin(q_));
208 std::copy(std::begin(other.n_), std::end(other.n_), std::begin(n_));
209 std::copy(std::begin(other.n_prime_), std::end(other.n_prime_), std::begin(n_prime_));
210 std::copy(std::begin(other.dn_), std::end(other.dn_), std::begin(dn_));
211 }

References dn_, n_, n_prime_, kcenon::monitoring::other, and q_.

◆ quantile_estimator() [3/3]

kcenon::monitoring::quantile_estimator::quantile_estimator ( const quantile_estimator & )
delete

Member Function Documentation

◆ add_observation()

void kcenon::monitoring::quantile_estimator::add_observation ( double x)
inline

Add an observation.

Parameters
x: The observation value

Definition at line 259 of file stream_aggregator.h.

259 {
260 std::unique_lock<std::shared_mutex> lock(mutex_);
261 count_++;
262
263 if (count_ <= 5) {
264 q_[count_ - 1] = x;
265 if (count_ == 5) {
266 std::sort(q_, q_ + 5);
267 }
268 return;
269 }
270
271 // Find cell k
272 int k;
273 if (x < q_[0]) {
274 q_[0] = x;
275 k = 0;
276 } else if (x >= q_[4]) {
277 q_[4] = x;
278 k = 3;
279 } else {
280 for (k = 0; k < 4; ++k) {
281 if (x < q_[k + 1]) {
282 break;
283 }
284 }
285 }
286
287 // Increment positions
288 for (int i = k + 1; i < 5; ++i) {
289 n_[i]++;
290 }
291
292 // Update desired positions
293 for (int i = 0; i < 5; ++i) {
294 n_prime_[i] += dn_[i];
295 }
296
297 // Adjust marker heights
298 for (int i = 1; i < 4; ++i) {
299 double d = n_prime_[i] - n_[i];
300 if ((d >= 1 && n_[i + 1] - n_[i] > 1) ||
301 (d <= -1 && n_[i - 1] - n_[i] < -1)) {
302 int sign = (d >= 0) ? 1 : -1;
303 double q_new = parabolic(i, sign);
304
305 if (q_[i - 1] < q_new && q_new < q_[i + 1]) {
306 q_[i] = q_new;
307 } else {
308 q_[i] = linear(i, sign);
309 }
310 n_[i] += sign;
311 }
312 }
313 }
double linear(int i, int sign) const
double parabolic(int i, int sign) const

References count_, dn_, linear(), mutex_, n_, n_prime_, parabolic(), and q_.

Referenced by TEST_F(), and TEST_F().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ count()

size_t kcenon::monitoring::quantile_estimator::count ( ) const
inline

Get observation count.

Definition at line 337 of file stream_aggregator.h.

337 {
338 std::shared_lock<std::shared_mutex> lock(mutex_);
339 return count_;
340 }

References count_, and mutex_.

◆ get_quantile()

double kcenon::monitoring::quantile_estimator::get_quantile ( ) const
inline

Get the estimated quantile.

Returns
The estimated quantile value

Definition at line 319 of file stream_aggregator.h.

319 {
320 std::shared_lock<std::shared_mutex> lock(mutex_);
321 if (count_ < 5) {
322 if (count_ == 0) {
323 return 0.0;
324 }
325 // For small samples, use simple interpolation
326 std::vector<double> sorted(q_, q_ + count_);
327 std::sort(sorted.begin(), sorted.end());
328 size_t idx = static_cast<size_t>(p_ * (count_ - 1));
329 return sorted[idx];
330 }
331 return q_[2]; // Middle marker for p-quantile
332 }

References count_, mutex_, p_, and q_.

Referenced by TEST_F(), and TEST_F().

Here is the caller graph for this function:

◆ init_markers()

void kcenon::monitoring::quantile_estimator::init_markers ( )
inline private

Definition at line 230 of file stream_aggregator.h.

230 {
231 // Initialize marker positions
232 n_[0] = 1;
233 n_[1] = 2;
234 n_[2] = 3;
235 n_[3] = 4;
236 n_[4] = 5;
237
238 // Initialize desired marker positions
239 n_prime_[0] = 1;
240 n_prime_[1] = 1 + 2 * p_;
241 n_prime_[2] = 1 + 4 * p_;
242 n_prime_[3] = 3 + 2 * p_;
243 n_prime_[4] = 5;
244
245 // Increments
246 dn_[0] = 0;
247 dn_[1] = p_ / 2;
248 dn_[2] = p_;
249 dn_[3] = (1 + p_) / 2;
250 dn_[4] = 1;
251 }

References dn_, n_, n_prime_, and p_.

Referenced by quantile_estimator().

Here is the caller graph for this function:

◆ linear()

double kcenon::monitoring::quantile_estimator::linear ( int i,
int sign ) const
inline private

Definition at line 372 of file stream_aggregator.h.

372 {
373 int idx = (sign < 0) ? i - 1 : i + 1;
374 return q_[i] + static_cast<double>(sign) * (q_[idx] - q_[i]) /
375 (n_[idx] - n_[i]);
376 }

References n_, and q_.

Referenced by add_observation().

Here is the caller graph for this function:

◆ operator=() [1/2]

quantile_estimator & kcenon::monitoring::quantile_estimator::operator= ( const quantile_estimator & )
delete

◆ operator=() [2/2]

quantile_estimator & kcenon::monitoring::quantile_estimator::operator= ( quantile_estimator && other)
inline noexcept

Definition at line 213 of file stream_aggregator.h.

213 {
214 if (this != &other) {
215 p_ = other.p_;
216 count_ = other.count_;
217 std::copy(std::begin(other.q_), std::end(other.q_), std::begin(q_));
218 std::copy(std::begin(other.n_), std::end(other.n_), std::begin(n_));
219 std::copy(std::begin(other.n_prime_), std::end(other.n_prime_), std::begin(n_prime_));
220 std::copy(std::begin(other.dn_), std::end(other.dn_), std::begin(dn_));
221 }
222 return *this;
223 }

References count_, dn_, n_, n_prime_, kcenon::monitoring::other, p_, and q_.

◆ parabolic()

double kcenon::monitoring::quantile_estimator::parabolic ( int i,
int sign ) const
inline private

Definition at line 357 of file stream_aggregator.h.

357 {
358 double qi = q_[i];
359 double qim1 = q_[i - 1];
360 double qip1 = q_[i + 1];
361 int ni = n_[i];
362 int nim1 = n_[i - 1];
363 int nip1 = n_[i + 1];
364
365 double term1 = static_cast<double>(sign) / (nip1 - nim1);
366 double term2 = (ni - nim1 + sign) * (qip1 - qi) / (nip1 - ni);
367 double term3 = (nip1 - ni - sign) * (qi - qim1) / (ni - nim1);
368
369 return qi + term1 * (term2 + term3);
370 }

References n_, and q_.

Referenced by add_observation().

Here is the caller graph for this function:

◆ reset()

void kcenon::monitoring::quantile_estimator::reset ( )
inline

Reset the estimator.

Definition at line 345 of file stream_aggregator.h.

345 {
346 std::unique_lock<std::shared_mutex> lock(mutex_);
347 count_ = 0;
348 n_[0] = 1; n_[1] = 2; n_[2] = 3; n_[3] = 4; n_[4] = 5;
349 n_prime_[0] = 1;
350 n_prime_[1] = 1 + 2 * p_;
351 n_prime_[2] = 1 + 4 * p_;
352 n_prime_[3] = 3 + 2 * p_;
353 n_prime_[4] = 5;
354 }

References count_, mutex_, n_, n_prime_, and p_.

Member Data Documentation

◆ count_

size_t kcenon::monitoring::quantile_estimator::count_ = 0
private

Definition at line 380 of file stream_aggregator.h.

Referenced by add_observation(), count(), get_quantile(), operator=(), and reset().

◆ dn_

double kcenon::monitoring::quantile_estimator::dn_[5] = {0}
private

Definition at line 384 of file stream_aggregator.h.

384 double dn_[5] = {0};

Referenced by add_observation(), init_markers(), operator=(), and quantile_estimator().

◆ mutex_

std::shared_mutex kcenon::monitoring::quantile_estimator::mutex_
mutable private

Definition at line 378 of file stream_aggregator.h.

Referenced by add_observation(), count(), get_quantile(), and reset().

◆ n_

int kcenon::monitoring::quantile_estimator::n_[5] = {0}
private

◆ n_prime_

double kcenon::monitoring::quantile_estimator::n_prime_[5] = {0}
private

Definition at line 383 of file stream_aggregator.h.

383 double n_prime_[5] = {0};

Referenced by add_observation(), init_markers(), operator=(), quantile_estimator(), and reset().

◆ p_

double kcenon::monitoring::quantile_estimator::p_
private

Definition at line 379 of file stream_aggregator.h.

Referenced by get_quantile(), init_markers(), operator=(), and reset().

◆ q_

double kcenon::monitoring::quantile_estimator::q_[5] = {0}
private

Definition at line 381 of file stream_aggregator.h.

381 double q_[5] = {0};

Referenced by add_observation(), get_quantile(), linear(), operator=(), parabolic(), and quantile_estimator().


The documentation for this class was generated from the following file: