write_buffer_manager.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "rocksdb/write_buffer_manager.h"

#include <memory>

#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/status.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {
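
// The constructor below sets up the manager's two thresholds: buffer_size_ is
// the total memtable budget, and mutable_limit_ is fixed at 7/8 of it and is
// consulted by the flush-trigger logic declared in the header (not shown in
// this file). When a block cache is supplied, memtable memory is additionally
// charged to that cache through a CacheReservationManager, which inserts
// dummy entries to account for the reserved space.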
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache,
                                       bool allow_stall)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),
      memory_used_(0),
      memory_active_(0),
      cache_res_mgr_(nullptr),
      allow_stall_(allow_stall),
      stall_active_(false) {
  if (cache) {
    // Memtable's memory usage tends to fluctuate frequently; therefore we set
    // delayed_decrease = true to save some dummy entry insertion on memory
    // increase right after a memory decrease.
    cache_res_mgr_ = std::make_shared<
        CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>>(
        cache, true /* delayed_decrease */);
  }
}
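
// Debug-only sanity check: by the time the manager is destroyed, every
// stalled writer must already have been removed from the queue (see
// RemoveDBFromQueue below), so the queue is expected to be empty.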
WriteBufferManager::~WriteBufferManager() {
#ifndef NDEBUG
  std::unique_lock<std::mutex> lock(mu_);
  assert(queue_.empty());
#endif
}

std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
  if (cache_res_mgr_ != nullptr) {
    return cache_res_mgr_->GetTotalReservedCacheSize();
  } else {
    return 0;
  }
}
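
// ReserveMem is the entry point for charging memtable allocations. It always
// bumps memory_active_ (memory belonging to memtables that can still accept
// writes) and tracks total usage either directly in memory_used_ or, when
// cache charging is configured, through ReserveMemWithCache below.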
void WriteBufferManager::ReserveMem(size_t mem) {
  if (cache_res_mgr_ != nullptr) {
    ReserveMemWithCache(mem);
  } else if (enabled()) {
    memory_used_.fetch_add(mem, std::memory_order_relaxed);
  }
  if (enabled()) {
    memory_active_.fetch_add(mem, std::memory_order_relaxed);
  }
}

// Should only be called from the write thread.
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
  assert(cache_res_mgr_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it ends up being a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);

  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);

  // We absorb the error since WriteBufferManager is not able to handle
  // this failure properly. Ideally we should prevent this allocation
  // from happening if the cache charging fails.
  // [TODO] We'll need to improve it in the future and figure out what to do on
  // error.
  s.PermitUncheckedError();
}
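
// ScheduleFreeMem only reduces memory_active_: the bytes are still in use,
// but they no longer count toward the "active" portion of the budget. The
// corresponding reduction of memory_used_ happens later in FreeMem, once the
// memory is actually released.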
void WriteBufferManager::ScheduleFreeMem(size_t mem) {
  if (enabled()) {
    memory_active_.fetch_sub(mem, std::memory_order_relaxed);
  }
}

void WriteBufferManager::FreeMem(size_t mem) {
  if (cache_res_mgr_ != nullptr) {
    FreeMemWithCache(mem);
  } else if (enabled()) {
    memory_used_.fetch_sub(mem, std::memory_order_relaxed);
  }
  // Check if stall is active and can be ended.
  MaybeEndWriteStall();
}

void WriteBufferManager::FreeMemWithCache(size_t mem) {
  assert(cache_res_mgr_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it ends up being a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);

  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);

  // We absorb the error since WriteBufferManager is not able to handle
  // this failure properly.
  // [TODO] We'll need to improve it in the future and figure out what to do on
  // error.
  s.PermitUncheckedError();
}
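
// BeginWriteStall enqueues a stalled writer (a StallInterface, typically one
// per DB) that must block until memory usage falls back below the stall
// threshold. The list node is allocated before taking mu_ so the lock only
// covers the splice; if the stall condition has already cleared by then, the
// caller is signaled immediately instead of being queued.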
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
  assert(wbm_stall != nullptr);

  // Allocate outside of the lock.
  std::list<StallInterface*> new_node = {wbm_stall};

  {
    std::unique_lock<std::mutex> lock(mu_);
    // Verify that the stall conditions are still active.
    if (ShouldStall()) {
      stall_active_.store(true, std::memory_order_relaxed);
      queue_.splice(queue_.end(), std::move(new_node));
    }
  }

  // If the node was not consumed, the stall has ended already and we can
  // signal the caller.
  if (!new_node.empty()) {
    new_node.front()->Signal();
  }
}
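
// Note on the early return below: the stall is only lifted once memory usage
// has dropped back below the stall threshold (IsStallThresholdExceeded() is
// declared in the header). The `cleanup` list is declared before mu_ is taken
// so that the queue nodes moved into it are destroyed only after the lock has
// been released.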
// Called when memory is freed in FreeMem or the buffer size has changed.
void WriteBufferManager::MaybeEndWriteStall() {
  // Stall conditions have not been resolved.
  if (allow_stall_.load(std::memory_order_relaxed) &&
      IsStallThresholdExceeded()) {
    return;
  }

  // Perform all deallocations outside of the lock.
  std::list<StallInterface*> cleanup;

  std::unique_lock<std::mutex> lock(mu_);
  if (!stall_active_.load(std::memory_order_relaxed)) {
    return;  // Nothing to do.
  }

  // Unblock new writers.
  stall_active_.store(false, std::memory_order_relaxed);

  // Unblock the writers in the queue.
  for (StallInterface* wbm_stall : queue_) {
    wbm_stall->Signal();
  }
  cleanup = std::move(queue_);
}
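
// RemoveDBFromQueue unlinks every queue entry belonging to this stall
// interface (e.g. when its DB is shutting down) and signals it
// unconditionally, so a writer that was never queued, or whose stall has
// already ended, is not left blocked. As above, the spliced-out nodes are
// destroyed outside the lock via `cleanup`.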
void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
  assert(wbm_stall != nullptr);

  // Deallocate the removed nodes outside of the lock.
  std::list<StallInterface*> cleanup;

  if (enabled() && allow_stall_.load(std::memory_order_relaxed)) {
    std::unique_lock<std::mutex> lock(mu_);
    for (auto it = queue_.begin(); it != queue_.end();) {
      auto next = std::next(it);
      if (*it == wbm_stall) {
        cleanup.splice(cleanup.end(), queue_, std::move(it));
      }
      it = next;
    }
  }
  wbm_stall->Signal();
}

}  // namespace ROCKSDB_NAMESPACE
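
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this translation unit). It is
// a minimal example of wiring a shared WriteBufferManager into DB options,
// assuming the public headers rocksdb/cache.h, rocksdb/options.h and
// rocksdb/write_buffer_manager.h, and that ROCKSDB_NAMESPACE is the default
// `rocksdb`; adjust the sizes to your workload.
//
//   std::shared_ptr<rocksdb::Cache> cache =
//       rocksdb::NewLRUCache(1024 * 1024 * 1024 /* 1 GiB block cache */);
//   auto wbm = std::make_shared<rocksdb::WriteBufferManager>(
//       512 * 1024 * 1024 /* buffer_size: 512 MiB */, cache,
//       true /* allow_stall */);
//
//   rocksdb::Options options;
//   options.write_buffer_manager = wbm;
//   // Open one or more DBs with `options`; their memtable memory is then
//   // tracked against the shared 512 MiB budget and charged to `cache`.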