histogram_windowing.cc 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "monitoring/histogram_windowing.h"

#include <algorithm>
#include <limits>
#include <mutex>

#include "monitoring/histogram.h"
#include "rocksdb/system_clock.h"
#include "util/cast_util.h"
  14. namespace ROCKSDB_NAMESPACE {
  15. HistogramWindowingImpl::HistogramWindowingImpl() {
  16. clock_ = SystemClock::Default();
  17. window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
  18. Clear();
  19. }
  20. HistogramWindowingImpl::HistogramWindowingImpl(uint64_t num_windows,
  21. uint64_t micros_per_window,
  22. uint64_t min_num_per_window)
  23. : num_windows_(num_windows),
  24. micros_per_window_(micros_per_window),
  25. min_num_per_window_(min_num_per_window) {
  26. clock_ = SystemClock::Default();
  27. window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
  28. Clear();
  29. }
  30. HistogramWindowingImpl::~HistogramWindowingImpl() = default;
  31. void HistogramWindowingImpl::Clear() {
  32. std::lock_guard<std::mutex> lock(mutex_);
  33. stats_.Clear();
  34. for (size_t i = 0; i < num_windows_; i++) {
  35. window_stats_[i].Clear();
  36. }
  37. current_window_.store(0, std::memory_order_relaxed);
  38. last_swap_time_.store(clock_->NowMicros(), std::memory_order_relaxed);
  39. }
  40. bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
  41. // This function is designed to be lock free, as it's in the critical path
  42. // of any operation.
  43. // Each individual value is atomic, it is just that some samples can go
  44. // in the older bucket which is tolerable.
  45. void HistogramWindowingImpl::Add(uint64_t value) {
  46. TimerTick();
  47. // Parent (global) member update
  48. stats_.Add(value);
  49. // Current window update
  50. window_stats_[static_cast<size_t>(current_window())].Add(value);
  51. }
  52. void HistogramWindowingImpl::Merge(const Histogram& other) {
  53. if (strcmp(Name(), other.Name()) == 0) {
  54. Merge(*static_cast_with_check<const HistogramWindowingImpl>(&other));
  55. }
  56. }
  57. void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
  58. std::lock_guard<std::mutex> lock(mutex_);
  59. stats_.Merge(other.stats_);
  60. if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
  61. micros_per_window_ != other.micros_per_window_) {
  62. return;
  63. }
  64. uint64_t cur_window = current_window();
  65. uint64_t other_cur_window = other.current_window();
  66. // going backwards for alignment
  67. for (unsigned int i = 0; i < std::min(num_windows_, other.num_windows_);
  68. i++) {
  69. uint64_t window_index = (cur_window + num_windows_ - i) % num_windows_;
  70. uint64_t other_window_index =
  71. (other_cur_window + other.num_windows_ - i) % other.num_windows_;
  72. size_t windex = static_cast<size_t>(window_index);
  73. size_t other_windex = static_cast<size_t>(other_window_index);
  74. window_stats_[windex].Merge(other.window_stats_[other_windex]);
  75. }
  76. }
  77. std::string HistogramWindowingImpl::ToString() const {
  78. return stats_.ToString();
  79. }
  80. double HistogramWindowingImpl::Median() const { return Percentile(50.0); }
  81. double HistogramWindowingImpl::Percentile(double p) const {
  82. // Retry 3 times in total
  83. for (int retry = 0; retry < 3; retry++) {
  84. uint64_t start_num = stats_.num();
  85. double result = stats_.Percentile(p);
  86. // Detect if swap buckets or Clear() was called during calculation
  87. if (stats_.num() >= start_num) {
  88. return result;
  89. }
  90. }
  91. return 0.0;
  92. }
  93. double HistogramWindowingImpl::Average() const { return stats_.Average(); }
  94. double HistogramWindowingImpl::StandardDeviation() const {
  95. return stats_.StandardDeviation();
  96. }
  97. void HistogramWindowingImpl::Data(HistogramData* const data) const {
  98. stats_.Data(data);
  99. }
  100. void HistogramWindowingImpl::TimerTick() {
  101. uint64_t curr_time = clock_->NowMicros();
  102. size_t curr_window_ = static_cast<size_t>(current_window());
  103. if (curr_time - last_swap_time() > micros_per_window_ &&
  104. window_stats_[curr_window_].num() >= min_num_per_window_) {
  105. SwapHistoryBucket();
  106. }
  107. }
  108. void HistogramWindowingImpl::SwapHistoryBucket() {
  109. // Threads executing Add() would be competing for this mutex, the first one
  110. // who got the metex would take care of the bucket swap, other threads
  111. // can skip this.
  112. // If mutex is held by Merge() or Clear(), next Add() will take care of the
  113. // swap, if needed.
  114. if (mutex_.try_lock()) {
  115. last_swap_time_.store(clock_->NowMicros(), std::memory_order_relaxed);
  116. uint64_t curr_window = current_window();
  117. uint64_t next_window =
  118. (curr_window == num_windows_ - 1) ? 0 : curr_window + 1;
  119. // subtract next buckets from totals and swap to next buckets
  120. HistogramStat& stats_to_drop =
  121. window_stats_[static_cast<size_t>(next_window)];
  122. if (!stats_to_drop.Empty()) {
  123. for (size_t b = 0; b < stats_.num_buckets_; b++) {
  124. stats_.buckets_[b].fetch_sub(stats_to_drop.bucket_at(b),
  125. std::memory_order_relaxed);
  126. }
  127. if (stats_.min() == stats_to_drop.min()) {
  128. uint64_t new_min = std::numeric_limits<uint64_t>::max();
  129. for (unsigned int i = 0; i < num_windows_; i++) {
  130. if (i != next_window) {
  131. uint64_t m = window_stats_[i].min();
  132. if (m < new_min) {
  133. new_min = m;
  134. }
  135. }
  136. }
  137. stats_.min_.store(new_min, std::memory_order_relaxed);
  138. }
  139. if (stats_.max() == stats_to_drop.max()) {
  140. uint64_t new_max = 0;
  141. for (unsigned int i = 0; i < num_windows_; i++) {
  142. if (i != next_window) {
  143. uint64_t m = window_stats_[i].max();
  144. if (m > new_max) {
  145. new_max = m;
  146. }
  147. }
  148. }
  149. stats_.max_.store(new_max, std::memory_order_relaxed);
  150. }
  151. stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
  152. stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
  153. stats_.sum_squares_.fetch_sub(stats_to_drop.sum_squares(),
  154. std::memory_order_relaxed);
  155. stats_to_drop.Clear();
  156. }
  157. // advance to next window bucket
  158. current_window_.store(next_window, std::memory_order_relaxed);
  159. mutex_.unlock();
  160. }
  161. }
  162. } // namespace ROCKSDB_NAMESPACE