rate_limiter_test.cc 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <array>
#include <chrono>
#include <cinttypes>
#include <cstdint>
#include <limits>
#include <thread>

#include "db/db_test_util.h"
#include "port/port.h"
#include "rocksdb/system_clock.h"
#include "test_util/mock_time_env.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/random.h"
#include "util/rate_limiter_impl.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. // TODO(yhchiang): the rate will not be accurate when we run test in parallel.
  23. class RateLimiterTest : public testing::Test {
  24. protected:
  25. ~RateLimiterTest() override {
  26. SyncPoint::GetInstance()->DisableProcessing();
  27. SyncPoint::GetInstance()->ClearAllCallBacks();
  28. }
  29. };
  30. TEST_F(RateLimiterTest, OverflowRate) {
  31. GenericRateLimiter limiter(std::numeric_limits<int64_t>::max(), 1000, 10,
  32. RateLimiter::Mode::kWritesOnly,
  33. SystemClock::Default(), false /* auto_tuned */,
  34. 0 /* single_burst_bytes */);
  35. ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
  36. }
  37. TEST_F(RateLimiterTest, StartStop) {
  38. std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
  39. }
  40. TEST_F(RateLimiterTest, GetTotalBytesThrough) {
  41. std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
  42. 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
  43. 10 /* fairness */));
  44. for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
  45. ASSERT_EQ(limiter->GetTotalBytesThrough(static_cast<Env::IOPriority>(i)),
  46. 0);
  47. }
  48. std::int64_t request_byte = 200;
  49. std::int64_t request_byte_sum = 0;
  50. for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
  51. limiter->Request(request_byte, static_cast<Env::IOPriority>(i),
  52. nullptr /* stats */, RateLimiter::OpType::kWrite);
  53. request_byte_sum += request_byte;
  54. }
  55. for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
  56. EXPECT_EQ(limiter->GetTotalBytesThrough(static_cast<Env::IOPriority>(i)),
  57. request_byte)
  58. << "Failed to track total_bytes_through_ correctly when IOPriority = "
  59. << static_cast<Env::IOPriority>(i);
  60. }
  61. EXPECT_EQ(limiter->GetTotalBytesThrough(Env::IO_TOTAL), request_byte_sum)
  62. << "Failed to track total_bytes_through_ correctly when IOPriority = "
  63. "Env::IO_TOTAL";
  64. }
  65. TEST_F(RateLimiterTest, GetTotalRequests) {
  66. std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
  67. 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
  68. 10 /* fairness */));
  69. for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
  70. ASSERT_EQ(limiter->GetTotalRequests(static_cast<Env::IOPriority>(i)), 0);
  71. }
  72. std::int64_t total_requests_sum = 0;
  73. for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
  74. limiter->Request(200, static_cast<Env::IOPriority>(i), nullptr /* stats */,
  75. RateLimiter::OpType::kWrite);
  76. total_requests_sum += 1;
  77. }
  78. for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
  79. EXPECT_EQ(limiter->GetTotalRequests(static_cast<Env::IOPriority>(i)), 1)
  80. << "Failed to track total_requests_ correctly when IOPriority = "
  81. << static_cast<Env::IOPriority>(i);
  82. }
  83. EXPECT_EQ(limiter->GetTotalRequests(Env::IO_TOTAL), total_requests_sum)
  84. << "Failed to track total_requests_ correctly when IOPriority = "
  85. "Env::IO_TOTAL";
  86. }
  87. TEST_F(RateLimiterTest, GetTotalPendingRequests) {
  88. std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
  89. 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
  90. 10 /* fairness */));
  91. int64_t total_pending_requests = 0;
  92. for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
  93. ASSERT_OK(limiter->GetTotalPendingRequests(
  94. &total_pending_requests, static_cast<Env::IOPriority>(i)));
  95. ASSERT_EQ(total_pending_requests, 0);
  96. }
  97. // This is a variable for making sure the following callback is called
  98. // and the assertions in it are indeed excuted
  99. bool nonzero_pending_requests_verified = false;
  100. SyncPoint::GetInstance()->SetCallBack(
  101. "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) {
  102. port::Mutex* request_mutex = (port::Mutex*)arg;
  103. // We temporarily unlock the mutex so that the following
  104. // GetTotalPendingRequests() can acquire it
  105. request_mutex->Unlock();
  106. for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
  107. EXPECT_OK(limiter->GetTotalPendingRequests(
  108. &total_pending_requests, static_cast<Env::IOPriority>(i)))
  109. << "Failed to return total pending requests for priority level = "
  110. << static_cast<Env::IOPriority>(i);
  111. if (i == Env::IO_USER || i == Env::IO_TOTAL) {
  112. EXPECT_EQ(total_pending_requests, 1)
  113. << "Failed to correctly return total pending requests for "
  114. "priority level = "
  115. << static_cast<Env::IOPriority>(i);
  116. } else {
  117. EXPECT_EQ(total_pending_requests, 0)
  118. << "Failed to correctly return total pending requests for "
  119. "priority level = "
  120. << static_cast<Env::IOPriority>(i);
  121. }
  122. }
  123. // We lock the mutex again so that the request thread can resume running
  124. // with the mutex locked
  125. request_mutex->Lock();
  126. nonzero_pending_requests_verified = true;
  127. });
  128. SyncPoint::GetInstance()->EnableProcessing();
  129. limiter->Request(200, Env::IO_USER, nullptr /* stats */,
  130. RateLimiter::OpType::kWrite);
  131. ASSERT_EQ(nonzero_pending_requests_verified, true);
  132. for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
  133. EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests,
  134. static_cast<Env::IOPriority>(i)))
  135. << "Failed to return total pending requests for priority level = "
  136. << static_cast<Env::IOPriority>(i);
  137. EXPECT_EQ(total_pending_requests, 0)
  138. << "Failed to correctly return total pending requests for priority "
  139. "level = "
  140. << static_cast<Env::IOPriority>(i);
  141. }
  142. SyncPoint::GetInstance()->DisableProcessing();
  143. SyncPoint::GetInstance()->ClearCallBack(
  144. "GenericRateLimiter::Request:PostEnqueueRequest");
  145. }
  146. TEST_F(RateLimiterTest, Modes) {
  147. for (auto mode : {RateLimiter::Mode::kWritesOnly,
  148. RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
  149. GenericRateLimiter limiter(
  150. 2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
  151. 10 /* fairness */, mode, SystemClock::Default(), false /* auto_tuned */,
  152. 0 /* single_burst_bytes */);
  153. limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
  154. RateLimiter::OpType::kRead);
  155. if (mode == RateLimiter::Mode::kWritesOnly) {
  156. ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH));
  157. } else {
  158. ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
  159. }
  160. limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
  161. RateLimiter::OpType::kWrite);
  162. if (mode == RateLimiter::Mode::kAllIo) {
  163. ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
  164. } else {
  165. ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
  166. }
  167. }
  168. }
  169. TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
  170. std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
  171. 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
  172. 10 /* fairness */));
  173. bool possible_random_one_in_fairness_results_for_high_mid_pri[4][2] = {
  174. {false, false}, {false, true}, {true, false}, {true, true}};
  175. std::vector<Env::IOPriority> possible_priority_iteration_orders[4] = {
  176. {Env::IO_USER, Env::IO_HIGH, Env::IO_MID, Env::IO_LOW},
  177. {Env::IO_USER, Env::IO_HIGH, Env::IO_LOW, Env::IO_MID},
  178. {Env::IO_USER, Env::IO_MID, Env::IO_LOW, Env::IO_HIGH},
  179. {Env::IO_USER, Env::IO_LOW, Env::IO_MID, Env::IO_HIGH}};
  180. for (int i = 0; i < 4; ++i) {
  181. // These are variables for making sure the following callbacks are called
  182. // and the assertion in the last callback is indeed excuted
  183. bool high_pri_iterated_after_mid_low_pri_set = false;
  184. bool mid_pri_itereated_after_low_pri_set = false;
  185. bool pri_iteration_order_verified = false;
  186. SyncPoint::GetInstance()->SetCallBack(
  187. "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
  188. "PostRandomOneInFairnessForHighPri",
  189. [&](void* arg) {
  190. bool* high_pri_iterated_after_mid_low_pri = (bool*)arg;
  191. *high_pri_iterated_after_mid_low_pri =
  192. possible_random_one_in_fairness_results_for_high_mid_pri[i][0];
  193. high_pri_iterated_after_mid_low_pri_set = true;
  194. });
  195. SyncPoint::GetInstance()->SetCallBack(
  196. "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
  197. "PostRandomOneInFairnessForMidPri",
  198. [&](void* arg) {
  199. bool* mid_pri_itereated_after_low_pri = (bool*)arg;
  200. *mid_pri_itereated_after_low_pri =
  201. possible_random_one_in_fairness_results_for_high_mid_pri[i][1];
  202. mid_pri_itereated_after_low_pri_set = true;
  203. });
  204. SyncPoint::GetInstance()->SetCallBack(
  205. "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
  206. "PreReturnPriIterationOrder",
  207. [&](void* arg) {
  208. std::vector<Env::IOPriority>* pri_iteration_order =
  209. (std::vector<Env::IOPriority>*)arg;
  210. EXPECT_EQ(*pri_iteration_order, possible_priority_iteration_orders[i])
  211. << "Failed to generate priority iteration order correctly when "
  212. "high_pri_iterated_after_mid_low_pri = "
  213. << possible_random_one_in_fairness_results_for_high_mid_pri[i][0]
  214. << ", mid_pri_itereated_after_low_pri = "
  215. << possible_random_one_in_fairness_results_for_high_mid_pri[i][1]
  216. << std::endl;
  217. pri_iteration_order_verified = true;
  218. });
  219. SyncPoint::GetInstance()->EnableProcessing();
  220. limiter->Request(200 /* request max bytes to drain so that refill and order
  221. generation will be triggered every time
  222. GenericRateLimiter::Request() is called */
  223. ,
  224. Env::IO_USER, nullptr /* stats */,
  225. RateLimiter::OpType::kWrite);
  226. ASSERT_EQ(high_pri_iterated_after_mid_low_pri_set, true);
  227. ASSERT_EQ(mid_pri_itereated_after_low_pri_set, true);
  228. ASSERT_EQ(pri_iteration_order_verified, true);
  229. SyncPoint::GetInstance()->DisableProcessing();
  230. SyncPoint::GetInstance()->ClearCallBack(
  231. "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
  232. "PreReturnPriIterationOrder");
  233. SyncPoint::GetInstance()->ClearCallBack(
  234. "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
  235. "PostRandomOneInFairnessForMidPri");
  236. SyncPoint::GetInstance()->ClearCallBack(
  237. "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
  238. "PostRandomOneInFairnessForHighPri");
  239. }
  240. }
  241. TEST_F(RateLimiterTest, Rate) {
  242. auto* env = Env::Default();
  243. struct Arg {
  244. Arg(int32_t _target_rate, int _burst)
  245. : limiter(NewGenericRateLimiter(_target_rate /* rate_bytes_per_sec */,
  246. 100 * 1000 /* refill_period_us */,
  247. 10 /* fairness */)),
  248. request_size(_target_rate /
  249. 10 /* refill period here is 1/10 second */),
  250. burst(_burst) {}
  251. std::unique_ptr<RateLimiter> limiter;
  252. int32_t request_size;
  253. int burst;
  254. };
  255. auto writer = [](void* p) {
  256. const auto& thread_clock = SystemClock::Default();
  257. auto* arg = static_cast<Arg*>(p);
  258. // Test for 2 seconds
  259. auto until = thread_clock->NowMicros() + 2 * 1000000;
  260. Random r((uint32_t)(thread_clock->NowNanos() %
  261. std::numeric_limits<uint32_t>::max()));
  262. while (thread_clock->NowMicros() < until) {
  263. for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst * 2) + 1); ++i) {
  264. arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
  265. Env::IO_USER, nullptr /* stats */,
  266. RateLimiter::OpType::kWrite);
  267. }
  268. for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
  269. arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
  270. Env::IO_HIGH, nullptr /* stats */,
  271. RateLimiter::OpType::kWrite);
  272. }
  273. for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst / 2 + 1) + 1);
  274. ++i) {
  275. arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_MID,
  276. nullptr /* stats */, RateLimiter::OpType::kWrite);
  277. }
  278. arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
  279. nullptr /* stats */, RateLimiter::OpType::kWrite);
  280. }
  281. };
  282. int samples = 0;
  283. int samples_at_minimum = 0;
  284. for (int i = 1; i <= 16; i *= 2) {
  285. int32_t target = i * 1024 * 10;
  286. Arg arg(target, i / 4 + 1);
  287. int64_t old_total_bytes_through = 0;
  288. for (int iter = 1; iter <= 2; ++iter) {
  289. // second iteration changes the target dynamically
  290. if (iter == 2) {
  291. target *= 2;
  292. arg.limiter->SetBytesPerSecond(target);
  293. }
  294. auto start = env->NowMicros();
  295. for (int t = 0; t < i; ++t) {
  296. env->StartThread(writer, &arg);
  297. }
  298. env->WaitForJoin();
  299. auto elapsed = env->NowMicros() - start;
  300. double rate =
  301. (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) *
  302. 1000000.0 / elapsed;
  303. old_total_bytes_through = arg.limiter->GetTotalBytesThrough();
  304. fprintf(stderr,
  305. "request size [1 - %" PRIi32 "], limit %" PRIi32
  306. " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
  307. arg.request_size - 1, target / 1024, rate / 1024,
  308. elapsed / 1000000.0);
  309. ++samples;
  310. if (rate / target >= 0.80) {
  311. ++samples_at_minimum;
  312. }
  313. ASSERT_LE(rate / target, 1.25);
  314. }
  315. }
  316. // This can fail due to slow execution speed, like when using valgrind or in
  317. // heavily loaded CI environments
  318. bool skip_minimum_rate_check =
  319. #if defined(ROCKSDB_VALGRIND_RUN)
  320. true;
  321. #elif defined(OS_MACOSX)
  322. getenv("CIRCLECI") || getenv("GITHUB_ACTIONS");
  323. #else
  324. getenv("SANDCASTLE");
  325. #endif
  326. if (skip_minimum_rate_check) {
  327. fprintf(stderr, "Skipped minimum rate check (%d / %d passed)\n",
  328. samples_at_minimum, samples);
  329. } else {
  330. ASSERT_EQ(samples_at_minimum, samples);
  331. }
  332. }
  333. TEST_F(RateLimiterTest, LimitChangeTest) {
  334. // starvation test when limit changes to a smaller value
  335. int64_t refill_period = 1000 * 1000;
  336. auto* env = Env::Default();
  337. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  338. struct Arg {
  339. Arg(int32_t _request_size, Env::IOPriority _pri,
  340. std::shared_ptr<RateLimiter> _limiter)
  341. : request_size(_request_size), pri(_pri), limiter(_limiter) {}
  342. int32_t request_size;
  343. Env::IOPriority pri;
  344. std::shared_ptr<RateLimiter> limiter;
  345. };
  346. auto writer = [](void* p) {
  347. auto* arg = static_cast<Arg*>(p);
  348. arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */,
  349. RateLimiter::OpType::kWrite);
  350. };
  351. for (uint32_t i = 1; i <= 16; i <<= 1) {
  352. int32_t target = i * 1024 * 10;
  353. // refill per second
  354. for (int iter = 0; iter < 2; iter++) {
  355. std::shared_ptr<RateLimiter> limiter =
  356. std::make_shared<GenericRateLimiter>(
  357. target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
  358. SystemClock::Default(), false /* auto_tuned */,
  359. 0 /* single_burst_bytes */);
  360. // After "GenericRateLimiter::Request:1" the mutex is held until the bytes
  361. // are refilled. This test could be improved to change the limit when lock
  362. // is released in `TimedWait()`.
  363. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
  364. {{"GenericRateLimiter::Request",
  365. "RateLimiterTest::LimitChangeTest:changeLimitStart"},
  366. {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
  367. "GenericRateLimiter::Request:1"}});
  368. Arg arg(target, Env::IO_HIGH, limiter);
  369. // The idea behind is to start a request first, then before it refills,
  370. // update limit to a different value (2X/0.5X). No starvation should
  371. // be guaranteed under any situation
  372. // TODO(lightmark): more test cases are welcome.
  373. env->StartThread(writer, &arg);
  374. int32_t new_limit = (target << 1) >> (iter << 1);
  375. TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart");
  376. arg.limiter->SetBytesPerSecond(new_limit);
  377. TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd");
  378. env->WaitForJoin();
  379. fprintf(stderr,
  380. "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32
  381. "KB/sec, refill period %" PRIi64 " ms\n",
  382. target / 1024, new_limit / 1024, refill_period / 1000);
  383. }
  384. }
  385. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  386. }
  387. TEST_F(RateLimiterTest, AvailableByteSizeExhaustTest) {
  388. SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true);
  389. const std::chrono::seconds kTimePerRefill(1);
  390. // This test makes sure available_bytes_ get exhausted first before queuing
  391. // any remaining bytes when requested_bytes > available_bytes
  392. const int64_t available_bytes_per_period = 500;
  393. std::shared_ptr<RateLimiter> limiter = std::make_shared<GenericRateLimiter>(
  394. available_bytes_per_period,
  395. std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
  396. RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(),
  397. false /* auto_tuned */, 0 /* single_burst_bytes */);
  398. // Step 1. Request 100 and wait for the refill
  399. // so that the remaining available bytes are 400
  400. limiter->Request(100, Env::IO_USER, nullptr /* stats */,
  401. RateLimiter::OpType::kWrite);
  402. special_env.SleepForMicroseconds(
  403. static_cast<int>(std::chrono::microseconds(kTimePerRefill).count()));
  404. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  405. "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) {
  406. port::Mutex* request_mutex = (port::Mutex*)arg;
  407. request_mutex->Unlock();
  408. // Step 3. Check GetTotalBytesThrough = available_bytes_per_period
  409. // to make sure that the first request (100) and the part of the second
  410. // request (400) made through when the remaining of the second request
  411. // got queued
  412. ASSERT_EQ(available_bytes_per_period,
  413. limiter->GetTotalBytesThrough(Env::IO_USER));
  414. request_mutex->Lock();
  415. });
  416. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  417. // Step 2. Request 500, which is greater than the remaining available bytes
  418. // (400)
  419. limiter->Request(500, Env::IO_USER, nullptr /* stats */,
  420. RateLimiter::OpType::kWrite);
  421. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  422. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
  423. "GenericRateLimiter::Request:PostEnqueueRequest");
  424. }
  425. TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
  426. const std::chrono::seconds kTimePerRefill(1);
  427. const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc
  428. auto mock_clock =
  429. std::make_shared<MockSystemClock>(Env::Default()->GetSystemClock());
  430. auto stats = CreateDBStatistics();
  431. std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
  432. 1000 /* rate_bytes_per_sec */,
  433. std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
  434. RateLimiter::Mode::kWritesOnly, mock_clock, true /* auto_tuned */,
  435. 0 /* single_burst_bytes */));
  436. // verify rate limit increases after a sequence of periods where rate limiter
  437. // is always drained
  438. int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  439. rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
  440. RateLimiter::OpType::kWrite);
  441. while (std::chrono::microseconds(mock_clock->NowMicros()) <=
  442. kRefillsPerTune * kTimePerRefill) {
  443. rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
  444. RateLimiter::OpType::kWrite);
  445. }
  446. int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  447. ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);
  448. // decreases after a sequence of periods where rate limiter is not drained
  449. orig_bytes_per_sec = new_bytes_per_sec;
  450. mock_clock->SleepForMicroseconds(static_cast<int>(
  451. kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count()));
  452. // make a request so tuner can be triggered
  453. rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),
  454. RateLimiter::OpType::kWrite);
  455. new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  456. ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
  457. }
  458. TEST_F(RateLimiterTest, WaitHangingBug) {
  459. // At t=0: Threads 0 and 1 request `kBytesPerRefill` bytes at low-pri. One
  460. // will be granted immediately and the other will enter `TimedWait()`.
  461. //
  462. // At t=`kMicrosPerRefill`: Thread 2 requests `kBytesPerRefill` bytes at
  463. // low-pri. Thread 2's request enters the queue. To expose the bug scenario,
  464. // `SyncPoint`s ensure this happens while the lock is temporarily released in
  465. // `TimedWait()`. Before the bug fix, Thread 2's request would then hang in
  466. // `Wait()` interminably.
  467. const int kBytesPerSecond = 100;
  468. const int kMicrosPerSecond = 1000 * 1000;
  469. const int kMicrosPerRefill = kMicrosPerSecond;
  470. const int kBytesPerRefill =
  471. kBytesPerSecond * kMicrosPerRefill / kMicrosPerSecond;
  472. auto mock_clock =
  473. std::make_shared<MockSystemClock>(Env::Default()->GetSystemClock());
  474. std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(
  475. kBytesPerSecond, kMicrosPerRefill, 10 /* fairness */,
  476. RateLimiter::Mode::kWritesOnly, mock_clock, false /* auto_tuned */,
  477. 0 /* single_burst_bytes */));
  478. std::array<std::thread, 3> request_threads;
  479. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
  480. {{"RateLimiterTest::WaitHangingBug:InitialRequestsReady",
  481. "MockSystemClock::TimedWait:UnlockedPreSleep"},
  482. {"MockSystemClock::TimedWait:UnlockedPostSleep1",
  483. "RateLimiterTest::WaitHangingBug:TestThreadRequestBegin"},
  484. {"RateLimiterTest::WaitHangingBug:TestThreadRequestEnd",
  485. "MockSystemClock::TimedWait:UnlockedPostSleep2"}});
  486. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  487. for (int i = 0; i < 2; i++) {
  488. request_threads[i] = std::thread([&]() {
  489. limiter->Request(kBytesPerRefill /* bytes */, Env::IOPriority::IO_LOW,
  490. nullptr /* stats */, RateLimiter::OpType::kWrite);
  491. });
  492. }
  493. while (limiter->GetTotalRequests() < 2) {
  494. }
  495. TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:InitialRequestsReady");
  496. TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:TestThreadRequestBegin");
  497. request_threads[2] = std::thread([&]() {
  498. limiter->Request(kBytesPerRefill /* bytes */, Env::IOPriority::IO_LOW,
  499. nullptr /* stats */, RateLimiter::OpType::kWrite);
  500. });
  501. while (limiter->GetTotalRequests() < 3) {
  502. }
  503. TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:TestThreadRequestEnd");
  504. for (int i = 0; i < 3; i++) {
  505. request_threads[i].join();
  506. }
  507. }
  508. TEST_F(RateLimiterTest, RuntimeSingleBurstBytesChange) {
  509. constexpr int kMicrosecondsPerSecond = 1000000;
  510. const int64_t kRateBytesPerSec = 400;
  511. const int64_t kRefillBytes = 100;
  512. const int64_t kRefillPeriodMicros =
  513. kRefillBytes * kMicrosecondsPerSecond / kRateBytesPerSec;
  514. const int64_t kRefillsPerBurst = 17;
  515. const int64_t kBurstBytes = kRefillBytes * kRefillsPerBurst;
  516. auto mock_clock =
  517. std::make_shared<MockSystemClock>(Env::Default()->GetSystemClock());
  518. // Zero as `single_burst_bytes` is a special value meaning the refill size
  519. std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(
  520. kRateBytesPerSec, kRefillPeriodMicros, 10 /* fairness */,
  521. RateLimiter::Mode::kWritesOnly, mock_clock, false /* auto_tuned */,
  522. 0 /* single_burst_bytes */));
  523. ASSERT_EQ(kRefillBytes, limiter->GetSingleBurstBytes());
  524. // Dynamically setting to zero should change nothing
  525. ASSERT_OK(limiter->SetSingleBurstBytes(0));
  526. ASSERT_EQ(kRefillBytes, limiter->GetSingleBurstBytes());
  527. // Negative values are invalid and should change nothing
  528. ASSERT_TRUE(limiter->SetSingleBurstBytes(-1).IsInvalidArgument());
  529. ASSERT_EQ(kRefillBytes, limiter->GetSingleBurstBytes());
  530. // Positive values take effect as the new burst size
  531. ASSERT_OK(limiter->SetSingleBurstBytes(kBurstBytes));
  532. ASSERT_EQ(kBurstBytes, limiter->GetSingleBurstBytes());
  533. // Initially the supply is full so a request of size `kBurstBytes` needs
  534. // `kRefillsPerBurst - 1` refill periods to elapse.
  535. limiter->Request(limiter->GetSingleBurstBytes() /* bytes */,
  536. Env::IOPriority::IO_USER, nullptr /* stats */,
  537. RateLimiter::OpType::kWrite);
  538. ASSERT_EQ((kRefillsPerBurst - 1) * kRefillPeriodMicros,
  539. mock_clock->NowMicros());
  540. }
  541. } // namespace ROCKSDB_NAMESPACE
  542. int main(int argc, char** argv) {
  543. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  544. ::testing::InitGoogleTest(&argc, argv);
  545. return RUN_ALL_TESTS();
  546. }