range_tombstone_fragmenter.cc

// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/range_tombstone_fragmenter.h"

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <functional>
#include <set>

#include "util/autovector.h"
#include "util/kv_map.h"
#include "util/vector_iterator.h"

namespace ROCKSDB_NAMESPACE {
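
// Builds a fragmented (non-overlapping) representation of the range
// tombstones yielded by `unfragmented_tombstones`. If the input is already
// sorted by start key and no timestamp padding is required, the tombstones
// are fragmented in a single pass; otherwise they are first copied into a
// VectorIterator, which sorts them by key on construction.
//
// Illustrative usage sketch (not part of this file; `raw_tombstone_iter`,
// `read_seq`, and `lookup_key` are hypothetical):
//
//   FragmentedRangeTombstoneList list(std::move(raw_tombstone_iter), icmp,
//                                     false /* for_compaction */,
//                                     {} /* snapshots */);
//   FragmentedRangeTombstoneIterator iter(&list, icmp, read_seq,
//                                         nullptr /* ts_upper_bound */,
//                                         0 /* _lower_bound */);
//   SequenceNumber covering = iter.MaxCoveringTombstoneSeqnum(lookup_key);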
FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
    std::unique_ptr<InternalIterator> unfragmented_tombstones,
    const InternalKeyComparator& icmp, bool for_compaction,
    const std::vector<SequenceNumber>& snapshots,
    const bool tombstone_end_include_ts) {
  if (unfragmented_tombstones == nullptr) {
    return;
  }
  bool is_sorted = true;
  InternalKey pinned_last_start_key;
  Slice last_start_key;
  num_unfragmented_tombstones_ = 0;
  total_tombstone_payload_bytes_ = 0;
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next(), num_unfragmented_tombstones_++) {
    total_tombstone_payload_bytes_ += unfragmented_tombstones->key().size() +
                                      unfragmented_tombstones->value().size();
    if (num_unfragmented_tombstones_ > 0 &&
        icmp.Compare(last_start_key, unfragmented_tombstones->key()) > 0) {
      is_sorted = false;
      break;
    }
    if (unfragmented_tombstones->IsKeyPinned()) {
      last_start_key = unfragmented_tombstones->key();
    } else {
      pinned_last_start_key.DecodeFrom(unfragmented_tombstones->key());
      last_start_key = pinned_last_start_key.Encode();
    }
  }
  auto ucmp = icmp.user_comparator();
  assert(ucmp);
  const size_t ts_sz = ucmp->timestamp_size();
  bool pad_min_ts_for_end = ts_sz > 0 && !tombstone_end_include_ts;
  if (is_sorted && !pad_min_ts_for_end) {
    FragmentTombstones(std::move(unfragmented_tombstones), icmp, for_compaction,
                       snapshots);
    return;
  }

  // Sort the tombstones before fragmenting them.
  std::vector<std::string> keys, values;
  keys.reserve(num_unfragmented_tombstones_);
  values.reserve(num_unfragmented_tombstones_);
  // Reset the counter to zero for the next iteration over keys.
  total_tombstone_payload_bytes_ = 0;
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next()) {
    total_tombstone_payload_bytes_ += unfragmented_tombstones->key().size() +
                                      unfragmented_tombstones->value().size();
    keys.emplace_back(unfragmented_tombstones->key().data(),
                      unfragmented_tombstones->key().size());
    Slice value = unfragmented_tombstones->value();
    if (pad_min_ts_for_end) {
      AppendKeyWithMinTimestamp(&values.emplace_back(), value, ts_sz);
    } else {
      values.emplace_back(value.data(), value.size());
    }
  }
  if (pad_min_ts_for_end) {
    total_tombstone_payload_bytes_ += num_unfragmented_tombstones_ * ts_sz;
  }
  // VectorIterator implicitly sorts by key during construction.
  auto iter = std::make_unique<VectorIterator>(std::move(keys),
                                               std::move(values), &icmp);
  FragmentTombstones(std::move(iter), icmp, for_compaction, snapshots);
}
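
// Core fragmentation pass over tombstones sorted by start key. Conceptually
// a sweep: tombstones sharing the current start key accumulate their end
// keys (with seqnums) in the ordered working set cur_end_keys, and whenever
// the start key advances, every fragment ending before the new start key is
// flushed. For example (illustrative only), the overlapping tombstones
// [a, e)@4 and [c, g)@6 fragment into [a, c)@{4}, [c, e)@{6, 4}, and
// [e, g)@{6}.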
void FragmentedRangeTombstoneList::FragmentTombstones(
    std::unique_ptr<InternalIterator> unfragmented_tombstones,
    const InternalKeyComparator& icmp, bool for_compaction,
    const std::vector<SequenceNumber>& snapshots) {
  Slice cur_start_key(nullptr, 0);
  auto cmp = ParsedInternalKeyComparator(&icmp);

  // Stores the end keys and sequence numbers of range tombstones with a start
  // key less than or equal to cur_start_key. Provides an ordering by end key
  // for use in flush_current_tombstones.
  std::set<ParsedInternalKey, ParsedInternalKeyComparator> cur_end_keys(cmp);

  size_t ts_sz = icmp.user_comparator()->timestamp_size();
  // Given the next start key in unfragmented_tombstones,
  // flush_current_tombstones writes every tombstone fragment that starts
  // and ends with a key before next_start_key, and starts with a key greater
  // than or equal to cur_start_key.
  auto flush_current_tombstones = [&](const Slice& next_start_key) {
    auto it = cur_end_keys.begin();
    bool reached_next_start_key = false;
    for (; it != cur_end_keys.end() && !reached_next_start_key; ++it) {
      Slice cur_end_key = it->user_key;
      if (icmp.user_comparator()->CompareWithoutTimestamp(cur_start_key,
                                                          cur_end_key) == 0) {
        // Empty tombstone.
        continue;
      }
      if (icmp.user_comparator()->CompareWithoutTimestamp(next_start_key,
                                                          cur_end_key) <= 0) {
        // All the end keys in [it, cur_end_keys.end()) are after
        // next_start_key, so the tombstones they represent can be used in
        // fragments that start with keys greater than or equal to
        // next_start_key. However, the end keys we already passed will not be
        // used in any more tombstone fragments.
        //
        // Remove the fully fragmented tombstones and stop iteration after a
        // final round of flushing to preserve the tombstones we can create
        // more fragments from.
        reached_next_start_key = true;
        cur_end_keys.erase(cur_end_keys.begin(), it);
        cur_end_key = next_start_key;
      }

      // Flush a range tombstone fragment [cur_start_key, cur_end_key), which
      // should not overlap with the last-flushed tombstone fragment.
      assert(tombstones_.empty() ||
             icmp.user_comparator()->CompareWithoutTimestamp(
                 tombstones_.back().end_key, cur_start_key) <= 0);

      // Sort the sequence numbers of the tombstones being fragmented in
      // descending order, and then flush them in that order.
      autovector<SequenceNumber> seqnums_to_flush;
      autovector<Slice> timestamps_to_flush;
      for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
        seqnums_to_flush.push_back(flush_it->sequence);
        if (ts_sz) {
          timestamps_to_flush.push_back(
              ExtractTimestampFromUserKey(flush_it->user_key, ts_sz));
        }
      }
      // TODO: bind the two sorts together to be more efficient.
      std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
                std::greater<SequenceNumber>());
      if (ts_sz) {
        std::sort(timestamps_to_flush.begin(), timestamps_to_flush.end(),
                  [icmp](const Slice& ts1, const Slice& ts2) {
                    return icmp.user_comparator()->CompareTimestamp(ts1, ts2) >
                           0;
                  });
      }

      size_t start_idx = tombstone_seqs_.size();
      size_t end_idx = start_idx + seqnums_to_flush.size();

      // If user-defined timestamp is enabled, we should not drop tombstones
      // from any snapshot stripe. Garbage collection of range tombstones
      // happens in CompactionOutputs::AddRangeDels().
      if (for_compaction && ts_sz == 0) {
        // Drop all tombstone seqnums that are not preserved by a snapshot.
        SequenceNumber next_snapshot = kMaxSequenceNumber;
        for (auto seq : seqnums_to_flush) {
          if (seq <= next_snapshot) {
            // This seqnum is visible to a lower snapshot.
            tombstone_seqs_.push_back(seq);
            auto upper_bound_it =
                std::lower_bound(snapshots.begin(), snapshots.end(), seq);
            if (upper_bound_it == snapshots.begin()) {
              // This seqnum is the topmost one visible to the earliest
              // snapshot. None of the seqnums below it will be visible, so we
              // can skip them.
              break;
            }
            next_snapshot = *std::prev(upper_bound_it);
          }
        }
        end_idx = tombstone_seqs_.size();
      } else {
        // The fragmentation is being done for reads, so preserve all seqnums.
        tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
                               seqnums_to_flush.end());
        if (ts_sz) {
          tombstone_timestamps_.insert(tombstone_timestamps_.end(),
                                       timestamps_to_flush.begin(),
                                       timestamps_to_flush.end());
        }
      }
      assert(start_idx < end_idx);

      if (ts_sz) {
        std::string start_key_with_max_ts;
        AppendUserKeyWithMaxTimestamp(&start_key_with_max_ts, cur_start_key,
                                      ts_sz);
        pinned_slices_.emplace_back(std::move(start_key_with_max_ts));
        Slice start_key = pinned_slices_.back();

        std::string end_key_with_max_ts;
        AppendUserKeyWithMaxTimestamp(&end_key_with_max_ts, cur_end_key, ts_sz);
        pinned_slices_.emplace_back(std::move(end_key_with_max_ts));
        Slice end_key = pinned_slices_.back();

        // RangeTombstoneStack expects start_key and end_key to have max
        // timestamp.
        tombstones_.emplace_back(start_key, end_key, start_idx, end_idx);
      } else {
        tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx,
                                 end_idx);
      }

      cur_start_key = cur_end_key;
    }
    if (!reached_next_start_key) {
      // There is a gap between the last flushed tombstone fragment and
      // the next tombstone's start key. Remove all the end keys in
      // the working set, since we have fully fragmented their corresponding
      // tombstones.
      cur_end_keys.clear();
    }
    cur_start_key = next_start_key;
  };

  pinned_iters_mgr_.StartPinning();

  bool no_tombstones = true;
  for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
       unfragmented_tombstones->Next()) {
    const Slice& ikey = unfragmented_tombstones->key();
    Slice tombstone_start_key = ExtractUserKey(ikey);
    SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey);
    if (!unfragmented_tombstones->IsKeyPinned()) {
      pinned_slices_.emplace_back(tombstone_start_key.data(),
                                  tombstone_start_key.size());
      tombstone_start_key = pinned_slices_.back();
    }
    no_tombstones = false;

    Slice tombstone_end_key = unfragmented_tombstones->value();
    if (!unfragmented_tombstones->IsValuePinned()) {
      pinned_slices_.emplace_back(tombstone_end_key.data(),
                                  tombstone_end_key.size());
      tombstone_end_key = pinned_slices_.back();
    }
    if (!cur_end_keys.empty() &&
        icmp.user_comparator()->CompareWithoutTimestamp(
            cur_start_key, tombstone_start_key) != 0) {
      // The start key has changed. Flush all tombstones that start before
      // this new start key.
      flush_current_tombstones(tombstone_start_key);
    }
    cur_start_key = tombstone_start_key;

    cur_end_keys.emplace(tombstone_end_key, tombstone_seq, kTypeRangeDeletion);
  }
  if (!cur_end_keys.empty()) {
    ParsedInternalKey last_end_key = *std::prev(cur_end_keys.end());
    flush_current_tombstones(last_end_key.user_key);
  }

  if (!no_tombstones) {
    pinned_iters_mgr_.PinIterator(unfragmented_tombstones.release(),
                                  false /* arena */);
  }
}
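
// Returns true iff some tombstone in the list has a sequence number in
// [lower, upper]. The sorted seqnum set is built lazily on first call,
// guarded by std::call_once so concurrent readers initialize it only once.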
bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower,
                                                 SequenceNumber upper) {
  std::call_once(seq_set_init_once_flag_, [this]() {
    for (auto s : tombstone_seqs_) {
      seq_set_.insert(s);
    }
  });
  auto seq_it = seq_set_.lower_bound(lower);
  return seq_it != seq_set_.end() && *seq_it <= upper;
}
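
// The three iterator constructors differ only in how the underlying
// FragmentedRangeTombstoneList is owned: a caller-managed raw pointer, a
// shared_ptr to the list, or a shared_ptr to a cache entry holding the list.
// Each starts the iterator out invalidated; callers must Seek* before use.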
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    FragmentedRangeTombstoneList* tombstones, const InternalKeyComparator& icmp,
    SequenceNumber _upper_bound, const Slice* ts_upper_bound,
    SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_(tombstones),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound),
      ts_upper_bound_(ts_upper_bound) {
  assert(tombstones_ != nullptr);
  Invalidate();
}

FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    const std::shared_ptr<FragmentedRangeTombstoneList>& tombstones,
    const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
    const Slice* ts_upper_bound, SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_ref_(tombstones),
      tombstones_(tombstones_ref_.get()),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound),
      ts_upper_bound_(ts_upper_bound) {
  assert(tombstones_ != nullptr);
  Invalidate();
}

FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
    const std::shared_ptr<FragmentedRangeTombstoneListCache>& tombstones_cache,
    const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
    const Slice* ts_upper_bound, SequenceNumber _lower_bound)
    : tombstone_start_cmp_(icmp.user_comparator()),
      tombstone_end_cmp_(icmp.user_comparator()),
      icmp_(&icmp),
      ucmp_(icmp.user_comparator()),
      tombstones_cache_ref_(tombstones_cache),
      tombstones_(tombstones_cache_ref_->tombstones.get()),
      upper_bound_(_upper_bound),
      lower_bound_(_lower_bound) {
  assert(tombstones_ != nullptr);
  if (!ts_upper_bound || ts_upper_bound->empty()) {
    ts_upper_bound_ = nullptr;
  } else {
    ts_upper_bound_ = ts_upper_bound;
  }
  Invalidate();
}
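
// The iterator supports two traversal granularities: the plain
// Seek*/Next/Prev family visits every (fragment, seqnum) pair, while the
// *Top* variants visit each fragment once at its maximum visible seqnum,
// skipping fragments with no seqnum visible in [lower_bound_, upper_bound_].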
void FragmentedRangeTombstoneIterator::SeekToFirst() {
  pos_ = tombstones_->begin();
  seq_pos_ = tombstones_->seq_begin();
}

void FragmentedRangeTombstoneIterator::SeekToTopFirst() {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = tombstones_->begin();
  SetMaxVisibleSeqAndTimestamp();
  ScanForwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::SeekToLast() {
  pos_ = std::prev(tombstones_->end());
  seq_pos_ = std::prev(tombstones_->seq_end());
}

void FragmentedRangeTombstoneIterator::SeekToTopLast() {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = std::prev(tombstones_->end());
  SetMaxVisibleSeqAndTimestamp();
  ScanBackwardToVisibleTombstone();
}

// @param `target` is a user key, with timestamp if user-defined timestamp is
// enabled.
void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  SeekToCoveringTombstone(target);
  ScanForwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  SeekForPrevToCoveringTombstone(target);
  ScanBackwardToVisibleTombstone();
}
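
// Positions the iterator at the earliest fragment whose end key is after
// `target`, i.e. the only fragment that could cover `target`. The chosen
// fragment may still begin after `target`; MaxCoveringTombstoneSeqnum
// re-checks the start key before reporting a covering tombstone.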
void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone(
    const Slice& target) {
  pos_ = std::upper_bound(tombstones_->begin(), tombstones_->end(), target,
                          tombstone_end_cmp_);
  if (pos_ == tombstones_->end()) {
    // All tombstones end before target.
    seq_pos_ = tombstones_->seq_end();
    return;
  }
  SetMaxVisibleSeqAndTimestamp();
}

void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
    const Slice& target) {
  if (tombstones_->empty()) {
    Invalidate();
    return;
  }
  pos_ = std::upper_bound(tombstones_->begin(), tombstones_->end(), target,
                          tombstone_start_cmp_);
  if (pos_ == tombstones_->begin()) {
    // All tombstones start after target.
    Invalidate();
    return;
  }
  --pos_;
  SetMaxVisibleSeqAndTimestamp();
}
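
// After SetMaxVisibleSeqAndTimestamp, seq_pos_ may sit at the end of the
// current fragment's seqnum range (nothing visible under upper_bound_) or on
// a seqnum below lower_bound_; these helpers skip such fragments until a
// visible one is found, invalidating the iterator if none remains.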
void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
  while (pos_ != tombstones_->end() &&
         (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
          *seq_pos_ < lower_bound_)) {
    ++pos_;
    if (pos_ == tombstones_->end()) {
      Invalidate();
      return;
    }
    SetMaxVisibleSeqAndTimestamp();
  }
}

void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() {
  while (pos_ != tombstones_->end() &&
         (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
          *seq_pos_ < lower_bound_)) {
    if (pos_ == tombstones_->begin()) {
      Invalidate();
      return;
    }
    --pos_;
    SetMaxVisibleSeqAndTimestamp();
  }
}
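
// Next/Prev step through individual (fragment, seqnum) pairs; TopNext/TopPrev
// step fragment by fragment, visiting only the maximum visible seqnum of
// each.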
void FragmentedRangeTombstoneIterator::Next() {
  ++seq_pos_;
  if (seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
    ++pos_;
  }
}

void FragmentedRangeTombstoneIterator::TopNext() {
  ++pos_;
  if (pos_ == tombstones_->end()) {
    return;
  }
  SetMaxVisibleSeqAndTimestamp();
  ScanForwardToVisibleTombstone();
}

void FragmentedRangeTombstoneIterator::Prev() {
  if (seq_pos_ == tombstones_->seq_begin()) {
    Invalidate();
    return;
  }
  --seq_pos_;
  if (pos_ == tombstones_->end() ||
      seq_pos_ == tombstones_->seq_iter(pos_->seq_start_idx - 1)) {
    --pos_;
  }
}

void FragmentedRangeTombstoneIterator::TopPrev() {
  if (pos_ == tombstones_->begin()) {
    Invalidate();
    return;
  }
  --pos_;
  SetMaxVisibleSeqAndTimestamp();
  ScanBackwardToVisibleTombstone();
}

bool FragmentedRangeTombstoneIterator::Valid() const {
  return tombstones_ != nullptr && pos_ != tombstones_->end();
}
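
// Returns the seqnum of the newest visible range tombstone covering
// `target_user_key`, or 0 if no visible tombstone covers it. A tombstone
// covers the key when start_key <= target_user_key < end_key.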
SequenceNumber FragmentedRangeTombstoneIterator::MaxCoveringTombstoneSeqnum(
    const Slice& target_user_key) {
  SeekToCoveringTombstone(target_user_key);
  return ValidPos() && ucmp_->CompareWithoutTimestamp(start_key(),
                                                      target_user_key) <= 0
             ? seq()
             : 0;
}
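
// Partitions the tombstones into snapshot stripes: for each seqnum range
// delimited by consecutive snapshots (plus a final stripe ending at
// kMaxSequenceNumber), emits a child iterator bounded to that stripe, keyed
// by the stripe's upper bound. Stripes containing no tombstone seqnums are
// skipped, as determined by ContainsRange.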
std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
FragmentedRangeTombstoneIterator::SplitBySnapshot(
    const std::vector<SequenceNumber>& snapshots) {
  std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
      splits;
  SequenceNumber lower = 0;
  SequenceNumber upper;
  for (size_t i = 0; i <= snapshots.size(); i++) {
    if (i >= snapshots.size()) {
      upper = kMaxSequenceNumber;
    } else {
      upper = snapshots[i];
    }
    if (tombstones_->ContainsRange(lower, upper)) {
      splits.emplace(upper,
                     std::make_unique<FragmentedRangeTombstoneIterator>(
                         tombstones_, *icmp_, upper, ts_upper_bound_, lower));
    }
    lower = upper + 1;
  }
  return splits;
}

}  // namespace ROCKSDB_NAMESPACE