write_controller.cc 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/write_controller.h"
  6. #include <algorithm>
  7. #include <atomic>
  8. #include <cassert>
  9. #include <ratio>
  10. #include "rocksdb/system_clock.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
  13. ++total_stopped_;
  14. return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
  15. }
  16. std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
  17. uint64_t write_rate) {
  18. if (0 == total_delayed_++) {
  19. // Starting delay, so reset counters.
  20. next_refill_time_ = 0;
  21. credit_in_bytes_ = 0;
  22. }
  23. // NOTE: for simplicity, any current credit_in_bytes_ or "debt" in
  24. // next_refill_time_ will be based on an old rate. This rate will apply
  25. // for subsequent additional debts and for the next refill.
  26. set_delayed_write_rate(write_rate);
  27. return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
  28. }
  29. std::unique_ptr<WriteControllerToken>
  30. WriteController::GetCompactionPressureToken() {
  31. ++total_compaction_pressure_;
  32. return std::unique_ptr<WriteControllerToken>(
  33. new CompactionPressureToken(this));
  34. }
  35. bool WriteController::IsStopped() const {
  36. return total_stopped_.load(std::memory_order_relaxed) > 0;
  37. }
  38. // This is inside DB mutex, so we can't sleep and need to minimize
  39. // frequency to get time.
  40. // If it turns out to be a performance issue, we can redesign the thread
  41. // synchronization model here.
  42. // The function trust caller will sleep micros returned.
  43. uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
  44. if (total_stopped_.load(std::memory_order_relaxed) > 0) {
  45. return 0;
  46. }
  47. if (total_delayed_.load(std::memory_order_relaxed) == 0) {
  48. return 0;
  49. }
  50. if (credit_in_bytes_ >= num_bytes) {
  51. credit_in_bytes_ -= num_bytes;
  52. return 0;
  53. }
  54. // The frequency to get time inside DB mutex is less than one per refill
  55. // interval.
  56. auto time_now = NowMicrosMonotonic(clock);
  57. const uint64_t kMicrosPerSecond = 1000000;
  58. // Refill every 1 ms
  59. const uint64_t kMicrosPerRefill = 1000;
  60. if (next_refill_time_ == 0) {
  61. // Start with an initial allotment of bytes for one interval
  62. next_refill_time_ = time_now;
  63. }
  64. if (next_refill_time_ <= time_now) {
  65. // Refill based on time interval plus any extra elapsed
  66. uint64_t elapsed = time_now - next_refill_time_ + kMicrosPerRefill;
  67. credit_in_bytes_ += static_cast<uint64_t>(
  68. 1.0 * elapsed / kMicrosPerSecond * delayed_write_rate_ + 0.999999);
  69. next_refill_time_ = time_now + kMicrosPerRefill;
  70. if (credit_in_bytes_ >= num_bytes) {
  71. // Avoid delay if possible, to reduce DB mutex release & re-aquire.
  72. credit_in_bytes_ -= num_bytes;
  73. return 0;
  74. }
  75. }
  76. // We need to delay to avoid exceeding write rate.
  77. assert(num_bytes > credit_in_bytes_);
  78. uint64_t bytes_over_budget = num_bytes - credit_in_bytes_;
  79. uint64_t needed_delay = static_cast<uint64_t>(
  80. 1.0 * bytes_over_budget / delayed_write_rate_ * kMicrosPerSecond);
  81. credit_in_bytes_ = 0;
  82. next_refill_time_ += needed_delay;
  83. // Minimum delay of refill interval, to reduce DB mutex contention.
  84. return std::max(next_refill_time_ - time_now, kMicrosPerRefill);
  85. }
  86. uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
  87. return clock->NowNanos() / std::milli::den;
  88. }
  89. StopWriteToken::~StopWriteToken() {
  90. assert(controller_->total_stopped_ >= 1);
  91. --controller_->total_stopped_;
  92. }
  93. DelayWriteToken::~DelayWriteToken() {
  94. controller_->total_delayed_--;
  95. assert(controller_->total_delayed_.load() >= 0);
  96. }
  97. CompactionPressureToken::~CompactionPressureToken() {
  98. controller_->total_compaction_pressure_--;
  99. assert(controller_->total_compaction_pressure_ >= 0);
  100. }
  101. } // namespace ROCKSDB_NAMESPACE