version_builder.cc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include "db/version_builder.h"
  10. #include <algorithm>
  11. #include <atomic>
  12. #include <cinttypes>
  13. #include <functional>
  14. #include <map>
  15. #include <set>
  16. #include <thread>
  17. #include <unordered_map>
  18. #include <unordered_set>
  19. #include <utility>
  20. #include <vector>
  21. #include "db/dbformat.h"
  22. #include "db/internal_stats.h"
  23. #include "db/table_cache.h"
  24. #include "db/version_set.h"
  25. #include "port/port.h"
  26. #include "table/table_reader.h"
  27. #include "util/string_util.h"
  28. namespace ROCKSDB_NAMESPACE {
  29. bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
  30. if (a->fd.largest_seqno != b->fd.largest_seqno) {
  31. return a->fd.largest_seqno > b->fd.largest_seqno;
  32. }
  33. if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
  34. return a->fd.smallest_seqno > b->fd.smallest_seqno;
  35. }
  36. // Break ties by file number
  37. return a->fd.GetNumber() > b->fd.GetNumber();
  38. }
  39. namespace {
  40. bool BySmallestKey(FileMetaData* a, FileMetaData* b,
  41. const InternalKeyComparator* cmp) {
  42. int r = cmp->Compare(a->smallest, b->smallest);
  43. if (r != 0) {
  44. return (r < 0);
  45. }
  46. // Break ties by file number
  47. return (a->fd.GetNumber() < b->fd.GetNumber());
  48. }
  49. } // namespace
  50. class VersionBuilder::Rep {
  51. private:
  52. // Helper to sort files_ in v
  53. // kLevel0 -- NewestFirstBySeqNo
  54. // kLevelNon0 -- BySmallestKey
  55. struct FileComparator {
  56. enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
  57. const InternalKeyComparator* internal_comparator;
  58. FileComparator() : internal_comparator(nullptr) {}
  59. bool operator()(FileMetaData* f1, FileMetaData* f2) const {
  60. switch (sort_method) {
  61. case kLevel0:
  62. return NewestFirstBySeqNo(f1, f2);
  63. case kLevelNon0:
  64. return BySmallestKey(f1, f2, internal_comparator);
  65. }
  66. assert(false);
  67. return false;
  68. }
  69. };
  70. struct LevelState {
  71. std::unordered_set<uint64_t> deleted_files;
  72. // Map from file number to file meta data.
  73. std::unordered_map<uint64_t, FileMetaData*> added_files;
  74. };
  75. const FileOptions& file_options_;
  76. Logger* info_log_;
  77. TableCache* table_cache_;
  78. VersionStorageInfo* base_vstorage_;
  79. int num_levels_;
  80. LevelState* levels_;
  81. // Store states of levels larger than num_levels_. We do this instead of
  82. // storing them in levels_ to avoid regression in case there are no files
  83. // on invalid levels. The version is not consistent if in the end the files
  84. // on invalid levels don't cancel out.
  85. std::map<int, std::unordered_set<uint64_t>> invalid_levels_;
  86. // Whether there are invalid new files or invalid deletion on levels larger
  87. // than num_levels_.
  88. bool has_invalid_levels_;
  89. FileComparator level_zero_cmp_;
  90. FileComparator level_nonzero_cmp_;
  91. public:
  92. Rep(const FileOptions& file_options, Logger* info_log,
  93. TableCache* table_cache,
  94. VersionStorageInfo* base_vstorage)
  95. : file_options_(file_options),
  96. info_log_(info_log),
  97. table_cache_(table_cache),
  98. base_vstorage_(base_vstorage),
  99. num_levels_(base_vstorage->num_levels()),
  100. has_invalid_levels_(false) {
  101. levels_ = new LevelState[num_levels_];
  102. level_zero_cmp_.sort_method = FileComparator::kLevel0;
  103. level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
  104. level_nonzero_cmp_.internal_comparator =
  105. base_vstorage_->InternalComparator();
  106. }
  107. ~Rep() {
  108. for (int level = 0; level < num_levels_; level++) {
  109. const auto& added = levels_[level].added_files;
  110. for (auto& pair : added) {
  111. UnrefFile(pair.second);
  112. }
  113. }
  114. delete[] levels_;
  115. }
  116. void UnrefFile(FileMetaData* f) {
  117. f->refs--;
  118. if (f->refs <= 0) {
  119. if (f->table_reader_handle) {
  120. assert(table_cache_ != nullptr);
  121. table_cache_->ReleaseHandle(f->table_reader_handle);
  122. f->table_reader_handle = nullptr;
  123. }
  124. delete f;
  125. }
  126. }
  127. Status CheckConsistency(VersionStorageInfo* vstorage) {
  128. #ifdef NDEBUG
  129. if (!vstorage->force_consistency_checks()) {
  130. // Dont run consistency checks in release mode except if
  131. // explicitly asked to
  132. return Status::OK();
  133. }
  134. #endif
  135. // make sure the files are sorted correctly
  136. for (int level = 0; level < num_levels_; level++) {
  137. auto& level_files = vstorage->LevelFiles(level);
  138. for (size_t i = 1; i < level_files.size(); i++) {
  139. auto f1 = level_files[i - 1];
  140. auto f2 = level_files[i];
  141. #ifndef NDEBUG
  142. auto pair = std::make_pair(&f1, &f2);
  143. TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair);
  144. #endif
  145. if (level == 0) {
  146. if (!level_zero_cmp_(f1, f2)) {
  147. fprintf(stderr, "L0 files are not sorted properly");
  148. return Status::Corruption("L0 files are not sorted properly");
  149. }
  150. if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
  151. // This is an external file that we ingested
  152. SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
  153. if (!(external_file_seqno < f1->fd.largest_seqno ||
  154. external_file_seqno == 0)) {
  155. fprintf(stderr,
  156. "L0 file with seqno %" PRIu64 " %" PRIu64
  157. " vs. file with global_seqno %" PRIu64 "\n",
  158. f1->fd.smallest_seqno, f1->fd.largest_seqno,
  159. external_file_seqno);
  160. return Status::Corruption(
  161. "L0 file with seqno " +
  162. NumberToString(f1->fd.smallest_seqno) + " " +
  163. NumberToString(f1->fd.largest_seqno) +
  164. " vs. file with global_seqno" +
  165. NumberToString(external_file_seqno) + " with fileNumber " +
  166. NumberToString(f1->fd.GetNumber()));
  167. }
  168. } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
  169. fprintf(stderr,
  170. "L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64
  171. " %" PRIu64 "\n",
  172. f1->fd.smallest_seqno, f1->fd.largest_seqno,
  173. f2->fd.smallest_seqno, f2->fd.largest_seqno);
  174. return Status::Corruption(
  175. "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
  176. " " + NumberToString(f1->fd.largest_seqno) + " " +
  177. NumberToString(f1->fd.GetNumber()) + " vs. " +
  178. NumberToString(f2->fd.smallest_seqno) + " " +
  179. NumberToString(f2->fd.largest_seqno) + " " +
  180. NumberToString(f2->fd.GetNumber()));
  181. }
  182. } else {
  183. if (!level_nonzero_cmp_(f1, f2)) {
  184. fprintf(stderr, "L%d files are not sorted properly", level);
  185. return Status::Corruption("L" + NumberToString(level) +
  186. " files are not sorted properly");
  187. }
  188. // Make sure there is no overlap in levels > 0
  189. if (vstorage->InternalComparator()->Compare(f1->largest,
  190. f2->smallest) >= 0) {
  191. fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
  192. (f1->largest).DebugString(true).c_str(),
  193. (f2->smallest).DebugString(true).c_str());
  194. return Status::Corruption(
  195. "L" + NumberToString(level) + " have overlapping ranges " +
  196. (f1->largest).DebugString(true) + " vs. " +
  197. (f2->smallest).DebugString(true));
  198. }
  199. }
  200. }
  201. }
  202. return Status::OK();
  203. }
  204. Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
  205. int level) {
  206. #ifdef NDEBUG
  207. if (!base_vstorage_->force_consistency_checks()) {
  208. // Dont run consistency checks in release mode except if
  209. // explicitly asked to
  210. return Status::OK();
  211. }
  212. #endif
  213. // a file to be deleted better exist in the previous version
  214. bool found = false;
  215. for (int l = 0; !found && l < num_levels_; l++) {
  216. const std::vector<FileMetaData*>& base_files =
  217. base_vstorage_->LevelFiles(l);
  218. for (size_t i = 0; i < base_files.size(); i++) {
  219. FileMetaData* f = base_files[i];
  220. if (f->fd.GetNumber() == number) {
  221. found = true;
  222. break;
  223. }
  224. }
  225. }
  226. // if the file did not exist in the previous version, then it
  227. // is possibly moved from lower level to higher level in current
  228. // version
  229. for (int l = level + 1; !found && l < num_levels_; l++) {
  230. auto& level_added = levels_[l].added_files;
  231. auto got = level_added.find(number);
  232. if (got != level_added.end()) {
  233. found = true;
  234. break;
  235. }
  236. }
  237. // maybe this file was added in a previous edit that was Applied
  238. if (!found) {
  239. auto& level_added = levels_[level].added_files;
  240. auto got = level_added.find(number);
  241. if (got != level_added.end()) {
  242. found = true;
  243. }
  244. }
  245. if (!found) {
  246. fprintf(stderr, "not found %" PRIu64 "\n", number);
  247. return Status::Corruption("not found " + NumberToString(number));
  248. }
  249. return Status::OK();
  250. }
  251. bool CheckConsistencyForNumLevels() {
  252. // Make sure there are no files on or beyond num_levels().
  253. if (has_invalid_levels_) {
  254. return false;
  255. }
  256. for (auto& level : invalid_levels_) {
  257. if (level.second.size() > 0) {
  258. return false;
  259. }
  260. }
  261. return true;
  262. }
  263. // Apply all of the edits in *edit to the current state.
  264. Status Apply(VersionEdit* edit) {
  265. Status s = CheckConsistency(base_vstorage_);
  266. if (!s.ok()) {
  267. return s;
  268. }
  269. // Delete files
  270. const auto& del = edit->GetDeletedFiles();
  271. for (const auto& del_file : del) {
  272. const auto level = del_file.first;
  273. const auto number = del_file.second;
  274. if (level < num_levels_) {
  275. levels_[level].deleted_files.insert(number);
  276. CheckConsistencyForDeletes(edit, number, level);
  277. auto exising = levels_[level].added_files.find(number);
  278. if (exising != levels_[level].added_files.end()) {
  279. UnrefFile(exising->second);
  280. levels_[level].added_files.erase(exising);
  281. }
  282. } else {
  283. if (invalid_levels_[level].erase(number) == 0) {
  284. // Deleting an non-existing file on invalid level.
  285. has_invalid_levels_ = true;
  286. }
  287. }
  288. }
  289. // Add new files
  290. for (const auto& new_file : edit->GetNewFiles()) {
  291. const int level = new_file.first;
  292. if (level < num_levels_) {
  293. FileMetaData* f = new FileMetaData(new_file.second);
  294. f->refs = 1;
  295. assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
  296. levels_[level].added_files.end());
  297. levels_[level].deleted_files.erase(f->fd.GetNumber());
  298. levels_[level].added_files[f->fd.GetNumber()] = f;
  299. } else {
  300. uint64_t number = new_file.second.fd.GetNumber();
  301. auto& lvls = invalid_levels_[level];
  302. if (lvls.count(number) == 0) {
  303. lvls.insert(number);
  304. } else {
  305. // Creating an already existing file on invalid level.
  306. has_invalid_levels_ = true;
  307. }
  308. }
  309. }
  310. return s;
  311. }
  312. // Save the current state in *v.
  313. Status SaveTo(VersionStorageInfo* vstorage) {
  314. Status s = CheckConsistency(base_vstorage_);
  315. if (!s.ok()) {
  316. return s;
  317. }
  318. s = CheckConsistency(vstorage);
  319. if (!s.ok()) {
  320. return s;
  321. }
  322. for (int level = 0; level < num_levels_; level++) {
  323. const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
  324. // Merge the set of added files with the set of pre-existing files.
  325. // Drop any deleted files. Store the result in *v.
  326. const auto& base_files = base_vstorage_->LevelFiles(level);
  327. const auto& unordered_added_files = levels_[level].added_files;
  328. vstorage->Reserve(level,
  329. base_files.size() + unordered_added_files.size());
  330. // Sort added files for the level.
  331. std::vector<FileMetaData*> added_files;
  332. added_files.reserve(unordered_added_files.size());
  333. for (const auto& pair : unordered_added_files) {
  334. added_files.push_back(pair.second);
  335. }
  336. std::sort(added_files.begin(), added_files.end(), cmp);
  337. #ifndef NDEBUG
  338. FileMetaData* prev_added_file = nullptr;
  339. for (const auto& added : added_files) {
  340. if (level > 0 && prev_added_file != nullptr) {
  341. assert(base_vstorage_->InternalComparator()->Compare(
  342. prev_added_file->smallest, added->smallest) <= 0);
  343. }
  344. prev_added_file = added;
  345. }
  346. #endif
  347. auto base_iter = base_files.begin();
  348. auto base_end = base_files.end();
  349. auto added_iter = added_files.begin();
  350. auto added_end = added_files.end();
  351. while (added_iter != added_end || base_iter != base_end) {
  352. if (base_iter == base_end ||
  353. (added_iter != added_end && cmp(*added_iter, *base_iter))) {
  354. MaybeAddFile(vstorage, level, *added_iter++);
  355. } else {
  356. MaybeAddFile(vstorage, level, *base_iter++);
  357. }
  358. }
  359. }
  360. s = CheckConsistency(vstorage);
  361. return s;
  362. }
  363. Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
  364. bool prefetch_index_and_filter_in_cache,
  365. bool is_initial_load,
  366. const SliceTransform* prefix_extractor) {
  367. assert(table_cache_ != nullptr);
  368. size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
  369. bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
  370. size_t max_load = port::kMaxSizet;
  371. if (!always_load) {
  372. // If it is initial loading and not set to always laoding all the
  373. // files, we only load up to kInitialLoadLimit files, to limit the
  374. // time reopening the DB.
  375. const size_t kInitialLoadLimit = 16;
  376. size_t load_limit;
  377. // If the table cache is not 1/4 full, we pin the table handle to
  378. // file metadata to avoid the cache read costs when reading the file.
  379. // The downside of pinning those files is that LRU won't be followed
  380. // for those files. This doesn't matter much because if number of files
  381. // of the DB excceeds table cache capacity, eventually no table reader
  382. // will be pinned and LRU will be followed.
  383. if (is_initial_load) {
  384. load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4);
  385. } else {
  386. load_limit = table_cache_capacity / 4;
  387. }
  388. size_t table_cache_usage = table_cache_->get_cache()->GetUsage();
  389. if (table_cache_usage >= load_limit) {
  390. // TODO (yanqin) find a suitable status code.
  391. return Status::OK();
  392. } else {
  393. max_load = load_limit - table_cache_usage;
  394. }
  395. }
  396. // <file metadata, level>
  397. std::vector<std::pair<FileMetaData*, int>> files_meta;
  398. std::vector<Status> statuses;
  399. for (int level = 0; level < num_levels_; level++) {
  400. for (auto& file_meta_pair : levels_[level].added_files) {
  401. auto* file_meta = file_meta_pair.second;
  402. // If the file has been opened before, just skip it.
  403. if (!file_meta->table_reader_handle) {
  404. files_meta.emplace_back(file_meta, level);
  405. statuses.emplace_back(Status::OK());
  406. }
  407. if (files_meta.size() >= max_load) {
  408. break;
  409. }
  410. }
  411. if (files_meta.size() >= max_load) {
  412. break;
  413. }
  414. }
  415. std::atomic<size_t> next_file_meta_idx(0);
  416. std::function<void()> load_handlers_func([&]() {
  417. while (true) {
  418. size_t file_idx = next_file_meta_idx.fetch_add(1);
  419. if (file_idx >= files_meta.size()) {
  420. break;
  421. }
  422. auto* file_meta = files_meta[file_idx].first;
  423. int level = files_meta[file_idx].second;
  424. statuses[file_idx] = table_cache_->FindTable(
  425. file_options_, *(base_vstorage_->InternalComparator()),
  426. file_meta->fd, &file_meta->table_reader_handle, prefix_extractor,
  427. false /*no_io */, true /* record_read_stats */,
  428. internal_stats->GetFileReadHist(level), false, level,
  429. prefetch_index_and_filter_in_cache);
  430. if (file_meta->table_reader_handle != nullptr) {
  431. // Load table_reader
  432. file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
  433. file_meta->table_reader_handle);
  434. }
  435. }
  436. });
  437. std::vector<port::Thread> threads;
  438. for (int i = 1; i < max_threads; i++) {
  439. threads.emplace_back(load_handlers_func);
  440. }
  441. load_handlers_func();
  442. for (auto& t : threads) {
  443. t.join();
  444. }
  445. for (const auto& s : statuses) {
  446. if (!s.ok()) {
  447. return s;
  448. }
  449. }
  450. return Status::OK();
  451. }
  452. void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
  453. if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
  454. // f is to-be-deleted table file
  455. vstorage->RemoveCurrentStats(f);
  456. } else {
  457. vstorage->AddFile(level, f, info_log_);
  458. }
  459. }
  460. };
  461. VersionBuilder::VersionBuilder(const FileOptions& file_options,
  462. TableCache* table_cache,
  463. VersionStorageInfo* base_vstorage,
  464. Logger* info_log)
  465. : rep_(new Rep(file_options, info_log, table_cache, base_vstorage)) {}
  466. VersionBuilder::~VersionBuilder() { delete rep_; }
  467. Status VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
  468. return rep_->CheckConsistency(vstorage);
  469. }
  470. Status VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
  471. uint64_t number, int level) {
  472. return rep_->CheckConsistencyForDeletes(edit, number, level);
  473. }
  474. bool VersionBuilder::CheckConsistencyForNumLevels() {
  475. return rep_->CheckConsistencyForNumLevels();
  476. }
  477. Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }
  478. Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
  479. return rep_->SaveTo(vstorage);
  480. }
  481. Status VersionBuilder::LoadTableHandlers(
  482. InternalStats* internal_stats, int max_threads,
  483. bool prefetch_index_and_filter_in_cache, bool is_initial_load,
  484. const SliceTransform* prefix_extractor) {
  485. return rep_->LoadTableHandlers(internal_stats, max_threads,
  486. prefetch_index_and_filter_in_cache,
  487. is_initial_load, prefix_extractor);
  488. }
  489. void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
  490. FileMetaData* f) {
  491. rep_->MaybeAddFile(vstorage, level, f);
  492. }
  493. } // namespace ROCKSDB_NAMESPACE