range_del_aggregator.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. // Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <algorithm>
  7. #include <iterator>
  8. #include <list>
  9. #include <map>
  10. #include <set>
  11. #include <string>
  12. #include <vector>
  13. #include "db/compaction/compaction_iteration_stats.h"
  14. #include "db/dbformat.h"
  15. #include "db/pinned_iterators_manager.h"
  16. #include "db/range_del_aggregator.h"
  17. #include "db/range_tombstone_fragmenter.h"
  18. #include "db/version_edit.h"
  19. #include "rocksdb/comparator.h"
  20. #include "rocksdb/types.h"
  21. #include "table/internal_iterator.h"
  22. #include "table/scoped_arena_iterator.h"
  23. #include "table/table_builder.h"
  24. #include "util/heap.h"
  25. #include "util/kv_map.h"
  26. namespace ROCKSDB_NAMESPACE {
  27. class TruncatedRangeDelIterator {
  28. public:
  29. TruncatedRangeDelIterator(
  30. std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
  31. const InternalKeyComparator* icmp, const InternalKey* smallest,
  32. const InternalKey* largest);
  33. bool Valid() const;
  34. void Next();
  35. void Prev();
  36. void InternalNext();
  37. // Seeks to the tombstone with the highest viisble sequence number that covers
  38. // target (a user key). If no such tombstone exists, the position will be at
  39. // the earliest tombstone that ends after target.
  40. void Seek(const Slice& target);
  41. // Seeks to the tombstone with the highest viisble sequence number that covers
  42. // target (a user key). If no such tombstone exists, the position will be at
  43. // the latest tombstone that starts before target.
  44. void SeekForPrev(const Slice& target);
  45. void SeekToFirst();
  46. void SeekToLast();
  47. ParsedInternalKey start_key() const {
  48. return (smallest_ == nullptr ||
  49. icmp_->Compare(*smallest_, iter_->parsed_start_key()) <= 0)
  50. ? iter_->parsed_start_key()
  51. : *smallest_;
  52. }
  53. ParsedInternalKey end_key() const {
  54. return (largest_ == nullptr ||
  55. icmp_->Compare(iter_->parsed_end_key(), *largest_) <= 0)
  56. ? iter_->parsed_end_key()
  57. : *largest_;
  58. }
  59. SequenceNumber seq() const { return iter_->seq(); }
  60. std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
  61. SplitBySnapshot(const std::vector<SequenceNumber>& snapshots);
  62. SequenceNumber upper_bound() const { return iter_->upper_bound(); }
  63. SequenceNumber lower_bound() const { return iter_->lower_bound(); }
  64. private:
  65. std::unique_ptr<FragmentedRangeTombstoneIterator> iter_;
  66. const InternalKeyComparator* icmp_;
  67. const ParsedInternalKey* smallest_ = nullptr;
  68. const ParsedInternalKey* largest_ = nullptr;
  69. std::list<ParsedInternalKey> pinned_bounds_;
  70. const InternalKey* smallest_ikey_;
  71. const InternalKey* largest_ikey_;
  72. };
  73. struct SeqMaxComparator {
  74. bool operator()(const TruncatedRangeDelIterator* a,
  75. const TruncatedRangeDelIterator* b) const {
  76. return a->seq() > b->seq();
  77. }
  78. };
  79. struct StartKeyMinComparator {
  80. explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
  81. bool operator()(const TruncatedRangeDelIterator* a,
  82. const TruncatedRangeDelIterator* b) const {
  83. return icmp->Compare(a->start_key(), b->start_key()) > 0;
  84. }
  85. const InternalKeyComparator* icmp;
  86. };
  87. class ForwardRangeDelIterator {
  88. public:
  89. explicit ForwardRangeDelIterator(const InternalKeyComparator* icmp);
  90. bool ShouldDelete(const ParsedInternalKey& parsed);
  91. void Invalidate();
  92. void AddNewIter(TruncatedRangeDelIterator* iter,
  93. const ParsedInternalKey& parsed) {
  94. iter->Seek(parsed.user_key);
  95. PushIter(iter, parsed);
  96. assert(active_iters_.size() == active_seqnums_.size());
  97. }
  98. size_t UnusedIdx() const { return unused_idx_; }
  99. void IncUnusedIdx() { unused_idx_++; }
  100. private:
  101. using ActiveSeqSet =
  102. std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
  103. struct EndKeyMinComparator {
  104. explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
  105. bool operator()(const ActiveSeqSet::const_iterator& a,
  106. const ActiveSeqSet::const_iterator& b) const {
  107. return icmp->Compare((*a)->end_key(), (*b)->end_key()) > 0;
  108. }
  109. const InternalKeyComparator* icmp;
  110. };
  111. void PushIter(TruncatedRangeDelIterator* iter,
  112. const ParsedInternalKey& parsed) {
  113. if (!iter->Valid()) {
  114. // The iterator has been fully consumed, so we don't need to add it to
  115. // either of the heaps.
  116. return;
  117. }
  118. int cmp = icmp_->Compare(parsed, iter->start_key());
  119. if (cmp < 0) {
  120. PushInactiveIter(iter);
  121. } else {
  122. PushActiveIter(iter);
  123. }
  124. }
  125. void PushActiveIter(TruncatedRangeDelIterator* iter) {
  126. auto seq_pos = active_seqnums_.insert(iter);
  127. active_iters_.push(seq_pos);
  128. }
  129. TruncatedRangeDelIterator* PopActiveIter() {
  130. auto active_top = active_iters_.top();
  131. auto iter = *active_top;
  132. active_iters_.pop();
  133. active_seqnums_.erase(active_top);
  134. return iter;
  135. }
  136. void PushInactiveIter(TruncatedRangeDelIterator* iter) {
  137. inactive_iters_.push(iter);
  138. }
  139. TruncatedRangeDelIterator* PopInactiveIter() {
  140. auto* iter = inactive_iters_.top();
  141. inactive_iters_.pop();
  142. return iter;
  143. }
  144. const InternalKeyComparator* icmp_;
  145. size_t unused_idx_;
  146. ActiveSeqSet active_seqnums_;
  147. BinaryHeap<ActiveSeqSet::const_iterator, EndKeyMinComparator> active_iters_;
  148. BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> inactive_iters_;
  149. };
  150. class ReverseRangeDelIterator {
  151. public:
  152. explicit ReverseRangeDelIterator(const InternalKeyComparator* icmp);
  153. bool ShouldDelete(const ParsedInternalKey& parsed);
  154. void Invalidate();
  155. void AddNewIter(TruncatedRangeDelIterator* iter,
  156. const ParsedInternalKey& parsed) {
  157. iter->SeekForPrev(parsed.user_key);
  158. PushIter(iter, parsed);
  159. assert(active_iters_.size() == active_seqnums_.size());
  160. }
  161. size_t UnusedIdx() const { return unused_idx_; }
  162. void IncUnusedIdx() { unused_idx_++; }
  163. private:
  164. using ActiveSeqSet =
  165. std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
  166. struct EndKeyMaxComparator {
  167. explicit EndKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {}
  168. bool operator()(const TruncatedRangeDelIterator* a,
  169. const TruncatedRangeDelIterator* b) const {
  170. return icmp->Compare(a->end_key(), b->end_key()) < 0;
  171. }
  172. const InternalKeyComparator* icmp;
  173. };
  174. struct StartKeyMaxComparator {
  175. explicit StartKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {}
  176. bool operator()(const ActiveSeqSet::const_iterator& a,
  177. const ActiveSeqSet::const_iterator& b) const {
  178. return icmp->Compare((*a)->start_key(), (*b)->start_key()) < 0;
  179. }
  180. const InternalKeyComparator* icmp;
  181. };
  182. void PushIter(TruncatedRangeDelIterator* iter,
  183. const ParsedInternalKey& parsed) {
  184. if (!iter->Valid()) {
  185. // The iterator has been fully consumed, so we don't need to add it to
  186. // either of the heaps.
  187. } else if (icmp_->Compare(iter->end_key(), parsed) <= 0) {
  188. PushInactiveIter(iter);
  189. } else {
  190. PushActiveIter(iter);
  191. }
  192. }
  193. void PushActiveIter(TruncatedRangeDelIterator* iter) {
  194. auto seq_pos = active_seqnums_.insert(iter);
  195. active_iters_.push(seq_pos);
  196. }
  197. TruncatedRangeDelIterator* PopActiveIter() {
  198. auto active_top = active_iters_.top();
  199. auto iter = *active_top;
  200. active_iters_.pop();
  201. active_seqnums_.erase(active_top);
  202. return iter;
  203. }
  204. void PushInactiveIter(TruncatedRangeDelIterator* iter) {
  205. inactive_iters_.push(iter);
  206. }
  207. TruncatedRangeDelIterator* PopInactiveIter() {
  208. auto* iter = inactive_iters_.top();
  209. inactive_iters_.pop();
  210. return iter;
  211. }
  212. const InternalKeyComparator* icmp_;
  213. size_t unused_idx_;
  214. ActiveSeqSet active_seqnums_;
  215. BinaryHeap<ActiveSeqSet::const_iterator, StartKeyMaxComparator> active_iters_;
  216. BinaryHeap<TruncatedRangeDelIterator*, EndKeyMaxComparator> inactive_iters_;
  217. };
  // Direction in which the caller is walking keys when it calls
  // RangeDelAggregator::ShouldDelete(). NOTE(review): presumably used to pick
  // the forward or reverse positioning helper inside StripeRep.
  enum class RangeDelPositioningMode {
    kForwardTraversal,
    kBackwardTraversal,
  };
  219. class RangeDelAggregator {
  220. public:
  221. explicit RangeDelAggregator(const InternalKeyComparator* icmp)
  222. : icmp_(icmp) {}
  223. virtual ~RangeDelAggregator() {}
  224. virtual void AddTombstones(
  225. std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
  226. const InternalKey* smallest = nullptr,
  227. const InternalKey* largest = nullptr) = 0;
  228. bool ShouldDelete(const Slice& key, RangeDelPositioningMode mode) {
  229. ParsedInternalKey parsed;
  230. if (!ParseInternalKey(key, &parsed)) {
  231. return false;
  232. }
  233. return ShouldDelete(parsed, mode);
  234. }
  235. virtual bool ShouldDelete(const ParsedInternalKey& parsed,
  236. RangeDelPositioningMode mode) = 0;
  237. virtual void InvalidateRangeDelMapPositions() = 0;
  238. virtual bool IsEmpty() const = 0;
  239. bool AddFile(uint64_t file_number) {
  240. return files_seen_.insert(file_number).second;
  241. }
  242. protected:
  243. class StripeRep {
  244. public:
  245. StripeRep(const InternalKeyComparator* icmp, SequenceNumber upper_bound,
  246. SequenceNumber lower_bound)
  247. : icmp_(icmp),
  248. forward_iter_(icmp),
  249. reverse_iter_(icmp),
  250. upper_bound_(upper_bound),
  251. lower_bound_(lower_bound) {}
  252. void AddTombstones(std::unique_ptr<TruncatedRangeDelIterator> input_iter) {
  253. iters_.push_back(std::move(input_iter));
  254. }
  255. bool IsEmpty() const { return iters_.empty(); }
  256. bool ShouldDelete(const ParsedInternalKey& parsed,
  257. RangeDelPositioningMode mode);
  258. void Invalidate() {
  259. if (!IsEmpty()) {
  260. InvalidateForwardIter();
  261. InvalidateReverseIter();
  262. }
  263. }
  264. bool IsRangeOverlapped(const Slice& start, const Slice& end);
  265. private:
  266. bool InStripe(SequenceNumber seq) const {
  267. return lower_bound_ <= seq && seq <= upper_bound_;
  268. }
  269. void InvalidateForwardIter() { forward_iter_.Invalidate(); }
  270. void InvalidateReverseIter() { reverse_iter_.Invalidate(); }
  271. const InternalKeyComparator* icmp_;
  272. std::vector<std::unique_ptr<TruncatedRangeDelIterator>> iters_;
  273. ForwardRangeDelIterator forward_iter_;
  274. ReverseRangeDelIterator reverse_iter_;
  275. SequenceNumber upper_bound_;
  276. SequenceNumber lower_bound_;
  277. };
  278. const InternalKeyComparator* icmp_;
  279. private:
  280. std::set<uint64_t> files_seen_;
  281. };
  282. class ReadRangeDelAggregator final : public RangeDelAggregator {
  283. public:
  284. ReadRangeDelAggregator(const InternalKeyComparator* icmp,
  285. SequenceNumber upper_bound)
  286. : RangeDelAggregator(icmp),
  287. rep_(icmp, upper_bound, 0 /* lower_bound */) {}
  288. ~ReadRangeDelAggregator() override {}
  289. using RangeDelAggregator::ShouldDelete;
  290. void AddTombstones(
  291. std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
  292. const InternalKey* smallest = nullptr,
  293. const InternalKey* largest = nullptr) override;
  294. bool ShouldDelete(const ParsedInternalKey& parsed,
  295. RangeDelPositioningMode mode) final override {
  296. if (rep_.IsEmpty()) {
  297. return false;
  298. }
  299. return ShouldDeleteImpl(parsed, mode);
  300. }
  301. bool IsRangeOverlapped(const Slice& start, const Slice& end);
  302. void InvalidateRangeDelMapPositions() override { rep_.Invalidate(); }
  303. bool IsEmpty() const override { return rep_.IsEmpty(); }
  304. private:
  305. StripeRep rep_;
  306. bool ShouldDeleteImpl(const ParsedInternalKey& parsed,
  307. RangeDelPositioningMode mode);
  308. };
  309. class CompactionRangeDelAggregator : public RangeDelAggregator {
  310. public:
  311. CompactionRangeDelAggregator(const InternalKeyComparator* icmp,
  312. const std::vector<SequenceNumber>& snapshots)
  313. : RangeDelAggregator(icmp), snapshots_(&snapshots) {}
  314. ~CompactionRangeDelAggregator() override {}
  315. void AddTombstones(
  316. std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
  317. const InternalKey* smallest = nullptr,
  318. const InternalKey* largest = nullptr) override;
  319. using RangeDelAggregator::ShouldDelete;
  320. bool ShouldDelete(const ParsedInternalKey& parsed,
  321. RangeDelPositioningMode mode) override;
  322. bool IsRangeOverlapped(const Slice& start, const Slice& end);
  323. void InvalidateRangeDelMapPositions() override {
  324. for (auto& rep : reps_) {
  325. rep.second.Invalidate();
  326. }
  327. }
  328. bool IsEmpty() const override {
  329. for (const auto& rep : reps_) {
  330. if (!rep.second.IsEmpty()) {
  331. return false;
  332. }
  333. }
  334. return true;
  335. }
  336. // Creates an iterator over all the range tombstones in the aggregator, for
  337. // use in compaction. Nullptr arguments indicate that the iterator range is
  338. // unbounded.
  339. // NOTE: the boundaries are used for optimization purposes to reduce the
  340. // number of tombstones that are passed to the fragmenter; they do not
  341. // guarantee that the resulting iterator only contains range tombstones that
  342. // cover keys in the provided range. If required, these bounds must be
  343. // enforced during iteration.
  344. std::unique_ptr<FragmentedRangeTombstoneIterator> NewIterator(
  345. const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr,
  346. bool upper_bound_inclusive = false);
  347. private:
  348. std::vector<std::unique_ptr<TruncatedRangeDelIterator>> parent_iters_;
  349. std::map<SequenceNumber, StripeRep> reps_;
  350. const std::vector<SequenceNumber>* snapshots_;
  351. };
  352. } // namespace ROCKSDB_NAMESPACE