| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "db/write_controller.h"
- #include <atomic>
- #include <cassert>
- #include <ratio>
- #include "rocksdb/env.h"
- namespace ROCKSDB_NAMESPACE {
- std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
- ++total_stopped_;
- return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
- }
- std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
- uint64_t write_rate) {
- total_delayed_++;
- // Reset counters.
- last_refill_time_ = 0;
- bytes_left_ = 0;
- set_delayed_write_rate(write_rate);
- return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
- }
- std::unique_ptr<WriteControllerToken>
- WriteController::GetCompactionPressureToken() {
- ++total_compaction_pressure_;
- return std::unique_ptr<WriteControllerToken>(
- new CompactionPressureToken(this));
- }
- bool WriteController::IsStopped() const {
- return total_stopped_.load(std::memory_order_relaxed) > 0;
- }
- // This is inside DB mutex, so we can't sleep and need to minimize
- // frequency to get time.
- // If it turns out to be a performance issue, we can redesign the thread
- // synchronization model here.
- // The function trust caller will sleep micros returned.
- uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
- if (total_stopped_.load(std::memory_order_relaxed) > 0) {
- return 0;
- }
- if (total_delayed_.load(std::memory_order_relaxed) == 0) {
- return 0;
- }
- const uint64_t kMicrosPerSecond = 1000000;
- const uint64_t kRefillInterval = 1024U;
- if (bytes_left_ >= num_bytes) {
- bytes_left_ -= num_bytes;
- return 0;
- }
- // The frequency to get time inside DB mutex is less than one per refill
- // interval.
- auto time_now = NowMicrosMonotonic(env);
- uint64_t sleep_debt = 0;
- uint64_t time_since_last_refill = 0;
- if (last_refill_time_ != 0) {
- if (last_refill_time_ > time_now) {
- sleep_debt = last_refill_time_ - time_now;
- } else {
- time_since_last_refill = time_now - last_refill_time_;
- bytes_left_ +=
- static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
- kMicrosPerSecond * delayed_write_rate_);
- if (time_since_last_refill >= kRefillInterval &&
- bytes_left_ > num_bytes) {
- // If refill interval already passed and we have enough bytes
- // return without extra sleeping.
- last_refill_time_ = time_now;
- bytes_left_ -= num_bytes;
- return 0;
- }
- }
- }
- uint64_t single_refill_amount =
- delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
- if (bytes_left_ + single_refill_amount >= num_bytes) {
- // Wait until a refill interval
- // Never trigger expire for less than one refill interval to avoid to get
- // time.
- bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
- last_refill_time_ = time_now + kRefillInterval;
- return kRefillInterval + sleep_debt;
- }
- // Need to refill more than one interval. Need to sleep longer. Check
- // whether expiration will hit
- // Sleep just until `num_bytes` is allowed.
- uint64_t sleep_amount =
- static_cast<uint64_t>(num_bytes /
- static_cast<long double>(delayed_write_rate_) *
- kMicrosPerSecond) +
- sleep_debt;
- last_refill_time_ = time_now + sleep_amount;
- return sleep_amount;
- }
- uint64_t WriteController::NowMicrosMonotonic(Env* env) {
- return env->NowNanos() / std::milli::den;
- }
- StopWriteToken::~StopWriteToken() {
- assert(controller_->total_stopped_ >= 1);
- --controller_->total_stopped_;
- }
- DelayWriteToken::~DelayWriteToken() {
- controller_->total_delayed_--;
- assert(controller_->total_delayed_.load() >= 0);
- }
- CompactionPressureToken::~CompactionPressureToken() {
- controller_->total_compaction_pressure_--;
- assert(controller_->total_compaction_pressure_ >= 0);
- }
- } // namespace ROCKSDB_NAMESPACE
|