semaphore.h 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. // Copyright (c) Meta Platforms, Inc. and affiliates.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <cassert>
  7. #include <condition_variable>
  8. #include <mutex>
  9. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  10. #include <semaphore>
  11. #endif
  12. #include "port/port.h"
  13. #include "rocksdb/rocksdb_namespace.h"
  14. namespace ROCKSDB_NAMESPACE {
  15. // Wrapper providing a chosen counting semaphore implementation. The default
  16. // implementation based on a mutex and condvar unfortunately can result in
  17. // Release() temporarily waiting on another thread to make progress (if that
  18. // other thread is preempted while holding the mutex), but that should be rare.
  19. // However, alternative implementations may have correctness issues or even
  20. // worse performance. See std::counting_semaphore for general contract.
  21. //
  22. // NOTE1: std::counting_semaphore is known to be buggy on many std library
  23. // implementations, so be cautious about enabling it. Reportedly, an acquire()
  24. // can falsely block indefinitely. And we can't easily work around that with
  25. // try_acquire_for because another common bug has that function consistently
  26. // sleeping for the entire timeout duration even if a release() happens earlier.
  27. // Therefore, using std::counting_semaphore/binary_semaphore is strictly opt-in
  28. // for now.
  29. //
  30. // NOTE2: Also tried wrapping folly::fibers::Semaphore here but it was not as
  31. // efficient (for parallel compression) as even the mutex+condvar version.
  32. class ALIGN_AS(CACHE_LINE_SIZE) CountingSemaphore {
  33. public:
  34. explicit CountingSemaphore(std::ptrdiff_t starting_count)
  35. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  36. : sem_(starting_count)
  37. #else
  38. : count_(static_cast<int32_t>(starting_count))
  39. #endif // ROCKSDB_USE_STD_SEMAPHORES
  40. {
  41. assert(starting_count >= 0);
  42. assert(starting_count <= INT32_MAX);
  43. }
  44. void Acquire() {
  45. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  46. sem_.acquire();
  47. #else
  48. std::unique_lock<std::mutex> lock(mutex_);
  49. assert(count_ >= 0);
  50. cv_.wait(lock, [this] { return count_ > 0; });
  51. --count_;
  52. #endif // ROCKSDB_USE_STD_SEMAPHORES
  53. }
  54. bool TryAcquire() {
  55. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  56. return sem_.try_acquire();
  57. #else
  58. std::unique_lock<std::mutex> lock(mutex_);
  59. assert(count_ >= 0);
  60. if (count_ == 0) {
  61. return false;
  62. } else {
  63. --count_;
  64. return true;
  65. }
  66. #endif // ROCKSDB_USE_STD_SEMAPHORES
  67. }
  68. void Release(std::ptrdiff_t n = 1) {
  69. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  70. sem_.release(n);
  71. #else
  72. assert(n >= 0);
  73. assert(n <= INT32_MAX);
  74. if (n > 0) {
  75. std::unique_lock<std::mutex> lock(mutex_);
  76. assert(count_ >= 0);
  77. count_ += static_cast<int32_t>(n);
  78. assert(count_ >= 0); // no overflow
  79. if (n == 1) {
  80. cv_.notify_one();
  81. } else {
  82. cv_.notify_all();
  83. }
  84. }
  85. #endif // ROCKSDB_USE_STD_SEMAPHORES
  86. }
  87. private:
  88. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  89. std::counting_semaphore<INT32_MAX> sem_;
  90. #else
  91. int32_t count_;
  92. std::mutex mutex_;
  93. std::condition_variable cv_;
  94. #endif // ROCKSDB_USE_STD_SEMAPHORES
  95. }; // namespace ROCKSDB_NAMESPACE
  96. // Wrapper providing a chosen binary semaphore implementation. See notes on
  97. // CountingSemaphore above, and on Release() below.
  98. class BinarySemaphore {
  99. public:
  100. explicit BinarySemaphore(std::ptrdiff_t starting_count)
  101. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  102. : sem_(starting_count)
  103. #else
  104. : state_(starting_count > 0)
  105. #endif // ROCKSDB_USE_STD_SEMAPHORES
  106. {
  107. assert(starting_count >= 0);
  108. }
  109. void Acquire() {
  110. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  111. sem_.acquire();
  112. #else
  113. std::unique_lock<std::mutex> lock(mutex_);
  114. cv_.wait(lock, [this] { return state_; });
  115. state_ = false;
  116. #endif // ROCKSDB_USE_STD_SEMAPHORES
  117. }
  118. bool TryAcquire() {
  119. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  120. return sem_.try_acquire();
  121. #else
  122. std::unique_lock<std::mutex> lock(mutex_);
  123. if (state_) {
  124. state_ = false;
  125. return true;
  126. } else {
  127. return false;
  128. }
  129. #endif // ROCKSDB_USE_STD_SEMAPHORES
  130. }
  131. void Release() {
  132. // NOTE: implementations of std::binary_semaphore::release() tend to behave
  133. // like counting semaphores in the case of multiple Release() calls without
  134. // Acquire() in between, though it is undefined behavior. It is also OK to
  135. // cap the count at 1.
  136. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  137. sem_.release();
  138. #else
  139. std::unique_lock<std::mutex> lock(mutex_);
  140. // check precondition to avoid UB in std implementation
  141. assert(state_ == false);
  142. state_ = true;
  143. cv_.notify_one();
  144. #endif // ROCKSDB_USE_STD_SEMAPHORES
  145. }
  146. private:
  147. #ifdef ROCKSDB_USE_STD_SEMAPHORES
  148. std::binary_semaphore sem_;
  149. #else
  150. bool state_;
  151. std::mutex mutex_;
  152. std::condition_variable cv_;
  153. #endif // ROCKSDB_USE_STD_SEMAPHORES
  154. };
  155. } // namespace ROCKSDB_NAMESPACE