rate_limiter.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>

#include "monitoring/statistics_impl.h"
#include "port/port.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/rate_limiter_impl.h"

namespace ROCKSDB_NAMESPACE {

size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
                                 Env::IOPriority io_priority, Statistics* stats,
                                 RateLimiter::OpType op_type) {
  if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {
    bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));

    if (alignment > 0) {
      // Here we may actually require more than burst and block, as we cannot
      // write/read less than one page at a time on direct I/O; thus we do not
      // want to be strictly constrained by burst.
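      //
      // For illustration (hypothetical numbers): with alignment == 4096 and
      // bytes == 10000, TruncateToPageBoundary(4096, 10000) rounds down to
      // 8192; with bytes == 1000 it rounds down to 0 and the std::max below
      // bumps the result back up to one full page (4096).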
      bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
    }
    Request(bytes, io_priority, stats, op_type);
  }
  return bytes;
}

// Pending request
struct GenericRateLimiter::Req {
  explicit Req(int64_t _bytes, port::Mutex* _mu)
      : request_bytes(_bytes), bytes(_bytes), cv(_mu) {}
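  // Note: `request_bytes` counts down as quota is granted across refill
  // periods, while `bytes` preserves the request's original size so the full
  // amount can be credited to total_bytes_through_ once fully granted.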
  int64_t request_bytes;
  int64_t bytes;
  port::CondVar cv;
};

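// Note on auto-tuning: when `auto_tuned` is true, the limiter starts at half
// of `rate_bytes_per_sec` and treats the full value as the upper bound
// (`max_bytes_per_sec_`); TuneLocked() then adjusts the effective rate within
// `[max_bytes_per_sec_ / 20, max_bytes_per_sec_]`.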
GenericRateLimiter::GenericRateLimiter(
    int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
    RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock,
    bool auto_tuned, int64_t single_burst_bytes)
    : RateLimiter(mode),
      refill_period_us_(refill_period_us),
      rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
                                     : rate_bytes_per_sec),
      refill_bytes_per_period_(
          CalculateRefillBytesPerPeriodLocked(rate_bytes_per_sec_)),
      raw_single_burst_bytes_(single_burst_bytes),
      clock_(clock),
      stop_(false),
      exit_cv_(&request_mutex_),
      requests_to_wait_(0),
      available_bytes_(0),
      next_refill_us_(NowMicrosMonotonicLocked()),
      fairness_(fairness > 100 ? 100 : fairness),
      rnd_((uint32_t)time(nullptr)),
      wait_until_refill_pending_(false),
      auto_tuned_(auto_tuned),
      num_drains_(0),
      max_bytes_per_sec_(rate_bytes_per_sec),
      tuned_time_(NowMicrosMonotonicLocked()) {
  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
    total_requests_[i] = 0;
    total_bytes_through_[i] = 0;
  }
}

GenericRateLimiter::~GenericRateLimiter() {
  MutexLock g(&request_mutex_);
  stop_ = true;
  std::deque<Req*>::size_type queues_size_sum = 0;
  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
    queues_size_sum += queue_[i].size();
  }
  requests_to_wait_ = static_cast<int32_t>(queues_size_sum);

  for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
    std::deque<Req*> queue = queue_[i];
    for (auto& r : queue) {
      r->cv.Signal();
    }
  }

  while (requests_to_wait_ > 0) {
    exit_cv_.Wait();
  }
}

// This API allows a user to dynamically change the rate limiter's bytes per
// second.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
  MutexLock g(&request_mutex_);
  SetBytesPerSecondLocked(bytes_per_second);
}

void GenericRateLimiter::SetBytesPerSecondLocked(int64_t bytes_per_second) {
  assert(bytes_per_second > 0);
  rate_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed);
  refill_bytes_per_period_.store(
      CalculateRefillBytesPerPeriodLocked(bytes_per_second),
      std::memory_order_relaxed);
}

Status GenericRateLimiter::SetSingleBurstBytes(int64_t single_burst_bytes) {
  if (single_burst_bytes < 0) {
    return Status::InvalidArgument(
        "`single_burst_bytes` must be greater than or equal to 0");
  }

  MutexLock g(&request_mutex_);
  raw_single_burst_bytes_.store(single_burst_bytes, std::memory_order_relaxed);
  return Status::OK();
}

void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
                                 Statistics* stats) {
  assert(bytes <= GetSingleBurstBytes());
  bytes = std::max(static_cast<int64_t>(0), bytes);
  TEST_SYNC_POINT("GenericRateLimiter::Request");
  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
                           &rate_bytes_per_sec_);
  MutexLock g(&request_mutex_);

  if (auto_tuned_) {
    static const int kRefillsPerTune = 100;
    std::chrono::microseconds now(NowMicrosMonotonicLocked());
    if (now - tuned_time_ >=
        kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
      Status s = TuneLocked();
      s.PermitUncheckedError();  // TODO: What to do on error?
    }
  }

  if (stop_) {
    // It is now in the clean-up of ~GenericRateLimiter().
    // Therefore any new incoming request will exit from here
    // and not get satisfied.
    return;
  }

  ++total_requests_[pri];

  if (available_bytes_ > 0) {
    int64_t bytes_through = std::min(available_bytes_, bytes);
    total_bytes_through_[pri] += bytes_through;
    available_bytes_ -= bytes_through;
    bytes -= bytes_through;
  }

  if (bytes == 0) {
    return;
  }

  // Request cannot be satisfied at this moment, enqueue
  Req r(bytes, &request_mutex_);
  queue_[pri].push_back(&r);
  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest",
                           &request_mutex_);
  // A thread representing a queued request coordinates with other such
  // threads. There are two main duties.
  //
  // (1) Waiting for the next refill time.
  // (2) Refilling the bytes and granting requests.
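  //
  // Note: `wait_until_refill_pending_` ensures at most one queued thread
  // performs duty (1) at a time; every other queued thread blocks on its own
  // condition variable until it is granted quota or woken for a duty.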
  do {
    int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonicLocked();
    if (time_until_refill_us > 0) {
      if (wait_until_refill_pending_) {
        // Somebody is performing (1). Trust we'll be woken up when our request
        // is granted or we are needed for future duties.
        r.cv.Wait();
      } else {
        // Whichever thread reaches here first performs duty (1) as described
        // above.
        int64_t wait_until = clock_->NowMicros() + time_until_refill_us;
        RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
        ++num_drains_;
        wait_until_refill_pending_ = true;
        clock_->TimedWait(&r.cv, std::chrono::microseconds(wait_until));
        TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait",
                                 &time_until_refill_us);
        wait_until_refill_pending_ = false;
      }
    } else {
      // Whichever thread reaches here first performs duty (2) as described
      // above.
      RefillBytesAndGrantRequestsLocked();
    }
    if (r.request_bytes == 0) {
      // If there are any remaining requests, make sure at least one candidate
      // is awake for future duties by signaling the front request of a queue.
      for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
        auto& queue = queue_[i];
        if (!queue.empty()) {
          queue.front()->cv.Signal();
          break;
        }
      }
    }
    // Invariant: a non-granted request is always in exactly one queue, and a
    // granted request is always in zero queues.
#ifndef NDEBUG
    int num_found = 0;
    for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
      if (std::find(queue_[i].begin(), queue_[i].end(), &r) !=
          queue_[i].end()) {
        ++num_found;
      }
    }
    if (r.request_bytes == 0) {
      assert(num_found == 0);
    } else {
      assert(num_found == 1);
    }
#endif  // NDEBUG
  } while (!stop_ && r.request_bytes > 0);

  if (stop_) {
    // It is now in the clean-up of ~GenericRateLimiter().
    // Therefore any woken-up request will have come out of the loop and then
    // exit here. It might or might not have been satisfied.
    --requests_to_wait_;
    exit_cv_.Signal();
  }
}

std::vector<Env::IOPriority>
GenericRateLimiter::GeneratePriorityIterationOrderLocked() {
  std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */);
  // We make Env::IO_USER a superior priority by always iterating its queue
  // first.
  pri_iteration_order[0] = Env::IO_USER;
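
  // Note: rnd_.OneIn(fairness_) is true with probability 1/fairness_. For
  // example, with the default fairness of 10, there is a 1-in-10 chance per
  // refill that IO_HIGH is visited after IO_MID/IO_LOW (and likewise for
  // IO_MID vs. IO_LOW), so lower priorities cannot be starved indefinitely.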
  bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
  TEST_SYNC_POINT_CALLBACK(
      "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
      "PostRandomOneInFairnessForHighPri",
      &high_pri_iterated_after_mid_low_pri);
  bool mid_pri_iterated_after_low_pri = rnd_.OneIn(fairness_);
  TEST_SYNC_POINT_CALLBACK(
      "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
      "PostRandomOneInFairnessForMidPri",
      &mid_pri_iterated_after_low_pri);

  if (high_pri_iterated_after_mid_low_pri) {
    pri_iteration_order[3] = Env::IO_HIGH;
    pri_iteration_order[2] =
        mid_pri_iterated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
    pri_iteration_order[1] =
        (pri_iteration_order[2] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
  } else {
    pri_iteration_order[1] = Env::IO_HIGH;
    pri_iteration_order[3] =
        mid_pri_iterated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
    pri_iteration_order[2] =
        (pri_iteration_order[3] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
  }

  TEST_SYNC_POINT_CALLBACK(
      "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
      "PreReturnPriIterationOrder",
      &pri_iteration_order);
  return pri_iteration_order;
}

void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() {
  TEST_SYNC_POINT_CALLBACK(
      "GenericRateLimiter::RefillBytesAndGrantRequestsLocked", &request_mutex_);
  next_refill_us_ = NowMicrosMonotonicLocked() + refill_period_us_;
  // Refill the quota for the new period. There is no leftover quota to carry
  // over: a refill only runs on behalf of a queued request, and a request only
  // enqueues after draining `available_bytes_` to zero.
  auto refill_bytes_per_period =
      refill_bytes_per_period_.load(std::memory_order_relaxed);
  assert(available_bytes_ == 0);
  available_bytes_ = refill_bytes_per_period;

  std::vector<Env::IOPriority> pri_iteration_order =
      GeneratePriorityIterationOrderLocked();

  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
    assert(!pri_iteration_order.empty());
    Env::IOPriority current_pri = pri_iteration_order[i];
    auto* queue = &queue_[current_pri];
    while (!queue->empty()) {
      auto* next_req = queue->front();
      if (available_bytes_ < next_req->request_bytes) {
        // Grant partial request_bytes even if the request is for more than
        // `available_bytes_`, which can happen in a few situations:
        //
        // - The available bytes were partially consumed by other request(s)
        // - The rate was dynamically reduced while requests were already
        //   enqueued
        // - The burst size was explicitly set to be larger than the refill
        //   size
        next_req->request_bytes -= available_bytes_;
        available_bytes_ = 0;
        break;
      }
      available_bytes_ -= next_req->request_bytes;
      next_req->request_bytes = 0;
      total_bytes_through_[current_pri] += next_req->bytes;
      queue->pop_front();

      // Quota granted, signal the thread to exit
      next_req->cv.Signal();
    }
  }
}
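
// Worked example for the computation below (illustrative numbers, not from
// the source): with rate_bytes_per_sec == 10 << 20 (10 MiB/s) and the default
// refill_period_us == 100 * 1000, each period is refilled with
// 10485760 * 100000 / 1000000 == 1048576 bytes, i.e. 1 MiB per 100 ms.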
int64_t GenericRateLimiter::CalculateRefillBytesPerPeriodLocked(
    int64_t rate_bytes_per_sec) {
  if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec <
      refill_period_us_) {
    // Avoid unexpected result in the overflow case. The result now is still
    // inaccurate but is a number that is large enough.
    return std::numeric_limits<int64_t>::max() / kMicrosecondsPerSecond;
  } else {
    return rate_bytes_per_sec * refill_period_us_ / kMicrosecondsPerSecond;
  }
}
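
// Summary of the auto-tune heuristic below (added commentary): every
// kRefillsPerTune refill periods, compute the rough percentage of periods in
// which the limiter was drained (a request had to wait for a refill). If no
// period drained, drop to the floor max_bytes_per_sec_ / 20; if fewer than
// 50% drained, shrink the rate by roughly 5% (prev * 100 / 105); if more than
// 90% drained, grow it by 5% (prev * 105 / 100), capped at max_bytes_per_sec_.
// For example, a 10 MiB/s limit drained in 95% of periods would be raised to
// 10485760 * 105 / 100 == 11010048 B/s (~10.5 MiB/s), if the cap allows.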
Status GenericRateLimiter::TuneLocked() {
  const int kLowWatermarkPct = 50;
  const int kHighWatermarkPct = 90;
  const int kAdjustFactorPct = 5;
  // computed rate limit will be in
  // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
  const int kAllowedRangeFactor = 20;

  std::chrono::microseconds prev_tuned_time = tuned_time_;
  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonicLocked());

  int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
                               std::chrono::microseconds(refill_period_us_) -
                               std::chrono::microseconds(1)) /
                              std::chrono::microseconds(refill_period_us_);
  // We tune every kRefillsPerTune intervals, so the overflow and division-by-
  // zero conditions should never happen.
  assert(num_drains_ <= std::numeric_limits<int64_t>::max() / 100);
  assert(elapsed_intervals > 0);
  int64_t drained_pct = num_drains_ * 100 / elapsed_intervals;

  int64_t prev_bytes_per_sec = GetBytesPerSecond();
  int64_t new_bytes_per_sec;
  if (drained_pct == 0) {
    new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
  } else if (drained_pct < kLowWatermarkPct) {
    // sanitize to prevent overflow
    int64_t sanitized_prev_bytes_per_sec =
        std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / 100);
    new_bytes_per_sec =
        std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
                 sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
  } else if (drained_pct > kHighWatermarkPct) {
    // sanitize to prevent overflow
    int64_t sanitized_prev_bytes_per_sec =
        std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() /
                                         (100 + kAdjustFactorPct));
    new_bytes_per_sec =
        std::min(max_bytes_per_sec_,
                 sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
  } else {
    new_bytes_per_sec = prev_bytes_per_sec;
  }
  if (new_bytes_per_sec != prev_bytes_per_sec) {
    SetBytesPerSecondLocked(new_bytes_per_sec);
  }
  num_drains_ = 0;
  return Status::OK();
}
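
// Example usage of the factory below (a minimal sketch; Options::rate_limiter
// is part of the public RocksDB API, but the numbers are illustrative):
//
//   std::shared_ptr<RateLimiter> limiter(
//       NewGenericRateLimiter(10 << 20 /* 10 MiB/s */));
//   Options options;
//   options.rate_limiter = limiter;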
RateLimiter* NewGenericRateLimiter(
    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
    int32_t fairness /* = 10 */,
    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
    bool auto_tuned /* = false */, int64_t single_burst_bytes /* = 0 */) {
  assert(rate_bytes_per_sec > 0);
  assert(refill_period_us > 0);
  assert(fairness > 0);
  std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(
      rate_bytes_per_sec, refill_period_us, fairness, mode,
      SystemClock::Default(), auto_tuned, single_burst_bytes));
  return limiter.release();
}

}  // namespace ROCKSDB_NAMESPACE