| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- // Copyright (c) 2013, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- //
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "monitoring/histogram_windowing.h"
- #include "monitoring/histogram.h"
- #include "util/cast_util.h"
- #include <algorithm>
- namespace ROCKSDB_NAMESPACE {
- HistogramWindowingImpl::HistogramWindowingImpl() {
- env_ = Env::Default();
- window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
- Clear();
- }
- HistogramWindowingImpl::HistogramWindowingImpl(
- uint64_t num_windows,
- uint64_t micros_per_window,
- uint64_t min_num_per_window) :
- num_windows_(num_windows),
- micros_per_window_(micros_per_window),
- min_num_per_window_(min_num_per_window) {
- env_ = Env::Default();
- window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
- Clear();
- }
- HistogramWindowingImpl::~HistogramWindowingImpl() {
- }
- void HistogramWindowingImpl::Clear() {
- std::lock_guard<std::mutex> lock(mutex_);
- stats_.Clear();
- for (size_t i = 0; i < num_windows_; i++) {
- window_stats_[i].Clear();
- }
- current_window_.store(0, std::memory_order_relaxed);
- last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
- }
- bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
- // This function is designed to be lock free, as it's in the critical path
- // of any operation.
- // Each individual value is atomic, it is just that some samples can go
- // in the older bucket which is tolerable.
- void HistogramWindowingImpl::Add(uint64_t value){
- TimerTick();
- // Parent (global) member update
- stats_.Add(value);
- // Current window update
- window_stats_[static_cast<size_t>(current_window())].Add(value);
- }
- void HistogramWindowingImpl::Merge(const Histogram& other) {
- if (strcmp(Name(), other.Name()) == 0) {
- Merge(
- *static_cast_with_check<const HistogramWindowingImpl, const Histogram>(
- &other));
- }
- }
- void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
- std::lock_guard<std::mutex> lock(mutex_);
- stats_.Merge(other.stats_);
- if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
- micros_per_window_ != other.micros_per_window_) {
- return;
- }
- uint64_t cur_window = current_window();
- uint64_t other_cur_window = other.current_window();
- // going backwards for alignment
- for (unsigned int i = 0;
- i < std::min(num_windows_, other.num_windows_); i++) {
- uint64_t window_index =
- (cur_window + num_windows_ - i) % num_windows_;
- uint64_t other_window_index =
- (other_cur_window + other.num_windows_ - i) % other.num_windows_;
- size_t windex = static_cast<size_t>(window_index);
- size_t other_windex = static_cast<size_t>(other_window_index);
- window_stats_[windex].Merge(
- other.window_stats_[other_windex]);
- }
- }
- std::string HistogramWindowingImpl::ToString() const {
- return stats_.ToString();
- }
- double HistogramWindowingImpl::Median() const {
- return Percentile(50.0);
- }
- double HistogramWindowingImpl::Percentile(double p) const {
- // Retry 3 times in total
- for (int retry = 0; retry < 3; retry++) {
- uint64_t start_num = stats_.num();
- double result = stats_.Percentile(p);
- // Detect if swap buckets or Clear() was called during calculation
- if (stats_.num() >= start_num) {
- return result;
- }
- }
- return 0.0;
- }
- double HistogramWindowingImpl::Average() const {
- return stats_.Average();
- }
- double HistogramWindowingImpl::StandardDeviation() const {
- return stats_.StandardDeviation();
- }
- void HistogramWindowingImpl::Data(HistogramData * const data) const {
- stats_.Data(data);
- }
- void HistogramWindowingImpl::TimerTick() {
- uint64_t curr_time = env_->NowMicros();
- size_t curr_window_ = static_cast<size_t>(current_window());
- if (curr_time - last_swap_time() > micros_per_window_ &&
- window_stats_[curr_window_].num() >= min_num_per_window_) {
- SwapHistoryBucket();
- }
- }
- void HistogramWindowingImpl::SwapHistoryBucket() {
- // Threads executing Add() would be competing for this mutex, the first one
- // who got the metex would take care of the bucket swap, other threads
- // can skip this.
- // If mutex is held by Merge() or Clear(), next Add() will take care of the
- // swap, if needed.
- if (mutex_.try_lock()) {
- last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
- uint64_t curr_window = current_window();
- uint64_t next_window = (curr_window == num_windows_ - 1) ?
- 0 : curr_window + 1;
- // subtract next buckets from totals and swap to next buckets
- HistogramStat& stats_to_drop =
- window_stats_[static_cast<size_t>(next_window)];
- if (!stats_to_drop.Empty()) {
- for (size_t b = 0; b < stats_.num_buckets_; b++){
- stats_.buckets_[b].fetch_sub(
- stats_to_drop.bucket_at(b), std::memory_order_relaxed);
- }
- if (stats_.min() == stats_to_drop.min()) {
- uint64_t new_min = std::numeric_limits<uint64_t>::max();
- for (unsigned int i = 0; i < num_windows_; i++) {
- if (i != next_window) {
- uint64_t m = window_stats_[i].min();
- if (m < new_min) new_min = m;
- }
- }
- stats_.min_.store(new_min, std::memory_order_relaxed);
- }
- if (stats_.max() == stats_to_drop.max()) {
- uint64_t new_max = 0;
- for (unsigned int i = 0; i < num_windows_; i++) {
- if (i != next_window) {
- uint64_t m = window_stats_[i].max();
- if (m > new_max) new_max = m;
- }
- }
- stats_.max_.store(new_max, std::memory_order_relaxed);
- }
- stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
- stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
- stats_.sum_squares_.fetch_sub(
- stats_to_drop.sum_squares(), std::memory_order_relaxed);
- stats_to_drop.Clear();
- }
- // advance to next window bucket
- current_window_.store(next_window, std::memory_order_relaxed);
- mutex_.unlock();
- }
- }
- } // namespace ROCKSDB_NAMESPACE
|