thread_list_test.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <condition_variable>
  6. #include <mutex>
  7. #include "monitoring/thread_status_updater.h"
  8. #include "rocksdb/db.h"
  9. #include "test_util/testharness.h"
  10. #ifdef ROCKSDB_USING_THREAD_STATUS
  11. namespace ROCKSDB_NAMESPACE {
  12. class SimulatedBackgroundTask {
  13. public:
  14. SimulatedBackgroundTask(
  15. const void* db_key, const std::string& db_name, const void* cf_key,
  16. const std::string& cf_name,
  17. const ThreadStatus::OperationType operation_type =
  18. ThreadStatus::OP_UNKNOWN,
  19. const ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN)
  20. : db_key_(db_key),
  21. db_name_(db_name),
  22. cf_key_(cf_key),
  23. cf_name_(cf_name),
  24. operation_type_(operation_type),
  25. state_type_(state_type),
  26. should_run_(true),
  27. running_count_(0) {
  28. Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo(
  29. db_key_, db_name_, cf_key_, cf_name_);
  30. }
  31. ~SimulatedBackgroundTask() {
  32. Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_);
  33. }
  34. void Run() {
  35. std::unique_lock<std::mutex> l(mutex_);
  36. running_count_++;
  37. bg_cv_.notify_all();
  38. assert(cf_key_);
  39. Env::Default()->GetThreadStatusUpdater()->SetEnableTracking(true);
  40. Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
  41. Env::Default()->GetThreadStatusUpdater()->SetThreadOperation(
  42. operation_type_);
  43. Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_);
  44. while (should_run_) {
  45. bg_cv_.wait(l);
  46. }
  47. Env::Default()->GetThreadStatusUpdater()->ClearThreadState();
  48. Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation();
  49. Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(nullptr);
  50. running_count_--;
  51. bg_cv_.notify_all();
  52. }
  53. void FinishAllTasks() {
  54. std::unique_lock<std::mutex> l(mutex_);
  55. should_run_ = false;
  56. bg_cv_.notify_all();
  57. }
  58. void WaitUntilScheduled(int job_count) {
  59. std::unique_lock<std::mutex> l(mutex_);
  60. while (running_count_ < job_count) {
  61. bg_cv_.wait(l);
  62. }
  63. }
  64. void WaitUntilDone() {
  65. std::unique_lock<std::mutex> l(mutex_);
  66. while (running_count_ > 0) {
  67. bg_cv_.wait(l);
  68. }
  69. }
  70. static void DoSimulatedTask(void* arg) {
  71. static_cast<SimulatedBackgroundTask*>(arg)->Run();
  72. }
  73. private:
  74. const void* db_key_;
  75. const std::string db_name_;
  76. const void* cf_key_;
  77. const std::string cf_name_;
  78. const ThreadStatus::OperationType operation_type_;
  79. const ThreadStatus::StateType state_type_;
  80. std::mutex mutex_;
  81. std::condition_variable bg_cv_;
  82. bool should_run_;
  83. std::atomic<int> running_count_;
  84. };
  85. class ThreadListTest : public testing::Test {
  86. public:
  87. ThreadListTest() = default;
  88. };
  89. TEST_F(ThreadListTest, GlobalTables) {
  90. // verify the global tables for operations and states are properly indexed.
  91. for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) {
  92. ASSERT_EQ(global_operation_table[type].type, type);
  93. ASSERT_EQ(
  94. global_operation_table[type].name,
  95. ThreadStatus::GetOperationName(ThreadStatus::OperationType(type)));
  96. }
  97. for (int type = 0; type != ThreadStatus::NUM_STATE_TYPES; ++type) {
  98. ASSERT_EQ(global_state_table[type].type, type);
  99. ASSERT_EQ(global_state_table[type].name,
  100. ThreadStatus::GetStateName(ThreadStatus::StateType(type)));
  101. }
  102. for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) {
  103. ASSERT_EQ(global_op_stage_table[stage].stage, stage);
  104. ASSERT_EQ(global_op_stage_table[stage].name,
  105. ThreadStatus::GetOperationStageName(
  106. ThreadStatus::OperationStage(stage)));
  107. }
  108. }
  109. TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
  110. Env* env = Env::Default();
  111. const int kHighPriorityThreads = 3;
  112. const int kLowPriorityThreads = 5;
  113. const int kSimulatedHighPriThreads = kHighPriorityThreads - 1;
  114. const int kSimulatedLowPriThreads = kLowPriorityThreads / 3;
  115. const int kDelayMicros = 1000000;
  116. env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
  117. env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
  118. // Wait 1 second so that threads start
  119. Env::Default()->SleepForMicroseconds(kDelayMicros);
  120. SimulatedBackgroundTask running_task(reinterpret_cast<void*>(1234), "running",
  121. reinterpret_cast<void*>(5678),
  122. "pikachu");
  123. for (int test = 0; test < kSimulatedHighPriThreads; ++test) {
  124. env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, &running_task,
  125. Env::Priority::HIGH);
  126. }
  127. for (int test = 0; test < kSimulatedLowPriThreads; ++test) {
  128. env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, &running_task,
  129. Env::Priority::LOW);
  130. }
  131. running_task.WaitUntilScheduled(kSimulatedHighPriThreads +
  132. kSimulatedLowPriThreads);
  133. // We can only reserve limited number of waiting threads
  134. ASSERT_EQ(kHighPriorityThreads - kSimulatedHighPriThreads,
  135. env->ReserveThreads(kHighPriorityThreads, Env::Priority::HIGH));
  136. ASSERT_EQ(kLowPriorityThreads - kSimulatedLowPriThreads,
  137. env->ReserveThreads(kLowPriorityThreads, Env::Priority::LOW));
  138. // Reservation shall not affect the existing thread list
  139. std::vector<ThreadStatus> thread_list;
  140. // Verify the number of running threads in each pool.
  141. ASSERT_OK(env->GetThreadList(&thread_list));
  142. int running_count[ThreadStatus::NUM_THREAD_TYPES] = {0};
  143. for (const auto& thread_status : thread_list) {
  144. if (thread_status.cf_name == "pikachu" &&
  145. thread_status.db_name == "running") {
  146. running_count[thread_status.thread_type]++;
  147. }
  148. }
  149. // Cannot reserve more threads
  150. ASSERT_EQ(0, env->ReserveThreads(kHighPriorityThreads, Env::Priority::HIGH));
  151. ASSERT_EQ(0, env->ReserveThreads(kLowPriorityThreads, Env::Priority::LOW));
  152. ASSERT_EQ(running_count[ThreadStatus::HIGH_PRIORITY],
  153. kSimulatedHighPriThreads);
  154. ASSERT_EQ(running_count[ThreadStatus::LOW_PRIORITY], kSimulatedLowPriThreads);
  155. ASSERT_EQ(running_count[ThreadStatus::USER], 0);
  156. running_task.FinishAllTasks();
  157. running_task.WaitUntilDone();
  158. ASSERT_EQ(kHighPriorityThreads - kSimulatedHighPriThreads,
  159. env->ReleaseThreads(kHighPriorityThreads, Env::Priority::HIGH));
  160. ASSERT_EQ(kLowPriorityThreads - kSimulatedLowPriThreads,
  161. env->ReleaseThreads(kLowPriorityThreads, Env::Priority::LOW));
  162. // Verify none of the threads are running
  163. ASSERT_OK(env->GetThreadList(&thread_list));
  164. for (int i = 0; i < ThreadStatus::NUM_THREAD_TYPES; ++i) {
  165. running_count[i] = 0;
  166. }
  167. for (const auto& thread_status : thread_list) {
  168. if (thread_status.cf_name == "pikachu" &&
  169. thread_status.db_name == "running") {
  170. running_count[thread_status.thread_type]++;
  171. }
  172. }
  173. ASSERT_EQ(running_count[ThreadStatus::HIGH_PRIORITY], 0);
  174. ASSERT_EQ(running_count[ThreadStatus::LOW_PRIORITY], 0);
  175. ASSERT_EQ(running_count[ThreadStatus::USER], 0);
  176. }
  177. namespace {
  178. void UpdateStatusCounts(const std::vector<ThreadStatus>& thread_list,
  179. int operation_counts[], int state_counts[]) {
  180. for (const auto& thread_status : thread_list) {
  181. operation_counts[thread_status.operation_type]++;
  182. state_counts[thread_status.state_type]++;
  183. }
  184. }
  185. void VerifyAndResetCounts(const int correct_counts[], int collected_counts[],
  186. int size) {
  187. for (int i = 0; i < size; ++i) {
  188. ASSERT_EQ(collected_counts[i], correct_counts[i]);
  189. collected_counts[i] = 0;
  190. }
  191. }
  192. void UpdateCount(int operation_counts[], int from_event, int to_event,
  193. int amount) {
  194. operation_counts[from_event] -= amount;
  195. operation_counts[to_event] += amount;
  196. }
  197. } // namespace
  198. TEST_F(ThreadListTest, SimpleEventTest) {
  199. Env* env = Env::Default();
  200. // simulated tasks
  201. const int kFlushWriteTasks = 3;
  202. SimulatedBackgroundTask flush_write_task(
  203. reinterpret_cast<void*>(1234), "running", reinterpret_cast<void*>(5678),
  204. "pikachu", ThreadStatus::OP_FLUSH);
  205. const int kCompactionWriteTasks = 4;
  206. SimulatedBackgroundTask compaction_write_task(
  207. reinterpret_cast<void*>(1234), "running", reinterpret_cast<void*>(5678),
  208. "pikachu", ThreadStatus::OP_COMPACTION);
  209. const int kCompactionReadTasks = 5;
  210. SimulatedBackgroundTask compaction_read_task(
  211. reinterpret_cast<void*>(1234), "running", reinterpret_cast<void*>(5678),
  212. "pikachu", ThreadStatus::OP_COMPACTION);
  213. const int kCompactionWaitTasks = 6;
  214. SimulatedBackgroundTask compaction_wait_task(
  215. reinterpret_cast<void*>(1234), "running", reinterpret_cast<void*>(5678),
  216. "pikachu", ThreadStatus::OP_COMPACTION);
  217. // setup right answers
  218. int correct_operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
  219. correct_operation_counts[ThreadStatus::OP_FLUSH] = kFlushWriteTasks;
  220. correct_operation_counts[ThreadStatus::OP_COMPACTION] =
  221. kCompactionWriteTasks + kCompactionReadTasks + kCompactionWaitTasks;
  222. env->SetBackgroundThreads(correct_operation_counts[ThreadStatus::OP_FLUSH],
  223. Env::HIGH);
  224. env->SetBackgroundThreads(
  225. correct_operation_counts[ThreadStatus::OP_COMPACTION], Env::LOW);
  226. // schedule the simulated tasks
  227. for (int t = 0; t < kFlushWriteTasks; ++t) {
  228. env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, &flush_write_task,
  229. Env::Priority::HIGH);
  230. }
  231. flush_write_task.WaitUntilScheduled(kFlushWriteTasks);
  232. for (int t = 0; t < kCompactionWriteTasks; ++t) {
  233. env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
  234. &compaction_write_task, Env::Priority::LOW);
  235. }
  236. compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks);
  237. for (int t = 0; t < kCompactionReadTasks; ++t) {
  238. env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
  239. &compaction_read_task, Env::Priority::LOW);
  240. }
  241. compaction_read_task.WaitUntilScheduled(kCompactionReadTasks);
  242. for (int t = 0; t < kCompactionWaitTasks; ++t) {
  243. env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
  244. &compaction_wait_task, Env::Priority::LOW);
  245. }
  246. compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks);
  247. // verify the thread-status
  248. int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
  249. int state_counts[ThreadStatus::NUM_STATE_TYPES] = {0};
  250. std::vector<ThreadStatus> thread_list;
  251. ASSERT_OK(env->GetThreadList(&thread_list));
  252. UpdateStatusCounts(thread_list, operation_counts, state_counts);
  253. VerifyAndResetCounts(correct_operation_counts, operation_counts,
  254. ThreadStatus::NUM_OP_TYPES);
  255. // terminate compaction-wait tasks and see if the thread-status
  256. // reflects this update
  257. compaction_wait_task.FinishAllTasks();
  258. compaction_wait_task.WaitUntilDone();
  259. UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
  260. ThreadStatus::OP_UNKNOWN, kCompactionWaitTasks);
  261. ASSERT_OK(env->GetThreadList(&thread_list));
  262. UpdateStatusCounts(thread_list, operation_counts, state_counts);
  263. VerifyAndResetCounts(correct_operation_counts, operation_counts,
  264. ThreadStatus::NUM_OP_TYPES);
  265. // terminate flush-write tasks and see if the thread-status
  266. // reflects this update
  267. flush_write_task.FinishAllTasks();
  268. flush_write_task.WaitUntilDone();
  269. UpdateCount(correct_operation_counts, ThreadStatus::OP_FLUSH,
  270. ThreadStatus::OP_UNKNOWN, kFlushWriteTasks);
  271. ASSERT_OK(env->GetThreadList(&thread_list));
  272. UpdateStatusCounts(thread_list, operation_counts, state_counts);
  273. VerifyAndResetCounts(correct_operation_counts, operation_counts,
  274. ThreadStatus::NUM_OP_TYPES);
  275. // terminate compaction-write tasks and see if the thread-status
  276. // reflects this update
  277. compaction_write_task.FinishAllTasks();
  278. compaction_write_task.WaitUntilDone();
  279. UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
  280. ThreadStatus::OP_UNKNOWN, kCompactionWriteTasks);
  281. ASSERT_OK(env->GetThreadList(&thread_list));
  282. UpdateStatusCounts(thread_list, operation_counts, state_counts);
  283. VerifyAndResetCounts(correct_operation_counts, operation_counts,
  284. ThreadStatus::NUM_OP_TYPES);
  285. // terminate compaction-write tasks and see if the thread-status
  286. // reflects this update
  287. compaction_read_task.FinishAllTasks();
  288. compaction_read_task.WaitUntilDone();
  289. UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
  290. ThreadStatus::OP_UNKNOWN, kCompactionReadTasks);
  291. ASSERT_OK(env->GetThreadList(&thread_list));
  292. UpdateStatusCounts(thread_list, operation_counts, state_counts);
  293. VerifyAndResetCounts(correct_operation_counts, operation_counts,
  294. ThreadStatus::NUM_OP_TYPES);
  295. }
  296. } // namespace ROCKSDB_NAMESPACE
  297. int main(int argc, char** argv) {
  298. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  299. ::testing::InitGoogleTest(&argc, argv);
  300. return RUN_ALL_TESTS();
  301. }
  302. #else
  303. int main(int argc, char** argv) {
  304. ::testing::InitGoogleTest(&argc, argv);
  305. return 0;
  306. }
  307. #endif // ROCKSDB_USING_THREAD_STATUS