write_controller.h 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <stdint.h>
  7. #include <atomic>
  8. #include <memory>
  9. #include "rocksdb/rate_limiter.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. class SystemClock;
  12. class WriteControllerToken;
  13. // WriteController is controlling write stalls in our write code-path. Write
  14. // stalls happen when compaction can't keep up with write rate.
  15. // All of the methods here (including WriteControllerToken's destructors) need
  16. // to be called while holding DB mutex
  17. class WriteController {
  18. public:
  19. explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
  20. int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
  21. : total_stopped_(0),
  22. total_delayed_(0),
  23. total_compaction_pressure_(0),
  24. credit_in_bytes_(0),
  25. next_refill_time_(0),
  26. low_pri_rate_limiter_(
  27. NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
  28. set_max_delayed_write_rate(_delayed_write_rate);
  29. }
  30. ~WriteController() = default;
  31. // When an actor (column family) requests a stop token, all writes will be
  32. // stopped until the stop token is released (deleted)
  33. std::unique_ptr<WriteControllerToken> GetStopToken();
  34. // When an actor (column family) requests a delay token, total delay for all
  35. // writes to the DB will be controlled under the delayed write rate. Every
  36. // write needs to call GetDelay() with number of bytes writing to the DB,
  37. // which returns number of microseconds to sleep.
  38. std::unique_ptr<WriteControllerToken> GetDelayToken(
  39. uint64_t delayed_write_rate);
  40. // When an actor (column family) requests a moderate token, compaction
  41. // threads will be increased
  42. std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
  43. // these three metods are querying the state of the WriteController
  44. bool IsStopped() const;
  45. bool NeedsDelay() const { return total_delayed_.load() > 0; }
  46. bool NeedSpeedupCompaction() const {
  47. return IsStopped() || NeedsDelay() || total_compaction_pressure_.load() > 0;
  48. }
  49. // return how many microseconds the caller needs to sleep after the call
  50. // num_bytes: how many number of bytes to put into the DB.
  51. // Prerequisite: DB mutex held.
  52. uint64_t GetDelay(SystemClock* clock, uint64_t num_bytes);
  53. void set_delayed_write_rate(uint64_t write_rate) {
  54. // avoid divide 0
  55. if (write_rate == 0) {
  56. write_rate = 1u;
  57. } else if (write_rate > max_delayed_write_rate()) {
  58. write_rate = max_delayed_write_rate();
  59. }
  60. delayed_write_rate_ = write_rate;
  61. }
  62. void set_max_delayed_write_rate(uint64_t write_rate) {
  63. // avoid divide 0
  64. if (write_rate == 0) {
  65. write_rate = 1u;
  66. }
  67. max_delayed_write_rate_ = write_rate;
  68. // update delayed_write_rate_ as well
  69. delayed_write_rate_ = write_rate;
  70. }
  71. uint64_t delayed_write_rate() const { return delayed_write_rate_; }
  72. uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
  73. RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
  74. private:
  75. uint64_t NowMicrosMonotonic(SystemClock* clock);
  76. friend class WriteControllerToken;
  77. friend class StopWriteToken;
  78. friend class DelayWriteToken;
  79. friend class CompactionPressureToken;
  80. std::atomic<int> total_stopped_;
  81. std::atomic<int> total_delayed_;
  82. std::atomic<int> total_compaction_pressure_;
  83. // Number of bytes allowed to write without delay
  84. uint64_t credit_in_bytes_;
  85. // Next time that we can add more credit of bytes
  86. uint64_t next_refill_time_;
  87. // Write rate set when initialization or by `DBImpl::SetDBOptions`
  88. uint64_t max_delayed_write_rate_;
  89. // Current write rate (bytes / second)
  90. uint64_t delayed_write_rate_;
  91. std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
  92. };
  93. class WriteControllerToken {
  94. public:
  95. explicit WriteControllerToken(WriteController* controller)
  96. : controller_(controller) {}
  97. virtual ~WriteControllerToken() {}
  98. protected:
  99. WriteController* controller_;
  100. private:
  101. // no copying allowed
  102. WriteControllerToken(const WriteControllerToken&) = delete;
  103. void operator=(const WriteControllerToken&) = delete;
  104. };
  105. class StopWriteToken : public WriteControllerToken {
  106. public:
  107. explicit StopWriteToken(WriteController* controller)
  108. : WriteControllerToken(controller) {}
  109. virtual ~StopWriteToken();
  110. };
  111. class DelayWriteToken : public WriteControllerToken {
  112. public:
  113. explicit DelayWriteToken(WriteController* controller)
  114. : WriteControllerToken(controller) {}
  115. virtual ~DelayWriteToken();
  116. };
  117. class CompactionPressureToken : public WriteControllerToken {
  118. public:
  119. explicit CompactionPressureToken(WriteController* controller)
  120. : WriteControllerToken(controller) {}
  121. virtual ~CompactionPressureToken();
  122. };
  123. } // namespace ROCKSDB_NAMESPACE