version_builder.cc 64 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/version_builder.h"
  10. #include <algorithm>
  11. #include <atomic>
  12. #include <cinttypes>
  13. #include <functional>
  14. #include <map>
  15. #include <memory>
  16. #include <set>
  17. #include <sstream>
  18. #include <thread>
  19. #include <unordered_map>
  20. #include <unordered_set>
  21. #include <utility>
  22. #include <vector>
  23. #include "cache/cache_reservation_manager.h"
  24. #include "db/blob/blob_file_cache.h"
  25. #include "db/blob/blob_file_meta.h"
  26. #include "db/dbformat.h"
  27. #include "db/internal_stats.h"
  28. #include "db/table_cache.h"
  29. #include "db/version_edit.h"
  30. #include "db/version_edit_handler.h"
  31. #include "db/version_set.h"
  32. #include "port/port.h"
  33. #include "table/table_reader.h"
  34. #include "util/string_util.h"
  35. namespace ROCKSDB_NAMESPACE {
  36. class VersionBuilder::Rep {
  37. class NewestFirstBySeqNo {
  38. public:
  39. bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
  40. assert(lhs);
  41. assert(rhs);
  42. if (lhs->fd.largest_seqno != rhs->fd.largest_seqno) {
  43. return lhs->fd.largest_seqno > rhs->fd.largest_seqno;
  44. }
  45. if (lhs->fd.smallest_seqno != rhs->fd.smallest_seqno) {
  46. return lhs->fd.smallest_seqno > rhs->fd.smallest_seqno;
  47. }
  48. // Break ties by file number
  49. return lhs->fd.GetNumber() > rhs->fd.GetNumber();
  50. }
  51. };
  52. class NewestFirstByEpochNumber {
  53. private:
  54. inline static const NewestFirstBySeqNo seqno_cmp;
  55. public:
  56. bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
  57. assert(lhs);
  58. assert(rhs);
  59. if (lhs->epoch_number != rhs->epoch_number) {
  60. return lhs->epoch_number > rhs->epoch_number;
  61. } else {
  62. return seqno_cmp(lhs, rhs);
  63. }
  64. }
  65. };
  66. class BySmallestKey {
  67. public:
  68. explicit BySmallestKey(const InternalKeyComparator* cmp) : cmp_(cmp) {}
  69. bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
  70. assert(lhs);
  71. assert(rhs);
  72. assert(cmp_);
  73. const int r = cmp_->Compare(lhs->smallest, rhs->smallest);
  74. if (r != 0) {
  75. return (r < 0);
  76. }
  77. // Break ties by file number
  78. return (lhs->fd.GetNumber() < rhs->fd.GetNumber());
  79. }
  80. private:
  81. const InternalKeyComparator* cmp_;
  82. };
  // Table file changes accumulated for a single level.
  struct LevelState {
    // File numbers of table files deleted from this level.
    std::unordered_set<uint64_t> deleted_files;
    // Map from file number to file meta data. The enclosing Rep takes a
    // reference (RefFile) on each entry when copying and drops it (UnrefFile)
    // on destruction or when the file is deleted again.
    std::unordered_map<uint64_t, FileMetaData*> added_files;
  };
  // Accumulated changes for one blob file after applying a series of
  // VersionEdits: the extra garbage (count and bytes) plus the sets of SST
  // file numbers that were newly linked to / unlinked from the blob file.
  class BlobFileMetaDataDelta {
   public:
    // True when no garbage was added and no link changes are pending.
    bool IsEmpty() const {
      const bool no_garbage =
          additional_garbage_count_ == 0 && additional_garbage_bytes_ == 0;
      return no_garbage && newly_linked_ssts_.empty() &&
             newly_unlinked_ssts_.empty();
    }

    uint64_t GetAdditionalGarbageCount() const {
      return additional_garbage_count_;
    }

    uint64_t GetAdditionalGarbageBytes() const {
      return additional_garbage_bytes_;
    }

    const std::unordered_set<uint64_t>& GetNewlyLinkedSsts() const {
      return newly_linked_ssts_;
    }

    const std::unordered_set<uint64_t>& GetNewlyUnlinkedSsts() const {
      return newly_unlinked_ssts_;
    }

    // Adds `count` blobs / `bytes` bytes to the accumulated garbage.
    void AddGarbage(uint64_t count, uint64_t bytes) {
      additional_garbage_count_ += count;
      additional_garbage_bytes_ += bytes;
    }

    // Records that `sst_file_number` got linked to this blob file. A pending
    // unlink of the same SST is cancelled instead of recording a new link
    // (an SST can be linked to and unlinked from the same blob file in the
    // case of a trivial move).
    void LinkSst(uint64_t sst_file_number) {
      assert(newly_linked_ssts_.count(sst_file_number) == 0);

      const bool cancelled_unlink =
          newly_unlinked_ssts_.erase(sst_file_number) > 0;
      if (!cancelled_unlink) {
        newly_linked_ssts_.emplace(sst_file_number);
      }
    }

    // Records that `sst_file_number` got unlinked from this blob file. A
    // pending link of the same SST is cancelled instead of recording a new
    // unlink (see LinkSst for the trivial move case).
    void UnlinkSst(uint64_t sst_file_number) {
      assert(newly_unlinked_ssts_.count(sst_file_number) == 0);

      const bool cancelled_link = newly_linked_ssts_.erase(sst_file_number) > 0;
      if (!cancelled_link) {
        newly_unlinked_ssts_.emplace(sst_file_number);
      }
    }

   private:
    uint64_t additional_garbage_count_ = 0;
    uint64_t additional_garbage_bytes_ = 0;
    std::unordered_set<uint64_t> newly_linked_ssts_;
    std::unordered_set<uint64_t> newly_unlinked_ssts_;
  };
  145. // A class that represents the state of a blob file after applying a series of
  146. // VersionEdits. In addition to the resulting state, it also contains the
  147. // delta (see BlobFileMetaDataDelta above). The resulting state can be used to
  148. // identify obsolete blob files, while the delta makes it possible to
  149. // efficiently detect trivial moves.
  150. class MutableBlobFileMetaData {
  151. public:
  152. // To be used for brand new blob files
  153. explicit MutableBlobFileMetaData(
  154. std::shared_ptr<SharedBlobFileMetaData>&& shared_meta)
  155. : shared_meta_(std::move(shared_meta)) {}
  156. // To be used for pre-existing blob files
  157. explicit MutableBlobFileMetaData(
  158. const std::shared_ptr<BlobFileMetaData>& meta)
  159. : shared_meta_(meta->GetSharedMeta()),
  160. linked_ssts_(meta->GetLinkedSsts()),
  161. garbage_blob_count_(meta->GetGarbageBlobCount()),
  162. garbage_blob_bytes_(meta->GetGarbageBlobBytes()) {}
  163. const std::shared_ptr<SharedBlobFileMetaData>& GetSharedMeta() const {
  164. return shared_meta_;
  165. }
  166. uint64_t GetBlobFileNumber() const {
  167. assert(shared_meta_);
  168. return shared_meta_->GetBlobFileNumber();
  169. }
  170. bool HasDelta() const { return !delta_.IsEmpty(); }
  171. const std::unordered_set<uint64_t>& GetLinkedSsts() const {
  172. return linked_ssts_;
  173. }
  174. uint64_t GetGarbageBlobCount() const { return garbage_blob_count_; }
  175. uint64_t GetGarbageBlobBytes() const { return garbage_blob_bytes_; }
  176. bool AddGarbage(uint64_t count, uint64_t bytes) {
  177. assert(shared_meta_);
  178. if (garbage_blob_count_ + count > shared_meta_->GetTotalBlobCount() ||
  179. garbage_blob_bytes_ + bytes > shared_meta_->GetTotalBlobBytes()) {
  180. return false;
  181. }
  182. delta_.AddGarbage(count, bytes);
  183. garbage_blob_count_ += count;
  184. garbage_blob_bytes_ += bytes;
  185. return true;
  186. }
  187. void LinkSst(uint64_t sst_file_number) {
  188. delta_.LinkSst(sst_file_number);
  189. assert(linked_ssts_.find(sst_file_number) == linked_ssts_.end());
  190. linked_ssts_.emplace(sst_file_number);
  191. }
  192. void UnlinkSst(uint64_t sst_file_number) {
  193. delta_.UnlinkSst(sst_file_number);
  194. assert(linked_ssts_.find(sst_file_number) != linked_ssts_.end());
  195. linked_ssts_.erase(sst_file_number);
  196. }
  197. private:
  198. std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
  199. // Accumulated changes
  200. BlobFileMetaDataDelta delta_;
  201. // Resulting state after applying the changes
  202. BlobFileMetaData::LinkedSsts linked_ssts_;
  203. uint64_t garbage_blob_count_ = 0;
  204. uint64_t garbage_blob_bytes_ = 0;
  205. };
  const FileOptions& file_options_;
  const ImmutableCFOptions* const ioptions_;
  TableCache* table_cache_;
  // Base version state, consulted whenever a table/blob file is not covered
  // by the changes accumulated in this builder.
  VersionStorageInfo* base_vstorage_;
  VersionSet* version_set_;
  int num_levels_;
  // Array of num_levels_ entries, allocated with new[] by the constructors
  // and released by the destructor.
  LevelState* levels_;
  // Store the number of files on levels at or beyond num_levels_. We do this
  // instead of storing them in levels_ to avoid regression in case there are
  // no files on invalid levels. The version is not consistent if in the end
  // the files on invalid levels don't cancel out.
  std::unordered_map<int, size_t> invalid_level_sizes_;
  // Whether there are invalid new files or invalid deletions on levels at or
  // beyond num_levels_.
  bool has_invalid_levels_;
  // Current levels of table files affected by additions/deletions.
  std::unordered_map<uint64_t, int> table_file_levels_;
  // Current compact cursors that should be changed after the last compaction
  std::unordered_map<int, InternalKey> updated_compact_cursors_;
  // L0 ordering used when epoch numbers must be present.
  const std::shared_ptr<const NewestFirstByEpochNumber>
      level_zero_cmp_by_epochno_;
  // L0 ordering used when epoch numbers might be missing.
  const std::shared_ptr<const NewestFirstBySeqNo> level_zero_cmp_by_seqno_;
  // Ordering used for levels above L0.
  const std::shared_ptr<const BySmallestKey> level_nonzero_cmp_;
  // Mutable metadata objects for all blob files affected by the series of
  // version edits.
  std::map<uint64_t, MutableBlobFileMetaData> mutable_blob_file_metas_;
  // Optional. When set, UnrefFile updates the reservation (increase = false)
  // by the file's approximate memory usage when it deletes a FileMetaData.
  std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
  ColumnFamilyData* cfd_;
  VersionEditHandler* version_edit_handler_;
  bool track_found_and_missing_files_;
  // If false, only a complete Version, i.e. one for which all of its files
  // are found, is considered valid. If true, in addition to a complete
  // Version, an incomplete Version that is only missing a suffix of L0 files
  // is also considered valid, provided the Version was never edited in an
  // atomic group.
  bool allow_incomplete_valid_version_;
  // These are only tracked if `track_found_and_missing_files_` is enabled.
  // The SST files that are found (blob files not included yet).
  std::unordered_set<uint64_t> found_files_;
  // Missing SST files for L0
  std::unordered_set<uint64_t> l0_missing_files_;
  // Missing SST files for non L0 levels
  std::unordered_set<uint64_t> non_l0_missing_files_;
  // Intermediate SST files (blob files not included yet)
  std::vector<std::string> intermediate_files_;
  // The highest file number among all the missing blob files, useful to check
  // if a complete Version is available.
  uint64_t missing_blob_files_high_ = kInvalidBlobFileNumber;
  // Missing blob files, useful to check if only the missing L0 files'
  // associated blob files are missing.
  std::unordered_set<uint64_t> missing_blob_files_;
  // True if all files that make up the Version can be found. If
  // `allow_incomplete_valid_version_` is true and the version history was
  // never edited in an atomic group, this is also true when only a suffix of
  // L0 SST files and their associated blob files are missing.
  bool valid_version_available_;
  // True if version is ever edited in an atomic group.
  bool edited_in_atomic_group_;
  // Flag to indicate whether the Version has been updated since the last
  // validity check. It starts out false when tracking is enabled.
  // NOTE(review): presumably it stays false while no `Apply` call is made
  // between a `Rep`'s construction and a `ValidVersionAvailable` check, or
  // between two `ValidVersionAvailable` calls, meaning the cached validity
  // value can be used directly without a recheck -- confirm against
  // `ValidVersionAvailable`.
  bool version_updated_since_last_check_;
  // End of fields that are only tracked when `track_found_and_missing_files_`
  // is enabled.
  271. public:
  272. Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
  273. TableCache* table_cache, VersionStorageInfo* base_vstorage,
  274. VersionSet* version_set,
  275. std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr,
  276. ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
  277. bool track_found_and_missing_files, bool allow_incomplete_valid_version)
  278. : file_options_(file_options),
  279. ioptions_(ioptions),
  280. table_cache_(table_cache),
  281. base_vstorage_(base_vstorage),
  282. version_set_(version_set),
  283. num_levels_(base_vstorage->num_levels()),
  284. has_invalid_levels_(false),
  285. level_zero_cmp_by_epochno_(
  286. std::make_shared<NewestFirstByEpochNumber>()),
  287. level_zero_cmp_by_seqno_(std::make_shared<NewestFirstBySeqNo>()),
  288. level_nonzero_cmp_(std::make_shared<BySmallestKey>(
  289. base_vstorage_->InternalComparator())),
  290. file_metadata_cache_res_mgr_(file_metadata_cache_res_mgr),
  291. cfd_(cfd),
  292. version_edit_handler_(version_edit_handler),
  293. track_found_and_missing_files_(track_found_and_missing_files),
  294. allow_incomplete_valid_version_(allow_incomplete_valid_version) {
  295. assert(ioptions_);
  296. levels_ = new LevelState[num_levels_];
  297. if (track_found_and_missing_files_) {
  298. assert(cfd_);
  299. assert(version_edit_handler_);
  300. // `track_found_and_missing_files_` mode used by VersionEditHandlerPIT
  301. // assumes the initial base version is valid. For best efforts recovery,
  302. // base will be empty. For manifest tailing usage like secondary instance,
  303. // they do not allow incomplete version, so the base version in subsequent
  304. // catch up attempts should be valid too.
  305. valid_version_available_ = true;
  306. edited_in_atomic_group_ = false;
  307. version_updated_since_last_check_ = false;
  308. }
  309. }
  310. Rep(const Rep& other)
  311. : file_options_(other.file_options_),
  312. ioptions_(other.ioptions_),
  313. table_cache_(other.table_cache_),
  314. base_vstorage_(other.base_vstorage_),
  315. version_set_(other.version_set_),
  316. num_levels_(other.num_levels_),
  317. invalid_level_sizes_(other.invalid_level_sizes_),
  318. has_invalid_levels_(other.has_invalid_levels_),
  319. table_file_levels_(other.table_file_levels_),
  320. updated_compact_cursors_(other.updated_compact_cursors_),
  321. level_zero_cmp_by_epochno_(other.level_zero_cmp_by_epochno_),
  322. level_zero_cmp_by_seqno_(other.level_zero_cmp_by_seqno_),
  323. level_nonzero_cmp_(other.level_nonzero_cmp_),
  324. mutable_blob_file_metas_(other.mutable_blob_file_metas_),
  325. file_metadata_cache_res_mgr_(other.file_metadata_cache_res_mgr_),
  326. cfd_(other.cfd_),
  327. version_edit_handler_(other.version_edit_handler_),
  328. track_found_and_missing_files_(other.track_found_and_missing_files_),
  329. allow_incomplete_valid_version_(other.allow_incomplete_valid_version_),
  330. found_files_(other.found_files_),
  331. l0_missing_files_(other.l0_missing_files_),
  332. non_l0_missing_files_(other.non_l0_missing_files_),
  333. intermediate_files_(other.intermediate_files_),
  334. missing_blob_files_high_(other.missing_blob_files_high_),
  335. missing_blob_files_(other.missing_blob_files_),
  336. valid_version_available_(other.valid_version_available_),
  337. edited_in_atomic_group_(other.edited_in_atomic_group_),
  338. version_updated_since_last_check_(
  339. other.version_updated_since_last_check_) {
  340. assert(ioptions_);
  341. levels_ = new LevelState[num_levels_];
  342. for (int level = 0; level < num_levels_; level++) {
  343. levels_[level] = other.levels_[level];
  344. const auto& added = levels_[level].added_files;
  345. for (auto& pair : added) {
  346. RefFile(pair.second);
  347. }
  348. }
  349. if (track_found_and_missing_files_) {
  350. assert(cfd_);
  351. assert(version_edit_handler_);
  352. }
  353. }
  354. ~Rep() {
  355. for (int level = 0; level < num_levels_; level++) {
  356. const auto& added = levels_[level].added_files;
  357. for (auto& pair : added) {
  358. UnrefFile(pair.second);
  359. }
  360. }
  361. delete[] levels_;
  362. }
  363. void RefFile(FileMetaData* f) {
  364. assert(f);
  365. assert(f->refs > 0);
  366. f->refs++;
  367. }
  368. void UnrefFile(FileMetaData* f) {
  369. f->refs--;
  370. if (f->refs <= 0) {
  371. if (f->table_reader_handle) {
  372. assert(table_cache_ != nullptr);
  373. // NOTE: have to release in raw cache interface to avoid using a
  374. // TypedHandle for FileMetaData::table_reader_handle
  375. table_cache_->get_cache().get()->Release(f->table_reader_handle);
  376. f->table_reader_handle = nullptr;
  377. }
  378. if (file_metadata_cache_res_mgr_) {
  379. Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
  380. f->ApproximateMemoryUsage(), false /* increase */);
  381. s.PermitUncheckedError();
  382. }
  383. delete f;
  384. }
  385. }
  386. // Mapping used for checking the consistency of links between SST files and
  387. // blob files. It is built using the forward links (table file -> blob file),
  388. // and is subsequently compared with the inverse mapping stored in the
  389. // BlobFileMetaData objects.
  390. using ExpectedLinkedSsts =
  391. std::unordered_map<uint64_t, BlobFileMetaData::LinkedSsts>;
  392. static void UpdateExpectedLinkedSsts(
  393. uint64_t table_file_number, uint64_t blob_file_number,
  394. ExpectedLinkedSsts* expected_linked_ssts) {
  395. assert(expected_linked_ssts);
  396. if (blob_file_number == kInvalidBlobFileNumber) {
  397. return;
  398. }
  399. (*expected_linked_ssts)[blob_file_number].emplace(table_file_number);
  400. }
  401. template <typename Checker>
  402. Status CheckConsistencyDetailsForLevel(
  403. const VersionStorageInfo* vstorage, int level, Checker checker,
  404. const std::string& sync_point,
  405. ExpectedLinkedSsts* expected_linked_ssts) const {
  406. #ifdef NDEBUG
  407. (void)sync_point;
  408. #endif
  409. assert(vstorage);
  410. assert(level >= 0 && level < num_levels_);
  411. assert(expected_linked_ssts);
  412. const auto& level_files = vstorage->LevelFiles(level);
  413. if (level_files.empty()) {
  414. return Status::OK();
  415. }
  416. assert(level_files[0]);
  417. UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
  418. level_files[0]->oldest_blob_file_number,
  419. expected_linked_ssts);
  420. for (size_t i = 1; i < level_files.size(); ++i) {
  421. assert(level_files[i]);
  422. UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
  423. level_files[i]->oldest_blob_file_number,
  424. expected_linked_ssts);
  425. auto lhs = level_files[i - 1];
  426. auto rhs = level_files[i];
  427. #ifndef NDEBUG
  428. auto pair = std::make_pair(&lhs, &rhs);
  429. TEST_SYNC_POINT_CALLBACK(sync_point, &pair);
  430. #endif
  431. const Status s = checker(lhs, rhs);
  432. if (!s.ok()) {
  433. return s;
  434. }
  435. }
  436. return Status::OK();
  437. }
  438. // Make sure table files are sorted correctly and that the links between
  439. // table files and blob files are consistent.
  440. Status CheckConsistencyDetails(const VersionStorageInfo* vstorage) const {
  441. assert(vstorage);
  442. ExpectedLinkedSsts expected_linked_ssts;
  443. if (num_levels_ > 0) {
  444. const InternalKeyComparator* const icmp = vstorage->InternalComparator();
  445. EpochNumberRequirement epoch_number_requirement =
  446. vstorage->GetEpochNumberRequirement();
  447. assert(icmp);
  448. // Check L0
  449. {
  450. auto l0_checker = [this, epoch_number_requirement, icmp](
  451. const FileMetaData* lhs,
  452. const FileMetaData* rhs) {
  453. assert(lhs);
  454. assert(rhs);
  455. if (epoch_number_requirement ==
  456. EpochNumberRequirement::kMightMissing) {
  457. if (!level_zero_cmp_by_seqno_->operator()(lhs, rhs)) {
  458. std::ostringstream oss;
  459. oss << "L0 files are not sorted properly: files #"
  460. << lhs->fd.GetNumber() << " with seqnos (largest, smallest) "
  461. << lhs->fd.largest_seqno << " , " << lhs->fd.smallest_seqno
  462. << ", #" << rhs->fd.GetNumber()
  463. << " with seqnos (largest, smallest) "
  464. << rhs->fd.largest_seqno << " , " << rhs->fd.smallest_seqno;
  465. return Status::Corruption("VersionBuilder", oss.str());
  466. }
  467. } else if (epoch_number_requirement ==
  468. EpochNumberRequirement::kMustPresent) {
  469. if (lhs->epoch_number == rhs->epoch_number) {
  470. bool range_overlapped =
  471. icmp->Compare(lhs->smallest, rhs->largest) <= 0 &&
  472. icmp->Compare(lhs->largest, rhs->smallest) >= 0;
  473. if (range_overlapped) {
  474. std::ostringstream oss;
  475. oss << "L0 files of same epoch number but overlapping range #"
  476. << lhs->fd.GetNumber()
  477. << " , smallest key: " << lhs->smallest.DebugString(true)
  478. << " , largest key: " << lhs->largest.DebugString(true)
  479. << " , epoch number: " << lhs->epoch_number << " vs. file #"
  480. << rhs->fd.GetNumber()
  481. << " , smallest key: " << rhs->smallest.DebugString(true)
  482. << " , largest key: " << rhs->largest.DebugString(true)
  483. << " , epoch number: " << rhs->epoch_number;
  484. return Status::Corruption("VersionBuilder", oss.str());
  485. }
  486. }
  487. if (!level_zero_cmp_by_epochno_->operator()(lhs, rhs)) {
  488. std::ostringstream oss;
  489. oss << "L0 files are not sorted properly: files #"
  490. << lhs->fd.GetNumber() << " with epoch number "
  491. << lhs->epoch_number << ", #" << rhs->fd.GetNumber()
  492. << " with epoch number " << rhs->epoch_number;
  493. return Status::Corruption("VersionBuilder", oss.str());
  494. }
  495. }
  496. return Status::OK();
  497. };
  498. const Status s = CheckConsistencyDetailsForLevel(
  499. vstorage, /* level */ 0, l0_checker,
  500. "VersionBuilder::CheckConsistency0", &expected_linked_ssts);
  501. if (!s.ok()) {
  502. return s;
  503. }
  504. }
  505. // Check L1 and up
  506. for (int level = 1; level < num_levels_; ++level) {
  507. auto checker = [this, level, icmp](const FileMetaData* lhs,
  508. const FileMetaData* rhs) {
  509. assert(lhs);
  510. assert(rhs);
  511. if (!level_nonzero_cmp_->operator()(lhs, rhs)) {
  512. std::ostringstream oss;
  513. oss << 'L' << level << " files are not sorted properly: files #"
  514. << lhs->fd.GetNumber() << ", #" << rhs->fd.GetNumber();
  515. return Status::Corruption("VersionBuilder", oss.str());
  516. }
  517. // Make sure there is no overlap in level
  518. if (icmp->Compare(lhs->largest, rhs->smallest) >= 0) {
  519. std::ostringstream oss;
  520. oss << 'L' << level << " has overlapping ranges: file #"
  521. << lhs->fd.GetNumber()
  522. << " largest key: " << lhs->largest.DebugString(true)
  523. << " vs. file #" << rhs->fd.GetNumber()
  524. << " smallest key: " << rhs->smallest.DebugString(true);
  525. return Status::Corruption("VersionBuilder", oss.str());
  526. }
  527. return Status::OK();
  528. };
  529. const Status s = CheckConsistencyDetailsForLevel(
  530. vstorage, level, checker, "VersionBuilder::CheckConsistency1",
  531. &expected_linked_ssts);
  532. if (!s.ok()) {
  533. return s;
  534. }
  535. }
  536. }
  537. // Make sure that all blob files in the version have non-garbage data and
  538. // the links between them and the table files are consistent.
  539. const auto& blob_files = vstorage->GetBlobFiles();
  540. for (const auto& blob_file_meta : blob_files) {
  541. assert(blob_file_meta);
  542. const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
  543. if (blob_file_meta->GetGarbageBlobCount() >=
  544. blob_file_meta->GetTotalBlobCount()) {
  545. std::ostringstream oss;
  546. oss << "Blob file #" << blob_file_number
  547. << " consists entirely of garbage";
  548. return Status::Corruption("VersionBuilder", oss.str());
  549. }
  550. if (blob_file_meta->GetLinkedSsts() !=
  551. expected_linked_ssts[blob_file_number]) {
  552. std::ostringstream oss;
  553. oss << "Links are inconsistent between table files and blob file #"
  554. << blob_file_number;
  555. return Status::Corruption("VersionBuilder", oss.str());
  556. }
  557. }
  558. Status ret_s;
  559. TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistencyBeforeReturn",
  560. &ret_s);
  561. return ret_s;
  562. }
  563. Status CheckConsistency(const VersionStorageInfo* vstorage) const {
  564. assert(vstorage);
  565. // Always run consistency checks in debug build
  566. #ifdef NDEBUG
  567. if (!vstorage->force_consistency_checks()) {
  568. return Status::OK();
  569. }
  570. #endif
  571. Status s = CheckConsistencyDetails(vstorage);
  572. if (s.IsCorruption() && s.getState()) {
  573. // Make it clear the error is due to force_consistency_checks = 1 or
  574. // debug build
  575. #ifdef NDEBUG
  576. auto prefix = "force_consistency_checks";
  577. #else
  578. auto prefix = "force_consistency_checks(DEBUG)";
  579. #endif
  580. s = Status::Corruption(prefix, s.getState());
  581. } else {
  582. // was only expecting corruption with message, or OK
  583. assert(s.ok());
  584. }
  585. return s;
  586. }
  587. bool CheckConsistencyForNumLevels() const {
  588. // Make sure there are no files on or beyond num_levels().
  589. if (has_invalid_levels_) {
  590. return false;
  591. }
  592. for (const auto& pair : invalid_level_sizes_) {
  593. const size_t level_size = pair.second;
  594. if (level_size != 0) {
  595. return false;
  596. }
  597. }
  598. return true;
  599. }
  600. bool IsBlobFileInVersion(uint64_t blob_file_number) const {
  601. auto mutable_it = mutable_blob_file_metas_.find(blob_file_number);
  602. if (mutable_it != mutable_blob_file_metas_.end()) {
  603. return true;
  604. }
  605. assert(base_vstorage_);
  606. const auto meta = base_vstorage_->GetBlobFileMetaData(blob_file_number);
  607. return !!meta;
  608. }
  609. MutableBlobFileMetaData* GetOrCreateMutableBlobFileMetaData(
  610. uint64_t blob_file_number) {
  611. auto mutable_it = mutable_blob_file_metas_.find(blob_file_number);
  612. if (mutable_it != mutable_blob_file_metas_.end()) {
  613. return &mutable_it->second;
  614. }
  615. assert(base_vstorage_);
  616. const auto meta = base_vstorage_->GetBlobFileMetaData(blob_file_number);
  617. if (meta) {
  618. mutable_it = mutable_blob_file_metas_
  619. .emplace(blob_file_number, MutableBlobFileMetaData(meta))
  620. .first;
  621. return &mutable_it->second;
  622. }
  623. return nullptr;
  624. }
  625. Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
  626. const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
  627. if (IsBlobFileInVersion(blob_file_number)) {
  628. std::ostringstream oss;
  629. oss << "Blob file #" << blob_file_number << " already added";
  630. return Status::Corruption("VersionBuilder", oss.str());
  631. }
  632. auto deleter = [vs = version_set_, ioptions = ioptions_,
  633. bc = cfd_ ? cfd_->blob_file_cache()
  634. : nullptr](SharedBlobFileMetaData* shared_meta) {
  635. if (vs) {
  636. assert(ioptions);
  637. assert(!ioptions->cf_paths.empty());
  638. assert(shared_meta);
  639. vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
  640. ioptions->cf_paths.front().path);
  641. }
  642. if (bc) {
  643. bc->Evict(shared_meta->GetBlobFileNumber());
  644. }
  645. delete shared_meta;
  646. };
  647. auto shared_meta = SharedBlobFileMetaData::Create(
  648. blob_file_number, blob_file_addition.GetTotalBlobCount(),
  649. blob_file_addition.GetTotalBlobBytes(),
  650. blob_file_addition.GetChecksumMethod(),
  651. blob_file_addition.GetChecksumValue(), std::move(deleter));
  652. mutable_blob_file_metas_.emplace(
  653. blob_file_number, MutableBlobFileMetaData(std::move(shared_meta)));
  654. Status s;
  655. if (track_found_and_missing_files_) {
  656. assert(version_edit_handler_);
  657. s = version_edit_handler_->VerifyBlobFile(cfd_, blob_file_number,
  658. blob_file_addition);
  659. if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
  660. missing_blob_files_high_ =
  661. std::max(missing_blob_files_high_, blob_file_number);
  662. missing_blob_files_.insert(blob_file_number);
  663. s = Status::OK();
  664. } else if (!s.ok()) {
  665. return s;
  666. }
  667. }
  668. return s;
  669. }
  670. Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
  671. const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
  672. MutableBlobFileMetaData* const mutable_meta =
  673. GetOrCreateMutableBlobFileMetaData(blob_file_number);
  674. if (!mutable_meta) {
  675. std::ostringstream oss;
  676. oss << "Blob file #" << blob_file_number << " not found";
  677. return Status::Corruption("VersionBuilder", oss.str());
  678. }
  679. if (!mutable_meta->AddGarbage(blob_file_garbage.GetGarbageBlobCount(),
  680. blob_file_garbage.GetGarbageBlobBytes())) {
  681. std::ostringstream oss;
  682. oss << "Garbage overflow for blob file #" << blob_file_number;
  683. return Status::Corruption("VersionBuilder", oss.str());
  684. }
  685. return Status::OK();
  686. }
  687. int GetCurrentLevelForTableFile(uint64_t file_number) const {
  688. auto it = table_file_levels_.find(file_number);
  689. if (it != table_file_levels_.end()) {
  690. return it->second;
  691. }
  692. assert(base_vstorage_);
  693. return base_vstorage_->GetFileLocation(file_number).GetLevel();
  694. }
  695. uint64_t GetOldestBlobFileNumberForTableFile(int level,
  696. uint64_t file_number) const {
  697. assert(level < num_levels_);
  698. const auto& added_files = levels_[level].added_files;
  699. auto it = added_files.find(file_number);
  700. if (it != added_files.end()) {
  701. const FileMetaData* const meta = it->second;
  702. assert(meta);
  703. return meta->oldest_blob_file_number;
  704. }
  705. assert(base_vstorage_);
  706. const FileMetaData* const meta =
  707. base_vstorage_->GetFileMetaDataByNumber(file_number);
  708. assert(meta);
  709. return meta->oldest_blob_file_number;
  710. }
  711. Status ApplyFileDeletion(int level, uint64_t file_number) {
  712. assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
  713. const int current_level = GetCurrentLevelForTableFile(file_number);
  714. if (level != current_level) {
  715. if (level >= num_levels_) {
  716. has_invalid_levels_ = true;
  717. }
  718. std::ostringstream oss;
  719. oss << "Cannot delete table file #" << file_number << " from level "
  720. << level << " since it is ";
  721. if (current_level ==
  722. VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
  723. oss << "not in the LSM tree";
  724. } else {
  725. oss << "on level " << current_level;
  726. }
  727. return Status::Corruption("VersionBuilder", oss.str());
  728. }
  729. if (level >= num_levels_) {
  730. assert(invalid_level_sizes_[level] > 0);
  731. --invalid_level_sizes_[level];
  732. table_file_levels_[file_number] =
  733. VersionStorageInfo::FileLocation::Invalid().GetLevel();
  734. return Status::OK();
  735. }
  736. const uint64_t blob_file_number =
  737. GetOldestBlobFileNumberForTableFile(level, file_number);
  738. if (blob_file_number != kInvalidBlobFileNumber) {
  739. MutableBlobFileMetaData* const mutable_meta =
  740. GetOrCreateMutableBlobFileMetaData(blob_file_number);
  741. if (mutable_meta) {
  742. mutable_meta->UnlinkSst(file_number);
  743. }
  744. }
  745. auto& level_state = levels_[level];
  746. auto& add_files = level_state.added_files;
  747. auto add_it = add_files.find(file_number);
  748. if (add_it != add_files.end()) {
  749. UnrefFile(add_it->second);
  750. add_files.erase(add_it);
  751. }
  752. auto& del_files = level_state.deleted_files;
  753. assert(del_files.find(file_number) == del_files.end());
  754. del_files.emplace(file_number);
  755. table_file_levels_[file_number] =
  756. VersionStorageInfo::FileLocation::Invalid().GetLevel();
  757. if (track_found_and_missing_files_) {
  758. assert(version_edit_handler_);
  759. if (l0_missing_files_.find(file_number) != l0_missing_files_.end()) {
  760. l0_missing_files_.erase(file_number);
  761. } else if (non_l0_missing_files_.find(file_number) !=
  762. non_l0_missing_files_.end()) {
  763. non_l0_missing_files_.erase(file_number);
  764. } else {
  765. auto fiter = found_files_.find(file_number);
  766. // Only mark new files added during this catchup attempt for deletion.
  767. // These files were never installed in VersionStorageInfo.
  768. // Already referenced files that are deleted by a VersionEdit will
  769. // be added to the VersionStorageInfo's obsolete files when the old
  770. // version is dereferenced.
  771. if (fiter != found_files_.end()) {
  772. assert(!ioptions_->cf_paths.empty());
  773. intermediate_files_.emplace_back(
  774. MakeTableFileName(ioptions_->cf_paths[0].path, file_number));
  775. found_files_.erase(fiter);
  776. }
  777. }
  778. }
  779. return Status::OK();
  780. }
  781. Status ApplyFileAddition(int level, const FileMetaData& meta) {
  782. assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
  783. const uint64_t file_number = meta.fd.GetNumber();
  784. const int current_level = GetCurrentLevelForTableFile(file_number);
  785. if (current_level !=
  786. VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
  787. if (level >= num_levels_) {
  788. has_invalid_levels_ = true;
  789. }
  790. std::ostringstream oss;
  791. oss << "Cannot add table file #" << file_number << " to level " << level
  792. << " since it is already in the LSM tree on level " << current_level;
  793. return Status::Corruption("VersionBuilder", oss.str());
  794. }
  795. if (level >= num_levels_) {
  796. ++invalid_level_sizes_[level];
  797. table_file_levels_[file_number] = level;
  798. return Status::OK();
  799. }
  800. auto& level_state = levels_[level];
  801. auto& del_files = level_state.deleted_files;
  802. auto del_it = del_files.find(file_number);
  803. if (del_it != del_files.end()) {
  804. del_files.erase(del_it);
  805. }
  806. FileMetaData* const f = new FileMetaData(meta);
  807. f->refs = 1;
  808. if (file_metadata_cache_res_mgr_) {
  809. Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
  810. f->ApproximateMemoryUsage(), true /* increase */);
  811. if (!s.ok()) {
  812. delete f;
  813. s = Status::MemoryLimit(
  814. "Can't allocate " +
  815. kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
  816. CacheEntryRole::kFileMetadata)] +
  817. " due to exceeding the memory limit "
  818. "based on "
  819. "cache capacity");
  820. return s;
  821. }
  822. }
  823. auto& add_files = level_state.added_files;
  824. assert(add_files.find(file_number) == add_files.end());
  825. add_files.emplace(file_number, f);
  826. const uint64_t blob_file_number = f->oldest_blob_file_number;
  827. if (blob_file_number != kInvalidBlobFileNumber) {
  828. MutableBlobFileMetaData* const mutable_meta =
  829. GetOrCreateMutableBlobFileMetaData(blob_file_number);
  830. if (mutable_meta) {
  831. mutable_meta->LinkSst(file_number);
  832. }
  833. }
  834. table_file_levels_[file_number] = level;
  835. Status s;
  836. if (track_found_and_missing_files_) {
  837. assert(version_edit_handler_);
  838. assert(!ioptions_->cf_paths.empty());
  839. const std::string fpath =
  840. MakeTableFileName(ioptions_->cf_paths[0].path, file_number);
  841. s = version_edit_handler_->VerifyFile(cfd_, fpath, level, meta);
  842. if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
  843. if (0 == level) {
  844. l0_missing_files_.insert(file_number);
  845. } else {
  846. non_l0_missing_files_.insert(file_number);
  847. }
  848. if (s.IsCorruption()) {
  849. found_files_.insert(file_number);
  850. }
  851. s = Status::OK();
  852. } else if (!s.ok()) {
  853. return s;
  854. } else {
  855. found_files_.insert(file_number);
  856. }
  857. }
  858. return s;
  859. }
  860. Status ApplyCompactCursors(int level,
  861. const InternalKey& smallest_uncompacted_key) {
  862. if (level < 0) {
  863. std::ostringstream oss;
  864. oss << "Cannot add compact cursor (" << level << ","
  865. << smallest_uncompacted_key.Encode().ToString()
  866. << " due to invalid level (level = " << level << ")";
  867. return Status::Corruption("VersionBuilder", oss.str());
  868. }
  869. if (level < num_levels_) {
  870. // Omit levels (>= num_levels_) when re-open with shrinking num_levels_
  871. updated_compact_cursors_[level] = smallest_uncompacted_key;
  872. }
  873. return Status::OK();
  874. }
  875. // Apply all of the edits in *edit to the current state.
  876. Status Apply(const VersionEdit* edit) {
  877. bool version_updated = false;
  878. {
  879. const Status s = CheckConsistency(base_vstorage_);
  880. if (!s.ok()) {
  881. return s;
  882. }
  883. }
  884. // Note: we process the blob file related changes first because the
  885. // table file addition/deletion logic depends on the blob files
  886. // already being there.
  887. // Add new blob files
  888. for (const auto& blob_file_addition : edit->GetBlobFileAdditions()) {
  889. const Status s = ApplyBlobFileAddition(blob_file_addition);
  890. if (!s.ok()) {
  891. return s;
  892. }
  893. version_updated = true;
  894. }
  895. // Increase the amount of garbage for blob files affected by GC
  896. for (const auto& blob_file_garbage : edit->GetBlobFileGarbages()) {
  897. const Status s = ApplyBlobFileGarbage(blob_file_garbage);
  898. if (!s.ok()) {
  899. return s;
  900. }
  901. version_updated = true;
  902. }
  903. // Delete table files
  904. for (const auto& deleted_file : edit->GetDeletedFiles()) {
  905. const int level = deleted_file.first;
  906. const uint64_t file_number = deleted_file.second;
  907. const Status s = ApplyFileDeletion(level, file_number);
  908. if (!s.ok()) {
  909. return s;
  910. }
  911. version_updated = true;
  912. }
  913. // Add new table files
  914. for (const auto& new_file : edit->GetNewFiles()) {
  915. const int level = new_file.first;
  916. const FileMetaData& meta = new_file.second;
  917. const Status s = ApplyFileAddition(level, meta);
  918. if (!s.ok()) {
  919. return s;
  920. }
  921. version_updated = true;
  922. }
  923. // Populate compact cursors for round-robin compaction, leave
  924. // the cursor to be empty to indicate it is invalid
  925. for (const auto& cursor : edit->GetCompactCursors()) {
  926. const int level = cursor.first;
  927. const InternalKey smallest_uncompacted_key = cursor.second;
  928. const Status s = ApplyCompactCursors(level, smallest_uncompacted_key);
  929. if (!s.ok()) {
  930. return s;
  931. }
  932. }
  933. if (track_found_and_missing_files_ && version_updated) {
  934. version_updated_since_last_check_ = true;
  935. if (!edited_in_atomic_group_ && edit->IsInAtomicGroup()) {
  936. edited_in_atomic_group_ = true;
  937. }
  938. }
  939. return Status::OK();
  940. }
  941. // Helper function template for merging the blob file metadata from the base
  942. // version with the mutable metadata representing the state after applying the
  943. // edits. The function objects process_base and process_mutable are
  944. // respectively called to handle a base version object when there is no
  945. // matching mutable object, and a mutable object when there is no matching
  946. // base version object. process_both is called to perform the merge when a
  947. // given blob file appears both in the base version and the mutable list. The
  948. // helper stops processing objects if a function object returns false. Blob
  949. // files with a file number below first_blob_file are not processed.
  950. template <typename ProcessBase, typename ProcessMutable, typename ProcessBoth>
  951. void MergeBlobFileMetas(uint64_t first_blob_file, ProcessBase process_base,
  952. ProcessMutable process_mutable,
  953. ProcessBoth process_both) const {
  954. assert(base_vstorage_);
  955. auto base_it = base_vstorage_->GetBlobFileMetaDataLB(first_blob_file);
  956. const auto base_it_end = base_vstorage_->GetBlobFiles().end();
  957. auto mutable_it = mutable_blob_file_metas_.lower_bound(first_blob_file);
  958. const auto mutable_it_end = mutable_blob_file_metas_.end();
  959. while (base_it != base_it_end && mutable_it != mutable_it_end) {
  960. const auto& base_meta = *base_it;
  961. assert(base_meta);
  962. const uint64_t base_blob_file_number = base_meta->GetBlobFileNumber();
  963. const uint64_t mutable_blob_file_number = mutable_it->first;
  964. if (base_blob_file_number < mutable_blob_file_number) {
  965. if (!process_base(base_meta)) {
  966. return;
  967. }
  968. ++base_it;
  969. } else if (mutable_blob_file_number < base_blob_file_number) {
  970. const auto& mutable_meta = mutable_it->second;
  971. if (!process_mutable(mutable_meta)) {
  972. return;
  973. }
  974. ++mutable_it;
  975. } else {
  976. assert(base_blob_file_number == mutable_blob_file_number);
  977. const auto& mutable_meta = mutable_it->second;
  978. if (!process_both(base_meta, mutable_meta)) {
  979. return;
  980. }
  981. ++base_it;
  982. ++mutable_it;
  983. }
  984. }
  985. while (base_it != base_it_end) {
  986. const auto& base_meta = *base_it;
  987. if (!process_base(base_meta)) {
  988. return;
  989. }
  990. ++base_it;
  991. }
  992. while (mutable_it != mutable_it_end) {
  993. const auto& mutable_meta = mutable_it->second;
  994. if (!process_mutable(mutable_meta)) {
  995. return;
  996. }
  997. ++mutable_it;
  998. }
  999. }
  1000. // Helper function template for finding the first blob file that has linked
  1001. // SSTs.
  1002. template <typename Meta>
  1003. static bool CheckLinkedSsts(const Meta& meta,
  1004. uint64_t* min_oldest_blob_file_num) {
  1005. assert(min_oldest_blob_file_num);
  1006. if (!meta.GetLinkedSsts().empty()) {
  1007. assert(*min_oldest_blob_file_num == kInvalidBlobFileNumber);
  1008. *min_oldest_blob_file_num = meta.GetBlobFileNumber();
  1009. return false;
  1010. }
  1011. return true;
  1012. }
  1013. // Find the oldest blob file that has linked SSTs.
  1014. uint64_t GetMinOldestBlobFileNumber() const {
  1015. uint64_t min_oldest_blob_file_num = kInvalidBlobFileNumber;
  1016. auto process_base =
  1017. [&min_oldest_blob_file_num](
  1018. const std::shared_ptr<BlobFileMetaData>& base_meta) {
  1019. assert(base_meta);
  1020. return CheckLinkedSsts(*base_meta, &min_oldest_blob_file_num);
  1021. };
  1022. auto process_mutable = [&min_oldest_blob_file_num](
  1023. const MutableBlobFileMetaData& mutable_meta) {
  1024. return CheckLinkedSsts(mutable_meta, &min_oldest_blob_file_num);
  1025. };
  1026. auto process_both = [&min_oldest_blob_file_num](
  1027. const std::shared_ptr<BlobFileMetaData>& base_meta,
  1028. const MutableBlobFileMetaData& mutable_meta) {
  1029. #ifndef NDEBUG
  1030. assert(base_meta);
  1031. assert(base_meta->GetSharedMeta() == mutable_meta.GetSharedMeta());
  1032. #else
  1033. (void)base_meta;
  1034. #endif
  1035. // Look at mutable_meta since it supersedes *base_meta
  1036. return CheckLinkedSsts(mutable_meta, &min_oldest_blob_file_num);
  1037. };
  1038. MergeBlobFileMetas(kInvalidBlobFileNumber, process_base, process_mutable,
  1039. process_both);
  1040. return min_oldest_blob_file_num;
  1041. }
  1042. static std::shared_ptr<BlobFileMetaData> CreateBlobFileMetaData(
  1043. const MutableBlobFileMetaData& mutable_meta) {
  1044. return BlobFileMetaData::Create(
  1045. mutable_meta.GetSharedMeta(), mutable_meta.GetLinkedSsts(),
  1046. mutable_meta.GetGarbageBlobCount(), mutable_meta.GetGarbageBlobBytes());
  1047. }
  1048. bool OnlyLinkedToMissingL0Files(
  1049. const std::unordered_set<uint64_t>& linked_ssts) const {
  1050. return std::all_of(
  1051. linked_ssts.begin(), linked_ssts.end(), [&](const uint64_t& element) {
  1052. return l0_missing_files_.find(element) != l0_missing_files_.end();
  1053. });
  1054. }
  1055. // Add the blob file specified by meta to *vstorage if it is determined to
  1056. // contain valid data (blobs).
  1057. template <typename Meta>
  1058. void AddBlobFileIfNeeded(VersionStorageInfo* vstorage, Meta&& meta,
  1059. uint64_t blob_file_number) const {
  1060. assert(vstorage);
  1061. assert(meta);
  1062. const auto& linked_ssts = meta->GetLinkedSsts();
  1063. if (track_found_and_missing_files_) {
  1064. if (missing_blob_files_.find(blob_file_number) !=
  1065. missing_blob_files_.end()) {
  1066. return;
  1067. }
  1068. // Leave the empty case for the below blob garbage collection logic.
  1069. if (!linked_ssts.empty() && OnlyLinkedToMissingL0Files(linked_ssts)) {
  1070. return;
  1071. }
  1072. }
  1073. if (linked_ssts.empty() &&
  1074. meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
  1075. return;
  1076. }
  1077. vstorage->AddBlobFile(std::forward<Meta>(meta));
  1078. }
  1079. // Merge the blob file metadata from the base version with the changes (edits)
  1080. // applied, and save the result into *vstorage.
  1081. void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
  1082. assert(vstorage);
  1083. assert(!track_found_and_missing_files_ || valid_version_available_);
  1084. assert(base_vstorage_);
  1085. vstorage->ReserveBlob(base_vstorage_->GetBlobFiles().size() +
  1086. mutable_blob_file_metas_.size());
  1087. const uint64_t oldest_blob_file_with_linked_ssts =
  1088. GetMinOldestBlobFileNumber();
  1089. // If there are no blob files with linked SSTs, meaning that there are no
  1090. // valid blob files
  1091. if (oldest_blob_file_with_linked_ssts == kInvalidBlobFileNumber) {
  1092. return;
  1093. }
  1094. auto process_base =
  1095. [this, vstorage](const std::shared_ptr<BlobFileMetaData>& base_meta) {
  1096. assert(base_meta);
  1097. AddBlobFileIfNeeded(vstorage, base_meta,
  1098. base_meta->GetBlobFileNumber());
  1099. return true;
  1100. };
  1101. auto process_mutable =
  1102. [this, vstorage](const MutableBlobFileMetaData& mutable_meta) {
  1103. AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta),
  1104. mutable_meta.GetBlobFileNumber());
  1105. return true;
  1106. };
  1107. auto process_both = [this, vstorage](
  1108. const std::shared_ptr<BlobFileMetaData>& base_meta,
  1109. const MutableBlobFileMetaData& mutable_meta) {
  1110. assert(base_meta);
  1111. assert(base_meta->GetSharedMeta() == mutable_meta.GetSharedMeta());
  1112. if (!mutable_meta.HasDelta()) {
  1113. assert(base_meta->GetGarbageBlobCount() ==
  1114. mutable_meta.GetGarbageBlobCount());
  1115. assert(base_meta->GetGarbageBlobBytes() ==
  1116. mutable_meta.GetGarbageBlobBytes());
  1117. assert(base_meta->GetLinkedSsts() == mutable_meta.GetLinkedSsts());
  1118. AddBlobFileIfNeeded(vstorage, base_meta,
  1119. base_meta->GetBlobFileNumber());
  1120. return true;
  1121. }
  1122. AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta),
  1123. mutable_meta.GetBlobFileNumber());
  1124. return true;
  1125. };
  1126. MergeBlobFileMetas(oldest_blob_file_with_linked_ssts, process_base,
  1127. process_mutable, process_both);
  1128. }
  1129. void MaybeAddFile(VersionStorageInfo* vstorage, int level,
  1130. FileMetaData* f) const {
  1131. const uint64_t file_number = f->fd.GetNumber();
  1132. if (track_found_and_missing_files_ && level == 0 &&
  1133. l0_missing_files_.find(file_number) != l0_missing_files_.end()) {
  1134. return;
  1135. }
  1136. const auto& level_state = levels_[level];
  1137. const auto& del_files = level_state.deleted_files;
  1138. const auto del_it = del_files.find(file_number);
  1139. if (del_it != del_files.end()) {
  1140. // f is to-be-deleted table file
  1141. vstorage->RemoveCurrentStats(f);
  1142. } else {
  1143. const auto& add_files = level_state.added_files;
  1144. const auto add_it = add_files.find(file_number);
  1145. // Note: if the file appears both in the base version and in the added
  1146. // list, the added FileMetaData supersedes the one in the base version.
  1147. if (add_it != add_files.end() && add_it->second != f) {
  1148. vstorage->RemoveCurrentStats(f);
  1149. } else {
  1150. vstorage->AddFile(level, f);
  1151. }
  1152. }
  1153. }
  1154. bool ContainsCompleteVersion() const {
  1155. assert(track_found_and_missing_files_);
  1156. return l0_missing_files_.empty() && non_l0_missing_files_.empty() &&
  1157. (missing_blob_files_high_ == kInvalidBlobFileNumber ||
  1158. missing_blob_files_high_ < GetMinOldestBlobFileNumber());
  1159. }
  1160. bool HasMissingFiles() const {
  1161. assert(track_found_and_missing_files_);
  1162. return !l0_missing_files_.empty() || !non_l0_missing_files_.empty() ||
  1163. missing_blob_files_high_ != kInvalidBlobFileNumber;
  1164. }
  1165. std::vector<std::string>& GetAndClearIntermediateFiles() {
  1166. assert(track_found_and_missing_files_);
  1167. return intermediate_files_;
  1168. }
  1169. void ClearFoundFiles() {
  1170. assert(track_found_and_missing_files_);
  1171. found_files_.clear();
  1172. }
  1173. template <typename Cmp>
  1174. void SaveSSTFilesTo(VersionStorageInfo* vstorage, int level, Cmp cmp) const {
  1175. // Merge the set of added files with the set of pre-existing files.
  1176. // Drop any deleted files. Store the result in *vstorage.
  1177. const auto& base_files = base_vstorage_->LevelFiles(level);
  1178. const auto& unordered_added_files = levels_[level].added_files;
  1179. vstorage->Reserve(level, base_files.size() + unordered_added_files.size());
  1180. MergeUnorderdAddedFilesWithBase(
  1181. base_files, unordered_added_files, cmp,
  1182. [&](FileMetaData* file) { MaybeAddFile(vstorage, level, file); });
  1183. }
  1184. template <typename Cmp, typename AddFileFunc>
  1185. void MergeUnorderdAddedFilesWithBase(
  1186. const std::vector<FileMetaData*>& base_files,
  1187. const std::unordered_map<uint64_t, FileMetaData*>& unordered_added_files,
  1188. Cmp cmp, AddFileFunc add_file_func) const {
  1189. // Sort added files for the level.
  1190. std::vector<FileMetaData*> added_files;
  1191. added_files.reserve(unordered_added_files.size());
  1192. for (const auto& pair : unordered_added_files) {
  1193. added_files.push_back(pair.second);
  1194. }
  1195. std::sort(added_files.begin(), added_files.end(), cmp);
  1196. auto base_iter = base_files.begin();
  1197. auto base_end = base_files.end();
  1198. auto added_iter = added_files.begin();
  1199. auto added_end = added_files.end();
  1200. while (added_iter != added_end || base_iter != base_end) {
  1201. if (base_iter == base_end ||
  1202. (added_iter != added_end && cmp(*added_iter, *base_iter))) {
  1203. add_file_func(*added_iter++);
  1204. } else {
  1205. add_file_func(*base_iter++);
  1206. }
  1207. }
  1208. }
  1209. bool PromoteEpochNumberRequirementIfNeeded(
  1210. VersionStorageInfo* vstorage) const {
  1211. if (vstorage->HasMissingEpochNumber()) {
  1212. return false;
  1213. }
  1214. for (int level = 0; level < num_levels_; ++level) {
  1215. for (const auto& pair : levels_[level].added_files) {
  1216. const FileMetaData* f = pair.second;
  1217. if (f->epoch_number == kUnknownEpochNumber) {
  1218. return false;
  1219. }
  1220. }
  1221. }
  1222. vstorage->SetEpochNumberRequirement(EpochNumberRequirement::kMustPresent);
  1223. return true;
  1224. }
  1225. void SaveSSTFilesTo(VersionStorageInfo* vstorage) const {
  1226. assert(vstorage);
  1227. if (!num_levels_) {
  1228. return;
  1229. }
  1230. EpochNumberRequirement epoch_number_requirement =
  1231. vstorage->GetEpochNumberRequirement();
  1232. if (epoch_number_requirement == EpochNumberRequirement::kMightMissing) {
  1233. bool promoted = PromoteEpochNumberRequirementIfNeeded(vstorage);
  1234. if (promoted) {
  1235. epoch_number_requirement = vstorage->GetEpochNumberRequirement();
  1236. }
  1237. }
  1238. if (epoch_number_requirement == EpochNumberRequirement::kMightMissing) {
  1239. SaveSSTFilesTo(vstorage, /* level */ 0, *level_zero_cmp_by_seqno_);
  1240. } else {
  1241. SaveSSTFilesTo(vstorage, /* level */ 0, *level_zero_cmp_by_epochno_);
  1242. }
  1243. for (int level = 1; level < num_levels_; ++level) {
  1244. SaveSSTFilesTo(vstorage, level, *level_nonzero_cmp_);
  1245. }
  1246. }
  1247. void SaveCompactCursorsTo(VersionStorageInfo* vstorage) const {
  1248. for (auto iter = updated_compact_cursors_.begin();
  1249. iter != updated_compact_cursors_.end(); iter++) {
  1250. vstorage->AddCursorForOneLevel(iter->first, iter->second);
  1251. }
  1252. }
  1253. bool ValidVersionAvailable() {
  1254. assert(track_found_and_missing_files_);
  1255. if (version_updated_since_last_check_) {
  1256. valid_version_available_ = ContainsCompleteVersion();
  1257. if (!valid_version_available_ && !edited_in_atomic_group_ &&
  1258. allow_incomplete_valid_version_) {
  1259. valid_version_available_ = OnlyMissingL0Suffix();
  1260. }
  1261. version_updated_since_last_check_ = false;
  1262. }
  1263. return valid_version_available_;
  1264. }
  1265. bool OnlyMissingL0Suffix() const {
  1266. if (!non_l0_missing_files_.empty()) {
  1267. return false;
  1268. }
  1269. assert(!(l0_missing_files_.empty() && missing_blob_files_.empty()));
  1270. if (!l0_missing_files_.empty() && !MissingL0FilesAreL0Suffix()) {
  1271. return false;
  1272. }
  1273. if (!missing_blob_files_.empty() &&
  1274. !RemainingSstFilesNotMissingBlobFiles()) {
  1275. return false;
  1276. }
  1277. return true;
  1278. }
  1279. // Check missing L0 files are a suffix of expected sorted L0 files.
  1280. bool MissingL0FilesAreL0Suffix() const {
  1281. assert(non_l0_missing_files_.empty());
  1282. assert(!l0_missing_files_.empty());
  1283. std::vector<FileMetaData*> expected_sorted_l0_files;
  1284. const auto& base_files = base_vstorage_->LevelFiles(0);
  1285. const auto& unordered_added_files = levels_[0].added_files;
  1286. expected_sorted_l0_files.reserve(base_files.size() +
  1287. unordered_added_files.size());
  1288. EpochNumberRequirement epoch_number_requirement =
  1289. base_vstorage_->GetEpochNumberRequirement();
  1290. if (epoch_number_requirement == EpochNumberRequirement::kMightMissing) {
  1291. MergeUnorderdAddedFilesWithBase(
  1292. base_files, unordered_added_files, *level_zero_cmp_by_seqno_,
  1293. [&](FileMetaData* file) {
  1294. expected_sorted_l0_files.push_back(file);
  1295. });
  1296. } else {
  1297. MergeUnorderdAddedFilesWithBase(
  1298. base_files, unordered_added_files, *level_zero_cmp_by_epochno_,
  1299. [&](FileMetaData* file) {
  1300. expected_sorted_l0_files.push_back(file);
  1301. });
  1302. }
  1303. assert(expected_sorted_l0_files.size() >= l0_missing_files_.size());
  1304. std::unordered_set<uint64_t> unaddressed_missing_files = l0_missing_files_;
  1305. for (auto iter = expected_sorted_l0_files.begin();
  1306. iter != expected_sorted_l0_files.end(); iter++) {
  1307. uint64_t file_number = (*iter)->fd.GetNumber();
  1308. if (l0_missing_files_.find(file_number) != l0_missing_files_.end()) {
  1309. assert(unaddressed_missing_files.find(file_number) !=
  1310. unaddressed_missing_files.end());
  1311. unaddressed_missing_files.erase(file_number);
  1312. } else if (!unaddressed_missing_files.empty()) {
  1313. return false;
  1314. } else {
  1315. break;
  1316. }
  1317. }
  1318. return true;
  1319. }
  1320. // Check for each of the missing blob file missing, it either is older than
  1321. // the minimum oldest blob file required by this Version or only linked to
  1322. // the missing L0 files.
  1323. bool RemainingSstFilesNotMissingBlobFiles() const {
  1324. assert(non_l0_missing_files_.empty());
  1325. assert(!missing_blob_files_.empty());
  1326. bool no_l0_files_missing = l0_missing_files_.empty();
  1327. uint64_t min_oldest_blob_file_num = GetMinOldestBlobFileNumber();
  1328. for (const auto& missing_blob_file : missing_blob_files_) {
  1329. if (missing_blob_file < min_oldest_blob_file_num) {
  1330. continue;
  1331. }
  1332. auto iter = mutable_blob_file_metas_.find(missing_blob_file);
  1333. assert(iter != mutable_blob_file_metas_.end());
  1334. const std::unordered_set<uint64_t>& linked_ssts =
  1335. iter->second.GetLinkedSsts();
  1336. // TODO(yuzhangyu): In theory, if no L0 SST files ara missing, and only
  1337. // blob files exclusively linked to a L0 suffix are missing, we can
  1338. // recover to a valid point in time too. We don't recover that type of
  1339. // incomplete Version yet.
  1340. if (!linked_ssts.empty() && no_l0_files_missing) {
  1341. return false;
  1342. }
  1343. if (!OnlyLinkedToMissingL0Files(linked_ssts)) {
  1344. return false;
  1345. }
  1346. }
  1347. return true;
  1348. }
  1349. // Save the current state in *vstorage.
  1350. Status SaveTo(VersionStorageInfo* vstorage) const {
  1351. assert(!track_found_and_missing_files_ || valid_version_available_);
  1352. Status s;
  1353. #ifndef NDEBUG
  1354. // The same check is done within Apply() so we skip it in release mode.
  1355. s = CheckConsistency(base_vstorage_);
  1356. if (!s.ok()) {
  1357. return s;
  1358. }
  1359. #endif // NDEBUG
  1360. s = CheckConsistency(vstorage);
  1361. if (!s.ok()) {
  1362. return s;
  1363. }
  1364. SaveSSTFilesTo(vstorage);
  1365. SaveBlobFilesTo(vstorage);
  1366. SaveCompactCursorsTo(vstorage);
  1367. s = CheckConsistency(vstorage);
  1368. return s;
  1369. }
  1370. Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
  1371. bool prefetch_index_and_filter_in_cache,
  1372. bool is_initial_load,
  1373. const MutableCFOptions& mutable_cf_options,
  1374. size_t max_file_size_for_l0_meta_pin,
  1375. const ReadOptions& read_options) {
  1376. assert(table_cache_ != nullptr);
  1377. assert(!track_found_and_missing_files_ || valid_version_available_);
  1378. size_t table_cache_capacity =
  1379. table_cache_->get_cache().get()->GetCapacity();
  1380. bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
  1381. size_t max_load = std::numeric_limits<size_t>::max();
  1382. if (!always_load) {
  1383. // If it is initial loading and not set to always loading all the
  1384. // files, we only load up to kInitialLoadLimit files, to limit the
  1385. // time reopening the DB.
  1386. const size_t kInitialLoadLimit = 16;
  1387. size_t load_limit;
  1388. // If the table cache is not 1/4 full, we pin the table handle to
  1389. // file metadata to avoid the cache read costs when reading the file.
  1390. // The downside of pinning those files is that LRU won't be followed
  1391. // for those files. This doesn't matter much because if number of files
  1392. // of the DB excceeds table cache capacity, eventually no table reader
  1393. // will be pinned and LRU will be followed.
  1394. if (is_initial_load) {
  1395. load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4);
  1396. } else {
  1397. load_limit = table_cache_capacity / 4;
  1398. }
  1399. size_t table_cache_usage = table_cache_->get_cache().get()->GetUsage();
  1400. if (table_cache_usage >= load_limit) {
  1401. // TODO (yanqin) find a suitable status code.
  1402. return Status::OK();
  1403. } else {
  1404. max_load = load_limit - table_cache_usage;
  1405. }
  1406. }
  1407. // <file metadata, level>
  1408. std::vector<std::pair<FileMetaData*, int>> files_meta;
  1409. std::vector<Status> statuses;
  1410. for (int level = 0; level < num_levels_; level++) {
  1411. for (auto& file_meta_pair : levels_[level].added_files) {
  1412. auto* file_meta = file_meta_pair.second;
  1413. uint64_t file_number = file_meta->fd.GetNumber();
  1414. if (track_found_and_missing_files_ && level == 0 &&
  1415. l0_missing_files_.find(file_number) != l0_missing_files_.end()) {
  1416. continue;
  1417. }
  1418. // If the file has been opened before, just skip it.
  1419. if (!file_meta->table_reader_handle) {
  1420. files_meta.emplace_back(file_meta, level);
  1421. statuses.emplace_back(Status::OK());
  1422. }
  1423. if (files_meta.size() >= max_load) {
  1424. break;
  1425. }
  1426. }
  1427. if (files_meta.size() >= max_load) {
  1428. break;
  1429. }
  1430. }
  1431. std::atomic<size_t> next_file_meta_idx(0);
  1432. std::function<void()> load_handlers_func([&]() {
  1433. while (true) {
  1434. size_t file_idx = next_file_meta_idx.fetch_add(1);
  1435. if (file_idx >= files_meta.size()) {
  1436. break;
  1437. }
  1438. auto* file_meta = files_meta[file_idx].first;
  1439. int level = files_meta[file_idx].second;
  1440. TableCache::TypedHandle* handle = nullptr;
  1441. statuses[file_idx] = table_cache_->FindTable(
  1442. read_options, file_options_,
  1443. *(base_vstorage_->InternalComparator()), *file_meta, &handle,
  1444. mutable_cf_options, false /*no_io */,
  1445. internal_stats->GetFileReadHist(level), false, level,
  1446. prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin,
  1447. file_meta->temperature);
  1448. if (handle != nullptr) {
  1449. file_meta->table_reader_handle = handle;
  1450. // Load table_reader
  1451. file_meta->fd.table_reader = table_cache_->get_cache().Value(handle);
  1452. }
  1453. }
  1454. });
  1455. std::vector<port::Thread> threads;
  1456. for (int i = 1; i < max_threads; i++) {
  1457. threads.emplace_back(load_handlers_func);
  1458. }
  1459. load_handlers_func();
  1460. for (auto& t : threads) {
  1461. t.join();
  1462. }
  1463. Status ret;
  1464. for (const auto& s : statuses) {
  1465. if (!s.ok()) {
  1466. if (ret.ok()) {
  1467. ret = s;
  1468. }
  1469. }
  1470. }
  1471. return ret;
  1472. }
  1473. };
  1474. VersionBuilder::VersionBuilder(
  1475. const FileOptions& file_options, const ImmutableCFOptions* ioptions,
  1476. TableCache* table_cache, VersionStorageInfo* base_vstorage,
  1477. VersionSet* version_set,
  1478. std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr,
  1479. ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
  1480. bool track_found_and_missing_files, bool allow_incomplete_valid_version)
  1481. : rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
  1482. version_set, file_metadata_cache_res_mgr, cfd,
  1483. version_edit_handler, track_found_and_missing_files,
  1484. allow_incomplete_valid_version)) {}
  1485. VersionBuilder::~VersionBuilder() = default;
  1486. bool VersionBuilder::CheckConsistencyForNumLevels() {
  1487. return rep_->CheckConsistencyForNumLevels();
  1488. }
  1489. Status VersionBuilder::Apply(const VersionEdit* edit) {
  1490. return rep_->Apply(edit);
  1491. }
  1492. Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) const {
  1493. return rep_->SaveTo(vstorage);
  1494. }
  1495. Status VersionBuilder::LoadTableHandlers(
  1496. InternalStats* internal_stats, int max_threads,
  1497. bool prefetch_index_and_filter_in_cache, bool is_initial_load,
  1498. const MutableCFOptions& mutable_cf_options,
  1499. size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options) {
  1500. return rep_->LoadTableHandlers(internal_stats, max_threads,
  1501. prefetch_index_and_filter_in_cache,
  1502. is_initial_load, mutable_cf_options,
  1503. max_file_size_for_l0_meta_pin, read_options);
  1504. }
  1505. void VersionBuilder::CreateOrReplaceSavePoint() {
  1506. assert(rep_);
  1507. savepoint_ = std::move(rep_);
  1508. rep_ = std::make_unique<Rep>(*savepoint_);
  1509. }
  1510. bool VersionBuilder::ValidVersionAvailable() {
  1511. return rep_->ValidVersionAvailable();
  1512. }
  1513. bool VersionBuilder::HasMissingFiles() const { return rep_->HasMissingFiles(); }
  1514. std::vector<std::string>& VersionBuilder::GetAndClearIntermediateFiles() {
  1515. return rep_->GetAndClearIntermediateFiles();
  1516. }
  1517. void VersionBuilder::ClearFoundFiles() { return rep_->ClearFoundFiles(); }
  1518. Status VersionBuilder::SaveSavePointTo(VersionStorageInfo* vstorage) const {
  1519. if (!savepoint_ || !savepoint_->ValidVersionAvailable()) {
  1520. return Status::InvalidArgument();
  1521. }
  1522. return savepoint_->SaveTo(vstorage);
  1523. }
  1524. Status VersionBuilder::LoadSavePointTableHandlers(
  1525. InternalStats* internal_stats, int max_threads,
  1526. bool prefetch_index_and_filter_in_cache, bool is_initial_load,
  1527. const MutableCFOptions& mutable_cf_options,
  1528. size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options) {
  1529. if (!savepoint_ || !savepoint_->ValidVersionAvailable()) {
  1530. return Status::InvalidArgument();
  1531. }
  1532. return savepoint_->LoadTableHandlers(
  1533. internal_stats, max_threads, prefetch_index_and_filter_in_cache,
  1534. is_initial_load, mutable_cf_options, max_file_size_for_l0_meta_pin,
  1535. read_options);
  1536. }
  1537. void VersionBuilder::ClearSavePoint() { savepoint_.reset(nullptr); }
  1538. BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
  1539. ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
  1540. bool track_found_and_missing_files, bool allow_incomplete_valid_version)
  1541. : version_builder_(new VersionBuilder(
  1542. cfd->current()->version_set()->file_options(), &cfd->ioptions(),
  1543. cfd->table_cache(), cfd->current()->storage_info(),
  1544. cfd->current()->version_set(),
  1545. cfd->GetFileMetadataCacheReservationManager(), cfd,
  1546. version_edit_handler, track_found_and_missing_files,
  1547. allow_incomplete_valid_version)),
  1548. version_(cfd->current()) {
  1549. version_->Ref();
  1550. }
  1551. BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
  1552. ColumnFamilyData* cfd, Version* v, VersionEditHandler* version_edit_handler,
  1553. bool track_found_and_missing_files, bool allow_incomplete_valid_version)
  1554. : version_builder_(new VersionBuilder(
  1555. cfd->current()->version_set()->file_options(), &cfd->ioptions(),
  1556. cfd->table_cache(), v->storage_info(), v->version_set(),
  1557. cfd->GetFileMetadataCacheReservationManager(), cfd,
  1558. version_edit_handler, track_found_and_missing_files,
  1559. allow_incomplete_valid_version)),
  1560. version_(v) {
  1561. assert(version_ != cfd->current());
  1562. }
  1563. BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() {
  1564. version_->Unref();
  1565. }
  1566. } // namespace ROCKSDB_NAMESPACE