compaction_merging_iterator.cc

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "table/compaction_merging_iterator.h"

#include "db/internal_stats.h"

namespace ROCKSDB_NAMESPACE {
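
// CompactionMergingIterator merges the point keys from `children` with the
// start keys of their range tombstones into a single stream sorted by
// internal key, for consumption by compaction. Range tombstone start keys are
// emitted as heap entries with a dummy value so that compaction sees them in
// key order; file boundary sentinel keys from LevelIterator children are
// skipped internally (see FindNextVisibleKey()).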
class CompactionMergingIterator : public InternalIterator {
 public:
  CompactionMergingIterator(
      const InternalKeyComparator* comparator, InternalIterator** children,
      int n, bool is_arena_mode,
      std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                            std::unique_ptr<TruncatedRangeDelIterator>**>>&
          range_tombstones,
      InternalStats* internal_stats)
      : is_arena_mode_(is_arena_mode),
        comparator_(comparator),
        current_(nullptr),
        minHeap_(CompactionHeapItemComparator(comparator_)),
        pinned_iters_mgr_(nullptr),
        internal_stats_(internal_stats),
        num_sorted_runs_recorded_(0) {
    children_.resize(n);
    for (int i = 0; i < n; i++) {
      children_[i].level = i;
      children_[i].iter.Set(children[i]);
      assert(children_[i].type == HeapItem::ITERATOR);
    }
    assert(range_tombstones.size() == static_cast<size_t>(n));
    for (auto& p : range_tombstones) {
      range_tombstone_iters_.push_back(std::move(p.first));
    }
    pinned_heap_item_.resize(n);
    for (int i = 0; i < n; ++i) {
      if (range_tombstones[i].second) {
        // for LevelIterator
        *range_tombstones[i].second = &range_tombstone_iters_[i];
      }
      pinned_heap_item_[i].level = i;
      pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
    }
    if (internal_stats_) {
      TEST_SYNC_POINT("CompactionMergingIterator::UpdateInternalStats");
      // The size of children_ and range_tombstone_iters_ (n) should not
      // change, but record it here to be safe so that the destructor
      // decrements the counter by the same amount.
      num_sorted_runs_recorded_ = n;
      internal_stats_->IncrNumRunningCompactionSortedRuns(
          num_sorted_runs_recorded_);
      assert(num_sorted_runs_recorded_ <=
             internal_stats_->NumRunningCompactionSortedRuns());
    }
  }
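
  // Remember the first non-OK status reported by any child iterator.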
  void considerStatus(const Status& s) {
    if (!s.ok() && status_.ok()) {
      status_ = s;
    }
  }

  ~CompactionMergingIterator() override {
    if (internal_stats_) {
      assert(num_sorted_runs_recorded_ == range_tombstone_iters_.size());
      assert(num_sorted_runs_recorded_ <=
             internal_stats_->NumRunningCompactionSortedRuns());
      internal_stats_->DecrNumRunningCompactionSortedRuns(
          num_sorted_runs_recorded_);
    }
    range_tombstone_iters_.clear();
    for (auto& child : children_) {
      child.iter.DeleteIter(is_arena_mode_);
    }
    status_.PermitUncheckedError();
  }

  bool Valid() const override { return current_ != nullptr && status_.ok(); }

  Status status() const override { return status_; }

  void SeekToFirst() override;

  void Seek(const Slice& target) override;

  void Next() override;

  Slice key() const override {
    assert(Valid());
    return current_->key();
  }

  Slice value() const override {
    assert(Valid());
    if (LIKELY(current_->type == HeapItem::ITERATOR)) {
      return current_->iter.value();
    } else {
      return dummy_tombstone_val;
    }
  }

  // Relay the MayBeOutOfLowerBound/MayBeOutOfUpperBound result from the
  // current child iterator. As long as the current child reports that being
  // out of bound is not possible, the current key is known to be within
  // bounds.
  bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START ||
           current_->iter.MayBeOutOfLowerBound();
  }

  IterBoundCheck UpperBoundCheckResult() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START
               ? IterBoundCheck::kUnknown
               : current_->iter.UpperBoundCheckResult();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    for (auto& child : children_) {
      child.iter.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsDeleteRangeSentinelKey() const override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START;
  }

  // Compaction uses the above subset of the InternalIterator interface.
  void SeekToLast() override { assert(false); }

  void SeekForPrev(const Slice&) override { assert(false); }

  void Prev() override { assert(false); }

  bool NextAndGetResult(IterateResult*) override {
    assert(false);
    return false;
  }

  bool IsKeyPinned() const override {
    assert(false);
    return false;
  }

  bool IsValuePinned() const override {
    assert(false);
    return false;
  }

  bool PrepareValue() override {
    assert(false);
    return false;
  }

 private:
  struct HeapItem {
    HeapItem() = default;

    IteratorWrapper iter;
    size_t level = 0;
    std::string tombstone_str;
    enum Type { ITERATOR, DELETE_RANGE_START };
    Type type = ITERATOR;

    explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
        : level(_level), type(Type::ITERATOR) {
      iter.Set(_iter);
    }

    void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
      tombstone_str.clear();
      AppendInternalKey(&tombstone_str, pik);
    }

    [[nodiscard]] Slice key() const {
      return type == ITERATOR ? iter.key() : tombstone_str;
    }
  };
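
  // Comparator for CompactionMinHeap: returns true when a's key orders after
  // b's key, so the heap keeps the HeapItem with the smallest internal key at
  // the top.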
  class CompactionHeapItemComparator {
   public:
    explicit CompactionHeapItemComparator(
        const InternalKeyComparator* comparator)
        : comparator_(comparator) {}

    bool operator()(HeapItem* a, HeapItem* b) const {
      int r = comparator_->Compare(a->key(), b->key());
      // For each file, we assume all range tombstone start keys come before
      // its file boundary sentinel key (the file's meta.largest key).
      // In the case when meta.smallest == meta.largest and the range tombstone
      // start key is truncated at meta.smallest, the start key will have
      // op_type = kMaxValid to make it smaller (see the
      // TruncatedRangeDelIterator constructor). The following assertion
      // validates this assumption.
      assert(a->type == b->type || r != 0);
      return r > 0;
    }

   private:
    const InternalKeyComparator* comparator_;
  };

  using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;

  bool is_arena_mode_;
  const InternalKeyComparator* comparator_;
  // HeapItem for all child point iterators.
  std::vector<HeapItem> children_;
  // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
  // current range tombstone from range_tombstone_iters_[i].
  std::vector<HeapItem> pinned_heap_item_;
  // range_tombstone_iters_[i] contains range tombstones in the sorted run
  // that corresponds to children_[i]. range_tombstone_iters_[i] == nullptr
  // means the sorted run of children_[i] does not have range tombstones (or
  // the current SSTable does not have range tombstones in the case of
  // LevelIterator).
  std::vector<std::unique_ptr<TruncatedRangeDelIterator>>
      range_tombstone_iters_;
  // Used as the value for range tombstone keys.
  std::string dummy_tombstone_val{};

  // Skip file boundary sentinel keys.
  void FindNextVisibleKey();

  // Top of minHeap_.
  HeapItem* current_;
  // If any of the children have non-ok status, this is one of them.
  Status status_;
  CompactionMinHeap minHeap_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  InternalStats* internal_stats_;
  uint64_t num_sorted_runs_recorded_;

  // Process a child that is not in the min heap.
  // If valid, add to the min heap. Otherwise, check status.
  void AddToMinHeapOrCheckStatus(HeapItem*);

  HeapItem* CurrentForward() const {
    return !minHeap_.empty() ? minHeap_.top() : nullptr;
  }
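
  // If range_tombstone_iters_[level] currently points to a valid range
  // tombstone, serialize its start key into pinned_heap_item_[level] and push
  // that heap item onto the min-heap.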
  void InsertRangeTombstoneAtLevel(size_t level) {
    if (range_tombstone_iters_[level]->Valid()) {
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.push(&pinned_heap_item_[level]);
    }
  }
};
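
// Position every child iterator and every range tombstone iterator at its
// first entry, rebuild the heap, then skip any leading file boundary sentinel
// keys so that current_ points at the first real key.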
void CompactionMergingIterator::SeekToFirst() {
  minHeap_.clear();
  status_ = Status::OK();
  for (auto& child : children_) {
    child.iter.SeekToFirst();
    AddToMinHeapOrCheckStatus(&child);
  }
  for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
    if (range_tombstone_iters_[i]) {
      range_tombstone_iters_[i]->SeekToFirst();
      InsertRangeTombstoneAtLevel(i);
    }
  }
  FindNextVisibleKey();
  current_ = CurrentForward();
}
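
// Seek each child iterator to `target`. Range tombstone iterators seek on the
// user key of `target`, then skip tombstones whose start key orders before
// the full internal key `target`, so all output keys are at or after the seek
// target.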
void CompactionMergingIterator::Seek(const Slice& target) {
  minHeap_.clear();
  status_ = Status::OK();
  for (auto& child : children_) {
    child.iter.Seek(target);
    AddToMinHeapOrCheckStatus(&child);
  }
  ParsedInternalKey pik;
  ParseInternalKey(target, &pik, false /* log_err_key */)
      .PermitUncheckedError();
  for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
    if (range_tombstone_iters_[i]) {
      range_tombstone_iters_[i]->Seek(pik.user_key);
      // For compaction, output keys should all be after seek target.
      while (range_tombstone_iters_[i]->Valid() &&
             comparator_->Compare(range_tombstone_iters_[i]->start_key(),
                                  pik) < 0) {
        range_tombstone_iters_[i]->Next();
      }
      InsertRangeTombstoneAtLevel(i);
    }
  }
  FindNextVisibleKey();
  current_ = CurrentForward();
}

void CompactionMergingIterator::Next() {
  assert(Valid());
  // For the heap modifications below to be correct, current_ must be the
  // current top of the heap.
  assert(current_ == CurrentForward());
  // current_ points to the current record; move that child iterator forward.
  if (current_->type == HeapItem::ITERATOR) {
    current_->iter.Next();
    if (current_->iter.Valid()) {
      // current is still valid after the Next() call above. Call
      // replace_top() to restore the heap property. When the same child
      // iterator yields a sequence of keys, this is cheap.
      assert(current_->iter.status().ok());
      minHeap_.replace_top(current_);
    } else {
      // current stopped being valid; remove it from the heap.
      considerStatus(current_->iter.status());
      minHeap_.pop();
    }
  } else {
    assert(current_->type == HeapItem::DELETE_RANGE_START);
    size_t level = current_->level;
    assert(range_tombstone_iters_[level]);
    range_tombstone_iters_[level]->Next();
    if (range_tombstone_iters_[level]->Valid()) {
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.replace_top(&pinned_heap_item_[level]);
    } else {
      minHeap_.pop();
    }
  }
  FindNextVisibleKey();
  current_ = CurrentForward();
}
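
// While the heap top is a file boundary sentinel key from a LevelIterator
// child, advance that child into its next SST file (and push the new file's
// first range tombstone start key, if any), until the top of the heap is a
// real key or the heap is empty.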
void CompactionMergingIterator::FindNextVisibleKey() {
  while (!minHeap_.empty()) {
    HeapItem* current = minHeap_.top();
    // IsDeleteRangeSentinelKey() here means a file boundary sentinel key.
    if (current->type != HeapItem::ITERATOR ||
        !current->iter.IsDeleteRangeSentinelKey()) {
      return;
    }
    // Range tombstone start keys from the same SSTable should have been
    // exhausted.
    assert(!range_tombstone_iters_[current->level] ||
           !range_tombstone_iters_[current->level]->Valid());
    // current->iter is a LevelIterator, and it enters a new SST file in the
    // Next() call here.
    current->iter.Next();
    if (current->iter.Valid()) {
      assert(current->iter.status().ok());
      minHeap_.replace_top(current);
    } else {
      considerStatus(current->iter.status());
      minHeap_.pop();
    }
    if (range_tombstone_iters_[current->level]) {
      InsertRangeTombstoneAtLevel(current->level);
    }
  }
}

void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
  if (child->iter.Valid()) {
    assert(child->iter.status().ok());
    minHeap_.push(child);
  } else {
    considerStatus(child->iter.status());
  }
}
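
// A minimal usage sketch (hypothetical caller shown for illustration only;
// `icmp`, `child_iters`, `num_children`, and `tombstone_pairs` are assumed to
// be set up by the compaction code):
//
//   InternalIterator* iter = NewCompactionMergingIterator(
//       &icmp, child_iters, static_cast<int>(num_children), tombstone_pairs,
//       /*arena=*/nullptr, /*stats=*/nullptr);
//   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
//     // iter->IsDeleteRangeSentinelKey() is true for range tombstone start
//     // keys; point keys carry their value in iter->value().
//   }
//   delete iter;  // only when no arena was supplied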
InternalIterator* NewCompactionMergingIterator(
    const InternalKeyComparator* comparator, InternalIterator** children,
    int n,
    std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                          std::unique_ptr<TruncatedRangeDelIterator>**>>&
        range_tombstone_iters,
    Arena* arena, InternalStats* stats) {
  assert(n >= 0);
  if (n == 0) {
    return NewEmptyInternalIterator<Slice>(arena);
  } else {
    if (arena == nullptr) {
      return new CompactionMergingIterator(comparator, children, n,
                                           false /* is_arena_mode */,
                                           range_tombstone_iters, stats);
    } else {
      auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
      return new (mem) CompactionMergingIterator(comparator, children, n,
                                                 true /* is_arena_mode */,
                                                 range_tombstone_iters, stats);
    }
  }
}

}  // namespace ROCKSDB_NAMESPACE