write_controller.h 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <stdint.h>
  7. #include <atomic>
  8. #include <memory>
  9. #include "rocksdb/rate_limiter.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. class Env;
  12. class WriteControllerToken;
  13. // WriteController is controlling write stalls in our write code-path. Write
  14. // stalls happen when compaction can't keep up with write rate.
  15. // All of the methods here (including WriteControllerToken's destructors) need
  16. // to be called while holding DB mutex
  17. class WriteController {
  18. public:
  19. explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
  20. int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
  21. : total_stopped_(0),
  22. total_delayed_(0),
  23. total_compaction_pressure_(0),
  24. bytes_left_(0),
  25. last_refill_time_(0),
  26. low_pri_rate_limiter_(
  27. NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
  28. set_max_delayed_write_rate(_delayed_write_rate);
  29. }
  30. ~WriteController() = default;
  31. // When an actor (column family) requests a stop token, all writes will be
  32. // stopped until the stop token is released (deleted)
  33. std::unique_ptr<WriteControllerToken> GetStopToken();
  34. // When an actor (column family) requests a delay token, total delay for all
  35. // writes to the DB will be controlled under the delayed write rate. Every
  36. // write needs to call GetDelay() with number of bytes writing to the DB,
  37. // which returns number of microseconds to sleep.
  38. std::unique_ptr<WriteControllerToken> GetDelayToken(
  39. uint64_t delayed_write_rate);
  40. // When an actor (column family) requests a moderate token, compaction
  41. // threads will be increased
  42. std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
  43. // these three metods are querying the state of the WriteController
  44. bool IsStopped() const;
  45. bool NeedsDelay() const { return total_delayed_.load() > 0; }
  46. bool NeedSpeedupCompaction() const {
  47. return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
  48. }
  49. // return how many microseconds the caller needs to sleep after the call
  50. // num_bytes: how many number of bytes to put into the DB.
  51. // Prerequisite: DB mutex held.
  52. uint64_t GetDelay(Env* env, uint64_t num_bytes);
  53. void set_delayed_write_rate(uint64_t write_rate) {
  54. // avoid divide 0
  55. if (write_rate == 0) {
  56. write_rate = 1u;
  57. } else if (write_rate > max_delayed_write_rate()) {
  58. write_rate = max_delayed_write_rate();
  59. }
  60. delayed_write_rate_ = write_rate;
  61. }
  62. void set_max_delayed_write_rate(uint64_t write_rate) {
  63. // avoid divide 0
  64. if (write_rate == 0) {
  65. write_rate = 1u;
  66. }
  67. max_delayed_write_rate_ = write_rate;
  68. // update delayed_write_rate_ as well
  69. delayed_write_rate_ = write_rate;
  70. }
  71. uint64_t delayed_write_rate() const { return delayed_write_rate_; }
  72. uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
  73. RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
  74. private:
  75. uint64_t NowMicrosMonotonic(Env* env);
  76. friend class WriteControllerToken;
  77. friend class StopWriteToken;
  78. friend class DelayWriteToken;
  79. friend class CompactionPressureToken;
  80. std::atomic<int> total_stopped_;
  81. std::atomic<int> total_delayed_;
  82. std::atomic<int> total_compaction_pressure_;
  83. uint64_t bytes_left_;
  84. uint64_t last_refill_time_;
  85. // write rate set when initialization or by `DBImpl::SetDBOptions`
  86. uint64_t max_delayed_write_rate_;
  87. // current write rate
  88. uint64_t delayed_write_rate_;
  89. std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
  90. };
  91. class WriteControllerToken {
  92. public:
  93. explicit WriteControllerToken(WriteController* controller)
  94. : controller_(controller) {}
  95. virtual ~WriteControllerToken() {}
  96. protected:
  97. WriteController* controller_;
  98. private:
  99. // no copying allowed
  100. WriteControllerToken(const WriteControllerToken&) = delete;
  101. void operator=(const WriteControllerToken&) = delete;
  102. };
  103. class StopWriteToken : public WriteControllerToken {
  104. public:
  105. explicit StopWriteToken(WriteController* controller)
  106. : WriteControllerToken(controller) {}
  107. virtual ~StopWriteToken();
  108. };
  109. class DelayWriteToken : public WriteControllerToken {
  110. public:
  111. explicit DelayWriteToken(WriteController* controller)
  112. : WriteControllerToken(controller) {}
  113. virtual ~DelayWriteToken();
  114. };
  115. class CompactionPressureToken : public WriteControllerToken {
  116. public:
  117. explicit CompactionPressureToken(WriteController* controller)
  118. : WriteControllerToken(controller) {}
  119. virtual ~CompactionPressureToken();
  120. };
  121. } // namespace ROCKSDB_NAMESPACE