compaction.cc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  7. // Use of this source code is governed by a BSD-style license that can be
  8. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  9. #include <cinttypes>
  10. #include <vector>
  11. #include "db/column_family.h"
  12. #include "db/compaction/compaction.h"
  13. #include "rocksdb/compaction_filter.h"
  14. #include "test_util/sync_point.h"
  15. #include "util/string_util.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. const uint64_t kRangeTombstoneSentinel =
  18. PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
  19. int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
  20. const InternalKey& b) {
  21. auto c = user_cmp->Compare(a.user_key(), b.user_key());
  22. if (c != 0) {
  23. return c;
  24. }
  25. auto a_footer = ExtractInternalKeyFooter(a.Encode());
  26. auto b_footer = ExtractInternalKeyFooter(b.Encode());
  27. if (a_footer == kRangeTombstoneSentinel) {
  28. if (b_footer != kRangeTombstoneSentinel) {
  29. return -1;
  30. }
  31. } else if (b_footer == kRangeTombstoneSentinel) {
  32. return 1;
  33. }
  34. return 0;
  35. }
  36. int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
  37. const InternalKey& b) {
  38. if (a == nullptr) {
  39. return -1;
  40. }
  41. return sstableKeyCompare(user_cmp, *a, b);
  42. }
  43. int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
  44. const InternalKey* b) {
  45. if (b == nullptr) {
  46. return -1;
  47. }
  48. return sstableKeyCompare(user_cmp, a, *b);
  49. }
  50. uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
  51. uint64_t sum = 0;
  52. for (size_t i = 0; i < files.size() && files[i]; i++) {
  53. sum += files[i]->fd.GetFileSize();
  54. }
  55. return sum;
  56. }
  57. void Compaction::SetInputVersion(Version* _input_version) {
  58. input_version_ = _input_version;
  59. cfd_ = input_version_->cfd();
  60. cfd_->Ref();
  61. input_version_->Ref();
  62. edit_.SetColumnFamily(cfd_->GetID());
  63. }
  64. void Compaction::GetBoundaryKeys(
  65. VersionStorageInfo* vstorage,
  66. const std::vector<CompactionInputFiles>& inputs, Slice* smallest_user_key,
  67. Slice* largest_user_key) {
  68. bool initialized = false;
  69. const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
  70. for (size_t i = 0; i < inputs.size(); ++i) {
  71. if (inputs[i].files.empty()) {
  72. continue;
  73. }
  74. if (inputs[i].level == 0) {
  75. // we need to consider all files on level 0
  76. for (const auto* f : inputs[i].files) {
  77. const Slice& start_user_key = f->smallest.user_key();
  78. if (!initialized ||
  79. ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
  80. *smallest_user_key = start_user_key;
  81. }
  82. const Slice& end_user_key = f->largest.user_key();
  83. if (!initialized ||
  84. ucmp->Compare(end_user_key, *largest_user_key) > 0) {
  85. *largest_user_key = end_user_key;
  86. }
  87. initialized = true;
  88. }
  89. } else {
  90. // we only need to consider the first and last file
  91. const Slice& start_user_key = inputs[i].files[0]->smallest.user_key();
  92. if (!initialized ||
  93. ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
  94. *smallest_user_key = start_user_key;
  95. }
  96. const Slice& end_user_key = inputs[i].files.back()->largest.user_key();
  97. if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
  98. *largest_user_key = end_user_key;
  99. }
  100. initialized = true;
  101. }
  102. }
  103. }
  104. std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
  105. VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {
  106. const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
  107. for (size_t i = 0; i < inputs.size(); i++) {
  108. if (inputs[i].level == 0 || inputs[i].files.empty()) {
  109. continue;
  110. }
  111. inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());
  112. AtomicCompactionUnitBoundary cur_boundary;
  113. size_t first_atomic_idx = 0;
  114. auto add_unit_boundary = [&](size_t to) {
  115. if (first_atomic_idx == to) return;
  116. for (size_t k = first_atomic_idx; k < to; k++) {
  117. inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
  118. }
  119. first_atomic_idx = to;
  120. };
  121. for (size_t j = 0; j < inputs[i].files.size(); j++) {
  122. const auto* f = inputs[i].files[j];
  123. if (j == 0) {
  124. // First file in a level.
  125. cur_boundary.smallest = &f->smallest;
  126. cur_boundary.largest = &f->largest;
  127. } else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
  128. 0) {
  129. // SSTs overlap but the end key of the previous file was not
  130. // artificially extended by a range tombstone. Extend the current
  131. // boundary.
  132. cur_boundary.largest = &f->largest;
  133. } else {
  134. // Atomic compaction unit has ended.
  135. add_unit_boundary(j);
  136. cur_boundary.smallest = &f->smallest;
  137. cur_boundary.largest = &f->largest;
  138. }
  139. }
  140. add_unit_boundary(inputs[i].files.size());
  141. assert(inputs[i].files.size() ==
  142. inputs[i].atomic_compaction_unit_boundaries.size());
  143. }
  144. return inputs;
  145. }
  146. // helper function to determine if compaction is creating files at the
  147. // bottommost level
  148. bool Compaction::IsBottommostLevel(
  149. int output_level, VersionStorageInfo* vstorage,
  150. const std::vector<CompactionInputFiles>& inputs) {
  151. int output_l0_idx;
  152. if (output_level == 0) {
  153. output_l0_idx = 0;
  154. for (const auto* file : vstorage->LevelFiles(0)) {
  155. if (inputs[0].files.back() == file) {
  156. break;
  157. }
  158. ++output_l0_idx;
  159. }
  160. assert(static_cast<size_t>(output_l0_idx) < vstorage->LevelFiles(0).size());
  161. } else {
  162. output_l0_idx = -1;
  163. }
  164. Slice smallest_key, largest_key;
  165. GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
  166. return !vstorage->RangeMightExistAfterSortedRun(smallest_key, largest_key,
  167. output_level, output_l0_idx);
  168. }
  169. // test function to validate the functionality of IsBottommostLevel()
  170. // function -- determines if compaction with inputs and storage is bottommost
  171. bool Compaction::TEST_IsBottommostLevel(
  172. int output_level, VersionStorageInfo* vstorage,
  173. const std::vector<CompactionInputFiles>& inputs) {
  174. return IsBottommostLevel(output_level, vstorage, inputs);
  175. }
  176. bool Compaction::IsFullCompaction(
  177. VersionStorageInfo* vstorage,
  178. const std::vector<CompactionInputFiles>& inputs) {
  179. size_t num_files_in_compaction = 0;
  180. size_t total_num_files = 0;
  181. for (int l = 0; l < vstorage->num_levels(); l++) {
  182. total_num_files += vstorage->NumLevelFiles(l);
  183. }
  184. for (size_t i = 0; i < inputs.size(); i++) {
  185. num_files_in_compaction += inputs[i].size();
  186. }
  187. return num_files_in_compaction == total_num_files;
  188. }
  // Constructs a compaction of `_inputs` into `_output_level`.
  //
  // Computes atomic compaction unit boundaries for the inputs, marks every
  // input file as being compacted, builds the per-level file briefs in
  // input_levels_, and caches the inputs' user-key range in smallest_user_key_
  // and largest_user_key_. The input version and column family start out null
  // here; SetInputVersion() attaches and references them.
  Compaction::Compaction(VersionStorageInfo* vstorage,
                         const ImmutableCFOptions& _immutable_cf_options,
                         const MutableCFOptions& _mutable_cf_options,
                         std::vector<CompactionInputFiles> _inputs,
                         int _output_level, uint64_t _target_file_size,
                         uint64_t _max_compaction_bytes, uint32_t _output_path_id,
                         CompressionType _compression,
                         CompressionOptions _compression_opts,
                         uint32_t _max_subcompactions,
                         std::vector<FileMetaData*> _grandparents,
                         bool _manual_compaction, double _score,
                         bool _deletion_compaction,
                         CompactionReason _compaction_reason)
      : input_vstorage_(vstorage),
        // NOTE(review): reads _inputs, which is moved into inputs_ further
        // down. Members are initialized in declaration order, so this relies
        // on start_level_ being declared before inputs_ in compaction.h —
        // confirm before reordering members.
        start_level_(_inputs[0].level),
        output_level_(_output_level),
        max_output_file_size_(_target_file_size),
        max_compaction_bytes_(_max_compaction_bytes),
        max_subcompactions_(_max_subcompactions),
        immutable_cf_options_(_immutable_cf_options),
        mutable_cf_options_(_mutable_cf_options),
        input_version_(nullptr),
        number_levels_(vstorage->num_levels()),
        cfd_(nullptr),
        output_path_id_(_output_path_id),
        output_compression_(_compression),
        output_compression_opts_(_compression_opts),
        deletion_compaction_(_deletion_compaction),
        inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
        grandparents_(std::move(_grandparents)),
        score_(_score),
        // The next two read inputs_ (and output_level_); the same
        // declaration-order caveat as above applies.
        bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
        is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
        is_manual_compaction_(_manual_compaction),
        is_trivial_move_(false),
        compaction_reason_(_compaction_reason) {
    MarkFilesBeingCompacted(true);
    // A manual compaction always reports kManualCompaction, whatever reason
    // the caller passed in.
    if (is_manual_compaction_) {
      compaction_reason_ = CompactionReason::kManualCompaction;
    }
    // A value of 0 is replaced by the column family's max_subcompactions.
    if (max_subcompactions_ == 0) {
      max_subcompactions_ = immutable_cf_options_.max_subcompactions;
    }
    if (!bottommost_level_) {
      // Currently we only enable dictionary compression during compaction to the
      // bottommost level.
      output_compression_opts_.max_dict_bytes = 0;
      output_compression_opts_.zstd_max_train_bytes = 0;
    }
  #ifndef NDEBUG
    // Debug check: input levels must be listed in strictly increasing order.
    for (size_t i = 1; i < inputs_.size(); ++i) {
      assert(inputs_[i].level > inputs_[i - 1].level);
    }
  #endif
    // setup input_levels_: one LevelFilesBrief per input level, allocated from
    // arena_.
    {
      input_levels_.resize(num_input_levels());
      for (size_t which = 0; which < num_input_levels(); which++) {
        DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
                                  &arena_);
      }
    }
    GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
  }
  253. Compaction::~Compaction() {
  254. if (input_version_ != nullptr) {
  255. input_version_->Unref();
  256. }
  257. if (cfd_ != nullptr) {
  258. cfd_->UnrefAndTryDelete();
  259. }
  260. }
  261. bool Compaction::InputCompressionMatchesOutput() const {
  262. int base_level = input_vstorage_->base_level();
  263. bool matches = (GetCompressionType(immutable_cf_options_, input_vstorage_,
  264. mutable_cf_options_, start_level_,
  265. base_level) == output_compression_);
  266. if (matches) {
  267. TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
  268. return true;
  269. }
  270. TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
  271. return matches;
  272. }
  273. bool Compaction::IsTrivialMove() const {
  274. // Avoid a move if there is lots of overlapping grandparent data.
  275. // Otherwise, the move could create a parent file that will require
  276. // a very expensive merge later on.
  277. // If start_level_== output_level_, the purpose is to force compaction
  278. // filter to be applied to that level, and thus cannot be a trivial move.
  279. // Check if start level have files with overlapping ranges
  280. if (start_level_ == 0 && input_vstorage_->level0_non_overlapping() == false) {
  281. // We cannot move files from L0 to L1 if the files are overlapping
  282. return false;
  283. }
  284. if (is_manual_compaction_ &&
  285. (immutable_cf_options_.compaction_filter != nullptr ||
  286. immutable_cf_options_.compaction_filter_factory != nullptr)) {
  287. // This is a manual compaction and we have a compaction filter that should
  288. // be executed, we cannot do a trivial move
  289. return false;
  290. }
  291. // Used in universal compaction, where trivial move can be done if the
  292. // input files are non overlapping
  293. if ((mutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
  294. (output_level_ != 0)) {
  295. return is_trivial_move_;
  296. }
  297. if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
  298. input(0, 0)->fd.GetPathId() == output_path_id() &&
  299. InputCompressionMatchesOutput())) {
  300. return false;
  301. }
  302. // assert inputs_.size() == 1
  303. for (const auto& file : inputs_.front().files) {
  304. std::vector<FileMetaData*> file_grand_parents;
  305. if (output_level_ + 1 >= number_levels_) {
  306. continue;
  307. }
  308. input_vstorage_->GetOverlappingInputs(output_level_ + 1, &file->smallest,
  309. &file->largest, &file_grand_parents);
  310. const auto compaction_size =
  311. file->fd.GetFileSize() + TotalFileSize(file_grand_parents);
  312. if (compaction_size > max_compaction_bytes_) {
  313. return false;
  314. }
  315. }
  316. return true;
  317. }
  318. void Compaction::AddInputDeletions(VersionEdit* out_edit) {
  319. for (size_t which = 0; which < num_input_levels(); which++) {
  320. for (size_t i = 0; i < inputs_[which].size(); i++) {
  321. out_edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber());
  322. }
  323. }
  324. }
  325. bool Compaction::KeyNotExistsBeyondOutputLevel(
  326. const Slice& user_key, std::vector<size_t>* level_ptrs) const {
  327. assert(input_version_ != nullptr);
  328. assert(level_ptrs != nullptr);
  329. assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
  330. if (bottommost_level_) {
  331. return true;
  332. } else if (output_level_ != 0 &&
  333. cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
  334. // Maybe use binary search to find right entry instead of linear search?
  335. const Comparator* user_cmp = cfd_->user_comparator();
  336. for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
  337. const std::vector<FileMetaData*>& files =
  338. input_vstorage_->LevelFiles(lvl);
  339. for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) {
  340. auto* f = files[level_ptrs->at(lvl)];
  341. if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
  342. // We've advanced far enough
  343. if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
  344. // Key falls in this file's range, so it may
  345. // exist beyond output level
  346. return false;
  347. }
  348. break;
  349. }
  350. }
  351. }
  352. return true;
  353. }
  354. return false;
  355. }
  356. // Mark (or clear) each file that is being compacted
  357. void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
  358. for (size_t i = 0; i < num_input_levels(); i++) {
  359. for (size_t j = 0; j < inputs_[i].size(); j++) {
  360. assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
  361. : inputs_[i][j]->being_compacted);
  362. inputs_[i][j]->being_compacted = mark_as_compacted;
  363. }
  364. }
  365. }
  366. // Sample output:
  367. // If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
  368. // print: "3@0 + 2@3 + 1@4 files to L5"
  369. const char* Compaction::InputLevelSummary(
  370. InputLevelSummaryBuffer* scratch) const {
  371. int len = 0;
  372. bool is_first = true;
  373. for (auto& input_level : inputs_) {
  374. if (input_level.empty()) {
  375. continue;
  376. }
  377. if (!is_first) {
  378. len +=
  379. snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
  380. len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
  381. } else {
  382. is_first = false;
  383. }
  384. len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
  385. "%" ROCKSDB_PRIszt "@%d", input_level.size(),
  386. input_level.level);
  387. len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
  388. }
  389. snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
  390. " files to L%d", output_level());
  391. return scratch->buffer;
  392. }
  393. uint64_t Compaction::CalculateTotalInputSize() const {
  394. uint64_t size = 0;
  395. for (auto& input_level : inputs_) {
  396. for (auto f : input_level.files) {
  397. size += f->fd.GetFileSize();
  398. }
  399. }
  400. return size;
  401. }
  402. void Compaction::ReleaseCompactionFiles(Status status) {
  403. MarkFilesBeingCompacted(false);
  404. cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
  405. }
  406. void Compaction::ResetNextCompactionIndex() {
  407. assert(input_version_ != nullptr);
  408. input_vstorage_->ResetNextCompactionIndex(start_level_);
  409. }
  410. namespace {
  411. int InputSummary(const std::vector<FileMetaData*>& files, char* output,
  412. int len) {
  413. *output = '\0';
  414. int write = 0;
  415. for (size_t i = 0; i < files.size(); i++) {
  416. int sz = len - write;
  417. int ret;
  418. char sztxt[16];
  419. AppendHumanBytes(files.at(i)->fd.GetFileSize(), sztxt, 16);
  420. ret = snprintf(output + write, sz, "%" PRIu64 "(%s) ",
  421. files.at(i)->fd.GetNumber(), sztxt);
  422. if (ret < 0 || ret >= sz) break;
  423. write += ret;
  424. }
  425. // if files.size() is non-zero, overwrite the last space
  426. return write - !!files.size();
  427. }
  428. } // namespace
  429. void Compaction::Summary(char* output, int len) {
  430. int write =
  431. snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [",
  432. input_version_->GetVersionNumber(), start_level_);
  433. if (write < 0 || write >= len) {
  434. return;
  435. }
  436. for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
  437. if (level_iter > 0) {
  438. write += snprintf(output + write, len - write, "], [");
  439. if (write < 0 || write >= len) {
  440. return;
  441. }
  442. }
  443. write +=
  444. InputSummary(inputs_[level_iter].files, output + write, len - write);
  445. if (write < 0 || write >= len) {
  446. return;
  447. }
  448. }
  449. snprintf(output + write, len - write, "]");
  450. }
  451. uint64_t Compaction::OutputFilePreallocationSize() const {
  452. uint64_t preallocation_size = 0;
  453. for (const auto& level_files : inputs_) {
  454. for (const auto& file : level_files.files) {
  455. preallocation_size += file->fd.GetFileSize();
  456. }
  457. }
  458. if (max_output_file_size_ != port::kMaxUint64 &&
  459. (immutable_cf_options_.compaction_style == kCompactionStyleLevel ||
  460. output_level() > 0)) {
  461. preallocation_size = std::min(max_output_file_size_, preallocation_size);
  462. }
  463. // Over-estimate slightly so we don't end up just barely crossing
  464. // the threshold
  465. // No point to prellocate more than 1GB.
  466. return std::min(uint64_t{1073741824},
  467. preallocation_size + (preallocation_size / 10));
  468. }
  469. std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
  470. if (!cfd_->ioptions()->compaction_filter_factory) {
  471. return nullptr;
  472. }
  473. CompactionFilter::Context context;
  474. context.is_full_compaction = is_full_compaction_;
  475. context.is_manual_compaction = is_manual_compaction_;
  476. context.column_family_id = cfd_->GetID();
  477. return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
  478. context);
  479. }
  480. bool Compaction::IsOutputLevelEmpty() const {
  481. return inputs_.back().level != output_level_ || inputs_.back().empty();
  482. }
  483. bool Compaction::ShouldFormSubcompactions() const {
  484. if (max_subcompactions_ <= 1 || cfd_ == nullptr) {
  485. return false;
  486. }
  487. if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
  488. return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0 &&
  489. !IsOutputLevelEmpty();
  490. } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
  491. return number_levels_ > 1 && output_level_ > 0;
  492. } else {
  493. return false;
  494. }
  495. }
  496. uint64_t Compaction::MinInputFileOldestAncesterTime() const {
  497. uint64_t min_oldest_ancester_time = port::kMaxUint64;
  498. for (const auto& level_files : inputs_) {
  499. for (const auto& file : level_files.files) {
  500. uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
  501. if (oldest_ancester_time != 0) {
  502. min_oldest_ancester_time =
  503. std::min(min_oldest_ancester_time, oldest_ancester_time);
  504. }
  505. }
  506. }
  507. return min_oldest_ancester_time;
  508. }
  509. int Compaction::GetInputBaseLevel() const {
  510. return input_vstorage_->base_level();
  511. }
  512. } // namespace ROCKSDB_NAMESPACE