range_del_aggregator.cc

// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/range_del_aggregator.h"

#include "db/compaction/compaction_iteration_stats.h"
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_edit.h"
#include "rocksdb/comparator.h"
#include "rocksdb/types.h"
#include "table/internal_iterator.h"
#include "table/table_builder.h"
#include "util/heap.h"
#include "util/kv_map.h"
#include "util/vector_iterator.h"

namespace ROCKSDB_NAMESPACE {

TruncatedRangeDelIterator::TruncatedRangeDelIterator(
    std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
    const InternalKeyComparator* icmp, const InternalKey* smallest,
    const InternalKey* largest)
    : iter_(std::move(iter)),
      icmp_(icmp),
      smallest_ikey_(smallest),
      largest_ikey_(largest) {
  // Set up bounds such that range tombstones from this iterator are
  // truncated to the range [smallest_, largest_).
  if (smallest != nullptr) {
    pinned_bounds_.emplace_back();
    auto& parsed_smallest = pinned_bounds_.back();
    Status pik_status = ParseInternalKey(smallest->Encode(), &parsed_smallest,
                                         false /* log_err_key */);  // TODO
    pik_status.PermitUncheckedError();
    parsed_smallest.type = kTypeMaxValid;
    assert(pik_status.ok());
    smallest_ = &parsed_smallest;
  }
  if (largest != nullptr) {
    pinned_bounds_.emplace_back();
    auto& parsed_largest = pinned_bounds_.back();
    Status pik_status = ParseInternalKey(largest->Encode(), &parsed_largest,
                                         false /* log_err_key */);  // TODO
    pik_status.PermitUncheckedError();
    assert(pik_status.ok());
    if (parsed_largest.type == kTypeRangeDeletion &&
        parsed_largest.sequence == kMaxSequenceNumber) {
      // The file boundary has been artificially extended by a range tombstone.
      // We do not need to adjust largest to properly truncate range
      // tombstones that extend past the boundary.
    } else if (parsed_largest.sequence == 0) {
      // The largest key in the sstable has a sequence number of 0. Since we
      // guarantee that no internal keys with the same user key and sequence
      // number can exist in a DB, we know that the largest key in this sstable
      // cannot exist as the smallest key in the next sstable. This further
      // implies that no range tombstone in this sstable covers largest;
      // otherwise, the file boundary would have been artificially extended.
      //
      // Therefore, we will never truncate a range tombstone at largest, so we
      // can leave it unchanged.
      // TODO: maybe use kTypeMaxValid here to ensure that the range tombstone
      // has a distinct key from any point key.
    } else {
      // The same user key may straddle two sstable boundaries. To ensure that
      // the truncated end key can cover the largest key in this sstable,
      // reduce its sequence number by 1.
      parsed_largest.sequence -= 1;
      // This line is not needed for correctness, but it ensures that the
      // truncated end key does not cover keys from the next SST file.
      parsed_largest.type = kTypeMaxValid;
    }
    largest_ = &parsed_largest;
  }
}
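
// Example (illustrative keys and seqnums): an SST file with boundaries
// smallest = "c"@5 and largest = "m"@3 holds a fragmented range tombstone
// ["a", "z")@7. The bounds computed above clamp it to the range
// ["c"@5,kTypeMaxValid, "m"@2,kTypeMaxValid), so the tombstone still covers
// "m"@3 inside this file but cannot cover a key "m"@2 that may begin the
// next SST file.
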
bool TruncatedRangeDelIterator::Valid() const {
  assert(iter_ != nullptr);
  return iter_->Valid() &&
         (smallest_ == nullptr ||
          icmp_->Compare(*smallest_, iter_->parsed_end_key()) < 0) &&
         (largest_ == nullptr ||
          icmp_->Compare(iter_->parsed_start_key(), *largest_) < 0);
}

// NOTE: target is a user key, with timestamp if enabled.
void TruncatedRangeDelIterator::Seek(const Slice& target) {
  if (largest_ != nullptr &&
      icmp_->Compare(*largest_, ParsedInternalKey(target, kMaxSequenceNumber,
                                                  kTypeRangeDeletion)) <= 0) {
    iter_->Invalidate();
    return;
  }
  if (smallest_ != nullptr &&
      icmp_->user_comparator()->Compare(target, smallest_->user_key) < 0) {
    iter_->Seek(smallest_->user_key);
    return;
  }
  iter_->Seek(target);
}

void TruncatedRangeDelIterator::SeekInternalKey(const Slice& target) {
  if (largest_ && icmp_->Compare(*largest_, target) <= 0) {
    iter_->Invalidate();
    return;
  }
  if (smallest_ && icmp_->Compare(target, *smallest_) < 0) {
    // Since target < *smallest_, we also have target < *largest_.
    // This seek must land on a range tombstone where end_key() > target,
    // so there is no need to check again.
    iter_->Seek(smallest_->user_key);
  } else {
    iter_->Seek(ExtractUserKey(target));
    while (Valid() && icmp_->Compare(end_key(), target) <= 0) {
      Next();
    }
  }
}
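
// Example (illustrative keys and seqnums): given fragments ["a", "e")@10
// and ["e", "g")@6, SeekInternalKey("e"@8) lands on ["e", "g")@6, the first
// fragment whose end key sorts after "e"@8; ["a", "e")@10 is skipped
// because its end key "e"@kMaxSequenceNumber sorts before "e"@8.
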
// NOTE: target is a user key, with timestamp if enabled.
void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) {
  if (smallest_ != nullptr &&
      icmp_->Compare(ParsedInternalKey(target, 0, kTypeRangeDeletion),
                     *smallest_) < 0) {
    iter_->Invalidate();
    return;
  }
  if (largest_ != nullptr &&
      icmp_->user_comparator()->Compare(largest_->user_key, target) < 0) {
    iter_->SeekForPrev(largest_->user_key);
    return;
  }
  iter_->SeekForPrev(target);
}

void TruncatedRangeDelIterator::SeekToFirst() {
  if (smallest_ != nullptr) {
    iter_->Seek(smallest_->user_key);
    return;
  }
  iter_->SeekToTopFirst();
}

void TruncatedRangeDelIterator::SeekToLast() {
  if (largest_ != nullptr) {
    iter_->SeekForPrev(largest_->user_key);
    return;
  }
  iter_->SeekToTopLast();
}

std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
TruncatedRangeDelIterator::SplitBySnapshot(
    const std::vector<SequenceNumber>& snapshots) {
  using FragmentedIterPair =
      std::pair<const SequenceNumber,
                std::unique_ptr<FragmentedRangeTombstoneIterator>>;

  auto split_untruncated_iters = iter_->SplitBySnapshot(snapshots);
  std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
      split_truncated_iters;
  std::for_each(
      split_untruncated_iters.begin(), split_untruncated_iters.end(),
      [&](FragmentedIterPair& iter_pair) {
        auto truncated_iter = std::make_unique<TruncatedRangeDelIterator>(
            std::move(iter_pair.second), icmp_, smallest_ikey_, largest_ikey_);
        split_truncated_iters.emplace(iter_pair.first,
                                      std::move(truncated_iter));
      });
  return split_truncated_iters;
}
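
// Example (illustrative seqnums): with snapshots {5, 10} and tombstone
// fragments at seqnums 3, 7, and 12, SplitBySnapshot() yields three
// truncated iterators keyed by stripe upper bound: 5 -> {seqnum 3},
// 10 -> {seqnum 7}, and kMaxSequenceNumber -> {seqnum 12}. Each split
// iterator is re-truncated to the same file bounds
// [smallest_ikey_, largest_ikey_).
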
ForwardRangeDelIterator::ForwardRangeDelIterator(
    const InternalKeyComparator* icmp)
    : icmp_(icmp),
      unused_idx_(0),
      active_seqnums_(SeqMaxComparator()),
      active_iters_(EndKeyMinComparator(icmp)),
      inactive_iters_(StartKeyMinComparator(icmp)) {}

bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
  // Move active iterators that end before parsed.
  while (!active_iters_.empty() &&
         icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) {
    TruncatedRangeDelIterator* iter = PopActiveIter();
    do {
      iter->Next();
    } while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0);
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }

  // Move inactive iterators that start before parsed.
  while (!inactive_iters_.empty() &&
         icmp_->Compare(inactive_iters_.top()->start_key(), parsed) <= 0) {
    TruncatedRangeDelIterator* iter = PopInactiveIter();
    while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0) {
      iter->Next();
    }
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }

  return active_seqnums_.empty()
             ? false
             : (*active_seqnums_.begin())->seq() > parsed.sequence;
}
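
// Example (illustrative keys and seqnums): with tombstones ["a", "c")@4 and
// ["b", "f")@9 and keys visited in forward order, ShouldDelete("b"@7) finds
// both tombstones active; the maximum active seqnum 9 exceeds 7, so the key
// is deleted. At ShouldDelete("d"@2), ["a", "c")@4 ends at or before "d"
// and is advanced out of the active heap, while ["b", "f")@9 remains
// active; 9 > 2, so "d"@2 is deleted as well.
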
void ForwardRangeDelIterator::Invalidate() {
  unused_idx_ = 0;
  active_iters_.clear();
  active_seqnums_.clear();
  inactive_iters_.clear();
}

ReverseRangeDelIterator::ReverseRangeDelIterator(
    const InternalKeyComparator* icmp)
    : icmp_(icmp),
      unused_idx_(0),
      active_seqnums_(SeqMaxComparator()),
      active_iters_(StartKeyMaxComparator(icmp)),
      inactive_iters_(EndKeyMaxComparator(icmp)) {}

bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
  // Move active iterators that start after parsed.
  while (!active_iters_.empty() &&
         icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) {
    TruncatedRangeDelIterator* iter = PopActiveIter();
    do {
      iter->Prev();
    } while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0);
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }

  // Move inactive iterators that end after parsed.
  while (!inactive_iters_.empty() &&
         icmp_->Compare(parsed, inactive_iters_.top()->end_key()) < 0) {
    TruncatedRangeDelIterator* iter = PopInactiveIter();
    while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0) {
      iter->Prev();
    }
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }

  return active_seqnums_.empty()
             ? false
             : (*active_seqnums_.begin())->seq() > parsed.sequence;
}
void ReverseRangeDelIterator::Invalidate() {
  unused_idx_ = 0;
  active_iters_.clear();
  active_seqnums_.clear();
  inactive_iters_.clear();
}

bool RangeDelAggregator::StripeRep::ShouldDelete(
    const ParsedInternalKey& parsed, RangeDelPositioningMode mode) {
  if (!InStripe(parsed.sequence) || IsEmpty()) {
    return false;
  }
  switch (mode) {
    case RangeDelPositioningMode::kForwardTraversal:
      InvalidateReverseIter();

      // Pick up previously unseen iterators.
      for (auto it = std::next(iters_.begin(), forward_iter_.UnusedIdx());
           it != iters_.end(); ++it, forward_iter_.IncUnusedIdx()) {
        auto& iter = *it;
        forward_iter_.AddNewIter(iter.get(), parsed);
      }

      return forward_iter_.ShouldDelete(parsed);
    case RangeDelPositioningMode::kBackwardTraversal:
      InvalidateForwardIter();

      // Pick up previously unseen iterators.
      for (auto it = std::next(iters_.begin(), reverse_iter_.UnusedIdx());
           it != iters_.end(); ++it, reverse_iter_.IncUnusedIdx()) {
        auto& iter = *it;
        reverse_iter_.AddNewIter(iter.get(), parsed);
      }

      return reverse_iter_.ShouldDelete(parsed);
    default:
      assert(false);
      return false;
  }
}
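
// Newly added tombstone iterators are picked up lazily: UnusedIdx() records
// how many entries of iters_ the forward (or reverse) positioning structure
// has already absorbed, so each ShouldDelete() call above only registers the
// suffix of iters_ added since the previous call instead of rescanning the
// whole list.
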
bool RangeDelAggregator::StripeRep::IsRangeOverlapped(const Slice& start,
                                                      const Slice& end) {
  Invalidate();

  // Set the internal start/end keys so that:
  // - if start_ikey has the same user key and sequence number as the
  //   current end key, start_ikey will be considered greater; and
  // - if end_ikey has the same user key and sequence number as the current
  //   start key, end_ikey will be considered greater.
  ParsedInternalKey start_ikey(start, kMaxSequenceNumber,
                               static_cast<ValueType>(0));
  ParsedInternalKey end_ikey(end, 0, static_cast<ValueType>(0));
  for (auto& iter : iters_) {
    bool checked_candidate_tombstones = false;
    for (iter->SeekForPrev(start);
         iter->Valid() && icmp_->Compare(iter->start_key(), end_ikey) <= 0;
         iter->Next()) {
      checked_candidate_tombstones = true;
      if (icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
          icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
        return true;
      }
    }

    if (!checked_candidate_tombstones) {
      // Do an additional check for when the end of the range is the begin
      // key of a tombstone, which we missed earlier since SeekForPrev'ing
      // to the start was invalid.
      iter->SeekForPrev(end);
      if (iter->Valid() && icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
          icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
        return true;
      }
    }
  }
  return false;
}
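
// Example (illustrative keys): with a tombstone ["a", "c") in this stripe,
// IsRangeOverlapped("b", "d") returns true ("b" is covered). With only
// ["d", "e") it also returns true, since both query endpoints are treated
// as inclusive and the tombstone's start key equals the query end. With
// only ["e", "g") it returns false.
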
void ReadRangeDelAggregator::AddTombstones(
    std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
    const InternalKey* smallest, const InternalKey* largest) {
  if (input_iter == nullptr || input_iter->empty()) {
    return;
  }
  rep_.AddTombstones(std::make_unique<TruncatedRangeDelIterator>(
      std::move(input_iter), icmp_, smallest, largest));
}

bool ReadRangeDelAggregator::ShouldDeleteImpl(const ParsedInternalKey& parsed,
                                              RangeDelPositioningMode mode) {
  return rep_.ShouldDelete(parsed, mode);
}

bool ReadRangeDelAggregator::IsRangeOverlapped(const Slice& start,
                                               const Slice& end) {
  InvalidateRangeDelMapPositions();
  return rep_.IsRangeOverlapped(start, end);
}
void CompactionRangeDelAggregator::AddTombstones(
    std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
    const InternalKey* smallest, const InternalKey* largest) {
  if (input_iter == nullptr || input_iter->empty()) {
    return;
  }
  // This bounds the output of CompactionRangeDelAggregator::NewIterator.
  if (!trim_ts_.empty()) {
    assert(icmp_->user_comparator()->timestamp_size() > 0);
    input_iter->SetTimestampUpperBound(&trim_ts_);
  }
  assert(input_iter->lower_bound() == 0);
  assert(input_iter->upper_bound() == kMaxSequenceNumber);
  parent_iters_.emplace_back(new TruncatedRangeDelIterator(
      std::move(input_iter), icmp_, smallest, largest));

  Slice* ts_upper_bound = nullptr;
  if (!ts_upper_bound_.empty()) {
    assert(icmp_->user_comparator()->timestamp_size() > 0);
    ts_upper_bound = &ts_upper_bound_;
  }
  auto split_iters = parent_iters_.back()->SplitBySnapshot(*snapshots_);
  for (auto& split_iter : split_iters) {
    auto it = reps_.find(split_iter.first);
    if (it == reps_.end()) {
      bool inserted;
      SequenceNumber upper_bound = split_iter.second->upper_bound();
      SequenceNumber lower_bound = split_iter.second->lower_bound();
      std::tie(it, inserted) = reps_.emplace(
          split_iter.first, StripeRep(icmp_, upper_bound, lower_bound));
      assert(inserted);
    }
    assert(it != reps_.end());
    // ts_upper_bound is used to bound ShouldDelete() to only consider
    // range tombstones under full_history_ts_low_ and trim_ts_. Keys covered
    // by range tombstones that are above full_history_ts_low_ should not be
    // dropped prematurely: the user may read with a timestamp between the
    // range tombstone and the covered key. Note that we cannot set the
    // timestamp upper bound on the original `input_iter`, since `input_iter`s
    // are later used in CompactionRangeDelAggregator::NewIterator to output
    // range tombstones for persistence. We do not want to persist only the
    // range tombstones with timestamps lower than ts_upper_bound.
    split_iter.second->SetTimestampUpperBound(ts_upper_bound);
    it->second.AddTombstones(std::move(split_iter.second));
  }
}
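
// Example (illustrative seqnums): with snapshots {10, 20}, a tombstone at
// seqnum 15 belongs to the stripe (10, 20], so it is placed in the StripeRep
// keyed by 20. A tombstone at seqnum 18 added later joins that same
// StripeRep instead of creating a new one.
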
bool CompactionRangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed,
                                                RangeDelPositioningMode mode) {
  auto it = reps_.lower_bound(parsed.sequence);
  if (it == reps_.end()) {
    return false;
  }
  return it->second.ShouldDelete(parsed, mode);
}
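
// reps_ is keyed by stripe upper bound, so lower_bound() selects the unique
// stripe that can contain parsed.sequence. Example (illustrative seqnums):
// with stripes keyed 10 and 20, a key at seqnum 15 maps to the rep keyed 20,
// whose range (10, 20] contains it; StripeRep::ShouldDelete() then re-checks
// the bounds via InStripe().
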
namespace {

// Produce a sorted (by start internal key) stream of range tombstones from
// `children`. A lower_bound and an upper_bound on internal keys can
// optionally be specified. Range tombstones that end before lower_bound or
// start after upper_bound are excluded.
// If user-defined timestamp is enabled, lower_bound and upper_bound should
// contain timestamps.
class TruncatedRangeDelMergingIter : public InternalIterator {
 public:
  TruncatedRangeDelMergingIter(
      const InternalKeyComparator* icmp, const Slice* lower_bound,
      const Slice* upper_bound,
      const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>& children)
      : icmp_(icmp),
        lower_bound_(lower_bound),
        upper_bound_(upper_bound),
        heap_(StartKeyMinComparator(icmp)),
        ts_sz_(icmp_->user_comparator()->timestamp_size()) {
    for (auto& child : children) {
      if (child != nullptr) {
        assert(child->lower_bound() == 0);
        assert(child->upper_bound() == kMaxSequenceNumber);
        children_.push_back(child.get());
      }
    }
  }
  bool Valid() const override {
    return !heap_.empty() && !AfterEndKey(heap_.top());
  }
  Status status() const override { return Status::OK(); }

  void SeekToFirst() override {
    heap_.clear();
    for (auto& child : children_) {
      if (lower_bound_ != nullptr) {
        child->Seek(ExtractUserKey(*lower_bound_));
        // Since the above `Seek()` operates on a user key while `lower_bound_`
        // is an internal key, we may need to advance `child` farther for it to
        // be in bounds.
        while (child->Valid() && BeforeStartKey(child)) {
          child->InternalNext();
        }
      } else {
        child->SeekToFirst();
      }
      if (child->Valid()) {
        heap_.push(child);
      }
    }
  }

  void Next() override {
    auto* top = heap_.top();
    top->InternalNext();
    if (top->Valid()) {
      heap_.replace_top(top);
    } else {
      heap_.pop();
    }
  }

  Slice key() const override {
    auto* top = heap_.top();
    if (ts_sz_) {
      cur_start_key_.Set(top->start_key().user_key, top->seq(),
                         kTypeRangeDeletion, top->timestamp());
    } else {
      cur_start_key_.Set(top->start_key().user_key, top->seq(),
                         kTypeRangeDeletion);
    }
    assert(top->start_key().user_key.size() >= ts_sz_);
    return cur_start_key_.Encode();
  }

  Slice value() const override {
    auto* top = heap_.top();
    if (!ts_sz_) {
      return top->end_key().user_key;
    }
    assert(top->timestamp().size() == ts_sz_);
    cur_end_key_.clear();
    cur_end_key_.append(top->end_key().user_key.data(),
                        top->end_key().user_key.size() - ts_sz_);
    cur_end_key_.append(top->timestamp().data(), ts_sz_);
    return cur_end_key_;
  }
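
  // With user-defined timestamps (e.g., a hypothetical 8-byte timestamp),
  // value() above rebuilds the end key by replacing the trailing ts_sz_
  // bytes of the stored end user key with the tombstone's own timestamp, so
  // the emitted fragment carries the same timestamp in both its start and
  // end keys.
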
  // Unused InternalIterator methods
  void Prev() override { assert(false); }
  void Seek(const Slice& /* target */) override { assert(false); }
  void SeekForPrev(const Slice& /* target */) override { assert(false); }
  void SeekToLast() override { assert(false); }

 private:
  bool BeforeStartKey(const TruncatedRangeDelIterator* iter) const {
    if (lower_bound_ == nullptr) {
      return false;
    }
    return icmp_->Compare(iter->end_key(), *lower_bound_) <= 0;
  }

  bool AfterEndKey(const TruncatedRangeDelIterator* iter) const {
    if (upper_bound_ == nullptr) {
      return false;
    }
    return icmp_->Compare(iter->start_key(), *upper_bound_) > 0;
  }

  const InternalKeyComparator* icmp_;
  const Slice* lower_bound_;
  const Slice* upper_bound_;
  BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> heap_;
  std::vector<TruncatedRangeDelIterator*> children_;

  mutable InternalKey cur_start_key_;
  mutable std::string cur_end_key_;
  size_t ts_sz_;
};

}  // anonymous namespace
std::unique_ptr<FragmentedRangeTombstoneIterator>
CompactionRangeDelAggregator::NewIterator(const Slice* lower_bound,
                                          const Slice* upper_bound) {
  InvalidateRangeDelMapPositions();
  auto merging_iter = std::make_unique<TruncatedRangeDelMergingIter>(
      icmp_, lower_bound, upper_bound, parent_iters_);

  auto fragmented_tombstone_list =
      std::make_shared<FragmentedRangeTombstoneList>(
          std::move(merging_iter), *icmp_, true /* for_compaction */,
          *snapshots_);

  return std::make_unique<FragmentedRangeTombstoneIterator>(
      fragmented_tombstone_list, *icmp_, kMaxSequenceNumber /* upper_bound */);
}
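
// Usage sketch (hypothetical caller, illustrative variable names): a
// compaction collects tombstones from each input file, then drains the
// merged, re-fragmented stream for persistence into output files:
//
//   CompactionRangeDelAggregator range_del_agg(&icmp, &snapshots);
//   range_del_agg.AddTombstones(std::move(file_tombstones),
//                               &file_smallest, &file_largest);
//   auto it = range_del_agg.NewIterator(lower_bound, upper_bound);
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // write the fragment [it->start_key(), it->end_key())@it->seq()
//     // into the output SST's range tombstone meta block
//   }
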
}  // namespace ROCKSDB_NAMESPACE