block_cache_tier.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. // Copyright (c) 2013, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/persistent_cache/block_cache_tier.h"
  6. #include <utility>
  7. #include <vector>
  8. #include "logging/logging.h"
  9. #include "port/port.h"
  10. #include "test_util/sync_point.h"
  11. #include "util/stop_watch.h"
  12. #include "utilities/persistent_cache/block_cache_tier_file.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. //
  15. // BlockCacheImpl
  16. //
  17. Status BlockCacheTier::Open() {
  18. Status status;
  19. WriteLock _(&lock_);
  20. assert(!size_);
  21. // Check the validity of the options
  22. status = opt_.ValidateSettings();
  23. assert(status.ok());
  24. if (!status.ok()) {
  25. Error(opt_.log, "Invalid block cache options");
  26. return status;
  27. }
  28. // Create base directory or cleanup existing directory
  29. status = opt_.env->CreateDirIfMissing(opt_.path);
  30. if (!status.ok()) {
  31. Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
  32. status.ToString().c_str());
  33. return status;
  34. }
  35. // Create base/<cache dir> directory
  36. status = opt_.env->CreateDir(GetCachePath());
  37. if (!status.ok()) {
  38. // directory already exists, clean it up
  39. status = CleanupCacheFolder(GetCachePath());
  40. assert(status.ok());
  41. if (!status.ok()) {
  42. Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
  43. status.ToString().c_str());
  44. return status;
  45. }
  46. }
  47. // create a new file
  48. assert(!cache_file_);
  49. status = NewCacheFile();
  50. if (!status.ok()) {
  51. Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
  52. status.ToString().c_str());
  53. return status;
  54. }
  55. assert(cache_file_);
  56. if (opt_.pipeline_writes) {
  57. assert(!insert_th_.joinable());
  58. insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
  59. }
  60. return Status::OK();
  61. }
  62. bool IsCacheFile(const std::string& file) {
  63. // check if the file has .rc suffix
  64. // Unfortunately regex support across compilers is not even, so we use simple
  65. // string parsing
  66. size_t pos = file.find('.');
  67. if (pos == std::string::npos) {
  68. return false;
  69. }
  70. std::string suffix = file.substr(pos);
  71. return suffix == ".rc";
  72. }
  73. Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
  74. std::vector<std::string> files;
  75. Status status = opt_.env->GetChildren(folder, &files);
  76. if (!status.ok()) {
  77. Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
  78. status.ToString().c_str());
  79. return status;
  80. }
  81. // cleanup files with the patter :digi:.rc
  82. for (const auto& file : files) {
  83. if (IsCacheFile(file)) {
  84. // cache file
  85. Info(opt_.log, "Removing file %s.", file.c_str());
  86. status = opt_.env->DeleteFile(folder + "/" + file);
  87. if (!status.ok()) {
  88. Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
  89. status.ToString().c_str());
  90. return status;
  91. }
  92. } else {
  93. ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());
  94. }
  95. }
  96. return Status::OK();
  97. }
  98. Status BlockCacheTier::Close() {
  99. // stop the insert thread
  100. if (opt_.pipeline_writes && insert_th_.joinable()) {
  101. InsertOp op(/*quit=*/true);
  102. insert_ops_.Push(std::move(op));
  103. insert_th_.join();
  104. }
  105. // stop the writer before
  106. writer_.Stop();
  107. // clear all metadata
  108. WriteLock _(&lock_);
  109. metadata_.Clear();
  110. return Status::OK();
  111. }
  112. template <class T>
  113. void Add(std::map<std::string, double>* stats, const std::string& key,
  114. const T& t) {
  115. stats->insert({key, static_cast<double>(t)});
  116. }
// Snapshot this tier's counters into a name -> value map and append it to
// whatever the rest of the tier chain (PersistentCacheTier::Stats) reports.
PersistentCache::StatsType BlockCacheTier::Stats() {
  std::map<std::string, double> stats;
  // NOTE(review): "bytes_piplined" is a typo, but stat keys are externally
  // visible, so it is preserved for compatibility — confirm before renaming.
  Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
      stats_.bytes_pipelined_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_written",
      stats_.bytes_written_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_read",
      stats_.bytes_read_.Average());
  Add(&stats, "persistentcache.blockcachetier.insert_dropped",
      stats_.insert_dropped_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits", stats_.cache_hits_);
  Add(&stats, "persistentcache.blockcachetier.cache_misses",
      stats_.cache_misses_);
  Add(&stats, "persistentcache.blockcachetier.cache_errors",
      stats_.cache_errors_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
      stats_.CacheHitPct());
  Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
      stats_.CacheMissPct());
  // Latencies are recorded in microseconds (ElapsedNanos() / 1000 at the
  // call sites in InsertImpl/Lookup).
  Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
      stats_.read_hit_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
      stats_.read_miss_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.write_latency",
      stats_.write_latency_.Average());
  auto out = PersistentCacheTier::Stats();
  out.push_back(stats);
  return out;
}
  146. Status BlockCacheTier::Insert(const Slice& key, const char* data,
  147. const size_t size) {
  148. // update stats
  149. stats_.bytes_pipelined_.Add(size);
  150. if (opt_.pipeline_writes) {
  151. // off load the write to the write thread
  152. insert_ops_.Push(
  153. InsertOp(key.ToString(), std::move(std::string(data, size))));
  154. return Status::OK();
  155. }
  156. assert(!opt_.pipeline_writes);
  157. return InsertImpl(key, Slice(data, size));
  158. }
// Background thread body for pipelined writes: drains insert_ops_ and
// applies each op via InsertImpl(), retrying transient failures. Runs until
// a quit-op is received (pushed by Close()).
void BlockCacheTier::InsertMain() {
  while (true) {
    InsertOp op(insert_ops_.Pop());
    if (op.signal_) {
      // that is a secret signal to exit
      break;
    }
    size_t retry = 0;
    Status s;
    while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
      if (retry > kMaxRetry) {
        break;
      }
      // this can happen when the buffers are full, we wait till some buffers
      // are free. Why don't we wait inside the code. This is because we want
      // to support both pipelined and non-pipelined mode
      buffer_allocator_.WaitUntilUsable();
      retry++;
    }
    if (!s.ok()) {
      // Retries exhausted or a hard error: the write is dropped (acceptable
      // for a cache) and counted.
      stats_.insert_dropped_++;
    }
  }
}
// Synchronously write (key, data) to the current cache file and record it
// in the lookup index. Returns TryAgain when the file cannot accept the
// write right now (not at EOF yet), so callers may back off and retry; see
// InsertMain/Insert for the two call sites.
Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
  // pre-condition
  assert(key.size());
  assert(data.size());
  assert(cache_file_);
  StopWatchNano timer(opt_.clock, /*auto_start=*/true);
  WriteLock _(&lock_);
  LBA lba;
  if (metadata_.Lookup(key, &lba)) {
    // the key already exists, this is duplicate insert — keep the existing
    // copy and report success
    return Status::OK();
  }
  // Append may fail either because buffers are momentarily unavailable
  // (not EOF -> TryAgain) or because the file is full (EOF -> roll over to
  // a fresh file and retry the append).
  while (!cache_file_->Append(key, data, &lba)) {
    if (!cache_file_->Eof()) {
      ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
                      cache_file_->cacheid());
      // Latency is recorded in microseconds.
      stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
      return Status::TryAgain();
    }
    assert(cache_file_->Eof());
    Status status = NewCacheFile();
    if (!status.ok()) {
      return status;
    }
  }
  // Insert into lookup index
  BlockInfo* info = metadata_.Insert(key, lba);
  assert(info);
  if (!info) {
    return Status::IOError("Unexpected error inserting to index");
  }
  // insert to cache file reverse mapping (lets the file drop its index
  // entries when it is evicted)
  cache_file_->Add(info);
  // update stats
  stats_.bytes_written_.Add(data.size());
  stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
  return Status::OK();
}
// Read the cached block for `key` into a freshly allocated buffer returned
// via `val`/`size`. Returns NotFound on any miss or read error; hit/miss
// counters and latencies (microseconds) are updated on every path.
Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
                              size_t* size) {
  StopWatchNano timer(opt_.clock, /*auto_start=*/true);
  LBA lba;
  bool status;
  // First lookup: key -> logical block address (cache-id + offset + size).
  status = metadata_.Lookup(key, &lba);
  if (!status) {
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: key not found");
  }
  // Second lookup: cache-id -> file. NOTE(review): this overload appears to
  // pin the file with a reference (see assert(file->refs_) and the matching
  // --file->refs_ below) — confirm against metadata_.Lookup(cache_id).
  BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
  if (!file) {
    // this can happen because the block index and cache file index are
    // different, and the cache file might be removed between the two lookups
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: cache file not found");
  }
  assert(file->refs_);
  // Scratch buffer sized from the index; Read returns slices into it.
  std::unique_ptr<char[]> scratch(new char[lba.size_]);
  Slice blk_key;
  Slice blk_val;
  status = file->Read(lba, &blk_key, &blk_val, scratch.get());
  // Release the reference taken by the cache-id lookup above.
  --file->refs_;
  if (!status) {
    stats_.cache_misses_++;
    stats_.cache_errors_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: error reading data");
  }
  assert(blk_key == key);
  // Copy the value out of the scratch buffer so the caller owns it.
  val->reset(new char[blk_val.size()]);
  memcpy(val->get(), blk_val.data(), blk_val.size());
  *size = blk_val.size();
  stats_.bytes_read_.Add(*size);
  stats_.cache_hits_++;
  stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);
  return Status::OK();
}
  261. bool BlockCacheTier::Erase(const Slice& key) {
  262. WriteLock _(&lock_);
  263. BlockInfo* info = metadata_.Remove(key);
  264. assert(info);
  265. delete info;
  266. return true;
  267. }
// Roll over to a brand-new writable cache file and register it in the
// metadata. Caller must hold lock_ (asserted below).
Status BlockCacheTier::NewCacheFile() {
  lock_.AssertHeld();
  // Test hook: lets tests race directory removal against file creation.
  TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
                           (void*)(GetCachePath().c_str()));
  std::unique_ptr<WriteableCacheFile> f(new WriteableCacheFile(
      opt_.env, &buffer_allocator_, &writer_, GetCachePath(), writer_cache_id_,
      opt_.cache_file_size, opt_.log));
  bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
  if (!status) {
    return Status::IOError("Error creating file");
  }
  Info(opt_.log, "Created cache file %d", writer_cache_id_);
  writer_cache_id_++;
  // Ownership transfers to the raw cache_file_ pointer; the previous file
  // (if any) remains tracked by metadata_ and is reclaimed via eviction.
  cache_file_ = f.release();
  // insert to cache files tree
  status = metadata_.Insert(cache_file_);
  assert(status);
  if (!status) {
    // NOTE(review): on this failure path cache_file_ still points at a file
    // metadata_ does not track (never freed by eviction) — confirm callers
    // treat the tier as unusable after this error.
    Error(opt_.log, "Error inserting to metadata");
    return Status::IOError("Error inserting to metadata");
  }
  return Status::OK();
}
// Reserve `size` bytes of cache capacity for an upcoming write, evicting
// whole cold files when necessary. Returns false if space cannot be made
// (nothing evictable, or a file delete failed).
bool BlockCacheTier::Reserve(const size_t size) {
  WriteLock _(&lock_);
  assert(size_ <= opt_.cache_size);
  if (size + size_ <= opt_.cache_size) {
    // there is enough space to write
    size_ += size;
    return true;
  }
  assert(size + size_ >= opt_.cache_size);
  // there is not enough space to fit the requested data
  // we can clear some space by evicting cold data
  // Evict until usage (including this request) drops below
  // (100 - kEvictPct)% of capacity, leaving headroom for future writes.
  const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
  while (size + size_ > opt_.cache_size * retain_fac) {
    std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
    if (!f) {
      // nothing is evictable
      return false;
    }
    // Evicted files must not be referenced by in-flight lookups.
    assert(!f->refs_);
    uint64_t file_size;
    if (!f->Delete(&file_size).ok()) {
      // unable to delete file
      return false;
    }
    assert(file_size <= size_);
    size_ -= file_size;
  }
  size_ += size;
  // NOTE(review): the 0.9 factor presumes kEvictPct >= 10 — confirm it
  // stays in sync if kEvictPct changes.
  assert(size_ <= opt_.cache_size * 0.9);
  return true;
}
  322. Status NewPersistentCache(Env* const env, const std::string& path,
  323. const uint64_t size,
  324. const std::shared_ptr<Logger>& log,
  325. const bool optimized_for_nvm,
  326. std::shared_ptr<PersistentCache>* cache) {
  327. if (!cache) {
  328. return Status::IOError("invalid argument cache");
  329. }
  330. auto opt = PersistentCacheConfig(env, path, size, log);
  331. if (optimized_for_nvm) {
  332. // the default settings are optimized for SSD
  333. // NVM devices are better accessed with 4K direct IO and written with
  334. // parallelism
  335. opt.enable_direct_writes = true;
  336. opt.writer_qdepth = 4;
  337. opt.writer_dispatch_size = 4 * 1024;
  338. }
  339. auto pcache = std::make_shared<BlockCacheTier>(opt);
  340. Status s = pcache->Open();
  341. if (!s.ok()) {
  342. return s;
  343. }
  344. *cache = pcache;
  345. return s;
  346. }
  347. } // namespace ROCKSDB_NAMESPACE