thread_status_updater.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "monitoring/thread_status_updater.h"
  6. #include <memory>
  7. #include "port/likely.h"
  8. #include "rocksdb/env.h"
  9. #include "rocksdb/system_clock.h"
  10. #include "util/mutexlock.h"
  11. namespace ROCKSDB_NAMESPACE {
  12. #ifdef ROCKSDB_USING_THREAD_STATUS
  13. thread_local ThreadStatusData* ThreadStatusUpdater::thread_status_data_ =
  14. nullptr;
  15. void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype,
  16. uint64_t thread_id) {
  17. if (UNLIKELY(thread_status_data_ == nullptr)) {
  18. thread_status_data_ = new ThreadStatusData();
  19. thread_status_data_->thread_type = ttype;
  20. thread_status_data_->thread_id = thread_id;
  21. std::lock_guard<std::mutex> lck(thread_list_mutex_);
  22. thread_data_set_.insert(thread_status_data_);
  23. }
  24. ClearThreadOperationProperties();
  25. }
  26. void ThreadStatusUpdater::UnregisterThread() {
  27. if (thread_status_data_ != nullptr) {
  28. std::lock_guard<std::mutex> lck(thread_list_mutex_);
  29. thread_data_set_.erase(thread_status_data_);
  30. delete thread_status_data_;
  31. thread_status_data_ = nullptr;
  32. }
  33. }
  34. void ThreadStatusUpdater::ResetThreadStatus() {
  35. ClearThreadState();
  36. ClearThreadOperation();
  37. SetColumnFamilyInfoKey(nullptr);
  38. }
  39. void ThreadStatusUpdater::SetEnableTracking(bool enable_tracking) {
  40. auto* data = Get();
  41. if (data == nullptr) {
  42. return;
  43. }
  44. data->enable_tracking.store(enable_tracking, std::memory_order_relaxed);
  45. }
  46. void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) {
  47. auto* data = Get();
  48. if (data == nullptr) {
  49. return;
  50. }
  51. data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed);
  52. }
  53. const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
  54. auto* data = GetLocalThreadStatus();
  55. if (data == nullptr) {
  56. return nullptr;
  57. }
  58. return data->cf_key.load(std::memory_order_relaxed);
  59. }
  60. void ThreadStatusUpdater::SetThreadOperation(
  61. const ThreadStatus::OperationType type) {
  62. auto* data = GetLocalThreadStatus();
  63. if (data == nullptr) {
  64. return;
  65. }
  66. // NOTE: Our practice here is to set all the thread operation properties
  67. // and stage before we set thread operation, and thread operation
  68. // will be set in std::memory_order_release. This is to ensure
  69. // whenever a thread operation is not OP_UNKNOWN, we will always
  70. // have a consistent information on its properties.
  71. data->operation_type.store(type, std::memory_order_release);
  72. if (type == ThreadStatus::OP_UNKNOWN) {
  73. data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
  74. std::memory_order_relaxed);
  75. ClearThreadOperationProperties();
  76. }
  77. }
  78. ThreadStatus::OperationType ThreadStatusUpdater::GetThreadOperation() {
  79. ThreadStatusData* data = GetLocalThreadStatus();
  80. if (data == nullptr) {
  81. return ThreadStatus::OperationType::OP_UNKNOWN;
  82. }
  83. return data->operation_type.load(std::memory_order_relaxed);
  84. }
  85. void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) {
  86. auto* data = GetLocalThreadStatus();
  87. if (data == nullptr) {
  88. return;
  89. }
  90. data->op_properties[i].store(value, std::memory_order_relaxed);
  91. }
  92. void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i,
  93. uint64_t delta) {
  94. auto* data = GetLocalThreadStatus();
  95. if (data == nullptr) {
  96. return;
  97. }
  98. data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
  99. }
  100. void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
  101. auto* data = GetLocalThreadStatus();
  102. if (data == nullptr) {
  103. return;
  104. }
  105. data->op_start_time.store(start_time, std::memory_order_relaxed);
  106. }
  107. void ThreadStatusUpdater::ClearThreadOperation() {
  108. auto* data = GetLocalThreadStatus();
  109. if (data == nullptr) {
  110. return;
  111. }
  112. data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
  113. std::memory_order_relaxed);
  114. data->operation_type.store(ThreadStatus::OP_UNKNOWN,
  115. std::memory_order_relaxed);
  116. ClearThreadOperationProperties();
  117. }
  118. void ThreadStatusUpdater::ClearThreadOperationProperties() {
  119. auto* data = GetLocalThreadStatus();
  120. if (data == nullptr) {
  121. return;
  122. }
  123. for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
  124. data->op_properties[i].store(0, std::memory_order_relaxed);
  125. }
  126. }
  127. ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
  128. ThreadStatus::OperationStage stage) {
  129. auto* data = GetLocalThreadStatus();
  130. if (data == nullptr) {
  131. return ThreadStatus::STAGE_UNKNOWN;
  132. }
  133. return data->operation_stage.exchange(stage, std::memory_order_relaxed);
  134. }
  135. void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type) {
  136. auto* data = GetLocalThreadStatus();
  137. if (data == nullptr) {
  138. return;
  139. }
  140. data->state_type.store(type, std::memory_order_relaxed);
  141. }
  142. void ThreadStatusUpdater::ClearThreadState() {
  143. auto* data = GetLocalThreadStatus();
  144. if (data == nullptr) {
  145. return;
  146. }
  147. data->state_type.store(ThreadStatus::STATE_UNKNOWN,
  148. std::memory_order_relaxed);
  149. }
  150. Status ThreadStatusUpdater::GetThreadList(
  151. std::vector<ThreadStatus>* thread_list) {
  152. thread_list->clear();
  153. std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
  154. uint64_t now_micros = SystemClock::Default()->NowMicros();
  155. std::lock_guard<std::mutex> lck(thread_list_mutex_);
  156. for (auto* thread_data : thread_data_set_) {
  157. assert(thread_data);
  158. auto thread_id = thread_data->thread_id.load(std::memory_order_relaxed);
  159. auto thread_type = thread_data->thread_type.load(std::memory_order_relaxed);
  160. // Since any change to cf_info_map requires thread_list_mutex,
  161. // which is currently held by GetThreadList(), here we can safely
  162. // use "memory_order_relaxed" to load the cf_key.
  163. auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed);
  164. ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
  165. ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
  166. ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
  167. uint64_t op_elapsed_micros = 0;
  168. uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};
  169. auto iter = cf_info_map_.find(cf_key);
  170. if (iter != cf_info_map_.end()) {
  171. op_type = thread_data->operation_type.load(std::memory_order_acquire);
  172. // display lower-level info only when higher-level info is available.
  173. if (op_type != ThreadStatus::OP_UNKNOWN) {
  174. op_elapsed_micros = now_micros - thread_data->op_start_time.load(
  175. std::memory_order_relaxed);
  176. op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
  177. state_type = thread_data->state_type.load(std::memory_order_relaxed);
  178. for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
  179. op_props[i] =
  180. thread_data->op_properties[i].load(std::memory_order_relaxed);
  181. }
  182. }
  183. }
  184. thread_list->emplace_back(
  185. thread_id, thread_type,
  186. iter != cf_info_map_.end() ? iter->second.db_name : "",
  187. iter != cf_info_map_.end() ? iter->second.cf_name : "", op_type,
  188. op_elapsed_micros, op_stage, op_props, state_type);
  189. }
  190. return Status::OK();
  191. }
  192. ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
  193. if (thread_status_data_ == nullptr) {
  194. return nullptr;
  195. }
  196. if (!thread_status_data_->enable_tracking.load(std::memory_order_relaxed)) {
  197. return nullptr;
  198. }
  199. return thread_status_data_;
  200. }
  201. void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key,
  202. const std::string& db_name,
  203. const void* cf_key,
  204. const std::string& cf_name) {
  205. // Acquiring same lock as GetThreadList() to guarantee
  206. // a consistent view of global column family table (cf_info_map).
  207. std::lock_guard<std::mutex> lck(thread_list_mutex_);
  208. cf_info_map_.emplace(std::piecewise_construct, std::make_tuple(cf_key),
  209. std::make_tuple(db_key, db_name, cf_name));
  210. db_key_map_[db_key].insert(cf_key);
  211. }
  212. void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
  213. // Acquiring same lock as GetThreadList() to guarantee
  214. // a consistent view of global column family table (cf_info_map).
  215. std::lock_guard<std::mutex> lck(thread_list_mutex_);
  216. auto cf_pair = cf_info_map_.find(cf_key);
  217. if (cf_pair != cf_info_map_.end()) {
  218. // Remove its entry from db_key_map_ by the following steps:
  219. // 1. Obtain the entry in db_key_map_ whose set contains cf_key
  220. // 2. Remove it from the set.
  221. ConstantColumnFamilyInfo& cf_info = cf_pair->second;
  222. auto db_pair = db_key_map_.find(cf_info.db_key);
  223. assert(db_pair != db_key_map_.end());
  224. size_t result __attribute__((__unused__));
  225. result = db_pair->second.erase(cf_key);
  226. assert(result);
  227. cf_info_map_.erase(cf_pair);
  228. }
  229. }
  230. void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
  231. // Acquiring same lock as GetThreadList() to guarantee
  232. // a consistent view of global column family table (cf_info_map).
  233. std::lock_guard<std::mutex> lck(thread_list_mutex_);
  234. auto db_pair = db_key_map_.find(db_key);
  235. if (UNLIKELY(db_pair == db_key_map_.end())) {
  236. // In some occasional cases such as DB::Open fails, we won't
  237. // register ColumnFamilyInfo for a db.
  238. return;
  239. }
  240. for (auto cf_key : db_pair->second) {
  241. auto cf_pair = cf_info_map_.find(cf_key);
  242. if (cf_pair != cf_info_map_.end()) {
  243. cf_info_map_.erase(cf_pair);
  244. }
  245. }
  246. db_key_map_.erase(db_key);
  247. }
  248. #else
  249. void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType /*ttype*/,
  250. uint64_t /*thread_id*/) {}
  251. void ThreadStatusUpdater::UnregisterThread() {}
  252. void ThreadStatusUpdater::ResetThreadStatus() {}
  253. void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {}
  254. void ThreadStatusUpdater::SetThreadOperation(
  255. const ThreadStatus::OperationType /*type*/) {}
  256. void ThreadStatusUpdater::ClearThreadOperation() {}
  257. void ThreadStatusUpdater::SetThreadState(
  258. const ThreadStatus::StateType /*type*/) {}
  259. void ThreadStatusUpdater::ClearThreadState() {}
  260. Status ThreadStatusUpdater::GetThreadList(
  261. std::vector<ThreadStatus>* /*thread_list*/) {
  262. return Status::NotSupported(
  263. "GetThreadList is not supported in the current running environment.");
  264. }
  265. void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/,
  266. const std::string& /*db_name*/,
  267. const void* /*cf_key*/,
  268. const std::string& /*cf_name*/) {}
  269. void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {}
  270. void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {}
  271. void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/,
  272. uint64_t /*value*/) {}
  273. void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/,
  274. uint64_t /*delta*/) {}
  275. #endif // ROCKSDB_USING_THREAD_STATUS
  276. } // namespace ROCKSDB_NAMESPACE