// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <algorithm>
#include <atomic>
#include <cinttypes>
#include <deque>
#include <memory>
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"

namespace ROCKSDB_NAMESPACE {

class BlobFileBuilder;
class BlobFetcher;
class PrefetchBufferCollection;

// A wrapper of internal iterator whose purpose is to count how
// many entries there are in the iterator.
class SequenceIterWrapper : public InternalIterator {
 public:
  SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp,
                      bool need_count_entries)
      : icmp_(cmp),
        inner_iter_(iter),
        need_count_entries_(need_count_entries) {}
  bool Valid() const override { return inner_iter_->Valid(); }
  Status status() const override { return inner_iter_->status(); }
  void Next() override {
    if (!inner_iter_->IsDeleteRangeSentinelKey()) {
      num_itered_++;
    }
    inner_iter_->Next();
  }
  void Seek(const Slice& target) override {
    if (!need_count_entries_) {
      has_num_itered_ = false;
      inner_iter_->Seek(target);
    } else {
      // We need to count the total number of entries, so we advance with
      // Next() rather than Seek().
      while (inner_iter_->Valid() &&
             icmp_.Compare(inner_iter_->key(), target) < 0) {
        Next();
      }
    }
  }
  Slice key() const override { return inner_iter_->key(); }
  Slice value() const override { return inner_iter_->value(); }

  // Unused InternalIterator methods
  void SeekToFirst() override { assert(false); }
  void Prev() override { assert(false); }
  void SeekForPrev(const Slice& /* target */) override { assert(false); }
  void SeekToLast() override { assert(false); }

  uint64_t NumItered() const { return num_itered_; }
  bool HasNumItered() const { return has_num_itered_; }
  bool IsDeleteRangeSentinelKey() const override {
    assert(Valid());
    return inner_iter_->IsDeleteRangeSentinelKey();
  }

 private:
  InternalKeyComparator icmp_;
  InternalIterator* inner_iter_;  // not owned
  uint64_t num_itered_ = 0;
  bool need_count_entries_;
  bool has_num_itered_ = true;
};
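
// Example (illustrative sketch; `raw_iter`, `ucmp` and `target` are
// placeholders): with `need_count_entries == true`, Seek() is implemented as
// repeated Next() calls, so every non-sentinel entry before `target` is still
// counted. With `need_count_entries == false`, a real Seek() is forwarded and
// HasNumItered() flips to false, marking the count unreliable:
//
//   SequenceIterWrapper wrapper(raw_iter, ucmp, /*need_count_entries=*/true);
//   wrapper.Seek(target);            // steps via Next(); count stays exact
//   assert(wrapper.HasNumItered());  // still true in counting mode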

class CompactionIterator {
 public:
  // A wrapper around Compaction. Has a much smaller interface, only what
  // CompactionIterator uses. Tests can override it.
  class CompactionProxy {
   public:
    virtual ~CompactionProxy() = default;

    virtual int level() const = 0;
    virtual bool KeyNotExistsBeyondOutputLevel(
        const Slice& user_key, std::vector<size_t>* level_ptrs) const = 0;
    virtual bool bottommost_level() const = 0;
    virtual int number_levels() const = 0;
    // Result includes timestamp if user-defined timestamp is enabled.
    virtual Slice GetLargestUserKey() const = 0;
    virtual bool allow_ingest_behind() const = 0;
    virtual bool allow_mmap_reads() const = 0;
    virtual bool enable_blob_garbage_collection() const = 0;
    virtual double blob_garbage_collection_age_cutoff() const = 0;
    virtual uint64_t blob_compaction_readahead_size() const = 0;
    virtual const Version* input_version() const = 0;
    virtual bool DoesInputReferenceBlobFiles() const = 0;
    virtual const Compaction* real_compaction() const = 0;
    virtual bool SupportsPerKeyPlacement() const = 0;
  };

  class RealCompaction : public CompactionProxy {
   public:
    explicit RealCompaction(const Compaction* compaction)
        : compaction_(compaction) {
      assert(compaction_);
    }

    int level() const override { return compaction_->level(); }
    bool KeyNotExistsBeyondOutputLevel(
        const Slice& user_key,
        std::vector<size_t>* level_ptrs) const override {
      return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
    }
    bool bottommost_level() const override {
      return compaction_->bottommost_level();
    }
    int number_levels() const override { return compaction_->number_levels(); }
    // Result includes timestamp if user-defined timestamp is enabled.
    Slice GetLargestUserKey() const override {
      return compaction_->GetLargestUserKey();
    }
    bool allow_ingest_behind() const override {
      return compaction_->immutable_options().cf_allow_ingest_behind ||
             compaction_->immutable_options().allow_ingest_behind;
    }
    bool allow_mmap_reads() const override {
      return compaction_->immutable_options().allow_mmap_reads;
    }
    bool enable_blob_garbage_collection() const override {
      return compaction_->enable_blob_garbage_collection();
    }
    double blob_garbage_collection_age_cutoff() const override {
      return compaction_->blob_garbage_collection_age_cutoff();
    }
    uint64_t blob_compaction_readahead_size() const override {
      return compaction_->mutable_cf_options().blob_compaction_readahead_size;
    }
    const Version* input_version() const override {
      return compaction_->input_version();
    }
    bool DoesInputReferenceBlobFiles() const override {
      return compaction_->DoesInputReferenceBlobFiles();
    }
    const Compaction* real_compaction() const override { return compaction_; }
    bool SupportsPerKeyPlacement() const override {
      return compaction_->SupportsPerKeyPlacement();
    }

   private:
    const Compaction* compaction_;
  };
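
  // Example (hypothetical test double, not part of this header): a test can
  // subclass CompactionProxy and stub out only the queries it exercises,
  // e.g.:
  //
  //   class FakeCompaction : public CompactionIterator::CompactionProxy {
  //    public:
  //     int level() const override { return 0; }
  //     bool bottommost_level() const override { return false; }
  //     // ...remaining pure-virtual methods stubbed similarly...
  //   };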

  // @param must_count_input_entries Controls input entry counting accuracy
  // vs. performance:
  // - If true: `NumInputEntryScanned()` always returns the exact count of
  //   input keys scanned. The iterator uses sequential `Next()` calls instead
  //   of `Seek()` to keep the count accurate, since `Seek()` would not count
  //   the skipped input entries. This is slower but guarantees correctness.
  // - If false: `NumInputEntryScanned()` returns an accurate count only if no
  //   `Seek()` was performed on the input iterator. When a compaction filter
  //   requests skipping a range of keys, or another optimization triggers a
  //   seek, the count becomes unreliable. Always call
  //   `HasNumInputEntryScanned()` first to verify that the count is accurate
  //   before using `NumInputEntryScanned()`.
  CompactionIterator(
      InternalIterator* input, const Comparator* cmp,
      MergeHelper* merge_helper, SequenceNumber last_sequence,
      std::vector<SequenceNumber>* snapshots,
      SequenceNumber earliest_snapshot,
      SequenceNumber earliest_write_conflict_snapshot,
      SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
      Env* env, bool report_detailed_time,
      CompactionRangeDelAggregator* range_del_agg,
      BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
      bool enforce_single_del_contracts,
      const std::atomic<bool>& manual_compaction_canceled,
      bool must_count_input_entries, const Compaction* compaction = nullptr,
      const CompactionFilter* compaction_filter = nullptr,
      const std::atomic<bool>* shutting_down = nullptr,
      const std::shared_ptr<Logger> info_log = nullptr,
      const std::string* full_history_ts_low = nullptr,
      std::optional<SequenceNumber> preserve_seqno_min = {});

  // Constructor with custom CompactionProxy, used for tests.
  CompactionIterator(InternalIterator* input, const Comparator* cmp,
                     MergeHelper* merge_helper, SequenceNumber last_sequence,
                     std::vector<SequenceNumber>* snapshots,
                     SequenceNumber earliest_snapshot,
                     SequenceNumber earliest_write_conflict_snapshot,
                     SequenceNumber job_snapshot,
                     const SnapshotChecker* snapshot_checker, Env* env,
                     bool report_detailed_time,
                     CompactionRangeDelAggregator* range_del_agg,
                     BlobFileBuilder* blob_file_builder,
                     bool allow_data_in_errors,
                     bool enforce_single_del_contracts,
                     const std::atomic<bool>& manual_compaction_canceled,
                     std::unique_ptr<CompactionProxy> compaction,
                     bool must_count_input_entries,
                     const CompactionFilter* compaction_filter = nullptr,
                     const std::atomic<bool>* shutting_down = nullptr,
                     const std::shared_ptr<Logger> info_log = nullptr,
                     const std::string* full_history_ts_low = nullptr,
                     std::optional<SequenceNumber> preserve_seqno_min = {});

  ~CompactionIterator();

  void ResetRecordCounts();

  // Seek to the beginning of the compaction iterator output.
  //
  // REQUIRED: Call only once.
  void SeekToFirst();

  // Produces the next record in the compaction.
  //
  // REQUIRED: SeekToFirst() has been called.
  void Next();
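
  // Example (illustrative sketch of the driving loop; `c_iter` and the loop
  // body are placeholders): per the contracts above, a consumer calls
  // SeekToFirst() exactly once and then iterates:
  //
  //   c_iter.SeekToFirst();
  //   while (c_iter.Valid()) {
  //     // write c_iter.key() / c_iter.value() to the output
  //     c_iter.Next();
  //   }
  //   // then inspect c_iter.status()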

  // Getters
  const Slice& key() const { return key_; }
  const Slice& value() const { return value_; }
  const Status& status() const { return status_; }
  const ParsedInternalKey& ikey() const { return ikey_; }
  inline bool Valid() const { return validity_info_.IsValid(); }
  const Slice& user_key() const {
    if (UNLIKELY(is_range_del_)) {
      return ikey_.user_key;
    }
    return current_user_key_;
  }
  const CompactionIterationStats& iter_stats() const { return iter_stats_; }
  bool HasNumInputEntryScanned() const { return input_.HasNumItered(); }
  // This method should only be used when `HasNumInputEntryScanned()` returns
  // true, unless `must_count_input_entries=true` was specified during iterator
  // creation (which ensures the count is always accurate).
  uint64_t NumInputEntryScanned() const { return input_.NumItered(); }
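
  // Example (illustrative): unless exact counting was requested at
  // construction time, guard the count as described above:
  //
  //   if (c_iter.HasNumInputEntryScanned()) {
  //     uint64_t n = c_iter.NumInputEntryScanned();  // safe to use
  //   }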

  // Returns true if the current valid key was already scanned/counted during
  // a lookahead operation in a previous iteration.
  //
  // REQUIRED: Valid() must be true
  bool IsCurrentKeyAlreadyScanned() const {
    assert(Valid());
    return at_next_ || merge_out_iter_.Valid();
  }

  Status InputStatus() const { return input_.status(); }

  bool IsDeleteRangeSentinelKey() const { return is_range_del_; }

 private:
  // Processes the input stream to find the next output
  void NextFromInput();

  // Do final preparations before presenting the output to the caller.
  void PrepareOutput();

  // Passes the output value to the blob file builder (if any), and replaces it
  // with the corresponding blob reference if it has been actually written to a
  // blob file (i.e. if it passed the value size check). Returns true if the
  // value got extracted to a blob file, false otherwise.
  bool ExtractLargeValueIfNeededImpl();

  // Extracts large values as described above, and updates the internal key's
  // type to kTypeBlobIndex if the value got extracted. Should only be called
  // for regular values (kTypeValue).
  void ExtractLargeValueIfNeeded();

  // Relocates valid blobs residing in the oldest blob files if garbage
  // collection is enabled. Relocated blobs are written to new blob files or
  // inlined in the LSM tree depending on the current settings (i.e.
  // enable_blob_files and min_blob_size). Should only be called for blob
  // references (kTypeBlobIndex).
  //
  // Note: the stacked BlobDB implementation's compaction filter based GC
  // algorithm is also called from here.
  void GarbageCollectBlobIfNeeded();

  // Invoke compaction filter if needed.
  // Return true on success, false on failures (e.g.: kIOError).
  bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);

  // Given a sequence number, return the sequence number of the
  // earliest snapshot that this sequence number is visible in.
  // The snapshots themselves are arranged in ascending order of
  // sequence numbers.
  // Employ a sequential search because the total number of
  // snapshots is typically small.
  inline SequenceNumber findEarliestVisibleSnapshot(
      SequenceNumber in, SequenceNumber* prev_snapshot);
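
  // Worked example (illustrative; ignores released snapshots and the
  // snapshot checker): with snapshots_ = {10, 20, 30} and in = 15, the
  // earliest snapshot that can see sequence number 15 is 20, and
  // *prev_snapshot is set to 10, the adjacent earlier snapshot.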

  inline bool KeyCommitted(SequenceNumber sequence) {
    return snapshot_checker_ == nullptr ||
           snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) ==
               SnapshotCheckerResult::kInSnapshot;
  }

  bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);

  bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);

  // Extract user-defined timestamp from user key if possible and compare it
  // with *full_history_ts_low_ if applicable.
  inline void UpdateTimestampAndCompareWithFullHistoryLow() {
    if (!timestamp_size_) {
      return;
    }
    Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
    curr_ts_.assign(ts.data(), ts.size());
    if (full_history_ts_low_) {
      cmp_with_history_ts_low_ =
          cmp_->CompareTimestamp(ts, *full_history_ts_low_);
    }
  }
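
  // Example (illustrative): with an 8-byte user-defined timestamp, the user
  // key is laid out as <key bytes><8-byte timestamp>, so
  // ExtractTimestampFromUserKey() returns the trailing 8 bytes. Afterwards,
  // cmp_with_history_ts_low_ < 0 means the key's timestamp is older than
  // *full_history_ts_low_, i.e. the key may be eligible for GC.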

  static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
      const CompactionProxy* compaction);
  static std::unique_ptr<BlobFetcher> CreateBlobFetcherIfNeeded(
      const CompactionProxy* compaction);
  static std::unique_ptr<PrefetchBufferCollection>
  CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction);

  SequenceIterWrapper input_;
  const Comparator* cmp_;
  MergeHelper* merge_helper_;
  const std::vector<SequenceNumber>* snapshots_;
  // List of snapshots released during compaction.
  // findEarliestVisibleSnapshot() finds them via the return value of
  // snapshot_checker_, and makes sure they will not be returned as the
  // earliest visible snapshot of an older value.
  // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
  std::unordered_set<SequenceNumber> released_snapshots_;
  const SequenceNumber earliest_write_conflict_snapshot_;
  const SequenceNumber job_snapshot_;
  const SnapshotChecker* const snapshot_checker_;
  Env* env_;
  SystemClock* clock_;
  const bool report_detailed_time_;
  CompactionRangeDelAggregator* range_del_agg_;
  BlobFileBuilder* blob_file_builder_;
  std::unique_ptr<CompactionProxy> compaction_;
  const CompactionFilter* compaction_filter_;
  const std::atomic<bool>* shutting_down_;
  const std::atomic<bool>& manual_compaction_canceled_;
  const bool bottommost_level_;
  const bool visible_at_tip_;
  const SequenceNumber earliest_snapshot_;

  std::shared_ptr<Logger> info_log_;

  const bool allow_data_in_errors_;
  const bool enforce_single_del_contracts_;

  // Comes from comparator.
  const size_t timestamp_size_;

  // Lower bound timestamp to retain full history in terms of user-defined
  // timestamp. If a key's timestamp is older than full_history_ts_low_, then
  // the key *may* be eligible for garbage collection (GC). The skipping logic
  // is in `NextFromInput()` and `PrepareOutput()`.
  // If nullptr, NO GC will be performed and all history will be preserved.
  const std::string* const full_history_ts_low_;

  // State
  //
  enum ValidContext : uint8_t {
    kMerge1 = 0,
    kMerge2 = 1,
    kParseKeyError = 2,
    kCurrentKeyUncommitted = 3,
    kKeepSDAndClearPut = 4,
    kKeepTsHistory = 5,
    kKeepSDForConflictCheck = 6,
    kKeepSDForSnapshot = 7,
    kKeepSD = 8,
    kKeepDel = 9,
    kNewUserKey = 10,
    kRangeDeletion = 11,
    kSwapPreferredSeqno = 12,
  };

  struct ValidityInfo {
    inline bool IsValid() const { return rep & 1; }
    ValidContext GetContext() const {
      return static_cast<ValidContext>(rep >> 1);
    }
    inline void SetValid(uint8_t ctx) { rep = (ctx << 1) | 1; }
    inline void Invalidate() { rep = 0; }
    uint8_t rep{0};
  } validity_info_;
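
  // Worked example (illustrative): the low bit of `rep` is the validity flag
  // and the remaining bits store the context. SetValid(kKeepDel) with
  // kKeepDel == 9 yields rep = (9 << 1) | 1 = 19 (0b10011); IsValid() reads
  // bit 0 (true), and GetContext() recovers 19 >> 1 == 9 == kKeepDel.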

  // Points to a copy of the current compaction iterator output (current_key_)
  // if valid.
  Slice key_;
  // Points to the value in the underlying iterator that corresponds to the
  // current output.
  Slice value_;
  // The status is OK unless compaction iterator encounters a merge operand
  // while not having a merge operator defined.
  Status status_;
  // Stores the user key, sequence number and type of the current compaction
  // iterator output (or current key in the underlying iterator during
  // NextFromInput()).
  ParsedInternalKey ikey_;
  // Stores whether current_user_key_ is valid. If so, current_user_key_
  // stores the user key of the last key seen by the iterator.
  // If false, treat the next key to read as a new user key.
  bool has_current_user_key_ = false;
  // If false, the iterator holds a copy of the current compaction iterator
  // output (or current key in the underlying iterator during NextFromInput()).
  bool at_next_ = false;
  // A copy of the current internal key.
  IterKey current_key_;
  Slice current_user_key_;
  std::string curr_ts_;
  SequenceNumber current_user_key_sequence_;
  SequenceNumber current_user_key_snapshot_;

  // True if the iterator has already returned a record for the current key.
  bool has_outputted_key_ = false;

  // Truncate the value of the next key and output it without applying any
  // compaction rules. This is an optimization for outputting a put after
  // a single delete. See more in `NextFromInput()` under Optimization 3.
  bool clear_and_output_next_key_ = false;

  MergeOutputIterator merge_out_iter_;
  Status merge_until_status_;
  // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
  // merge operands and then releasing them after consuming them.
  PinnedIteratorsManager pinned_iters_mgr_;

  uint64_t blob_garbage_collection_cutoff_file_number_;
  std::unique_ptr<BlobFetcher> blob_fetcher_;
  std::unique_ptr<PrefetchBufferCollection> prefetch_buffers_;

  std::string blob_index_;
  PinnableSlice blob_value_;
  std::string compaction_filter_value_;
  InternalKey compaction_filter_skip_until_;
  402. // "level_ptrs" holds indices that remember which file of an associated
  403. // level we were last checking during the last call to compaction->
  404. // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
  405. // to pick off where it left off since each subcompaction's key range is
  406. // increasing so a later call to the function must be looking for a key that
  407. // is in or beyond the last file checked during the previous call
  408. std::vector<size_t> level_ptrs_;
  CompactionIterationStats iter_stats_;

  // Used to avoid purging uncommitted values. The application can specify
  // uncommitted values by providing a SnapshotChecker object.
  bool current_key_committed_;

  // Saved result of ucmp->CompareTimestamp(curr_ts_, *full_history_ts_low_)
  int cmp_with_history_ts_low_;

  const int level_;

  // True if the previous internal key (same user key)'s sequence number has
  // just been zeroed out during bottommost compaction.
  bool last_key_seq_zeroed_{false};

  // Max seqno that can be zeroed out at last level (various reasons)
  const SequenceNumber preserve_seqno_after_ = kMaxSequenceNumber;

  void AdvanceInputIter() { input_.Next(); }

  void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }

  bool IsShuttingDown() {
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
  }

  bool IsPausingManualCompaction() {
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
    return manual_compaction_canceled_.load(std::memory_order_relaxed);
  }

  // Stores whether the current compaction iterator output
  // is a range tombstone start key.
  bool is_range_del_{false};
};

inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
                                                     SequenceNumber snapshot) {
  return DataIsDefinitelyInSnapshot(seq, snapshot, snapshot_checker_);
}

inline bool CompactionIterator::DefinitelyNotInSnapshot(
    SequenceNumber seq, SequenceNumber snapshot) {
  return DataIsDefinitelyNotInSnapshot(seq, snapshot, snapshot_checker_);
}
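
// Example (illustrative; behavior assumed from the helpers' semantics): when
// no SnapshotChecker is installed (snapshot_checker_ == nullptr),
// DefinitelyInSnapshot(seq, snapshot) reduces to `seq <= snapshot`, and
// DefinitelyNotInSnapshot(seq, snapshot) reduces to `seq > snapshot`.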

}  // namespace ROCKSDB_NAMESPACE