compaction_iterator.h 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #pragma once
  6. #include <algorithm>
  7. #include <deque>
  8. #include <string>
  9. #include <unordered_set>
  10. #include <vector>
  11. #include "db/compaction/compaction.h"
  12. #include "db/compaction/compaction_iteration_stats.h"
  13. #include "db/merge_helper.h"
  14. #include "db/pinned_iterators_manager.h"
  15. #include "db/range_del_aggregator.h"
  16. #include "db/snapshot_checker.h"
  17. #include "options/cf_options.h"
  18. #include "rocksdb/compaction_filter.h"
  19. namespace ROCKSDB_NAMESPACE {
  20. class CompactionIterator {
  21. public:
  22. // A wrapper around Compaction. Has a much smaller interface, only what
  23. // CompactionIterator uses. Tests can override it.
  24. class CompactionProxy {
  25. public:
  26. explicit CompactionProxy(const Compaction* compaction)
  27. : compaction_(compaction) {}
  28. virtual ~CompactionProxy() = default;
  29. virtual int level(size_t /*compaction_input_level*/ = 0) const {
  30. return compaction_->level();
  31. }
  32. virtual bool KeyNotExistsBeyondOutputLevel(
  33. const Slice& user_key, std::vector<size_t>* level_ptrs) const {
  34. return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
  35. }
  36. virtual bool bottommost_level() const {
  37. return compaction_->bottommost_level();
  38. }
  39. virtual int number_levels() const { return compaction_->number_levels(); }
  40. virtual Slice GetLargestUserKey() const {
  41. return compaction_->GetLargestUserKey();
  42. }
  43. virtual bool allow_ingest_behind() const {
  44. return compaction_->immutable_cf_options()->allow_ingest_behind;
  45. }
  46. virtual bool preserve_deletes() const {
  47. return compaction_->immutable_cf_options()->preserve_deletes;
  48. }
  49. protected:
  50. CompactionProxy() = default;
  51. private:
  52. const Compaction* compaction_;
  53. };
  54. CompactionIterator(
  55. InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
  56. SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
  57. SequenceNumber earliest_write_conflict_snapshot,
  58. const SnapshotChecker* snapshot_checker, Env* env,
  59. bool report_detailed_time, bool expect_valid_internal_key,
  60. CompactionRangeDelAggregator* range_del_agg,
  61. const Compaction* compaction = nullptr,
  62. const CompactionFilter* compaction_filter = nullptr,
  63. const std::atomic<bool>* shutting_down = nullptr,
  64. const SequenceNumber preserve_deletes_seqnum = 0,
  65. const std::atomic<bool>* manual_compaction_paused = nullptr,
  66. const std::shared_ptr<Logger> info_log = nullptr);
  67. // Constructor with custom CompactionProxy, used for tests.
  68. CompactionIterator(
  69. InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
  70. SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
  71. SequenceNumber earliest_write_conflict_snapshot,
  72. const SnapshotChecker* snapshot_checker, Env* env,
  73. bool report_detailed_time, bool expect_valid_internal_key,
  74. CompactionRangeDelAggregator* range_del_agg,
  75. std::unique_ptr<CompactionProxy> compaction,
  76. const CompactionFilter* compaction_filter = nullptr,
  77. const std::atomic<bool>* shutting_down = nullptr,
  78. const SequenceNumber preserve_deletes_seqnum = 0,
  79. const std::atomic<bool>* manual_compaction_paused = nullptr,
  80. const std::shared_ptr<Logger> info_log = nullptr);
  81. ~CompactionIterator();
  82. void ResetRecordCounts();
  83. // Seek to the beginning of the compaction iterator output.
  84. //
  85. // REQUIRED: Call only once.
  86. void SeekToFirst();
  87. // Produces the next record in the compaction.
  88. //
  89. // REQUIRED: SeekToFirst() has been called.
  90. void Next();
  91. // Getters
  92. const Slice& key() const { return key_; }
  93. const Slice& value() const { return value_; }
  94. const Status& status() const { return status_; }
  95. const ParsedInternalKey& ikey() const { return ikey_; }
  96. bool Valid() const { return valid_; }
  97. const Slice& user_key() const { return current_user_key_; }
  98. const CompactionIterationStats& iter_stats() const { return iter_stats_; }
  99. private:
  100. // Processes the input stream to find the next output
  101. void NextFromInput();
  102. // Do last preparations before presenting the output to the callee. At this
  103. // point this only zeroes out the sequence number if possible for better
  104. // compression.
  105. void PrepareOutput();
  106. // Invoke compaction filter if needed.
  107. void InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
  108. // Given a sequence number, return the sequence number of the
  109. // earliest snapshot that this sequence number is visible in.
  110. // The snapshots themselves are arranged in ascending order of
  111. // sequence numbers.
  112. // Employ a sequential search because the total number of
  113. // snapshots are typically small.
  114. inline SequenceNumber findEarliestVisibleSnapshot(
  115. SequenceNumber in, SequenceNumber* prev_snapshot);
  116. // Checks whether the currently seen ikey_ is needed for
  117. // incremental (differential) snapshot and hence can't be dropped
  118. // or seqnum be zero-ed out even if all other conditions for it are met.
  119. inline bool ikeyNotNeededForIncrementalSnapshot();
  120. inline bool KeyCommitted(SequenceNumber sequence) {
  121. return snapshot_checker_ == nullptr ||
  122. snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) ==
  123. SnapshotCheckerResult::kInSnapshot;
  124. }
  125. bool IsInEarliestSnapshot(SequenceNumber sequence);
  126. InternalIterator* input_;
  127. const Comparator* cmp_;
  128. MergeHelper* merge_helper_;
  129. const std::vector<SequenceNumber>* snapshots_;
  130. // List of snapshots released during compaction.
  131. // findEarliestVisibleSnapshot() find them out from return of
  132. // snapshot_checker, and make sure they will not be returned as
  133. // earliest visible snapshot of an older value.
  134. // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
  135. std::unordered_set<SequenceNumber> released_snapshots_;
  136. std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
  137. const SequenceNumber earliest_write_conflict_snapshot_;
  138. const SnapshotChecker* const snapshot_checker_;
  139. Env* env_;
  140. bool report_detailed_time_;
  141. bool expect_valid_internal_key_;
  142. CompactionRangeDelAggregator* range_del_agg_;
  143. std::unique_ptr<CompactionProxy> compaction_;
  144. const CompactionFilter* compaction_filter_;
  145. const std::atomic<bool>* shutting_down_;
  146. const std::atomic<bool>* manual_compaction_paused_;
  147. const SequenceNumber preserve_deletes_seqnum_;
  148. bool bottommost_level_;
  149. bool valid_ = false;
  150. bool visible_at_tip_;
  151. SequenceNumber earliest_snapshot_;
  152. SequenceNumber latest_snapshot_;
  153. // State
  154. //
  155. // Points to a copy of the current compaction iterator output (current_key_)
  156. // if valid_.
  157. Slice key_;
  158. // Points to the value in the underlying iterator that corresponds to the
  159. // current output.
  160. Slice value_;
  161. // The status is OK unless compaction iterator encounters a merge operand
  162. // while not having a merge operator defined.
  163. Status status_;
  164. // Stores the user key, sequence number and type of the current compaction
  165. // iterator output (or current key in the underlying iterator during
  166. // NextFromInput()).
  167. ParsedInternalKey ikey_;
  168. // Stores whether ikey_.user_key is valid. If set to false, the user key is
  169. // not compared against the current key in the underlying iterator.
  170. bool has_current_user_key_ = false;
  171. bool at_next_ = false; // If false, the iterator
  172. // Holds a copy of the current compaction iterator output (or current key in
  173. // the underlying iterator during NextFromInput()).
  174. IterKey current_key_;
  175. Slice current_user_key_;
  176. SequenceNumber current_user_key_sequence_;
  177. SequenceNumber current_user_key_snapshot_;
  178. // True if the iterator has already returned a record for the current key.
  179. bool has_outputted_key_ = false;
  180. // truncated the value of the next key and output it without applying any
  181. // compaction rules. This is used for outputting a put after a single delete.
  182. bool clear_and_output_next_key_ = false;
  183. MergeOutputIterator merge_out_iter_;
  184. // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
  185. // merge operands and then releasing them after consuming them.
  186. PinnedIteratorsManager pinned_iters_mgr_;
  187. std::string compaction_filter_value_;
  188. InternalKey compaction_filter_skip_until_;
  189. // "level_ptrs" holds indices that remember which file of an associated
  190. // level we were last checking during the last call to compaction->
  191. // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
  192. // to pick off where it left off since each subcompaction's key range is
  193. // increasing so a later call to the function must be looking for a key that
  194. // is in or beyond the last file checked during the previous call
  195. std::vector<size_t> level_ptrs_;
  196. CompactionIterationStats iter_stats_;
  197. // Used to avoid purging uncommitted values. The application can specify
  198. // uncommitted values by providing a SnapshotChecker object.
  199. bool current_key_committed_;
  200. std::shared_ptr<Logger> info_log_;
  201. bool IsShuttingDown() {
  202. // This is a best-effort facility, so memory_order_relaxed is sufficient.
  203. return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
  204. }
  205. bool IsPausingManualCompaction() {
  206. // This is a best-effort facility, so memory_order_relaxed is sufficient.
  207. return manual_compaction_paused_ &&
  208. manual_compaction_paused_->load(std::memory_order_relaxed);
  209. }
  210. };
  211. } // namespace ROCKSDB_NAMESPACE