merge_test.cc 13 KB


  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. //
  6. #include <assert.h>
  7. #include <memory>
  8. #include <iostream>
  9. #include "db/db_impl/db_impl.h"
  10. #include "db/dbformat.h"
  11. #include "db/write_batch_internal.h"
  12. #include "port/stack_trace.h"
  13. #include "rocksdb/cache.h"
  14. #include "rocksdb/comparator.h"
  15. #include "rocksdb/db.h"
  16. #include "rocksdb/env.h"
  17. #include "rocksdb/merge_operator.h"
  18. #include "rocksdb/utilities/db_ttl.h"
  19. #include "test_util/testharness.h"
  20. #include "utilities/merge_operators.h"
  21. namespace ROCKSDB_NAMESPACE {
  22. bool use_compression;
  23. class MergeTest : public testing::Test {};
  24. size_t num_merge_operator_calls;
  25. void resetNumMergeOperatorCalls() { num_merge_operator_calls = 0; }
  26. size_t num_partial_merge_calls;
  27. void resetNumPartialMergeCalls() { num_partial_merge_calls = 0; }
  28. class CountMergeOperator : public AssociativeMergeOperator {
  29. public:
  30. CountMergeOperator() {
  31. mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
  32. }
  33. bool Merge(const Slice& key, const Slice* existing_value, const Slice& value,
  34. std::string* new_value, Logger* logger) const override {
  35. assert(new_value->empty());
  36. ++num_merge_operator_calls;
  37. if (existing_value == nullptr) {
  38. new_value->assign(value.data(), value.size());
  39. return true;
  40. }
  41. return mergeOperator_->PartialMerge(
  42. key,
  43. *existing_value,
  44. value,
  45. new_value,
  46. logger);
  47. }
  48. bool PartialMergeMulti(const Slice& key,
  49. const std::deque<Slice>& operand_list,
  50. std::string* new_value,
  51. Logger* logger) const override {
  52. assert(new_value->empty());
  53. ++num_partial_merge_calls;
  54. return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
  55. logger);
  56. }
  57. const char* Name() const override { return "UInt64AddOperator"; }
  58. private:
  59. std::shared_ptr<MergeOperator> mergeOperator_;
  60. };
  61. std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
  62. const size_t max_successive_merges = 0) {
  63. DB* db;
  64. Options options;
  65. options.create_if_missing = true;
  66. options.merge_operator = std::make_shared<CountMergeOperator>();
  67. options.max_successive_merges = max_successive_merges;
  68. Status s;
  69. DestroyDB(dbname, Options());
  70. // DBWithTTL is not supported in ROCKSDB_LITE
  71. #ifndef ROCKSDB_LITE
  72. if (ttl) {
  73. DBWithTTL* db_with_ttl;
  74. s = DBWithTTL::Open(options, dbname, &db_with_ttl);
  75. db = db_with_ttl;
  76. } else {
  77. s = DB::Open(options, dbname, &db);
  78. }
  79. #else
  80. assert(!ttl);
  81. s = DB::Open(options, dbname, &db);
  82. #endif // !ROCKSDB_LITE
  83. if (!s.ok()) {
  84. std::cerr << s.ToString() << std::endl;
  85. assert(false);
  86. }
  87. return std::shared_ptr<DB>(db);
  88. }
  89. // Imagine we are maintaining a set of uint64 counters.
  90. // Each counter has a distinct name. And we would like
  91. // to support four high level operations:
  92. // set, add, get and remove
  93. // This is a quick implementation without a Merge operation.
  94. class Counters {
  95. protected:
  96. std::shared_ptr<DB> db_;
  97. WriteOptions put_option_;
  98. ReadOptions get_option_;
  99. WriteOptions delete_option_;
  100. uint64_t default_;
  101. public:
  102. explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
  103. : db_(db),
  104. put_option_(),
  105. get_option_(),
  106. delete_option_(),
  107. default_(defaultCount) {
  108. assert(db_);
  109. }
  110. virtual ~Counters() {}
  111. // public interface of Counters.
  112. // All four functions return false
  113. // if the underlying level db operation failed.
  114. // mapped to a levedb Put
  115. bool set(const std::string& key, uint64_t value) {
  116. // just treat the internal rep of int64 as the string
  117. char buf[sizeof(value)];
  118. EncodeFixed64(buf, value);
  119. Slice slice(buf, sizeof(value));
  120. auto s = db_->Put(put_option_, key, slice);
  121. if (s.ok()) {
  122. return true;
  123. } else {
  124. std::cerr << s.ToString() << std::endl;
  125. return false;
  126. }
  127. }
  128. // mapped to a rocksdb Delete
  129. bool remove(const std::string& key) {
  130. auto s = db_->Delete(delete_option_, key);
  131. if (s.ok()) {
  132. return true;
  133. } else {
  134. std::cerr << s.ToString() << std::endl;
  135. return false;
  136. }
  137. }
  138. // mapped to a rocksdb Get
  139. bool get(const std::string& key, uint64_t* value) {
  140. std::string str;
  141. auto s = db_->Get(get_option_, key, &str);
  142. if (s.IsNotFound()) {
  143. // return default value if not found;
  144. *value = default_;
  145. return true;
  146. } else if (s.ok()) {
  147. // deserialization
  148. if (str.size() != sizeof(uint64_t)) {
  149. std::cerr << "value corruption\n";
  150. return false;
  151. }
  152. *value = DecodeFixed64(&str[0]);
  153. return true;
  154. } else {
  155. std::cerr << s.ToString() << std::endl;
  156. return false;
  157. }
  158. }
  159. // 'add' is implemented as get -> modify -> set
  160. // An alternative is a single merge operation, see MergeBasedCounters
  161. virtual bool add(const std::string& key, uint64_t value) {
  162. uint64_t base = default_;
  163. return get(key, &base) && set(key, base + value);
  164. }
  165. // convenience functions for testing
  166. void assert_set(const std::string& key, uint64_t value) {
  167. assert(set(key, value));
  168. }
  169. void assert_remove(const std::string& key) { assert(remove(key)); }
  170. uint64_t assert_get(const std::string& key) {
  171. uint64_t value = default_;
  172. int result = get(key, &value);
  173. assert(result);
  174. if (result == 0) exit(1); // Disable unused variable warning.
  175. return value;
  176. }
  177. void assert_add(const std::string& key, uint64_t value) {
  178. int result = add(key, value);
  179. assert(result);
  180. if (result == 0) exit(1); // Disable unused variable warning.
  181. }
  182. };
  183. // Implement 'add' directly with the new Merge operation
  184. class MergeBasedCounters : public Counters {
  185. private:
  186. WriteOptions merge_option_; // for merge
  187. public:
  188. explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
  189. : Counters(db, defaultCount),
  190. merge_option_() {
  191. }
  192. // mapped to a rocksdb Merge operation
  193. bool add(const std::string& key, uint64_t value) override {
  194. char encoded[sizeof(uint64_t)];
  195. EncodeFixed64(encoded, value);
  196. Slice slice(encoded, sizeof(uint64_t));
  197. auto s = db_->Merge(merge_option_, key, slice);
  198. if (s.ok()) {
  199. return true;
  200. } else {
  201. std::cerr << s.ToString() << std::endl;
  202. return false;
  203. }
  204. }
  205. };
  206. void dumpDb(DB* db) {
  207. auto it = std::unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
  208. for (it->SeekToFirst(); it->Valid(); it->Next()) {
  209. //uint64_t value = DecodeFixed64(it->value().data());
  210. //std::cout << it->key().ToString() << ": " << value << std::endl;
  211. }
  212. assert(it->status().ok()); // Check for any errors found during the scan
  213. }
  214. void testCounters(Counters& counters, DB* db, bool test_compaction) {
  215. FlushOptions o;
  216. o.wait = true;
  217. counters.assert_set("a", 1);
  218. if (test_compaction) db->Flush(o);
  219. assert(counters.assert_get("a") == 1);
  220. counters.assert_remove("b");
  221. // defaut value is 0 if non-existent
  222. assert(counters.assert_get("b") == 0);
  223. counters.assert_add("a", 2);
  224. if (test_compaction) db->Flush(o);
  225. // 1+2 = 3
  226. assert(counters.assert_get("a")== 3);
  227. dumpDb(db);
  228. // 1+...+49 = ?
  229. uint64_t sum = 0;
  230. for (int i = 1; i < 50; i++) {
  231. counters.assert_add("b", i);
  232. sum += i;
  233. }
  234. assert(counters.assert_get("b") == sum);
  235. dumpDb(db);
  236. if (test_compaction) {
  237. db->Flush(o);
  238. db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  239. dumpDb(db);
  240. assert(counters.assert_get("a")== 3);
  241. assert(counters.assert_get("b") == sum);
  242. }
  243. }
  244. void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
  245. size_t num_merges) {
  246. counters.assert_remove("z");
  247. uint64_t sum = 0;
  248. for (size_t i = 1; i <= num_merges; ++i) {
  249. resetNumMergeOperatorCalls();
  250. counters.assert_add("z", i);
  251. sum += i;
  252. if (i % (max_num_merges + 1) == 0) {
  253. assert(num_merge_operator_calls == max_num_merges + 1);
  254. } else {
  255. assert(num_merge_operator_calls == 0);
  256. }
  257. resetNumMergeOperatorCalls();
  258. assert(counters.assert_get("z") == sum);
  259. assert(num_merge_operator_calls == i % (max_num_merges + 1));
  260. }
  261. }
  262. void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
  263. size_t min_merge, size_t count) {
  264. FlushOptions o;
  265. o.wait = true;
  266. // Test case 1: partial merge should be called when the number of merge
  267. // operands exceeds the threshold.
  268. uint64_t tmp_sum = 0;
  269. resetNumPartialMergeCalls();
  270. for (size_t i = 1; i <= count; i++) {
  271. counters->assert_add("b", i);
  272. tmp_sum += i;
  273. }
  274. db->Flush(o);
  275. db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  276. ASSERT_EQ(tmp_sum, counters->assert_get("b"));
  277. if (count > max_merge) {
  278. // in this case, FullMerge should be called instead.
  279. ASSERT_EQ(num_partial_merge_calls, 0U);
  280. } else {
  281. // if count >= min_merge, then partial merge should be called once.
  282. ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
  283. }
  284. // Test case 2: partial merge should not be called when a put is found.
  285. resetNumPartialMergeCalls();
  286. tmp_sum = 0;
  287. db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10");
  288. for (size_t i = 1; i <= count; i++) {
  289. counters->assert_add("c", i);
  290. tmp_sum += i;
  291. }
  292. db->Flush(o);
  293. db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  294. ASSERT_EQ(tmp_sum, counters->assert_get("c"));
  295. ASSERT_EQ(num_partial_merge_calls, 0U);
  296. }
  297. void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
  298. size_t num_merges) {
  299. assert(num_merges > max_num_merges);
  300. Slice key("BatchSuccessiveMerge");
  301. uint64_t merge_value = 1;
  302. char buf[sizeof(merge_value)];
  303. EncodeFixed64(buf, merge_value);
  304. Slice merge_value_slice(buf, sizeof(merge_value));
  305. // Create the batch
  306. WriteBatch batch;
  307. for (size_t i = 0; i < num_merges; ++i) {
  308. batch.Merge(key, merge_value_slice);
  309. }
  310. // Apply to memtable and count the number of merges
  311. resetNumMergeOperatorCalls();
  312. {
  313. Status s = db->Write(WriteOptions(), &batch);
  314. assert(s.ok());
  315. }
  316. ASSERT_EQ(
  317. num_merge_operator_calls,
  318. static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
  319. // Get the value
  320. resetNumMergeOperatorCalls();
  321. std::string get_value_str;
  322. {
  323. Status s = db->Get(ReadOptions(), key, &get_value_str);
  324. assert(s.ok());
  325. }
  326. assert(get_value_str.size() == sizeof(uint64_t));
  327. uint64_t get_value = DecodeFixed64(&get_value_str[0]);
  328. ASSERT_EQ(get_value, num_merges * merge_value);
  329. ASSERT_EQ(num_merge_operator_calls,
  330. static_cast<size_t>((num_merges % (max_num_merges + 1))));
  331. }
  332. void runTest(const std::string& dbname, const bool use_ttl = false) {
  333. {
  334. auto db = OpenDb(dbname, use_ttl);
  335. {
  336. Counters counters(db, 0);
  337. testCounters(counters, db.get(), true);
  338. }
  339. {
  340. MergeBasedCounters counters(db, 0);
  341. testCounters(counters, db.get(), use_compression);
  342. }
  343. }
  344. DestroyDB(dbname, Options());
  345. {
  346. size_t max_merge = 5;
  347. auto db = OpenDb(dbname, use_ttl, max_merge);
  348. MergeBasedCounters counters(db, 0);
  349. testCounters(counters, db.get(), use_compression);
  350. testSuccessiveMerge(counters, max_merge, max_merge * 2);
  351. testSingleBatchSuccessiveMerge(db.get(), 5, 7);
  352. DestroyDB(dbname, Options());
  353. }
  354. {
  355. size_t max_merge = 100;
  356. // Min merge is hard-coded to 2.
  357. uint32_t min_merge = 2;
  358. for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
  359. auto db = OpenDb(dbname, use_ttl, max_merge);
  360. MergeBasedCounters counters(db, 0);
  361. testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
  362. DestroyDB(dbname, Options());
  363. }
  364. {
  365. auto db = OpenDb(dbname, use_ttl, max_merge);
  366. MergeBasedCounters counters(db, 0);
  367. testPartialMerge(&counters, db.get(), max_merge, min_merge,
  368. min_merge * 10);
  369. DestroyDB(dbname, Options());
  370. }
  371. }
  372. {
  373. {
  374. auto db = OpenDb(dbname);
  375. MergeBasedCounters counters(db, 0);
  376. counters.add("test-key", 1);
  377. counters.add("test-key", 1);
  378. counters.add("test-key", 1);
  379. db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  380. }
  381. DB* reopen_db;
  382. ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
  383. std::string value;
  384. ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok()));
  385. delete reopen_db;
  386. DestroyDB(dbname, Options());
  387. }
  388. /* Temporary remove this test
  389. {
  390. std::cout << "Test merge-operator not set after reopen (recovery case)\n";
  391. {
  392. auto db = OpenDb(dbname);
  393. MergeBasedCounters counters(db, 0);
  394. counters.add("test-key", 1);
  395. counters.add("test-key", 1);
  396. counters.add("test-key", 1);
  397. }
  398. DB* reopen_db;
  399. ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
  400. }
  401. */
  402. }
  403. TEST_F(MergeTest, MergeDbTest) {
  404. runTest(test::PerThreadDBPath("merge_testdb"));
  405. }
  406. #ifndef ROCKSDB_LITE
  407. TEST_F(MergeTest, MergeDbTtlTest) {
  408. runTest(test::PerThreadDBPath("merge_testdbttl"),
  409. true); // Run test on TTL database
  410. }
  411. #endif // !ROCKSDB_LITE
  412. } // namespace ROCKSDB_NAMESPACE
  413. int main(int argc, char** argv) {
  414. ROCKSDB_NAMESPACE::use_compression = false;
  415. if (argc > 1) {
  416. ROCKSDB_NAMESPACE::use_compression = true;
  417. }
  418. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  419. ::testing::InitGoogleTest(&argc, argv);
  420. return RUN_ALL_TESTS();
  421. }