block_cache_tier_file.h 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. // Copyright (c) 2013, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #ifndef ROCKSDB_LITE
  7. #include <list>
  8. #include <memory>
  9. #include <string>
  10. #include <vector>
  11. #include "file/random_access_file_reader.h"
  12. #include "rocksdb/comparator.h"
  13. #include "rocksdb/env.h"
  14. #include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
  15. #include "utilities/persistent_cache/lrulist.h"
  16. #include "utilities/persistent_cache/persistent_cache_tier.h"
  17. #include "utilities/persistent_cache/persistent_cache_util.h"
  18. #include "port/port.h"
  19. #include "util/crc32c.h"
  20. #include "util/mutexlock.h"
  21. // The io code path of persistent cache uses pipelined architecture
  22. //
  23. // client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
  24. //
  25. // This would enable the system to scale for GB/s of throughput which is
  26. // expected with modern devices like NVM.
  27. //
  28. // The file level operations are encapsulated in the following abstractions
  29. //
  30. // BlockCacheFile
  31. // ^
  32. // |
  33. // |
  34. // RandomAccessCacheFile (For reading)
  35. // ^
  36. // |
  37. // |
  38. // WriteableCacheFile (For writing)
  39. //
  40. // Write IO code path :
  41. //
  42. namespace ROCKSDB_NAMESPACE {
  43. class WriteableCacheFile;
  44. struct BlockInfo;
  // Represents a logical record on device
  //
  // (L)ogical (B)lock (A)ddress = { cache-file-id, offset, size }
  struct LogicalBlockAddress {
    // Default address: every field is zero (see the in-class initializers).
    LogicalBlockAddress() = default;

    // Build an address from its three components.
    //
    // `size` is accepted as uint32_t so that it matches the width of size_.
    // Taking it as uint16_t silently truncated any size above 65535 before it
    // was stored in the 32-bit member.
    explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
                                 const uint32_t size)
        : cache_id_(cache_id), off_(off), size_(size) {}

    uint32_t cache_id_ = 0;  // cache-file-id
    uint32_t off_ = 0;       // offset
    uint32_t size_ = 0;      // size
  };

  // Short alias used throughout the cache file interfaces.
  typedef LogicalBlockAddress LBA;
  58. // class Writer
  59. //
  60. // Writer is the abstraction used for writing data to file. The component can be
  61. // multithreaded. It is the last step of write pipeline
  62. class Writer {
  63. public:
  64. explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
  65. virtual ~Writer() {}
  66. // write buffer to file at the given offset
  67. virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
  68. const uint64_t file_off,
  69. const std::function<void()> callback) = 0;
  70. // stop the writer
  71. virtual void Stop() = 0;
  72. PersistentCacheTier* const cache_;
  73. };
  74. // class BlockCacheFile
  75. //
  76. // Generic interface to support building file specialized for read/writing
  77. class BlockCacheFile : public LRUElement<BlockCacheFile> {
  78. public:
  79. explicit BlockCacheFile(const uint32_t cache_id)
  80. : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}
  81. explicit BlockCacheFile(Env* const env, const std::string& dir,
  82. const uint32_t cache_id)
  83. : LRUElement<BlockCacheFile>(),
  84. env_(env),
  85. dir_(dir),
  86. cache_id_(cache_id) {}
  87. virtual ~BlockCacheFile() {}
  88. // append key/value to file and return LBA locator to user
  89. virtual bool Append(const Slice& /*key*/, const Slice& /*val*/,
  90. LBA* const /*lba*/) {
  91. assert(!"not implemented");
  92. return false;
  93. }
  94. // read from the record locator (LBA) and return key, value and status
  95. virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/,
  96. char* /*scratch*/) {
  97. assert(!"not implemented");
  98. return false;
  99. }
  100. // get file path
  101. std::string Path() const {
  102. return dir_ + "/" + std::to_string(cache_id_) + ".rc";
  103. }
  104. // get cache ID
  105. uint32_t cacheid() const { return cache_id_; }
  106. // Add block information to file data
  107. // Block information is the list of index reference for this file
  108. virtual void Add(BlockInfo* binfo) {
  109. WriteLock _(&rwlock_);
  110. block_infos_.push_back(binfo);
  111. }
  112. // get block information
  113. std::list<BlockInfo*>& block_infos() { return block_infos_; }
  114. // delete file and return the size of the file
  115. virtual Status Delete(uint64_t* size);
  116. protected:
  117. port::RWMutex rwlock_; // synchronization mutex
  118. Env* const env_ = nullptr; // Env for OS
  119. const std::string dir_; // Directory name
  120. const uint32_t cache_id_; // Cache id for the file
  121. std::list<BlockInfo*> block_infos_; // List of index entries mapping to the
  122. // file content
  123. };
  124. // class RandomAccessFile
  125. //
  126. // Thread safe implementation for reading random data from file
  127. class RandomAccessCacheFile : public BlockCacheFile {
  128. public:
  129. explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
  130. const uint32_t cache_id,
  131. const std::shared_ptr<Logger>& log)
  132. : BlockCacheFile(env, dir, cache_id), log_(log) {}
  133. virtual ~RandomAccessCacheFile() {}
  134. // open file for reading
  135. bool Open(const bool enable_direct_reads);
  136. // read data from the disk
  137. bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
  138. private:
  139. std::unique_ptr<RandomAccessFileReader> freader_;
  140. protected:
  141. bool OpenImpl(const bool enable_direct_reads);
  142. bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
  143. std::shared_ptr<Logger> log_; // log file
  144. };
  145. // class WriteableCacheFile
  146. //
  147. // All writes to the files are cached in buffers. The buffers are flushed to
  148. // disk as they get filled up. When file size reaches a certain size, a new file
  149. // will be created provided there is free space
  150. class WriteableCacheFile : public RandomAccessCacheFile {
  151. public:
  152. explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc,
  153. Writer* writer, const std::string& dir,
  154. const uint32_t cache_id, const uint32_t max_size,
  155. const std::shared_ptr<Logger>& log)
  156. : RandomAccessCacheFile(env, dir, cache_id, log),
  157. alloc_(alloc),
  158. writer_(writer),
  159. max_size_(max_size) {}
  160. virtual ~WriteableCacheFile();
  161. // create file on disk
  162. bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
  163. // read data from logical file
  164. bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
  165. ReadLock _(&rwlock_);
  166. const bool closed = eof_ && bufs_.empty();
  167. if (closed) {
  168. // the file is closed, read from disk
  169. return RandomAccessCacheFile::Read(lba, key, block, scratch);
  170. }
  171. // file is still being written, read from buffers
  172. return ReadBuffer(lba, key, block, scratch);
  173. }
  174. // append data to end of file
  175. bool Append(const Slice&, const Slice&, LBA* const) override;
  176. // End-of-file
  177. bool Eof() const { return eof_; }
  178. private:
  179. friend class ThreadedWriter;
  180. static const size_t kFileAlignmentSize = 4 * 1024; // align file size
  181. bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
  182. bool ReadBuffer(const LBA& lba, char* data);
  183. bool ExpandBuffer(const size_t size);
  184. void DispatchBuffer();
  185. void BufferWriteDone();
  186. void CloseAndOpenForReading();
  187. void ClearBuffers();
  188. void Close();
  189. // File layout in memory
  190. //
  191. // +------+------+------+------+------+------+
  192. // | b0 | b1 | b2 | b3 | b4 | b5 |
  193. // +------+------+------+------+------+------+
  194. // ^ ^
  195. // | |
  196. // buf_doff_ buf_woff_
  197. // (next buffer to (next buffer to fill)
  198. // flush to disk)
  199. //
  200. // The buffers are flushed to disk serially for a given file
  201. CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider
  202. Writer* const writer_ = nullptr; // File writer thread
  203. std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction
  204. std::vector<CacheWriteBuffer*> bufs_; // Written buffers
  205. uint32_t size_ = 0; // Size of the file
  206. const uint32_t max_size_; // Max size of the file
  207. bool eof_ = false; // End of file
  208. uint32_t disk_woff_ = 0; // Offset to write on disk
  209. size_t buf_woff_ = 0; // off into bufs_ to write
  210. size_t buf_doff_ = 0; // off into bufs_ to dispatch
  211. size_t pending_ios_ = 0; // Number of ios to disk in-progress
  212. bool enable_direct_reads_ = false; // Should we enable direct reads
  213. // when reading from disk
  214. };
  215. //
  216. // Abstraction to do writing to device. It is part of pipelined architecture.
  217. //
  218. class ThreadedWriter : public Writer {
  219. public:
  220. // Representation of IO to device
  221. struct IO {
  222. explicit IO(const bool signal) : signal_(signal) {}
  223. explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
  224. const uint64_t file_off, const std::function<void()> callback)
  225. : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}
  226. IO(const IO&) = default;
  227. IO& operator=(const IO&) = default;
  228. size_t Size() const { return sizeof(IO); }
  229. WritableFile* file_ = nullptr; // File to write to
  230. CacheWriteBuffer* buf_ = nullptr; // buffer to write
  231. uint64_t file_off_ = 0; // file offset
  232. bool signal_ = false; // signal to exit thread loop
  233. std::function<void()> callback_; // Callback on completion
  234. };
  235. explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
  236. const size_t io_size);
  237. virtual ~ThreadedWriter() { assert(threads_.empty()); }
  238. void Stop() override;
  239. void Write(WritableFile* const file, CacheWriteBuffer* buf,
  240. const uint64_t file_off,
  241. const std::function<void()> callback) override;
  242. private:
  243. void ThreadMain();
  244. void DispatchIO(const IO& io);
  245. const size_t io_size_ = 0;
  246. BoundedQueue<IO> q_;
  247. std::vector<port::Thread> threads_;
  248. };
  249. } // namespace ROCKSDB_NAMESPACE
  250. #endif