//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
See the AUTHORS file for names of contributors.#include "util/rate_limiter.h"#include "monitoring/statistics.h"#include "port/port.h"#include "rocksdb/env.h"#include "test_util/sync_point.h"#include "util/aligned_buffer.h"namespace ROCKSDB_NAMESPACE {size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,                                 Env::IOPriority io_priority, Statistics* stats,                                 RateLimiter::OpType op_type) {  if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {    bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));    if (alignment > 0) {      // Here we may actually require more than burst and block      // but we can not write less than one page at a time on direct I/O      // thus we may want not to use ratelimiter      bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));    }    Request(bytes, io_priority, stats, op_type);  }  return bytes;}// Pending requeststruct GenericRateLimiter::Req {  explicit Req(int64_t _bytes, port::Mutex* _mu)      : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {}  int64_t request_bytes;  int64_t bytes;  port::CondVar cv;  bool granted;};GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,                                       int64_t refill_period_us,                                       int32_t fairness, RateLimiter::Mode mode,                                       Env* env, bool auto_tuned)    : RateLimiter(mode),      refill_period_us_(refill_period_us),      rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2                                     : rate_bytes_per_sec),      refill_bytes_per_period_(          CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),      env_(env),      stop_(false),      exit_cv_(&request_mutex_),      requests_to_wait_(0),      available_bytes_(0),      next_refill_us_(NowMicrosMonotonic(env_)),      fairness_(fairness > 100 ? 
100 : fairness),      rnd_((uint32_t)time(nullptr)),      leader_(nullptr),      auto_tuned_(auto_tuned),      num_drains_(0),      prev_num_drains_(0),      max_bytes_per_sec_(rate_bytes_per_sec),      tuned_time_(NowMicrosMonotonic(env_)) {  total_requests_[0] = 0;  total_requests_[1] = 0;  total_bytes_through_[0] = 0;  total_bytes_through_[1] = 0;}GenericRateLimiter::~GenericRateLimiter() {  MutexLock g(&request_mutex_);  stop_ = true;  requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() +                                           queue_[Env::IO_HIGH].size());  for (auto& r : queue_[Env::IO_HIGH]) {    r->cv.Signal();  }  for (auto& r : queue_[Env::IO_LOW]) {    r->cv.Signal();  }  while (requests_to_wait_ > 0) {    exit_cv_.Wait();  }}// This API allows user to dynamically change rate limiter's bytes per second.void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {  assert(bytes_per_second > 0);  rate_bytes_per_sec_ = bytes_per_second;  refill_bytes_per_period_.store(      CalculateRefillBytesPerPeriod(bytes_per_second),      std::memory_order_relaxed);}void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,                                 Statistics* stats) {  assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));  TEST_SYNC_POINT("GenericRateLimiter::Request");  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",                           &rate_bytes_per_sec_);  MutexLock g(&request_mutex_);  if (auto_tuned_) {    static const int kRefillsPerTune = 100;    std::chrono::microseconds now(NowMicrosMonotonic(env_));    if (now - tuned_time_ >=        kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {      Tune();    }  }  if (stop_) {    return;  }  ++total_requests_[pri];  if (available_bytes_ >= bytes) {    // Refill thread assigns quota and notifies requests waiting on    // the queue under mutex. So if we get here, that means nobody    // is waiting?    
available_bytes_ -= bytes;    total_bytes_through_[pri] += bytes;    return;  }  // Request cannot be satisfied at this moment, enqueue  Req r(bytes, &request_mutex_);  queue_[pri].push_back(&r);  do {    bool timedout = false;    // Leader election, candidates can be:    // (1) a new incoming request,    // (2) a previous leader, whose quota has not been not assigned yet due    //     to lower priority    // (3) a previous waiter at the front of queue, who got notified by    //     previous leader    if (leader_ == nullptr &&        ((!queue_[Env::IO_HIGH].empty() &&            &r == queue_[Env::IO_HIGH].front()) ||         (!queue_[Env::IO_LOW].empty() &&            &r == queue_[Env::IO_LOW].front()))) {      leader_ = &r;      int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_);      delta = delta > 0 ? delta : 0;      if (delta == 0) {        timedout = true;      } else {        int64_t wait_until = env_->NowMicros() + delta;        RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);        ++num_drains_;        timedout = r.cv.TimedWait(wait_until);      }    } else {      // Not at the front of queue or an leader has already been elected      r.cv.Wait();    }    // request_mutex_ is held from now on    if (stop_) {      --requests_to_wait_;      exit_cv_.Signal();      return;    }    // Make sure the waken up request is always the header of its queue    assert(r.granted ||           (!queue_[Env::IO_HIGH].empty() &&            &r == queue_[Env::IO_HIGH].front()) ||           (!queue_[Env::IO_LOW].empty() &&            &r == queue_[Env::IO_LOW].front()));    assert(leader_ == nullptr ||           (!queue_[Env::IO_HIGH].empty() &&            leader_ == queue_[Env::IO_HIGH].front()) ||           (!queue_[Env::IO_LOW].empty() &&            leader_ == queue_[Env::IO_LOW].front()));    if (leader_ == &r) {      // Waken up from TimedWait()      if (timedout) {        // Time to do refill!        Refill();        // Re-elect a new leader regardless. 
This is to simplify the        // election handling.        leader_ = nullptr;        // Notify the header of queue if current leader is going away        if (r.granted) {          // Current leader already got granted with quota. Notify header          // of waiting queue to participate next round of election.          assert((queue_[Env::IO_HIGH].empty() ||                    &r != queue_[Env::IO_HIGH].front()) &&                 (queue_[Env::IO_LOW].empty() ||                    &r != queue_[Env::IO_LOW].front()));          if (!queue_[Env::IO_HIGH].empty()) {            queue_[Env::IO_HIGH].front()->cv.Signal();          } else if (!queue_[Env::IO_LOW].empty()) {            queue_[Env::IO_LOW].front()->cv.Signal();          }          // Done          break;        }      } else {        // Spontaneous wake up, need to continue to wait        assert(!r.granted);        leader_ = nullptr;      }    } else {      // Waken up by previous leader:      // (1) if requested quota is granted, it is done.      // (2) if requested quota is not granted, this means current thread      // was picked as a new leader candidate (previous leader got quota).      // It needs to participate leader election because a new request may      // come in before this thread gets waken up. So it may actually need      // to do Wait() again.      assert(!timedout);    }  } while (!r.granted);}void GenericRateLimiter::Refill() {  TEST_SYNC_POINT("GenericRateLimiter::Refill");  next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_;  // Carry over the left over quota from the last period  auto refill_bytes_per_period =      refill_bytes_per_period_.load(std::memory_order_relaxed);  if (available_bytes_ < refill_bytes_per_period) {    available_bytes_ += refill_bytes_per_period;  }  int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;  for (int q = 0; q < 2; ++q) {    auto use_pri = (use_low_pri_first == q) ? 
Env::IO_LOW : Env::IO_HIGH;    auto* queue = &queue_[use_pri];    while (!queue->empty()) {      auto* next_req = queue->front();      if (available_bytes_ < next_req->request_bytes) {        // avoid starvation        next_req->request_bytes -= available_bytes_;        available_bytes_ = 0;        break;      }      available_bytes_ -= next_req->request_bytes;      next_req->request_bytes = 0;      total_bytes_through_[use_pri] += next_req->bytes;      queue->pop_front();      next_req->granted = true;      if (next_req != leader_) {        // Quota granted, signal the thread        next_req->cv.Signal();      }    }  }}int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(    int64_t rate_bytes_per_sec) {  if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) {    // Avoid unexpected result in the overflow case. The result now is still    // inaccurate but is a number that is large enough.    return port::kMaxInt64 / 1000000;  } else {    return std::max(kMinRefillBytesPerPeriod,                    rate_bytes_per_sec * refill_period_us_ / 1000000);  }}Status GenericRateLimiter::Tune() {  const int kLowWatermarkPct = 50;  const int kHighWatermarkPct = 90;  const int kAdjustFactorPct = 5;  // computed rate limit will be in  // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.  const int kAllowedRangeFactor = 20;  std::chrono::microseconds prev_tuned_time = tuned_time_;  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));  int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +                               std::chrono::microseconds(refill_period_us_) -                               std::chrono::microseconds(1)) /                              std::chrono::microseconds(refill_period_us_);  // We tune every kRefillsPerTune intervals, so the overflow and division-by-  // zero conditions should never happen.  
assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);  assert(elapsed_intervals > 0);  int64_t drained_pct =      (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals;  int64_t prev_bytes_per_sec = GetBytesPerSecond();  int64_t new_bytes_per_sec;  if (drained_pct == 0) {    new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;  } else if (drained_pct < kLowWatermarkPct) {    // sanitize to prevent overflow    int64_t sanitized_prev_bytes_per_sec =        std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);    new_bytes_per_sec =        std::max(max_bytes_per_sec_ / kAllowedRangeFactor,                 sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));  } else if (drained_pct > kHighWatermarkPct) {    // sanitize to prevent overflow    int64_t sanitized_prev_bytes_per_sec = std::min(        prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));    new_bytes_per_sec =        std::min(max_bytes_per_sec_,                 sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);  } else {    new_bytes_per_sec = prev_bytes_per_sec;  }  if (new_bytes_per_sec != prev_bytes_per_sec) {    SetBytesPerSecond(new_bytes_per_sec);  }  num_drains_ = prev_num_drains_;  return Status::OK();}RateLimiter* NewGenericRateLimiter(    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,    int32_t fairness /* = 10 */,    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,    bool auto_tuned /* = false */) {  assert(rate_bytes_per_sec > 0);  assert(refill_period_us > 0);  assert(fairness > 0);  return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,                                mode, Env::Default(), auto_tuned);}}  // namespace ROCKSDB_NAMESPACE
 |