range_del_aggregator.cc 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. // Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/range_del_aggregator.h"
  6. #include "db/compaction/compaction_iteration_stats.h"
  7. #include "db/dbformat.h"
  8. #include "db/pinned_iterators_manager.h"
  9. #include "db/range_del_aggregator.h"
  10. #include "db/range_tombstone_fragmenter.h"
  11. #include "db/version_edit.h"
  12. #include "rocksdb/comparator.h"
  13. #include "rocksdb/types.h"
  14. #include "table/internal_iterator.h"
  15. #include "table/scoped_arena_iterator.h"
  16. #include "table/table_builder.h"
  17. #include "util/heap.h"
  18. #include "util/kv_map.h"
  19. #include "util/vector_iterator.h"
  20. namespace ROCKSDB_NAMESPACE {
// Wraps a fragmented range tombstone iterator, truncating its tombstones to
// the file boundary keys [smallest, largest] (either may be nullptr for "no
// bound") so tombstones do not cover keys outside the sstable they came from.
TruncatedRangeDelIterator::TruncatedRangeDelIterator(
    std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
    const InternalKeyComparator* icmp, const InternalKey* smallest,
    const InternalKey* largest)
    : iter_(std::move(iter)),
      icmp_(icmp),
      smallest_ikey_(smallest),
      largest_ikey_(largest) {
  if (smallest != nullptr) {
    // Pin a parsed copy of the smallest boundary; smallest_ points into
    // pinned_bounds_ so it remains valid for this iterator's lifetime.
    pinned_bounds_.emplace_back();
    auto& parsed_smallest = pinned_bounds_.back();
    if (!ParseInternalKey(smallest->Encode(), &parsed_smallest)) {
      assert(false);
    }
    smallest_ = &parsed_smallest;
  }
  if (largest != nullptr) {
    pinned_bounds_.emplace_back();
    auto& parsed_largest = pinned_bounds_.back();
    if (!ParseInternalKey(largest->Encode(), &parsed_largest)) {
      assert(false);
    }
    if (parsed_largest.type == kTypeRangeDeletion &&
        parsed_largest.sequence == kMaxSequenceNumber) {
      // The file boundary has been artificially extended by a range tombstone.
      // We do not need to adjust largest to properly truncate range
      // tombstones that extend past the boundary.
    } else if (parsed_largest.sequence == 0) {
      // The largest key in the sstable has a sequence number of 0. Since we
      // guarantee that no internal keys with the same user key and sequence
      // number can exist in a DB, we know that the largest key in this sstable
      // cannot exist as the smallest key in the next sstable. This further
      // implies that no range tombstone in this sstable covers largest;
      // otherwise, the file boundary would have been artificially extended.
      //
      // Therefore, we will never truncate a range tombstone at largest, so we
      // can leave it unchanged.
    } else {
      // The same user key may straddle two sstable boundaries. To ensure that
      // the truncated end key can cover the largest key in this sstable, reduce
      // its sequence number by 1.
      parsed_largest.sequence -= 1;
    }
    largest_ = &parsed_largest;
  }
}
  67. bool TruncatedRangeDelIterator::Valid() const {
  68. return iter_->Valid() &&
  69. (smallest_ == nullptr ||
  70. icmp_->Compare(*smallest_, iter_->parsed_end_key()) < 0) &&
  71. (largest_ == nullptr ||
  72. icmp_->Compare(iter_->parsed_start_key(), *largest_) < 0);
  73. }
// Advance to the next tombstone fragment at the "top" level of the
// fragmenter (see FragmentedRangeTombstoneIterator::TopNext).
void TruncatedRangeDelIterator::Next() { iter_->TopNext(); }
// Move back to the previous top-level tombstone fragment.
void TruncatedRangeDelIterator::Prev() { iter_->TopPrev(); }
// Advance the underlying iterator over all fragments, not just top-level ones.
void TruncatedRangeDelIterator::InternalNext() { iter_->Next(); }
// NOTE: target is a user key
// Positions the iterator at the first tombstone whose (truncated) range may
// cover target, clamping the seek to the truncation bounds.
void TruncatedRangeDelIterator::Seek(const Slice& target) {
  // target at or beyond the truncated upper bound: nothing here can cover it.
  if (largest_ != nullptr &&
      icmp_->Compare(*largest_, ParsedInternalKey(target, kMaxSequenceNumber,
                                                  kTypeRangeDeletion)) <= 0) {
    iter_->Invalidate();
    return;
  }
  // target below the truncated lower bound: clamp the seek to the bound.
  if (smallest_ != nullptr &&
      icmp_->user_comparator()->Compare(target, smallest_->user_key) < 0) {
    iter_->Seek(smallest_->user_key);
    return;
  }
  iter_->Seek(target);
}
// NOTE: target is a user key
// Mirror of Seek for backward positioning: find the last tombstone whose
// (truncated) range may cover target, clamping to the truncation bounds.
void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) {
  // target strictly below the truncated lower bound: nothing can cover it.
  if (smallest_ != nullptr &&
      icmp_->Compare(ParsedInternalKey(target, 0, kTypeRangeDeletion),
                     *smallest_) < 0) {
    iter_->Invalidate();
    return;
  }
  // target beyond the truncated upper bound: clamp the seek to the bound.
  if (largest_ != nullptr &&
      icmp_->user_comparator()->Compare(largest_->user_key, target) < 0) {
    iter_->SeekForPrev(largest_->user_key);
    return;
  }
  iter_->SeekForPrev(target);
}
  107. void TruncatedRangeDelIterator::SeekToFirst() {
  108. if (smallest_ != nullptr) {
  109. iter_->Seek(smallest_->user_key);
  110. return;
  111. }
  112. iter_->SeekToTopFirst();
  113. }
  114. void TruncatedRangeDelIterator::SeekToLast() {
  115. if (largest_ != nullptr) {
  116. iter_->SeekForPrev(largest_->user_key);
  117. return;
  118. }
  119. iter_->SeekToTopLast();
  120. }
  121. std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
  122. TruncatedRangeDelIterator::SplitBySnapshot(
  123. const std::vector<SequenceNumber>& snapshots) {
  124. using FragmentedIterPair =
  125. std::pair<const SequenceNumber,
  126. std::unique_ptr<FragmentedRangeTombstoneIterator>>;
  127. auto split_untruncated_iters = iter_->SplitBySnapshot(snapshots);
  128. std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
  129. split_truncated_iters;
  130. std::for_each(
  131. split_untruncated_iters.begin(), split_untruncated_iters.end(),
  132. [&](FragmentedIterPair& iter_pair) {
  133. std::unique_ptr<TruncatedRangeDelIterator> truncated_iter(
  134. new TruncatedRangeDelIterator(std::move(iter_pair.second), icmp_,
  135. smallest_ikey_, largest_ikey_));
  136. split_truncated_iters.emplace(iter_pair.first,
  137. std::move(truncated_iter));
  138. });
  139. return split_truncated_iters;
  140. }
// Tracks truncated tombstone iterators for forward (ascending-key) traversal:
// iterators covering the current position are "active" (heap ordered by end
// key, with seqnums in a separate max-ordered set); the rest are "inactive"
// (heap ordered by start key). unused_idx_ marks how many source iterators
// have been picked up so far.
ForwardRangeDelIterator::ForwardRangeDelIterator(
    const InternalKeyComparator* icmp)
    : icmp_(icmp),
      unused_idx_(0),
      active_seqnums_(SeqMaxComparator()),
      active_iters_(EndKeyMinComparator(icmp)),
      inactive_iters_(StartKeyMinComparator(icmp)) {}
// Returns true iff parsed is covered by a tombstone whose sequence number is
// greater than parsed's. Callers must pass keys in ascending internal-key
// order; the active/inactive heaps are advanced monotonically to parsed.
bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
  // Move active iterators that end before parsed.
  while (!active_iters_.empty() &&
         icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) {
    TruncatedRangeDelIterator* iter = PopActiveIter();
    do {
      iter->Next();
    } while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0);
    // Re-classify the iterator as active/inactive/exhausted at parsed.
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }
  // Move inactive iterators that start before parsed.
  while (!inactive_iters_.empty() &&
         icmp_->Compare(inactive_iters_.top()->start_key(), parsed) <= 0) {
    TruncatedRangeDelIterator* iter = PopInactiveIter();
    while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0) {
      iter->Next();
    }
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }
  // Covered iff the newest active tombstone is newer than parsed.
  return active_seqnums_.empty()
             ? false
             : (*active_seqnums_.begin())->seq() > parsed.sequence;
}
  173. void ForwardRangeDelIterator::Invalidate() {
  174. unused_idx_ = 0;
  175. active_iters_.clear();
  176. active_seqnums_.clear();
  177. inactive_iters_.clear();
  178. }
// Reverse-traversal counterpart of ForwardRangeDelIterator: active iterators
// are heap-ordered by start key (max first) and inactive ones by end key (max
// first), matching descending key order.
ReverseRangeDelIterator::ReverseRangeDelIterator(
    const InternalKeyComparator* icmp)
    : icmp_(icmp),
      unused_idx_(0),
      active_seqnums_(SeqMaxComparator()),
      active_iters_(StartKeyMaxComparator(icmp)),
      inactive_iters_(EndKeyMaxComparator(icmp)) {}
// Returns true iff parsed is covered by a tombstone whose sequence number is
// greater than parsed's. Callers must pass keys in descending internal-key
// order; the heaps are retreated monotonically to parsed.
bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
  // Move active iterators that start after parsed.
  while (!active_iters_.empty() &&
         icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) {
    TruncatedRangeDelIterator* iter = PopActiveIter();
    do {
      iter->Prev();
    } while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0);
    // Re-classify the iterator as active/inactive/exhausted at parsed.
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }
  // Move inactive iterators that end after parsed.
  while (!inactive_iters_.empty() &&
         icmp_->Compare(parsed, inactive_iters_.top()->end_key()) < 0) {
    TruncatedRangeDelIterator* iter = PopInactiveIter();
    while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0) {
      iter->Prev();
    }
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }
  // Covered iff the newest active tombstone is newer than parsed.
  return active_seqnums_.empty()
             ? false
             : (*active_seqnums_.begin())->seq() > parsed.sequence;
}
  211. void ReverseRangeDelIterator::Invalidate() {
  212. unused_idx_ = 0;
  213. active_iters_.clear();
  214. active_seqnums_.clear();
  215. inactive_iters_.clear();
  216. }
// Returns true iff parsed (whose sequence number must lie in this stripe) is
// covered by a newer tombstone, using the direction-specific iterator given
// by mode. Switching direction invalidates the opposite iterator's state.
bool RangeDelAggregator::StripeRep::ShouldDelete(
    const ParsedInternalKey& parsed, RangeDelPositioningMode mode) {
  if (!InStripe(parsed.sequence) || IsEmpty()) {
    return false;
  }
  switch (mode) {
    case RangeDelPositioningMode::kForwardTraversal:
      InvalidateReverseIter();
      // Pick up previously unseen iterators.
      for (auto it = std::next(iters_.begin(), forward_iter_.UnusedIdx());
           it != iters_.end(); ++it, forward_iter_.IncUnusedIdx()) {
        auto& iter = *it;
        forward_iter_.AddNewIter(iter.get(), parsed);
      }
      return forward_iter_.ShouldDelete(parsed);
    case RangeDelPositioningMode::kBackwardTraversal:
      InvalidateForwardIter();
      // Pick up previously unseen iterators.
      for (auto it = std::next(iters_.begin(), reverse_iter_.UnusedIdx());
           it != iters_.end(); ++it, reverse_iter_.IncUnusedIdx()) {
        auto& iter = *it;
        reverse_iter_.AddNewIter(iter.get(), parsed);
      }
      return reverse_iter_.ShouldDelete(parsed);
    default:
      // Unknown positioning mode: treat as not covered (debug-assert).
      assert(false);
      return false;
  }
}
// Returns true iff any tombstone in this stripe overlaps the user-key range
// [start, end]. Invalidates cached traversal state since it repositions the
// underlying iterators.
bool RangeDelAggregator::StripeRep::IsRangeOverlapped(const Slice& start,
                                                      const Slice& end) {
  Invalidate();
  // Set the internal start/end keys so that:
  // - if start_ikey has the same user key and sequence number as the
  // current end key, start_ikey will be considered greater; and
  // - if end_ikey has the same user key and sequence number as the current
  // start key, end_ikey will be considered greater.
  ParsedInternalKey start_ikey(start, kMaxSequenceNumber,
                               static_cast<ValueType>(0));
  ParsedInternalKey end_ikey(end, 0, static_cast<ValueType>(0));
  for (auto& iter : iters_) {
    bool checked_candidate_tombstones = false;
    // Scan tombstones from the one preceding start until past end, testing
    // each for overlap with [start_ikey, end_ikey].
    for (iter->SeekForPrev(start);
         iter->Valid() && icmp_->Compare(iter->start_key(), end_ikey) <= 0;
         iter->Next()) {
      checked_candidate_tombstones = true;
      if (icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
          icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
        return true;
      }
    }
    if (!checked_candidate_tombstones) {
      // Do an additional check for when the end of the range is the begin
      // key of a tombstone, which we missed earlier since SeekForPrev'ing
      // to the start was invalid.
      iter->SeekForPrev(end);
      if (iter->Valid() && icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
          icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
        return true;
      }
    }
  }
  return false;
}
  281. void ReadRangeDelAggregator::AddTombstones(
  282. std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
  283. const InternalKey* smallest, const InternalKey* largest) {
  284. if (input_iter == nullptr || input_iter->empty()) {
  285. return;
  286. }
  287. rep_.AddTombstones(
  288. std::unique_ptr<TruncatedRangeDelIterator>(new TruncatedRangeDelIterator(
  289. std::move(input_iter), icmp_, smallest, largest)));
  290. }
// Thin forwarder: delegate the covering check to this aggregator's single
// stripe rep.
bool ReadRangeDelAggregator::ShouldDeleteImpl(const ParsedInternalKey& parsed,
                                              RangeDelPositioningMode mode) {
  return rep_.ShouldDelete(parsed, mode);
}
// Returns true iff any registered tombstone overlaps the user-key range
// [start, end]. Cached positions are reset first since the check below
// repositions the iterators.
bool ReadRangeDelAggregator::IsRangeOverlapped(const Slice& start,
                                               const Slice& end) {
  InvalidateRangeDelMapPositions();
  return rep_.IsRangeOverlapped(start, end);
}
// Registers a file's range tombstones for compaction: the truncated iterator
// is kept in parent_iters_ (to later build the output's tombstone stream) and
// additionally split into per-snapshot-stripe pieces, each added to the
// StripeRep keyed by the stripe's upper-bound sequence number.
void CompactionRangeDelAggregator::AddTombstones(
    std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
    const InternalKey* smallest, const InternalKey* largest) {
  if (input_iter == nullptr || input_iter->empty()) {
    return;
  }
  // Compaction inputs must span the full sequence-number range before being
  // split by snapshot.
  assert(input_iter->lower_bound() == 0);
  assert(input_iter->upper_bound() == kMaxSequenceNumber);
  parent_iters_.emplace_back(new TruncatedRangeDelIterator(
      std::move(input_iter), icmp_, smallest, largest));
  auto split_iters = parent_iters_.back()->SplitBySnapshot(*snapshots_);
  for (auto& split_iter : split_iters) {
    auto it = reps_.find(split_iter.first);
    if (it == reps_.end()) {
      // First tombstone seen for this stripe: create its rep lazily.
      bool inserted;
      SequenceNumber upper_bound = split_iter.second->upper_bound();
      SequenceNumber lower_bound = split_iter.second->lower_bound();
      std::tie(it, inserted) = reps_.emplace(
          split_iter.first, StripeRep(icmp_, upper_bound, lower_bound));
      assert(inserted);
    }
    assert(it != reps_.end());
    it->second.AddTombstones(std::move(split_iter.second));
  }
}
  325. bool CompactionRangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed,
  326. RangeDelPositioningMode mode) {
  327. auto it = reps_.lower_bound(parsed.sequence);
  328. if (it == reps_.end()) {
  329. return false;
  330. }
  331. return it->second.ShouldDelete(parsed, mode);
  332. }
namespace {
// Merges the tombstones of several TruncatedRangeDelIterators into a single
// ascending stream of internal-iterator entries: key() is the fragment's
// start user key encoded as a range-deletion internal key, value() is the end
// user key. Bounded by optional user-key lower/upper bounds; only forward
// iteration is supported.
class TruncatedRangeDelMergingIter : public InternalIterator {
 public:
  TruncatedRangeDelMergingIter(
      const InternalKeyComparator* icmp, const Slice* lower_bound,
      const Slice* upper_bound, bool upper_bound_inclusive,
      const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>& children)
      : icmp_(icmp),
        lower_bound_(lower_bound),
        upper_bound_(upper_bound),
        upper_bound_inclusive_(upper_bound_inclusive),
        heap_(StartKeyMinComparator(icmp)) {
    for (auto& child : children) {
      if (child != nullptr) {
        // Children must span the full sequence-number range (snapshot
        // splitting happens after merging).
        assert(child->lower_bound() == 0);
        assert(child->upper_bound() == kMaxSequenceNumber);
        children_.push_back(child.get());
      }
    }
  }
  bool Valid() const override {
    // Valid while some child has a fragment whose start key is within the
    // upper bound.
    return !heap_.empty() && BeforeEndKey(heap_.top());
  }
  Status status() const override { return Status::OK(); }
  void SeekToFirst() override {
    heap_.clear();
    for (auto& child : children_) {
      if (lower_bound_ != nullptr) {
        child->Seek(*lower_bound_);
      } else {
        child->SeekToFirst();
      }
      if (child->Valid()) {
        heap_.push(child);
      }
    }
  }
  void Next() override {
    // Advance the child with the smallest start key past its current
    // fragment (across all sequence numbers), then restore heap order.
    auto* top = heap_.top();
    top->InternalNext();
    if (top->Valid()) {
      heap_.replace_top(top);
    } else {
      heap_.pop();
    }
  }
  Slice key() const override {
    // Encode the current fragment's start as a range-deletion internal key;
    // cur_start_key_ provides the backing storage for the returned Slice.
    auto* top = heap_.top();
    cur_start_key_.Set(top->start_key().user_key, top->seq(),
                       kTypeRangeDeletion);
    return cur_start_key_.Encode();
  }
  Slice value() const override {
    // The value of a range tombstone entry is its end user key.
    auto* top = heap_.top();
    assert(top->end_key().sequence == kMaxSequenceNumber);
    return top->end_key().user_key;
  }
  // Unused InternalIterator methods
  void Prev() override { assert(false); }
  void Seek(const Slice& /* target */) override { assert(false); }
  void SeekForPrev(const Slice& /* target */) override { assert(false); }
  void SeekToLast() override { assert(false); }
 private:
  // True iff iter's current start key falls within the (optional) upper
  // bound, honoring inclusivity.
  bool BeforeEndKey(const TruncatedRangeDelIterator* iter) const {
    if (upper_bound_ == nullptr) {
      return true;
    }
    int cmp = icmp_->user_comparator()->Compare(iter->start_key().user_key,
                                                *upper_bound_);
    return upper_bound_inclusive_ ? cmp <= 0 : cmp < 0;
  }
  const InternalKeyComparator* icmp_;
  const Slice* lower_bound_;
  const Slice* upper_bound_;
  bool upper_bound_inclusive_;
  BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> heap_;
  std::vector<TruncatedRangeDelIterator*> children_;
  // Scratch buffer backing the Slice returned by key().
  mutable InternalKey cur_start_key_;
};
}  // namespace
// Builds a single fragmented range tombstone iterator over all tombstones
// added to this aggregator, restricted to the user-key range
// [lower_bound, upper_bound] (upper bound inclusive iff
// upper_bound_inclusive; either bound may be nullptr). Used to emit the
// range-del entries for a compaction output file.
std::unique_ptr<FragmentedRangeTombstoneIterator>
CompactionRangeDelAggregator::NewIterator(const Slice* lower_bound,
                                          const Slice* upper_bound,
                                          bool upper_bound_inclusive) {
  InvalidateRangeDelMapPositions();
  std::unique_ptr<TruncatedRangeDelMergingIter> merging_iter(
      new TruncatedRangeDelMergingIter(icmp_, lower_bound, upper_bound,
                                       upper_bound_inclusive, parent_iters_));
  // Re-fragment the merged stream, collapsing tombstones per snapshot stripe.
  auto fragmented_tombstone_list =
      std::make_shared<FragmentedRangeTombstoneList>(
          std::move(merging_iter), *icmp_, true /* for_compaction */,
          *snapshots_);
  return std::unique_ptr<FragmentedRangeTombstoneIterator>(
      new FragmentedRangeTombstoneIterator(
          fragmented_tombstone_list, *icmp_,
          kMaxSequenceNumber /* upper_bound */));
}
  430. } // namespace ROCKSDB_NAMESPACE