| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#include "db/write_controller.h"#include <atomic>#include <cassert>#include <ratio>#include "rocksdb/env.h"namespace ROCKSDB_NAMESPACE {std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {  ++total_stopped_;  return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));}std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(    uint64_t write_rate) {  total_delayed_++;  // Reset counters.  last_refill_time_ = 0;  bytes_left_ = 0;  set_delayed_write_rate(write_rate);  return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));}std::unique_ptr<WriteControllerToken>WriteController::GetCompactionPressureToken() {  ++total_compaction_pressure_;  return std::unique_ptr<WriteControllerToken>(      new CompactionPressureToken(this));}bool WriteController::IsStopped() const {  return total_stopped_.load(std::memory_order_relaxed) > 0;}// This is inside DB mutex, so we can't sleep and need to minimize// frequency to get time.// If it turns out to be a performance issue, we can redesign the thread// synchronization model here.// The function trust caller will sleep micros returned.uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {  if (total_stopped_.load(std::memory_order_relaxed) > 0) {    return 0;  }  if (total_delayed_.load(std::memory_order_relaxed) == 0) {    return 0;  }  const uint64_t kMicrosPerSecond = 1000000;  const uint64_t kRefillInterval = 1024U;  if (bytes_left_ >= num_bytes) {    bytes_left_ -= num_bytes;    return 0;  }  // The frequency to get time inside DB mutex is less than one per refill  // interval.  auto time_now = NowMicrosMonotonic(env);  uint64_t sleep_debt = 0;  uint64_t time_since_last_refill = 0;  if (last_refill_time_ != 0) {    if (last_refill_time_ > time_now) {      sleep_debt = last_refill_time_ - time_now;    } else {      time_since_last_refill = time_now - last_refill_time_;      bytes_left_ +=          static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /                                kMicrosPerSecond * delayed_write_rate_);      if (time_since_last_refill >= kRefillInterval &&          bytes_left_ > num_bytes) {        // If refill interval already passed and we have enough bytes        // return without extra sleeping.        last_refill_time_ = time_now;        bytes_left_ -= num_bytes;        return 0;      }    }  }  uint64_t single_refill_amount =      delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;  if (bytes_left_ + single_refill_amount >= num_bytes) {    // Wait until a refill interval    // Never trigger expire for less than one refill interval to avoid to get    // time.    bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;    last_refill_time_ = time_now + kRefillInterval;    return kRefillInterval + sleep_debt;  }  // Need to refill more than one interval. Need to sleep longer. Check  // whether expiration will hit  // Sleep just until `num_bytes` is allowed.  uint64_t sleep_amount =      static_cast<uint64_t>(num_bytes /                            static_cast<long double>(delayed_write_rate_) *                            kMicrosPerSecond) +      sleep_debt;  last_refill_time_ = time_now + sleep_amount;  return sleep_amount;}uint64_t WriteController::NowMicrosMonotonic(Env* env) {  return env->NowNanos() / std::milli::den;}StopWriteToken::~StopWriteToken() {  assert(controller_->total_stopped_ >= 1);  --controller_->total_stopped_;}DelayWriteToken::~DelayWriteToken() {  controller_->total_delayed_--;  assert(controller_->total_delayed_.load() >= 0);}CompactionPressureToken::~CompactionPressureToken() {  controller_->total_compaction_pressure_--;  assert(controller_->total_compaction_pressure_ >= 0);}}  // namespace ROCKSDB_NAMESPACE
 |