sync_point_impl.cc 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "test_util/sync_point_impl.h"
  6. #ifndef NDEBUG
  7. namespace ROCKSDB_NAMESPACE {
  8. KillPoint* KillPoint::GetInstance() {
  9. static KillPoint kp;
  10. return &kp;
  11. }
  12. void KillPoint::TestKillRandom(std::string kill_point, int odds_weight,
  13. const std::string& srcfile, int srcline) {
  14. if (rocksdb_kill_odds <= 0) {
  15. return;
  16. }
  17. int odds = rocksdb_kill_odds * odds_weight;
  18. for (auto& p : rocksdb_kill_exclude_prefixes) {
  19. if (kill_point.substr(0, p.length()) == p) {
  20. return;
  21. }
  22. }
  23. assert(odds > 0);
  24. if (odds % 7 == 0) {
  25. // class Random uses multiplier 16807, which is 7^5. If odds are
  26. // multiplier of 7, there might be limited values generated.
  27. odds++;
  28. }
  29. auto* r = Random::GetTLSInstance();
  30. bool crash = r->OneIn(odds);
  31. if (crash) {
  32. port::Crash(srcfile, srcline);
  33. }
  34. }
  35. void SyncPoint::Data::LoadDependency(
  36. const std::vector<SyncPointPair>& dependencies) {
  37. std::lock_guard<std::mutex> lock(mutex_);
  38. successors_.clear();
  39. predecessors_.clear();
  40. cleared_points_.clear();
  41. for (const auto& dependency : dependencies) {
  42. successors_[dependency.predecessor].push_back(dependency.successor);
  43. predecessors_[dependency.successor].push_back(dependency.predecessor);
  44. point_filter_.Add(dependency.successor);
  45. point_filter_.Add(dependency.predecessor);
  46. }
  47. cv_.notify_all();
  48. }
  49. void SyncPoint::Data::LoadDependencyAndMarkers(
  50. const std::vector<SyncPointPair>& dependencies,
  51. const std::vector<SyncPointPair>& markers) {
  52. std::lock_guard<std::mutex> lock(mutex_);
  53. successors_.clear();
  54. predecessors_.clear();
  55. cleared_points_.clear();
  56. markers_.clear();
  57. marked_thread_id_.clear();
  58. for (const auto& dependency : dependencies) {
  59. successors_[dependency.predecessor].push_back(dependency.successor);
  60. predecessors_[dependency.successor].push_back(dependency.predecessor);
  61. point_filter_.Add(dependency.successor);
  62. point_filter_.Add(dependency.predecessor);
  63. }
  64. for (const auto& marker : markers) {
  65. successors_[marker.predecessor].push_back(marker.successor);
  66. predecessors_[marker.successor].push_back(marker.predecessor);
  67. markers_[marker.predecessor].push_back(marker.successor);
  68. point_filter_.Add(marker.predecessor);
  69. point_filter_.Add(marker.successor);
  70. }
  71. cv_.notify_all();
  72. }
  73. bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
  74. for (const auto& pred : predecessors_[point]) {
  75. if (cleared_points_.count(pred) == 0) {
  76. return false;
  77. }
  78. }
  79. return true;
  80. }
  81. void SyncPoint::Data::ClearCallBack(const std::string& point) {
  82. std::unique_lock<std::mutex> lock(mutex_);
  83. while (num_callbacks_running_ > 0) {
  84. cv_.wait(lock);
  85. }
  86. callbacks_.erase(point);
  87. }
  88. void SyncPoint::Data::ClearAllCallBacks() {
  89. std::unique_lock<std::mutex> lock(mutex_);
  90. while (num_callbacks_running_ > 0) {
  91. cv_.wait(lock);
  92. }
  93. callbacks_.clear();
  94. }
  95. void SyncPoint::Data::Process(const Slice& point, void* cb_arg) {
  96. if (!enabled_) {
  97. return;
  98. }
  99. // Use a filter to prevent mutex lock if possible.
  100. if (!point_filter_.MayContain(point)) {
  101. return;
  102. }
  103. // Must convert to std::string for remaining work. Take
  104. // heap hit.
  105. std::string point_string(point.ToString());
  106. std::unique_lock<std::mutex> lock(mutex_);
  107. auto thread_id = std::this_thread::get_id();
  108. auto marker_iter = markers_.find(point_string);
  109. if (marker_iter != markers_.end()) {
  110. for (auto& marked_point : marker_iter->second) {
  111. marked_thread_id_.emplace(marked_point, thread_id);
  112. point_filter_.Add(marked_point);
  113. }
  114. }
  115. if (DisabledByMarker(point_string, thread_id)) {
  116. return;
  117. }
  118. while (!PredecessorsAllCleared(point_string)) {
  119. cv_.wait(lock);
  120. if (DisabledByMarker(point_string, thread_id)) {
  121. return;
  122. }
  123. }
  124. auto callback_pair = callbacks_.find(point_string);
  125. if (callback_pair != callbacks_.end()) {
  126. num_callbacks_running_++;
  127. mutex_.unlock();
  128. callback_pair->second(cb_arg);
  129. mutex_.lock();
  130. num_callbacks_running_--;
  131. }
  132. cleared_points_.insert(point_string);
  133. cv_.notify_all();
  134. }
  135. } // namespace ROCKSDB_NAMESPACE
  136. #endif