write_controller.cc 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/write_controller.h"
  6. #include <atomic>
  7. #include <cassert>
  8. #include <ratio>
  9. #include "rocksdb/env.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
  12. ++total_stopped_;
  13. return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
  14. }
  15. std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
  16. uint64_t write_rate) {
  17. total_delayed_++;
  18. // Reset counters.
  19. last_refill_time_ = 0;
  20. bytes_left_ = 0;
  21. set_delayed_write_rate(write_rate);
  22. return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
  23. }
  24. std::unique_ptr<WriteControllerToken>
  25. WriteController::GetCompactionPressureToken() {
  26. ++total_compaction_pressure_;
  27. return std::unique_ptr<WriteControllerToken>(
  28. new CompactionPressureToken(this));
  29. }
  30. bool WriteController::IsStopped() const {
  31. return total_stopped_.load(std::memory_order_relaxed) > 0;
  32. }
  33. // This is inside DB mutex, so we can't sleep and need to minimize
  34. // frequency to get time.
  35. // If it turns out to be a performance issue, we can redesign the thread
  36. // synchronization model here.
  37. // The function trust caller will sleep micros returned.
  38. uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
  39. if (total_stopped_.load(std::memory_order_relaxed) > 0) {
  40. return 0;
  41. }
  42. if (total_delayed_.load(std::memory_order_relaxed) == 0) {
  43. return 0;
  44. }
  45. const uint64_t kMicrosPerSecond = 1000000;
  46. const uint64_t kRefillInterval = 1024U;
  47. if (bytes_left_ >= num_bytes) {
  48. bytes_left_ -= num_bytes;
  49. return 0;
  50. }
  51. // The frequency to get time inside DB mutex is less than one per refill
  52. // interval.
  53. auto time_now = NowMicrosMonotonic(env);
  54. uint64_t sleep_debt = 0;
  55. uint64_t time_since_last_refill = 0;
  56. if (last_refill_time_ != 0) {
  57. if (last_refill_time_ > time_now) {
  58. sleep_debt = last_refill_time_ - time_now;
  59. } else {
  60. time_since_last_refill = time_now - last_refill_time_;
  61. bytes_left_ +=
  62. static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
  63. kMicrosPerSecond * delayed_write_rate_);
  64. if (time_since_last_refill >= kRefillInterval &&
  65. bytes_left_ > num_bytes) {
  66. // If refill interval already passed and we have enough bytes
  67. // return without extra sleeping.
  68. last_refill_time_ = time_now;
  69. bytes_left_ -= num_bytes;
  70. return 0;
  71. }
  72. }
  73. }
  74. uint64_t single_refill_amount =
  75. delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
  76. if (bytes_left_ + single_refill_amount >= num_bytes) {
  77. // Wait until a refill interval
  78. // Never trigger expire for less than one refill interval to avoid to get
  79. // time.
  80. bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
  81. last_refill_time_ = time_now + kRefillInterval;
  82. return kRefillInterval + sleep_debt;
  83. }
  84. // Need to refill more than one interval. Need to sleep longer. Check
  85. // whether expiration will hit
  86. // Sleep just until `num_bytes` is allowed.
  87. uint64_t sleep_amount =
  88. static_cast<uint64_t>(num_bytes /
  89. static_cast<long double>(delayed_write_rate_) *
  90. kMicrosPerSecond) +
  91. sleep_debt;
  92. last_refill_time_ = time_now + sleep_amount;
  93. return sleep_amount;
  94. }
  95. uint64_t WriteController::NowMicrosMonotonic(Env* env) {
  96. return env->NowNanos() / std::milli::den;
  97. }
  98. StopWriteToken::~StopWriteToken() {
  99. assert(controller_->total_stopped_ >= 1);
  100. --controller_->total_stopped_;
  101. }
  102. DelayWriteToken::~DelayWriteToken() {
  103. controller_->total_delayed_--;
  104. assert(controller_->total_delayed_.load() >= 0);
  105. }
  106. CompactionPressureToken::~CompactionPressureToken() {
  107. controller_->total_compaction_pressure_--;
  108. assert(controller_->total_compaction_pressure_ >= 0);
  109. }
  110. } // namespace ROCKSDB_NAMESPACE