db_merge_operator_test.cc 21 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <string>
  6. #include <vector>
  7. #include "db/db_test_util.h"
  8. #include "db/forward_iterator.h"
  9. #include "port/stack_trace.h"
  10. #include "rocksdb/merge_operator.h"
  11. #include "utilities/merge_operators.h"
  12. #include "utilities/merge_operators/string_append/stringappend2.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. class TestReadCallback : public ReadCallback {
  15. public:
  16. TestReadCallback(SnapshotChecker* snapshot_checker,
  17. SequenceNumber snapshot_seq)
  18. : ReadCallback(snapshot_seq),
  19. snapshot_checker_(snapshot_checker),
  20. snapshot_seq_(snapshot_seq) {}
  21. bool IsVisibleFullCheck(SequenceNumber seq) override {
  22. return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
  23. SnapshotCheckerResult::kInSnapshot;
  24. }
  25. private:
  26. SnapshotChecker* snapshot_checker_;
  27. SequenceNumber snapshot_seq_;
  28. };
  29. // Test merge operator functionality.
  30. class DBMergeOperatorTest : public DBTestBase {
  31. public:
  32. DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {}
  33. std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
  34. const Slice& key,
  35. const Snapshot* snapshot = nullptr) {
  36. SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
  37. : snapshot->GetSequenceNumber();
  38. TestReadCallback read_callback(snapshot_checker, seq);
  39. ReadOptions read_opt;
  40. read_opt.snapshot = snapshot;
  41. PinnableSlice value;
  42. DBImpl::GetImplOptions get_impl_options;
  43. get_impl_options.column_family = db_->DefaultColumnFamily();
  44. get_impl_options.value = &value;
  45. get_impl_options.callback = &read_callback;
  46. Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
  47. if (!s.ok()) {
  48. return s.ToString();
  49. }
  50. return value.ToString();
  51. }
  52. };
  53. TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
  54. class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
  55. public:
  56. LimitedStringAppendMergeOp(int limit, char delim)
  57. : StringAppendTESTOperator(delim), limit_(limit) {}
  58. const char* Name() const override {
  59. return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
  60. }
  61. bool ShouldMerge(const std::vector<Slice>& operands) const override {
  62. if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
  63. return true;
  64. }
  65. return false;
  66. }
  67. private:
  68. size_t limit_ = 0;
  69. };
  70. Options options;
  71. options.create_if_missing = true;
  72. // Use only the latest two merge operands.
  73. options.merge_operator =
  74. std::make_shared<LimitedStringAppendMergeOp>(2, ',');
  75. options.env = env_;
  76. Reopen(options);
  77. // All K1 values are in memtable.
  78. ASSERT_OK(Merge("k1", "a"));
  79. ASSERT_OK(Merge("k1", "b"));
  80. ASSERT_OK(Merge("k1", "c"));
  81. ASSERT_OK(Merge("k1", "d"));
  82. std::string value;
  83. ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).ok());
  84. // Make sure that only the latest two merge operands are used. If this was
  85. // not the case the value would be "a,b,c,d".
  86. ASSERT_EQ(value, "c,d");
  87. // All K2 values are flushed to L0 into a single file.
  88. ASSERT_OK(Merge("k2", "a"));
  89. ASSERT_OK(Merge("k2", "b"));
  90. ASSERT_OK(Merge("k2", "c"));
  91. ASSERT_OK(Merge("k2", "d"));
  92. ASSERT_OK(Flush());
  93. ASSERT_TRUE(db_->Get(ReadOptions(), "k2", &value).ok());
  94. ASSERT_EQ(value, "c,d");
  95. // All K3 values are flushed and are in different files.
  96. ASSERT_OK(Merge("k3", "ab"));
  97. ASSERT_OK(Flush());
  98. ASSERT_OK(Merge("k3", "bc"));
  99. ASSERT_OK(Flush());
  100. ASSERT_OK(Merge("k3", "cd"));
  101. ASSERT_OK(Flush());
  102. ASSERT_OK(Merge("k3", "de"));
  103. ASSERT_TRUE(db_->Get(ReadOptions(), "k3", &value).ok());
  104. ASSERT_EQ(value, "cd,de");
  105. // All K4 values are in different levels
  106. ASSERT_OK(Merge("k4", "ab"));
  107. ASSERT_OK(Flush());
  108. MoveFilesToLevel(4);
  109. ASSERT_OK(Merge("k4", "bc"));
  110. ASSERT_OK(Flush());
  111. MoveFilesToLevel(3);
  112. ASSERT_OK(Merge("k4", "cd"));
  113. ASSERT_OK(Flush());
  114. MoveFilesToLevel(1);
  115. ASSERT_OK(Merge("k4", "de"));
  116. ASSERT_TRUE(db_->Get(ReadOptions(), "k4", &value).ok());
  117. ASSERT_EQ(value, "cd,de");
  118. }
  119. TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
  120. Options options;
  121. options.create_if_missing = true;
  122. options.merge_operator.reset(new TestPutOperator());
  123. options.env = env_;
  124. Reopen(options);
  125. ASSERT_OK(Merge("k1", "v1"));
  126. ASSERT_OK(Merge("k1", "corrupted"));
  127. std::string value;
  128. ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
  129. VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
  130. }
  131. TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
  132. Options options;
  133. options.create_if_missing = true;
  134. options.merge_operator.reset(new TestPutOperator());
  135. options.max_successive_merges = 3;
  136. options.env = env_;
  137. Reopen(options);
  138. ASSERT_OK(Merge("k1", "v1"));
  139. ASSERT_OK(Merge("k1", "v2"));
  140. // Will trigger a merge when hitting max_successive_merges and the merge
  141. // will fail. The delta will be inserted nevertheless.
  142. ASSERT_OK(Merge("k1", "corrupted"));
  143. // Data should stay unmerged after the error.
  144. VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
  145. }
  146. TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
  147. Options options;
  148. options.create_if_missing = true;
  149. options.merge_operator.reset(new TestPutOperator());
  150. options.env = env_;
  151. DestroyAndReopen(options);
  152. ASSERT_OK(Merge("k1", "v1"));
  153. ASSERT_OK(Merge("k1", "corrupted"));
  154. ASSERT_OK(Put("k2", "v2"));
  155. auto* iter = db_->NewIterator(ReadOptions());
  156. iter->Seek("k1");
  157. ASSERT_FALSE(iter->Valid());
  158. ASSERT_TRUE(iter->status().IsCorruption());
  159. delete iter;
  160. iter = db_->NewIterator(ReadOptions());
  161. iter->Seek("k2");
  162. ASSERT_TRUE(iter->Valid());
  163. ASSERT_OK(iter->status());
  164. iter->Prev();
  165. ASSERT_FALSE(iter->Valid());
  166. ASSERT_TRUE(iter->status().IsCorruption());
  167. delete iter;
  168. VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
  169. DestroyAndReopen(options);
  170. ASSERT_OK(Merge("k1", "v1"));
  171. ASSERT_OK(Put("k2", "v2"));
  172. ASSERT_OK(Merge("k2", "corrupted"));
  173. iter = db_->NewIterator(ReadOptions());
  174. iter->Seek("k1");
  175. ASSERT_TRUE(iter->Valid());
  176. ASSERT_OK(iter->status());
  177. iter->Next();
  178. ASSERT_FALSE(iter->Valid());
  179. ASSERT_TRUE(iter->status().IsCorruption());
  180. delete iter;
  181. VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
  182. }
  183. class MergeOperatorPinningTest : public DBMergeOperatorTest,
  184. public testing::WithParamInterface<bool> {
  185. public:
  186. MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
  187. bool disable_block_cache_;
  188. };
  189. INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
  190. ::testing::Bool());
  191. #ifndef ROCKSDB_LITE
  192. TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
  193. Options options = CurrentOptions();
  194. BlockBasedTableOptions table_options;
  195. table_options.block_size = 1; // every block will contain one entry
  196. table_options.no_block_cache = disable_block_cache_;
  197. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  198. options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  199. options.level0_slowdown_writes_trigger = (1 << 30);
  200. options.level0_stop_writes_trigger = (1 << 30);
  201. options.disable_auto_compactions = true;
  202. DestroyAndReopen(options);
  203. const int kKeysPerFile = 10;
  204. const int kOperandsPerKeyPerFile = 7;
  205. const int kOperandSize = 100;
  206. // Filse to write in L0 before compacting to lower level
  207. const int kFilesPerLevel = 3;
  208. Random rnd(301);
  209. std::map<std::string, std::string> true_data;
  210. int batch_num = 1;
  211. int lvl_to_fill = 4;
  212. int key_id = 0;
  213. while (true) {
  214. for (int j = 0; j < kKeysPerFile; j++) {
  215. std::string key = Key(key_id % 35);
  216. key_id++;
  217. for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
  218. std::string val = RandomString(&rnd, kOperandSize);
  219. ASSERT_OK(db_->Merge(WriteOptions(), key, val));
  220. if (true_data[key].size() == 0) {
  221. true_data[key] = val;
  222. } else {
  223. true_data[key] += "," + val;
  224. }
  225. }
  226. }
  227. if (lvl_to_fill == -1) {
  228. // Keep last batch in memtable and stop
  229. break;
  230. }
  231. ASSERT_OK(Flush());
  232. if (batch_num % kFilesPerLevel == 0) {
  233. if (lvl_to_fill != 0) {
  234. MoveFilesToLevel(lvl_to_fill);
  235. }
  236. lvl_to_fill--;
  237. }
  238. batch_num++;
  239. }
  240. // 3 L0 files
  241. // 1 L1 file
  242. // 3 L2 files
  243. // 1 L3 file
  244. // 3 L4 Files
  245. ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
  246. VerifyDBFromMap(true_data);
  247. }
  248. class MergeOperatorHook : public MergeOperator {
  249. public:
  250. explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
  251. : merge_op_(_merge_op) {}
  252. bool FullMergeV2(const MergeOperationInput& merge_in,
  253. MergeOperationOutput* merge_out) const override {
  254. before_merge_();
  255. bool res = merge_op_->FullMergeV2(merge_in, merge_out);
  256. after_merge_();
  257. return res;
  258. }
  259. const char* Name() const override { return merge_op_->Name(); }
  260. std::shared_ptr<MergeOperator> merge_op_;
  261. std::function<void()> before_merge_ = []() {};
  262. std::function<void()> after_merge_ = []() {};
  263. };
  264. TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
  265. Options options = CurrentOptions();
  266. auto merge_hook =
  267. std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
  268. options.merge_operator = merge_hook;
  269. options.disable_auto_compactions = true;
  270. options.level0_slowdown_writes_trigger = (1 << 30);
  271. options.level0_stop_writes_trigger = (1 << 30);
  272. options.max_open_files = 20;
  273. BlockBasedTableOptions bbto;
  274. bbto.no_block_cache = disable_block_cache_;
  275. if (bbto.no_block_cache == false) {
  276. bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
  277. } else {
  278. bbto.block_cache = nullptr;
  279. }
  280. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  281. DestroyAndReopen(options);
  282. const int kNumOperands = 30;
  283. const int kNumKeys = 1000;
  284. const int kOperandSize = 100;
  285. Random rnd(301);
  286. // 1000 keys every key have 30 operands, every operand is in a different file
  287. std::map<std::string, std::string> true_data;
  288. for (int i = 0; i < kNumOperands; i++) {
  289. for (int j = 0; j < kNumKeys; j++) {
  290. std::string k = Key(j);
  291. std::string v = RandomString(&rnd, kOperandSize);
  292. ASSERT_OK(db_->Merge(WriteOptions(), k, v));
  293. true_data[k] = std::max(true_data[k], v);
  294. }
  295. ASSERT_OK(Flush());
  296. }
  297. std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
  298. ASSERT_EQ(file_numbers.size(), kNumOperands);
  299. int merge_cnt = 0;
  300. // Code executed before merge operation
  301. merge_hook->before_merge_ = [&]() {
  302. // Evict all tables from cache before every merge operation
  303. for (uint64_t num : file_numbers) {
  304. TableCache::Evict(dbfull()->TEST_table_cache(), num);
  305. }
  306. // Decrease cache capacity to force all unrefed blocks to be evicted
  307. if (bbto.block_cache) {
  308. bbto.block_cache->SetCapacity(1);
  309. }
  310. merge_cnt++;
  311. };
  312. // Code executed after merge operation
  313. merge_hook->after_merge_ = [&]() {
  314. // Increase capacity again after doing the merge
  315. if (bbto.block_cache) {
  316. bbto.block_cache->SetCapacity(64 * 1024 * 1024);
  317. }
  318. };
  319. size_t total_reads;
  320. VerifyDBFromMap(true_data, &total_reads);
  321. ASSERT_EQ(merge_cnt, total_reads);
  322. db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  323. VerifyDBFromMap(true_data, &total_reads);
  324. }
  325. TEST_P(MergeOperatorPinningTest, TailingIterator) {
  326. Options options = CurrentOptions();
  327. options.merge_operator = MergeOperators::CreateMaxOperator();
  328. BlockBasedTableOptions bbto;
  329. bbto.no_block_cache = disable_block_cache_;
  330. options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  331. DestroyAndReopen(options);
  332. const int kNumOperands = 100;
  333. const int kNumWrites = 100000;
  334. std::function<void()> writer_func = [&]() {
  335. int k = 0;
  336. for (int i = 0; i < kNumWrites; i++) {
  337. db_->Merge(WriteOptions(), Key(k), Key(k));
  338. if (i && i % kNumOperands == 0) {
  339. k++;
  340. }
  341. if (i && i % 127 == 0) {
  342. ASSERT_OK(Flush());
  343. }
  344. if (i && i % 317 == 0) {
  345. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  346. }
  347. }
  348. };
  349. std::function<void()> reader_func = [&]() {
  350. ReadOptions ro;
  351. ro.tailing = true;
  352. Iterator* iter = db_->NewIterator(ro);
  353. iter->SeekToFirst();
  354. for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
  355. while (!iter->Valid()) {
  356. // wait for the key to be written
  357. env_->SleepForMicroseconds(100);
  358. iter->Seek(Key(i));
  359. }
  360. ASSERT_EQ(iter->key(), Key(i));
  361. ASSERT_EQ(iter->value(), Key(i));
  362. iter->Next();
  363. }
  364. delete iter;
  365. };
  366. ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
  367. ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);
  368. writer_thread.join();
  369. reader_thread.join();
  370. }
  371. TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
  372. Options options = CurrentOptions();
  373. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  374. DestroyAndReopen(options);
  375. // Overview of the test:
  376. // * There are two merge operands for the same key: one in an sst file,
  377. // another in a memtable.
  378. // * Seek a tailing iterator to this key.
  379. // * As part of the seek, the iterator will:
  380. // (a) first visit the operand in the memtable and tell ForwardIterator
  381. // to pin this operand, then
  382. // (b) move on to the operand in the sst file, then pass both operands
  383. // to merge operator.
  384. // * The memtable may get flushed and unreferenced by another thread between
  385. // (a) and (b). The test simulates it by flushing the memtable inside a
  386. // SyncPoint callback located between (a) and (b).
  387. // * In this case it's ForwardIterator's responsibility to keep the memtable
  388. // pinned until (b) is complete. There used to be a bug causing
  389. // ForwardIterator to not pin it in some circumstances. This test
  390. // reproduces it.
  391. db_->Merge(WriteOptions(), "key", "sst");
  392. db_->Flush(FlushOptions()); // Switch to SuperVersion A
  393. db_->Merge(WriteOptions(), "key", "memtable");
  394. // Pin SuperVersion A
  395. std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
  396. bool pushed_first_operand = false;
  397. bool stepped_to_next_operand = false;
  398. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  399. "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
  400. EXPECT_FALSE(pushed_first_operand);
  401. pushed_first_operand = true;
  402. db_->Flush(FlushOptions()); // Switch to SuperVersion B
  403. });
  404. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
  405. "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
  406. EXPECT_FALSE(stepped_to_next_operand);
  407. stepped_to_next_operand = true;
  408. someone_else.reset(); // Unpin SuperVersion A
  409. });
  410. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  411. ReadOptions ro;
  412. ro.tailing = true;
  413. std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
  414. iter->Seek("key");
  415. ASSERT_TRUE(iter->status().ok());
  416. ASSERT_TRUE(iter->Valid());
  417. EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
  418. EXPECT_TRUE(pushed_first_operand);
  419. EXPECT_TRUE(stepped_to_next_operand);
  420. }
  421. #endif // ROCKSDB_LITE
  422. TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
  423. Options options = CurrentOptions();
  424. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  425. DestroyAndReopen(options);
  426. class TestSnapshotChecker : public SnapshotChecker {
  427. public:
  428. SnapshotCheckerResult CheckInSnapshot(
  429. SequenceNumber seq, SequenceNumber snapshot_seq) const override {
  430. return IsInSnapshot(seq, snapshot_seq)
  431. ? SnapshotCheckerResult::kInSnapshot
  432. : SnapshotCheckerResult::kNotInSnapshot;
  433. }
  434. bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
  435. switch (snapshot_seq) {
  436. case 0:
  437. return seq == 0;
  438. case 1:
  439. return seq <= 1;
  440. case 2:
  441. // seq = 2 not visible to snapshot with seq = 2
  442. return seq <= 1;
  443. case 3:
  444. return seq <= 3;
  445. case 4:
  446. // seq = 4 not visible to snpahost with seq = 4
  447. return seq <= 3;
  448. default:
  449. // seq >=4 is uncommitted
  450. return seq <= 4;
  451. };
  452. }
  453. };
  454. TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
  455. dbfull()->SetSnapshotChecker(snapshot_checker);
  456. std::string value;
  457. ASSERT_OK(Merge("foo", "v1"));
  458. ASSERT_EQ(1, db_->GetLatestSequenceNumber());
  459. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
  460. ASSERT_OK(Merge("foo", "v2"));
  461. ASSERT_EQ(2, db_->GetLatestSequenceNumber());
  462. // v2 is not visible to latest snapshot, which has seq = 2.
  463. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
  464. // Take a snapshot with seq = 2.
  465. const Snapshot* snapshot1 = db_->GetSnapshot();
  466. ASSERT_EQ(2, snapshot1->GetSequenceNumber());
  467. // v2 is not visible to snapshot1, which has seq = 2
  468. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  469. // Verify flush doesn't alter the result.
  470. ASSERT_OK(Flush());
  471. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  472. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
  473. ASSERT_OK(Merge("foo", "v3"));
  474. ASSERT_EQ(3, db_->GetLatestSequenceNumber());
  475. ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
  476. ASSERT_OK(Merge("foo", "v4"));
  477. ASSERT_EQ(4, db_->GetLatestSequenceNumber());
  478. // v4 is not visible to latest snapshot, which has seq = 4.
  479. ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
  480. const Snapshot* snapshot2 = db_->GetSnapshot();
  481. ASSERT_EQ(4, snapshot2->GetSequenceNumber());
  482. // v4 is not visible to snapshot2, which has seq = 4.
  483. ASSERT_EQ("v1,v2,v3",
  484. GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  485. // Verify flush doesn't alter the result.
  486. ASSERT_OK(Flush());
  487. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  488. ASSERT_EQ("v1,v2,v3",
  489. GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  490. ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
  491. ASSERT_OK(Merge("foo", "v5"));
  492. ASSERT_EQ(5, db_->GetLatestSequenceNumber());
  493. // v5 is uncommitted
  494. ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
  495. // full manual compaction.
  496. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  497. // Verify compaction doesn't alter the result.
  498. ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  499. ASSERT_EQ("v1,v2,v3",
  500. GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  501. ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
  502. db_->ReleaseSnapshot(snapshot1);
  503. db_->ReleaseSnapshot(snapshot2);
  504. }
  505. class PerConfigMergeOperatorPinningTest
  506. : public DBMergeOperatorTest,
  507. public testing::WithParamInterface<std::tuple<bool, int>> {
  508. public:
  509. PerConfigMergeOperatorPinningTest() {
  510. std::tie(disable_block_cache_, option_config_) = GetParam();
  511. }
  512. bool disable_block_cache_;
  513. };
  514. INSTANTIATE_TEST_CASE_P(
  515. MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
  516. ::testing::Combine(::testing::Bool(),
  517. ::testing::Range(static_cast<int>(DBTestBase::kDefault),
  518. static_cast<int>(DBTestBase::kEnd))));
  519. TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
  520. if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
  521. return;
  522. }
  523. Options options = CurrentOptions();
  524. options.merge_operator = MergeOperators::CreateMaxOperator();
  525. BlockBasedTableOptions table_options;
  526. table_options.no_block_cache = disable_block_cache_;
  527. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  528. DestroyAndReopen(options);
  529. Random rnd(301);
  530. std::map<std::string, std::string> true_data;
  531. const int kTotalMerges = 5000;
  532. // Every key gets ~10 operands
  533. const int kKeyRange = kTotalMerges / 10;
  534. const int kOperandSize = 20;
  535. const int kNumPutBefore = kKeyRange / 10; // 10% value
  536. const int kNumPutAfter = kKeyRange / 10; // 10% overwrite
  537. const int kNumDelete = kKeyRange / 10; // 10% delete
  538. // kNumPutBefore keys will have base values
  539. for (int i = 0; i < kNumPutBefore; i++) {
  540. std::string key = Key(rnd.Next() % kKeyRange);
  541. std::string value = RandomString(&rnd, kOperandSize);
  542. ASSERT_OK(db_->Put(WriteOptions(), key, value));
  543. true_data[key] = value;
  544. }
  545. // Do kTotalMerges merges
  546. for (int i = 0; i < kTotalMerges; i++) {
  547. std::string key = Key(rnd.Next() % kKeyRange);
  548. std::string value = RandomString(&rnd, kOperandSize);
  549. ASSERT_OK(db_->Merge(WriteOptions(), key, value));
  550. if (true_data[key] < value) {
  551. true_data[key] = value;
  552. }
  553. }
  554. // Overwrite random kNumPutAfter keys
  555. for (int i = 0; i < kNumPutAfter; i++) {
  556. std::string key = Key(rnd.Next() % kKeyRange);
  557. std::string value = RandomString(&rnd, kOperandSize);
  558. ASSERT_OK(db_->Put(WriteOptions(), key, value));
  559. true_data[key] = value;
  560. }
  561. // Delete random kNumDelete keys
  562. for (int i = 0; i < kNumDelete; i++) {
  563. std::string key = Key(rnd.Next() % kKeyRange);
  564. ASSERT_OK(db_->Delete(WriteOptions(), key));
  565. true_data.erase(key);
  566. }
  567. VerifyDBFromMap(true_data);
  568. }
  569. } // namespace ROCKSDB_NAMESPACE
  570. int main(int argc, char** argv) {
  571. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  572. ::testing::InitGoogleTest(&argc, argv);
  573. return RUN_ALL_TESTS();
  574. }