cache_dump_load_impl.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "file/random_access_file_reader.h"
#include "file/writable_file_writer.h"
#include "rocksdb/utilities/cache_dump_load.h"
#include "table/block_based/block.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/parsed_full_filter_block.h"
#include "table/block_based/reader_common.h"
#include "util/hash_containers.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. // the read buffer size of for the default CacheDumpReader
  18. const unsigned int kDumpReaderBufferSize = 1024; // 1KB
  19. static const unsigned int kSizePrefixLen = 4;
  20. enum CacheDumpUnitType : unsigned char {
  21. kHeader = 1,
  22. kFooter = 2,
  23. kData = 3,
  24. kFilter = 4,
  25. kProperties = 5,
  26. kCompressionDictionary = 6,
  27. kRangeDeletion = 7,
  28. kHashIndexPrefixes = 8,
  29. kHashIndexMetadata = 9,
  30. kMetaIndex = 10,
  31. kIndex = 11,
  32. kDeprecatedFilterBlock = 12, // OBSOLETE / DEPRECATED
  33. kFilterMetaBlock = 13,
  34. kBlockTypeMax,
  35. };
  36. // The metadata of a dump unit. After it is serialized, its size is fixed 16
  37. // bytes.
  38. struct DumpUnitMeta {
  39. // sequence number is a monotonically increasing number to indicate the order
  40. // of the blocks being written. Header is 0.
  41. uint32_t sequence_num;
  42. // The Crc32c checksum of its dump unit.
  43. uint32_t dump_unit_checksum;
  44. // The dump unit size after the dump unit is serialized to a string.
  45. uint64_t dump_unit_size;
  46. void reset() {
  47. sequence_num = 0;
  48. dump_unit_checksum = 0;
  49. dump_unit_size = 0;
  50. }
  51. };
  52. // The data structure to hold a block and its information.
  53. struct DumpUnit {
  54. // The timestamp when the block is identified, copied, and dumped from block
  55. // cache
  56. uint64_t timestamp;
  57. // The type of the block
  58. CacheDumpUnitType type;
  59. // The key of this block when the block is referenced by this Cache
  60. Slice key;
  61. // The block size
  62. size_t value_len;
  63. // The Crc32c checksum of the block
  64. uint32_t value_checksum;
  65. // Pointer to the block. Note that, in the dump process, it points to a memory
  66. // buffer copied from cache block. The buffer is freed when we process the
  67. // next block. In the load process, we use an std::string to store the
  68. // serialized dump_unit read from the reader. So it points to the memory
  69. // address of the begin of the block in this string.
  70. void* value;
  71. DumpUnit() { reset(); }
  72. void reset() {
  73. timestamp = 0;
  74. type = CacheDumpUnitType::kBlockTypeMax;
  75. key.clear();
  76. value_len = 0;
  77. value_checksum = 0;
  78. value = nullptr;
  79. }
  80. };
  81. // The default implementation of the Cache Dumper
  82. class CacheDumperImpl : public CacheDumper {
  83. public:
  84. CacheDumperImpl(const CacheDumpOptions& dump_options,
  85. const std::shared_ptr<Cache>& cache,
  86. std::unique_ptr<CacheDumpWriter>&& writer)
  87. : options_(dump_options), cache_(cache), writer_(std::move(writer)) {
  88. dumped_size_bytes_ = 0;
  89. }
  90. ~CacheDumperImpl() { writer_.reset(); }
  91. Status SetDumpFilter(const std::vector<DB*>& db_list) override;
  92. IOStatus DumpCacheEntriesToWriter() override;
  93. private:
  94. IOStatus WriteBlock(CacheDumpUnitType type, const Slice& key,
  95. const Slice& value, uint64_t timestamp);
  96. IOStatus WriteHeader();
  97. IOStatus WriteFooter();
  98. bool ShouldFilterOut(const Slice& key);
  99. std::function<void(const Slice&, Cache::ObjectPtr, size_t,
  100. const Cache::CacheItemHelper*)>
  101. DumpOneBlockCallBack(std::string& buf);
  102. CacheDumpOptions options_;
  103. std::shared_ptr<Cache> cache_;
  104. std::unique_ptr<CacheDumpWriter> writer_;
  105. SystemClock* clock_;
  106. uint32_t sequence_num_;
  107. // The cache key prefix filter. Currently, we use db_session_id as the prefix,
  108. // so using std::set to store the prefixes as filter is enough. Further
  109. // improvement can be applied like BloomFilter or others to speedup the
  110. // filtering.
  111. std::set<std::string> prefix_filter_;
  112. // Deadline for dumper in microseconds.
  113. std::chrono::microseconds deadline_;
  114. uint64_t dumped_size_bytes_;
  115. // dump all keys of cache if user doesn't call SetDumpFilter
  116. bool dump_all_keys_ = true;
  117. };
  118. // The default implementation of CacheDumpedLoader
  119. class CacheDumpedLoaderImpl : public CacheDumpedLoader {
  120. public:
  121. CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options,
  122. const BlockBasedTableOptions& /*toptions*/,
  123. const std::shared_ptr<SecondaryCache>& secondary_cache,
  124. std::unique_ptr<CacheDumpReader>&& reader)
  125. : options_(dump_options),
  126. secondary_cache_(secondary_cache),
  127. reader_(std::move(reader)) {}
  128. ~CacheDumpedLoaderImpl() {}
  129. IOStatus RestoreCacheEntriesToSecondaryCache() override;
  130. private:
  131. IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta);
  132. IOStatus ReadDumpUnit(size_t len, std::string* data, DumpUnit* unit);
  133. IOStatus ReadHeader(std::string* data, DumpUnit* dump_unit);
  134. IOStatus ReadCacheBlock(std::string* data, DumpUnit* dump_unit);
  135. CacheDumpOptions options_;
  136. std::shared_ptr<SecondaryCache> secondary_cache_;
  137. std::unique_ptr<CacheDumpReader> reader_;
  138. };
  139. // The default implementation of CacheDumpWriter. We write the blocks to a file
  140. // sequentially.
  141. class ToFileCacheDumpWriter : public CacheDumpWriter {
  142. public:
  143. explicit ToFileCacheDumpWriter(
  144. std::unique_ptr<WritableFileWriter>&& file_writer)
  145. : file_writer_(std::move(file_writer)) {}
  146. ~ToFileCacheDumpWriter() { Close().PermitUncheckedError(); }
  147. // Write the serialized metadata to the file
  148. IOStatus WriteMetadata(const Slice& metadata) override {
  149. assert(file_writer_ != nullptr);
  150. std::string prefix;
  151. PutFixed32(&prefix, static_cast<uint32_t>(metadata.size()));
  152. const IOOptions opts;
  153. IOStatus io_s = file_writer_->Append(opts, Slice(prefix));
  154. if (!io_s.ok()) {
  155. return io_s;
  156. }
  157. io_s = file_writer_->Append(opts, metadata);
  158. return io_s;
  159. }
  160. // Write the serialized data to the file
  161. IOStatus WritePacket(const Slice& data) override {
  162. assert(file_writer_ != nullptr);
  163. std::string prefix;
  164. PutFixed32(&prefix, static_cast<uint32_t>(data.size()));
  165. const IOOptions opts;
  166. IOStatus io_s = file_writer_->Append(opts, Slice(prefix));
  167. if (!io_s.ok()) {
  168. return io_s;
  169. }
  170. io_s = file_writer_->Append(opts, data);
  171. return io_s;
  172. }
  173. // Reset the writer
  174. IOStatus Close() override {
  175. IOStatus io_s;
  176. if (file_writer_ != nullptr && !file_writer_->seen_error()) {
  177. io_s = file_writer_->Sync(IOOptions(), false /* use_fsync */);
  178. }
  179. file_writer_.reset();
  180. return io_s;
  181. }
  182. private:
  183. std::unique_ptr<WritableFileWriter> file_writer_;
  184. };
  185. // The default implementation of CacheDumpReader. It is implemented based on
  186. // RandomAccessFileReader. Note that, we keep an internal variable to remember
  187. // the current offset.
  188. class FromFileCacheDumpReader : public CacheDumpReader {
  189. public:
  190. explicit FromFileCacheDumpReader(
  191. std::unique_ptr<RandomAccessFileReader>&& reader)
  192. : file_reader_(std::move(reader)),
  193. offset_(0),
  194. buffer_(new char[kDumpReaderBufferSize]) {}
  195. ~FromFileCacheDumpReader() { delete[] buffer_; }
  196. IOStatus ReadMetadata(std::string* metadata) override {
  197. uint32_t metadata_len = 0;
  198. IOStatus io_s = ReadSizePrefix(&metadata_len);
  199. if (!io_s.ok()) {
  200. return io_s;
  201. }
  202. return Read(metadata_len, metadata);
  203. }
  204. IOStatus ReadPacket(std::string* data) override {
  205. uint32_t data_len = 0;
  206. IOStatus io_s = ReadSizePrefix(&data_len);
  207. if (!io_s.ok()) {
  208. return io_s;
  209. }
  210. return Read(data_len, data);
  211. }
  212. private:
  213. IOStatus ReadSizePrefix(uint32_t* len) {
  214. std::string prefix;
  215. IOStatus io_s = Read(kSizePrefixLen, &prefix);
  216. if (!io_s.ok()) {
  217. return io_s;
  218. }
  219. Slice encoded_slice(prefix);
  220. if (!GetFixed32(&encoded_slice, len)) {
  221. return IOStatus::Corruption("Decode size prefix string failed");
  222. }
  223. return IOStatus::OK();
  224. }
  225. IOStatus Read(size_t len, std::string* data) {
  226. assert(file_reader_ != nullptr);
  227. IOStatus io_s;
  228. unsigned int bytes_to_read = static_cast<unsigned int>(len);
  229. unsigned int to_read = bytes_to_read > kDumpReaderBufferSize
  230. ? kDumpReaderBufferSize
  231. : bytes_to_read;
  232. while (to_read > 0) {
  233. io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_,
  234. buffer_, nullptr);
  235. if (!io_s.ok()) {
  236. return io_s;
  237. }
  238. if (result_.size() < to_read) {
  239. return IOStatus::Corruption("Corrupted cache dump file.");
  240. }
  241. data->append(result_.data(), result_.size());
  242. offset_ += to_read;
  243. bytes_to_read -= to_read;
  244. to_read = bytes_to_read > kDumpReaderBufferSize ? kDumpReaderBufferSize
  245. : bytes_to_read;
  246. }
  247. return io_s;
  248. }
  249. std::unique_ptr<RandomAccessFileReader> file_reader_;
  250. Slice result_;
  251. size_t offset_;
  252. char* buffer_;
  253. };
  254. // The cache dump and load helper class
  255. class CacheDumperHelper {
  256. public:
  257. // serialize the dump_unit_meta to a string, it is fixed 16 bytes size.
  258. static void EncodeDumpUnitMeta(const DumpUnitMeta& meta, std::string* data) {
  259. assert(data);
  260. PutFixed32(data, static_cast<uint32_t>(meta.sequence_num));
  261. PutFixed32(data, static_cast<uint32_t>(meta.dump_unit_checksum));
  262. PutFixed64(data, meta.dump_unit_size);
  263. }
  264. // Serialize the dump_unit to a string.
  265. static void EncodeDumpUnit(const DumpUnit& dump_unit, std::string* data) {
  266. assert(data);
  267. PutFixed64(data, dump_unit.timestamp);
  268. data->push_back(dump_unit.type);
  269. PutLengthPrefixedSlice(data, dump_unit.key);
  270. PutFixed32(data, static_cast<uint32_t>(dump_unit.value_len));
  271. PutFixed32(data, dump_unit.value_checksum);
  272. PutLengthPrefixedSlice(data,
  273. Slice((char*)dump_unit.value, dump_unit.value_len));
  274. }
  275. // Deserialize the dump_unit_meta from a string
  276. static Status DecodeDumpUnitMeta(const std::string& encoded_data,
  277. DumpUnitMeta* unit_meta) {
  278. assert(unit_meta != nullptr);
  279. Slice encoded_slice = Slice(encoded_data);
  280. if (!GetFixed32(&encoded_slice, &(unit_meta->sequence_num))) {
  281. return Status::Incomplete("Decode dumped unit meta sequence_num failed");
  282. }
  283. if (!GetFixed32(&encoded_slice, &(unit_meta->dump_unit_checksum))) {
  284. return Status::Incomplete(
  285. "Decode dumped unit meta dump_unit_checksum failed");
  286. }
  287. if (!GetFixed64(&encoded_slice, &(unit_meta->dump_unit_size))) {
  288. return Status::Incomplete(
  289. "Decode dumped unit meta dump_unit_size failed");
  290. }
  291. return Status::OK();
  292. }
  293. // Deserialize the dump_unit from a string.
  294. static Status DecodeDumpUnit(const std::string& encoded_data,
  295. DumpUnit* dump_unit) {
  296. assert(dump_unit != nullptr);
  297. Slice encoded_slice = Slice(encoded_data);
  298. // Decode timestamp
  299. if (!GetFixed64(&encoded_slice, &dump_unit->timestamp)) {
  300. return Status::Incomplete("Decode dumped unit string failed");
  301. }
  302. // Decode the block type
  303. dump_unit->type = static_cast<CacheDumpUnitType>(encoded_slice[0]);
  304. encoded_slice.remove_prefix(1);
  305. // Decode the key
  306. if (!GetLengthPrefixedSlice(&encoded_slice, &(dump_unit->key))) {
  307. return Status::Incomplete("Decode dumped unit string failed");
  308. }
  309. // Decode the value size
  310. uint32_t value_len;
  311. if (!GetFixed32(&encoded_slice, &value_len)) {
  312. return Status::Incomplete("Decode dumped unit string failed");
  313. }
  314. dump_unit->value_len = static_cast<size_t>(value_len);
  315. // Decode the value checksum
  316. if (!GetFixed32(&encoded_slice, &(dump_unit->value_checksum))) {
  317. return Status::Incomplete("Decode dumped unit string failed");
  318. }
  319. // Decode the block content and copy to the memory space whose pointer
  320. // will be managed by the cache finally.
  321. Slice block;
  322. if (!GetLengthPrefixedSlice(&encoded_slice, &block)) {
  323. return Status::Incomplete("Decode dumped unit string failed");
  324. }
  325. dump_unit->value = (void*)block.data();
  326. assert(block.size() == dump_unit->value_len);
  327. return Status::OK();
  328. }
  329. };
  330. } // namespace ROCKSDB_NAMESPACE