29#include <shared_mutex>
67 std::unique_lock<std::shared_mutex> lock(
mutex_);
71 double delta = value -
mean_;
73 double delta2 = value -
mean_;
74 m2_ += delta * delta2;
89 std::shared_lock<std::shared_mutex> lock(
mutex_);
97 std::shared_lock<std::shared_mutex> lock(
mutex_);
105 std::shared_lock<std::shared_mutex> lock(
mutex_);
109 return m2_ /
static_cast<double>(
count_ - 1);
123 std::shared_lock<std::shared_mutex> lock(
mutex_);
131 std::shared_lock<std::shared_mutex> lock(
mutex_);
139 std::shared_lock<std::shared_mutex> lock(
mutex_);
147 std::shared_lock<std::shared_mutex> lock(
mutex_);
167 std::unique_lock<std::shared_mutex> lock(
mutex_);
207 std::copy(std::begin(
other.q_), std::end(
other.q_), std::begin(
q_));
208 std::copy(std::begin(
other.n_), std::end(
other.n_), std::begin(
n_));
209 std::copy(std::begin(
other.n_prime_), std::end(
other.n_prime_), std::begin(
n_prime_));
210 std::copy(std::begin(
other.dn_), std::end(
other.dn_), std::begin(
dn_));
214 if (
this != &
other) {
217 std::copy(std::begin(
other.q_), std::end(
other.q_), std::begin(
q_));
218 std::copy(std::begin(
other.n_), std::end(
other.n_), std::begin(
n_));
219 std::copy(std::begin(
other.n_prime_), std::end(
other.n_prime_), std::begin(
n_prime_));
220 std::copy(std::begin(
other.dn_), std::end(
other.dn_), std::begin(
dn_));
249 dn_[3] = (1 +
p_) / 2;
260 std::unique_lock<std::shared_mutex> lock(
mutex_);
266 std::sort(
q_,
q_ + 5);
276 }
else if (x >=
q_[4]) {
280 for (k = 0; k < 4; ++k) {
288 for (
int i = k + 1; i < 5; ++i) {
293 for (
int i = 0; i < 5; ++i) {
298 for (
int i = 1; i < 4; ++i) {
300 if ((d >= 1 &&
n_[i + 1] -
n_[i] > 1) ||
301 (d <= -1 &&
n_[i - 1] -
n_[i] < -1)) {
302 int sign = (d >= 0) ? 1 : -1;
305 if (
q_[i - 1] < q_new && q_new <
q_[i + 1]) {
320 std::shared_lock<std::shared_mutex> lock(
mutex_);
327 std::sort(sorted.begin(), sorted.end());
328 size_t idx =
static_cast<size_t>(
p_ * (
count_ - 1));
338 std::shared_lock<std::shared_mutex> lock(
mutex_);
346 std::unique_lock<std::shared_mutex> lock(
mutex_);
348 n_[0] = 1;
n_[1] = 2;
n_[2] = 3;
n_[3] = 4;
n_[4] = 5;
359 double qim1 =
q_[i - 1];
360 double qip1 =
q_[i + 1];
362 int nim1 =
n_[i - 1];
363 int nip1 =
n_[i + 1];
365 double term1 =
static_cast<double>(sign) / (nip1 - nim1);
366 double term2 = (ni - nim1 + sign) * (qip1 - qi) / (nip1 - ni);
367 double term3 = (nip1 - ni - sign) * (qi - qim1) / (ni - nim1);
369 return qi + term1 * (term2 + term3);
373 int idx = (sign < 0) ? i - 1 : i + 1;
374 return q_[i] +
static_cast<double>(sign) * (
q_[idx] -
q_[i]) /
399 using duration = std::chrono::system_clock::duration;
417 std::unique_lock<std::shared_mutex> lock(
mutex_);
426 entries_.push_back({value, timestamp});
433 std::shared_lock<std::shared_mutex> lock(
mutex_);
441 std::shared_lock<std::shared_mutex> lock(
mutex_);
449 std::shared_lock<std::shared_mutex> lock(
mutex_);
450 std::vector<T> result;
462 std::unique_lock<std::shared_mutex> lock(
mutex_);
502 "Window size must be positive").to_common_error());
506 "Window duration must be positive").to_common_error());
510 "Outlier threshold must be positive").to_common_error());
538 percentile_estimators_.emplace(p, quantile_estimator(p));
548 std::unique_lock<std::shared_mutex> lock(
mutex_);
552 double z_score = std::abs(value -
stats_.
mean()) /
567 estimator.add_observation(value);
577 std::shared_lock<std::shared_mutex> lock(
mutex_);
584 stats.percentiles[p] = estimator.get_quantile();
596 std::shared_lock<std::shared_mutex> lock(
mutex_);
599 return it->second.get_quantile();
636 std::unique_lock<std::shared_mutex> lock(
mutex_);
661 const std::vector<double>& y) {
662 if (x.size() != y.size() || x.size() < 2) {
667 double sum_x = 0, sum_y = 0, sum_xy = 0;
668 double sum_x2 = 0, sum_y2 = 0;
670 for (
size_t i = 0; i < n; ++i) {
673 sum_xy += x[i] * y[i];
674 sum_x2 += x[i] * x[i];
675 sum_y2 += y[i] * y[i];
678 double num = n * sum_xy - sum_x * sum_y;
679 double den = std::sqrt((n * sum_x2 - sum_x * sum_x) *
680 (n * sum_y2 - sum_y * sum_y));
Time-windowed value collection.
void add_value(const T &value, time_point timestamp)
Add a value with timestamp.
void clear()
Clear all entries.
bool empty() const
Check if empty.
size_t size() const
Get current size.
std::chrono::system_clock::duration duration
std::chrono::milliseconds window_duration_
std::vector< T > get_values() const
Get all values in the window.
moving_window_aggregator(std::chrono::milliseconds window_duration, size_t max_size)
Constructor.
std::chrono::system_clock::time_point time_point
std::deque< entry > entries_
void expire_old_entries(time_point current)
Welford's algorithm for computing streaming statistics.
online_statistics()=default
double variance() const
Get running variance (sample variance)
void add_value(double value)
Add a value to the statistics.
double stddev() const
Get running standard deviation.
double min() const
Get minimum value.
double sum() const
Get sum of all values.
double max() const
Get maximum value.
double mean() const
Get running mean.
streaming_statistics get_statistics() const
Get full statistics.
size_t count() const
Get sample count.
void reset()
Reset statistics.
P² algorithm for streaming quantile estimation.
quantile_estimator(double p)
Constructor.
quantile_estimator & operator=(const quantile_estimator &)=delete
size_t count() const
Get observation count.
void add_observation(double x)
Add an observation.
double linear(int i, int sign) const
quantile_estimator & operator=(quantile_estimator &&other) noexcept
void reset()
Reset the estimator.
quantile_estimator(const quantile_estimator &)=delete
quantile_estimator(quantile_estimator &&other) noexcept
double get_quantile() const
Get the estimated quantile.
double parabolic(int i, int sign) const
Full-featured streaming aggregation.
double mean() const
Get mean.
stream_aggregator(const stream_aggregator_config &config)
Constructor with configuration.
std::map< double, quantile_estimator > percentile_estimators_
stream_aggregator_config config_
streaming_statistics get_statistics() const
Get full statistics.
stream_aggregator()
Default constructor.
common::VoidResult add_observation(double value)
Add an observation.
double variance() const
Get variance.
std::optional< double > get_percentile(double p) const
Get specific percentile.
double stddev() const
Get standard deviation.
std::vector< double > outliers_
void reset()
Reset the aggregator.
size_t count() const
Get observation count.
Monitoring system specific error codes.
double pearson_correlation(const std::vector< double > &x, const std::vector< double > &y)
Calculate Pearson correlation coefficient.
Result pattern type definitions for monitoring system.
Extended error information with context.
Configuration for stream aggregator.
std::chrono::milliseconds window_duration
common::VoidResult validate() const
Validate configuration.
std::vector< double > percentiles_to_track
bool enable_outlier_detection
Statistical summary from streaming computation.
std::map< double, double > percentiles
std::vector< double > outliers