| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
- // This source code is licensed under both the GPLv2 (found in the
- // COPYING file in the root directory) and Apache 2.0 License
- // (found in the LICENSE.Apache file in the root directory).
- #include "test_util/sync_point_impl.h"
- #ifndef NDEBUG
- namespace ROCKSDB_NAMESPACE {
- void TestKillRandom(std::string kill_point, int odds,
- const std::string& srcfile, int srcline) {
- for (auto& p : rocksdb_kill_prefix_blacklist) {
- if (kill_point.substr(0, p.length()) == p) {
- return;
- }
- }
- assert(odds > 0);
- if (odds % 7 == 0) {
- // class Random uses multiplier 16807, which is 7^5. If odds are
- // multiplier of 7, there might be limited values generated.
- odds++;
- }
- auto* r = Random::GetTLSInstance();
- bool crash = r->OneIn(odds);
- if (crash) {
- port::Crash(srcfile, srcline);
- }
- }
- void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
- std::lock_guard<std::mutex> lock(mutex_);
- successors_.clear();
- predecessors_.clear();
- cleared_points_.clear();
- for (const auto& dependency : dependencies) {
- successors_[dependency.predecessor].push_back(dependency.successor);
- predecessors_[dependency.successor].push_back(dependency.predecessor);
- }
- cv_.notify_all();
- }
- void SyncPoint::Data::LoadDependencyAndMarkers(
- const std::vector<SyncPointPair>& dependencies,
- const std::vector<SyncPointPair>& markers) {
- std::lock_guard<std::mutex> lock(mutex_);
- successors_.clear();
- predecessors_.clear();
- cleared_points_.clear();
- markers_.clear();
- marked_thread_id_.clear();
- for (const auto& dependency : dependencies) {
- successors_[dependency.predecessor].push_back(dependency.successor);
- predecessors_[dependency.successor].push_back(dependency.predecessor);
- }
- for (const auto& marker : markers) {
- successors_[marker.predecessor].push_back(marker.successor);
- predecessors_[marker.successor].push_back(marker.predecessor);
- markers_[marker.predecessor].push_back(marker.successor);
- }
- cv_.notify_all();
- }
- bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
- for (const auto& pred : predecessors_[point]) {
- if (cleared_points_.count(pred) == 0) {
- return false;
- }
- }
- return true;
- }
- void SyncPoint::Data::ClearCallBack(const std::string& point) {
- std::unique_lock<std::mutex> lock(mutex_);
- while (num_callbacks_running_ > 0) {
- cv_.wait(lock);
- }
- callbacks_.erase(point);
- }
- void SyncPoint::Data::ClearAllCallBacks() {
- std::unique_lock<std::mutex> lock(mutex_);
- while (num_callbacks_running_ > 0) {
- cv_.wait(lock);
- }
- callbacks_.clear();
- }
- void SyncPoint::Data::Process(const std::string& point, void* cb_arg) {
- if (!enabled_) {
- return;
- }
- std::unique_lock<std::mutex> lock(mutex_);
- auto thread_id = std::this_thread::get_id();
- auto marker_iter = markers_.find(point);
- if (marker_iter != markers_.end()) {
- for (auto& marked_point : marker_iter->second) {
- marked_thread_id_.emplace(marked_point, thread_id);
- }
- }
- if (DisabledByMarker(point, thread_id)) {
- return;
- }
- while (!PredecessorsAllCleared(point)) {
- cv_.wait(lock);
- if (DisabledByMarker(point, thread_id)) {
- return;
- }
- }
- auto callback_pair = callbacks_.find(point);
- if (callback_pair != callbacks_.end()) {
- num_callbacks_running_++;
- mutex_.unlock();
- callback_pair->second(cb_arg);
- mutex_.lock();
- num_callbacks_running_--;
- }
- cleared_points_.insert(point);
- cv_.notify_all();
- }
- } // namespace ROCKSDB_NAMESPACE
- #endif
|