// block_cache_tier.cc
  1. // Copyright (c) 2013, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #ifndef ROCKSDB_LITE
  6. #include "utilities/persistent_cache/block_cache_tier.h"
  7. #include <regex>
  8. #include <utility>
  9. #include <vector>
  10. #include "logging/logging.h"
  11. #include "port/port.h"
  12. #include "test_util/sync_point.h"
  13. #include "util/stop_watch.h"
  14. #include "utilities/persistent_cache/block_cache_tier_file.h"
  15. namespace ROCKSDB_NAMESPACE {
  16. //
  17. // BlockCacheImpl
  18. //
//
// Bring the cache tier on line: validate options, prepare the on-disk
// directory layout, open the first writable cache file, and (optionally)
// start the pipelined-insert thread. Must be called once before any
// Insert/Lookup. Returns the first failing Status, or OK.
//
Status BlockCacheTier::Open() {
  Status status;

  WriteLock _(&lock_);

  assert(!size_);  // Open() must not be called on a cache already in use

  // Check the validity of the options
  status = opt_.ValidateSettings();
  assert(status.ok());
  if (!status.ok()) {
    Error(opt_.log, "Invalid block cache options");
    return status;
  }

  // Create base directory or cleanup existing directory
  status = opt_.env->CreateDirIfMissing(opt_.path);
  if (!status.ok()) {
    Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }

  // Create base/<cache dir> directory
  status = opt_.env->CreateDir(GetCachePath());
  if (!status.ok()) {
    // directory already exists, clean it up (delete stale *.rc files left by
    // a previous run before we start writing fresh ones)
    status = CleanupCacheFolder(GetCachePath());
    assert(status.ok());
    if (!status.ok()) {
      Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
            status.ToString().c_str());
      return status;
    }
  }

  // create a new file
  assert(!cache_file_);
  status = NewCacheFile();
  if (!status.ok()) {
    Error(opt_.log, "Error creating new file %s. %s", opt_.path.c_str(),
          status.ToString().c_str());
    return status;
  }
  assert(cache_file_);

  if (opt_.pipeline_writes) {
    // queued writes are drained by a dedicated thread (see InsertMain)
    assert(!insert_th_.joinable());
    insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
  }

  return Status::OK();
}
  64. bool IsCacheFile(const std::string& file) {
  65. // check if the file has .rc suffix
  66. // Unfortunately regex support across compilers is not even, so we use simple
  67. // string parsing
  68. size_t pos = file.find(".");
  69. if (pos == std::string::npos) {
  70. return false;
  71. }
  72. std::string suffix = file.substr(pos);
  73. return suffix == ".rc";
  74. }
  75. Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
  76. std::vector<std::string> files;
  77. Status status = opt_.env->GetChildren(folder, &files);
  78. if (!status.ok()) {
  79. Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
  80. status.ToString().c_str());
  81. return status;
  82. }
  83. // cleanup files with the patter :digi:.rc
  84. for (auto file : files) {
  85. if (IsCacheFile(file)) {
  86. // cache file
  87. Info(opt_.log, "Removing file %s.", file.c_str());
  88. status = opt_.env->DeleteFile(folder + "/" + file);
  89. if (!status.ok()) {
  90. Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
  91. status.ToString().c_str());
  92. return status;
  93. }
  94. } else {
  95. ROCKS_LOG_DEBUG(opt_.log, "Skipping file %s", file.c_str());
  96. }
  97. }
  98. return Status::OK();
  99. }
// Shut the tier down: drain and join the pipelined-insert thread (if it was
// started), stop the file writer, then drop all in-memory index metadata.
// On-disk cache files are left in place; the next Open() cleans them up via
// CleanupCacheFolder().
Status BlockCacheTier::Close() {
  // stop the insert thread
  if (opt_.pipeline_writes && insert_th_.joinable()) {
    // a quit-op is the sentinel InsertMain exits on
    InsertOp op(/*quit=*/true);
    insert_ops_.Push(std::move(op));
    insert_th_.join();
  }
  // stop the writer before
  writer_.Stop();
  // clear all metadata
  WriteLock _(&lock_);
  metadata_.Clear();
  return Status::OK();
}
  114. template<class T>
  115. void Add(std::map<std::string, double>* stats, const std::string& key,
  116. const T& t) {
  117. stats->insert({key, static_cast<double>(t)});
  118. }
// Snapshot this tier's counters into a flat name -> value map and append it
// to the stats reported by the wrapped tier(s).
// NOTE(review): the key "bytes_piplined" is misspelled but load-bearing --
// external consumers may already parse it, so it is left unchanged.
PersistentCache::StatsType BlockCacheTier::Stats() {
  std::map<std::string, double> stats;
  Add(&stats, "persistentcache.blockcachetier.bytes_piplined",
      stats_.bytes_pipelined_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_written",
      stats_.bytes_written_.Average());
  Add(&stats, "persistentcache.blockcachetier.bytes_read",
      stats_.bytes_read_.Average());
  Add(&stats, "persistentcache.blockcachetier.insert_dropped",
      stats_.insert_dropped_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits",
      stats_.cache_hits_);
  Add(&stats, "persistentcache.blockcachetier.cache_misses",
      stats_.cache_misses_);
  Add(&stats, "persistentcache.blockcachetier.cache_errors",
      stats_.cache_errors_);
  Add(&stats, "persistentcache.blockcachetier.cache_hits_pct",
      stats_.CacheHitPct());
  Add(&stats, "persistentcache.blockcachetier.cache_misses_pct",
      stats_.CacheMissPct());
  Add(&stats, "persistentcache.blockcachetier.read_hit_latency",
      stats_.read_hit_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.read_miss_latency",
      stats_.read_miss_latency_.Average());
  Add(&stats, "persistentcache.blockcachetier.write_latency",
      stats_.write_latency_.Average());
  // prepend the stats of the tiers below us, then append our own
  auto out = PersistentCacheTier::Stats();
  out.push_back(stats);
  return out;
}
  149. Status BlockCacheTier::Insert(const Slice& key, const char* data,
  150. const size_t size) {
  151. // update stats
  152. stats_.bytes_pipelined_.Add(size);
  153. if (opt_.pipeline_writes) {
  154. // off load the write to the write thread
  155. insert_ops_.Push(
  156. InsertOp(key.ToString(), std::move(std::string(data, size))));
  157. return Status::OK();
  158. }
  159. assert(!opt_.pipeline_writes);
  160. return InsertImpl(key, Slice(data, size));
  161. }
// Body of the pipelined-insert thread: pop queued ops and apply them via
// InsertImpl, retrying on TryAgain (write buffers full) up to kMaxRetry
// times before counting the insert as dropped. Exits when the quit sentinel
// op (signal_ set) is popped -- see Close().
void BlockCacheTier::InsertMain() {
  while (true) {
    InsertOp op(insert_ops_.Pop());

    if (op.signal_) {
      // that is a secret signal to exit
      break;
    }

    size_t retry = 0;
    Status s;
    while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
      if (retry > kMaxRetry) {
        break;  // give up; s is still TryAgain, so the drop is counted below
      }

      // this can happen when the buffers are full, we wait till some buffers
      // are free. Why don't we wait inside the code. This is because we want
      // to support both pipelined and non-pipelined mode
      buffer_allocator_.WaitUntilUsable();
      retry++;
    }
    if (!s.ok()) {
      stats_.insert_dropped_++;
    }
  }
}
// Synchronously write (key, data) to the current cache file under lock_.
// Duplicate keys are silently accepted (returns OK without writing).
// Returns TryAgain when the append fails while the file is NOT at EOF
// (write buffers exhausted); at EOF it rolls over to a fresh cache file and
// retries the append.
Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
  // pre-condition
  assert(key.size());
  assert(data.size());
  assert(cache_file_);

  StopWatchNano timer(opt_.env, /*auto_start=*/ true);

  WriteLock _(&lock_);

  LBA lba;
  if (metadata_.Lookup(key, &lba)) {
    // the key already exists, this is duplicate insert
    return Status::OK();
  }

  while (!cache_file_->Append(key, data, &lba)) {
    if (!cache_file_->Eof()) {
      // append failed but the file is not full -- presumably the write
      // buffers are busy; ask the caller to retry later
      ROCKS_LOG_DEBUG(opt_.log, "Error inserting to cache file %d",
                      cache_file_->cacheid());
      stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
      return Status::TryAgain();
    }

    assert(cache_file_->Eof());
    // current file is full; roll over to a new file and retry the append
    Status status = NewCacheFile();
    if (!status.ok()) {
      return status;
    }
  }

  // Insert into lookup index
  BlockInfo* info = metadata_.Insert(key, lba);
  assert(info);
  if (!info) {
    return Status::IOError("Unexpected error inserting to index");
  }

  // insert to cache file reverse mapping
  cache_file_->Add(info);

  // update stats
  stats_.bytes_written_.Add(data.size());
  stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
  return Status::OK();
}
// Look `key` up: block index -> LBA -> owning cache file -> disk read. On a
// hit the block payload is copied into a freshly allocated buffer returned
// through `val`/`size`. Every miss/error path returns NotFound and bumps the
// miss (and, for read errors, the error) counters.
Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
                              size_t* size) {
  StopWatchNano timer(opt_.env, /*auto_start=*/ true);

  LBA lba;
  bool status;
  status = metadata_.Lookup(key, &lba);
  if (!status) {
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: key not found");
  }

  BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
  if (!file) {
    // this can happen because the block index and cache file index are
    // different, and the cache file might be removed between the two lookups
    stats_.cache_misses_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: cache file not found");
  }

  assert(file->refs_);

  // scratch receives the raw on-disk record (key + value)
  std::unique_ptr<char[]> scratch(new char[lba.size_]);
  Slice blk_key;
  Slice blk_val;

  status = file->Read(lba, &blk_key, &blk_val, scratch.get());
  // NOTE(review): this drops a reference presumably taken by the cache-file
  // lookup above (it keeps `file` alive across the Read) -- confirm against
  // the metadata Lookup(cache_id) implementation.
  --file->refs_;
  if (!status) {
    stats_.cache_misses_++;
    stats_.cache_errors_++;
    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
    return Status::NotFound("blockcache: error reading data");
  }

  assert(blk_key == key);

  // copy the value out of scratch so the caller owns an exact-size buffer
  val->reset(new char[blk_val.size()]);
  memcpy(val->get(), blk_val.data(), blk_val.size());
  *size = blk_val.size();

  stats_.bytes_read_.Add(*size);
  stats_.cache_hits_++;
  stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);
  return Status::OK();
}
  264. bool BlockCacheTier::Erase(const Slice& key) {
  265. WriteLock _(&lock_);
  266. BlockInfo* info = metadata_.Remove(key);
  267. assert(info);
  268. delete info;
  269. return true;
  270. }
// Create and open the next writable cache file, make it the active
// cache_file_, and register it in the cache-file index. Caller must hold
// lock_ (asserted below).
Status BlockCacheTier::NewCacheFile() {
  lock_.AssertHeld();

  // test hook; the c_str() pointer is only valid for the duration of this
  // full expression (the callback runs synchronously)
  TEST_SYNC_POINT_CALLBACK("BlockCacheTier::NewCacheFile:DeleteDir",
                           (void*)(GetCachePath().c_str()));

  std::unique_ptr<WriteableCacheFile> f(
    new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
                           GetCachePath(), writer_cache_id_,
                           opt_.cache_file_size, opt_.log));

  bool status = f->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
  if (!status) {
    return Status::IOError("Error creating file");
  }

  Info(opt_.log, "Created cache file %d", writer_cache_id_);

  writer_cache_id_++;
  // ownership is released to the raw cache_file_ pointer, which is tracked
  // (and eventually reclaimed) through metadata_
  cache_file_ = f.release();

  // insert to cache files tree
  status = metadata_.Insert(cache_file_);
  assert(status);
  if (!status) {
    Error(opt_.log, "Error inserting to metadata");
    return Status::IOError("Error inserting to metadata");
  }

  return Status::OK();
}
  295. bool BlockCacheTier::Reserve(const size_t size) {
  296. WriteLock _(&lock_);
  297. assert(size_ <= opt_.cache_size);
  298. if (size + size_ <= opt_.cache_size) {
  299. // there is enough space to write
  300. size_ += size;
  301. return true;
  302. }
  303. assert(size + size_ >= opt_.cache_size);
  304. // there is not enough space to fit the requested data
  305. // we can clear some space by evicting cold data
  306. const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
  307. while (size + size_ > opt_.cache_size * retain_fac) {
  308. std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
  309. if (!f) {
  310. // nothing is evictable
  311. return false;
  312. }
  313. assert(!f->refs_);
  314. uint64_t file_size;
  315. if (!f->Delete(&file_size).ok()) {
  316. // unable to delete file
  317. return false;
  318. }
  319. assert(file_size <= size_);
  320. size_ -= file_size;
  321. }
  322. size_ += size;
  323. assert(size_ <= opt_.cache_size * 0.9);
  324. return true;
  325. }
  326. Status NewPersistentCache(Env* const env, const std::string& path,
  327. const uint64_t size,
  328. const std::shared_ptr<Logger>& log,
  329. const bool optimized_for_nvm,
  330. std::shared_ptr<PersistentCache>* cache) {
  331. if (!cache) {
  332. return Status::IOError("invalid argument cache");
  333. }
  334. auto opt = PersistentCacheConfig(env, path, size, log);
  335. if (optimized_for_nvm) {
  336. // the default settings are optimized for SSD
  337. // NVM devices are better accessed with 4K direct IO and written with
  338. // parallelism
  339. opt.enable_direct_writes = true;
  340. opt.writer_qdepth = 4;
  341. opt.writer_dispatch_size = 4 * 1024;
  342. }
  343. auto pcache = std::make_shared<BlockCacheTier>(opt);
  344. Status s = pcache->Open();
  345. if (!s.ok()) {
  346. return s;
  347. }
  348. *cache = pcache;
  349. return s;
  350. }
  351. } // namespace ROCKSDB_NAMESPACE
  352. #endif // ifndef ROCKSDB_LITE