block_cache_tier.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright (c) 2013, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #ifndef ROCKSDB_LITE
  7. #ifndef OS_WIN
  8. #include <unistd.h>
  9. #endif // ! OS_WIN
  10. #include <atomic>
  11. #include <list>
  12. #include <memory>
  13. #include <set>
  14. #include <sstream>
  15. #include <stdexcept>
  16. #include <string>
  17. #include <thread>
  18. #include "rocksdb/cache.h"
  19. #include "rocksdb/comparator.h"
  20. #include "rocksdb/persistent_cache.h"
  21. #include "utilities/persistent_cache/block_cache_tier_file.h"
  22. #include "utilities/persistent_cache/block_cache_tier_metadata.h"
  23. #include "utilities/persistent_cache/persistent_cache_util.h"
  24. #include "memory/arena.h"
  25. #include "memtable/skiplist.h"
  26. #include "monitoring/histogram.h"
  27. #include "port/port.h"
  28. #include "util/coding.h"
  29. #include "util/crc32c.h"
  30. #include "util/mutexlock.h"
  31. namespace ROCKSDB_NAMESPACE {
  32. //
  33. // Block cache tier implementation
  34. //
  35. class BlockCacheTier : public PersistentCacheTier {
  36. public:
  37. explicit BlockCacheTier(const PersistentCacheConfig& opt)
  38. : opt_(opt),
  39. insert_ops_(static_cast<size_t>(opt_.max_write_pipeline_backlog_size)),
  40. buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
  41. writer_(this, opt_.writer_qdepth, static_cast<size_t>(opt_.writer_dispatch_size)) {
  42. Info(opt_.log, "Initializing allocator. size=%d B count=%" ROCKSDB_PRIszt,
  43. opt_.write_buffer_size, opt_.write_buffer_count());
  44. }
  45. virtual ~BlockCacheTier() {
  46. // Close is re-entrant so we can call close even if it is already closed
  47. Close();
  48. assert(!insert_th_.joinable());
  49. }
  50. Status Insert(const Slice& key, const char* data, const size_t size) override;
  51. Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
  52. size_t* size) override;
  53. Status Open() override;
  54. Status Close() override;
  55. bool Erase(const Slice& key) override;
  56. bool Reserve(const size_t size) override;
  57. bool IsCompressed() override { return opt_.is_compressed; }
  58. std::string GetPrintableOptions() const override { return opt_.ToString(); }
  59. PersistentCache::StatsType Stats() override;
  60. void TEST_Flush() override {
  61. while (insert_ops_.Size()) {
  62. /* sleep override */
  63. Env::Default()->SleepForMicroseconds(1000000);
  64. }
  65. }
  66. private:
  67. // Percentage of cache to be evicted when the cache is full
  68. static const size_t kEvictPct = 10;
  69. // Max attempts to insert key, value to cache in pipelined mode
  70. static const size_t kMaxRetry = 3;
  71. // Pipelined operation
  72. struct InsertOp {
  73. explicit InsertOp(const bool signal) : signal_(signal) {}
  74. explicit InsertOp(std::string&& key, const std::string& data)
  75. : key_(std::move(key)), data_(data) {}
  76. ~InsertOp() {}
  77. InsertOp() = delete;
  78. InsertOp(InsertOp&& /*rhs*/) = default;
  79. InsertOp& operator=(InsertOp&& rhs) = default;
  80. // used for estimating size by bounded queue
  81. size_t Size() { return data_.size() + key_.size(); }
  82. std::string key_;
  83. std::string data_;
  84. bool signal_ = false; // signal to request processing thread to exit
  85. };
  86. // entry point for insert thread
  87. void InsertMain();
  88. // insert implementation
  89. Status InsertImpl(const Slice& key, const Slice& data);
  90. // Create a new cache file
  91. Status NewCacheFile();
  92. // Get cache directory path
  93. std::string GetCachePath() const { return opt_.path + "/cache"; }
  94. // Cleanup folder
  95. Status CleanupCacheFolder(const std::string& folder);
  96. // Statistics
  97. struct Statistics {
  98. HistogramImpl bytes_pipelined_;
  99. HistogramImpl bytes_written_;
  100. HistogramImpl bytes_read_;
  101. HistogramImpl read_hit_latency_;
  102. HistogramImpl read_miss_latency_;
  103. HistogramImpl write_latency_;
  104. std::atomic<uint64_t> cache_hits_{0};
  105. std::atomic<uint64_t> cache_misses_{0};
  106. std::atomic<uint64_t> cache_errors_{0};
  107. std::atomic<uint64_t> insert_dropped_{0};
  108. double CacheHitPct() const {
  109. const auto lookups = cache_hits_ + cache_misses_;
  110. return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
  111. }
  112. double CacheMissPct() const {
  113. const auto lookups = cache_hits_ + cache_misses_;
  114. return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
  115. }
  116. };
  117. port::RWMutex lock_; // Synchronization
  118. const PersistentCacheConfig opt_; // BlockCache options
  119. BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert
  120. ROCKSDB_NAMESPACE::port::Thread insert_th_; // Insert thread
  121. uint32_t writer_cache_id_ = 0; // Current cache file identifier
  122. WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference
  123. CacheWriteBufferAllocator buffer_allocator_; // Buffer provider
  124. ThreadedWriter writer_; // Writer threads
  125. BlockCacheTierMetadata metadata_; // Cache meta data manager
  126. std::atomic<uint64_t> size_{0}; // Size of the cache
  127. Statistics stats_; // Statistics
  128. };
  129. } // namespace ROCKSDB_NAMESPACE
  130. #endif