block_cache_tier.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. // Copyright (c) 2013, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #ifndef OS_WIN
  7. #include <unistd.h>
  8. #endif // ! OS_WIN
  9. #include <atomic>
  10. #include <list>
  11. #include <memory>
  12. #include <set>
  13. #include <sstream>
  14. #include <stdexcept>
  15. #include <string>
  16. #include <thread>
  17. #include "memory/arena.h"
  18. #include "memtable/skiplist.h"
  19. #include "monitoring/histogram.h"
  20. #include "port/port.h"
  21. #include "rocksdb/cache.h"
  22. #include "rocksdb/comparator.h"
  23. #include "rocksdb/persistent_cache.h"
  24. #include "rocksdb/system_clock.h"
  25. #include "util/coding.h"
  26. #include "util/crc32c.h"
  27. #include "util/mutexlock.h"
  28. #include "utilities/persistent_cache/block_cache_tier_file.h"
  29. #include "utilities/persistent_cache/block_cache_tier_metadata.h"
  30. #include "utilities/persistent_cache/persistent_cache_util.h"
  31. namespace ROCKSDB_NAMESPACE {
  32. //
  33. // Block cache tier implementation
  34. //
  35. class BlockCacheTier : public PersistentCacheTier {
  36. public:
  37. explicit BlockCacheTier(const PersistentCacheConfig& opt)
  38. : opt_(opt),
  39. insert_ops_(static_cast<size_t>(opt_.max_write_pipeline_backlog_size)),
  40. buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
  41. writer_(this, opt_.writer_qdepth,
  42. static_cast<size_t>(opt_.writer_dispatch_size)) {
  43. Info(opt_.log, "Initializing allocator. size=%d B count=%" ROCKSDB_PRIszt,
  44. opt_.write_buffer_size, opt_.write_buffer_count());
  45. }
  46. virtual ~BlockCacheTier() {
  47. // Close is re-entrant so we can call close even if it is already closed
  48. Close().PermitUncheckedError();
  49. assert(!insert_th_.joinable());
  50. }
  51. Status Insert(const Slice& key, const char* data, const size_t size) override;
  52. Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
  53. size_t* size) override;
  54. Status Open() override;
  55. Status Close() override;
  56. bool Erase(const Slice& key) override;
  57. bool Reserve(const size_t size) override;
  58. bool IsCompressed() override { return opt_.is_compressed; }
  59. std::string GetPrintableOptions() const override { return opt_.ToString(); }
  60. PersistentCache::StatsType Stats() override;
  61. void TEST_Flush() override {
  62. while (insert_ops_.Size()) {
  63. /* sleep override */
  64. SystemClock::Default()->SleepForMicroseconds(1000000);
  65. }
  66. }
  67. private:
  68. // Percentage of cache to be evicted when the cache is full
  69. static const size_t kEvictPct = 10;
  70. // Max attempts to insert key, value to cache in pipelined mode
  71. static const size_t kMaxRetry = 3;
  72. // Pipelined operation
  73. struct InsertOp {
  74. explicit InsertOp(const bool signal) : signal_(signal) {}
  75. explicit InsertOp(std::string&& key, const std::string& data)
  76. : key_(std::move(key)), data_(data) {}
  77. ~InsertOp() {}
  78. InsertOp() = delete;
  79. InsertOp(InsertOp&& /*rhs*/) = default;
  80. InsertOp& operator=(InsertOp&& rhs) = default;
  81. // used for estimating size by bounded queue
  82. size_t Size() { return data_.size() + key_.size(); }
  83. std::string key_;
  84. std::string data_;
  85. bool signal_ = false; // signal to request processing thread to exit
  86. };
  87. // entry point for insert thread
  88. void InsertMain();
  89. // insert implementation
  90. Status InsertImpl(const Slice& key, const Slice& data);
  91. // Create a new cache file
  92. Status NewCacheFile();
  93. // Get cache directory path
  94. std::string GetCachePath() const { return opt_.path + "/cache"; }
  95. // Cleanup folder
  96. Status CleanupCacheFolder(const std::string& folder);
  97. // Statistics
  98. struct Statistics {
  99. HistogramImpl bytes_pipelined_;
  100. HistogramImpl bytes_written_;
  101. HistogramImpl bytes_read_;
  102. HistogramImpl read_hit_latency_;
  103. HistogramImpl read_miss_latency_;
  104. HistogramImpl write_latency_;
  105. std::atomic<uint64_t> cache_hits_{0};
  106. std::atomic<uint64_t> cache_misses_{0};
  107. std::atomic<uint64_t> cache_errors_{0};
  108. std::atomic<uint64_t> insert_dropped_{0};
  109. double CacheHitPct() const {
  110. const auto lookups = cache_hits_ + cache_misses_;
  111. return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
  112. }
  113. double CacheMissPct() const {
  114. const auto lookups = cache_hits_ + cache_misses_;
  115. return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
  116. }
  117. };
  118. port::RWMutex lock_; // Synchronization
  119. const PersistentCacheConfig opt_; // BlockCache options
  120. BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert
  121. ROCKSDB_NAMESPACE::port::Thread insert_th_; // Insert thread
  122. uint32_t writer_cache_id_ = 0; // Current cache file identifier
  123. WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference
  124. CacheWriteBufferAllocator buffer_allocator_; // Buffer provider
  125. ThreadedWriter writer_; // Writer threads
  126. BlockCacheTierMetadata metadata_; // Cache meta data manager
  127. std::atomic<uint64_t> size_{0}; // Size of the cache
  128. Statistics stats_; // Statistics
  129. };
  130. } // namespace ROCKSDB_NAMESPACE