db_merge_operator_test.cc 35 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <string>
  6. #include <vector>
  7. #include "db/db_test_util.h"
  8. #include "db/dbformat.h"
  9. #include "db/forward_iterator.h"
  10. #include "port/stack_trace.h"
  11. #include "rocksdb/merge_operator.h"
  12. #include "rocksdb/snapshot.h"
  13. #include "rocksdb/utilities/debug.h"
  14. #include "util/random.h"
  15. #include "utilities/merge_operators.h"
  16. #include "utilities/merge_operators/string_append/stringappend2.h"
  17. namespace ROCKSDB_NAMESPACE {
  18. class TestReadCallback : public ReadCallback {
  19. public:
  20. TestReadCallback(SnapshotChecker* snapshot_checker,
  21. SequenceNumber snapshot_seq)
  22. : ReadCallback(snapshot_seq),
  23. snapshot_checker_(snapshot_checker),
  24. snapshot_seq_(snapshot_seq) {}
  25. bool IsVisibleFullCheck(SequenceNumber seq) override {
  26. return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
  27. SnapshotCheckerResult::kInSnapshot;
  28. }
  29. private:
  30. SnapshotChecker* snapshot_checker_;
  31. SequenceNumber snapshot_seq_;
  32. };
  33. // Test merge operator functionality.
  34. class DBMergeOperatorTest : public DBTestBase {
  35. public:
  36. DBMergeOperatorTest()
  37. : DBTestBase("db_merge_operator_test", /*env_do_fsync=*/false) {}
  38. std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
  39. const Slice& key,
  40. const Snapshot* snapshot = nullptr) {
  41. SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
  42. : snapshot->GetSequenceNumber();
  43. TestReadCallback read_callback(snapshot_checker, seq);
  44. ReadOptions read_opt;
  45. read_opt.snapshot = snapshot;
  46. PinnableSlice value;
  47. DBImpl::GetImplOptions get_impl_options;
  48. get_impl_options.column_family = db_->DefaultColumnFamily();
  49. get_impl_options.value = &value;
  50. get_impl_options.callback = &read_callback;
  51. Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
  52. if (!s.ok()) {
  53. return s.ToString();
  54. }
  55. return value.ToString();
  56. }
  57. };
  58. TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
  59. class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
  60. public:
  61. LimitedStringAppendMergeOp(int limit, char delim)
  62. : StringAppendTESTOperator(delim), limit_(limit) {}
  63. const char* Name() const override {
  64. return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
  65. }
  66. bool ShouldMerge(const std::vector<Slice>& operands) const override {
  67. if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
  68. return true;
  69. }
  70. return false;
  71. }
  72. private:
  73. size_t limit_ = 0;
  74. };
  75. Options options = CurrentOptions();
  76. options.create_if_missing = true;
  77. // Use only the latest two merge operands.
  78. options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
  79. options.env = env_;
  80. Reopen(options);
  81. // All K1 values are in memtable.
  82. ASSERT_OK(Merge("k1", "a"));
  83. ASSERT_OK(Merge("k1", "b"));
  84. ASSERT_OK(Merge("k1", "c"));
  85. ASSERT_OK(Merge("k1", "d"));
  86. std::string value;
  87. ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
  88. // Make sure that only the latest two merge operands are used. If this was
  89. // not the case the value would be "a,b,c,d".
  90. ASSERT_EQ(value, "c,d");
  91. // All K2 values are flushed to L0 into a single file.
  92. ASSERT_OK(Merge("k2", "a"));
  93. ASSERT_OK(Merge("k2", "b"));
  94. ASSERT_OK(Merge("k2", "c"));
  95. ASSERT_OK(Merge("k2", "d"));
  96. ASSERT_OK(Flush());
  97. ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
  98. ASSERT_EQ(value, "c,d");
  99. // All K3 values are flushed and are in different files.
  100. ASSERT_OK(Merge("k3", "ab"));
  101. ASSERT_OK(Flush());
  102. ASSERT_OK(Merge("k3", "bc"));
  103. ASSERT_OK(Flush());
  104. ASSERT_OK(Merge("k3", "cd"));
  105. ASSERT_OK(Flush());
  106. ASSERT_OK(Merge("k3", "de"));
  107. ASSERT_OK(db_->Get(ReadOptions(), "k3", &value));
  108. ASSERT_EQ(value, "cd,de");
  109. // Tests that merge operands reach exact limit at memtable.
  110. ASSERT_OK(Merge("k3", "fg"));
  111. ASSERT_OK(db_->Get(ReadOptions(), "k3", &value));
  112. ASSERT_EQ(value, "de,fg");
  113. // All K4 values are in different levels
  114. ASSERT_OK(Merge("k4", "ab"));
  115. ASSERT_OK(Flush());
  116. MoveFilesToLevel(4);
  117. ASSERT_OK(Merge("k4", "bc"));
  118. ASSERT_OK(Flush());
  119. MoveFilesToLevel(3);
  120. ASSERT_OK(Merge("k4", "cd"));
  121. ASSERT_OK(Flush());
  122. MoveFilesToLevel(1);
  123. ASSERT_OK(Merge("k4", "de"));
  124. ASSERT_OK(db_->Get(ReadOptions(), "k4", &value));
  125. ASSERT_EQ(value, "cd,de");
  126. }
  127. TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
  128. Options options = CurrentOptions();
  129. options.create_if_missing = true;
  130. options.merge_operator.reset(new TestPutOperator());
  131. options.env = env_;
  132. Reopen(options);
  133. ASSERT_OK(Merge("k1", "v1"));
  134. ASSERT_OK(Merge("k1", "corrupted"));
  135. std::string value;
  136. ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
  137. VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
  138. }
  139. TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
  140. Options options = CurrentOptions();
  141. options.create_if_missing = true;
  142. options.merge_operator.reset(new TestPutOperator());
  143. options.max_successive_merges = 3;
  144. options.env = env_;
  145. Reopen(options);
  146. ASSERT_OK(Merge("k1", "v1"));
  147. ASSERT_OK(Merge("k1", "v2"));
  148. // Will trigger a merge when hitting max_successive_merges and the merge
  149. // will fail. The delta will be inserted nevertheless.
  150. ASSERT_OK(Merge("k1", "corrupted"));
  151. // Data should stay unmerged after the error.
  152. VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
  153. }
  154. TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
  155. Options options = CurrentOptions();
  156. options.create_if_missing = true;
  157. options.merge_operator.reset(new TestPutOperator());
  158. options.env = env_;
  159. DestroyAndReopen(options);
  160. ASSERT_OK(Merge("k1", "v1"));
  161. ASSERT_OK(Merge("k1", "corrupted"));
  162. ASSERT_OK(Put("k2", "v2"));
  163. auto* iter = db_->NewIterator(ReadOptions());
  164. iter->Seek("k1");
  165. ASSERT_FALSE(iter->Valid());
  166. ASSERT_TRUE(iter->status().IsCorruption());
  167. delete iter;
  168. iter = db_->NewIterator(ReadOptions());
  169. iter->Seek("k2");
  170. ASSERT_TRUE(iter->Valid());
  171. ASSERT_OK(iter->status());
  172. iter->Prev();
  173. ASSERT_FALSE(iter->Valid());
  174. ASSERT_TRUE(iter->status().IsCorruption());
  175. delete iter;
  176. VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
  177. DestroyAndReopen(options);
  178. ASSERT_OK(Merge("k1", "v1"));
  179. ASSERT_OK(Put("k2", "v2"));
  180. ASSERT_OK(Merge("k2", "corrupted"));
  181. iter = db_->NewIterator(ReadOptions());
  182. iter->Seek("k1");
  183. ASSERT_TRUE(iter->Valid());
  184. ASSERT_OK(iter->status());
  185. iter->Next();
  186. ASSERT_FALSE(iter->Valid());
  187. ASSERT_TRUE(iter->status().IsCorruption());
  188. delete iter;
  189. VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
  190. }
  191. TEST_F(DBMergeOperatorTest, MergeOperatorFailsWithMustMerge) {
  192. // This is like a mini-stress test dedicated to `OpFailureScope::kMustMerge`.
  193. // Some or most of it might be deleted upon adding that option to the actual
  194. // stress test.
  195. //
  196. // "k0" and "k2" are stable (uncorrupted) keys before and after a corrupted
  197. // key ("k1"). The outer loop (`i`) varies which write (`j`) to "k1" triggers
  198. // the corruption. Inside that loop there are three cases:
  199. //
  200. // - Case 1: pure `Merge()`s
  201. // - Case 2: `Merge()`s on top of a `Put()`
  202. // - Case 3: `Merge()`s on top of a `Delete()`
  203. //
  204. // For each case we test query results before flush, after flush, and after
  205. // compaction, as well as cleanup after deletion+compaction. The queries
  206. // expect "k0" and "k2" to always be readable. "k1" is expected to be readable
  207. // only by APIs that do not require merging, such as `GetMergeOperands()`.
  208. const int kNumOperands = 3;
  209. Options options = CurrentOptions();
  210. options.merge_operator.reset(new TestPutOperator());
  211. options.env = env_;
  212. Reopen(options);
  213. for (int i = 0; i < kNumOperands; ++i) {
  214. auto check_query = [&]() {
  215. {
  216. std::string value;
  217. ASSERT_OK(db_->Get(ReadOptions(), "k0", &value));
  218. Status s = db_->Get(ReadOptions(), "k1", &value);
  219. ASSERT_TRUE(s.IsCorruption());
  220. ASSERT_EQ(Status::SubCode::kMergeOperatorFailed, s.subcode());
  221. ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
  222. }
  223. {
  224. std::unique_ptr<Iterator> iter;
  225. iter.reset(db_->NewIterator(ReadOptions()));
  226. iter->SeekToFirst();
  227. ASSERT_TRUE(iter->Valid());
  228. ASSERT_EQ("k0", iter->key());
  229. iter->Next();
  230. ASSERT_TRUE(iter->status().IsCorruption());
  231. ASSERT_EQ(Status::SubCode::kMergeOperatorFailed,
  232. iter->status().subcode());
  233. iter->SeekToLast();
  234. ASSERT_TRUE(iter->Valid());
  235. ASSERT_EQ("k2", iter->key());
  236. iter->Prev();
  237. ASSERT_TRUE(iter->status().IsCorruption());
  238. iter->Seek("k2");
  239. ASSERT_TRUE(iter->Valid());
  240. ASSERT_EQ("k2", iter->key());
  241. }
  242. std::vector<PinnableSlice> values(kNumOperands);
  243. GetMergeOperandsOptions merge_operands_info;
  244. merge_operands_info.expected_max_number_of_operands = kNumOperands;
  245. int num_operands_found = 0;
  246. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  247. "k1", values.data(), &merge_operands_info,
  248. &num_operands_found));
  249. ASSERT_EQ(kNumOperands, num_operands_found);
  250. for (int j = 0; j < num_operands_found; ++j) {
  251. if (i == j) {
  252. ASSERT_EQ(values[j], "corrupted_must_merge");
  253. } else {
  254. ASSERT_EQ(values[j], "ok");
  255. }
  256. }
  257. };
  258. ASSERT_OK(Put("k0", "val"));
  259. ASSERT_OK(Put("k2", "val"));
  260. // Case 1
  261. for (int j = 0; j < kNumOperands; ++j) {
  262. if (j == i) {
  263. ASSERT_OK(Merge("k1", "corrupted_must_merge"));
  264. } else {
  265. ASSERT_OK(Merge("k1", "ok"));
  266. }
  267. }
  268. check_query();
  269. ASSERT_OK(Flush());
  270. check_query();
  271. {
  272. CompactRangeOptions cro;
  273. cro.bottommost_level_compaction =
  274. BottommostLevelCompaction::kForceOptimized;
  275. ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  276. }
  277. check_query();
  278. // Case 2
  279. for (int j = 0; j < kNumOperands; ++j) {
  280. Slice val;
  281. if (j == i) {
  282. val = "corrupted_must_merge";
  283. } else {
  284. val = "ok";
  285. }
  286. if (j == 0) {
  287. ASSERT_OK(Put("k1", val));
  288. } else {
  289. ASSERT_OK(Merge("k1", val));
  290. }
  291. }
  292. check_query();
  293. ASSERT_OK(Flush());
  294. check_query();
  295. {
  296. CompactRangeOptions cro;
  297. cro.bottommost_level_compaction =
  298. BottommostLevelCompaction::kForceOptimized;
  299. ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  300. }
  301. check_query();
  302. // Case 3
  303. ASSERT_OK(Delete("k1"));
  304. for (int j = 0; j < kNumOperands; ++j) {
  305. if (i == j) {
  306. ASSERT_OK(Merge("k1", "corrupted_must_merge"));
  307. } else {
  308. ASSERT_OK(Merge("k1", "ok"));
  309. }
  310. }
  311. check_query();
  312. ASSERT_OK(Flush());
  313. check_query();
  314. {
  315. CompactRangeOptions cro;
  316. cro.bottommost_level_compaction =
  317. BottommostLevelCompaction::kForceOptimized;
  318. ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  319. }
  320. check_query();
  321. // Verify obsolete data removal still happens
  322. ASSERT_OK(Delete("k0"));
  323. ASSERT_OK(Delete("k1"));
  324. ASSERT_OK(Delete("k2"));
  325. ASSERT_EQ("NOT_FOUND", Get("k0"));
  326. ASSERT_EQ("NOT_FOUND", Get("k1"));
  327. ASSERT_EQ("NOT_FOUND", Get("k2"));
  328. CompactRangeOptions cro;
  329. cro.bottommost_level_compaction =
  330. BottommostLevelCompaction::kForceOptimized;
  331. ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  332. ASSERT_EQ("", FilesPerLevel());
  333. }
  334. }
  335. TEST_F(DBMergeOperatorTest, MergeOperandThresholdExceeded) {
  336. Options options = CurrentOptions();
  337. options.create_if_missing = true;
  338. options.merge_operator = MergeOperators::CreatePutOperator();
  339. options.env = env_;
  340. Reopen(options);
  341. std::vector<Slice> keys{"foo", "bar", "baz"};
  342. // Write base values.
  343. for (const auto& key : keys) {
  344. ASSERT_OK(Put(key, key.ToString() + "0"));
  345. }
  346. // Write merge operands. Note that the first key has 1 merge operand, the
  347. // second one has 2 merge operands, and the third one has 3 merge operands.
  348. // Also, we'll take some snapshots to make sure the merge operands are
  349. // preserved during flush.
  350. std::vector<ManagedSnapshot> snapshots;
  351. snapshots.reserve(3);
  352. for (size_t i = 0; i < keys.size(); ++i) {
  353. snapshots.emplace_back(db_);
  354. const std::string suffix = std::to_string(i + 1);
  355. for (size_t j = i; j < keys.size(); ++j) {
  356. ASSERT_OK(Merge(keys[j], keys[j].ToString() + suffix));
  357. }
  358. }
  359. // Verify the results and status codes of various types of point lookups.
  360. auto verify = [&](const std::optional<size_t>& threshold) {
  361. ReadOptions read_options;
  362. read_options.merge_operand_count_threshold = threshold;
  363. // Check Get()
  364. {
  365. for (size_t i = 0; i < keys.size(); ++i) {
  366. PinnableSlice value;
  367. const Status status =
  368. db_->Get(read_options, db_->DefaultColumnFamily(), keys[i], &value);
  369. ASSERT_OK(status);
  370. ASSERT_EQ(status.IsOkMergeOperandThresholdExceeded(),
  371. threshold.has_value() && i + 1 > threshold.value());
  372. ASSERT_EQ(value, keys[i].ToString() + std::to_string(i + 1));
  373. }
  374. }
  375. // Check old-style MultiGet()
  376. {
  377. std::vector<std::string> values;
  378. std::vector<Status> statuses = db_->MultiGet(read_options, keys, &values);
  379. for (size_t i = 0; i < keys.size(); ++i) {
  380. ASSERT_OK(statuses[i]);
  381. ASSERT_EQ(statuses[i].IsOkMergeOperandThresholdExceeded(),
  382. threshold.has_value() && i + 1 > threshold.value());
  383. ASSERT_EQ(values[i], keys[i].ToString() + std::to_string(i + 1));
  384. }
  385. }
  386. // Check batched MultiGet()
  387. {
  388. std::vector<PinnableSlice> values(keys.size());
  389. std::vector<Status> statuses(keys.size());
  390. db_->MultiGet(read_options, db_->DefaultColumnFamily(), keys.size(),
  391. keys.data(), values.data(), statuses.data());
  392. for (size_t i = 0; i < keys.size(); ++i) {
  393. ASSERT_OK(statuses[i]);
  394. ASSERT_EQ(statuses[i].IsOkMergeOperandThresholdExceeded(),
  395. threshold.has_value() && i + 1 > threshold.value());
  396. ASSERT_EQ(values[i], keys[i].ToString() + std::to_string(i + 1));
  397. }
  398. }
  399. };
  400. // Test the case when the feature is disabled as well as various thresholds.
  401. verify(std::nullopt);
  402. for (size_t i = 0; i < 5; ++i) {
  403. verify(i);
  404. }
  405. // Flush and try again to test the case when results are served from SSTs.
  406. ASSERT_OK(Flush());
  407. verify(std::nullopt);
  408. for (size_t i = 0; i < 5; ++i) {
  409. verify(i);
  410. }
  411. }
  412. TEST_F(DBMergeOperatorTest, DataBlockBinaryAndHash) {
  413. // Basic test to check that merge operator works with data block index type
  414. // DataBlockBinaryAndHash.
  415. Options options = CurrentOptions();
  416. options.create_if_missing = true;
  417. options.merge_operator.reset(new TestPutOperator());
  418. options.env = env_;
  419. BlockBasedTableOptions table_options;
  420. table_options.block_restart_interval = 16;
  421. table_options.data_block_index_type =
  422. BlockBasedTableOptions::DataBlockIndexType::kDataBlockBinaryAndHash;
  423. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  424. DestroyAndReopen(options);
  425. const int kNumKeys = 100;
  426. for (int i = 0; i < kNumKeys; ++i) {
  427. ASSERT_OK(db_->Merge(WriteOptions(), Key(i), std::to_string(i)));
  428. }
  429. ASSERT_OK(Flush());
  430. std::string value;
  431. for (int i = 0; i < kNumKeys; ++i) {
  432. ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value));
  433. ASSERT_EQ(std::to_string(i), value);
  434. }
  435. std::vector<const Snapshot*> snapshots;
  436. for (int i = 0; i < kNumKeys; ++i) {
  437. ASSERT_OK(db_->Delete(WriteOptions(), Key(i)));
  438. for (int j = 0; j < 3; ++j) {
  439. ASSERT_OK(db_->Merge(WriteOptions(), Key(i), std::to_string(i * 3 + j)));
  440. snapshots.push_back(db_->GetSnapshot());
  441. }
  442. }
  443. ASSERT_OK(Flush());
  444. for (int i = 0; i < kNumKeys; ++i) {
  445. ASSERT_OK(db_->Get(ReadOptions(), Key(i), &value));
  446. ASSERT_EQ(std::to_string(i * 3 + 2), value);
  447. }
  448. for (auto snapshot : snapshots) {
  449. db_->ReleaseSnapshot(snapshot);
  450. }
  451. }
  452. class MergeOperatorPinningTest : public DBMergeOperatorTest,
  453. public testing::WithParamInterface<bool> {
  454. public:
  455. MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
  456. bool disable_block_cache_;
  457. };
  458. INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
  459. ::testing::Bool());
  460. TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
  461. Options options = CurrentOptions();
  462. BlockBasedTableOptions table_options;
  463. table_options.block_size = 1; // every block will contain one entry
  464. table_options.no_block_cache = disable_block_cache_;
  465. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  466. options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  467. options.level0_slowdown_writes_trigger = (1 << 30);
  468. options.level0_stop_writes_trigger = (1 << 30);
  469. options.disable_auto_compactions = true;
  470. DestroyAndReopen(options);
  471. const int kKeysPerFile = 10;
  472. const int kOperandsPerKeyPerFile = 7;
  473. const int kOperandSize = 100;
  474. // Filse to write in L0 before compacting to lower level
  475. const int kFilesPerLevel = 3;
  476. Random rnd(301);
  477. std::map<std::string, std::string> true_data;
  478. int batch_num = 1;
  479. int lvl_to_fill = 4;
  480. int key_id = 0;
  481. while (true) {
  482. for (int j = 0; j < kKeysPerFile; j++) {
  483. std::string key = Key(key_id % 35);
  484. key_id++;
  485. for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
  486. std::string val = rnd.RandomString(kOperandSize);
  487. ASSERT_OK(db_->Merge(WriteOptions(), key, val));
  488. if (true_data[key].size() == 0) {
  489. true_data[key] = val;
  490. } else {
  491. true_data[key] += "," + val;
  492. }
  493. }
  494. }
  495. if (lvl_to_fill == -1) {
  496. // Keep last batch in memtable and stop
  497. break;
  498. }
  499. ASSERT_OK(Flush());
  500. if (batch_num % kFilesPerLevel == 0) {
  501. if (lvl_to_fill != 0) {
  502. MoveFilesToLevel(lvl_to_fill);
  503. }
  504. lvl_to_fill--;
  505. }
  506. batch_num++;
  507. }
  508. // 3 L0 files
  509. // 1 L1 file
  510. // 3 L2 files
  511. // 1 L3 file
  512. // 3 L4 Files
  513. ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
  514. VerifyDBFromMap(true_data);
  515. }
  516. class MergeOperatorHook : public MergeOperator {
  517. public:
  518. explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
  519. : merge_op_(_merge_op) {}
  520. bool FullMergeV2(const MergeOperationInput& merge_in,
  521. MergeOperationOutput* merge_out) const override {
  522. before_merge_();
  523. bool res = merge_op_->FullMergeV2(merge_in, merge_out);
  524. after_merge_();
  525. return res;
  526. }
  527. const char* Name() const override { return merge_op_->Name(); }
  528. std::shared_ptr<MergeOperator> merge_op_;
  529. std::function<void()> before_merge_ = []() {};
  530. std::function<void()> after_merge_ = []() {};
  531. };
  532. TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
  533. Options options = CurrentOptions();
  534. auto merge_hook =
  535. std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
  536. options.merge_operator = merge_hook;
  537. options.disable_auto_compactions = true;
  538. options.level0_slowdown_writes_trigger = (1 << 30);
  539. options.level0_stop_writes_trigger = (1 << 30);
  540. options.max_open_files = 20;
  541. BlockBasedTableOptions bbto;
  542. bbto.no_block_cache = disable_block_cache_;
  543. if (bbto.no_block_cache == false) {
  544. bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
  545. } else {
  546. bbto.block_cache = nullptr;
  547. }
  548. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  549. DestroyAndReopen(options);
  550. const int kNumOperands = 30;
  551. const int kNumKeys = 1000;
  552. const int kOperandSize = 100;
  553. Random rnd(301);
  554. // 1000 keys every key have 30 operands, every operand is in a different file
  555. std::map<std::string, std::string> true_data;
  556. for (int i = 0; i < kNumOperands; i++) {
  557. for (int j = 0; j < kNumKeys; j++) {
  558. std::string k = Key(j);
  559. std::string v = rnd.RandomString(kOperandSize);
  560. ASSERT_OK(db_->Merge(WriteOptions(), k, v));
  561. true_data[k] = std::max(true_data[k], v);
  562. }
  563. ASSERT_OK(Flush());
  564. }
  565. std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
  566. ASSERT_EQ(file_numbers.size(), kNumOperands);
  567. int merge_cnt = 0;
  568. // Code executed before merge operation
  569. merge_hook->before_merge_ = [&]() {
  570. // Evict all tables from cache before every merge operation
  571. auto* table_cache = dbfull()->TEST_table_cache();
  572. for (uint64_t num : file_numbers) {
  573. TableCache::Evict(table_cache, num);
  574. }
  575. // Decrease cache capacity to force all unrefed blocks to be evicted
  576. if (bbto.block_cache) {
  577. bbto.block_cache->SetCapacity(1);
  578. }
  579. merge_cnt++;
  580. };
  581. // Code executed after merge operation
  582. merge_hook->after_merge_ = [&]() {
  583. // Increase capacity again after doing the merge
  584. if (bbto.block_cache) {
  585. bbto.block_cache->SetCapacity(64 * 1024 * 1024);
  586. }
  587. };
  588. size_t total_reads;
  589. VerifyDBFromMap(true_data, &total_reads);
  590. ASSERT_EQ(merge_cnt, total_reads);
  591. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  592. VerifyDBFromMap(true_data, &total_reads);
  593. }
  594. TEST_P(MergeOperatorPinningTest, TailingIterator) {
  595. Options options = CurrentOptions();
  596. options.merge_operator = MergeOperators::CreateMaxOperator();
  597. BlockBasedTableOptions bbto;
  598. bbto.no_block_cache = disable_block_cache_;
  599. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  600. DestroyAndReopen(options);
  601. const int kNumOperands = 100;
  602. const int kNumWrites = 100000;
  603. std::function<void()> writer_func = [&]() {
  604. int k = 0;
  605. for (int i = 0; i < kNumWrites; i++) {
  606. ASSERT_OK(db_->Merge(WriteOptions(), Key(k), Key(k)));
  607. if (i && i % kNumOperands == 0) {
  608. k++;
  609. }
  610. if (i && i % 127 == 0) {
  611. ASSERT_OK(Flush());
  612. }
  613. if (i && i % 317 == 0) {
  614. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  615. }
  616. }
  617. };
  618. std::function<void()> reader_func = [&]() {
  619. ReadOptions ro;
  620. ro.tailing = true;
  621. Iterator* iter = db_->NewIterator(ro);
  622. ASSERT_OK(iter->status());
  623. iter->SeekToFirst();
  624. for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
  625. while (!iter->Valid()) {
  626. // wait for the key to be written
  627. env_->SleepForMicroseconds(100);
  628. iter->Seek(Key(i));
  629. }
  630. ASSERT_EQ(iter->key(), Key(i));
  631. ASSERT_EQ(iter->value(), Key(i));
  632. iter->Next();
  633. }
  634. ASSERT_OK(iter->status());
  635. delete iter;
  636. };
  637. ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
  638. ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);
  639. writer_thread.join();
  640. reader_thread.join();
  641. }
  642. TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
  643. Options options = CurrentOptions();
  644. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  645. DestroyAndReopen(options);
  646. // Overview of the test:
  647. // * There are two merge operands for the same key: one in an sst file,
  648. // another in a memtable.
  649. // * Seek a tailing iterator to this key.
  650. // * As part of the seek, the iterator will:
  651. // (a) first visit the operand in the memtable and tell ForwardIterator
  652. // to pin this operand, then
  653. // (b) move on to the operand in the sst file, then pass both operands
  654. // to merge operator.
  655. // * The memtable may get flushed and unreferenced by another thread between
  656. // (a) and (b). The test simulates it by flushing the memtable inside a
  657. // SyncPoint callback located between (a) and (b).
  658. // * In this case it's ForwardIterator's responsibility to keep the memtable
  659. // pinned until (b) is complete. There used to be a bug causing
  660. // ForwardIterator to not pin it in some circumstances. This test
  661. // reproduces it.
  662. ASSERT_OK(db_->Merge(WriteOptions(), "key", "sst"));
  663. ASSERT_OK(db_->Flush(FlushOptions())); // Switch to SuperVersion A
  664. ASSERT_OK(db_->Merge(WriteOptions(), "key", "memtable"));
  665. // Pin SuperVersion A
  666. std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
  667. ASSERT_OK(someone_else->status());
  668. bool pushed_first_operand = false;
  669. bool stepped_to_next_operand = false;
  670. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  671. "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
  672. EXPECT_FALSE(pushed_first_operand);
  673. pushed_first_operand = true;
  674. EXPECT_OK(db_->Flush(FlushOptions())); // Switch to SuperVersion B
  675. });
  676. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  677. "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
  678. EXPECT_FALSE(stepped_to_next_operand);
  679. stepped_to_next_operand = true;
  680. someone_else.reset(); // Unpin SuperVersion A
  681. });
  682. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  683. ReadOptions ro;
  684. ro.tailing = true;
  685. std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
  686. iter->Seek("key");
  687. ASSERT_OK(iter->status());
  688. ASSERT_TRUE(iter->Valid());
  689. EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
  690. EXPECT_TRUE(pushed_first_operand);
  691. EXPECT_TRUE(stepped_to_next_operand);
  692. }
  693. TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
  694. Options options = CurrentOptions();
  695. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  696. DestroyAndReopen(options);
  697. class TestSnapshotChecker : public SnapshotChecker {
  698. public:
  699. SnapshotCheckerResult CheckInSnapshot(
  700. SequenceNumber seq, SequenceNumber snapshot_seq) const override {
  701. return IsInSnapshot(seq, snapshot_seq)
  702. ? SnapshotCheckerResult::kInSnapshot
  703. : SnapshotCheckerResult::kNotInSnapshot;
  704. }
  705. bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
  706. switch (snapshot_seq) {
  707. case 0:
  708. return seq == 0;
  709. case 1:
  710. return seq <= 1;
  711. case 2:
  712. // seq = 2 not visible to snapshot with seq = 2
  713. return seq <= 1;
  714. case 3:
  715. return seq <= 3;
  716. case 4:
  717. // seq = 4 not visible to snpahost with seq = 4
  718. return seq <= 3;
  719. default:
  720. // seq >=4 is uncommitted
  721. return seq <= 4;
  722. };
  723. }
  724. };
  725. TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
  726. dbfull()->SetSnapshotChecker(snapshot_checker);
  727. std::string value;
  728. ASSERT_OK(Merge("foo", "v1"));
  729. ASSERT_EQ(1, db_->GetLatestSequenceNumber());
  730. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
  731. ASSERT_OK(Merge("foo", "v2"));
  732. ASSERT_EQ(2, db_->GetLatestSequenceNumber());
  733. // v2 is not visible to latest snapshot, which has seq = 2.
  734. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
  735. // Take a snapshot with seq = 2.
  736. const Snapshot* snapshot1 = db_->GetSnapshot();
  737. ASSERT_EQ(2, snapshot1->GetSequenceNumber());
  738. // v2 is not visible to snapshot1, which has seq = 2
  739. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  740. // Verify flush doesn't alter the result.
  741. ASSERT_OK(Flush());
  742. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  743. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
  744. ASSERT_OK(Merge("foo", "v3"));
  745. ASSERT_EQ(3, db_->GetLatestSequenceNumber());
  746. ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
  747. ASSERT_OK(Merge("foo", "v4"));
  748. ASSERT_EQ(4, db_->GetLatestSequenceNumber());
  749. // v4 is not visible to latest snapshot, which has seq = 4.
  750. ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
  751. const Snapshot* snapshot2 = db_->GetSnapshot();
  752. ASSERT_EQ(4, snapshot2->GetSequenceNumber());
  753. // v4 is not visible to snapshot2, which has seq = 4.
  754. ASSERT_EQ("v1,v2,v3",
  755. GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  756. // Verify flush doesn't alter the result.
  757. ASSERT_OK(Flush());
  758. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  759. ASSERT_EQ("v1,v2,v3",
  760. GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  761. ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
  762. ASSERT_OK(Merge("foo", "v5"));
  763. ASSERT_EQ(5, db_->GetLatestSequenceNumber());
  764. // v5 is uncommitted
  765. ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
  766. // full manual compaction.
  767. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  768. // Verify compaction doesn't alter the result.
  769. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  770. ASSERT_EQ("v1,v2,v3",
  771. GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  772. ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
  773. db_->ReleaseSnapshot(snapshot1);
  774. db_->ReleaseSnapshot(snapshot2);
  775. }
  776. class PerConfigMergeOperatorPinningTest
  777. : public DBMergeOperatorTest,
  778. public testing::WithParamInterface<std::tuple<bool, int>> {
  779. public:
  780. PerConfigMergeOperatorPinningTest() {
  781. std::tie(disable_block_cache_, option_config_) = GetParam();
  782. }
  783. bool disable_block_cache_;
  784. };
  785. INSTANTIATE_TEST_CASE_P(
  786. MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
  787. ::testing::Combine(::testing::Bool(),
  788. ::testing::Range(static_cast<int>(DBTestBase::kDefault),
  789. static_cast<int>(DBTestBase::kEnd))));
  790. TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
  791. if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
  792. return;
  793. }
  794. Options options = CurrentOptions();
  795. options.merge_operator = MergeOperators::CreateMaxOperator();
  796. BlockBasedTableOptions table_options;
  797. table_options.no_block_cache = disable_block_cache_;
  798. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  799. DestroyAndReopen(options);
  800. Random rnd(301);
  801. std::map<std::string, std::string> true_data;
  802. const int kTotalMerges = 5000;
  803. // Every key gets ~10 operands
  804. const int kKeyRange = kTotalMerges / 10;
  805. const int kOperandSize = 20;
  806. const int kNumPutBefore = kKeyRange / 10; // 10% value
  807. const int kNumPutAfter = kKeyRange / 10; // 10% overwrite
  808. const int kNumDelete = kKeyRange / 10; // 10% delete
  809. // kNumPutBefore keys will have base values
  810. for (int i = 0; i < kNumPutBefore; i++) {
  811. std::string key = Key(rnd.Next() % kKeyRange);
  812. std::string value = rnd.RandomString(kOperandSize);
  813. ASSERT_OK(db_->Put(WriteOptions(), key, value));
  814. true_data[key] = value;
  815. }
  816. // Do kTotalMerges merges
  817. for (int i = 0; i < kTotalMerges; i++) {
  818. std::string key = Key(rnd.Next() % kKeyRange);
  819. std::string value = rnd.RandomString(kOperandSize);
  820. ASSERT_OK(db_->Merge(WriteOptions(), key, value));
  821. if (true_data[key] < value) {
  822. true_data[key] = value;
  823. }
  824. }
  825. // Overwrite random kNumPutAfter keys
  826. for (int i = 0; i < kNumPutAfter; i++) {
  827. std::string key = Key(rnd.Next() % kKeyRange);
  828. std::string value = rnd.RandomString(kOperandSize);
  829. ASSERT_OK(db_->Put(WriteOptions(), key, value));
  830. true_data[key] = value;
  831. }
  832. // Delete random kNumDelete keys
  833. for (int i = 0; i < kNumDelete; i++) {
  834. std::string key = Key(rnd.Next() % kKeyRange);
  835. ASSERT_OK(db_->Delete(WriteOptions(), key));
  836. true_data.erase(key);
  837. }
  838. VerifyDBFromMap(true_data);
  839. }
  840. TEST_F(DBMergeOperatorTest, MaxSuccessiveMergesBaseValues) {
  841. Options options = CurrentOptions();
  842. options.create_if_missing = true;
  843. options.merge_operator = MergeOperators::CreatePutOperator();
  844. options.max_successive_merges = 1;
  845. options.env = env_;
  846. Reopen(options);
  847. constexpr char foo[] = "foo";
  848. constexpr char bar[] = "bar";
  849. constexpr char baz[] = "baz";
  850. constexpr char qux[] = "qux";
  851. constexpr char corge[] = "corge";
  852. // No base value
  853. {
  854. const std::string key = "key1";
  855. ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, foo));
  856. ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, bar));
  857. PinnableSlice result;
  858. ASSERT_OK(
  859. db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result));
  860. ASSERT_EQ(result, bar);
  861. // We expect the second Merge to be converted to a Put because of
  862. // max_successive_merges.
  863. constexpr size_t max_key_versions = 8;
  864. std::vector<KeyVersion> key_versions;
  865. ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key,
  866. max_key_versions, &key_versions));
  867. ASSERT_EQ(key_versions.size(), 2);
  868. ASSERT_EQ(key_versions[0].type, kTypeValue);
  869. ASSERT_EQ(key_versions[1].type, kTypeMerge);
  870. }
  871. // Plain base value
  872. {
  873. const std::string key = "key2";
  874. ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), key, foo));
  875. ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, bar));
  876. ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, baz));
  877. PinnableSlice result;
  878. ASSERT_OK(
  879. db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result));
  880. ASSERT_EQ(result, baz);
  881. // We expect the second Merge to be converted to a Put because of
  882. // max_successive_merges.
  883. constexpr size_t max_key_versions = 8;
  884. std::vector<KeyVersion> key_versions;
  885. ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key,
  886. max_key_versions, &key_versions));
  887. ASSERT_EQ(key_versions.size(), 3);
  888. ASSERT_EQ(key_versions[0].type, kTypeValue);
  889. ASSERT_EQ(key_versions[1].type, kTypeMerge);
  890. ASSERT_EQ(key_versions[2].type, kTypeValue);
  891. }
  892. // Wide-column base value
  893. {
  894. const std::string key = "key3";
  895. const WideColumns columns{{kDefaultWideColumnName, foo}, {bar, baz}};
  896. ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), key,
  897. columns));
  898. ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, qux));
  899. ASSERT_OK(
  900. db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, corge));
  901. PinnableWideColumns result;
  902. ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key,
  903. &result));
  904. const WideColumns expected{{kDefaultWideColumnName, corge}, {bar, baz}};
  905. ASSERT_EQ(result.columns(), expected);
  906. // We expect the second Merge to be converted to a PutEntity because of
  907. // max_successive_merges.
  908. constexpr size_t max_key_versions = 8;
  909. std::vector<KeyVersion> key_versions;
  910. ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key,
  911. max_key_versions, &key_versions));
  912. ASSERT_EQ(key_versions.size(), 3);
  913. ASSERT_EQ(key_versions[0].type, kTypeWideColumnEntity);
  914. ASSERT_EQ(key_versions[1].type, kTypeMerge);
  915. ASSERT_EQ(key_versions[2].type, kTypeWideColumnEntity);
  916. }
  917. }
  918. } // namespace ROCKSDB_NAMESPACE
  919. int main(int argc, char** argv) {
  920. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  921. ::testing::InitGoogleTest(&argc, argv);
  922. return RUN_ALL_TESTS();
  923. }