sync_point_impl.h 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <assert.h>
  6. #include <atomic>
  7. #include <condition_variable>
  8. #include <functional>
  9. #include <mutex>
  10. #include <string>
  11. #include <thread>
  12. #include <unordered_map>
  13. #include <unordered_set>
  14. #include "memory/concurrent_arena.h"
  15. #include "port/port.h"
  16. #include "test_util/sync_point.h"
  17. #include "util/dynamic_bloom.h"
  18. #include "util/random.h"
  19. #pragma once
  20. #ifndef NDEBUG
  21. namespace ROCKSDB_NAMESPACE {
  22. // A hacky allocator for single use.
  23. // Arena depends on SyncPoint and create circular dependency.
  24. class SingleAllocator : public Allocator {
  25. public:
  26. char* Allocate(size_t) override {
  27. assert(false);
  28. return nullptr;
  29. }
  30. char* AllocateAligned(size_t bytes, size_t, Logger*) override {
  31. buf_.resize(bytes);
  32. return const_cast<char*>(buf_.data());
  33. }
  34. size_t BlockSize() const override {
  35. assert(false);
  36. return 0;
  37. }
  38. private:
  39. std::string buf_;
  40. };
  41. struct SyncPoint::Data {
  42. Data() : point_filter_(&alloc_, /*total_bits=*/8192), enabled_(false) {}
  43. // Enable proper deletion by subclasses
  44. virtual ~Data() {}
  45. // successor/predecessor map loaded from LoadDependency
  46. std::unordered_map<std::string, std::vector<std::string>> successors_;
  47. std::unordered_map<std::string, std::vector<std::string>> predecessors_;
  48. std::unordered_map<std::string, std::function<void(void*)>> callbacks_;
  49. std::unordered_map<std::string, std::vector<std::string>> markers_;
  50. std::unordered_map<std::string, std::thread::id> marked_thread_id_;
  51. std::mutex mutex_;
  52. std::condition_variable cv_;
  53. // sync points that have been passed through
  54. std::unordered_set<std::string> cleared_points_;
  55. SingleAllocator alloc_;
  56. // A filter before holding mutex to speed up process.
  57. DynamicBloom point_filter_;
  58. std::atomic<bool> enabled_;
  59. int num_callbacks_running_ = 0;
  60. void LoadDependency(const std::vector<SyncPointPair>& dependencies);
  61. void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
  62. const std::vector<SyncPointPair>& markers);
  63. bool PredecessorsAllCleared(const std::string& point);
  64. void SetCallBack(const std::string& point,
  65. const std::function<void(void*)>& callback) {
  66. std::lock_guard<std::mutex> lock(mutex_);
  67. callbacks_[point] = callback;
  68. point_filter_.Add(point);
  69. }
  70. void ClearCallBack(const std::string& point);
  71. void ClearAllCallBacks();
  72. void EnableProcessing() { enabled_ = true; }
  73. void DisableProcessing() { enabled_ = false; }
  74. void ClearTrace() {
  75. std::lock_guard<std::mutex> lock(mutex_);
  76. cleared_points_.clear();
  77. }
  78. bool DisabledByMarker(const std::string& point, std::thread::id thread_id) {
  79. auto marked_point_iter = marked_thread_id_.find(point);
  80. return marked_point_iter != marked_thread_id_.end() &&
  81. thread_id != marked_point_iter->second;
  82. }
  83. void Process(const Slice& point, void* cb_arg);
  84. };
  85. } // namespace ROCKSDB_NAMESPACE
  86. #endif // NDEBUG