block_cache_tier_file.h 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright (c) 2013, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <list>
  7. #include <memory>
  8. #include <string>
  9. #include <vector>
  10. #include "file/random_access_file_reader.h"
  11. #include "port/port.h"
  12. #include "rocksdb/comparator.h"
  13. #include "rocksdb/env.h"
  14. #include "util/crc32c.h"
  15. #include "util/mutexlock.h"
  16. #include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
  17. #include "utilities/persistent_cache/lrulist.h"
  18. #include "utilities/persistent_cache/persistent_cache_tier.h"
  19. #include "utilities/persistent_cache/persistent_cache_util.h"
  20. // The io code path of persistent cache uses pipelined architecture
  21. //
  22. // client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
  23. //
  24. // This would enable the system to scale for GB/s of throughput which is
  25. // expected with modern devices like NVM.
  26. //
  27. // The file level operations are encapsulated in the following abstractions
  28. //
  29. // BlockCacheFile
  30. // ^
  31. // |
  32. // |
  33. // RandomAccessCacheFile (For reading)
  34. // ^
  35. // |
  36. // |
  37. // WriteableCacheFile (For writing)
  38. //
  39. // Write IO code path :
  40. //
  41. namespace ROCKSDB_NAMESPACE {
  // Forward declarations. BlockInfo is only used through pointers in this
  // header; WriteableCacheFile is defined further down.
  struct BlockInfo;
  class WriteableCacheFile;
  // Represents a logical record on device
  //
  // (L)ogical (B)lock (A)ddress = { cache-file-id, offset, size }
  struct LogicalBlockAddress {
    // Creates an address with every field set to zero.
    LogicalBlockAddress() = default;

    // Creates an address from a cache file id, an offset and a size.
    //
    // `size` is taken as uint32_t so that it has the same width as size_.
    // A narrower parameter type would silently truncate any size above
    // 65535 before it is stored.
    explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
                                 const uint32_t size)
        : cache_id_(cache_id), off_(off), size_(size) {}

    uint32_t cache_id_ = 0;  // cache file id
    uint32_t off_ = 0;       // offset of the record
    uint32_t size_ = 0;      // size of the record
  };

  using LBA = LogicalBlockAddress;
  57. // class Writer
  58. //
  59. // Writer is the abstraction used for writing data to file. The component can be
  60. // multithreaded. It is the last step of write pipeline
  61. class Writer {
  62. public:
  63. explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
  64. virtual ~Writer() {}
  65. // write buffer to file at the given offset
  66. virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
  67. const uint64_t file_off,
  68. const std::function<void()> callback) = 0;
  69. // stop the writer
  70. virtual void Stop() = 0;
  71. PersistentCacheTier* const cache_;
  72. };
  73. // class BlockCacheFile
  74. //
  75. // Generic interface to support building file specialized for read/writing
  76. class BlockCacheFile : public LRUElement<BlockCacheFile> {
  77. public:
  78. explicit BlockCacheFile(const uint32_t cache_id)
  79. : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}
  80. explicit BlockCacheFile(Env* const env, const std::string& dir,
  81. const uint32_t cache_id)
  82. : LRUElement<BlockCacheFile>(),
  83. env_(env),
  84. dir_(dir),
  85. cache_id_(cache_id) {}
  86. virtual ~BlockCacheFile() {}
  87. // append key/value to file and return LBA locator to user
  88. virtual bool Append(const Slice& /*key*/, const Slice& /*val*/,
  89. LBA* const /*lba*/) {
  90. assert(!"not implemented");
  91. return false;
  92. }
  93. // read from the record locator (LBA) and return key, value and status
  94. virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/,
  95. char* /*scratch*/) {
  96. assert(!"not implemented");
  97. return false;
  98. }
  99. // get file path
  100. std::string Path() const {
  101. return dir_ + "/" + std::to_string(cache_id_) + ".rc";
  102. }
  103. // get cache ID
  104. uint32_t cacheid() const { return cache_id_; }
  105. // Add block information to file data
  106. // Block information is the list of index reference for this file
  107. virtual void Add(BlockInfo* binfo) {
  108. WriteLock _(&rwlock_);
  109. block_infos_.push_back(binfo);
  110. }
  111. // get block information
  112. std::list<BlockInfo*>& block_infos() { return block_infos_; }
  113. // delete file and return the size of the file
  114. virtual Status Delete(uint64_t* size);
  115. protected:
  116. port::RWMutex rwlock_; // synchronization mutex
  117. Env* const env_ = nullptr; // Env for OS
  118. const std::string dir_; // Directory name
  119. const uint32_t cache_id_; // Cache id for the file
  120. std::list<BlockInfo*> block_infos_; // List of index entries mapping to the
  121. // file content
  122. };
  123. // class RandomAccessFile
  124. //
  125. // Thread safe implementation for reading random data from file
  126. class RandomAccessCacheFile : public BlockCacheFile {
  127. public:
  128. explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
  129. const uint32_t cache_id,
  130. const std::shared_ptr<Logger>& log)
  131. : BlockCacheFile(env, dir, cache_id), log_(log) {}
  132. virtual ~RandomAccessCacheFile() {}
  133. // open file for reading
  134. bool Open(const bool enable_direct_reads);
  135. // read data from the disk
  136. bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
  137. private:
  138. std::unique_ptr<RandomAccessFileReader> freader_;
  139. protected:
  140. bool OpenImpl(const bool enable_direct_reads);
  141. bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
  142. std::shared_ptr<Logger> log_; // log file
  143. };
  144. // class WriteableCacheFile
  145. //
  146. // All writes to the files are cached in buffers. The buffers are flushed to
  147. // disk as they get filled up. When file size reaches a certain size, a new file
  148. // will be created provided there is free space
  149. class WriteableCacheFile : public RandomAccessCacheFile {
  150. public:
  151. explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc,
  152. Writer* writer, const std::string& dir,
  153. const uint32_t cache_id, const uint32_t max_size,
  154. const std::shared_ptr<Logger>& log)
  155. : RandomAccessCacheFile(env, dir, cache_id, log),
  156. alloc_(alloc),
  157. writer_(writer),
  158. max_size_(max_size) {}
  159. virtual ~WriteableCacheFile();
  160. // create file on disk
  161. bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
  162. // read data from logical file
  163. bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
  164. ReadLock _(&rwlock_);
  165. const bool closed = eof_ && bufs_.empty();
  166. if (closed) {
  167. // the file is closed, read from disk
  168. return RandomAccessCacheFile::Read(lba, key, block, scratch);
  169. }
  170. // file is still being written, read from buffers
  171. return ReadBuffer(lba, key, block, scratch);
  172. }
  173. // append data to end of file
  174. bool Append(const Slice&, const Slice&, LBA* const) override;
  175. // End-of-file
  176. bool Eof() const { return eof_; }
  177. private:
  178. friend class ThreadedWriter;
  179. static const size_t kFileAlignmentSize = 4 * 1024; // align file size
  180. bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
  181. bool ReadBuffer(const LBA& lba, char* data);
  182. bool ExpandBuffer(const size_t size);
  183. void DispatchBuffer();
  184. void BufferWriteDone();
  185. void CloseAndOpenForReading();
  186. void ClearBuffers();
  187. void Close();
  188. // File layout in memory
  189. //
  190. // +------+------+------+------+------+------+
  191. // | b0 | b1 | b2 | b3 | b4 | b5 |
  192. // +------+------+------+------+------+------+
  193. // ^ ^
  194. // | |
  195. // buf_doff_ buf_woff_
  196. // (next buffer to (next buffer to fill)
  197. // flush to disk)
  198. //
  199. // The buffers are flushed to disk serially for a given file
  200. CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider
  201. Writer* const writer_ = nullptr; // File writer thread
  202. std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction
  203. std::vector<CacheWriteBuffer*> bufs_; // Written buffers
  204. uint32_t size_ = 0; // Size of the file
  205. const uint32_t max_size_; // Max size of the file
  206. bool eof_ = false; // End of file
  207. uint32_t disk_woff_ = 0; // Offset to write on disk
  208. size_t buf_woff_ = 0; // off into bufs_ to write
  209. size_t buf_doff_ = 0; // off into bufs_ to dispatch
  210. size_t pending_ios_ = 0; // Number of ios to disk in-progress
  211. bool enable_direct_reads_ = false; // Should we enable direct reads
  212. // when reading from disk
  213. };
  214. //
  215. // Abstraction to do writing to device. It is part of pipelined architecture.
  216. //
  217. class ThreadedWriter : public Writer {
  218. public:
  219. // Representation of IO to device
  220. struct IO {
  221. explicit IO(const bool signal) : signal_(signal) {}
  222. explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
  223. const uint64_t file_off, const std::function<void()> callback)
  224. : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}
  225. IO(const IO&) = default;
  226. IO& operator=(const IO&) = default;
  227. size_t Size() const { return sizeof(IO); }
  228. WritableFile* file_ = nullptr; // File to write to
  229. CacheWriteBuffer* buf_ = nullptr; // buffer to write
  230. uint64_t file_off_ = 0; // file offset
  231. bool signal_ = false; // signal to exit thread loop
  232. std::function<void()> callback_; // Callback on completion
  233. };
  234. explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
  235. const size_t io_size);
  236. virtual ~ThreadedWriter() { assert(threads_.empty()); }
  237. void Stop() override;
  238. void Write(WritableFile* const file, CacheWriteBuffer* buf,
  239. const uint64_t file_off,
  240. const std::function<void()> callback) override;
  241. private:
  242. void ThreadMain();
  243. void DispatchIO(const IO& io);
  244. const size_t io_size_ = 0;
  245. BoundedQueue<IO> q_;
  246. std::vector<port::Thread> threads_;
  247. };
  248. } // namespace ROCKSDB_NAMESPACE