write_buffer_manager.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "rocksdb/write_buffer_manager.h"

#include <mutex>

#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
namespace {
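// Each dummy cache entry reserves 256KB of block cache capacity, so memtable
// memory is charged against the cache in 256KB increments.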
const size_t kSizeDummyEntry = 256 * 1024;
// The key will be longer than keys for blocks in SST files so they won't
// conflict.
const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
}  // namespace

struct WriteBufferManager::CacheRep {
  std::shared_ptr<Cache> cache_;
  std::mutex cache_mutex_;
  std::atomic<size_t> cache_allocated_size_;
  // The non-prefix part will be updated according to the ID to use.
  char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
  uint64_t next_cache_key_id_ = 0;
  std::vector<Cache::Handle*> dummy_handles_;

  explicit CacheRep(std::shared_ptr<Cache> cache)
      : cache_(cache), cache_allocated_size_(0) {
    memset(cache_key_, 0, kCacheKeyPrefix);
    size_t pointer_size = sizeof(const void*);
    assert(pointer_size <= kCacheKeyPrefix);
    memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
  }
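
  // Each key is the address of this CacheRep followed by a varint64 counter,
  // so successive dummy entries get distinct keys that cannot collide with
  // the (shorter) keys used for SST data blocks.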
  Slice GetNextCacheKey() {
    memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
    char* end =
        EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
    return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
  }
};
#else
struct WriteBufferManager::CacheRep {};
#endif  // ROCKSDB_LITE

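// mutable_limit_ is 7/8 of the total buffer budget; it is used (see
// ShouldFlush() in write_buffer_manager.h) as the threshold for triggering
// flushes before the hard buffer_size_ limit is hit.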
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),
      memory_used_(0),
      memory_active_(0),
      cache_rep_(nullptr) {
#ifndef ROCKSDB_LITE
  if (cache) {
    // Construct the cache key using the pointer to this.
    cache_rep_.reset(new CacheRep(cache));
  }
#else
  (void)cache;
#endif  // ROCKSDB_LITE
}

WriteBufferManager::~WriteBufferManager() {
#ifndef ROCKSDB_LITE
  if (cache_rep_) {
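    // Force-erase every remaining dummy entry so the capacity reserved on
    // behalf of memtables is returned to the block cache.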
    for (auto* handle : cache_rep_->dummy_handles_) {
      cache_rep_->cache_->Release(handle, true);
    }
  }
#endif  // ROCKSDB_LITE
}

// Should only be called from the write thread.
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it ends up being a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  while (new_mem_used > cache_rep_->cache_allocated_size_) {
    // Expand the reservation by at least 256KB at a time.
    // Add a dummy record to the cache.
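    // The dummy entry has no value and no deleter; only its charge
    // (kSizeDummyEntry) matters, reducing the capacity left for data blocks.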
    Cache::Handle* handle;
    cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
                               kSizeDummyEntry, nullptr, &handle);
    cache_rep_->dummy_handles_.push_back(handle);
    cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
  }
#else
  (void)mem;
#endif  // ROCKSDB_LITE
}

void WriteBufferManager::FreeMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it ends up being a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  // Gradually shrink the memory charged to the block cache if the actual
  // usage is less than 3/4 of what we have reserved from the block cache.
  // We do this because:
  // 1. we don't pay the block cache cost immediately when a memtable is
  //    freed, as block cache inserts are expensive;
  // 2. eventually, if we walk away from a temporary spike in memtable size,
  //    we make sure to shrink the memory charged to the block cache over
  //    time.
  // In this way, we only shrink the charged memory slowly, even when there
  // is plenty of margin.
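  // Release at most one dummy entry per call, and only if the remaining
  // reservation would still cover the current usage.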
  if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
      cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
    assert(!cache_rep_->dummy_handles_.empty());
    cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true);
    cache_rep_->dummy_handles_.pop_back();
    cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
  }
#else
  (void)mem;
#endif  // ROCKSDB_LITE
}
}  // namespace ROCKSDB_NAMESPACE
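
// A minimal usage sketch (illustrative only, not part of this file; the
// database path and sizes below are assumptions): pass a non-null cache to
// the WriteBufferManager so memtable memory is charged to the block cache.
//
//   std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024);  // 1GB
//   Options options;
//   options.create_if_missing = true;
//   // Cap total memtable memory at 512MB and charge it against `cache`.
//   options.write_buffer_manager =
//       std::make_shared<WriteBufferManager>(512 * 1024 * 1024, cache);
//   DB* db = nullptr;
//   Status s = DB::Open(options, "/tmp/wbm_example", &db);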