periodic_task_scheduler_test.cc 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. //
  3. // This source code is licensed under both the GPLv2 (found in the
  4. // COPYING file in the root directory) and Apache 2.0 License
  5. // (found in the LICENSE.Apache file in the root directory).
  6. #include "db/periodic_task_scheduler.h"
  7. #include "db/db_test_util.h"
  8. #include "env/composite_env_wrapper.h"
  9. #include "test_util/mock_time_env.h"
  10. namespace ROCKSDB_NAMESPACE {
  11. class PeriodicTaskSchedulerTest : public DBTestBase {
  12. public:
  13. PeriodicTaskSchedulerTest()
  14. : DBTestBase("periodic_task_scheduler_test", /*env_do_fsync=*/true) {
  15. mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock());
  16. mock_env_.reset(new CompositeEnvWrapper(env_, mock_clock_));
  17. }
  18. protected:
  19. std::unique_ptr<Env> mock_env_;
  20. std::shared_ptr<MockSystemClock> mock_clock_;
  21. void SetUp() override {
  22. mock_clock_->InstallTimedWaitFixCallback();
  23. SyncPoint::GetInstance()->SetCallBack(
  24. "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) {
  25. auto periodic_task_scheduler_ptr =
  26. static_cast<PeriodicTaskScheduler*>(arg);
  27. periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get());
  28. });
  29. }
  30. };
  31. TEST_F(PeriodicTaskSchedulerTest, Basic) {
  32. constexpr unsigned int kPeriodSec = 10;
  33. Close();
  34. Options options;
  35. options.stats_dump_period_sec = kPeriodSec;
  36. options.stats_persist_period_sec = kPeriodSec;
  37. options.create_if_missing = true;
  38. options.env = mock_env_.get();
  39. int dump_st_counter = 0;
  40. SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:StartRunning",
  41. [&](void*) { dump_st_counter++; });
  42. int pst_st_counter = 0;
  43. SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
  44. [&](void*) { pst_st_counter++; });
  45. int flush_info_log_counter = 0;
  46. SyncPoint::GetInstance()->SetCallBack(
  47. "DBImpl::FlushInfoLog:StartRunning",
  48. [&](void*) { flush_info_log_counter++; });
  49. int trigger_compaction_counter = 0;
  50. SyncPoint::GetInstance()->SetCallBack(
  51. "DBImpl::TriggerPeriodicCompaction:StartRunning",
  52. [&](void*) { trigger_compaction_counter++; });
  53. SyncPoint::GetInstance()->EnableProcessing();
  54. Reopen(options);
  55. ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_dump_period_sec);
  56. ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec);
  57. ASSERT_GT(kPeriodSec, 1u);
  58. dbfull()->TEST_WaitForPeriodicTaskRun([&] {
  59. mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec) - 1);
  60. });
  61. const PeriodicTaskScheduler& scheduler =
  62. dbfull()->TEST_GetPeriodicTaskScheduler();
  63. ASSERT_EQ((int)PeriodicTaskType::kMax - 1, scheduler.TEST_GetValidTaskNum());
  64. ASSERT_EQ(1, dump_st_counter);
  65. ASSERT_EQ(1, pst_st_counter);
  66. ASSERT_EQ(1, flush_info_log_counter);
  67. dbfull()->TEST_WaitForPeriodicTaskRun(
  68. [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
  69. ASSERT_EQ(2, dump_st_counter);
  70. ASSERT_EQ(2, pst_st_counter);
  71. ASSERT_EQ(2, flush_info_log_counter);
  72. dbfull()->TEST_WaitForPeriodicTaskRun(
  73. [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
  74. ASSERT_EQ(3, dump_st_counter);
  75. ASSERT_EQ(3, pst_st_counter);
  76. ASSERT_EQ(3, flush_info_log_counter);
  77. // Disable scheduler with SetOption
  78. ASSERT_OK(dbfull()->SetDBOptions(
  79. {{"stats_dump_period_sec", "0"}, {"stats_persist_period_sec", "0"}}));
  80. ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec);
  81. ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
  82. // Info log flush should still run.
  83. dbfull()->TEST_WaitForPeriodicTaskRun(
  84. [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
  85. ASSERT_EQ(3, dump_st_counter);
  86. ASSERT_EQ(3, pst_st_counter);
  87. ASSERT_EQ(4, flush_info_log_counter);
  88. ASSERT_EQ(2u, scheduler.TEST_GetValidTaskNum());
  89. // Re-enable one task
  90. ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}}));
  91. ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
  92. ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
  93. ASSERT_EQ(3, scheduler.TEST_GetValidTaskNum());
  94. dbfull()->TEST_WaitForPeriodicTaskRun(
  95. [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
  96. ASSERT_EQ(4, dump_st_counter);
  97. ASSERT_EQ(3, pst_st_counter);
  98. ASSERT_EQ(5, flush_info_log_counter);
  99. ASSERT_EQ(0, trigger_compaction_counter);
  100. dbfull()->TEST_WaitForPeriodicTaskRun([&] {
  101. mock_clock_->MockSleepForSeconds(static_cast<int>(12 * 60 * 60));
  102. });
  103. ASSERT_EQ(1, trigger_compaction_counter);
  104. dbfull()->TEST_WaitForPeriodicTaskRun([&] {
  105. mock_clock_->MockSleepForSeconds(static_cast<int>(12 * 60 * 60));
  106. });
  107. ASSERT_EQ(2, trigger_compaction_counter);
  108. Close();
  109. }
  110. TEST_F(PeriodicTaskSchedulerTest, MultiInstances) {
  111. constexpr int kPeriodSec = 5;
  112. const int kInstanceNum = 10;
  113. Close();
  114. Options options;
  115. options.stats_dump_period_sec = kPeriodSec;
  116. options.stats_persist_period_sec = kPeriodSec;
  117. options.create_if_missing = true;
  118. options.env = mock_env_.get();
  119. int dump_st_counter = 0;
  120. SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:2",
  121. [&](void*) { dump_st_counter++; });
  122. int pst_st_counter = 0;
  123. SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
  124. [&](void*) { pst_st_counter++; });
  125. SyncPoint::GetInstance()->EnableProcessing();
  126. auto dbs = std::vector<DB*>(kInstanceNum);
  127. for (int i = 0; i < kInstanceNum; i++) {
  128. ASSERT_OK(
  129. DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i])));
  130. }
  131. auto dbi = static_cast_with_check<DBImpl>(dbs[kInstanceNum - 1]);
  132. const PeriodicTaskScheduler& scheduler = dbi->TEST_GetPeriodicTaskScheduler();
  133. // kRecordSeqnoTime is not registered since the feature is not enabled
  134. ASSERT_EQ(kInstanceNum * ((int)PeriodicTaskType::kMax - 1),
  135. scheduler.TEST_GetValidTaskNum());
  136. int expected_run = kInstanceNum;
  137. dbi->TEST_WaitForPeriodicTaskRun(
  138. [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); });
  139. ASSERT_EQ(expected_run, dump_st_counter);
  140. ASSERT_EQ(expected_run, pst_st_counter);
  141. expected_run += kInstanceNum;
  142. dbi->TEST_WaitForPeriodicTaskRun(
  143. [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
  144. ASSERT_EQ(expected_run, dump_st_counter);
  145. ASSERT_EQ(expected_run, pst_st_counter);
  146. expected_run += kInstanceNum;
  147. dbi->TEST_WaitForPeriodicTaskRun(
  148. [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
  149. ASSERT_EQ(expected_run, dump_st_counter);
  150. ASSERT_EQ(expected_run, pst_st_counter);
  151. int half = kInstanceNum / 2;
  152. for (int i = 0; i < half; i++) {
  153. delete dbs[i];
  154. }
  155. expected_run += (kInstanceNum - half) * 2;
  156. dbi->TEST_WaitForPeriodicTaskRun(
  157. [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
  158. dbi->TEST_WaitForPeriodicTaskRun(
  159. [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
  160. ASSERT_EQ(expected_run, dump_st_counter);
  161. ASSERT_EQ(expected_run, pst_st_counter);
  162. for (int i = half; i < kInstanceNum; i++) {
  163. ASSERT_OK(dbs[i]->Close());
  164. delete dbs[i];
  165. }
  166. }
  167. TEST_F(PeriodicTaskSchedulerTest, MultiEnv) {
  168. constexpr int kDumpPeriodSec = 5;
  169. constexpr int kPersistPeriodSec = 10;
  170. Close();
  171. Options options1;
  172. options1.stats_dump_period_sec = kDumpPeriodSec;
  173. options1.stats_persist_period_sec = kPersistPeriodSec;
  174. options1.create_if_missing = true;
  175. options1.env = mock_env_.get();
  176. Reopen(options1);
  177. std::unique_ptr<Env> mock_env2(
  178. new CompositeEnvWrapper(Env::Default(), mock_clock_));
  179. Options options2;
  180. options2.stats_dump_period_sec = kDumpPeriodSec;
  181. options2.stats_persist_period_sec = kPersistPeriodSec;
  182. options2.create_if_missing = true;
  183. options1.env = mock_env2.get();
  184. std::string dbname = test::PerThreadDBPath("multi_env_test");
  185. DB* db;
  186. ASSERT_OK(DB::Open(options2, dbname, &db));
  187. ASSERT_OK(db->Close());
  188. delete db;
  189. Close();
  190. }
  191. } // namespace ROCKSDB_NAMESPACE
  192. int main(int argc, char** argv) {
  193. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  194. ::testing::InitGoogleTest(&argc, argv);
  195. return RUN_ALL_TESTS();
  196. }