rate_limiter_test.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/rate_limiter.h"

#include <chrono>
#include <cinttypes>
#include <limits>

#include "db/db_test_util.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

// TODO(yhchiang): the rate will not be accurate when we run tests in parallel.
class RateLimiterTest : public testing::Test {};

TEST_F(RateLimiterTest, OverflowRate) {
  GenericRateLimiter limiter(port::kMaxInt64, 1000, 10,
                             RateLimiter::Mode::kWritesOnly, Env::Default(),
                             false /* auto_tuned */);
  ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
}

TEST_F(RateLimiterTest, StartStop) {
  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
}
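
// A minimal usage sketch added for illustration; it is not part of the
// upstream test suite. It constructs a limiter through the public
// NewGenericRateLimiter() factory (as StartStop does above) and issues one
// write request that is well below the single-burst size. It assumes the
// factory's default mode counts writes, matching the kWritesOnly branch of
// the Modes test below.
TEST_F(RateLimiterTest, BasicUsageSketch) {
  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
      1000 * 1000 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
      10 /* fairness */));
  ASSERT_GT(limiter->GetSingleBurstBytes(), 0);
  limiter->Request(1024 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                   RateLimiter::OpType::kWrite);
  ASSERT_EQ(1024, limiter->GetTotalBytesThrough(Env::IO_HIGH));
}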

TEST_F(RateLimiterTest, Modes) {
  for (auto mode : {RateLimiter::Mode::kWritesOnly,
                    RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
    GenericRateLimiter limiter(
        2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
        10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);
    limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                    RateLimiter::OpType::kRead);
    if (mode == RateLimiter::Mode::kWritesOnly) {
      ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH));
    } else {
      ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
    }

    limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                    RateLimiter::OpType::kWrite);
    if (mode == RateLimiter::Mode::kAllIo) {
      ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
    } else {
      ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
    }
  }
}
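
// A hedged sketch of the per-priority accounting that Modes exercises above:
// GetTotalBytesThrough() takes an IO priority, and (by assumption about the
// current GenericRateLimiter bookkeeping) bytes granted at IO_LOW and IO_HIGH
// are tracked separately, with the no-argument form reporting the combined
// total as used by the Rate test below. Not part of the upstream suite.
TEST_F(RateLimiterTest, PerPriorityAccountingSketch) {
  GenericRateLimiter limiter(
      10000 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
      10 /* fairness */, RateLimiter::Mode::kWritesOnly, Env::Default(),
      false /* auto_tuned */);
  limiter.Request(100 /* bytes */, Env::IO_LOW, nullptr /* stats */,
                  RateLimiter::OpType::kWrite);
  limiter.Request(200 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                  RateLimiter::OpType::kWrite);
  ASSERT_EQ(100, limiter.GetTotalBytesThrough(Env::IO_LOW));
  ASSERT_EQ(200, limiter.GetTotalBytesThrough(Env::IO_HIGH));
  ASSERT_EQ(300, limiter.GetTotalBytesThrough());
}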

#if !(defined(TRAVIS) && defined(OS_MACOSX))
TEST_F(RateLimiterTest, Rate) {
  auto* env = Env::Default();
  struct Arg {
    Arg(int32_t _target_rate, int _burst)
        : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)),
          request_size(_target_rate / 10),
          burst(_burst) {}
    std::unique_ptr<RateLimiter> limiter;
    int32_t request_size;
    int burst;
  };

  auto writer = [](void* p) {
    auto* thread_env = Env::Default();
    auto* arg = static_cast<Arg*>(p);
    // Test for 2 seconds
    auto until = thread_env->NowMicros() + 2 * 1000000;
    Random r((uint32_t)(thread_env->NowNanos() %
                        std::numeric_limits<uint32_t>::max()));
    while (thread_env->NowMicros() < until) {
      for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
        arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
                              Env::IO_HIGH, nullptr /* stats */,
                              RateLimiter::OpType::kWrite);
      }
      arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
                            nullptr /* stats */, RateLimiter::OpType::kWrite);
    }
  };

  for (int i = 1; i <= 16; i *= 2) {
    int32_t target = i * 1024 * 10;
    Arg arg(target, i / 4 + 1);
    int64_t old_total_bytes_through = 0;
    for (int iter = 1; iter <= 2; ++iter) {
      // The second iteration changes the target rate dynamically.
      if (iter == 2) {
        target *= 2;
        arg.limiter->SetBytesPerSecond(target);
      }
      auto start = env->NowMicros();
      for (int t = 0; t < i; ++t) {
        env->StartThread(writer, &arg);
      }
      env->WaitForJoin();

      auto elapsed = env->NowMicros() - start;
      double rate =
          (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) *
          1000000.0 / elapsed;
      old_total_bytes_through = arg.limiter->GetTotalBytesThrough();
      fprintf(stderr,
              "request size [1 - %" PRIi32 "], limit %" PRIi32
              " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
              arg.request_size - 1, target / 1024, rate / 1024,
              elapsed / 1000000.0);

      ASSERT_GE(rate / target, 0.80);
      ASSERT_LE(rate / target, 1.25);
    }
  }
}
#endif
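
// A hedged sketch of the burst-size relationship behind OverflowRate above
// and AutoTuneIncreaseWhenFull below: for GenericRateLimiter, the
// single-burst size is expected to track the refill amount per period,
// roughly rate_bytes_per_sec * refill_period_us / 10^6. This reflects an
// assumption about the current implementation rather than a documented
// contract, so only loose bounds are checked. Not part of the upstream suite.
TEST_F(RateLimiterTest, SingleBurstBytesSketch) {
  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
      1000 * 1000 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
      10 /* fairness */));
  // At 1 MB/sec with a 100 ms refill period, one burst should be on the
  // order of 100 KB; allow wide slack since the exact value is
  // implementation-defined.
  ASSERT_GE(limiter->GetSingleBurstBytes(), 10 * 1000);
  ASSERT_LE(limiter->GetSingleBurstBytes(), 1000 * 1000);
}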

TEST_F(RateLimiterTest, LimitChangeTest) {
  // starvation test when the limit changes to a smaller value
  int64_t refill_period = 1000 * 1000;
  auto* env = Env::Default();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  struct Arg {
    Arg(int32_t _request_size, Env::IOPriority _pri,
        std::shared_ptr<RateLimiter> _limiter)
        : request_size(_request_size), pri(_pri), limiter(_limiter) {}
    int32_t request_size;
    Env::IOPriority pri;
    std::shared_ptr<RateLimiter> limiter;
  };

  auto writer = [](void* p) {
    auto* arg = static_cast<Arg*>(p);
    arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */,
                          RateLimiter::OpType::kWrite);
  };

  for (uint32_t i = 1; i <= 16; i <<= 1) {
    int32_t target = i * 1024 * 10;
    // refill period is one second
    for (int iter = 0; iter < 2; iter++) {
      std::shared_ptr<RateLimiter> limiter =
          std::make_shared<GenericRateLimiter>(
              target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
              Env::Default(), false /* auto_tuned */);
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"GenericRateLimiter::Request",
            "RateLimiterTest::LimitChangeTest:changeLimitStart"},
           {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
            "GenericRateLimiter::Refill"}});
      Arg arg(target, Env::IO_HIGH, limiter);
      // The idea is to start a request first, then, before the limiter
      // refills, update the limit to a different value (2X/0.5X). No
      // starvation should occur in any situation.
      // TODO(lightmark): more test cases are welcome.
      env->StartThread(writer, &arg);
      int32_t new_limit = (target << 1) >> (iter << 1);
      TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart");
      arg.limiter->SetBytesPerSecond(new_limit);
      TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd");
      env->WaitForJoin();
      fprintf(stderr,
              "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32
              " KB/sec, refill period %" PRIi64 " ms\n",
              target / 1024, new_limit / 1024, refill_period / 1000);
    }
  }
}
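
// A hedged sketch of the dynamic-limit behavior that LimitChangeTest and Rate
// rely on: SetBytesPerSecond() takes effect without recreating the limiter,
// and (by assumption about the current implementation) the single-burst size
// is recomputed from the new rate, so raising the rate should not shrink it.
// Not part of the upstream suite.
TEST_F(RateLimiterTest, SetBytesPerSecondSketch) {
  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
      10 * 1000 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
      10 /* fairness */));
  int64_t old_burst = limiter->GetSingleBurstBytes();
  limiter->SetBytesPerSecond(20 * 1000 /* rate_bytes_per_sec */);
  ASSERT_GE(limiter->GetSingleBurstBytes(), old_burst);
}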

TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
  const std::chrono::seconds kTimePerRefill(1);
  const int kRefillsPerTune = 100;  // needs to match util/rate_limiter.cc

  SpecialEnv special_env(Env::Default());
  special_env.no_slowdown_ = true;
  special_env.time_elapse_only_sleep_ = true;

  auto stats = CreateDBStatistics();
  std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
      1000 /* rate_bytes_per_sec */,
      std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
      RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */));

  // Use a callback to advance time because we need to advance (1) after
  // Request() has determined the bytes are not available; and (2) before
  // Refill() computes the next refill time (keeping the refill time in the
  // future allows the next request to drain the rate limiter).
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "GenericRateLimiter::Refill", [&](void* /*arg*/) {
        special_env.SleepForMicroseconds(static_cast<int>(
            std::chrono::microseconds(kTimePerRefill).count()));
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Verify the rate limit increases after a sequence of periods where the
  // rate limiter is always drained.
  int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
                        RateLimiter::OpType::kWrite);
  while (std::chrono::microseconds(special_env.NowMicros()) <=
         kRefillsPerTune * kTimePerRefill) {
    rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
                          RateLimiter::OpType::kWrite);
  }
  int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // The rate limit decreases after a sequence of periods where the rate
  // limiter is not drained.
  orig_bytes_per_sec = new_bytes_per_sec;
  special_env.SleepForMicroseconds(static_cast<int>(
      kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count()));
  // Make a request so the tuner can be triggered.
  rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),
                        RateLimiter::OpType::kWrite);
  new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}