compaction_iterator_test.cc 36 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <string>
  6. #include <vector>
  7. #include "db/compaction/compaction_iterator.h"
  8. #include "port/port.h"
  9. #include "test_util/testharness.h"
  10. #include "test_util/testutil.h"
  11. #include "util/string_util.h"
  12. #include "utilities/merge_operators.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. // Expects no merging attempts.
  15. class NoMergingMergeOp : public MergeOperator {
  16. public:
  17. bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
  18. MergeOperationOutput* /*merge_out*/) const override {
  19. ADD_FAILURE();
  20. return false;
  21. }
  22. bool PartialMergeMulti(const Slice& /*key*/,
  23. const std::deque<Slice>& /*operand_list*/,
  24. std::string* /*new_value*/,
  25. Logger* /*logger*/) const override {
  26. ADD_FAILURE();
  27. return false;
  28. }
  29. const char* Name() const override {
  30. return "CompactionIteratorTest NoMergingMergeOp";
  31. }
  32. };
  33. // Compaction filter that gets stuck when it sees a particular key,
  34. // then gets unstuck when told to.
  35. // Always returns Decition::kRemove.
  36. class StallingFilter : public CompactionFilter {
  37. public:
  38. Decision FilterV2(int /*level*/, const Slice& key, ValueType /*type*/,
  39. const Slice& /*existing_value*/, std::string* /*new_value*/,
  40. std::string* /*skip_until*/) const override {
  41. int k = std::atoi(key.ToString().c_str());
  42. last_seen.store(k);
  43. while (k >= stall_at.load()) {
  44. std::this_thread::yield();
  45. }
  46. return Decision::kRemove;
  47. }
  48. const char* Name() const override {
  49. return "CompactionIteratorTest StallingFilter";
  50. }
  51. // Wait until the filter sees a key >= k and stalls at that key.
  52. // If `exact`, asserts that the seen key is equal to k.
  53. void WaitForStall(int k, bool exact = true) {
  54. stall_at.store(k);
  55. while (last_seen.load() < k) {
  56. std::this_thread::yield();
  57. }
  58. if (exact) {
  59. EXPECT_EQ(k, last_seen.load());
  60. }
  61. }
  62. // Filter will stall on key >= stall_at. Advance stall_at to unstall.
  63. mutable std::atomic<int> stall_at{0};
  64. // Last key the filter was called with.
  65. mutable std::atomic<int> last_seen{0};
  66. };
  67. // Compaction filter that filter out all keys.
  68. class FilterAllKeysCompactionFilter : public CompactionFilter {
  69. public:
  70. Decision FilterV2(int /*level*/, const Slice& /*key*/, ValueType /*type*/,
  71. const Slice& /*existing_value*/, std::string* /*new_value*/,
  72. std::string* /*skip_until*/) const override {
  73. return Decision::kRemove;
  74. }
  75. const char* Name() const override { return "AllKeysCompactionFilter"; }
  76. };
  77. class LoggingForwardVectorIterator : public InternalIterator {
  78. public:
  79. struct Action {
  80. enum class Type {
  81. SEEK_TO_FIRST,
  82. SEEK,
  83. NEXT,
  84. };
  85. Type type;
  86. std::string arg;
  87. explicit Action(Type _type, std::string _arg = "")
  88. : type(_type), arg(_arg) {}
  89. bool operator==(const Action& rhs) const {
  90. return std::tie(type, arg) == std::tie(rhs.type, rhs.arg);
  91. }
  92. };
  93. LoggingForwardVectorIterator(const std::vector<std::string>& keys,
  94. const std::vector<std::string>& values)
  95. : keys_(keys), values_(values), current_(keys.size()) {
  96. assert(keys_.size() == values_.size());
  97. }
  98. bool Valid() const override { return current_ < keys_.size(); }
  99. void SeekToFirst() override {
  100. log.emplace_back(Action::Type::SEEK_TO_FIRST);
  101. current_ = 0;
  102. }
  103. void SeekToLast() override { assert(false); }
  104. void Seek(const Slice& target) override {
  105. log.emplace_back(Action::Type::SEEK, target.ToString());
  106. current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
  107. keys_.begin();
  108. }
  109. void SeekForPrev(const Slice& /*target*/) override { assert(false); }
  110. void Next() override {
  111. assert(Valid());
  112. log.emplace_back(Action::Type::NEXT);
  113. current_++;
  114. }
  115. void Prev() override { assert(false); }
  116. Slice key() const override {
  117. assert(Valid());
  118. return Slice(keys_[current_]);
  119. }
  120. Slice value() const override {
  121. assert(Valid());
  122. return Slice(values_[current_]);
  123. }
  124. Status status() const override { return Status::OK(); }
  125. std::vector<Action> log;
  126. private:
  127. std::vector<std::string> keys_;
  128. std::vector<std::string> values_;
  129. size_t current_;
  130. };
  131. class FakeCompaction : public CompactionIterator::CompactionProxy {
  132. public:
  133. FakeCompaction() = default;
  134. int level(size_t /*compaction_input_level*/) const override { return 0; }
  135. bool KeyNotExistsBeyondOutputLevel(
  136. const Slice& /*user_key*/,
  137. std::vector<size_t>* /*level_ptrs*/) const override {
  138. return is_bottommost_level || key_not_exists_beyond_output_level;
  139. }
  140. bool bottommost_level() const override { return is_bottommost_level; }
  141. int number_levels() const override { return 1; }
  142. Slice GetLargestUserKey() const override {
  143. return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
  144. }
  145. bool allow_ingest_behind() const override { return false; }
  146. bool preserve_deletes() const override { return false; }
  147. bool key_not_exists_beyond_output_level = false;
  148. bool is_bottommost_level = false;
  149. };
  150. // A simplifed snapshot checker which assumes each snapshot has a global
  151. // last visible sequence.
  152. class TestSnapshotChecker : public SnapshotChecker {
  153. public:
  154. explicit TestSnapshotChecker(
  155. SequenceNumber last_committed_sequence,
  156. const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots = {{}})
  157. : last_committed_sequence_(last_committed_sequence),
  158. snapshots_(snapshots) {}
  159. SnapshotCheckerResult CheckInSnapshot(
  160. SequenceNumber seq, SequenceNumber snapshot_seq) const override {
  161. if (snapshot_seq == kMaxSequenceNumber) {
  162. return seq <= last_committed_sequence_
  163. ? SnapshotCheckerResult::kInSnapshot
  164. : SnapshotCheckerResult::kNotInSnapshot;
  165. }
  166. assert(snapshots_.count(snapshot_seq) > 0);
  167. return seq <= snapshots_.at(snapshot_seq)
  168. ? SnapshotCheckerResult::kInSnapshot
  169. : SnapshotCheckerResult::kNotInSnapshot;
  170. }
  171. private:
  172. SequenceNumber last_committed_sequence_;
  173. // A map of valid snapshot to last visible sequence to the snapshot.
  174. std::unordered_map<SequenceNumber, SequenceNumber> snapshots_;
  175. };
  176. // Test param:
  177. // bool: whether to pass snapshot_checker to compaction iterator.
  178. class CompactionIteratorTest : public testing::TestWithParam<bool> {
  179. public:
  180. CompactionIteratorTest()
  181. : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
  182. void InitIterators(
  183. const std::vector<std::string>& ks, const std::vector<std::string>& vs,
  184. const std::vector<std::string>& range_del_ks,
  185. const std::vector<std::string>& range_del_vs,
  186. SequenceNumber last_sequence,
  187. SequenceNumber last_committed_sequence = kMaxSequenceNumber,
  188. MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
  189. bool bottommost_level = false,
  190. SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
  191. std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
  192. new test::VectorIterator(range_del_ks, range_del_vs));
  193. auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
  194. std::move(unfragmented_range_del_iter), icmp_);
  195. std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
  196. new FragmentedRangeTombstoneIterator(tombstone_list, icmp_,
  197. kMaxSequenceNumber));
  198. range_del_agg_.reset(new CompactionRangeDelAggregator(&icmp_, snapshots_));
  199. range_del_agg_->AddTombstones(std::move(range_del_iter));
  200. std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
  201. if (filter || bottommost_level) {
  202. compaction_proxy_ = new FakeCompaction();
  203. compaction_proxy_->is_bottommost_level = bottommost_level;
  204. compaction.reset(compaction_proxy_);
  205. }
  206. bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
  207. if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) {
  208. snapshot_checker_.reset(
  209. new TestSnapshotChecker(last_committed_sequence, snapshot_map_));
  210. }
  211. merge_helper_.reset(
  212. new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false,
  213. 0 /*latest_snapshot*/, snapshot_checker_.get(),
  214. 0 /*level*/, nullptr /*statistics*/, &shutting_down_));
  215. iter_.reset(new LoggingForwardVectorIterator(ks, vs));
  216. iter_->SeekToFirst();
  217. c_iter_.reset(new CompactionIterator(
  218. iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
  219. earliest_write_conflict_snapshot, snapshot_checker_.get(),
  220. Env::Default(), false /* report_detailed_time */, false,
  221. range_del_agg_.get(), std::move(compaction), filter, &shutting_down_));
  222. }
  223. void AddSnapshot(SequenceNumber snapshot,
  224. SequenceNumber last_visible_seq = kMaxSequenceNumber) {
  225. snapshots_.push_back(snapshot);
  226. snapshot_map_[snapshot] = last_visible_seq;
  227. }
  228. virtual bool UseSnapshotChecker() const { return false; }
  229. void RunTest(
  230. const std::vector<std::string>& input_keys,
  231. const std::vector<std::string>& input_values,
  232. const std::vector<std::string>& expected_keys,
  233. const std::vector<std::string>& expected_values,
  234. SequenceNumber last_committed_seq = kMaxSequenceNumber,
  235. MergeOperator* merge_operator = nullptr,
  236. CompactionFilter* compaction_filter = nullptr,
  237. bool bottommost_level = false,
  238. SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
  239. InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
  240. last_committed_seq, merge_operator, compaction_filter,
  241. bottommost_level, earliest_write_conflict_snapshot);
  242. c_iter_->SeekToFirst();
  243. for (size_t i = 0; i < expected_keys.size(); i++) {
  244. std::string info = "i = " + ToString(i);
  245. ASSERT_TRUE(c_iter_->Valid()) << info;
  246. ASSERT_OK(c_iter_->status()) << info;
  247. ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info;
  248. ASSERT_EQ(expected_values[i], c_iter_->value().ToString()) << info;
  249. c_iter_->Next();
  250. }
  251. ASSERT_FALSE(c_iter_->Valid());
  252. }
  253. const Comparator* cmp_;
  254. const InternalKeyComparator icmp_;
  255. std::vector<SequenceNumber> snapshots_;
  256. // A map of valid snapshot to last visible sequence to the snapshot.
  257. std::unordered_map<SequenceNumber, SequenceNumber> snapshot_map_;
  258. std::unique_ptr<MergeHelper> merge_helper_;
  259. std::unique_ptr<LoggingForwardVectorIterator> iter_;
  260. std::unique_ptr<CompactionIterator> c_iter_;
  261. std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
  262. std::unique_ptr<SnapshotChecker> snapshot_checker_;
  263. std::atomic<bool> shutting_down_{false};
  264. FakeCompaction* compaction_proxy_;
  265. };
  266. // It is possible that the output of the compaction iterator is empty even if
  267. // the input is not.
  268. TEST_P(CompactionIteratorTest, EmptyResult) {
  269. InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
  270. test::KeyStr("a", 3, kTypeValue)},
  271. {"", "val"}, {}, {}, 5);
  272. c_iter_->SeekToFirst();
  273. ASSERT_FALSE(c_iter_->Valid());
  274. }
  275. // If there is a corruption after a single deletion, the corrupted key should
  276. // be preserved.
  277. TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
  278. InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
  279. test::KeyStr("a", 3, kTypeValue, true),
  280. test::KeyStr("b", 10, kTypeValue)},
  281. {"", "val", "val2"}, {}, {}, 10);
  282. c_iter_->SeekToFirst();
  283. ASSERT_TRUE(c_iter_->Valid());
  284. ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion),
  285. c_iter_->key().ToString());
  286. c_iter_->Next();
  287. ASSERT_TRUE(c_iter_->Valid());
  288. ASSERT_EQ(test::KeyStr("a", 3, kTypeValue, true), c_iter_->key().ToString());
  289. c_iter_->Next();
  290. ASSERT_TRUE(c_iter_->Valid());
  291. ASSERT_EQ(test::KeyStr("b", 10, kTypeValue), c_iter_->key().ToString());
  292. c_iter_->Next();
  293. ASSERT_FALSE(c_iter_->Valid());
  294. }
  295. TEST_P(CompactionIteratorTest, SimpleRangeDeletion) {
  296. InitIterators({test::KeyStr("morning", 5, kTypeValue),
  297. test::KeyStr("morning", 2, kTypeValue),
  298. test::KeyStr("night", 3, kTypeValue)},
  299. {"zao", "zao", "wan"},
  300. {test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 5);
  301. c_iter_->SeekToFirst();
  302. ASSERT_TRUE(c_iter_->Valid());
  303. ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
  304. c_iter_->Next();
  305. ASSERT_TRUE(c_iter_->Valid());
  306. ASSERT_EQ(test::KeyStr("night", 3, kTypeValue), c_iter_->key().ToString());
  307. c_iter_->Next();
  308. ASSERT_FALSE(c_iter_->Valid());
  309. }
  310. TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) {
  311. AddSnapshot(10);
  312. std::vector<std::string> ks1;
  313. ks1.push_back(test::KeyStr("ma", 28, kTypeRangeDeletion));
  314. std::vector<std::string> vs1{"mz"};
  315. std::vector<std::string> ks2{test::KeyStr("morning", 15, kTypeValue),
  316. test::KeyStr("morning", 5, kTypeValue),
  317. test::KeyStr("night", 40, kTypeValue),
  318. test::KeyStr("night", 20, kTypeValue)};
  319. std::vector<std::string> vs2{"zao 15", "zao 5", "wan 40", "wan 20"};
  320. InitIterators(ks2, vs2, ks1, vs1, 40);
  321. c_iter_->SeekToFirst();
  322. ASSERT_TRUE(c_iter_->Valid());
  323. ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
  324. c_iter_->Next();
  325. ASSERT_TRUE(c_iter_->Valid());
  326. ASSERT_EQ(test::KeyStr("night", 40, kTypeValue), c_iter_->key().ToString());
  327. c_iter_->Next();
  328. ASSERT_FALSE(c_iter_->Valid());
  329. }
  330. TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) {
  331. class Filter : public CompactionFilter {
  332. Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
  333. const Slice& existing_value, std::string* /*new_value*/,
  334. std::string* skip_until) const override {
  335. std::string k = key.ToString();
  336. std::string v = existing_value.ToString();
  337. // See InitIterators() call below for the sequence of keys and their
  338. // filtering decisions. Here we closely assert that compaction filter is
  339. // called with the expected keys and only them, and with the right values.
  340. if (k == "a") {
  341. EXPECT_EQ(ValueType::kValue, t);
  342. EXPECT_EQ("av50", v);
  343. return Decision::kKeep;
  344. }
  345. if (k == "b") {
  346. EXPECT_EQ(ValueType::kValue, t);
  347. EXPECT_EQ("bv60", v);
  348. *skip_until = "d+";
  349. return Decision::kRemoveAndSkipUntil;
  350. }
  351. if (k == "e") {
  352. EXPECT_EQ(ValueType::kMergeOperand, t);
  353. EXPECT_EQ("em71", v);
  354. return Decision::kKeep;
  355. }
  356. if (k == "f") {
  357. if (v == "fm65") {
  358. EXPECT_EQ(ValueType::kMergeOperand, t);
  359. *skip_until = "f";
  360. } else {
  361. EXPECT_EQ("fm30", v);
  362. EXPECT_EQ(ValueType::kMergeOperand, t);
  363. *skip_until = "g+";
  364. }
  365. return Decision::kRemoveAndSkipUntil;
  366. }
  367. if (k == "h") {
  368. EXPECT_EQ(ValueType::kValue, t);
  369. EXPECT_EQ("hv91", v);
  370. return Decision::kKeep;
  371. }
  372. if (k == "i") {
  373. EXPECT_EQ(ValueType::kMergeOperand, t);
  374. EXPECT_EQ("im95", v);
  375. *skip_until = "z";
  376. return Decision::kRemoveAndSkipUntil;
  377. }
  378. ADD_FAILURE();
  379. return Decision::kKeep;
  380. }
  381. const char* Name() const override {
  382. return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter";
  383. }
  384. };
  385. NoMergingMergeOp merge_op;
  386. Filter filter;
  387. InitIterators(
  388. {test::KeyStr("a", 50, kTypeValue), // keep
  389. test::KeyStr("a", 45, kTypeMerge),
  390. test::KeyStr("b", 60, kTypeValue), // skip to "d+"
  391. test::KeyStr("b", 40, kTypeValue), test::KeyStr("c", 35, kTypeValue),
  392. test::KeyStr("d", 70, kTypeMerge),
  393. test::KeyStr("e", 71, kTypeMerge), // keep
  394. test::KeyStr("f", 65, kTypeMerge), // skip to "f", aka keep
  395. test::KeyStr("f", 30, kTypeMerge), // skip to "g+"
  396. test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue),
  397. test::KeyStr("h", 91, kTypeValue), // keep
  398. test::KeyStr("i", 95, kTypeMerge), // skip to "z"
  399. test::KeyStr("j", 99, kTypeValue)},
  400. {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
  401. "fv25", "gv90", "hv91", "im95", "jv99"},
  402. {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter);
  403. // Compaction should output just "a", "e" and "h" keys.
  404. c_iter_->SeekToFirst();
  405. ASSERT_TRUE(c_iter_->Valid());
  406. ASSERT_EQ(test::KeyStr("a", 50, kTypeValue), c_iter_->key().ToString());
  407. ASSERT_EQ("av50", c_iter_->value().ToString());
  408. c_iter_->Next();
  409. ASSERT_TRUE(c_iter_->Valid());
  410. ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge), c_iter_->key().ToString());
  411. ASSERT_EQ("em71", c_iter_->value().ToString());
  412. c_iter_->Next();
  413. ASSERT_TRUE(c_iter_->Valid());
  414. ASSERT_EQ(test::KeyStr("h", 91, kTypeValue), c_iter_->key().ToString());
  415. ASSERT_EQ("hv91", c_iter_->value().ToString());
  416. c_iter_->Next();
  417. ASSERT_FALSE(c_iter_->Valid());
  418. // Check that the compaction iterator did the correct sequence of calls on
  419. // the underlying iterator.
  420. using A = LoggingForwardVectorIterator::Action;
  421. using T = A::Type;
  422. std::vector<A> expected_actions = {
  423. A(T::SEEK_TO_FIRST),
  424. A(T::NEXT),
  425. A(T::NEXT),
  426. A(T::SEEK, test::KeyStr("d+", kMaxSequenceNumber, kValueTypeForSeek)),
  427. A(T::NEXT),
  428. A(T::NEXT),
  429. A(T::SEEK, test::KeyStr("g+", kMaxSequenceNumber, kValueTypeForSeek)),
  430. A(T::NEXT),
  431. A(T::SEEK, test::KeyStr("z", kMaxSequenceNumber, kValueTypeForSeek))};
  432. ASSERT_EQ(expected_actions, iter_->log);
  433. }
  434. TEST_P(CompactionIteratorTest, ShuttingDownInFilter) {
  435. NoMergingMergeOp merge_op;
  436. StallingFilter filter;
  437. InitIterators(
  438. {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue),
  439. test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)},
  440. {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
  441. &merge_op, &filter);
  442. // Don't leave tombstones (kTypeDeletion) for filtered keys.
  443. compaction_proxy_->key_not_exists_beyond_output_level = true;
  444. std::atomic<bool> seek_done{false};
  445. ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
  446. c_iter_->SeekToFirst();
  447. EXPECT_FALSE(c_iter_->Valid());
  448. EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
  449. seek_done.store(true);
  450. });
  451. // Let key 1 through.
  452. filter.WaitForStall(1);
  453. // Shutdown during compaction filter call for key 2.
  454. filter.WaitForStall(2);
  455. shutting_down_.store(true);
  456. EXPECT_FALSE(seek_done.load());
  457. // Unstall filter and wait for SeekToFirst() to return.
  458. filter.stall_at.store(3);
  459. compaction_thread.join();
  460. assert(seek_done.load());
  461. // Check that filter was never called again.
  462. EXPECT_EQ(2, filter.last_seen.load());
  463. }
  464. // Same as ShuttingDownInFilter, but shutdown happens during filter call for
  465. // a merge operand, not for a value.
  466. TEST_P(CompactionIteratorTest, ShuttingDownInMerge) {
  467. NoMergingMergeOp merge_op;
  468. StallingFilter filter;
  469. InitIterators(
  470. {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
  471. test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
  472. {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
  473. &merge_op, &filter);
  474. compaction_proxy_->key_not_exists_beyond_output_level = true;
  475. std::atomic<bool> seek_done{false};
  476. ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
  477. c_iter_->SeekToFirst();
  478. ASSERT_FALSE(c_iter_->Valid());
  479. ASSERT_TRUE(c_iter_->status().IsShutdownInProgress());
  480. seek_done.store(true);
  481. });
  482. // Let key 1 through.
  483. filter.WaitForStall(1);
  484. // Shutdown during compaction filter call for key 2.
  485. filter.WaitForStall(2);
  486. shutting_down_.store(true);
  487. EXPECT_FALSE(seek_done.load());
  488. // Unstall filter and wait for SeekToFirst() to return.
  489. filter.stall_at.store(3);
  490. compaction_thread.join();
  491. assert(seek_done.load());
  492. // Check that filter was never called again.
  493. EXPECT_EQ(2, filter.last_seen.load());
  494. }
  495. TEST_P(CompactionIteratorTest, SingleMergeOperand) {
  496. class Filter : public CompactionFilter {
  497. Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
  498. const Slice& existing_value, std::string* /*new_value*/,
  499. std::string* /*skip_until*/) const override {
  500. std::string k = key.ToString();
  501. std::string v = existing_value.ToString();
  502. // See InitIterators() call below for the sequence of keys and their
  503. // filtering decisions. Here we closely assert that compaction filter is
  504. // called with the expected keys and only them, and with the right values.
  505. if (k == "a") {
  506. EXPECT_EQ(ValueType::kMergeOperand, t);
  507. EXPECT_EQ("av1", v);
  508. return Decision::kKeep;
  509. } else if (k == "b") {
  510. EXPECT_EQ(ValueType::kMergeOperand, t);
  511. return Decision::kKeep;
  512. } else if (k == "c") {
  513. return Decision::kKeep;
  514. }
  515. ADD_FAILURE();
  516. return Decision::kKeep;
  517. }
  518. const char* Name() const override {
  519. return "CompactionIteratorTest.SingleMergeOperand::Filter";
  520. }
  521. };
  522. class SingleMergeOp : public MergeOperator {
  523. public:
  524. bool FullMergeV2(const MergeOperationInput& merge_in,
  525. MergeOperationOutput* merge_out) const override {
  526. // See InitIterators() call below for why "c" is the only key for which
  527. // FullMergeV2 should be called.
  528. EXPECT_EQ("c", merge_in.key.ToString());
  529. std::string temp_value;
  530. if (merge_in.existing_value != nullptr) {
  531. temp_value = merge_in.existing_value->ToString();
  532. }
  533. for (auto& operand : merge_in.operand_list) {
  534. temp_value.append(operand.ToString());
  535. }
  536. merge_out->new_value = temp_value;
  537. return true;
  538. }
  539. bool PartialMergeMulti(const Slice& key,
  540. const std::deque<Slice>& operand_list,
  541. std::string* new_value,
  542. Logger* /*logger*/) const override {
  543. std::string string_key = key.ToString();
  544. EXPECT_TRUE(string_key == "a" || string_key == "b");
  545. if (string_key == "a") {
  546. EXPECT_EQ(1, operand_list.size());
  547. } else if (string_key == "b") {
  548. EXPECT_EQ(2, operand_list.size());
  549. }
  550. std::string temp_value;
  551. for (auto& operand : operand_list) {
  552. temp_value.append(operand.ToString());
  553. }
  554. swap(temp_value, *new_value);
  555. return true;
  556. }
  557. const char* Name() const override {
  558. return "CompactionIteratorTest SingleMergeOp";
  559. }
  560. bool AllowSingleOperand() const override { return true; }
  561. };
  562. SingleMergeOp merge_op;
  563. Filter filter;
  564. InitIterators(
  565. // a should invoke PartialMergeMulti with a single merge operand.
  566. {test::KeyStr("a", 50, kTypeMerge),
  567. // b should invoke PartialMergeMulti with two operands.
  568. test::KeyStr("b", 70, kTypeMerge), test::KeyStr("b", 60, kTypeMerge),
  569. // c should invoke FullMerge due to kTypeValue at the beginning.
  570. test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
  571. {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
  572. kMaxSequenceNumber, &merge_op, &filter);
  573. c_iter_->SeekToFirst();
  574. ASSERT_TRUE(c_iter_->Valid());
  575. ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), c_iter_->key().ToString());
  576. ASSERT_EQ("av1", c_iter_->value().ToString());
  577. c_iter_->Next();
  578. ASSERT_TRUE(c_iter_->Valid());
  579. ASSERT_EQ("bv1bv2", c_iter_->value().ToString());
  580. c_iter_->Next();
  581. ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
  582. }
  583. // In bottommost level, values earlier than earliest snapshot can be output
  584. // with sequence = 0.
  585. TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
  586. AddSnapshot(1);
  587. RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
  588. {"v1", "v2"},
  589. {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
  590. {"v1", "v2"}, kMaxSequenceNumber /*last_commited_seq*/,
  591. nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
  592. true /*bottommost_level*/);
  593. }
  594. // In bottommost level, deletions earlier than earliest snapshot can be removed
  595. // permanently.
  596. TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
  597. AddSnapshot(1);
  598. RunTest({test::KeyStr("a", 1, kTypeDeletion),
  599. test::KeyStr("b", 3, kTypeDeletion),
  600. test::KeyStr("b", 1, kTypeValue)},
  601. {"", "", ""},
  602. {test::KeyStr("b", 3, kTypeDeletion),
  603. test::KeyStr("b", 0, kTypeValue)},
  604. {"", ""},
  605. kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
  606. nullptr /*compaction_filter*/, true /*bottommost_level*/);
  607. }
  608. // In bottommost level, single deletions earlier than earliest snapshot can be
  609. // removed permanently.
  610. TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
  611. AddSnapshot(1);
  612. RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
  613. test::KeyStr("b", 2, kTypeSingleDeletion)},
  614. {"", ""}, {test::KeyStr("b", 2, kTypeSingleDeletion)}, {""},
  615. kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
  616. nullptr /*compaction_filter*/, true /*bottommost_level*/);
  617. }
  // Instantiates the TEST_P cases of CompactionIteratorTest for both values of
  // the bool test parameter (whether a snapshot checker is passed to the
  // compaction iterator).
  618. INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
  619. testing::Values(true, false));
  620. // Tests how CompactionIterator work together with SnapshotChecker.
  621. class CompactionIteratorWithSnapshotCheckerTest
  622. : public CompactionIteratorTest {
  623. public:
  624. bool UseSnapshotChecker() const override { return true; }
  625. };
  626. // Uncommitted keys (keys with seq > last_committed_seq) should be output as-is
  627. // while committed version of these keys should get compacted as usual.
  628. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  629. PreserveUncommittedKeys_Value) {
  630. RunTest(
  631. {test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue),
  632. test::KeyStr("foo", 1, kTypeValue)},
  633. {"v3", "v2", "v1"},
  634. {test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue)},
  635. {"v3", "v2"}, 2 /*last_committed_seq*/);
  636. }
  637. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  638. PreserveUncommittedKeys_Deletion) {
  639. RunTest({test::KeyStr("foo", 2, kTypeDeletion),
  640. test::KeyStr("foo", 1, kTypeValue)},
  641. {"", "v1"},
  642. {test::KeyStr("foo", 2, kTypeDeletion),
  643. test::KeyStr("foo", 1, kTypeValue)},
  644. {"", "v1"}, 1 /*last_committed_seq*/);
  645. }
  646. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  647. PreserveUncommittedKeys_Merge) {
  648. auto merge_op = MergeOperators::CreateStringAppendOperator();
  649. RunTest(
  650. {test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
  651. test::KeyStr("foo", 1, kTypeValue)},
  652. {"v3", "v2", "v1"},
  653. {test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeValue)},
  654. {"v3", "v1,v2"}, 2 /*last_committed_seq*/, merge_op.get());
  655. }
  656. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  657. PreserveUncommittedKeys_SingleDelete) {
  658. RunTest({test::KeyStr("foo", 2, kTypeSingleDeletion),
  659. test::KeyStr("foo", 1, kTypeValue)},
  660. {"", "v1"},
  661. {test::KeyStr("foo", 2, kTypeSingleDeletion),
  662. test::KeyStr("foo", 1, kTypeValue)},
  663. {"", "v1"}, 1 /*last_committed_seq*/);
  664. }
  665. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  666. PreserveUncommittedKeys_BlobIndex) {
  667. RunTest({test::KeyStr("foo", 3, kTypeBlobIndex),
  668. test::KeyStr("foo", 2, kTypeBlobIndex),
  669. test::KeyStr("foo", 1, kTypeBlobIndex)},
  670. {"v3", "v2", "v1"},
  671. {test::KeyStr("foo", 3, kTypeBlobIndex),
  672. test::KeyStr("foo", 2, kTypeBlobIndex)},
  673. {"v3", "v2"}, 2 /*last_committed_seq*/);
  674. }
  675. // Test compaction iterator dedup keys visible to the same snapshot.
  676. TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) {
  677. AddSnapshot(2, 1);
  678. RunTest(
  679. {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
  680. test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
  681. {"v4", "v3", "v2", "v1"},
  682. {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
  683. test::KeyStr("foo", 1, kTypeValue)},
  684. {"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
  685. }
  686. TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
  687. AddSnapshot(2, 1);
  688. RunTest(
  689. {test::KeyStr("foo", 4, kTypeValue),
  690. test::KeyStr("foo", 3, kTypeDeletion),
  691. test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
  692. {"v4", "", "v2", "v1"},
  693. {test::KeyStr("foo", 4, kTypeValue),
  694. test::KeyStr("foo", 3, kTypeDeletion),
  695. test::KeyStr("foo", 1, kTypeValue)},
  696. {"v4", "", "v1"}, 3 /*last_committed_seq*/);
  697. }
  698. TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Merge) {
  699. AddSnapshot(2, 1);
  700. AddSnapshot(4, 3);
  701. auto merge_op = MergeOperators::CreateStringAppendOperator();
  702. RunTest(
  703. {test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
  704. test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
  705. test::KeyStr("foo", 1, kTypeValue)},
  706. {"v5", "v4", "v3", "v2", "v1"},
  707. {test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
  708. test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)},
  709. {"v5", "v4", "v2,v3", "v1"}, 4 /*last_committed_seq*/, merge_op.get());
  710. }
  711. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  712. DedupSameSnapshot_SingleDeletion) {
  713. AddSnapshot(2, 1);
  714. RunTest(
  715. {test::KeyStr("foo", 4, kTypeValue),
  716. test::KeyStr("foo", 3, kTypeSingleDeletion),
  717. test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
  718. {"v4", "", "v2", "v1"},
  719. {test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 1, kTypeValue)},
  720. {"v4", "v1"}, 3 /*last_committed_seq*/);
  721. }
  722. TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_BlobIndex) {
  723. AddSnapshot(2, 1);
  724. RunTest({test::KeyStr("foo", 4, kTypeBlobIndex),
  725. test::KeyStr("foo", 3, kTypeBlobIndex),
  726. test::KeyStr("foo", 2, kTypeBlobIndex),
  727. test::KeyStr("foo", 1, kTypeBlobIndex)},
  728. {"v4", "v3", "v2", "v1"},
  729. {test::KeyStr("foo", 4, kTypeBlobIndex),
  730. test::KeyStr("foo", 3, kTypeBlobIndex),
  731. test::KeyStr("foo", 1, kTypeBlobIndex)},
  732. {"v4", "v3", "v1"}, 3 /*last_committed_seq*/);
  733. }
  // At the bottommost level, sequence numbers can be zeroed out and deletions
  // can be removed, but only when they are visible to the earliest snapshot.
  736. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  737. NotZeroOutSequenceIfNotVisibleToEarliestSnapshot) {
  738. AddSnapshot(2, 1);
  739. RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue),
  740. test::KeyStr("c", 3, kTypeValue)},
  741. {"v1", "v2", "v3"},
  742. {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue),
  743. test::KeyStr("c", 3, kTypeValue)},
  744. {"v1", "v2", "v3"}, kMaxSequenceNumber /*last_commited_seq*/,
  745. nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
  746. true /*bottommost_level*/);
  747. }
  748. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  749. NotRemoveDeletionIfNotVisibleToEarliestSnapshot) {
  750. AddSnapshot(2, 1);
  751. RunTest(
  752. {test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
  753. test::KeyStr("c", 3, kTypeDeletion)},
  754. {"", "", ""},
  755. {},
  756. {"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
  757. nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
  758. true /*bottommost_level*/);
  759. }
  760. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  761. NotRemoveDeletionIfValuePresentToEarlierSnapshot) {
  762. AddSnapshot(2,1);
  763. RunTest(
  764. {test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 1, kTypeValue),
  765. test::KeyStr("b", 3, kTypeValue)},
  766. {"", "", ""},
  767. {test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 0, kTypeValue),
  768. test::KeyStr("b", 3, kTypeValue)},
  769. {"", "", ""}, kMaxSequenceNumber /*last_commited_seq*/,
  770. nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
  771. true /*bottommost_level*/);
  772. }
  773. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  774. NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
  775. AddSnapshot(2, 1);
  776. RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
  777. test::KeyStr("b", 2, kTypeSingleDeletion),
  778. test::KeyStr("c", 3, kTypeSingleDeletion)},
  779. {"", "", ""},
  780. {test::KeyStr("b", 2, kTypeSingleDeletion),
  781. test::KeyStr("c", 3, kTypeSingleDeletion)},
  782. {"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
  783. nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
  784. true /*bottommost_level*/);
  785. }
  // A single delete should not cancel out values that are not visible to the
  // same set of snapshots.
  788. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  789. SingleDeleteAcrossSnapshotBoundary) {
  790. AddSnapshot(2, 1);
  791. RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
  792. test::KeyStr("a", 1, kTypeValue)},
  793. {"", "v1"},
  794. {test::KeyStr("a", 2, kTypeSingleDeletion),
  795. test::KeyStr("a", 1, kTypeValue)},
  796. {"", "v1"}, 2 /*last_committed_seq*/);
  797. }
  // A single delete should be kept if it is not visible to the earliest
  // write-conflict snapshot. If a single delete is kept for this reason, the
  // corresponding value can be trimmed to save space.
  801. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  802. KeepSingleDeletionForWriteConflictChecking) {
  803. AddSnapshot(2, 0);
  804. RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
  805. test::KeyStr("a", 1, kTypeValue)},
  806. {"", "v1"},
  807. {test::KeyStr("a", 2, kTypeSingleDeletion),
  808. test::KeyStr("a", 1, kTypeValue)},
  809. {"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
  810. nullptr /*compaction_filter*/, false /*bottommost_level*/,
  811. 2 /*earliest_write_conflict_snapshot*/);
  812. }
  // The compaction filter should keep uncommitted keys as-is, and
  // * convert the latest value to a deletion, and/or
  // * if the latest value is a merge, apply the filter to all subsequent
  //   merges.
  816. TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
  817. std::unique_ptr<CompactionFilter> compaction_filter(
  818. new FilterAllKeysCompactionFilter());
  819. RunTest(
  820. {test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeValue),
  821. test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeValue)},
  822. {"v2", "v1", "v3", "v4"},
  823. {test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeDeletion),
  824. test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeDeletion)},
  825. {"v2", "", "v3", ""}, 1 /*last_committed_seq*/,
  826. nullptr /*merge_operator*/, compaction_filter.get());
  827. }
  828. TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
  829. std::unique_ptr<CompactionFilter> compaction_filter(
  830. new FilterAllKeysCompactionFilter());
  831. RunTest(
  832. {test::KeyStr("a", 2, kTypeDeletion), test::KeyStr("a", 1, kTypeValue)},
  833. {"", "v1"},
  834. {test::KeyStr("a", 2, kTypeDeletion),
  835. test::KeyStr("a", 1, kTypeDeletion)},
  836. {"", ""}, 1 /*last_committed_seq*/, nullptr /*merge_operator*/,
  837. compaction_filter.get());
  838. }
  839. TEST_F(CompactionIteratorWithSnapshotCheckerTest,
  840. CompactionFilter_PartialMerge) {
  841. std::shared_ptr<MergeOperator> merge_op =
  842. MergeOperators::CreateStringAppendOperator();
  843. std::unique_ptr<CompactionFilter> compaction_filter(
  844. new FilterAllKeysCompactionFilter());
  845. RunTest({test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
  846. test::KeyStr("a", 1, kTypeMerge)},
  847. {"v3", "v2", "v1"}, {test::KeyStr("a", 3, kTypeMerge)}, {"v3"},
  848. 2 /*last_committed_seq*/, merge_op.get(), compaction_filter.get());
  849. }
  850. TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_FullMerge) {
  851. std::shared_ptr<MergeOperator> merge_op =
  852. MergeOperators::CreateStringAppendOperator();
  853. std::unique_ptr<CompactionFilter> compaction_filter(
  854. new FilterAllKeysCompactionFilter());
  855. RunTest(
  856. {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
  857. test::KeyStr("a", 1, kTypeValue)},
  858. {"v3", "v2", "v1"},
  859. {test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 1, kTypeDeletion)},
  860. {"v3", ""}, 2 /*last_committed_seq*/, merge_op.get(),
  861. compaction_filter.get());
  862. }
  863. } // namespace ROCKSDB_NAMESPACE
  864. int main(int argc, char** argv) {
  865. ::testing::InitGoogleTest(&argc, argv);
  866. return RUN_ALL_TESTS();
  867. }