compaction_iterator.cc 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <cinttypes>
  6. #include "db/compaction/compaction_iterator.h"
  7. #include "db/snapshot_checker.h"
  8. #include "port/likely.h"
  9. #include "rocksdb/listener.h"
  10. #include "table/internal_iterator.h"
  11. #include "test_util/sync_point.h"
  // True only when `seq` is certainly visible in `snapshot`: it may not be
  // newer than the snapshot, and an installed snapshot_checker_ (if any) must
  // report kInSnapshot. Expands to uses of the `snapshot_checker_` member.
  #define DEFINITELY_IN_SNAPSHOT(seq, snapshot)                         \
    ((seq) <= (snapshot) &&                                             \
     (!snapshot_checker_ ||                                             \
      LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) ==   \
             SnapshotCheckerResult::kInSnapshot)))
  // True only when `seq` is certainly NOT visible in `snapshot`: either it is
  // newer than the snapshot, or an installed snapshot_checker_ explicitly
  // reports kNotInSnapshot.
  #define DEFINITELY_NOT_IN_SNAPSHOT(seq, snapshot)                     \
    ((seq) > (snapshot) ||                                              \
     (snapshot_checker_ &&                                              \
      UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
               SnapshotCheckerResult::kNotInSnapshot)))
  // True when `seq` is visible in the earliest tracked snapshot. Without a
  // snapshot checker the sequence comparison decides; otherwise the member
  // IsInEarliestSnapshot() is also consulted.
  #define IN_EARLIEST_SNAPSHOT(seq)                                     \
    ((seq) <= earliest_snapshot_ &&                                     \
     (!snapshot_checker_ || LIKELY(IsInEarliestSnapshot(seq))))
  25. namespace ROCKSDB_NAMESPACE {
  26. CompactionIterator::CompactionIterator(
  27. InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
  28. SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
  29. SequenceNumber earliest_write_conflict_snapshot,
  30. const SnapshotChecker* snapshot_checker, Env* env,
  31. bool report_detailed_time, bool expect_valid_internal_key,
  32. CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
  33. const CompactionFilter* compaction_filter,
  34. const std::atomic<bool>* shutting_down,
  35. const SequenceNumber preserve_deletes_seqnum,
  36. const std::atomic<bool>* manual_compaction_paused,
  37. const std::shared_ptr<Logger> info_log)
  38. : CompactionIterator(
  39. input, cmp, merge_helper, last_sequence, snapshots,
  40. earliest_write_conflict_snapshot, snapshot_checker, env,
  41. report_detailed_time, expect_valid_internal_key, range_del_agg,
  42. std::unique_ptr<CompactionProxy>(
  43. compaction ? new CompactionProxy(compaction) : nullptr),
  44. compaction_filter, shutting_down, preserve_deletes_seqnum,
  45. manual_compaction_paused, info_log) {}
  46. CompactionIterator::CompactionIterator(
  47. InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
  48. SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
  49. SequenceNumber earliest_write_conflict_snapshot,
  50. const SnapshotChecker* snapshot_checker, Env* env,
  51. bool report_detailed_time, bool expect_valid_internal_key,
  52. CompactionRangeDelAggregator* range_del_agg,
  53. std::unique_ptr<CompactionProxy> compaction,
  54. const CompactionFilter* compaction_filter,
  55. const std::atomic<bool>* shutting_down,
  56. const SequenceNumber preserve_deletes_seqnum,
  57. const std::atomic<bool>* manual_compaction_paused,
  58. const std::shared_ptr<Logger> info_log)
  59. : input_(input),
  60. cmp_(cmp),
  61. merge_helper_(merge_helper),
  62. snapshots_(snapshots),
  63. earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
  64. snapshot_checker_(snapshot_checker),
  65. env_(env),
  66. report_detailed_time_(report_detailed_time),
  67. expect_valid_internal_key_(expect_valid_internal_key),
  68. range_del_agg_(range_del_agg),
  69. compaction_(std::move(compaction)),
  70. compaction_filter_(compaction_filter),
  71. shutting_down_(shutting_down),
  72. manual_compaction_paused_(manual_compaction_paused),
  73. preserve_deletes_seqnum_(preserve_deletes_seqnum),
  74. current_user_key_sequence_(0),
  75. current_user_key_snapshot_(0),
  76. merge_out_iter_(merge_helper_),
  77. current_key_committed_(false),
  78. info_log_(info_log) {
  79. assert(compaction_filter_ == nullptr || compaction_ != nullptr);
  80. assert(snapshots_ != nullptr);
  81. bottommost_level_ =
  82. compaction_ == nullptr ? false : compaction_->bottommost_level();
  83. if (compaction_ != nullptr) {
  84. level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  85. }
  86. if (snapshots_->size() == 0) {
  87. // optimize for fast path if there are no snapshots
  88. visible_at_tip_ = true;
  89. earliest_snapshot_iter_ = snapshots_->end();
  90. earliest_snapshot_ = kMaxSequenceNumber;
  91. latest_snapshot_ = 0;
  92. } else {
  93. visible_at_tip_ = false;
  94. earliest_snapshot_iter_ = snapshots_->begin();
  95. earliest_snapshot_ = snapshots_->at(0);
  96. latest_snapshot_ = snapshots_->back();
  97. }
  98. #ifndef NDEBUG
  99. // findEarliestVisibleSnapshot assumes this ordering.
  100. for (size_t i = 1; i < snapshots_->size(); ++i) {
  101. assert(snapshots_->at(i - 1) < snapshots_->at(i));
  102. }
  103. #endif
  104. input_->SetPinnedItersMgr(&pinned_iters_mgr_);
  105. TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
  106. }
  107. CompactionIterator::~CompactionIterator() {
  108. // input_ Iteartor lifetime is longer than pinned_iters_mgr_ lifetime
  109. input_->SetPinnedItersMgr(nullptr);
  110. }
  111. void CompactionIterator::ResetRecordCounts() {
  112. iter_stats_.num_record_drop_user = 0;
  113. iter_stats_.num_record_drop_hidden = 0;
  114. iter_stats_.num_record_drop_obsolete = 0;
  115. iter_stats_.num_record_drop_range_del = 0;
  116. iter_stats_.num_range_del_drop_obsolete = 0;
  117. iter_stats_.num_optimized_del_drop_obsolete = 0;
  118. }
  119. void CompactionIterator::SeekToFirst() {
  120. NextFromInput();
  121. PrepareOutput();
  122. }
  123. void CompactionIterator::Next() {
  124. // If there is a merge output, return it before continuing to process the
  125. // input.
  126. if (merge_out_iter_.Valid()) {
  127. merge_out_iter_.Next();
  128. // Check if we returned all records of the merge output.
  129. if (merge_out_iter_.Valid()) {
  130. key_ = merge_out_iter_.key();
  131. value_ = merge_out_iter_.value();
  132. bool valid_key __attribute__((__unused__));
  133. valid_key = ParseInternalKey(key_, &ikey_);
  134. // MergeUntil stops when it encounters a corrupt key and does not
  135. // include them in the result, so we expect the keys here to be valid.
  136. assert(valid_key);
  137. if (!valid_key) {
  138. ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
  139. key_.ToString(true).c_str());
  140. }
  141. // Keep current_key_ in sync.
  142. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
  143. key_ = current_key_.GetInternalKey();
  144. ikey_.user_key = current_key_.GetUserKey();
  145. valid_ = true;
  146. } else {
  147. // We consumed all pinned merge operands, release pinned iterators
  148. pinned_iters_mgr_.ReleasePinnedData();
  149. // MergeHelper moves the iterator to the first record after the merged
  150. // records, so even though we reached the end of the merge output, we do
  151. // not want to advance the iterator.
  152. NextFromInput();
  153. }
  154. } else {
  155. // Only advance the input iterator if there is no merge output and the
  156. // iterator is not already at the next record.
  157. if (!at_next_) {
  158. input_->Next();
  159. }
  160. NextFromInput();
  161. }
  162. if (valid_) {
  163. // Record that we've outputted a record for the current key.
  164. has_outputted_key_ = true;
  165. }
  166. PrepareOutput();
  167. }
  168. void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
  169. Slice* skip_until) {
  170. if (compaction_filter_ != nullptr &&
  171. (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
  172. // If the user has specified a compaction filter and the sequence
  173. // number is greater than any external snapshot, then invoke the
  174. // filter. If the return value of the compaction filter is true,
  175. // replace the entry with a deletion marker.
  176. CompactionFilter::Decision filter;
  177. compaction_filter_value_.clear();
  178. compaction_filter_skip_until_.Clear();
  179. CompactionFilter::ValueType value_type =
  180. ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
  181. : CompactionFilter::ValueType::kBlobIndex;
  182. // Hack: pass internal key to BlobIndexCompactionFilter since it needs
  183. // to get sequence number.
  184. Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
  185. {
  186. StopWatchNano timer(env_, report_detailed_time_);
  187. filter = compaction_filter_->FilterV2(
  188. compaction_->level(), filter_key, value_type, value_,
  189. &compaction_filter_value_, compaction_filter_skip_until_.rep());
  190. iter_stats_.total_filter_time +=
  191. env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
  192. }
  193. if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
  194. cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
  195. 0) {
  196. // Can't skip to a key smaller than the current one.
  197. // Keep the key as per FilterV2 documentation.
  198. filter = CompactionFilter::Decision::kKeep;
  199. }
  200. if (filter == CompactionFilter::Decision::kRemove) {
  201. // convert the current key to a delete; key_ is pointing into
  202. // current_key_ at this point, so updating current_key_ updates key()
  203. ikey_.type = kTypeDeletion;
  204. current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
  205. // no value associated with delete
  206. value_.clear();
  207. iter_stats_.num_record_drop_user++;
  208. } else if (filter == CompactionFilter::Decision::kChangeValue) {
  209. value_ = compaction_filter_value_;
  210. } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
  211. *need_skip = true;
  212. compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
  213. kValueTypeForSeek);
  214. *skip_until = compaction_filter_skip_until_.Encode();
  215. }
  216. }
  217. }
  // Pulls records from input_ until one must be emitted (valid_ == true), the
  // input is exhausted, an error is stored in status_, or a shutdown / manual
  // compaction pause is observed. Records that are hidden by newer versions,
  // covered by range tombstones, obsolete deletions, or skipped at a
  // compaction/merge filter's request are consumed here and counted in
  // iter_stats_. On exit at_next_ is true when input_ has already been moved
  // past the emitted record, so Next() must not advance it again.
  void CompactionIterator::NextFromInput() {
    // Reset per-call output state; the loop sets valid_ once it finds a record
    // to emit.
    at_next_ = false;
    valid_ = false;
    while (!valid_ && input_->Valid() && !IsPausingManualCompaction() &&
           !IsShuttingDown()) {
      key_ = input_->key();
      value_ = input_->value();
      iter_stats_.num_input_records++;
      if (!ParseInternalKey(key_, &ikey_)) {
        // If `expect_valid_internal_key_` is false, return the corrupted key
        // and let the caller decide what to do with it.
        // TODO(noetzli): We should have a more elegant solution for this.
        if (expect_valid_internal_key_) {
          assert(!"Corrupted internal key not expected.");
          status_ = Status::Corruption("Corrupted internal key not expected.");
          break;
        }
        // Emit the corrupt key as-is and forget the current user key, so the
        // next record is treated as the first occurrence of its user key.
        key_ = current_key_.SetInternalKey(key_);
        has_current_user_key_ = false;
        current_user_key_sequence_ = kMaxSequenceNumber;
        current_user_key_snapshot_ = 0;
        iter_stats_.num_input_corrupt_records++;
        valid_ = true;
        break;
      }
      TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
      // Update input statistics
      if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
        iter_stats_.num_input_deletion_records++;
      }
      iter_stats_.total_input_raw_key_bytes += key_.size();
      iter_stats_.total_input_raw_value_bytes += value_.size();
      // If need_skip is true, we should seek the input iterator
      // to internal key skip_until and continue from there.
      bool need_skip = false;
      // Points either into compaction_filter_skip_until_ or into
      // merge_helper_->compaction_filter_skip_until_.
      Slice skip_until;
      // Check whether the user key changed. After this if statement current_key_
      // is a copy of the current input key (maybe converted to a delete by the
      // compaction filter). ikey_.user_key is pointing to the copy.
      if (!has_current_user_key_ ||
          !cmp_->Equal(ikey_.user_key, current_user_key_)) {
        // First occurrence of this user key
        // Copy key for output
        key_ = current_key_.SetInternalKey(key_, &ikey_);
        current_user_key_ = ikey_.user_key;
        has_current_user_key_ = true;
        has_outputted_key_ = false;
        current_user_key_sequence_ = kMaxSequenceNumber;
        current_user_key_snapshot_ = 0;
        current_key_committed_ = KeyCommitted(ikey_.sequence);
        // Apply the compaction filter to the first committed version of the user
        // key.
        if (current_key_committed_) {
          InvokeFilterIfNeeded(&need_skip, &skip_until);
        }
      } else {
        // Update the current key to reflect the new sequence number/type without
        // copying the user key.
        // TODO(rven): Compaction filter does not process keys in this path
        // Need to have the compaction filter process multiple versions
        // if we have versions on both sides of a snapshot
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
        key_ = current_key_.GetInternalKey();
        ikey_.user_key = current_key_.GetUserKey();
        // Note that newer versions of a key are ordered before older versions.
        // If a newer version of a key is committed, so is the older version, so
        // there is no need to query snapshot_checker_ in that case.
        if (UNLIKELY(!current_key_committed_)) {
          assert(snapshot_checker_ != nullptr);
          current_key_committed_ = KeyCommitted(ikey_.sequence);
          // Apply the compaction filter to the first committed version of the
          // user key.
          if (current_key_committed_) {
            InvokeFilterIfNeeded(&need_skip, &skip_until);
          }
        }
      }
      // Uncommitted versions (only possible with a snapshot checker) are
      // emitted unchanged, without any of the dropping logic below.
      if (UNLIKELY(!current_key_committed_)) {
        assert(snapshot_checker_ != nullptr);
        valid_ = true;
        break;
      }
      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find the earliest
      // snapshot that is affected by this kv.
      SequenceNumber last_sequence __attribute__((__unused__));
      last_sequence = current_user_key_sequence_;
      current_user_key_sequence_ = ikey_.sequence;
      // Snapshot stripe of the previous version of this user key (0 if this is
      // the first version seen).
      SequenceNumber last_snapshot = current_user_key_snapshot_;
      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
      current_user_key_snapshot_ =
          visible_at_tip_
              ? earliest_snapshot_
              : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
      if (need_skip) {
        // This case is handled below.
      } else if (clear_and_output_next_key_) {
        // In the previous iteration we encountered a single delete that we could
        // not compact out. We will keep this Put, but can drop its data.
        // (See Optimization 3, below.)
        assert(ikey_.type == kTypeValue);
        if (ikey_.type != kTypeValue) {
          ROCKS_LOG_FATAL(info_log_,
                          "Unexpected key type %d for compaction output",
                          ikey_.type);
        }
        assert(current_user_key_snapshot_ == last_snapshot);
        if (current_user_key_snapshot_ != last_snapshot) {
          ROCKS_LOG_FATAL(info_log_,
                          "current_user_key_snapshot_ (%" PRIu64
                          ") != last_snapshot (%" PRIu64 ")",
                          current_user_key_snapshot_, last_snapshot);
        }
        value_.clear();
        valid_ = true;
        clear_and_output_next_key_ = false;
      } else if (ikey_.type == kTypeSingleDeletion) {
        // We can compact out a SingleDelete if:
        // 1) We encounter the corresponding PUT -OR- we know that this key
        //    doesn't appear past this output level
        // =AND=
        // 2) We've already returned a record in this snapshot -OR-
        //    the SingleDelete is visible in earliest_write_conflict_snapshot_.
        //
        // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
        // allow Transactions to do write-conflict checking (if we compacted away
        // all keys, then we wouldn't know that a write happened in this
        // snapshot). If there is no earlier snapshot, then we know that there
        // are no active transactions that need to know about any writes.
        //
        // Optimization 3:
        // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
        // true, then we must output a SingleDelete. In this case, we will decide
        // to also output the PUT. While we are compacting less by outputting the
        // PUT now, hopefully this will lead to better compaction in the future
        // when Rule 2 is later true (Ie, We are hoping we can later compact out
        // both the SingleDelete and the Put, while we couldn't if we only
        // outputted the SingleDelete now).
        // In this case, we can save space by removing the PUT's value as it will
        // never be read.
        //
        // Deletes and Merges are not supported on the same key that has a
        // SingleDelete as it is not possible to correctly do any partial
        // compaction of such a combination of operations. The result of mixing
        // those operations for a given key is documented as being undefined. So
        // we can choose how to handle such a combinations of operations. We will
        // try to compact out as much as we can in these cases.
        // We will report counts on these anomalous cases.
        // The easiest way to process a SingleDelete during iteration is to peek
        // ahead at the next key.
        ParsedInternalKey next_ikey;
        input_->Next();
        // Check whether the next key exists, is not corrupt, and is the same key
        // as the single delete.
        if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
            cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
          // Check whether the next key belongs to the same snapshot as the
          // SingleDelete.
          if (prev_snapshot == 0 ||
              DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot)) {
            if (next_ikey.type == kTypeSingleDeletion) {
              // We encountered two SingleDeletes in a row. This could be due to
              // unexpected user input.
              // Skip the first SingleDelete and let the next iteration decide how
              // to handle the second SingleDelete
              // First SingleDelete has been skipped since we already called
              // input_->Next().
              ++iter_stats_.num_record_drop_obsolete;
              ++iter_stats_.num_single_del_mismatch;
            } else if (has_outputted_key_ ||
                       DEFINITELY_IN_SNAPSHOT(
                           ikey_.sequence, earliest_write_conflict_snapshot_)) {
              // Found a matching value, we can drop the single delete and the
              // value. It is safe to drop both records since we've already
              // outputted a key in this snapshot, or there is no earlier
              // snapshot (Rule 2 above).
              // Note: it doesn't matter whether the second key is a Put or if it
              // is an unexpected Merge or Delete. We will compact it out
              // either way. We will maintain counts of how many mismatches
              // happened
              if (next_ikey.type != kTypeValue &&
                  next_ikey.type != kTypeBlobIndex) {
                ++iter_stats_.num_single_del_mismatch;
              }
              ++iter_stats_.num_record_drop_hidden;
              ++iter_stats_.num_record_drop_obsolete;
              // Already called input_->Next() once. Call it a second time to
              // skip past the second key.
              input_->Next();
            } else {
              // Found a matching value, but we cannot drop both keys since
              // there is an earlier snapshot and we need to leave behind a record
              // to know that a write happened in this snapshot (Rule 2 above).
              // Clear the value and output the SingleDelete. (The value will be
              // outputted on the next iteration.)
              // Setting valid_ to true will output the current SingleDelete
              valid_ = true;
              // Set up the Put to be outputted in the next iteration.
              // (Optimization 3).
              clear_and_output_next_key_ = true;
            }
          } else {
            // We hit the next snapshot without hitting a put, so the iterator
            // returns the single delete.
            valid_ = true;
          }
        } else {
          // We are at the end of the input, could not parse the next key, or hit
          // a different key. The iterator returns the single delete if the key
          // possibly exists beyond the current output level. We set
          // has_current_user_key to false so that if the iterator is at the next
          // key, we do not compare it again against the previous key at the next
          // iteration. If the next key is corrupt, we return before the
          // comparison, so the value of has_current_user_key does not matter.
          has_current_user_key_ = false;
          if (compaction_ != nullptr && IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
              compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                         &level_ptrs_)) {
            // Key doesn't exist outside of this range.
            // Can compact out this SingleDelete.
            ++iter_stats_.num_record_drop_obsolete;
            ++iter_stats_.num_single_del_fallthru;
            if (!bottommost_level_) {
              ++iter_stats_.num_optimized_del_drop_obsolete;
            }
          } else {
            // Output SingleDelete
            valid_ = true;
          }
        }
        // input_ was already advanced by the peek above; tell Next() not to
        // advance it again for the record being emitted.
        if (valid_) {
          at_next_ = true;
        }
      } else if (last_snapshot == current_user_key_snapshot_ ||
                 (last_snapshot > 0 &&
                  last_snapshot < current_user_key_snapshot_)) {
        // If the earliest snapshot in which this key is visible
        // is the same as the visibility of a previous instance of the
        // same key, then this kv is not visible in any snapshot.
        // Hidden by a newer entry for same user key
        //
        // Note: Dropping this key will not affect TransactionDB write-conflict
        // checking since there has already been a record returned for this key
        // in this snapshot.
        assert(last_sequence >= current_user_key_sequence_);
        if (last_sequence < current_user_key_sequence_) {
          ROCKS_LOG_FATAL(info_log_,
                          "last_sequence (%" PRIu64
                          ") < current_user_key_sequence_ (%" PRIu64 ")",
                          last_sequence, current_user_key_sequence_);
        }
        ++iter_stats_.num_record_drop_hidden;  // (A)
        input_->Next();
      } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
                 IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
                 ikeyNotNeededForIncrementalSnapshot() &&
                 compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                            &level_ptrs_)) {
        // TODO(noetzli): This is the only place where we use compaction_
        // (besides the constructor). We should probably get rid of this
        // dependency and find a way to do similar filtering during flushes.
        //
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        //
        // Note: Dropping this Delete will not affect TransactionDB
        // write-conflict checking since it is earlier than any snapshot.
        //
        // It seems that we can also drop deletion later than earliest snapshot
        // given that:
        // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
        // (2) No value exist earlier than the deletion.
        ++iter_stats_.num_record_drop_obsolete;
        if (!bottommost_level_) {
          ++iter_stats_.num_optimized_del_drop_obsolete;
        }
        input_->Next();
      } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
                 ikeyNotNeededForIncrementalSnapshot()) {
        // Handle the case where we have a delete key at the bottom most level
        // We can skip outputting the key iff there are no subsequent puts for this
        // key
        ParsedInternalKey next_ikey;
        input_->Next();
        // Skip over all versions of this key that happen to occur in the same snapshot
        // range as the delete
        while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
               cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
               (prev_snapshot == 0 ||
                DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
          input_->Next();
        }
        // If you find you still need to output a row with this key, we need to output the
        // delete too
        if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
            cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
          valid_ = true;
          at_next_ = true;
        }
      } else if (ikey_.type == kTypeMerge) {
        // NOTE: the error paths in this branch return directly, so the
        // shutdown / manual-pause status checks after the loop are skipped.
        if (!merge_helper_->HasOperator()) {
          status_ = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          return;
        }
        pinned_iters_mgr_.StartPinning();
        // We know the merge type entry is not hidden, otherwise we would
        // have hit (A)
        // We encapsulate the merge related state machine in a different
        // object to minimize change to the existing flow.
        Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
                                             prev_snapshot, bottommost_level_);
        merge_out_iter_.SeekToFirst();
        if (!s.ok() && !s.IsMergeInProgress()) {
          status_ = s;
          return;
        } else if (merge_out_iter_.Valid()) {
          // NOTE: key, value, and ikey_ refer to old entries.
          //       These will be correctly set below.
          key_ = merge_out_iter_.key();
          value_ = merge_out_iter_.value();
          bool valid_key __attribute__((__unused__));
          valid_key = ParseInternalKey(key_, &ikey_);
          // MergeUntil stops when it encounters a corrupt key and does not
          // include them in the result, so we expect the keys here to be valid.
          assert(valid_key);
          if (!valid_key) {
            ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
                            key_.ToString(true).c_str());
          }
          // Keep current_key_ in sync.
          current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
          key_ = current_key_.GetInternalKey();
          ikey_.user_key = current_key_.GetUserKey();
          valid_ = true;
        } else {
          // all merge operands were filtered out. reset the user key, since the
          // batch consumed by the merge operator should not shadow any keys
          // coming after the merges
          has_current_user_key_ = false;
          pinned_iters_mgr_.ReleasePinnedData();
          if (merge_helper_->FilteredUntil(&skip_until)) {
            need_skip = true;
          }
        }
      } else {
        // 1. new user key -OR-
        // 2. different snapshot stripe
        bool should_delete = range_del_agg_->ShouldDelete(
            key_, RangeDelPositioningMode::kForwardTraversal);
        if (should_delete) {
          ++iter_stats_.num_record_drop_hidden;
          ++iter_stats_.num_record_drop_range_del;
          input_->Next();
        } else {
          valid_ = true;
        }
      }
      // A compaction filter or merge filter asked to skip ahead; reposition
      // input_ at the requested internal key.
      if (need_skip) {
        input_->Seek(skip_until);
      }
    }
    if (!valid_ && IsShuttingDown()) {
      status_ = Status::ShutdownInProgress();
    }
    // A manual-compaction pause overrides any status set above.
    if (IsPausingManualCompaction()) {
      status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
    }
  }
  594. void CompactionIterator::PrepareOutput() {
  595. if (valid_) {
  596. if (compaction_filter_ && ikey_.type == kTypeBlobIndex) {
  597. const auto blob_decision = compaction_filter_->PrepareBlobOutput(
  598. user_key(), value_, &compaction_filter_value_);
  599. if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
  600. status_ = Status::Corruption(
  601. "Corrupted blob reference encountered during GC");
  602. valid_ = false;
  603. } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
  604. status_ = Status::IOError("Could not relocate blob during GC");
  605. valid_ = false;
  606. } else if (blob_decision ==
  607. CompactionFilter::BlobDecision::kChangeValue) {
  608. value_ = compaction_filter_value_;
  609. }
  610. }
  611. // Zeroing out the sequence number leads to better compression.
  612. // If this is the bottommost level (no files in lower levels)
  613. // and the earliest snapshot is larger than this seqno
  614. // and the userkey differs from the last userkey in compaction
  615. // then we can squash the seqno to zero.
  616. //
  617. // This is safe for TransactionDB write-conflict checking since transactions
  618. // only care about sequence number larger than any active snapshots.
  619. //
  620. // Can we do the same for levels above bottom level as long as
  621. // KeyNotExistsBeyondOutputLevel() return true?
  622. if (valid_ && compaction_ != nullptr &&
  623. !compaction_->allow_ingest_behind() &&
  624. ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
  625. IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
  626. assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
  627. if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
  628. ROCKS_LOG_FATAL(info_log_,
  629. "Unexpected key type %d for seq-zero optimization",
  630. ikey_.type);
  631. }
  632. ikey_.sequence = 0;
  633. current_key_.UpdateInternalKey(0, ikey_.type);
  634. }
  635. }
  636. }
  637. inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
  638. SequenceNumber in, SequenceNumber* prev_snapshot) {
  639. assert(snapshots_->size());
  640. if (snapshots_->size() == 0) {
  641. ROCKS_LOG_FATAL(info_log_,
  642. "No snapshot left in findEarliestVisibleSnapshot");
  643. }
  644. auto snapshots_iter = std::lower_bound(
  645. snapshots_->begin(), snapshots_->end(), in);
  646. if (snapshots_iter == snapshots_->begin()) {
  647. *prev_snapshot = 0;
  648. } else {
  649. *prev_snapshot = *std::prev(snapshots_iter);
  650. assert(*prev_snapshot < in);
  651. if (*prev_snapshot >= in) {
  652. ROCKS_LOG_FATAL(info_log_,
  653. "*prev_snapshot >= in in findEarliestVisibleSnapshot");
  654. }
  655. }
  656. if (snapshot_checker_ == nullptr) {
  657. return snapshots_iter != snapshots_->end()
  658. ? *snapshots_iter : kMaxSequenceNumber;
  659. }
  660. bool has_released_snapshot = !released_snapshots_.empty();
  661. for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
  662. auto cur = *snapshots_iter;
  663. assert(in <= cur);
  664. if (in > cur) {
  665. ROCKS_LOG_FATAL(info_log_, "in > cur in findEarliestVisibleSnapshot");
  666. }
  667. // Skip if cur is in released_snapshots.
  668. if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
  669. continue;
  670. }
  671. auto res = snapshot_checker_->CheckInSnapshot(in, cur);
  672. if (res == SnapshotCheckerResult::kInSnapshot) {
  673. return cur;
  674. } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
  675. released_snapshots_.insert(cur);
  676. }
  677. *prev_snapshot = cur;
  678. }
  679. return kMaxSequenceNumber;
  680. }
  681. // used in 2 places - prevents deletion markers to be dropped if they may be
  682. // needed and disables seqnum zero-out in PrepareOutput for recent keys.
  683. inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
  684. return (!compaction_->preserve_deletes()) ||
  685. (ikey_.sequence < preserve_deletes_seqnum_);
  686. }
  687. bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
  688. assert(snapshot_checker_ != nullptr);
  689. bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
  690. (earliest_snapshot_iter_ != snapshots_->end() &&
  691. *earliest_snapshot_iter_ == earliest_snapshot_));
  692. assert(pre_condition);
  693. if (!pre_condition) {
  694. ROCKS_LOG_FATAL(info_log_,
  695. "Pre-Condition is not hold in IsInEarliestSnapshot");
  696. }
  697. auto in_snapshot =
  698. snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  699. while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) {
  700. // Avoid the the current earliest_snapshot_ being return as
  701. // earliest visible snapshot for the next value. So if a value's sequence
  702. // is zero-ed out by PrepareOutput(), the next value will be compact out.
  703. released_snapshots_.insert(earliest_snapshot_);
  704. earliest_snapshot_iter_++;
  705. if (earliest_snapshot_iter_ == snapshots_->end()) {
  706. earliest_snapshot_ = kMaxSequenceNumber;
  707. } else {
  708. earliest_snapshot_ = *earliest_snapshot_iter_;
  709. }
  710. in_snapshot =
  711. snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  712. }
  713. assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased);
  714. if (in_snapshot == SnapshotCheckerResult::kSnapshotReleased) {
  715. ROCKS_LOG_FATAL(info_log_,
  716. "Unexpected released snapshot in IsInEarliestSnapshot");
  717. }
  718. return in_snapshot == SnapshotCheckerResult::kInSnapshot;
  719. }
  720. } // namespace ROCKSDB_NAMESPACE