| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.//  This source code is licensed under both the GPLv2 (found in the//  COPYING file in the root directory) and Apache 2.0 License//  (found in the LICENSE.Apache file in the root directory).#include <memory>#include <string>#include "db/db_test_util.h"#include "db/memtable.h"#include "db/range_del_aggregator.h"#include "port/stack_trace.h"#include "rocksdb/memtablerep.h"#include "rocksdb/slice_transform.h"namespace ROCKSDB_NAMESPACE {class DBMemTableTest : public DBTestBase { public:  DBMemTableTest() : DBTestBase("/db_memtable_test") {}};class MockMemTableRep : public MemTableRep { public:  explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep)      : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}  KeyHandle Allocate(const size_t len, char** buf) override {    return rep_->Allocate(len, buf);  }  void Insert(KeyHandle handle) override { rep_->Insert(handle); }  void InsertWithHint(KeyHandle handle, void** hint) override {    num_insert_with_hint_++;    EXPECT_NE(nullptr, hint);    last_hint_in_ = *hint;    rep_->InsertWithHint(handle, hint);    last_hint_out_ = *hint;  }  bool Contains(const char* key) const override { return rep_->Contains(key); }  void Get(const LookupKey& k, void* callback_args,           bool (*callback_func)(void* arg, const char* entry)) override {    rep_->Get(k, callback_args, callback_func);  }  size_t ApproximateMemoryUsage() override {    return rep_->ApproximateMemoryUsage();  }  Iterator* GetIterator(Arena* arena) override {    return rep_->GetIterator(arena);  }  void* last_hint_in() { return last_hint_in_; }  void* last_hint_out() { return last_hint_out_; }  int num_insert_with_hint() { return num_insert_with_hint_; } private:  std::unique_ptr<MemTableRep> rep_;  void* last_hint_in_;  void* last_hint_out_;  int num_insert_with_hint_;};class MockMemTableRepFactory : public MemTableRepFactory { public:  MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,                                 Allocator* allocator,                                 const SliceTransform* transform,                                 Logger* logger) override {    SkipListFactory factory;    MemTableRep* skiplist_rep =        factory.CreateMemTableRep(cmp, allocator, transform, logger);    mock_rep_ = new MockMemTableRep(allocator, skiplist_rep);    return mock_rep_;  }  MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,                                 Allocator* allocator,                                 const SliceTransform* transform,                                 Logger* logger,                                 uint32_t column_family_id) override {    last_column_family_id_ = column_family_id;    return CreateMemTableRep(cmp, allocator, transform, logger);  }  const char* Name() const override { return "MockMemTableRepFactory"; }  MockMemTableRep* rep() { return mock_rep_; }  bool IsInsertConcurrentlySupported() const override { return false; }  uint32_t GetLastColumnFamilyId() { return last_column_family_id_; } private:  MockMemTableRep* mock_rep_;  // workaround since there's no port::kMaxUint32 yet.  uint32_t last_column_family_id_ = static_cast<uint32_t>(-1);};class TestPrefixExtractor : public SliceTransform { public:  const char* Name() const override { return "TestPrefixExtractor"; }  Slice Transform(const Slice& key) const override {    const char* p = separator(key);    if (p == nullptr) {      return Slice();    }    return Slice(key.data(), p - key.data() + 1);  }  bool InDomain(const Slice& key) const override {    return separator(key) != nullptr;  }  bool InRange(const Slice& /*key*/) const override { return false; } private:  const char* separator(const Slice& key) const {    return reinterpret_cast<const char*>(memchr(key.data(), '_', key.size()));  }};// Test that ::Add properly returns false when inserting duplicate keysTEST_F(DBMemTableTest, DuplicateSeq) {  SequenceNumber seq = 123;  std::string value;  Status s;  MergeContext merge_context;  Options options;  InternalKeyComparator ikey_cmp(options.comparator);  ReadRangeDelAggregator range_del_agg(&ikey_cmp,                                       kMaxSequenceNumber /* upper_bound */);  // Create a MemTable  InternalKeyComparator cmp(BytewiseComparator());  auto factory = std::make_shared<SkipListFactory>();  options.memtable_factory = factory;  ImmutableCFOptions ioptions(options);  WriteBufferManager wb(options.db_write_buffer_size);  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,                               kMaxSequenceNumber, 0 /* column_family_id */);  // Write some keys and make sure it returns false on duplicates  bool res;  res = mem->Add(seq, kTypeValue, "key", "value2");  ASSERT_TRUE(res);  res = mem->Add(seq, kTypeValue, "key", "value2");  ASSERT_FALSE(res);  // Changing the type should still cause the duplicatae key  res = mem->Add(seq, kTypeMerge, "key", "value2");  ASSERT_FALSE(res);  // Changing the seq number will make the key fresh  res = mem->Add(seq + 1, kTypeMerge, "key", "value2");  ASSERT_TRUE(res);  // Test with different types for duplicate keys  res = mem->Add(seq, kTypeDeletion, "key", "");  ASSERT_FALSE(res);  res = mem->Add(seq, kTypeSingleDeletion, "key", "");  ASSERT_FALSE(res);  // Test the duplicate keys under stress  for (int i = 0; i < 10000; i++) {    bool insert_dup = i % 10 == 1;    if (!insert_dup) {      seq++;    }    res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq));    if (insert_dup) {      ASSERT_FALSE(res);    } else {      ASSERT_TRUE(res);    }  }  delete mem;  // Test with InsertWithHint  options.memtable_insert_with_hint_prefix_extractor.reset(      new TestPrefixExtractor());  // which uses _ to extract the prefix  ioptions = ImmutableCFOptions(options);  mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,                     kMaxSequenceNumber, 0 /* column_family_id */);  // Insert a duplicate key with _ in it  res = mem->Add(seq, kTypeValue, "key_1", "value");  ASSERT_TRUE(res);  res = mem->Add(seq, kTypeValue, "key_1", "value");  ASSERT_FALSE(res);  delete mem;  // Test when InsertConcurrently will be invoked  options.allow_concurrent_memtable_write = true;  ioptions = ImmutableCFOptions(options);  mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,                     kMaxSequenceNumber, 0 /* column_family_id */);  MemTablePostProcessInfo post_process_info;  res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);  ASSERT_TRUE(res);  res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);  ASSERT_FALSE(res);  delete mem;}// A simple test to verify that the concurrent merge writes is functionalTEST_F(DBMemTableTest, ConcurrentMergeWrite) {  int num_ops = 1000;  std::string value;  Status s;  MergeContext merge_context;  Options options;  // A merge operator that is not sensitive to concurrent writes since in this  // test we don't order the writes.  options.merge_operator = MergeOperators::CreateUInt64AddOperator();  // Create a MemTable  InternalKeyComparator cmp(BytewiseComparator());  auto factory = std::make_shared<SkipListFactory>();  options.memtable_factory = factory;  options.allow_concurrent_memtable_write = true;  ImmutableCFOptions ioptions(options);  WriteBufferManager wb(options.db_write_buffer_size);  MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,                               kMaxSequenceNumber, 0 /* column_family_id */);  // Put 0 as the base  PutFixed64(&value, static_cast<uint64_t>(0));  bool res = mem->Add(0, kTypeValue, "key", value);  ASSERT_TRUE(res);  value.clear();  // Write Merge concurrently  ROCKSDB_NAMESPACE::port::Thread write_thread1([&]() {    MemTablePostProcessInfo post_process_info1;    std::string v1;    for (int seq = 1; seq < num_ops / 2; seq++) {      PutFixed64(&v1, seq);      bool res1 =          mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info1);      ASSERT_TRUE(res1);      v1.clear();    }  });  ROCKSDB_NAMESPACE::port::Thread write_thread2([&]() {    MemTablePostProcessInfo post_process_info2;    std::string v2;    for (int seq = num_ops / 2; seq < num_ops; seq++) {      PutFixed64(&v2, seq);      bool res2 =          mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info2);      ASSERT_TRUE(res2);      v2.clear();    }  });  write_thread1.join();  write_thread2.join();  Status status;  ReadOptions roptions;  SequenceNumber max_covering_tombstone_seq = 0;  LookupKey lkey("key", kMaxSequenceNumber);  res = mem->Get(lkey, &value, &status, &merge_context,                 &max_covering_tombstone_seq, roptions);  ASSERT_TRUE(res);  uint64_t ivalue = DecodeFixed64(Slice(value).data());  uint64_t sum = 0;  for (int seq = 0; seq < num_ops; seq++) {    sum += seq;  }  ASSERT_EQ(ivalue, sum);  delete mem;}TEST_F(DBMemTableTest, InsertWithHint) {  Options options;  options.allow_concurrent_memtable_write = false;  options.create_if_missing = true;  options.memtable_factory.reset(new MockMemTableRepFactory());  options.memtable_insert_with_hint_prefix_extractor.reset(      new TestPrefixExtractor());  options.env = env_;  Reopen(options);  MockMemTableRep* rep =      reinterpret_cast<MockMemTableRepFactory*>(options.memtable_factory.get())          ->rep();  ASSERT_OK(Put("foo_k1", "foo_v1"));  ASSERT_EQ(nullptr, rep->last_hint_in());  void* hint_foo = rep->last_hint_out();  ASSERT_OK(Put("foo_k2", "foo_v2"));  ASSERT_EQ(hint_foo, rep->last_hint_in());  ASSERT_EQ(hint_foo, rep->last_hint_out());  ASSERT_OK(Put("foo_k3", "foo_v3"));  ASSERT_EQ(hint_foo, rep->last_hint_in());  ASSERT_EQ(hint_foo, rep->last_hint_out());  ASSERT_OK(Put("bar_k1", "bar_v1"));  ASSERT_EQ(nullptr, rep->last_hint_in());  void* hint_bar = rep->last_hint_out();  ASSERT_NE(hint_foo, hint_bar);  ASSERT_OK(Put("bar_k2", "bar_v2"));  ASSERT_EQ(hint_bar, rep->last_hint_in());  ASSERT_EQ(hint_bar, rep->last_hint_out());  ASSERT_EQ(5, rep->num_insert_with_hint());  ASSERT_OK(Put("whitelisted", "vvv"));  ASSERT_EQ(5, rep->num_insert_with_hint());  ASSERT_EQ("foo_v1", Get("foo_k1"));  ASSERT_EQ("foo_v2", Get("foo_k2"));  ASSERT_EQ("foo_v3", Get("foo_k3"));  ASSERT_EQ("bar_v1", Get("bar_k1"));  ASSERT_EQ("bar_v2", Get("bar_k2"));  ASSERT_EQ("vvv", Get("whitelisted"));}TEST_F(DBMemTableTest, ColumnFamilyId) {  // Verifies MemTableRepFactory is told the right column family id.  Options options;  options.allow_concurrent_memtable_write = false;  options.create_if_missing = true;  options.memtable_factory.reset(new MockMemTableRepFactory());  DestroyAndReopen(options);  CreateAndReopenWithCF({"pikachu"}, options);  for (uint32_t cf = 0; cf < 2; ++cf) {    ASSERT_OK(Put(cf, "key", "val"));    ASSERT_OK(Flush(cf));    ASSERT_EQ(        cf, static_cast<MockMemTableRepFactory*>(options.memtable_factory.get())                ->GetLastColumnFamilyId());  }}}  // namespace ROCKSDB_NAMESPACEint main(int argc, char** argv) {  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();  ::testing::InitGoogleTest(&argc, argv);  return RUN_ALL_TESTS();}
 |