db_memtable_test.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
  2. // This source code is licensed under both the GPLv2 (found in the
  3. // COPYING file in the root directory) and Apache 2.0 License
  4. // (found in the LICENSE.Apache file in the root directory).
  5. #include <memory>
  6. #include <string>
  7. #include "db/db_test_util.h"
  8. #include "db/memtable.h"
  9. #include "db/range_del_aggregator.h"
  10. #include "port/stack_trace.h"
  11. #include "rocksdb/memtablerep.h"
  12. #include "rocksdb/slice_transform.h"
  13. namespace ROCKSDB_NAMESPACE {
  14. class DBMemTableTest : public DBTestBase {
  15. public:
  16. DBMemTableTest() : DBTestBase("/db_memtable_test") {}
  17. };
  18. class MockMemTableRep : public MemTableRep {
  19. public:
  20. explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep)
  21. : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}
  22. KeyHandle Allocate(const size_t len, char** buf) override {
  23. return rep_->Allocate(len, buf);
  24. }
  25. void Insert(KeyHandle handle) override { rep_->Insert(handle); }
  26. void InsertWithHint(KeyHandle handle, void** hint) override {
  27. num_insert_with_hint_++;
  28. EXPECT_NE(nullptr, hint);
  29. last_hint_in_ = *hint;
  30. rep_->InsertWithHint(handle, hint);
  31. last_hint_out_ = *hint;
  32. }
  33. bool Contains(const char* key) const override { return rep_->Contains(key); }
  34. void Get(const LookupKey& k, void* callback_args,
  35. bool (*callback_func)(void* arg, const char* entry)) override {
  36. rep_->Get(k, callback_args, callback_func);
  37. }
  38. size_t ApproximateMemoryUsage() override {
  39. return rep_->ApproximateMemoryUsage();
  40. }
  41. Iterator* GetIterator(Arena* arena) override {
  42. return rep_->GetIterator(arena);
  43. }
  44. void* last_hint_in() { return last_hint_in_; }
  45. void* last_hint_out() { return last_hint_out_; }
  46. int num_insert_with_hint() { return num_insert_with_hint_; }
  47. private:
  48. std::unique_ptr<MemTableRep> rep_;
  49. void* last_hint_in_;
  50. void* last_hint_out_;
  51. int num_insert_with_hint_;
  52. };
  53. class MockMemTableRepFactory : public MemTableRepFactory {
  54. public:
  55. MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
  56. Allocator* allocator,
  57. const SliceTransform* transform,
  58. Logger* logger) override {
  59. SkipListFactory factory;
  60. MemTableRep* skiplist_rep =
  61. factory.CreateMemTableRep(cmp, allocator, transform, logger);
  62. mock_rep_ = new MockMemTableRep(allocator, skiplist_rep);
  63. return mock_rep_;
  64. }
  65. MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
  66. Allocator* allocator,
  67. const SliceTransform* transform,
  68. Logger* logger,
  69. uint32_t column_family_id) override {
  70. last_column_family_id_ = column_family_id;
  71. return CreateMemTableRep(cmp, allocator, transform, logger);
  72. }
  73. const char* Name() const override { return "MockMemTableRepFactory"; }
  74. MockMemTableRep* rep() { return mock_rep_; }
  75. bool IsInsertConcurrentlySupported() const override { return false; }
  76. uint32_t GetLastColumnFamilyId() { return last_column_family_id_; }
  77. private:
  78. MockMemTableRep* mock_rep_;
  79. // workaround since there's no port::kMaxUint32 yet.
  80. uint32_t last_column_family_id_ = static_cast<uint32_t>(-1);
  81. };
  82. class TestPrefixExtractor : public SliceTransform {
  83. public:
  84. const char* Name() const override { return "TestPrefixExtractor"; }
  85. Slice Transform(const Slice& key) const override {
  86. const char* p = separator(key);
  87. if (p == nullptr) {
  88. return Slice();
  89. }
  90. return Slice(key.data(), p - key.data() + 1);
  91. }
  92. bool InDomain(const Slice& key) const override {
  93. return separator(key) != nullptr;
  94. }
  95. bool InRange(const Slice& /*key*/) const override { return false; }
  96. private:
  97. const char* separator(const Slice& key) const {
  98. return reinterpret_cast<const char*>(memchr(key.data(), '_', key.size()));
  99. }
  100. };
  101. // Test that ::Add properly returns false when inserting duplicate keys
  102. TEST_F(DBMemTableTest, DuplicateSeq) {
  103. SequenceNumber seq = 123;
  104. std::string value;
  105. Status s;
  106. MergeContext merge_context;
  107. Options options;
  108. InternalKeyComparator ikey_cmp(options.comparator);
  109. ReadRangeDelAggregator range_del_agg(&ikey_cmp,
  110. kMaxSequenceNumber /* upper_bound */);
  111. // Create a MemTable
  112. InternalKeyComparator cmp(BytewiseComparator());
  113. auto factory = std::make_shared<SkipListFactory>();
  114. options.memtable_factory = factory;
  115. ImmutableCFOptions ioptions(options);
  116. WriteBufferManager wb(options.db_write_buffer_size);
  117. MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  118. kMaxSequenceNumber, 0 /* column_family_id */);
  119. // Write some keys and make sure it returns false on duplicates
  120. bool res;
  121. res = mem->Add(seq, kTypeValue, "key", "value2");
  122. ASSERT_TRUE(res);
  123. res = mem->Add(seq, kTypeValue, "key", "value2");
  124. ASSERT_FALSE(res);
  125. // Changing the type should still cause the duplicatae key
  126. res = mem->Add(seq, kTypeMerge, "key", "value2");
  127. ASSERT_FALSE(res);
  128. // Changing the seq number will make the key fresh
  129. res = mem->Add(seq + 1, kTypeMerge, "key", "value2");
  130. ASSERT_TRUE(res);
  131. // Test with different types for duplicate keys
  132. res = mem->Add(seq, kTypeDeletion, "key", "");
  133. ASSERT_FALSE(res);
  134. res = mem->Add(seq, kTypeSingleDeletion, "key", "");
  135. ASSERT_FALSE(res);
  136. // Test the duplicate keys under stress
  137. for (int i = 0; i < 10000; i++) {
  138. bool insert_dup = i % 10 == 1;
  139. if (!insert_dup) {
  140. seq++;
  141. }
  142. res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq));
  143. if (insert_dup) {
  144. ASSERT_FALSE(res);
  145. } else {
  146. ASSERT_TRUE(res);
  147. }
  148. }
  149. delete mem;
  150. // Test with InsertWithHint
  151. options.memtable_insert_with_hint_prefix_extractor.reset(
  152. new TestPrefixExtractor()); // which uses _ to extract the prefix
  153. ioptions = ImmutableCFOptions(options);
  154. mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  155. kMaxSequenceNumber, 0 /* column_family_id */);
  156. // Insert a duplicate key with _ in it
  157. res = mem->Add(seq, kTypeValue, "key_1", "value");
  158. ASSERT_TRUE(res);
  159. res = mem->Add(seq, kTypeValue, "key_1", "value");
  160. ASSERT_FALSE(res);
  161. delete mem;
  162. // Test when InsertConcurrently will be invoked
  163. options.allow_concurrent_memtable_write = true;
  164. ioptions = ImmutableCFOptions(options);
  165. mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  166. kMaxSequenceNumber, 0 /* column_family_id */);
  167. MemTablePostProcessInfo post_process_info;
  168. res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
  169. ASSERT_TRUE(res);
  170. res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
  171. ASSERT_FALSE(res);
  172. delete mem;
  173. }
  174. // A simple test to verify that the concurrent merge writes is functional
  175. TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
  176. int num_ops = 1000;
  177. std::string value;
  178. Status s;
  179. MergeContext merge_context;
  180. Options options;
  181. // A merge operator that is not sensitive to concurrent writes since in this
  182. // test we don't order the writes.
  183. options.merge_operator = MergeOperators::CreateUInt64AddOperator();
  184. // Create a MemTable
  185. InternalKeyComparator cmp(BytewiseComparator());
  186. auto factory = std::make_shared<SkipListFactory>();
  187. options.memtable_factory = factory;
  188. options.allow_concurrent_memtable_write = true;
  189. ImmutableCFOptions ioptions(options);
  190. WriteBufferManager wb(options.db_write_buffer_size);
  191. MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
  192. kMaxSequenceNumber, 0 /* column_family_id */);
  193. // Put 0 as the base
  194. PutFixed64(&value, static_cast<uint64_t>(0));
  195. bool res = mem->Add(0, kTypeValue, "key", value);
  196. ASSERT_TRUE(res);
  197. value.clear();
  198. // Write Merge concurrently
  199. ROCKSDB_NAMESPACE::port::Thread write_thread1([&]() {
  200. MemTablePostProcessInfo post_process_info1;
  201. std::string v1;
  202. for (int seq = 1; seq < num_ops / 2; seq++) {
  203. PutFixed64(&v1, seq);
  204. bool res1 =
  205. mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info1);
  206. ASSERT_TRUE(res1);
  207. v1.clear();
  208. }
  209. });
  210. ROCKSDB_NAMESPACE::port::Thread write_thread2([&]() {
  211. MemTablePostProcessInfo post_process_info2;
  212. std::string v2;
  213. for (int seq = num_ops / 2; seq < num_ops; seq++) {
  214. PutFixed64(&v2, seq);
  215. bool res2 =
  216. mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info2);
  217. ASSERT_TRUE(res2);
  218. v2.clear();
  219. }
  220. });
  221. write_thread1.join();
  222. write_thread2.join();
  223. Status status;
  224. ReadOptions roptions;
  225. SequenceNumber max_covering_tombstone_seq = 0;
  226. LookupKey lkey("key", kMaxSequenceNumber);
  227. res = mem->Get(lkey, &value, &status, &merge_context,
  228. &max_covering_tombstone_seq, roptions);
  229. ASSERT_TRUE(res);
  230. uint64_t ivalue = DecodeFixed64(Slice(value).data());
  231. uint64_t sum = 0;
  232. for (int seq = 0; seq < num_ops; seq++) {
  233. sum += seq;
  234. }
  235. ASSERT_EQ(ivalue, sum);
  236. delete mem;
  237. }
  238. TEST_F(DBMemTableTest, InsertWithHint) {
  239. Options options;
  240. options.allow_concurrent_memtable_write = false;
  241. options.create_if_missing = true;
  242. options.memtable_factory.reset(new MockMemTableRepFactory());
  243. options.memtable_insert_with_hint_prefix_extractor.reset(
  244. new TestPrefixExtractor());
  245. options.env = env_;
  246. Reopen(options);
  247. MockMemTableRep* rep =
  248. reinterpret_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
  249. ->rep();
  250. ASSERT_OK(Put("foo_k1", "foo_v1"));
  251. ASSERT_EQ(nullptr, rep->last_hint_in());
  252. void* hint_foo = rep->last_hint_out();
  253. ASSERT_OK(Put("foo_k2", "foo_v2"));
  254. ASSERT_EQ(hint_foo, rep->last_hint_in());
  255. ASSERT_EQ(hint_foo, rep->last_hint_out());
  256. ASSERT_OK(Put("foo_k3", "foo_v3"));
  257. ASSERT_EQ(hint_foo, rep->last_hint_in());
  258. ASSERT_EQ(hint_foo, rep->last_hint_out());
  259. ASSERT_OK(Put("bar_k1", "bar_v1"));
  260. ASSERT_EQ(nullptr, rep->last_hint_in());
  261. void* hint_bar = rep->last_hint_out();
  262. ASSERT_NE(hint_foo, hint_bar);
  263. ASSERT_OK(Put("bar_k2", "bar_v2"));
  264. ASSERT_EQ(hint_bar, rep->last_hint_in());
  265. ASSERT_EQ(hint_bar, rep->last_hint_out());
  266. ASSERT_EQ(5, rep->num_insert_with_hint());
  267. ASSERT_OK(Put("whitelisted", "vvv"));
  268. ASSERT_EQ(5, rep->num_insert_with_hint());
  269. ASSERT_EQ("foo_v1", Get("foo_k1"));
  270. ASSERT_EQ("foo_v2", Get("foo_k2"));
  271. ASSERT_EQ("foo_v3", Get("foo_k3"));
  272. ASSERT_EQ("bar_v1", Get("bar_k1"));
  273. ASSERT_EQ("bar_v2", Get("bar_k2"));
  274. ASSERT_EQ("vvv", Get("whitelisted"));
  275. }
  276. TEST_F(DBMemTableTest, ColumnFamilyId) {
  277. // Verifies MemTableRepFactory is told the right column family id.
  278. Options options;
  279. options.allow_concurrent_memtable_write = false;
  280. options.create_if_missing = true;
  281. options.memtable_factory.reset(new MockMemTableRepFactory());
  282. DestroyAndReopen(options);
  283. CreateAndReopenWithCF({"pikachu"}, options);
  284. for (uint32_t cf = 0; cf < 2; ++cf) {
  285. ASSERT_OK(Put(cf, "key", "val"));
  286. ASSERT_OK(Flush(cf));
  287. ASSERT_EQ(
  288. cf, static_cast<MockMemTableRepFactory*>(options.memtable_factory.get())
  289. ->GetLastColumnFamilyId());
  290. }
  291. }
  292. } // namespace ROCKSDB_NAMESPACE
  293. int main(int argc, char** argv) {
  294. ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  295. ::testing::InitGoogleTest(&argc, argv);
  296. return RUN_ALL_TESTS();
  297. }