histogram_windowing.cc 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. // Copyright (c) 2013, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "monitoring/histogram_windowing.h"
  10. #include "monitoring/histogram.h"
  11. #include "util/cast_util.h"
  12. #include <algorithm>
  13. namespace ROCKSDB_NAMESPACE {
  14. HistogramWindowingImpl::HistogramWindowingImpl() {
  15. env_ = Env::Default();
  16. window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
  17. Clear();
  18. }
  19. HistogramWindowingImpl::HistogramWindowingImpl(
  20. uint64_t num_windows,
  21. uint64_t micros_per_window,
  22. uint64_t min_num_per_window) :
  23. num_windows_(num_windows),
  24. micros_per_window_(micros_per_window),
  25. min_num_per_window_(min_num_per_window) {
  26. env_ = Env::Default();
  27. window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
  28. Clear();
  29. }
  30. HistogramWindowingImpl::~HistogramWindowingImpl() {
  31. }
  32. void HistogramWindowingImpl::Clear() {
  33. std::lock_guard<std::mutex> lock(mutex_);
  34. stats_.Clear();
  35. for (size_t i = 0; i < num_windows_; i++) {
  36. window_stats_[i].Clear();
  37. }
  38. current_window_.store(0, std::memory_order_relaxed);
  39. last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
  40. }
  41. bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
  42. // This function is designed to be lock free, as it's in the critical path
  43. // of any operation.
  44. // Each individual value is atomic, it is just that some samples can go
  45. // in the older bucket which is tolerable.
  46. void HistogramWindowingImpl::Add(uint64_t value){
  47. TimerTick();
  48. // Parent (global) member update
  49. stats_.Add(value);
  50. // Current window update
  51. window_stats_[static_cast<size_t>(current_window())].Add(value);
  52. }
  53. void HistogramWindowingImpl::Merge(const Histogram& other) {
  54. if (strcmp(Name(), other.Name()) == 0) {
  55. Merge(
  56. *static_cast_with_check<const HistogramWindowingImpl, const Histogram>(
  57. &other));
  58. }
  59. }
  60. void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
  61. std::lock_guard<std::mutex> lock(mutex_);
  62. stats_.Merge(other.stats_);
  63. if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
  64. micros_per_window_ != other.micros_per_window_) {
  65. return;
  66. }
  67. uint64_t cur_window = current_window();
  68. uint64_t other_cur_window = other.current_window();
  69. // going backwards for alignment
  70. for (unsigned int i = 0;
  71. i < std::min(num_windows_, other.num_windows_); i++) {
  72. uint64_t window_index =
  73. (cur_window + num_windows_ - i) % num_windows_;
  74. uint64_t other_window_index =
  75. (other_cur_window + other.num_windows_ - i) % other.num_windows_;
  76. size_t windex = static_cast<size_t>(window_index);
  77. size_t other_windex = static_cast<size_t>(other_window_index);
  78. window_stats_[windex].Merge(
  79. other.window_stats_[other_windex]);
  80. }
  81. }
  82. std::string HistogramWindowingImpl::ToString() const {
  83. return stats_.ToString();
  84. }
  85. double HistogramWindowingImpl::Median() const {
  86. return Percentile(50.0);
  87. }
  88. double HistogramWindowingImpl::Percentile(double p) const {
  89. // Retry 3 times in total
  90. for (int retry = 0; retry < 3; retry++) {
  91. uint64_t start_num = stats_.num();
  92. double result = stats_.Percentile(p);
  93. // Detect if swap buckets or Clear() was called during calculation
  94. if (stats_.num() >= start_num) {
  95. return result;
  96. }
  97. }
  98. return 0.0;
  99. }
  100. double HistogramWindowingImpl::Average() const {
  101. return stats_.Average();
  102. }
  103. double HistogramWindowingImpl::StandardDeviation() const {
  104. return stats_.StandardDeviation();
  105. }
  106. void HistogramWindowingImpl::Data(HistogramData * const data) const {
  107. stats_.Data(data);
  108. }
  109. void HistogramWindowingImpl::TimerTick() {
  110. uint64_t curr_time = env_->NowMicros();
  111. size_t curr_window_ = static_cast<size_t>(current_window());
  112. if (curr_time - last_swap_time() > micros_per_window_ &&
  113. window_stats_[curr_window_].num() >= min_num_per_window_) {
  114. SwapHistoryBucket();
  115. }
  116. }
  117. void HistogramWindowingImpl::SwapHistoryBucket() {
  118. // Threads executing Add() would be competing for this mutex, the first one
  119. // who got the metex would take care of the bucket swap, other threads
  120. // can skip this.
  121. // If mutex is held by Merge() or Clear(), next Add() will take care of the
  122. // swap, if needed.
  123. if (mutex_.try_lock()) {
  124. last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
  125. uint64_t curr_window = current_window();
  126. uint64_t next_window = (curr_window == num_windows_ - 1) ?
  127. 0 : curr_window + 1;
  128. // subtract next buckets from totals and swap to next buckets
  129. HistogramStat& stats_to_drop =
  130. window_stats_[static_cast<size_t>(next_window)];
  131. if (!stats_to_drop.Empty()) {
  132. for (size_t b = 0; b < stats_.num_buckets_; b++){
  133. stats_.buckets_[b].fetch_sub(
  134. stats_to_drop.bucket_at(b), std::memory_order_relaxed);
  135. }
  136. if (stats_.min() == stats_to_drop.min()) {
  137. uint64_t new_min = std::numeric_limits<uint64_t>::max();
  138. for (unsigned int i = 0; i < num_windows_; i++) {
  139. if (i != next_window) {
  140. uint64_t m = window_stats_[i].min();
  141. if (m < new_min) new_min = m;
  142. }
  143. }
  144. stats_.min_.store(new_min, std::memory_order_relaxed);
  145. }
  146. if (stats_.max() == stats_to_drop.max()) {
  147. uint64_t new_max = 0;
  148. for (unsigned int i = 0; i < num_windows_; i++) {
  149. if (i != next_window) {
  150. uint64_t m = window_stats_[i].max();
  151. if (m > new_max) new_max = m;
  152. }
  153. }
  154. stats_.max_.store(new_max, std::memory_order_relaxed);
  155. }
  156. stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
  157. stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
  158. stats_.sum_squares_.fetch_sub(
  159. stats_to_drop.sum_squares(), std::memory_order_relaxed);
  160. stats_to_drop.Clear();
  161. }
  162. // advance to next window bucket
  163. current_window_.store(next_window, std::memory_order_relaxed);
  164. mutex_.unlock();
  165. }
  166. }
  167. } // namespace ROCKSDB_NAMESPACE