rate_limiter_impl.h 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #pragma once
  10. #include <algorithm>
  11. #include <atomic>
  12. #include <chrono>
  13. #include <deque>
  14. #include "port/port.h"
  15. #include "rocksdb/env.h"
  16. #include "rocksdb/rate_limiter.h"
  17. #include "rocksdb/status.h"
  18. #include "rocksdb/system_clock.h"
  19. #include "util/mutexlock.h"
  20. #include "util/random.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. class GenericRateLimiter : public RateLimiter {
  23. public:
  24. GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
  25. int32_t fairness, RateLimiter::Mode mode,
  26. const std::shared_ptr<SystemClock>& clock, bool auto_tuned,
  27. int64_t single_burst_bytes);
  28. virtual ~GenericRateLimiter();
  29. // This API allows user to dynamically change rate limiter's bytes per second.
  30. void SetBytesPerSecond(int64_t bytes_per_second) override;
  31. Status SetSingleBurstBytes(int64_t single_burst_bytes) override;
  32. // Request for token to write bytes. If this request can not be satisfied,
  33. // the call is blocked. Caller is responsible to make sure
  34. // bytes <= GetSingleBurstBytes() and bytes >= 0. Negative bytes
  35. // passed in will be rounded up to 0.
  36. using RateLimiter::Request;
  37. void Request(const int64_t bytes, const Env::IOPriority pri,
  38. Statistics* stats) override;
  39. int64_t GetSingleBurstBytes() const override {
  40. int64_t raw_single_burst_bytes =
  41. raw_single_burst_bytes_.load(std::memory_order_relaxed);
  42. if (raw_single_burst_bytes == 0) {
  43. return refill_bytes_per_period_.load(std::memory_order_relaxed);
  44. }
  45. return raw_single_burst_bytes;
  46. }
  47. int64_t GetTotalBytesThrough(
  48. const Env::IOPriority pri = Env::IO_TOTAL) const override {
  49. MutexLock g(&request_mutex_);
  50. if (pri == Env::IO_TOTAL) {
  51. int64_t total_bytes_through_sum = 0;
  52. for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
  53. total_bytes_through_sum += total_bytes_through_[i];
  54. }
  55. return total_bytes_through_sum;
  56. }
  57. return total_bytes_through_[pri];
  58. }
  59. int64_t GetTotalRequests(
  60. const Env::IOPriority pri = Env::IO_TOTAL) const override {
  61. MutexLock g(&request_mutex_);
  62. if (pri == Env::IO_TOTAL) {
  63. int64_t total_requests_sum = 0;
  64. for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
  65. total_requests_sum += total_requests_[i];
  66. }
  67. return total_requests_sum;
  68. }
  69. return total_requests_[pri];
  70. }
  71. Status GetTotalPendingRequests(
  72. int64_t* total_pending_requests,
  73. const Env::IOPriority pri = Env::IO_TOTAL) const override {
  74. assert(total_pending_requests != nullptr);
  75. MutexLock g(&request_mutex_);
  76. if (pri == Env::IO_TOTAL) {
  77. int64_t total_pending_requests_sum = 0;
  78. for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
  79. total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
  80. }
  81. *total_pending_requests = total_pending_requests_sum;
  82. } else {
  83. *total_pending_requests = static_cast<int64_t>(queue_[pri].size());
  84. }
  85. return Status::OK();
  86. }
  87. int64_t GetBytesPerSecond() const override {
  88. return rate_bytes_per_sec_.load(std::memory_order_relaxed);
  89. }
  90. virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) {
  91. MutexLock g(&request_mutex_);
  92. clock_ = std::move(clock);
  93. next_refill_us_ = NowMicrosMonotonicLocked();
  94. }
  95. private:
  96. static constexpr int kMicrosecondsPerSecond = 1000000;
  97. void RefillBytesAndGrantRequestsLocked();
  98. std::vector<Env::IOPriority> GeneratePriorityIterationOrderLocked();
  99. int64_t CalculateRefillBytesPerPeriodLocked(int64_t rate_bytes_per_sec);
  100. Status TuneLocked();
  101. void SetBytesPerSecondLocked(int64_t bytes_per_second);
  102. uint64_t NowMicrosMonotonicLocked() {
  103. return clock_->NowNanos() / std::milli::den;
  104. }
  105. // This mutex guard all internal states
  106. mutable port::Mutex request_mutex_;
  107. const int64_t refill_period_us_;
  108. std::atomic<int64_t> rate_bytes_per_sec_;
  109. std::atomic<int64_t> refill_bytes_per_period_;
  110. // This value is validated but unsanitized (may be zero).
  111. std::atomic<int64_t> raw_single_burst_bytes_;
  112. std::shared_ptr<SystemClock> clock_;
  113. bool stop_;
  114. port::CondVar exit_cv_;
  115. int32_t requests_to_wait_;
  116. int64_t total_requests_[Env::IO_TOTAL];
  117. int64_t total_bytes_through_[Env::IO_TOTAL];
  118. int64_t available_bytes_;
  119. int64_t next_refill_us_;
  120. int32_t fairness_;
  121. Random rnd_;
  122. struct Req;
  123. std::deque<Req*> queue_[Env::IO_TOTAL];
  124. bool wait_until_refill_pending_;
  125. bool auto_tuned_;
  126. int64_t num_drains_;
  127. const int64_t max_bytes_per_sec_;
  128. std::chrono::microseconds tuned_time_;
  129. };
  130. } // namespace ROCKSDB_NAMESPACE