| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 | //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#ifndef ROCKSDB_LITE#include "utilities/persistent_cache/block_cache_tier.h"#include <regex>#include <utility>#include <vector>#include "logging/logging.h"#include "port/port.h"#include "test_util/sync_point.h"#include "util/stop_watch.h"#include "utilities/persistent_cache/block_cache_tier_file.h"namespace ROCKSDB_NAMESPACE {//// BlockCacheImpl//Status BlockCacheTier::Open() {  Status status;  WriteLock _(&lock_);  assert(!size_);  // Check the validity of the options  status = opt_.ValidateSettings();  assert(status.ok());  if (!status.ok()) {    Error(opt_.log, "Invalid block cache options");    return status;  }  // Create base directory or cleanup existing directory  status = opt_.env->CreateDirIfMissing(opt_.path);  if (!status.ok()) {    Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),          status.ToString().c_str());    return status;  }  // Create base/<cache dir> directory  status = opt_.env->CreateDir(GetCachePath());  if (!status.ok()) {    // directory already exists, clean it up    status = CleanupCacheFolder(GetCachePath());    assert(status.ok());    if (!status.ok()) {      Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),            status.ToString().c_str());      return status;    }  }  // create a new file  assert(!cache_file_);  status = NewCacheFile();  if (!status.ok()) {    Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),          status.ToString().c_str());    return status;  }  assert(cache_file_);  if (opt_.pipeline_writes) {    assert(!insert_th_.joinable());    insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);  }  return Status::OK();}bool IsCacheFile(const std::string& file) {  // check if the file has .rc suffix  // Unfortunately regex support across compilers is not even, so we use simple  // string parsing  size_t pos = file.find(".");  if (pos == std::string::npos) {    return false;  }  std::string suffix = file.substr(pos);  return suffix == ".rc";}Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {  std::vector<std::string> files;  Status status = opt_.env->GetChildren(folder, &files);  if (!status.ok()) {    Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),          status.ToString().c_str());    return status;  }  // cleanup files with the patter :digi:.rc  for (auto file : files) {    if (IsCacheFile(file)) {      // cache file      Info(opt_.log, "Removing file %s.", file.c_str());      status = opt_.env->DeleteFile(folder + "/" + file);      if (!status.ok()) {        Error(opt_.log, "Error deleting file %s. %s", file.c_str(),              status.ToString().c_str());        return status;      }    } else {      ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());    }  }  return Status::OK();}Status BlockCacheTier::Close() {  // stop the insert thread  if (opt_.pipeline_writes && insert_th_.joinable()) {    InsertOp op(/*quit=*/true);    insert_ops_.Push(std::move(op));    insert_th_.join();  }  // stop the writer before  writer_.Stop();  // clear all metadata  WriteLock _(&lock_);  metadata_.Clear();  return Status::OK();}template<class T>void Add(std::map<std::string, double>* stats, const std::string& key,         const T& t) {  stats->insert({key, static_cast<double>(t)});}PersistentCache::StatsType BlockCacheTier::Stats() {  std::map<std::string, double> stats;  Add(&stats, "persistentcache.blockcachetier.bytes_piplined",      stats_.bytes_pipelined_.Average());  Add(&stats, "persistentcache.blockcachetier.bytes_written",      stats_.bytes_written_.Average());  Add(&stats, "persistentcache.blockcachetier.bytes_read",      stats_.bytes_read_.Average());  Add(&stats, "persistentcache.blockcachetier.insert_dropped",      stats_.insert_dropped_);  Add(&stats, "persistentcache.blockcachetier.cache_hits",      stats_.cache_hits_);  Add(&stats, "persistentcache.blockcachetier.cache_misses",      stats_.cache_misses_);  Add(&stats, "persistentcache.blockcachetier.cache_errors",      stats_.cache_errors_);  Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",      stats_.CacheHitPct());  Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",      stats_.CacheMissPct());  Add(&stats, "persistentcache.blockcachetier.read_hit_latency",      stats_.read_hit_latency_.Average());  Add(&stats, "persistentcache.blockcachetier.read_miss_latency",      stats_.read_miss_latency_.Average());  Add(&stats, "persistentcache.blockcachetier.write_latency",      stats_.write_latency_.Average());  auto out = PersistentCacheTier::Stats();  out.push_back(stats);  return out;}Status BlockCacheTier::Insert(const Slice& key, const char* data,                              const size_t size) {  // update stats  stats_.bytes_pipelined_.Add(size);  if (opt_.pipeline_writes) {    // off load the write to the write thread    insert_ops_.Push(        InsertOp(key.ToString(), std::move(std::string(data, size))));    return Status::OK();  }  assert(!opt_.pipeline_writes);  return InsertImpl(key, Slice(data, size));}void BlockCacheTier::InsertMain() {  while (true) {    InsertOp op(insert_ops_.Pop());    if (op.signal_) {      // that is a secret signal to exit      break;    }    size_t retry = 0;    Status s;    while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {      if (retry > kMaxRetry) {        break;      }      // this can happen when the buffers are full, we wait till some buffers      // are free. Why don't we wait inside the code. This is because we want      // to support both pipelined and non-pipelined mode      buffer_allocator_.WaitUntilUsable();      retry++;    }    if (!s.ok()) {      stats_.insert_dropped_++;    }  }}Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {  // pre-condition  assert(key.size());  assert(data.size());  assert(cache_file_);  StopWatchNano timer(opt_.env, /*auto_start=*/ true);  WriteLock _(&lock_);  LBA lba;  if (metadata_.Lookup(key, &lba)) {    // the key already exists, this is duplicate insert    return Status::OK();  }  while (!cache_file_->Append(key, data, &lba)) {    if (!cache_file_->Eof()) {      ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",                      cache_file_->cacheid());      stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);      return Status::TryAgain();    }    assert(cache_file_->Eof());    Status status = NewCacheFile();    if (!status.ok()) {      return status;    }  }  // Insert into lookup index  BlockInfo* info = metadata_.Insert(key, lba);  assert(info);  if (!info) {    return Status::IOError("Unexpected error inserting to index");  }  // insert to cache file reverse mapping  cache_file_->Add(info);  // update stats  stats_.bytes_written_.Add(data.size());  stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);  return Status::OK();}Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,                              size_t* size) {  StopWatchNano timer(opt_.env, /*auto_start=*/ true);  LBA lba;  bool status;  status = metadata_.Lookup(key, &lba);  if (!status) {    stats_.cache_misses_++;    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);    return Status::NotFound("blockcache: key not found");  }  BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);  if (!file) {    // this can happen because the block index and cache file index are    // different, and the cache file might be removed between the two lookups    stats_.cache_misses_++;    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);    return Status::NotFound("blockcache: cache file not found");  }  assert(file->refs_);  std::unique_ptr<char[]> scratch(new char[lba.size_]);  Slice blk_key;  Slice blk_val;  status = file->Read(lba, &blk_key, &blk_val, scratch.get());  --file->refs_;  if (!status) {    stats_.cache_misses_++;    stats_.cache_errors_++;    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);    return Status::NotFound("blockcache: error reading data");  }  assert(blk_key == key);  val->reset(new char[blk_val.size()]);  memcpy(val->get(), blk_val.data(), blk_val.size());  *size = blk_val.size();  stats_.bytes_read_.Add(*size);  stats_.cache_hits_++;  stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);  return Status::OK();}bool BlockCacheTier::Erase(const Slice& key) {  WriteLock _(&lock_);  BlockInfo* info = metadata_.Remove(key);  assert(info);  delete info;  return true;}Status BlockCacheTier::NewCacheFile() {  lock_.AssertHeld();  TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",                           (void*)(GetCachePath().c_str()));  std::unique_ptr<WriteableCacheFile> f(    new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,                           GetCachePath(), writer_cache_id_,                           opt_.cache_file_size, opt_.log));  bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);  if (!status) {    return Status::IOError("Error creating file");  }  Info(opt_.log, "Created cache file %d", writer_cache_id_);  writer_cache_id_++;  cache_file_ = f.release();  // insert to cache files tree  status = metadata_.Insert(cache_file_);  assert(status);  if (!status) {    Error(opt_.log, "Error inserting to metadata");    return Status::IOError("Error inserting to metadata");  }  return Status::OK();}bool BlockCacheTier::Reserve(const size_t size) {  WriteLock _(&lock_);  assert(size_ <= opt_.cache_size);  if (size + size_ <= opt_.cache_size) {    // there is enough space to write    size_ += size;    return true;  }  assert(size + size_ >= opt_.cache_size);  // there is not enough space to fit the requested data  // we can clear some space by evicting cold data  const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);  while (size + size_ > opt_.cache_size * retain_fac) {    std::unique_ptr<BlockCacheFile> f(metadata_.Evict());    if (!f) {      // nothing is evictable      return false;    }    assert(!f->refs_);    uint64_t file_size;    if (!f->Delete(&file_size).ok()) {      // unable to delete file      return false;    }    assert(file_size <= size_);    size_ -= file_size;  }  size_ += size;  assert(size_ <= opt_.cache_size * 0.9);  return true;}Status NewPersistentCache(Env* const env, const std::string& path,                          const uint64_t size,                          const std::shared_ptr<Logger>& log,                          const bool optimized_for_nvm,                          std::shared_ptr<PersistentCache>* cache) {  if (!cache) {    return Status::IOError("invalid argument cache");  }  auto opt = PersistentCacheConfig(env, path, size, log);  if (optimized_for_nvm) {    // the default settings are optimized for SSD    // NVM devices are better accessed with 4K direct IO and written with    // parallelism    opt.enable_direct_writes = true;    opt.writer_qdepth = 4;    opt.writer_dispatch_size = 4 * 1024;  }  auto pcache = std::make_shared<BlockCacheTier>(opt);  Status s = pcache->Open();  if (!s.ok()) {    return s;  }  *cache = pcache;  return s;}}  // namespace ROCKSDB_NAMESPACE#endif  // ifndef ROCKSDB_LITE
 |