merge_test.cc 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #include <cassert>
  7. #include <iostream>
  8. #include <memory>
  9. #include "db/db_impl/db_impl.h"
  10. #include "db/dbformat.h"
  11. #include "db/write_batch_internal.h"
  12. #include "port/stack_trace.h"
  13. #include "rocksdb/cache.h"
  14. #include "rocksdb/comparator.h"
  15. #include "rocksdb/db.h"
  16. #include "rocksdb/env.h"
  17. #include "rocksdb/merge_operator.h"
  18. #include "rocksdb/utilities/db_ttl.h"
  19. #include "rocksdb/wide_columns.h"
  20. #include "test_util/testharness.h"
  21. #include "util/coding.h"
  22. #include "utilities/merge_operators.h"
  23. namespace ROCKSDB_NAMESPACE {
  24. bool use_compression;
  25. class MergeTest : public testing::Test {};
  26. size_t num_merge_operator_calls;
  27. void resetNumMergeOperatorCalls() { num_merge_operator_calls = 0; }
  28. size_t num_partial_merge_calls;
  29. void resetNumPartialMergeCalls() { num_partial_merge_calls = 0; }
  30. class CountMergeOperator : public AssociativeMergeOperator {
  31. public:
  32. CountMergeOperator() {
  33. mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
  34. }
  35. bool Merge(const Slice& key, const Slice* existing_value, const Slice& value,
  36. std::string* new_value, Logger* logger) const override {
  37. assert(new_value->empty());
  38. ++num_merge_operator_calls;
  39. if (existing_value == nullptr) {
  40. new_value->assign(value.data(), value.size());
  41. return true;
  42. }
  43. return mergeOperator_->PartialMerge(key, *existing_value, value, new_value,
  44. logger);
  45. }
  46. bool PartialMergeMulti(const Slice& key,
  47. const std::deque<Slice>& operand_list,
  48. std::string* new_value,
  49. Logger* logger) const override {
  50. assert(new_value->empty());
  51. ++num_partial_merge_calls;
  52. return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
  53. logger);
  54. }
  55. const char* Name() const override { return "UInt64AddOperator"; }
  56. private:
  57. std::shared_ptr<MergeOperator> mergeOperator_;
  58. };
  59. class EnvMergeTest : public EnvWrapper {
  60. public:
  61. EnvMergeTest() : EnvWrapper(Env::Default()) {}
  62. static const char* kClassName() { return "MergeEnv"; }
  63. const char* Name() const override { return kClassName(); }
  64. // ~EnvMergeTest() override {}
  65. uint64_t NowNanos() override {
  66. ++now_nanos_count_;
  67. return target()->NowNanos();
  68. }
  69. static uint64_t now_nanos_count_;
  70. static std::unique_ptr<EnvMergeTest> singleton_;
  71. static EnvMergeTest* GetInstance() {
  72. if (nullptr == singleton_) {
  73. singleton_.reset(new EnvMergeTest);
  74. }
  75. return singleton_.get();
  76. }
  77. };
  78. uint64_t EnvMergeTest::now_nanos_count_{0};
  79. std::unique_ptr<EnvMergeTest> EnvMergeTest::singleton_;
  80. std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
  81. const size_t max_successive_merges = 0) {
  82. DB* db;
  83. Options options;
  84. options.create_if_missing = true;
  85. options.merge_operator = std::make_shared<CountMergeOperator>();
  86. options.max_successive_merges = max_successive_merges;
  87. options.env = EnvMergeTest::GetInstance();
  88. EXPECT_OK(DestroyDB(dbname, Options()));
  89. Status s;
  90. if (ttl) {
  91. DBWithTTL* db_with_ttl;
  92. s = DBWithTTL::Open(options, dbname, &db_with_ttl);
  93. db = db_with_ttl;
  94. } else {
  95. s = DB::Open(options, dbname, &db);
  96. }
  97. EXPECT_OK(s);
  98. assert(s.ok());
  99. // Allowed to call NowNanos during DB creation (in GenerateRawUniqueId() for
  100. // session ID)
  101. EnvMergeTest::now_nanos_count_ = 0;
  102. return std::shared_ptr<DB>(db);
  103. }
  104. // Imagine we are maintaining a set of uint64 counters.
  105. // Each counter has a distinct name. And we would like
  106. // to support four high level operations:
  107. // set, add, get and remove
  108. // This is a quick implementation without a Merge operation.
  109. class Counters {
  110. protected:
  111. std::shared_ptr<DB> db_;
  112. WriteOptions put_option_;
  113. ReadOptions get_option_;
  114. WriteOptions delete_option_;
  115. uint64_t default_;
  116. public:
  117. explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
  118. : db_(db),
  119. put_option_(),
  120. get_option_(),
  121. delete_option_(),
  122. default_(defaultCount) {
  123. assert(db_);
  124. }
  125. virtual ~Counters() = default;
  126. // public interface of Counters.
  127. // All four functions return false
  128. // if the underlying level db operation failed.
  129. // mapped to a levedb Put
  130. bool set(const std::string& key, uint64_t value) {
  131. // just treat the internal rep of int64 as the string
  132. char buf[sizeof(value)];
  133. EncodeFixed64(buf, value);
  134. Slice slice(buf, sizeof(value));
  135. auto s = db_->Put(put_option_, key, slice);
  136. if (s.ok()) {
  137. return true;
  138. } else {
  139. std::cerr << s.ToString() << std::endl;
  140. return false;
  141. }
  142. }
  143. // mapped to a rocksdb Delete
  144. bool remove(const std::string& key) {
  145. auto s = db_->Delete(delete_option_, key);
  146. if (s.ok()) {
  147. return true;
  148. } else {
  149. std::cerr << s.ToString() << std::endl;
  150. return false;
  151. }
  152. }
  153. // mapped to a rocksdb Get
  154. bool get(const std::string& key, uint64_t* value) {
  155. std::string str;
  156. auto s = db_->Get(get_option_, key, &str);
  157. if (s.IsNotFound()) {
  158. // return default value if not found;
  159. *value = default_;
  160. return true;
  161. } else if (s.ok()) {
  162. // deserialization
  163. if (str.size() != sizeof(uint64_t)) {
  164. std::cerr << "value corruption\n";
  165. return false;
  166. }
  167. *value = DecodeFixed64(str.data());
  168. return true;
  169. } else {
  170. std::cerr << s.ToString() << std::endl;
  171. return false;
  172. }
  173. }
  174. // 'add' is implemented as get -> modify -> set
  175. // An alternative is a single merge operation, see MergeBasedCounters
  176. virtual bool add(const std::string& key, uint64_t value) {
  177. uint64_t base = default_;
  178. return get(key, &base) && set(key, base + value);
  179. }
  180. // convenience functions for testing
  181. void assert_set(const std::string& key, uint64_t value) {
  182. assert(set(key, value));
  183. }
  184. void assert_remove(const std::string& key) { assert(remove(key)); }
  185. uint64_t assert_get(const std::string& key) {
  186. uint64_t value = default_;
  187. int result = get(key, &value);
  188. assert(result);
  189. if (result == 0) {
  190. exit(1); // Disable unused variable warning.
  191. }
  192. return value;
  193. }
  194. void assert_add(const std::string& key, uint64_t value) {
  195. int result = add(key, value);
  196. assert(result);
  197. if (result == 0) {
  198. exit(1); // Disable unused variable warning.
  199. }
  200. }
  201. };
  202. // Implement 'add' directly with the new Merge operation
  203. class MergeBasedCounters : public Counters {
  204. private:
  205. WriteOptions merge_option_; // for merge
  206. public:
  207. explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
  208. : Counters(db, defaultCount), merge_option_() {}
  209. // mapped to a rocksdb Merge operation
  210. bool add(const std::string& key, uint64_t value) override {
  211. char encoded[sizeof(uint64_t)];
  212. EncodeFixed64(encoded, value);
  213. Slice slice(encoded, sizeof(uint64_t));
  214. auto s = db_->Merge(merge_option_, key, slice);
  215. if (s.ok()) {
  216. return true;
  217. } else {
  218. std::cerr << s.ToString() << std::endl;
  219. return false;
  220. }
  221. }
  222. };
  223. void dumpDb(DB* db) {
  224. auto it = std::unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
  225. for (it->SeekToFirst(); it->Valid(); it->Next()) {
  226. // uint64_t value = DecodeFixed64(it->value().data());
  227. // std::cout << it->key().ToString() << ": " << value << std::endl;
  228. }
  229. assert(it->status().ok()); // Check for any errors found during the scan
  230. }
  231. void testCounters(Counters& counters, DB* db, bool test_compaction) {
  232. FlushOptions o;
  233. o.wait = true;
  234. counters.assert_set("a", 1);
  235. if (test_compaction) {
  236. ASSERT_OK(db->Flush(o));
  237. }
  238. ASSERT_EQ(counters.assert_get("a"), 1);
  239. counters.assert_remove("b");
  240. // defaut value is 0 if non-existent
  241. ASSERT_EQ(counters.assert_get("b"), 0);
  242. counters.assert_add("a", 2);
  243. if (test_compaction) {
  244. ASSERT_OK(db->Flush(o));
  245. }
  246. // 1+2 = 3
  247. ASSERT_EQ(counters.assert_get("a"), 3);
  248. dumpDb(db);
  249. // 1+...+49 = ?
  250. uint64_t sum = 0;
  251. for (int i = 1; i < 50; i++) {
  252. counters.assert_add("b", i);
  253. sum += i;
  254. }
  255. ASSERT_EQ(counters.assert_get("b"), sum);
  256. dumpDb(db);
  257. if (test_compaction) {
  258. ASSERT_OK(db->Flush(o));
  259. ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  260. dumpDb(db);
  261. ASSERT_EQ(counters.assert_get("a"), 3);
  262. ASSERT_EQ(counters.assert_get("b"), sum);
  263. }
  264. }
  265. void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
  266. ASSERT_OK(db->Put({}, "1", "1"));
  267. ASSERT_OK(db->Flush(FlushOptions()));
  268. std::atomic<int> cnt{0};
  269. const auto get_thread_id = [&cnt]() {
  270. thread_local int thread_id{cnt++};
  271. return thread_id;
  272. };
  273. SyncPoint::GetInstance()->DisableProcessing();
  274. SyncPoint::GetInstance()->ClearAllCallBacks();
  275. SyncPoint::GetInstance()->SetCallBack(
  276. "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
  277. int thread_id = get_thread_id();
  278. if (1 == thread_id) {
  279. TEST_SYNC_POINT(
  280. "testCountersWithFlushAndCompaction::bg_compact_thread:0");
  281. } else if (2 == thread_id) {
  282. TEST_SYNC_POINT(
  283. "testCountersWithFlushAndCompaction::bg_flush_thread:0");
  284. }
  285. });
  286. SyncPoint::GetInstance()->SetCallBack(
  287. "VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
  288. int thread_id = get_thread_id();
  289. if (0 == thread_id) {
  290. TEST_SYNC_POINT(
  291. "testCountersWithFlushAndCompaction::set_options_thread:0");
  292. TEST_SYNC_POINT(
  293. "testCountersWithFlushAndCompaction::set_options_thread:1");
  294. }
  295. });
  296. SyncPoint::GetInstance()->SetCallBack(
  297. "VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
  298. auto* mutex = static_cast<InstrumentedMutex*>(arg);
  299. mutex->AssertHeld();
  300. int thread_id = get_thread_id();
  301. ASSERT_EQ(2, thread_id);
  302. mutex->Unlock();
  303. TEST_SYNC_POINT(
  304. "testCountersWithFlushAndCompaction::bg_flush_thread:1");
  305. TEST_SYNC_POINT(
  306. "testCountersWithFlushAndCompaction::bg_flush_thread:2");
  307. mutex->Lock();
  308. });
  309. SyncPoint::GetInstance()->LoadDependency({
  310. {"testCountersWithFlushAndCompaction::set_options_thread:0",
  311. "testCountersWithCompactionAndFlush:BeforeCompact"},
  312. {"testCountersWithFlushAndCompaction::bg_compact_thread:0",
  313. "testCountersWithFlushAndCompaction:BeforeIncCounters"},
  314. {"testCountersWithFlushAndCompaction::bg_flush_thread:0",
  315. "testCountersWithFlushAndCompaction::set_options_thread:1"},
  316. {"testCountersWithFlushAndCompaction::bg_flush_thread:1",
  317. "testCountersWithFlushAndCompaction:BeforeVerification"},
  318. {"testCountersWithFlushAndCompaction:AfterGet",
  319. "testCountersWithFlushAndCompaction::bg_flush_thread:2"},
  320. });
  321. // This test relies on old behavior of SetOptions writing to the
  322. // manifest. Here we restore that old behavior for reproducer purposes.
  323. // (Brief attempts to use an alternative to SetOptions failed.)
  324. SyncPoint::GetInstance()->SetCallBack(
  325. "DBImpl::SetOptions:dummy_edit", [&](void* arg) {
  326. auto* dummy_edit = static_cast<VersionEdit*>(arg);
  327. dummy_edit->Clear();
  328. });
  329. SyncPoint::GetInstance()->EnableProcessing();
  330. port::Thread set_options_thread([&]() {
  331. ASSERT_OK(static_cast<DBImpl*>(db)->SetOptions(
  332. {{"disable_auto_compactions", "false"}}));
  333. });
  334. TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
  335. port::Thread compact_thread([&]() {
  336. ASSERT_OK(static_cast<DBImpl*>(db)->CompactRange(
  337. CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
  338. });
  339. TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
  340. counters.add("test-key", 1);
  341. FlushOptions flush_opts;
  342. flush_opts.wait = false;
  343. ASSERT_OK(db->Flush(flush_opts));
  344. TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
  345. std::string expected;
  346. PutFixed64(&expected, 1);
  347. std::string actual;
  348. Status s = db->Get(ReadOptions(), "test-key", &actual);
  349. TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
  350. set_options_thread.join();
  351. compact_thread.join();
  352. ASSERT_OK(s);
  353. ASSERT_EQ(expected, actual);
  354. SyncPoint::GetInstance()->DisableProcessing();
  355. SyncPoint::GetInstance()->ClearAllCallBacks();
  356. }
  357. void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
  358. size_t num_merges) {
  359. counters.assert_remove("z");
  360. uint64_t sum = 0;
  361. for (size_t i = 1; i <= num_merges; ++i) {
  362. resetNumMergeOperatorCalls();
  363. counters.assert_add("z", i);
  364. sum += i;
  365. if (i % (max_num_merges + 1) == 0) {
  366. ASSERT_EQ(num_merge_operator_calls, max_num_merges + 1);
  367. } else {
  368. ASSERT_EQ(num_merge_operator_calls, 0);
  369. }
  370. resetNumMergeOperatorCalls();
  371. ASSERT_EQ(counters.assert_get("z"), sum);
  372. ASSERT_EQ(num_merge_operator_calls, i % (max_num_merges + 1));
  373. }
  374. }
  375. void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
  376. size_t min_merge, size_t count) {
  377. FlushOptions o;
  378. o.wait = true;
  379. // Test case 1: partial merge should be called when the number of merge
  380. // operands exceeds the threshold.
  381. uint64_t tmp_sum = 0;
  382. resetNumPartialMergeCalls();
  383. for (size_t i = 1; i <= count; i++) {
  384. counters->assert_add("b", i);
  385. tmp_sum += i;
  386. }
  387. ASSERT_OK(db->Flush(o));
  388. ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  389. ASSERT_EQ(tmp_sum, counters->assert_get("b"));
  390. if (count > max_merge) {
  391. // in this case, FullMerge should be called instead.
  392. ASSERT_EQ(num_partial_merge_calls, 0U);
  393. } else {
  394. // if count >= min_merge, then partial merge should be called once.
  395. ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
  396. }
  397. // Test case 2: partial merge should not be called when a put is found.
  398. resetNumPartialMergeCalls();
  399. tmp_sum = 0;
  400. ASSERT_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10"));
  401. for (size_t i = 1; i <= count; i++) {
  402. counters->assert_add("c", i);
  403. tmp_sum += i;
  404. }
  405. ASSERT_OK(db->Flush(o));
  406. ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  407. ASSERT_EQ(tmp_sum, counters->assert_get("c"));
  408. ASSERT_EQ(num_partial_merge_calls, 0U);
  409. // NowNanos was previously called in MergeHelper::FilterMerge(), which
  410. // harmed performance.
  411. ASSERT_EQ(EnvMergeTest::now_nanos_count_, 0U);
  412. }
  413. void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
  414. size_t num_merges) {
  415. ASSERT_GT(num_merges, max_num_merges);
  416. Slice key("BatchSuccessiveMerge");
  417. uint64_t merge_value = 1;
  418. char buf[sizeof(merge_value)];
  419. EncodeFixed64(buf, merge_value);
  420. Slice merge_value_slice(buf, sizeof(merge_value));
  421. // Create the batch
  422. WriteBatch batch;
  423. for (size_t i = 0; i < num_merges; ++i) {
  424. ASSERT_OK(batch.Merge(key, merge_value_slice));
  425. }
  426. // Apply to memtable and count the number of merges
  427. resetNumMergeOperatorCalls();
  428. ASSERT_OK(db->Write(WriteOptions(), &batch));
  429. ASSERT_EQ(
  430. num_merge_operator_calls,
  431. static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
  432. // Get the value
  433. resetNumMergeOperatorCalls();
  434. std::string get_value_str;
  435. ASSERT_OK(db->Get(ReadOptions(), key, &get_value_str));
  436. assert(get_value_str.size() == sizeof(uint64_t));
  437. uint64_t get_value = DecodeFixed64(get_value_str.data());
  438. ASSERT_EQ(get_value, num_merges * merge_value);
  439. ASSERT_EQ(num_merge_operator_calls,
  440. static_cast<size_t>((num_merges % (max_num_merges + 1))));
  441. }
  442. void runTest(const std::string& dbname, const bool use_ttl = false) {
  443. {
  444. auto db = OpenDb(dbname, use_ttl);
  445. {
  446. Counters counters(db, 0);
  447. testCounters(counters, db.get(), true);
  448. }
  449. {
  450. MergeBasedCounters counters(db, 0);
  451. testCounters(counters, db.get(), use_compression);
  452. }
  453. }
  454. ASSERT_OK(DestroyDB(dbname, Options()));
  455. {
  456. size_t max_merge = 5;
  457. auto db = OpenDb(dbname, use_ttl, max_merge);
  458. MergeBasedCounters counters(db, 0);
  459. testCounters(counters, db.get(), use_compression);
  460. testSuccessiveMerge(counters, max_merge, max_merge * 2);
  461. testSingleBatchSuccessiveMerge(db.get(), 5, 7);
  462. ASSERT_OK(db->Close());
  463. ASSERT_OK(DestroyDB(dbname, Options()));
  464. }
  465. {
  466. size_t max_merge = 100;
  467. // Min merge is hard-coded to 2.
  468. uint32_t min_merge = 2;
  469. for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
  470. auto db = OpenDb(dbname, use_ttl, max_merge);
  471. MergeBasedCounters counters(db, 0);
  472. testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
  473. ASSERT_OK(db->Close());
  474. ASSERT_OK(DestroyDB(dbname, Options()));
  475. }
  476. {
  477. auto db = OpenDb(dbname, use_ttl, max_merge);
  478. MergeBasedCounters counters(db, 0);
  479. testPartialMerge(&counters, db.get(), max_merge, min_merge,
  480. min_merge * 10);
  481. ASSERT_OK(db->Close());
  482. ASSERT_OK(DestroyDB(dbname, Options()));
  483. }
  484. }
  485. {
  486. {
  487. auto db = OpenDb(dbname);
  488. MergeBasedCounters counters(db, 0);
  489. counters.add("test-key", 1);
  490. counters.add("test-key", 1);
  491. counters.add("test-key", 1);
  492. ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  493. }
  494. DB* reopen_db;
  495. ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
  496. std::string value;
  497. ASSERT_NOK(reopen_db->Get(ReadOptions(), "test-key", &value));
  498. delete reopen_db;
  499. ASSERT_OK(DestroyDB(dbname, Options()));
  500. }
  501. /* Temporary remove this test
  502. {
  503. std::cout << "Test merge-operator not set after reopen (recovery case)\n";
  504. {
  505. auto db = OpenDb(dbname);
  506. MergeBasedCounters counters(db, 0);
  507. counters.add("test-key", 1);
  508. counters.add("test-key", 1);
  509. counters.add("test-key", 1);
  510. }
  511. DB* reopen_db;
  512. ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
  513. }
  514. */
  515. }
  516. TEST_F(MergeTest, MergeDbTest) {
  517. runTest(test::PerThreadDBPath("merge_testdb"));
  518. }
  519. TEST_F(MergeTest, MergeDbTtlTest) {
  520. runTest(test::PerThreadDBPath("merge_testdbttl"),
  521. true); // Run test on TTL database
  522. }
  523. TEST_F(MergeTest, MergeWithCompactionAndFlush) {
  524. const std::string dbname =
  525. test::PerThreadDBPath("merge_with_compaction_and_flush");
  526. {
  527. auto db = OpenDb(dbname);
  528. {
  529. MergeBasedCounters counters(db, 0);
  530. testCountersWithFlushAndCompaction(counters, db.get());
  531. }
  532. }
  533. ASSERT_OK(DestroyDB(dbname, Options()));
  534. }
  535. TEST_F(MergeTest, FullMergeV3FallbackNewValue) {
  536. // Test that the default FullMergeV3 implementation correctly handles the case
  537. // when FullMergeV2 results in a new value.
  538. const Slice key("foo");
  539. const MergeOperator::MergeOperationInputV3::OperandList operands{
  540. "first", "second", "third"};
  541. constexpr Logger* logger = nullptr;
  542. auto append_operator =
  543. MergeOperators::CreateStringAppendOperator(std::string());
  544. // No existing value
  545. {
  546. MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
  547. const MergeOperator::MergeOperationInputV3 merge_in(
  548. key, std::move(existing_value), operands, logger);
  549. MergeOperator::MergeOperationOutputV3 merge_out;
  550. ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
  551. const auto& result = std::get<std::string>(merge_out.new_value);
  552. ASSERT_EQ(result, operands[0].ToString() + operands[1].ToString() +
  553. operands[2].ToString());
  554. }
  555. // Plain existing value
  556. {
  557. const Slice plain("plain");
  558. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
  559. const MergeOperator::MergeOperationInputV3 merge_in(
  560. key, std::move(existing_value), operands, logger);
  561. MergeOperator::MergeOperationOutputV3 merge_out;
  562. ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
  563. const auto& result = std::get<std::string>(merge_out.new_value);
  564. ASSERT_EQ(result, plain.ToString() + operands[0].ToString() +
  565. operands[1].ToString() + operands[2].ToString());
  566. }
  567. // Wide-column existing value with default column
  568. {
  569. const WideColumns entity{
  570. {kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
  571. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
  572. const MergeOperator::MergeOperationInputV3 merge_in(
  573. key, std::move(existing_value), operands, logger);
  574. MergeOperator::MergeOperationOutputV3 merge_out;
  575. ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
  576. const auto& result =
  577. std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
  578. merge_out.new_value);
  579. ASSERT_EQ(result.size(), entity.size());
  580. ASSERT_EQ(result[0].first, entity[0].name());
  581. ASSERT_EQ(result[0].second,
  582. entity[0].value().ToString() + operands[0].ToString() +
  583. operands[1].ToString() + operands[2].ToString());
  584. ASSERT_EQ(result[1].first, entity[1].name());
  585. ASSERT_EQ(result[1].second, entity[1].value());
  586. ASSERT_EQ(result[2].first, entity[2].name());
  587. ASSERT_EQ(result[2].second, entity[2].value());
  588. }
  589. // Wide-column existing value without default column
  590. {
  591. const WideColumns entity{{"one", "1"}, {"two", "2"}};
  592. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
  593. const MergeOperator::MergeOperationInputV3 merge_in(
  594. key, std::move(existing_value), operands, logger);
  595. MergeOperator::MergeOperationOutputV3 merge_out;
  596. ASSERT_TRUE(append_operator->FullMergeV3(merge_in, &merge_out));
  597. const auto& result =
  598. std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
  599. merge_out.new_value);
  600. ASSERT_EQ(result.size(), entity.size() + 1);
  601. ASSERT_EQ(result[0].first, kDefaultWideColumnName);
  602. ASSERT_EQ(result[0].second, operands[0].ToString() +
  603. operands[1].ToString() +
  604. operands[2].ToString());
  605. ASSERT_EQ(result[1].first, entity[0].name());
  606. ASSERT_EQ(result[1].second, entity[0].value());
  607. ASSERT_EQ(result[2].first, entity[1].name());
  608. ASSERT_EQ(result[2].second, entity[1].value());
  609. }
  610. }
  611. TEST_F(MergeTest, FullMergeV3FallbackExistingOperand) {
  612. // Test that the default FullMergeV3 implementation correctly handles the case
  613. // when FullMergeV2 results in an existing operand.
  614. const Slice key("foo");
  615. const MergeOperator::MergeOperationInputV3::OperandList operands{
  616. "first", "second", "third"};
  617. constexpr Logger* logger = nullptr;
  618. auto put_operator = MergeOperators::CreatePutOperator();
  619. // No existing value
  620. {
  621. MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
  622. const MergeOperator::MergeOperationInputV3 merge_in(
  623. key, std::move(existing_value), operands, logger);
  624. MergeOperator::MergeOperationOutputV3 merge_out;
  625. ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
  626. const auto& result = std::get<Slice>(merge_out.new_value);
  627. ASSERT_EQ(result.data(), operands.back().data());
  628. ASSERT_EQ(result.size(), operands.back().size());
  629. }
  630. // Plain existing value
  631. {
  632. const Slice plain("plain");
  633. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
  634. const MergeOperator::MergeOperationInputV3 merge_in(
  635. key, std::move(existing_value), operands, logger);
  636. MergeOperator::MergeOperationOutputV3 merge_out;
  637. ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
  638. const auto& result = std::get<Slice>(merge_out.new_value);
  639. ASSERT_EQ(result.data(), operands.back().data());
  640. ASSERT_EQ(result.size(), operands.back().size());
  641. }
  642. // Wide-column existing value with default column
  643. {
  644. const WideColumns entity{
  645. {kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
  646. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
  647. const MergeOperator::MergeOperationInputV3 merge_in(
  648. key, std::move(existing_value), operands, logger);
  649. MergeOperator::MergeOperationOutputV3 merge_out;
  650. ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
  651. const auto& result =
  652. std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
  653. merge_out.new_value);
  654. ASSERT_EQ(result.size(), entity.size());
  655. ASSERT_EQ(result[0].first, entity[0].name());
  656. ASSERT_EQ(result[0].second, operands.back());
  657. ASSERT_EQ(result[1].first, entity[1].name());
  658. ASSERT_EQ(result[1].second, entity[1].value());
  659. ASSERT_EQ(result[2].first, entity[2].name());
  660. ASSERT_EQ(result[2].second, entity[2].value());
  661. }
  662. // Wide-column existing value without default column
  663. {
  664. const WideColumns entity{{"one", "1"}, {"two", "2"}};
  665. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
  666. const MergeOperator::MergeOperationInputV3 merge_in(
  667. key, std::move(existing_value), operands, logger);
  668. MergeOperator::MergeOperationOutputV3 merge_out;
  669. ASSERT_TRUE(put_operator->FullMergeV3(merge_in, &merge_out));
  670. const auto& result =
  671. std::get<MergeOperator::MergeOperationOutputV3::NewColumns>(
  672. merge_out.new_value);
  673. ASSERT_EQ(result.size(), entity.size() + 1);
  674. ASSERT_EQ(result[0].first, kDefaultWideColumnName);
  675. ASSERT_EQ(result[0].second, operands.back());
  676. ASSERT_EQ(result[1].first, entity[0].name());
  677. ASSERT_EQ(result[1].second, entity[0].value());
  678. ASSERT_EQ(result[2].first, entity[1].name());
  679. ASSERT_EQ(result[2].second, entity[1].value());
  680. }
  681. }
  682. TEST_F(MergeTest, FullMergeV3FallbackFailure) {
  683. // Test that the default FullMergeV3 implementation correctly handles the case
  684. // when FullMergeV2 fails.
  685. const Slice key("foo");
  686. const MergeOperator::MergeOperationInputV3::OperandList operands{
  687. "first", "second", "third"};
  688. constexpr Logger* logger = nullptr;
  689. class FailMergeOperator : public MergeOperator {
  690. public:
  691. bool FullMergeV2(const MergeOperationInput& /* merge_in */,
  692. MergeOperationOutput* merge_out) const override {
  693. assert(merge_out);
  694. merge_out->op_failure_scope = OpFailureScope::kMustMerge;
  695. return false;
  696. }
  697. const char* Name() const override { return "FailMergeOperator"; }
  698. };
  699. FailMergeOperator fail_operator;
  700. // No existing value
  701. {
  702. MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
  703. const MergeOperator::MergeOperationInputV3 merge_in(
  704. key, std::move(existing_value), operands, logger);
  705. MergeOperator::MergeOperationOutputV3 merge_out;
  706. ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
  707. ASSERT_EQ(merge_out.op_failure_scope,
  708. MergeOperator::OpFailureScope::kMustMerge);
  709. }
  710. // Plain existing value
  711. {
  712. const Slice plain("plain");
  713. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(plain);
  714. const MergeOperator::MergeOperationInputV3 merge_in(
  715. key, std::move(existing_value), operands, logger);
  716. MergeOperator::MergeOperationOutputV3 merge_out;
  717. ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
  718. ASSERT_EQ(merge_out.op_failure_scope,
  719. MergeOperator::OpFailureScope::kMustMerge);
  720. }
  721. // Wide-column existing value with default column
  722. {
  723. const WideColumns entity{
  724. {kDefaultWideColumnName, "default"}, {"one", "1"}, {"two", "2"}};
  725. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
  726. const MergeOperator::MergeOperationInputV3 merge_in(
  727. key, std::move(existing_value), operands, logger);
  728. MergeOperator::MergeOperationOutputV3 merge_out;
  729. ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
  730. ASSERT_EQ(merge_out.op_failure_scope,
  731. MergeOperator::OpFailureScope::kMustMerge);
  732. }
  733. // Wide-column existing value without default column
  734. {
  735. const WideColumns entity{{"one", "1"}, {"two", "2"}};
  736. MergeOperator::MergeOperationInputV3::ExistingValue existing_value(entity);
  737. const MergeOperator::MergeOperationInputV3 merge_in(
  738. key, std::move(existing_value), operands, logger);
  739. MergeOperator::MergeOperationOutputV3 merge_out;
  740. ASSERT_FALSE(fail_operator.FullMergeV3(merge_in, &merge_out));
  741. ASSERT_EQ(merge_out.op_failure_scope,
  742. MergeOperator::OpFailureScope::kMustMerge);
  743. }
  744. }
  745. } // namespace ROCKSDB_NAMESPACE
  746. int main(int argc, char** argv) {
  747. ROCKSDB_NAMESPACE::use_compression = false;
  748. if (argc > 1) {
  749. ROCKSDB_NAMESPACE::use_compression = true;
  750. }
  751. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  752. ::testing::InitGoogleTest(&argc, argv);
  753. return RUN_ALL_TESTS();
  754. }