cache_dump_load_impl.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "utilities/cache_dump_load_impl.h"
  6. #include <limits>
  7. #include "cache/cache_entry_roles.h"
  8. #include "cache/cache_key.h"
  9. #include "file/writable_file_writer.h"
  10. #include "port/lang.h"
  11. #include "rocksdb/env.h"
  12. #include "rocksdb/file_system.h"
  13. #include "rocksdb/utilities/ldb_cmd.h"
  14. #include "table/block_based/block_based_table_reader.h"
  15. #include "table/format.h"
  16. #include "util/crc32c.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. // Set the dump filter with a list of DBs. Block cache may be shared by multipe
  19. // DBs and we may only want to dump out the blocks belonging to certain DB(s).
  20. // Therefore, a filter is need to decide if the key of the block satisfy the
  21. // requirement.
  22. Status CacheDumperImpl::SetDumpFilter(const std::vector<DB*>& db_list) {
  23. Status s = Status::OK();
  24. dump_all_keys_ = false;
  25. for (size_t i = 0; i < db_list.size(); i++) {
  26. assert(i < db_list.size());
  27. TablePropertiesCollection ptc;
  28. assert(db_list[i] != nullptr);
  29. s = db_list[i]->GetPropertiesOfAllTables(&ptc);
  30. if (!s.ok()) {
  31. return s;
  32. }
  33. for (auto id = ptc.begin(); id != ptc.end(); id++) {
  34. OffsetableCacheKey base;
  35. // We only want to save cache entries that are portable to another
  36. // DB::Open, so only save entries with stable keys.
  37. bool is_stable;
  38. BlockBasedTable::SetupBaseCacheKey(id->second.get(),
  39. /*cur_db_session_id*/ "",
  40. /*cur_file_num*/ 0, &base, &is_stable);
  41. if (is_stable) {
  42. Slice prefix_slice = base.CommonPrefixSlice();
  43. assert(prefix_slice.size() == OffsetableCacheKey::kCommonPrefixSize);
  44. prefix_filter_.insert(prefix_slice.ToString());
  45. }
  46. }
  47. }
  48. return s;
  49. }
  50. // This is the main function to dump out the cache block entries to the writer.
  51. // The writer may create a file or write to other systems. Currently, we will
  52. // iterate the whole block cache, get the blocks, and write them to the writer
  53. IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() {
  54. // Prepare stage, check the parameters.
  55. if (cache_ == nullptr) {
  56. return IOStatus::InvalidArgument("Cache is null");
  57. }
  58. if (writer_ == nullptr) {
  59. return IOStatus::InvalidArgument("CacheDumpWriter is null");
  60. }
  61. // Set the system clock
  62. if (options_.clock == nullptr) {
  63. return IOStatus::InvalidArgument("System clock is null");
  64. }
  65. clock_ = options_.clock;
  66. deadline_ = options_.deadline;
  67. // Set the sequence number
  68. sequence_num_ = 0;
  69. // Dump stage, first, we write the hader
  70. IOStatus io_s = WriteHeader();
  71. if (!io_s.ok()) {
  72. return io_s;
  73. }
  74. // Then, we iterate the block cache and dump out the blocks that are not
  75. // filtered out.
  76. std::string buf;
  77. cache_->ApplyToAllEntries(DumpOneBlockCallBack(buf), {});
  78. // Finally, write the footer
  79. io_s = WriteFooter();
  80. if (!io_s.ok()) {
  81. return io_s;
  82. }
  83. io_s = writer_->Close();
  84. return io_s;
  85. }
  86. // Check if we need to filter out the block based on its key
  87. bool CacheDumperImpl::ShouldFilterOut(const Slice& key) {
  88. if (key.size() < OffsetableCacheKey::kCommonPrefixSize) {
  89. return /*filter out*/ true;
  90. }
  91. Slice key_prefix(key.data(), OffsetableCacheKey::kCommonPrefixSize);
  92. std::string prefix = key_prefix.ToString();
  93. // Filter out if not found
  94. return prefix_filter_.find(prefix) == prefix_filter_.end();
  95. }
  96. // This is the callback function which will be applied to
  97. // Cache::ApplyToAllEntries. In this callback function, we will get the block
  98. // type, decide if the block needs to be dumped based on the filter, and write
  99. // the block through the provided writer. `buf` is passed in for efficiennt
  100. // reuse.
  101. std::function<void(const Slice&, Cache::ObjectPtr, size_t,
  102. const Cache::CacheItemHelper*)>
  103. CacheDumperImpl::DumpOneBlockCallBack(std::string& buf) {
  104. return [&](const Slice& key, Cache::ObjectPtr value, size_t /*charge*/,
  105. const Cache::CacheItemHelper* helper) {
  106. if (helper == nullptr || helper->size_cb == nullptr ||
  107. helper->saveto_cb == nullptr) {
  108. // Not compatible with dumping. Skip this entry.
  109. return;
  110. }
  111. if (options_.max_size_bytes > 0 &&
  112. dumped_size_bytes_ > options_.max_size_bytes) {
  113. return;
  114. }
  115. uint64_t timestamp = clock_->NowMicros();
  116. if (deadline_.count()) {
  117. std::chrono::microseconds now = std::chrono::microseconds(timestamp);
  118. if (now >= deadline_) {
  119. return;
  120. }
  121. }
  122. CacheEntryRole role = helper->role;
  123. CacheDumpUnitType type = CacheDumpUnitType::kBlockTypeMax;
  124. switch (role) {
  125. case CacheEntryRole::kDataBlock:
  126. type = CacheDumpUnitType::kData;
  127. break;
  128. case CacheEntryRole::kFilterBlock:
  129. type = CacheDumpUnitType::kFilter;
  130. break;
  131. case CacheEntryRole::kFilterMetaBlock:
  132. type = CacheDumpUnitType::kFilterMetaBlock;
  133. break;
  134. case CacheEntryRole::kIndexBlock:
  135. type = CacheDumpUnitType::kIndex;
  136. break;
  137. default:
  138. // Filter out other entries
  139. // FIXME? Do we need the CacheDumpUnitTypes? UncompressionDict?
  140. return;
  141. }
  142. // based on the key prefix, check if the block should be filter out.
  143. if (!dump_all_keys_ && ShouldFilterOut(key)) {
  144. return;
  145. }
  146. assert(type != CacheDumpUnitType::kBlockTypeMax);
  147. // Use cache item helper to get persistable data
  148. // FIXME: reduce copying
  149. size_t len = helper->size_cb(value);
  150. buf.assign(len, '\0');
  151. Status s = helper->saveto_cb(value, /*start*/ 0, len, buf.data());
  152. if (s.ok()) {
  153. // Write it out
  154. WriteBlock(type, key, buf, timestamp).PermitUncheckedError();
  155. dumped_size_bytes_ += len;
  156. }
  157. };
  158. }
  159. // Write the block to the writer. It takes the timestamp of the
  160. // block being copied from block cache, block type, key, block pointer,
  161. // block size and block checksum as the input. When writing the dumper raw
  162. // block, we first create the dump unit and encoude it to a string. Then,
  163. // we calculate the checksum of the whole dump unit string and store it in
  164. // the dump unit metadata.
  165. // First, we write the metadata first, which is a fixed size string. Then, we
  166. // Append the dump unit string to the writer.
  167. IOStatus CacheDumperImpl::WriteBlock(CacheDumpUnitType type, const Slice& key,
  168. const Slice& value, uint64_t timestamp) {
  169. uint32_t value_checksum = crc32c::Value(value.data(), value.size());
  170. // First, serialize the block information in a string
  171. DumpUnit dump_unit;
  172. dump_unit.timestamp = timestamp;
  173. dump_unit.key = key;
  174. dump_unit.type = type;
  175. dump_unit.value_len = value.size();
  176. dump_unit.value = const_cast<char*>(value.data());
  177. dump_unit.value_checksum = value_checksum;
  178. std::string encoded_data;
  179. CacheDumperHelper::EncodeDumpUnit(dump_unit, &encoded_data);
  180. // Second, create the metadata, which contains a sequence number, the dump
  181. // unit string checksum and the string size. The sequence number monotonically
  182. // increases from 0.
  183. DumpUnitMeta unit_meta;
  184. unit_meta.sequence_num = sequence_num_;
  185. sequence_num_++;
  186. unit_meta.dump_unit_checksum =
  187. crc32c::Value(encoded_data.data(), encoded_data.size());
  188. unit_meta.dump_unit_size = encoded_data.size();
  189. std::string encoded_meta;
  190. CacheDumperHelper::EncodeDumpUnitMeta(unit_meta, &encoded_meta);
  191. // We write the metadata first.
  192. assert(writer_ != nullptr);
  193. IOStatus io_s = writer_->WriteMetadata(encoded_meta);
  194. if (!io_s.ok()) {
  195. return io_s;
  196. }
  197. // followed by the dump unit.
  198. return writer_->WritePacket(encoded_data);
  199. }
  200. // Before we write any block, we write the header first to store the cache dump
  201. // format version, rocksdb version, and brief intro.
  202. IOStatus CacheDumperImpl::WriteHeader() {
  203. std::string header_key = "header";
  204. std::ostringstream s;
  205. s << kTraceMagic << "\t"
  206. << "Cache dump format version: " << kCacheDumpMajorVersion << "."
  207. << kCacheDumpMinorVersion << "\t" << "RocksDB Version: " << kMajorVersion
  208. << "." << kMinorVersion << "\t"
  209. << "Format: dump_unit_metadata <sequence_number, dump_unit_checksum, "
  210. "dump_unit_size>, dump_unit <timestamp, key, block_type, "
  211. "block_size, block_data, block_checksum> cache_value\n";
  212. std::string header_value(s.str());
  213. CacheDumpUnitType type = CacheDumpUnitType::kHeader;
  214. uint64_t timestamp = clock_->NowMicros();
  215. return WriteBlock(type, header_key, header_value, timestamp);
  216. }
  217. // Write the footer after all the blocks are stored to indicate the ending.
  218. IOStatus CacheDumperImpl::WriteFooter() {
  219. std::string footer_key = "footer";
  220. std::string footer_value("cache dump completed");
  221. CacheDumpUnitType type = CacheDumpUnitType::kFooter;
  222. uint64_t timestamp = clock_->NowMicros();
  223. return WriteBlock(type, footer_key, footer_value, timestamp);
  224. }
  225. // This is the main function to restore the cache entries to secondary cache.
  226. // First, we check if all the arguments are valid. Then, we read the block
  227. // sequentially from the reader and insert them to the secondary cache.
  228. IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
  229. // TODO: remove this line when options are used in the loader
  230. (void)options_;
  231. // Step 1: we check if all the arguments are valid
  232. if (secondary_cache_ == nullptr) {
  233. return IOStatus::InvalidArgument("Secondary Cache is null");
  234. }
  235. if (reader_ == nullptr) {
  236. return IOStatus::InvalidArgument("CacheDumpReader is null");
  237. }
  238. // Step 2: read the header
  239. // TODO: we need to check the cache dump format version and RocksDB version
  240. // after the header is read out.
  241. IOStatus io_s;
  242. DumpUnit dump_unit;
  243. std::string data;
  244. io_s = ReadHeader(&data, &dump_unit);
  245. if (!io_s.ok()) {
  246. return io_s;
  247. }
  248. // Step 3: read out the rest of the blocks from the reader. The loop will stop
  249. // either I/O status is not ok or we reach to the the end.
  250. while (io_s.ok()) {
  251. dump_unit.reset();
  252. data.clear();
  253. // read the content and store in the dump_unit
  254. io_s = ReadCacheBlock(&data, &dump_unit);
  255. if (!io_s.ok()) {
  256. break;
  257. }
  258. if (dump_unit.type == CacheDumpUnitType::kFooter) {
  259. break;
  260. }
  261. // Create the uncompressed_block based on the information in the dump_unit
  262. // (There is no block trailer here compatible with block-based SST file.)
  263. Slice content =
  264. Slice(static_cast<char*>(dump_unit.value), dump_unit.value_len);
  265. Status s = secondary_cache_->InsertSaved(dump_unit.key, content);
  266. if (!s.ok()) {
  267. io_s = status_to_io_status(std::move(s));
  268. }
  269. }
  270. if (dump_unit.type == CacheDumpUnitType::kFooter) {
  271. return IOStatus::OK();
  272. } else {
  273. return io_s;
  274. }
  275. }
  276. // Read and copy the dump unit metadata to std::string data, decode and create
  277. // the unit metadata based on the string
  278. IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,
  279. DumpUnitMeta* unit_meta) {
  280. assert(reader_ != nullptr);
  281. assert(data != nullptr);
  282. assert(unit_meta != nullptr);
  283. IOStatus io_s = reader_->ReadMetadata(data);
  284. if (!io_s.ok()) {
  285. return io_s;
  286. }
  287. return status_to_io_status(
  288. CacheDumperHelper::DecodeDumpUnitMeta(*data, unit_meta));
  289. }
  290. // Read and copy the dump unit to std::string data, decode and create the unit
  291. // based on the string
  292. IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data,
  293. DumpUnit* unit) {
  294. assert(reader_ != nullptr);
  295. assert(data != nullptr);
  296. assert(unit != nullptr);
  297. IOStatus io_s = reader_->ReadPacket(data);
  298. if (!io_s.ok()) {
  299. return io_s;
  300. }
  301. if (data->size() != len) {
  302. return IOStatus::Corruption(
  303. "The data being read out does not match the size stored in metadata!");
  304. }
  305. Slice block;
  306. return status_to_io_status(CacheDumperHelper::DecodeDumpUnit(*data, unit));
  307. }
  308. // Read the header
  309. IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data,
  310. DumpUnit* dump_unit) {
  311. DumpUnitMeta header_meta;
  312. header_meta.reset();
  313. std::string meta_string;
  314. IOStatus io_s = ReadDumpUnitMeta(&meta_string, &header_meta);
  315. if (!io_s.ok()) {
  316. return io_s;
  317. }
  318. io_s = ReadDumpUnit(header_meta.dump_unit_size, data, dump_unit);
  319. if (!io_s.ok()) {
  320. return io_s;
  321. }
  322. uint32_t unit_checksum = crc32c::Value(data->data(), data->size());
  323. if (unit_checksum != header_meta.dump_unit_checksum) {
  324. return IOStatus::Corruption("Read header unit corrupted!");
  325. }
  326. return io_s;
  327. }
  328. // Read the blocks after header is read out
  329. IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data,
  330. DumpUnit* dump_unit) {
  331. // According to the write process, we read the dump_unit_metadata first
  332. DumpUnitMeta unit_meta;
  333. unit_meta.reset();
  334. std::string unit_string;
  335. IOStatus io_s = ReadDumpUnitMeta(&unit_string, &unit_meta);
  336. if (!io_s.ok()) {
  337. return io_s;
  338. }
  339. // Based on the information in the dump_unit_metadata, we read the dump_unit
  340. // and verify if its content is correct.
  341. io_s = ReadDumpUnit(unit_meta.dump_unit_size, data, dump_unit);
  342. if (!io_s.ok()) {
  343. return io_s;
  344. }
  345. uint32_t unit_checksum = crc32c::Value(data->data(), data->size());
  346. if (unit_checksum != unit_meta.dump_unit_checksum) {
  347. return IOStatus::Corruption(
  348. "Checksum does not match! Read dumped unit corrupted!");
  349. }
  350. return io_s;
  351. }
  352. } // namespace ROCKSDB_NAMESPACE