| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include <mutex>
- #include <condition_variable>
- #include "monitoring/thread_status_updater.h"
- #include "rocksdb/db.h"
- #include "test_util/testharness.h"
- #ifdef ROCKSDB_USING_THREAD_STATUS
- namespace ROCKSDB_NAMESPACE {
- class SimulatedBackgroundTask {
- public:
- SimulatedBackgroundTask(
- const void* db_key, const std::string& db_name,
- const void* cf_key, const std::string& cf_name,
- const ThreadStatus::OperationType operation_type =
- ThreadStatus::OP_UNKNOWN,
- const ThreadStatus::StateType state_type =
- ThreadStatus::STATE_UNKNOWN)
- : db_key_(db_key), db_name_(db_name),
- cf_key_(cf_key), cf_name_(cf_name),
- operation_type_(operation_type), state_type_(state_type),
- should_run_(true), running_count_(0) {
- Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo(
- db_key_, db_name_, cf_key_, cf_name_);
- }
- ~SimulatedBackgroundTask() {
- Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_);
- }
- void Run() {
- std::unique_lock<std::mutex> l(mutex_);
- running_count_++;
- Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
- Env::Default()->GetThreadStatusUpdater()->SetThreadOperation(
- operation_type_);
- Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_);
- while (should_run_) {
- bg_cv_.wait(l);
- }
- Env::Default()->GetThreadStatusUpdater()->ClearThreadState();
- Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation();
- Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(nullptr);
- running_count_--;
- bg_cv_.notify_all();
- }
- void FinishAllTasks() {
- std::unique_lock<std::mutex> l(mutex_);
- should_run_ = false;
- bg_cv_.notify_all();
- }
- void WaitUntilScheduled(int job_count, Env* env) {
- while (running_count_ < job_count) {
- env->SleepForMicroseconds(1000);
- }
- }
- void WaitUntilDone() {
- std::unique_lock<std::mutex> l(mutex_);
- while (running_count_ > 0) {
- bg_cv_.wait(l);
- }
- }
- static void DoSimulatedTask(void* arg) {
- reinterpret_cast<SimulatedBackgroundTask*>(arg)->Run();
- }
- private:
- const void* db_key_;
- const std::string db_name_;
- const void* cf_key_;
- const std::string cf_name_;
- const ThreadStatus::OperationType operation_type_;
- const ThreadStatus::StateType state_type_;
- std::mutex mutex_;
- std::condition_variable bg_cv_;
- bool should_run_;
- std::atomic<int> running_count_;
- };
- class ThreadListTest : public testing::Test {
- public:
- ThreadListTest() {
- }
- };
- TEST_F(ThreadListTest, GlobalTables) {
- // verify the global tables for operations and states are properly indexed.
- for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) {
- ASSERT_EQ(global_operation_table[type].type, type);
- ASSERT_EQ(global_operation_table[type].name,
- ThreadStatus::GetOperationName(
- ThreadStatus::OperationType(type)));
- }
- for (int type = 0; type != ThreadStatus::NUM_STATE_TYPES; ++type) {
- ASSERT_EQ(global_state_table[type].type, type);
- ASSERT_EQ(global_state_table[type].name,
- ThreadStatus::GetStateName(
- ThreadStatus::StateType(type)));
- }
- for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) {
- ASSERT_EQ(global_op_stage_table[stage].stage, stage);
- ASSERT_EQ(global_op_stage_table[stage].name,
- ThreadStatus::GetOperationStageName(
- ThreadStatus::OperationStage(stage)));
- }
- }
- TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
- Env* env = Env::Default();
- const int kHighPriorityThreads = 3;
- const int kLowPriorityThreads = 5;
- const int kSimulatedHighPriThreads = kHighPriorityThreads - 1;
- const int kSimulatedLowPriThreads = kLowPriorityThreads / 3;
- env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
- env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
- SimulatedBackgroundTask running_task(
- reinterpret_cast<void*>(1234), "running",
- reinterpret_cast<void*>(5678), "pikachu");
- for (int test = 0; test < kSimulatedHighPriThreads; ++test) {
- env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
- &running_task, Env::Priority::HIGH);
- }
- for (int test = 0; test < kSimulatedLowPriThreads; ++test) {
- env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
- &running_task, Env::Priority::LOW);
- }
- running_task.WaitUntilScheduled(
- kSimulatedHighPriThreads + kSimulatedLowPriThreads, env);
- std::vector<ThreadStatus> thread_list;
- // Verify the number of running threads in each pool.
- env->GetThreadList(&thread_list);
- int running_count[ThreadStatus::NUM_THREAD_TYPES] = {0};
- for (auto thread_status : thread_list) {
- if (thread_status.cf_name == "pikachu" &&
- thread_status.db_name == "running") {
- running_count[thread_status.thread_type]++;
- }
- }
- ASSERT_EQ(
- running_count[ThreadStatus::HIGH_PRIORITY],
- kSimulatedHighPriThreads);
- ASSERT_EQ(
- running_count[ThreadStatus::LOW_PRIORITY],
- kSimulatedLowPriThreads);
- ASSERT_EQ(
- running_count[ThreadStatus::USER], 0);
- running_task.FinishAllTasks();
- running_task.WaitUntilDone();
- // Verify none of the threads are running
- env->GetThreadList(&thread_list);
- for (int i = 0; i < ThreadStatus::NUM_THREAD_TYPES; ++i) {
- running_count[i] = 0;
- }
- for (auto thread_status : thread_list) {
- if (thread_status.cf_name == "pikachu" &&
- thread_status.db_name == "running") {
- running_count[thread_status.thread_type]++;
- }
- }
- ASSERT_EQ(
- running_count[ThreadStatus::HIGH_PRIORITY], 0);
- ASSERT_EQ(
- running_count[ThreadStatus::LOW_PRIORITY], 0);
- ASSERT_EQ(
- running_count[ThreadStatus::USER], 0);
- }
- namespace {
- void UpdateStatusCounts(
- const std::vector<ThreadStatus>& thread_list,
- int operation_counts[], int state_counts[]) {
- for (auto thread_status : thread_list) {
- operation_counts[thread_status.operation_type]++;
- state_counts[thread_status.state_type]++;
- }
- }
- void VerifyAndResetCounts(
- const int correct_counts[], int collected_counts[], int size) {
- for (int i = 0; i < size; ++i) {
- ASSERT_EQ(collected_counts[i], correct_counts[i]);
- collected_counts[i] = 0;
- }
- }
- void UpdateCount(
- int operation_counts[], int from_event, int to_event, int amount) {
- operation_counts[from_event] -= amount;
- operation_counts[to_event] += amount;
- }
- } // namespace
- TEST_F(ThreadListTest, SimpleEventTest) {
- Env* env = Env::Default();
- // simulated tasks
- const int kFlushWriteTasks = 3;
- SimulatedBackgroundTask flush_write_task(
- reinterpret_cast<void*>(1234), "running",
- reinterpret_cast<void*>(5678), "pikachu",
- ThreadStatus::OP_FLUSH);
- const int kCompactionWriteTasks = 4;
- SimulatedBackgroundTask compaction_write_task(
- reinterpret_cast<void*>(1234), "running",
- reinterpret_cast<void*>(5678), "pikachu",
- ThreadStatus::OP_COMPACTION);
- const int kCompactionReadTasks = 5;
- SimulatedBackgroundTask compaction_read_task(
- reinterpret_cast<void*>(1234), "running",
- reinterpret_cast<void*>(5678), "pikachu",
- ThreadStatus::OP_COMPACTION);
- const int kCompactionWaitTasks = 6;
- SimulatedBackgroundTask compaction_wait_task(
- reinterpret_cast<void*>(1234), "running",
- reinterpret_cast<void*>(5678), "pikachu",
- ThreadStatus::OP_COMPACTION);
- // setup right answers
- int correct_operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
- correct_operation_counts[ThreadStatus::OP_FLUSH] =
- kFlushWriteTasks;
- correct_operation_counts[ThreadStatus::OP_COMPACTION] =
- kCompactionWriteTasks + kCompactionReadTasks + kCompactionWaitTasks;
- env->SetBackgroundThreads(
- correct_operation_counts[ThreadStatus::OP_FLUSH], Env::HIGH);
- env->SetBackgroundThreads(
- correct_operation_counts[ThreadStatus::OP_COMPACTION], Env::LOW);
- // schedule the simulated tasks
- for (int t = 0; t < kFlushWriteTasks; ++t) {
- env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
- &flush_write_task, Env::Priority::HIGH);
- }
- flush_write_task.WaitUntilScheduled(kFlushWriteTasks, env);
- for (int t = 0; t < kCompactionWriteTasks; ++t) {
- env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
- &compaction_write_task, Env::Priority::LOW);
- }
- compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks, env);
- for (int t = 0; t < kCompactionReadTasks; ++t) {
- env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
- &compaction_read_task, Env::Priority::LOW);
- }
- compaction_read_task.WaitUntilScheduled(kCompactionReadTasks, env);
- for (int t = 0; t < kCompactionWaitTasks; ++t) {
- env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
- &compaction_wait_task, Env::Priority::LOW);
- }
- compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks, env);
- // verify the thread-status
- int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
- int state_counts[ThreadStatus::NUM_STATE_TYPES] = {0};
- std::vector<ThreadStatus> thread_list;
- env->GetThreadList(&thread_list);
- UpdateStatusCounts(thread_list, operation_counts, state_counts);
- VerifyAndResetCounts(correct_operation_counts, operation_counts,
- ThreadStatus::NUM_OP_TYPES);
- // terminate compaction-wait tasks and see if the thread-status
- // reflects this update
- compaction_wait_task.FinishAllTasks();
- compaction_wait_task.WaitUntilDone();
- UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
- ThreadStatus::OP_UNKNOWN, kCompactionWaitTasks);
- env->GetThreadList(&thread_list);
- UpdateStatusCounts(thread_list, operation_counts, state_counts);
- VerifyAndResetCounts(correct_operation_counts, operation_counts,
- ThreadStatus::NUM_OP_TYPES);
- // terminate flush-write tasks and see if the thread-status
- // reflects this update
- flush_write_task.FinishAllTasks();
- flush_write_task.WaitUntilDone();
- UpdateCount(correct_operation_counts, ThreadStatus::OP_FLUSH,
- ThreadStatus::OP_UNKNOWN, kFlushWriteTasks);
- env->GetThreadList(&thread_list);
- UpdateStatusCounts(thread_list, operation_counts, state_counts);
- VerifyAndResetCounts(correct_operation_counts, operation_counts,
- ThreadStatus::NUM_OP_TYPES);
- // terminate compaction-write tasks and see if the thread-status
- // reflects this update
- compaction_write_task.FinishAllTasks();
- compaction_write_task.WaitUntilDone();
- UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
- ThreadStatus::OP_UNKNOWN, kCompactionWriteTasks);
- env->GetThreadList(&thread_list);
- UpdateStatusCounts(thread_list, operation_counts, state_counts);
- VerifyAndResetCounts(correct_operation_counts, operation_counts,
- ThreadStatus::NUM_OP_TYPES);
- // terminate compaction-write tasks and see if the thread-status
- // reflects this update
- compaction_read_task.FinishAllTasks();
- compaction_read_task.WaitUntilDone();
- UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
- ThreadStatus::OP_UNKNOWN, kCompactionReadTasks);
- env->GetThreadList(&thread_list);
- UpdateStatusCounts(thread_list, operation_counts, state_counts);
- VerifyAndResetCounts(correct_operation_counts, operation_counts,
- ThreadStatus::NUM_OP_TYPES);
- }
- } // namespace ROCKSDB_NAMESPACE
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
- #else
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return 0;
- }
- #endif // ROCKSDB_USING_THREAD_STATUS
|