compaction_outputs.cc

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction/compaction_outputs.h"

#include "db/builder.h"

namespace ROCKSDB_NAMESPACE {

void CompactionOutputs::NewBuilder(const TableBuilderOptions& tboptions) {
  builder_.reset(NewTableBuilder(tboptions, file_writer_.get()));
}

Status CompactionOutputs::Finish(
    const Status& input_status,
    const SeqnoToTimeMapping& seqno_to_time_mapping) {
  FileMetaData* meta = GetMetaData();
  assert(meta != nullptr);
  Status s = input_status;
  if (s.ok()) {
    SeqnoToTimeMapping relevant_mapping;
    relevant_mapping.CopyFromSeqnoRange(
        seqno_to_time_mapping,
        std::min(smallest_preferred_seqno_, meta->fd.smallest_seqno),
        meta->fd.largest_seqno);
    relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
    builder_->SetSeqnoTimeTableProperties(relevant_mapping,
                                          meta->oldest_ancester_time);
    s = builder_->Finish();
  } else {
    builder_->Abandon();
  }
  Status io_s = builder_->io_status();
  if (s.ok()) {
    s = io_s;
  } else {
    io_s.PermitUncheckedError();
  }
  const uint64_t current_bytes = builder_->FileSize();
  if (s.ok()) {
    meta->fd.file_size = current_bytes;
    meta->tail_size = builder_->GetTailSize();
    meta->marked_for_compaction = builder_->NeedCompact();
    meta->user_defined_timestamps_persisted = static_cast<bool>(
        builder_->GetTableProperties().user_defined_timestamps_persisted);
  }
  current_output().finished = true;
  stats_.bytes_written += current_bytes;
  stats_.bytes_written_pre_comp += builder_->PreCompressionSize();
  stats_.num_output_files = static_cast<int>(outputs_.size());
  worker_cpu_micros_ += builder_->GetWorkerCPUMicros();

  return s;
}

IOStatus CompactionOutputs::WriterSyncClose(const Status& input_status,
                                            SystemClock* clock,
                                            Statistics* statistics,
                                            bool use_fsync) {
  IOStatus io_s;
  IOOptions opts;
  io_s = WritableFileWriter::PrepareIOOptions(
      WriteOptions(Env::IOActivity::kCompaction), opts);
  if (input_status.ok() && io_s.ok()) {
    StopWatch sw(clock, statistics, COMPACTION_OUTFILE_SYNC_MICROS);
    io_s = file_writer_->Sync(opts, use_fsync);
  }
  if (input_status.ok() && io_s.ok()) {
    io_s = file_writer_->Close(opts);
  }

  if (input_status.ok() && io_s.ok()) {
    FileMetaData* meta = GetMetaData();
    meta->file_checksum = file_writer_->GetFileChecksum();
    meta->file_checksum_func_name = file_writer_->GetFileChecksumFuncName();
  }

  file_writer_.reset();

  return io_s;
}

bool CompactionOutputs::UpdateFilesToCutForTTLStates(
    const Slice& internal_key) {
  if (!files_to_cut_for_ttl_.empty()) {
    const InternalKeyComparator* icmp =
        &compaction_->column_family_data()->internal_comparator();
    if (cur_files_to_cut_for_ttl_ != -1) {
      // Previous key is inside the range of a file.
      if (icmp->Compare(internal_key,
                        files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_]
                            ->largest.Encode()) > 0) {
        next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1;
        cur_files_to_cut_for_ttl_ = -1;
        return true;
      }
    } else {
      // Look for the key position.
      while (next_files_to_cut_for_ttl_ <
             static_cast<int>(files_to_cut_for_ttl_.size())) {
        if (icmp->Compare(internal_key,
                          files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
                              ->smallest.Encode()) >= 0) {
          if (icmp->Compare(internal_key,
                            files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
                                ->largest.Encode()) <= 0) {
            // Within the current file.
            cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_;
            return true;
          }
          // Beyond the current file.
          next_files_to_cut_for_ttl_++;
        } else {
          // Still falls into the gap before the next file.
          break;
        }
      }
    }
  }
  return false;
}

size_t CompactionOutputs::UpdateGrandparentBoundaryInfo(
    const Slice& internal_key) {
  size_t curr_key_boundary_switched_num = 0;
  const std::vector<FileMetaData*>& grandparents = compaction_->grandparents();

  if (grandparents.empty()) {
    return curr_key_boundary_switched_num;
  }
  const Comparator* ucmp = compaction_->column_family_data()->user_comparator();

  // Move the grandparent_index_ to the file containing the current user_key.
  // If there are multiple files containing the same user_key, make sure the
  // index points to the last file containing the key.
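  // For illustration (hypothetical layout): suppose the grandparent files are
  //   [a, c] [c, e] [e, g]
  // and the current user_key is `c`. `c` appears in both file 0 and file 1,
  // so the loop below leaves grandparent_index_ at 1, the last file
  // containing `c`.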
  while (grandparent_index_ < grandparents.size()) {
    if (being_grandparent_gap_) {
      if (sstableKeyCompare(ucmp, internal_key,
                            grandparents[grandparent_index_]->smallest) < 0) {
        break;
      }
      if (seen_key_) {
        curr_key_boundary_switched_num++;
        grandparent_overlapped_bytes_ +=
            grandparents[grandparent_index_]->fd.GetFileSize();
        grandparent_boundary_switched_num_++;
      }
      being_grandparent_gap_ = false;
    } else {
      int cmp_result = sstableKeyCompare(
          ucmp, internal_key, grandparents[grandparent_index_]->largest);
      // If it's the same key, make sure grandparent_index_ is pointing to the
      // last one.
      if (cmp_result < 0 ||
          (cmp_result == 0 &&
           (grandparent_index_ == grandparents.size() - 1 ||
            sstableKeyCompare(ucmp, internal_key,
                              grandparents[grandparent_index_ + 1]->smallest) <
                0))) {
        break;
      }
      if (seen_key_) {
        curr_key_boundary_switched_num++;
        grandparent_boundary_switched_num_++;
      }
      being_grandparent_gap_ = true;
      grandparent_index_++;
    }
  }

  // If the first key is in the middle of a grandparent file, add it to the
  // overlap.
  if (!seen_key_ && !being_grandparent_gap_) {
    assert(grandparent_overlapped_bytes_ == 0);
    grandparent_overlapped_bytes_ =
        GetCurrentKeyGrandparentOverlappedBytes(internal_key);
  }

  seen_key_ = true;
  return curr_key_boundary_switched_num;
}

uint64_t CompactionOutputs::GetCurrentKeyGrandparentOverlappedBytes(
    const Slice& internal_key) const {
  // No overlap with any grandparent file.
  if (being_grandparent_gap_) {
    return 0;
  }
  uint64_t overlapped_bytes = 0;

  const std::vector<FileMetaData*>& grandparents = compaction_->grandparents();
  const Comparator* ucmp = compaction_->column_family_data()->user_comparator();
  InternalKey ikey;
  ikey.DecodeFrom(internal_key);
#ifndef NDEBUG
  // Make sure the grandparent_index_ is pointing to the last file containing
  // the current key.
  int cmp_result =
      sstableKeyCompare(ucmp, ikey, grandparents[grandparent_index_]->largest);
  assert(
      cmp_result < 0 ||
      (cmp_result == 0 &&
       (grandparent_index_ == grandparents.size() - 1 ||
        sstableKeyCompare(
            ucmp, ikey, grandparents[grandparent_index_ + 1]->smallest) < 0)));
  assert(sstableKeyCompare(ucmp, ikey,
                           grandparents[grandparent_index_]->smallest) >= 0);
#endif
  overlapped_bytes += grandparents[grandparent_index_]->fd.GetFileSize();

  // Go backwards to find all overlapped files: one key can overlap multiple
  // files. In the following example, if the current output key is `c`, and
  // one compaction file was cut before `c`, the current `c` can overlap with
  // 3 files:
  //  [a b]                [c...
  // [b, b] [c, c] [c, c] [c, d]
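  // For illustration (hypothetical sizes): in the layout above, the file at
  // grandparent_index_ ([c, d], say 8MB) is counted first; the loop below
  // then walks backwards, adding the two [c, c] files (say 4MB and 2MB) and
  // stopping at [b, b], for overlapped_bytes = 8MB + 4MB + 2MB = 14MB.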
  for (int64_t i = static_cast<int64_t>(grandparent_index_) - 1;
       i >= 0 && sstableKeyCompare(ucmp, ikey, grandparents[i]->largest) == 0;
       i--) {
    overlapped_bytes += grandparents[i]->fd.GetFileSize();
  }

  return overlapped_bytes;
}

bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) {
  assert(c_iter.Valid());
  const Slice& internal_key = c_iter.key();

#ifndef NDEBUG
  bool should_stop = false;
  std::pair<bool*, const Slice> p{&should_stop, internal_key};
  TEST_SYNC_POINT_CALLBACK(
      "CompactionOutputs::ShouldStopBefore::manual_decision", (void*)&p);
  if (should_stop) {
    return true;
  }
#endif  // NDEBUG

  const uint64_t previous_overlapped_bytes = grandparent_overlapped_bytes_;
  const InternalKeyComparator* icmp =
      &compaction_->column_family_data()->internal_comparator();
  size_t num_grandparent_boundaries_crossed = 0;
  bool should_stop_for_ttl = false;
  // Always update grandparent information like overlapped file number, size,
  // etc., and TTL states.
  // If compaction_->output_level() == 0, there is no need to update
  // grandparent info, and `grandparents` should be empty.
  if (compaction_->output_level() > 0) {
    num_grandparent_boundaries_crossed =
        UpdateGrandparentBoundaryInfo(internal_key);
    should_stop_for_ttl = UpdateFilesToCutForTTLStates(internal_key);
  }

  if (!HasBuilder()) {
    return false;
  }

  if (should_stop_for_ttl) {
    return true;
  }

  // If there's a user-defined partitioner, check that first.
  if (partitioner_ && partitioner_->ShouldPartition(PartitionerRequest(
                          last_key_for_partitioner_, c_iter.user_key(),
                          current_output_file_size_)) == kRequired) {
    return true;
  }

  // Files output to Level 0 won't be split.
  if (compaction_->output_level() == 0) {
    return false;
  }

  // Reached the max file size.
  if (current_output_file_size_ >= compaction_->max_output_file_size()) {
    return true;
  }

  // Check if it needs to split for RoundRobin.
  // An invalid local_output_split_key indicates that we do not need to split.
  if (local_output_split_key_ != nullptr && !is_split_) {
    // Split occurs when the next key is larger than/equal to the cursor.
    if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) {
      is_split_ = true;
      return true;
    }
  }

  // Only check the remaining conditions if the current key is going to cross
  // a grandparent file boundary (either the file beginning or ending).
  if (num_grandparent_boundaries_crossed > 0) {
    // Cut the file before the current key if the size of the current output
    // file plus its overlapped grandparent files is bigger than
    // max_compaction_bytes. This prevents a future compaction from the
    // current output level from exceeding max_compaction_bytes.
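    // For illustration (hypothetical numbers): with max_compaction_bytes =
    // 200MB, a 40MB current output file, and 170MB of overlapped grandparent
    // files, 170MB + 40MB > 200MB, so the file is cut before the current key.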
    if (grandparent_overlapped_bytes_ + current_output_file_size_ >
        compaction_->max_compaction_bytes()) {
      return true;
    }

    // Cut the file if including the key is going to add a skippable file on
    // the grandparent level AND its size is reasonably big (1/8 of target
    // file size). For example, if it's compacting the files L0 + L1:
    //  L0: [1,   21]
    //  L1: [3,   23]
    //  L2: [2, 4] [11, 15] [22, 24]
    // Without this break, it will output as:
    //  L1: [1,3, 21,23]
    // With this break, it will output as (assuming [11, 15] at L2 is bigger
    // than 1/8 of target size):
    //  L1: [1,3] [21,23]
    // Then for future compactions, [11,15] won't be included.
    // For random datasets (either evenly distributed or skewed), this
    // condition rarely triggers, but if the user is adding 2 different
    // datasets without any overlap, it is likely to happen.
    // For more details, check PR #1963.
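    // A note on the threshold below: when the current key lands in a
    // grandparent gap, 2 boundary crossings suffice for a whole grandparent
    // file to have been passed (into the skipped file and out of it); when it
    // lands inside a grandparent file, it takes 3 crossings (into the skipped
    // file, out of it, and into the current file).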
    const size_t num_skippable_boundaries_crossed =
        being_grandparent_gap_ ? 2 : 3;
    if (compaction_->immutable_options().compaction_style ==
            kCompactionStyleLevel &&
        num_grandparent_boundaries_crossed >=
            num_skippable_boundaries_crossed &&
        grandparent_overlapped_bytes_ - previous_overlapped_bytes >
            compaction_->target_output_file_size() / 8) {
      return true;
    }

    // Pre-cut the output file if it's reaching a certain size AND it's at
    // the boundary of a grandparent file. This can reduce the future
    // compaction size; the cost is having smaller files.
    // The pre-cut size threshold is based on how many grandparent boundaries
    // have been seen so far. If no boundary has been seen, it pre-cuts at 50%
    // of the target file size. Each boundary seen raises the threshold by 5%,
    // capped at 90%, at which point it always cuts. The idea is that the more
    // boundaries it has seen, the more likely it is to see another boundary
    // (a file-cutting opportunity) before reaching the target file size.
    // Testing shows this generates larger files than a static threshold like
    // 75%, with a similar write-amplification improvement.
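    // For illustration (hypothetical numbers): with a 64MB target file size
    // and 4 boundaries seen so far, the threshold below is
    // ((64MB + 99) / 100) * (50 + min(4 * 5, 40)) ~= 64MB * 70% ~= 44.8MB,
    // so the file is pre-cut once it reaches that size at a boundary.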
    if (compaction_->immutable_options().compaction_style ==
            kCompactionStyleLevel &&
        current_output_file_size_ >=
            ((compaction_->target_output_file_size() + 99) / 100) *
                (50 + std::min(grandparent_boundary_switched_num_ * 5,
                               size_t{40}))) {
      return true;
    }
  }

  return false;
}
Status CompactionOutputs::AddToOutput(
    const CompactionIterator& c_iter,
    const CompactionFileOpenFunc& open_file_func,
    const CompactionFileCloseFunc& close_file_func,
    const ParsedInternalKey& prev_table_last_internal_key) {
  Status s;
  bool is_range_del = c_iter.IsDeleteRangeSentinelKey();
  if (is_range_del && compaction_->bottommost_level()) {
    // We don't consider range tombstones for the bottommost level since:
    // 1. there is no grandparent and hence no overlap to consider;
    // 2. range tombstones may be dropped at the bottommost level.
    return s;
  }
  const Slice& key = c_iter.key();
  if (ShouldStopBefore(c_iter) && HasBuilder()) {
    s = close_file_func(c_iter.InputStatus(), prev_table_last_internal_key, key,
                        &c_iter, *this);
    if (!s.ok()) {
      return s;
    }
    // Reset grandparent information.
    grandparent_boundary_switched_num_ = 0;
    grandparent_overlapped_bytes_ =
        GetCurrentKeyGrandparentOverlappedBytes(key);
    if (UNLIKELY(is_range_del)) {
      // Lower bound for this new output file; this is needed since the lower
      // bound does not come from the smallest point key in this case.
      range_tombstone_lower_bound_.DecodeFrom(key);
    } else {
      range_tombstone_lower_bound_.Clear();
    }
  }

  // Open output file if necessary.
  if (!HasBuilder()) {
    s = open_file_func(*this);
    if (!s.ok()) {
      return s;
    }
  }

  // c_iter may emit range deletion keys, so update `last_key_for_partitioner_`
  // here before returning below when `is_range_del` is true.
  if (partitioner_) {
    last_key_for_partitioner_.assign(c_iter.user_key().data_,
                                     c_iter.user_key().size_);
  }

  if (UNLIKELY(is_range_del)) {
    return s;
  }

  assert(builder_ != nullptr);
  const Slice& value = c_iter.value();
  s = current_output().validator.Add(key, value);
  if (!s.ok()) {
    return s;
  }
  builder_->Add(key, value);

  stats_.num_output_records++;
  current_output_file_size_ = builder_->EstimatedFileSize();

  if (blob_garbage_meter_) {
    s = blob_garbage_meter_->ProcessOutFlow(key, value);
  }

  if (!s.ok()) {
    return s;
  }

  const ParsedInternalKey& ikey = c_iter.ikey();
  if (ikey.type == kTypeValuePreferredSeqno) {
    SequenceNumber preferred_seqno = ParsePackedValueForSeqno(value);
    smallest_preferred_seqno_ =
        std::min(smallest_preferred_seqno_, preferred_seqno);
  }
  s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
                                             ikey.type);

  return s;
}

namespace {
void SetMaxSeqAndTs(InternalKey& internal_key, const Slice& user_key,
                    const size_t ts_sz) {
  if (ts_sz) {
    static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
    if (ts_sz <= strlen(kTsMax)) {
      internal_key = InternalKey(user_key, kMaxSequenceNumber,
                                 kTypeRangeDeletion, Slice(kTsMax, ts_sz));
    } else {
      internal_key =
          InternalKey(user_key, kMaxSequenceNumber, kTypeRangeDeletion,
                      std::string(ts_sz, '\xff'));
    }
  } else {
    internal_key.Set(user_key, kMaxSequenceNumber, kTypeRangeDeletion);
  }
}
}  // namespace
Status CompactionOutputs::AddRangeDels(
    CompactionRangeDelAggregator& range_del_agg,
    const Slice* comp_start_user_key, const Slice* comp_end_user_key,
    CompactionIterationStats& range_del_out_stats, bool bottommost_level,
    const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot,
    std::pair<SequenceNumber, SequenceNumber> keep_seqno_range,
    const Slice& next_table_min_key, const std::string& full_history_ts_low) {
  // The following example does not happen since
  // CompactionOutput::ShouldStopBefore() always returns false for the first
  // point key. But we should consider removing this dependency. Suppose for
  // the first compaction output file,
  // - next_table_min_key.user_key == comp_start_user_key
  // - no point key is in the output file
  // - there is a range tombstone @seqno to be added that covers
  //   comp_start_user_key
  // Then meta.smallest will be set to comp_start_user_key@seqno
  // and meta.largest will be set to comp_start_user_key@kMaxSequenceNumber,
  // which violates the assumption that meta.smallest should be <=
  // meta.largest.
  assert(!range_del_agg.IsEmpty());
  FileMetaData& meta = current_output().meta;
  const Comparator* ucmp = icmp.user_comparator();
  InternalKey lower_bound_buf, upper_bound_buf;
  Slice lower_bound_guard, upper_bound_guard;
  std::string smallest_user_key;
  const Slice *lower_bound, *upper_bound;

  // We first determine the internal key lower_bound and upper_bound for
  // this output file. All and only range tombstones that overlap with
  // [lower_bound, upper_bound] should be added to this file. File
  // boundaries (meta.smallest/largest) should be updated accordingly when
  // extended by range tombstones.
  size_t output_size = outputs_.size();
  if (output_size == 1) {
    // This is the first file in the subcompaction.
    //
    // When outputting a range tombstone that spans a subcompaction boundary,
    // the files on either side of that boundary need to include that
    // boundary's user key. Otherwise, the spanning range tombstone would lose
    // coverage.
    //
    // To achieve this while preventing files from overlapping in internal key
    // (an LSM invariant violation), we allow the earlier file to include the
    // boundary user key up to `kMaxSequenceNumber,kTypeRangeDeletion`. The
    // later file can begin at the boundary user key at the newest key version
    // it contains. At this point that version number is unknown since we have
    // not processed the range tombstones yet, so permit any version. The same
    // story applies to timestamps, and a non-nullptr `comp_start_user_key`
    // should have `kMaxTs` here, which similarly permits any timestamp.
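    // For illustration (hypothetical keys): with boundary user key `b`, the
    // earlier file may extend up to `b@kMaxSequenceNumber,kTypeRangeDeletion`
    // while the later file may begin at, say, `b@100`. Since a higher seqno
    // sorts first in internal key order, the two files share user key `b`
    // without overlapping in internal key order.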
    if (comp_start_user_key) {
      lower_bound_buf.Set(*comp_start_user_key, kMaxSequenceNumber,
                          kTypeRangeDeletion);
      lower_bound_guard = lower_bound_buf.Encode();
      lower_bound = &lower_bound_guard;
    } else {
      lower_bound = nullptr;
    }
  } else {
    // For subsequent output tables, only include range tombstones from min
    // key onwards since the previous file was extended to contain range
    // tombstones falling before min key.
    if (range_tombstone_lower_bound_.size() > 0) {
      assert(meta.smallest.size() == 0 ||
             icmp.Compare(range_tombstone_lower_bound_, meta.smallest) < 0);
      lower_bound_guard = range_tombstone_lower_bound_.Encode();
    } else {
      assert(meta.smallest.size() > 0);
      lower_bound_guard = meta.smallest.Encode();
    }
    lower_bound = &lower_bound_guard;
  }

  const size_t ts_sz = ucmp->timestamp_size();
  if (next_table_min_key.empty()) {
    // Last file of the subcompaction.
    if (comp_end_user_key) {
      upper_bound_buf.Set(*comp_end_user_key, kMaxSequenceNumber,
                          kTypeRangeDeletion);
      upper_bound_guard = upper_bound_buf.Encode();
      upper_bound = &upper_bound_guard;
    } else {
      upper_bound = nullptr;
    }
  } else {
    // There is another file coming whose coverage will begin at
    // `next_table_min_key`. The current file needs to extend range tombstone
    // coverage through its own keys (through `meta.largest`) and through user
    // keys preceding `next_table_min_key`'s user key.
    ParsedInternalKey next_table_min_key_parsed;
    ParseInternalKey(next_table_min_key, &next_table_min_key_parsed,
                     false /* log_err_key */)
        .PermitUncheckedError();
    assert(next_table_min_key_parsed.sequence < kMaxSequenceNumber);
    assert(meta.largest.size() == 0 ||
           icmp.Compare(meta.largest.Encode(), next_table_min_key) < 0);
    assert(!lower_bound || icmp.Compare(*lower_bound, next_table_min_key) <= 0);
    if (meta.largest.size() > 0 &&
        ucmp->EqualWithoutTimestamp(meta.largest.user_key(),
                                    next_table_min_key_parsed.user_key)) {
      // Caution: this assumes meta.largest.Encode() lives longer than
      // upper_bound, which is only true if meta.largest is never updated.
      // This just happens to be the case here since meta.largest serves
      // as the upper_bound.
      upper_bound_guard = meta.largest.Encode();
    } else {
      SetMaxSeqAndTs(upper_bound_buf, next_table_min_key_parsed.user_key,
                     ts_sz);
      upper_bound_guard = upper_bound_buf.Encode();
    }
    upper_bound = &upper_bound_guard;
  }
  if (lower_bound && upper_bound &&
      icmp.Compare(*lower_bound, *upper_bound) > 0) {
    assert(meta.smallest.size() == 0 &&
           ucmp->EqualWithoutTimestamp(ExtractUserKey(*lower_bound),
                                       ExtractUserKey(*upper_bound)));
    // This can only happen when lower_bound has the same user key as
    // next_table_min_key and there is no point key in the current
    // compaction output file.
    return Status::OK();
  }
  // The end key of the subcompaction must be bigger than or equal to the
  // upper bound. If the end of the subcompaction is null or the upper bound
  // is null, this file is the last file in the compaction, so there will be
  // no overlap between this file and others.
  assert(comp_end_user_key == nullptr || upper_bound == nullptr ||
         ucmp->CompareWithoutTimestamp(ExtractUserKey(*upper_bound),
                                       *comp_end_user_key) <= 0);
  auto it = range_del_agg.NewIterator(lower_bound, upper_bound);
  Slice last_tombstone_start_user_key{};
  bool reached_lower_bound = false;
  const ReadOptions read_options(Env::IOActivity::kCompaction);
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    auto tombstone = it->Tombstone();
    auto kv = tombstone.Serialize();
    // Filter out by seqno for per-key placement.
    if (tombstone.seq_ < keep_seqno_range.first ||
        tombstone.seq_ >= keep_seqno_range.second) {
      continue;
    }
    InternalKey tombstone_end = tombstone.SerializeEndKey();
    // TODO: the underlying iterator should support clamping the bounds.
    // tombstone_end.Encode() is of the form user_key@kMaxSeqno; if it is
    // equal to lower_bound, there is no need to include such a range
    // tombstone.
    if (!reached_lower_bound && lower_bound &&
        icmp.Compare(tombstone_end.Encode(), *lower_bound) <= 0) {
      continue;
    }
    assert(!lower_bound ||
           icmp.Compare(*lower_bound, tombstone_end.Encode()) <= 0);
    reached_lower_bound = true;

    // Garbage collection for range tombstones.
    // If user-defined timestamps are enabled, range tombstones are dropped if
    // they are at bottommost_level, below full_history_ts_low and not visible
    // in any snapshot. trim_ts_ is passed to the constructor for
    // range_del_agg_, and range_del_agg_ internally drops tombstones above
    // trim_ts_.
    bool consider_drop =
        tombstone.seq_ <= earliest_snapshot &&
        (ts_sz == 0 ||
         (!full_history_ts_low.empty() &&
          ucmp->CompareTimestamp(tombstone.ts_, full_history_ts_low) < 0));
    if (consider_drop && bottommost_level) {
      // TODO(andrewkr): tombstones that span multiple output files are
      // counted for each compaction output file, so there is lots of double
      // counting.
      range_del_out_stats.num_range_del_drop_obsolete++;
      range_del_out_stats.num_record_drop_obsolete++;
      continue;
    }

    assert(lower_bound == nullptr ||
           ucmp->CompareWithoutTimestamp(ExtractUserKey(*lower_bound),
                                         kv.second) < 0);
    InternalKey tombstone_start = kv.first;
    if (lower_bound &&
        ucmp->CompareWithoutTimestamp(tombstone_start.user_key(),
                                      ExtractUserKey(*lower_bound)) < 0) {
      // This just updates the non-timestamp portion of `tombstone_start`'s
      // user key. Ideally there would be a simpler API usage.
      ParsedInternalKey tombstone_start_parsed;
      ParseInternalKey(tombstone_start.Encode(), &tombstone_start_parsed,
                       false /* log_err_key */)
          .PermitUncheckedError();
      // The timestamp should come from where the sequence number comes from,
      // which is the tombstone in this case.
      std::string ts =
          tombstone_start_parsed.GetTimestamp(ucmp->timestamp_size())
              .ToString();
      tombstone_start_parsed.user_key = ExtractUserKey(*lower_bound);
      tombstone_start.SetFrom(tombstone_start_parsed, ts);
    }
    if (upper_bound != nullptr &&
        icmp.Compare(*upper_bound, tombstone_start.Encode()) < 0) {
      break;
    }
    if (lower_bound &&
        icmp.Compare(tombstone_start.Encode(), *lower_bound) < 0) {
      tombstone_start.DecodeFrom(*lower_bound);
    }
    if (upper_bound && icmp.Compare(*upper_bound, tombstone_end.Encode()) < 0) {
      tombstone_end.DecodeFrom(*upper_bound);
    }
    if (consider_drop && compaction_->KeyRangeNotExistsBeyondOutputLevel(
                             tombstone_start.user_key(),
                             tombstone_end.user_key(), &level_ptrs_)) {
      range_del_out_stats.num_range_del_drop_obsolete++;
      range_del_out_stats.num_record_drop_obsolete++;
      continue;
    }
    // Here we show that *only* range tombstones that overlap with
    // [lower_bound, upper_bound] are added to the current file, and
    // sanity-check invariants that should hold:
    // - [tombstone_start, tombstone_end] overlaps with [lower_bound,
    //   upper_bound]
    // - meta.smallest <= meta.largest
    // Corresponding assertions are made; the proof is broken if any of them
    // fails.
    // TODO: show that *all* range tombstones that overlap with
    //  [lower_bound, upper_bound] are added.
    // TODO: some invariant about boundaries being correctly updated.
    //
    // Note that `tombstone_start` is updated in the if condition above; we
    // use tombstone_start to refer to its initial value, i.e.,
    // it->Tombstone().first, and use tombstone_start* to refer to its value
    // after the update.
    //
    // To show [lower_bound, upper_bound] overlaps with [tombstone_start,
    // tombstone_end]:
    // lower_bound <= upper_bound from the if condition right after all
    // bounds are initialized. We assume each tombstone fragment has
    // start_key.user_key < end_key.user_key, so
    // tombstone_start < tombstone_end by
    // FragmentedTombstoneIterator::Tombstone(). So these two ranges are both
    // non-empty. The flag `reached_lower_bound` and the if logic before it
    // ensure lower_bound <= tombstone_end. tombstone_start is only updated
    // if it has a smaller user_key than lower_bound user_key, so
    // tombstone_start <= tombstone_start*. The above if condition implies
    // tombstone_start* <= upper_bound. So we have
    // tombstone_start <= upper_bound and lower_bound <= tombstone_end,
    // and the two ranges overlap.
    //
    // To show meta.smallest <= meta.largest:
    // From the implementation of UpdateBoundariesForRange(), it suffices to
    // prove that when it is first called in this function, its parameters
    // satisfy `start <= end`, where start = max(tombstone_start*, lower_bound)
    // and end = min(tombstone_end, upper_bound). From the above proof we have
    // lower_bound <= tombstone_end and lower_bound <= upper_bound. We only
    // need to show that tombstone_start* <= min(tombstone_end, upper_bound).
    // Note that tombstone_start*.user_key = max(tombstone_start.user_key,
    // lower_bound.user_key). Assume tombstone_end always has
    // kMaxSequenceNumber and lower_bound.seqno < kMaxSequenceNumber.
    // Since lower_bound <= tombstone_end and lower_bound.seqno <
    // tombstone_end.seqno (in absolute number order, not internal key order),
    // lower_bound.user_key < tombstone_end.user_key.
    // Since lower_bound.user_key < tombstone_end.user_key and
    // tombstone_start.user_key < tombstone_end.user_key, tombstone_start* <
    // tombstone_end. Since tombstone_start* <= upper_bound from the above
    // proof and tombstone_start* < tombstone_end, tombstone_start* <=
    // min(tombstone_end, upper_bound), so `start <= end` holds.

    // Range tombstones are not supported by the output validator yet.
    builder_->Add(kv.first.Encode(), kv.second);
    assert(icmp.Compare(tombstone_start, tombstone_end) <= 0);
    meta.UpdateBoundariesForRange(tombstone_start, tombstone_end,
                                  tombstone.seq_, icmp);
    if (!bottommost_level) {
      bool start_user_key_changed =
          last_tombstone_start_user_key.empty() ||
          ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key,
                                        it->start_key()) < 0;
      last_tombstone_start_user_key = it->start_key();
      if (start_user_key_changed) {
        // If tombstone_start >= tombstone_end, then either no key range is
        // covered, or they have the same user key. If they have the same
        // user key, then the internal key range should only be within this
        // level, and no keys from older levels are covered.
        if (ucmp->CompareWithoutTimestamp(tombstone_start.user_key(),
                                          tombstone_end.user_key()) < 0) {
          SizeApproximationOptions approx_opts;
          approx_opts.files_size_error_margin = 0.1;
          auto approximate_covered_size =
              compaction_->input_version()->version_set()->ApproximateSize(
                  approx_opts, read_options, compaction_->input_version(),
                  tombstone_start.Encode(), tombstone_end.Encode(),
                  compaction_->output_level() + 1 /* start_level */,
                  -1 /* end_level */, kCompaction);
          meta.compensated_range_deletion_size += approximate_covered_size;
        }
      }
    }
  }
  return Status::OK();
}
void CompactionOutputs::FillFilesToCutForTtl() {
  if (compaction_->immutable_options().compaction_style !=
          kCompactionStyleLevel ||
      compaction_->immutable_options().compaction_pri !=
          kMinOverlappingRatio ||
      compaction_->mutable_cf_options().ttl == 0 ||
      compaction_->num_input_levels() < 2 || compaction_->bottommost_level()) {
    return;
  }

  // We define a "new" file as one whose oldest ancestor time is younger than
  // 1/4 of the TTL, and an "old" one as older than 1/2 of the TTL.
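  // For illustration (hypothetical setting): with ttl = 4 days, old_age_thres
  // below is current_time - 2 days, so a file whose oldest ancestor time is
  // more than 2 days in the past qualifies as "old".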
  int64_t temp_current_time;
  auto get_time_status =
      compaction_->immutable_options().clock->GetCurrentTime(
          &temp_current_time);
  if (!get_time_status.ok()) {
    return;
  }
  auto current_time = static_cast<uint64_t>(temp_current_time);
  if (current_time < compaction_->mutable_cf_options().ttl) {
    return;
  }
  uint64_t old_age_thres =
      current_time - compaction_->mutable_cf_options().ttl / 2;

  const std::vector<FileMetaData*>& olevel =
      *(compaction_->inputs(compaction_->num_input_levels() - 1));
  for (FileMetaData* file : olevel) {
    // Worth filtering out by start and end?
    uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
    // We only include old files that are not too small, to prevent a flood
    // of small files.
    if (oldest_ancester_time < old_age_thres &&
        file->fd.GetFileSize() >
            compaction_->mutable_cf_options().target_file_size_base / 2) {
      files_to_cut_for_ttl_.push_back(file);
    }
  }
}

CompactionOutputs::CompactionOutputs(const Compaction* compaction,
                                     const bool is_proximal_level)
    : compaction_(compaction), is_proximal_level_(is_proximal_level) {
  partitioner_ = compaction->output_level() == 0
                     ? nullptr
                     : compaction->CreateSstPartitioner();

  if (compaction->output_level() != 0) {
    FillFilesToCutForTtl();
  }

  level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
}

}  // namespace ROCKSDB_NAMESPACE