db_merge_operand_test.cc 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. // Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include "db/db_test_util.h"
  6. #include "port/stack_trace.h"
  7. #include "rocksdb/merge_operator.h"
  8. #include "rocksdb/perf_context.h"
  9. #include "rocksdb/utilities/debug.h"
  10. #include "table/block_based/block_builder.h"
  11. #include "test_util/sync_point.h"
  12. #include "utilities/fault_injection_env.h"
  13. #include "utilities/merge_operators.h"
  14. #include "utilities/merge_operators/sortlist.h"
  15. #include "utilities/merge_operators/string_append/stringappend2.h"
  16. namespace ROCKSDB_NAMESPACE {
  17. namespace {
  18. class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
  19. public:
  20. LimitedStringAppendMergeOp(int limit, char delim)
  21. : StringAppendTESTOperator(delim), limit_(limit) {}
  22. const char* Name() const override {
  23. return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
  24. }
  25. bool ShouldMerge(const std::vector<Slice>& operands) const override {
  26. if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
  27. return true;
  28. }
  29. return false;
  30. }
  31. private:
  32. size_t limit_ = 0;
  33. };
  34. } // anonymous namespace
  35. class DBMergeOperandTest : public DBTestBase {
  36. public:
  37. DBMergeOperandTest()
  38. : DBTestBase("db_merge_operand_test", /*env_do_fsync=*/true) {}
  39. };
  40. TEST_F(DBMergeOperandTest, CacheEvictedMergeOperandReadAfterFreeBug) {
  41. // There was a bug of reading merge operands after they are mistakely freed
  42. // in DB::GetMergeOperands, which is surfaced by cache full.
  43. // See PR#9507 for more.
  44. Options options = CurrentOptions();
  45. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  46. BlockBasedTableOptions table_options;
  47. // Small cache to simulate cache full
  48. table_options.block_cache = NewLRUCache(1);
  49. options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  50. Reopen(options);
  51. int num_records = 4;
  52. int number_of_operands = 0;
  53. std::vector<PinnableSlice> values(num_records);
  54. GetMergeOperandsOptions merge_operands_info;
  55. merge_operands_info.expected_max_number_of_operands = num_records;
  56. ASSERT_OK(Merge("k1", "v1"));
  57. ASSERT_OK(Flush());
  58. ASSERT_OK(Merge("k1", "v2"));
  59. ASSERT_OK(Flush());
  60. ASSERT_OK(Merge("k1", "v3"));
  61. ASSERT_OK(Flush());
  62. ASSERT_OK(Merge("k1", "v4"));
  63. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  64. "k1", values.data(), &merge_operands_info,
  65. &number_of_operands));
  66. ASSERT_EQ(number_of_operands, 4);
  67. ASSERT_EQ(values[0].ToString(), "v1");
  68. ASSERT_EQ(values[1].ToString(), "v2");
  69. ASSERT_EQ(values[2].ToString(), "v3");
  70. ASSERT_EQ(values[3].ToString(), "v4");
  71. }
  72. TEST_F(DBMergeOperandTest, FlushedMergeOperandReadAfterFreeBug) {
  73. // Repro for a bug where a memtable containing a merge operand could be
  74. // deleted before the merge operand was saved to the result.
  75. auto options = CurrentOptions();
  76. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  77. Reopen(options);
  78. ASSERT_OK(Merge("key", "value"));
  79. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
  80. {{"DBImpl::GetImpl:PostMemTableGet:0",
  81. "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush"},
  82. {"DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush",
  83. "DBImpl::GetImpl:PostMemTableGet:1"}});
  84. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  85. auto flush_thread = port::Thread([&]() {
  86. TEST_SYNC_POINT(
  87. "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PreFlush");
  88. ASSERT_OK(Flush());
  89. TEST_SYNC_POINT(
  90. "DBMergeOperandTest::FlushedMergeOperandReadAfterFreeBug:PostFlush");
  91. });
  92. PinnableSlice value;
  93. GetMergeOperandsOptions merge_operands_info;
  94. merge_operands_info.expected_max_number_of_operands = 1;
  95. int number_of_operands;
  96. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  97. "key", &value, &merge_operands_info,
  98. &number_of_operands));
  99. ASSERT_EQ(1, number_of_operands);
  100. flush_thread.join();
  101. }
  102. TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
  103. Options options = CurrentOptions();
  104. int limit = 2;
  105. // Use only the latest two merge operands.
  106. options.merge_operator =
  107. std::make_shared<LimitedStringAppendMergeOp>(limit, ',');
  108. Reopen(options);
  109. int num_records = 4;
  110. int number_of_operands = 0;
  111. std::vector<PinnableSlice> values(num_records);
  112. GetMergeOperandsOptions merge_operands_info;
  113. merge_operands_info.expected_max_number_of_operands = num_records;
  114. // k0 value in memtable
  115. ASSERT_OK(Put("k0", "PutARock"));
  116. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  117. "k0", values.data(), &merge_operands_info,
  118. &number_of_operands));
  119. ASSERT_EQ(values[0], "PutARock");
  120. // k0.1 value in SST
  121. ASSERT_OK(Put("k0.1", "RockInSST"));
  122. ASSERT_OK(Flush());
  123. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  124. "k0.1", values.data(), &merge_operands_info,
  125. &number_of_operands));
  126. ASSERT_EQ(values[0], "RockInSST");
  127. // All k1 values are in memtable.
  128. ASSERT_OK(Merge("k1", "a"));
  129. ASSERT_OK(Put("k1", "x"));
  130. ASSERT_OK(Merge("k1", "b"));
  131. ASSERT_OK(Merge("k1", "c"));
  132. ASSERT_OK(Merge("k1", "d"));
  133. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  134. "k1", values.data(), &merge_operands_info,
  135. &number_of_operands));
  136. ASSERT_EQ(values[0], "x");
  137. ASSERT_EQ(values[1], "b");
  138. ASSERT_EQ(values[2], "c");
  139. ASSERT_EQ(values[3], "d");
  140. // expected_max_number_of_operands is less than number of merge operands so
  141. // status should be Incomplete.
  142. merge_operands_info.expected_max_number_of_operands = num_records - 1;
  143. Status status = db_->GetMergeOperands(
  144. ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
  145. &merge_operands_info, &number_of_operands);
  146. ASSERT_EQ(status.IsIncomplete(), true);
  147. merge_operands_info.expected_max_number_of_operands = num_records;
  148. // All k1.1 values are in memtable.
  149. ASSERT_OK(Merge("k1.1", "r"));
  150. ASSERT_OK(Delete("k1.1"));
  151. ASSERT_OK(Merge("k1.1", "c"));
  152. ASSERT_OK(Merge("k1.1", "k"));
  153. ASSERT_OK(Merge("k1.1", "s"));
  154. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  155. "k1.1", values.data(), &merge_operands_info,
  156. &number_of_operands));
  157. ASSERT_EQ(values[0], "c");
  158. ASSERT_EQ(values[1], "k");
  159. ASSERT_EQ(values[2], "s");
  160. // All k2 values are flushed to L0 into a single file.
  161. ASSERT_OK(Merge("k2", "q"));
  162. ASSERT_OK(Merge("k2", "w"));
  163. ASSERT_OK(Merge("k2", "e"));
  164. ASSERT_OK(Merge("k2", "r"));
  165. ASSERT_OK(Flush());
  166. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  167. "k2", values.data(), &merge_operands_info,
  168. &number_of_operands));
  169. ASSERT_EQ(values[0], "q");
  170. ASSERT_EQ(values[1], "w");
  171. ASSERT_EQ(values[2], "e");
  172. ASSERT_EQ(values[3], "r");
  173. // All k2.1 values are flushed to L0 into a single file.
  174. ASSERT_OK(Merge("k2.1", "m"));
  175. ASSERT_OK(Put("k2.1", "l"));
  176. ASSERT_OK(Merge("k2.1", "n"));
  177. ASSERT_OK(Merge("k2.1", "o"));
  178. ASSERT_OK(Flush());
  179. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  180. "k2.1", values.data(), &merge_operands_info,
  181. &number_of_operands));
  182. ASSERT_EQ(values[0], "l,n,o");
  183. // All k2.2 values are flushed to L0 into a single file.
  184. ASSERT_OK(Merge("k2.2", "g"));
  185. ASSERT_OK(Delete("k2.2"));
  186. ASSERT_OK(Merge("k2.2", "o"));
  187. ASSERT_OK(Merge("k2.2", "t"));
  188. ASSERT_OK(Flush());
  189. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  190. "k2.2", values.data(), &merge_operands_info,
  191. &number_of_operands));
  192. ASSERT_EQ(values[0], "o,t");
  193. // Do some compaction that will make the following tests more predictable
  194. // Slice start("PutARock");
  195. // Slice end("t");
  196. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  197. // All k3 values are flushed and are in different files.
  198. ASSERT_OK(Merge("k3", "ab"));
  199. ASSERT_OK(Flush());
  200. ASSERT_OK(Merge("k3", "bc"));
  201. ASSERT_OK(Flush());
  202. ASSERT_OK(Merge("k3", "cd"));
  203. ASSERT_OK(Flush());
  204. ASSERT_OK(Merge("k3", "de"));
  205. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  206. "k3", values.data(), &merge_operands_info,
  207. &number_of_operands));
  208. ASSERT_EQ(values[0], "ab");
  209. ASSERT_EQ(values[1], "bc");
  210. ASSERT_EQ(values[2], "cd");
  211. ASSERT_EQ(values[3], "de");
  212. // All k3.1 values are flushed and are in different files.
  213. ASSERT_OK(Merge("k3.1", "ab"));
  214. ASSERT_OK(Flush());
  215. ASSERT_OK(Put("k3.1", "bc"));
  216. ASSERT_OK(Flush());
  217. ASSERT_OK(Merge("k3.1", "cd"));
  218. ASSERT_OK(Flush());
  219. ASSERT_OK(Merge("k3.1", "de"));
  220. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  221. "k3.1", values.data(), &merge_operands_info,
  222. &number_of_operands));
  223. ASSERT_EQ(values[0], "bc");
  224. ASSERT_EQ(values[1], "cd");
  225. ASSERT_EQ(values[2], "de");
  226. // All k3.2 values are flushed and are in different files.
  227. ASSERT_OK(Merge("k3.2", "ab"));
  228. ASSERT_OK(Flush());
  229. ASSERT_OK(Delete("k3.2"));
  230. ASSERT_OK(Flush());
  231. ASSERT_OK(Merge("k3.2", "cd"));
  232. ASSERT_OK(Flush());
  233. ASSERT_OK(Merge("k3.2", "de"));
  234. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  235. "k3.2", values.data(), &merge_operands_info,
  236. &number_of_operands));
  237. ASSERT_EQ(number_of_operands, 2);
  238. ASSERT_EQ(values[0], "cd");
  239. ASSERT_EQ(values[1], "de");
  240. // All K4 values are in different levels
  241. ASSERT_OK(Merge("k4", "ba"));
  242. ASSERT_OK(Flush());
  243. MoveFilesToLevel(4);
  244. ASSERT_OK(Merge("k4", "cb"));
  245. ASSERT_OK(Flush());
  246. MoveFilesToLevel(3);
  247. ASSERT_OK(Merge("k4", "dc"));
  248. ASSERT_OK(Flush());
  249. MoveFilesToLevel(1);
  250. ASSERT_OK(Merge("k4", "ed"));
  251. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  252. "k4", values.data(), &merge_operands_info,
  253. &number_of_operands));
  254. ASSERT_EQ(number_of_operands, 4);
  255. ASSERT_EQ(values[0], "ba");
  256. ASSERT_EQ(values[1], "cb");
  257. ASSERT_EQ(values[2], "dc");
  258. ASSERT_EQ(values[3], "ed");
  259. // First 3 k5 values are in SST and next 4 k5 values are in Immutable
  260. // Memtable
  261. ASSERT_OK(Merge("k5", "who"));
  262. ASSERT_OK(Merge("k5", "am"));
  263. ASSERT_OK(Merge("k5", "i"));
  264. ASSERT_OK(Flush());
  265. ASSERT_OK(Put("k5", "remember"));
  266. ASSERT_OK(Merge("k5", "i"));
  267. ASSERT_OK(Merge("k5", "am"));
  268. ASSERT_OK(Merge("k5", "rocks"));
  269. ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  270. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  271. "k5", values.data(), &merge_operands_info,
  272. &number_of_operands));
  273. ASSERT_EQ(number_of_operands, 4);
  274. ASSERT_EQ(values[0], "remember");
  275. ASSERT_EQ(values[1], "i");
  276. ASSERT_EQ(values[2], "am");
  277. ASSERT_EQ(values[3], "rocks");
  278. // GetMergeOperands() in ReadOnly DB
  279. ASSERT_OK(Merge("k6", "better"));
  280. ASSERT_OK(Merge("k6", "call"));
  281. ASSERT_OK(Merge("k6", "saul"));
  282. ASSERT_OK(ReadOnlyReopen(options));
  283. std::vector<PinnableSlice> readonly_values(num_records);
  284. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  285. "k6", readonly_values.data(),
  286. &merge_operands_info, &number_of_operands));
  287. ASSERT_EQ(number_of_operands, 3);
  288. ASSERT_EQ(readonly_values[0], "better");
  289. ASSERT_EQ(readonly_values[1], "call");
  290. ASSERT_EQ(readonly_values[2], "saul");
  291. }
  292. TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {
  293. Options options = CurrentOptions();
  294. options.enable_blob_files = true;
  295. options.min_blob_size = 0;
  296. // Use only the latest two merge operands.
  297. options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
  298. Reopen(options);
  299. int num_records = 4;
  300. int number_of_operands = 0;
  301. std::vector<PinnableSlice> values(num_records);
  302. GetMergeOperandsOptions merge_operands_info;
  303. merge_operands_info.expected_max_number_of_operands = num_records;
  304. // All k1 values are in memtable.
  305. ASSERT_OK(Put("k1", "x"));
  306. ASSERT_OK(Merge("k1", "b"));
  307. ASSERT_OK(Merge("k1", "c"));
  308. ASSERT_OK(Merge("k1", "d"));
  309. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  310. "k1", values.data(), &merge_operands_info,
  311. &number_of_operands));
  312. ASSERT_EQ(values[0], "x");
  313. ASSERT_EQ(values[1], "b");
  314. ASSERT_EQ(values[2], "c");
  315. ASSERT_EQ(values[3], "d");
  316. // expected_max_number_of_operands is less than number of merge operands so
  317. // status should be Incomplete.
  318. merge_operands_info.expected_max_number_of_operands = num_records - 1;
  319. Status status = db_->GetMergeOperands(
  320. ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(),
  321. &merge_operands_info, &number_of_operands);
  322. ASSERT_EQ(status.IsIncomplete(), true);
  323. merge_operands_info.expected_max_number_of_operands = num_records;
  324. // All k2 values are flushed to L0 into a single file.
  325. ASSERT_OK(Put("k2", "q"));
  326. ASSERT_OK(Merge("k2", "w"));
  327. ASSERT_OK(Merge("k2", "e"));
  328. ASSERT_OK(Merge("k2", "r"));
  329. ASSERT_OK(Flush());
  330. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  331. "k2", values.data(), &merge_operands_info,
  332. &number_of_operands));
  333. ASSERT_EQ(values[0], "q,w,e,r");
  334. // Do some compaction that will make the following tests more predictable
  335. ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  336. // All k3 values are flushed and are in different files.
  337. ASSERT_OK(Put("k3", "ab"));
  338. ASSERT_OK(Flush());
  339. ASSERT_OK(Merge("k3", "bc"));
  340. ASSERT_OK(Flush());
  341. ASSERT_OK(Merge("k3", "cd"));
  342. ASSERT_OK(Flush());
  343. ASSERT_OK(Merge("k3", "de"));
  344. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  345. "k3", values.data(), &merge_operands_info,
  346. &number_of_operands));
  347. ASSERT_EQ(values[0], "ab");
  348. ASSERT_EQ(values[1], "bc");
  349. ASSERT_EQ(values[2], "cd");
  350. ASSERT_EQ(values[3], "de");
  351. // All K4 values are in different levels
  352. ASSERT_OK(Put("k4", "ba"));
  353. ASSERT_OK(Flush());
  354. MoveFilesToLevel(4);
  355. ASSERT_OK(Merge("k4", "cb"));
  356. ASSERT_OK(Flush());
  357. MoveFilesToLevel(3);
  358. ASSERT_OK(Merge("k4", "dc"));
  359. ASSERT_OK(Flush());
  360. MoveFilesToLevel(1);
  361. ASSERT_OK(Merge("k4", "ed"));
  362. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  363. "k4", values.data(), &merge_operands_info,
  364. &number_of_operands));
  365. ASSERT_EQ(values[0], "ba");
  366. ASSERT_EQ(values[1], "cb");
  367. ASSERT_EQ(values[2], "dc");
  368. ASSERT_EQ(values[3], "ed");
  369. }
  370. TEST_F(DBMergeOperandTest, GetMergeOperandsLargeResultOptimization) {
  371. // These constants are chosen to trigger the large result optimization
  372. // (pinning a bundle of `DBImpl` resources).
  373. const int kNumOperands = 1024;
  374. const int kOperandLen = 1024;
  375. Options options = CurrentOptions();
  376. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  377. DestroyAndReopen(options);
  378. Random rnd(301);
  379. std::vector<std::string> expected_merge_operands;
  380. expected_merge_operands.reserve(kNumOperands);
  381. for (int i = 0; i < kNumOperands; ++i) {
  382. expected_merge_operands.emplace_back(rnd.RandomString(kOperandLen));
  383. ASSERT_OK(Merge("key", expected_merge_operands.back()));
  384. }
  385. std::vector<PinnableSlice> merge_operands(kNumOperands);
  386. GetMergeOperandsOptions merge_operands_info;
  387. merge_operands_info.expected_max_number_of_operands = kNumOperands;
  388. int num_merge_operands = 0;
  389. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  390. "key", merge_operands.data(),
  391. &merge_operands_info, &num_merge_operands));
  392. ASSERT_EQ(num_merge_operands, kNumOperands);
  393. // Ensures the large result optimization was used.
  394. for (int i = 0; i < kNumOperands; ++i) {
  395. ASSERT_TRUE(merge_operands[i].IsPinned());
  396. }
  397. // Add a Flush() to change the `SuperVersion` to challenge the resource
  398. // pinning.
  399. ASSERT_OK(Flush());
  400. for (int i = 0; i < kNumOperands; ++i) {
  401. ASSERT_EQ(expected_merge_operands[i], merge_operands[i]);
  402. }
  403. }
  404. TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitInMemtable) {
  405. const int kNumOperands = 10;
  406. const int kNumOperandsToFetch = 5;
  407. Options options = CurrentOptions();
  408. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  409. DestroyAndReopen(options);
  410. Random rnd(301);
  411. std::vector<std::string> expected_merge_operands;
  412. expected_merge_operands.reserve(kNumOperands);
  413. for (int i = 0; i < kNumOperands; ++i) {
  414. expected_merge_operands.emplace_back(rnd.RandomString(7 /* len */));
  415. ASSERT_OK(Merge("key", expected_merge_operands.back()));
  416. }
  417. std::vector<PinnableSlice> merge_operands(kNumOperands);
  418. GetMergeOperandsOptions merge_operands_info;
  419. merge_operands_info.expected_max_number_of_operands = kNumOperands;
  420. int num_fetched = 0;
  421. merge_operands_info.continue_cb = [&](Slice /* value */) {
  422. num_fetched++;
  423. return num_fetched != kNumOperandsToFetch;
  424. };
  425. int num_merge_operands = 0;
  426. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  427. "key", merge_operands.data(),
  428. &merge_operands_info, &num_merge_operands));
  429. ASSERT_EQ(kNumOperandsToFetch, num_merge_operands);
  430. ASSERT_EQ(kNumOperandsToFetch, num_fetched);
  431. for (int i = 0; i < kNumOperandsToFetch; ++i) {
  432. ASSERT_EQ(expected_merge_operands[kNumOperands - kNumOperandsToFetch + i],
  433. merge_operands[i]);
  434. }
  435. }
  436. TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitBaseValue) {
  437. // The continuation callback doesn't need to be called on a base value because
  438. // there's no remaining work to be saved.
  439. Options options = CurrentOptions();
  440. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  441. DestroyAndReopen(options);
  442. Random rnd(301);
  443. std::string expected_value = rnd.RandomString(7 /* len */);
  444. ASSERT_OK(Put("key", expected_value));
  445. std::vector<PinnableSlice> merge_operands(1);
  446. GetMergeOperandsOptions merge_operands_info;
  447. merge_operands_info.expected_max_number_of_operands = 1;
  448. int num_fetched = 0;
  449. merge_operands_info.continue_cb = [&num_fetched](Slice /* value */) {
  450. num_fetched++;
  451. return true;
  452. };
  453. int num_merge_operands = 0;
  454. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  455. "key", merge_operands.data(),
  456. &merge_operands_info, &num_merge_operands));
  457. ASSERT_EQ(1, num_merge_operands);
  458. ASSERT_EQ(0, num_fetched);
  459. ASSERT_EQ(expected_value, merge_operands[0]);
  460. }
  461. TEST_F(DBMergeOperandTest, GetMergeOperandsShortCircuitInSst) {
  462. const int kNumOperands = 10;
  463. const int kNumOperandsToFetch = 5;
  464. Options options = CurrentOptions();
  465. options.disable_auto_compactions = true;
  466. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  467. DestroyAndReopen(options);
  468. Random rnd(301);
  469. std::vector<std::string> expected_merge_operands;
  470. expected_merge_operands.reserve(kNumOperands);
  471. for (int i = 0; i < kNumOperands; ++i) {
  472. expected_merge_operands.emplace_back(rnd.RandomString(7 /* len */));
  473. ASSERT_OK(Merge("key", expected_merge_operands.back()));
  474. ASSERT_OK(Flush());
  475. }
  476. std::vector<PinnableSlice> merge_operands(kNumOperands);
  477. GetMergeOperandsOptions merge_operands_info;
  478. merge_operands_info.expected_max_number_of_operands = kNumOperands;
  479. int num_fetched = 0;
  480. merge_operands_info.continue_cb = [&](Slice /* value */) {
  481. num_fetched++;
  482. return num_fetched != kNumOperandsToFetch;
  483. };
  484. int num_merge_operands = 0;
  485. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  486. "key", merge_operands.data(),
  487. &merge_operands_info, &num_merge_operands));
  488. ASSERT_EQ(kNumOperandsToFetch, num_merge_operands);
  489. ASSERT_EQ(kNumOperandsToFetch, num_fetched);
  490. for (int i = 0; i < kNumOperandsToFetch; ++i) {
  491. ASSERT_EQ(expected_merge_operands[kNumOperands - kNumOperandsToFetch + i],
  492. merge_operands[i]);
  493. }
  494. }
  495. TEST_F(DBMergeOperandTest, GetMergeOperandsBaseDeletionInImmMem) {
  496. // In this test, "k1" has a MERGE in a mutable memtable on top of a base
  497. // DELETE in an immutable memtable.
  498. Options opts = CurrentOptions();
  499. opts.max_write_buffer_number = 10;
  500. opts.min_write_buffer_number_to_merge = 10;
  501. opts.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
  502. Reopen(opts);
  503. ASSERT_OK(Put("k1", "val"));
  504. ASSERT_OK(Flush());
  505. ASSERT_OK(Put("k0", "val"));
  506. ASSERT_OK(Delete("k1"));
  507. ASSERT_OK(Put("k2", "val"));
  508. ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  509. ASSERT_OK(Merge("k1", "val"));
  510. {
  511. std::vector<PinnableSlice> values(2);
  512. GetMergeOperandsOptions merge_operands_info;
  513. merge_operands_info.expected_max_number_of_operands =
  514. static_cast<int>(values.size());
  515. std::string key = "k1", from_db;
  516. int number_of_operands = 0;
  517. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  518. key, values.data(), &merge_operands_info,
  519. &number_of_operands));
  520. ASSERT_EQ(1, number_of_operands);
  521. from_db = values[0].ToString();
  522. ASSERT_EQ("val", from_db);
  523. }
  524. {
  525. std::string val;
  526. ASSERT_OK(db_->Get(ReadOptions(), "k1", &val));
  527. ASSERT_EQ("val", val);
  528. }
  529. }
  530. TEST_F(DBMergeOperandTest, GetMergeOperandCallbackStopAtImm) {
  531. Options options = CurrentOptions();
  532. options.max_write_buffer_number = 10;
  533. options.merge_operator = MergeOperators::CreateStringAppendOperator();
  534. DestroyAndReopen(options);
  535. Random rnd(301);
  536. ASSERT_OK(db_->PauseBackgroundWork());
  537. ASSERT_OK(Merge("key", "v1"));
  538. ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  539. // Keep this merge in an immutable memtable
  540. uint64_t num_imm = 0;
  541. ASSERT_TRUE(
  542. db_->GetIntProperty(DB::Properties::kNumImmutableMemTable, &num_imm));
  543. ASSERT_EQ(num_imm, 1);
  544. ASSERT_OK(Merge("key", "v2"));
  545. std::vector<PinnableSlice> merge_operands(2);
  546. GetMergeOperandsOptions merge_operands_info;
  547. merge_operands_info.expected_max_number_of_operands = 2;
  548. int num_fetched = 0;
  549. merge_operands_info.continue_cb = [&num_fetched](Slice /* value */) {
  550. num_fetched++;
  551. // Stop in the first immutable memtable.
  552. return num_fetched < 2;
  553. };
  554. int num_merge_operands = 0;
  555. ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
  556. "key", merge_operands.data(),
  557. &merge_operands_info, &num_merge_operands));
  558. ASSERT_EQ(2, num_merge_operands);
  559. ASSERT_EQ(2, num_fetched);
  560. ASSERT_EQ("v1", merge_operands[0]);
  561. ASSERT_EQ("v2", merge_operands[1]);
  562. }
  563. } // namespace ROCKSDB_NAMESPACE
  564. int main(int argc, char** argv) {
  565. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  566. ::testing::InitGoogleTest(&argc, argv);
  567. return RUN_ALL_TESTS();
  568. }