| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).//// Copyright (c) 2011 The LevelDB Authors. All rights reserved.// Use of this source code is governed by a BSD-style license that can be// found in the LICENSE file. See the AUTHORS file for names of contributors.#include "rocksdb/write_buffer_manager.h"#include <mutex>#include "util/coding.h"namespace ROCKSDB_NAMESPACE {#ifndef ROCKSDB_LITEnamespace {const size_t kSizeDummyEntry = 256 * 1024;// The key will be longer than keys for blocks in SST files so they won't// conflict.const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;}  // namespacestruct WriteBufferManager::CacheRep {  std::shared_ptr<Cache> cache_;  std::mutex cache_mutex_;  std::atomic<size_t> cache_allocated_size_;  // The non-prefix part will be updated according to the ID to use.  char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];  uint64_t next_cache_key_id_ = 0;  std::vector<Cache::Handle*> dummy_handles_;  explicit CacheRep(std::shared_ptr<Cache> cache)      : cache_(cache), cache_allocated_size_(0) {    memset(cache_key_, 0, kCacheKeyPrefix);    size_t pointer_size = sizeof(const void*);    assert(pointer_size <= kCacheKeyPrefix);    memcpy(cache_key_, static_cast<const void*>(this), pointer_size);  }  Slice GetNextCacheKey() {    memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);    char* end =        EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);    return Slice(cache_key_, static_cast<size_t>(end - cache_key_));  }};#elsestruct WriteBufferManager::CacheRep {};#endif  // ROCKSDB_LITEWriteBufferManager::WriteBufferManager(size_t _buffer_size,                                       std::shared_ptr<Cache> cache)    : buffer_size_(_buffer_size),      mutable_limit_(buffer_size_ * 7 / 8),      memory_used_(0),      memory_active_(0),      cache_rep_(nullptr) {#ifndef ROCKSDB_LITE  if (cache) {    // Construct the cache key using the pointer to this.    cache_rep_.reset(new CacheRep(cache));  }#else  (void)cache;#endif  // ROCKSDB_LITE}WriteBufferManager::~WriteBufferManager() {#ifndef ROCKSDB_LITE  if (cache_rep_) {    for (auto* handle : cache_rep_->dummy_handles_) {      cache_rep_->cache_->Release(handle, true);    }  }#endif  // ROCKSDB_LITE}// Should only be called from write threadvoid WriteBufferManager::ReserveMemWithCache(size_t mem) {#ifndef ROCKSDB_LITE  assert(cache_rep_ != nullptr);  // Use a mutex to protect various data structures. Can be optimized to a  // lock-free solution if it ends up with a performance bottleneck.  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;  memory_used_.store(new_mem_used, std::memory_order_relaxed);  while (new_mem_used > cache_rep_->cache_allocated_size_) {    // Expand size by at least 256KB.    // Add a dummy record to the cache    Cache::Handle* handle;    cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,                               kSizeDummyEntry, nullptr, &handle);    cache_rep_->dummy_handles_.push_back(handle);    cache_rep_->cache_allocated_size_ += kSizeDummyEntry;  }#else  (void)mem;#endif  // ROCKSDB_LITE}void WriteBufferManager::FreeMemWithCache(size_t mem) {#ifndef ROCKSDB_LITE  assert(cache_rep_ != nullptr);  // Use a mutex to protect various data structures. Can be optimized to a  // lock-free solution if it ends up with a performance bottleneck.  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;  memory_used_.store(new_mem_used, std::memory_order_relaxed);  // Gradually shrink memory costed in the block cache if the actual  // usage is less than 3/4 of what we reserve from the block cache.  // We do this because:  // 1. we don't pay the cost of the block cache immediately a memtable is  //    freed, as block cache insert is expensive;  // 2. eventually, if we walk away from a temporary memtable size increase,  //    we make sure shrink the memory costed in block cache over time.  // In this way, we only shrink costed memory showly even there is enough  // margin.  if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&      cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {    assert(!cache_rep_->dummy_handles_.empty());    cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true);    cache_rep_->dummy_handles_.pop_back();    cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;  }#else  (void)mem;#endif  // ROCKSDB_LITE}}  // namespace ROCKSDB_NAMESPACE
 |